import logging
from typing import Dict, NamedTuple, List, Any, Optional, Callable, Set
import cloudpickle

from mlagents.envs.environment import UnityEnvironment
from mlagents.envs.exception import UnityCommunicationException, UnityTimeOutException
from multiprocessing import Process, Pipe, Queue
from multiprocessing.connection import Connection
from queue import Empty as EmptyQueueException
from mlagents.envs.base_env import BaseEnv
from mlagents.envs.env_manager import EnvManager, EnvironmentStep
from mlagents.envs.timers import (
    TimerNode,
    timed,
    hierarchical_timer,
    reset_timers,
    get_timer_root,
)
from mlagents.envs.brain import AllBrainInfo, BrainParameters
from mlagents.envs.action_info import ActionInfo
from mlagents.envs.side_channel.float_properties_channel import FloatPropertiesChannel
from mlagents.envs.side_channel.engine_configuration_channel import (
    EngineConfigurationChannel,
    EngineConfig,
)
from mlagents.envs.side_channel.side_channel import SideChannel
from mlagents.envs.brain_conversion_utils import (
    step_result_to_brain_info,
    group_spec_to_brain_parameters,
)

logger = logging.getLogger("mlagents.envs")


class EnvironmentCommand(NamedTuple):
    name: str
    payload: Any = None


class EnvironmentResponse(NamedTuple):
    name: str
    worker_id: int
    payload: Any


class StepResponse(NamedTuple):
    all_brain_info: AllBrainInfo
    timer_root: Optional[TimerNode]
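

# Messages exchanged between SubprocessEnvManager and its worker processes:
# the manager sends an EnvironmentCommand over each worker's Pipe and the
# worker answers with an EnvironmentResponse. Replies to "reset",
# "external_brains" and "get_properties" come back over the same Pipe, while
# "step" results (a StepResponse) are pushed onto the shared step queue so
# the manager can poll all workers in one place.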


class UnityEnvWorker:
    def __init__(self, process: Process, worker_id: int, conn: Connection):
        self.process = process
        self.worker_id = worker_id
        self.conn = conn
        self.previous_step: EnvironmentStep = EnvironmentStep(None, {}, None)
        self.previous_all_action_info: Dict[str, ActionInfo] = {}
        self.waiting = False

    def send(self, name: str, payload: Any = None) -> None:
        try:
            cmd = EnvironmentCommand(name, payload)
            self.conn.send(cmd)
        except (BrokenPipeError, EOFError):
            raise UnityCommunicationException("UnityEnvironment worker: send failed.")

    def recv(self) -> EnvironmentResponse:
        try:
            response: EnvironmentResponse = self.conn.recv()
            return response
        except (BrokenPipeError, EOFError):
            raise UnityCommunicationException("UnityEnvironment worker: recv failed.")

    def close(self):
        try:
            self.conn.send(EnvironmentCommand("close"))
        except (BrokenPipeError, EOFError):
            logger.debug(
                f"UnityEnvWorker {self.worker_id} got exception trying to close."
            )
            pass
        logger.debug(f"UnityEnvWorker {self.worker_id} joining process.")
        self.process.join()
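

# Entry point for each child process: rebuilds the environment from the
# cloudpickled factory, wires up the side channels, then services commands
# from the parent connection until told to close (or the environment raises),
# pushing step results onto the shared step_queue.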
def worker(
    parent_conn: Connection,
    step_queue: Queue,
    pickled_env_factory: str,
    worker_id: int,
    engine_configuration: EngineConfig,
) -> None:
    env_factory: Callable[
        [int, List[SideChannel]], UnityEnvironment
    ] = cloudpickle.loads(pickled_env_factory)
    shared_float_properties = FloatPropertiesChannel()
    engine_configuration_channel = EngineConfigurationChannel()
    engine_configuration_channel.set_configuration(engine_configuration)
    env: BaseEnv = env_factory(
        worker_id, [shared_float_properties, engine_configuration_channel]
    )

    def _send_response(cmd_name, payload):
        parent_conn.send(EnvironmentResponse(cmd_name, worker_id, payload))

    def _generate_all_brain_info() -> AllBrainInfo:
        all_brain_info = {}
        for brain_name in env.get_agent_groups():
            all_brain_info[brain_name] = step_result_to_brain_info(
                env.get_step_result(brain_name),
                env.get_agent_group_spec(brain_name),
                worker_id,
            )
        return all_brain_info

    def external_brains():
        result = {}
        for brain_name in env.get_agent_groups():
            result[brain_name] = group_spec_to_brain_parameters(
                brain_name, env.get_agent_group_spec(brain_name)
            )
        return result
    try:
        while True:
            cmd: EnvironmentCommand = parent_conn.recv()
            if cmd.name == "step":
                all_action_info = cmd.payload
                for brain_name, action_info in all_action_info.items():
                    if len(action_info.action) != 0:
                        env.set_actions(brain_name, action_info.action)
                env.step()
                all_brain_info = _generate_all_brain_info()
                # The timers in this process are independent from those in the other
                # worker processes and the "main" process, so after we send back the
                # root timer, we can safely clear them.
                # Note that we could randomly return timers a fraction of the time if
                # we wanted to reduce the data transferred.
                # TODO get gauges from the workers and merge them in the main process too.
                step_response = StepResponse(all_brain_info, get_timer_root())
                step_queue.put(EnvironmentResponse("step", worker_id, step_response))
                reset_timers()
            elif cmd.name == "external_brains":
                _send_response("external_brains", external_brains())
            elif cmd.name == "get_properties":
                reset_params = {}
                for k in shared_float_properties.list_properties():
                    reset_params[k] = shared_float_properties.get_property(k)
                _send_response("get_properties", reset_params)
            elif cmd.name == "reset":
                for k, v in cmd.payload.items():
                    shared_float_properties.set_property(k, v)
                env.reset()
                all_brain_info = _generate_all_brain_info()
                _send_response("reset", all_brain_info)
            elif cmd.name == "close":
                break
    except (KeyboardInterrupt, UnityCommunicationException, UnityTimeOutException):
        logger.info(f"UnityEnvironment worker {worker_id}: environment stopping.")
        step_queue.put(EnvironmentResponse("env_close", worker_id, None))
    finally:
        # If this worker has put an item in the step queue that hasn't been processed by
        # the EnvManager, the process will hang until the item is processed. We avoid this
        # behavior by using Queue.cancel_join_thread(). See
        # https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue.cancel_join_thread
        # for more info.
        logger.debug(f"UnityEnvironment worker {worker_id} closing.")
        step_queue.cancel_join_thread()
        step_queue.close()
        env.close()
        logger.debug(f"UnityEnvironment worker {worker_id} done.")
class SubprocessEnvManager(EnvManager):
    def __init__(
        self,
        env_factory: Callable[[int, List[SideChannel]], BaseEnv],
        engine_configuration: EngineConfig,
        n_env: int = 1,
    ):
        super().__init__()
        self.env_workers: List[UnityEnvWorker] = []
        self.step_queue: Queue = Queue()
        for worker_idx in range(n_env):
            self.env_workers.append(
                self.create_worker(
                    worker_idx, self.step_queue, env_factory, engine_configuration
                )
            )

    @staticmethod
    def create_worker(
        worker_id: int,
        step_queue: Queue,
        env_factory: Callable[[int, List[SideChannel]], BaseEnv],
        engine_configuration: EngineConfig,
    ) -> UnityEnvWorker:
        parent_conn, child_conn = Pipe()
        # Need to use cloudpickle for the env factory function since function objects
        # aren't picklable on Windows as of Python 3.6.
        pickled_env_factory = cloudpickle.dumps(env_factory)
        child_process = Process(
            target=worker,
            args=(
                child_conn,
                step_queue,
                pickled_env_factory,
                worker_id,
                engine_configuration,
            ),
        )
        child_process.start()
        return UnityEnvWorker(child_process, worker_id, parent_conn)

    def _queue_steps(self) -> None:
        for env_worker in self.env_workers:
            if not env_worker.waiting:
                env_action_info = self._take_step(env_worker.previous_step)
                env_worker.previous_all_action_info = env_action_info
                env_worker.send("step", env_action_info)
                env_worker.waiting = True

    def step(self) -> List[EnvironmentStep]:
        # Queue steps for any workers which aren't in the "waiting" state.
        self._queue_steps()

        worker_steps: List[EnvironmentResponse] = []
        step_workers: Set[int] = set()
        # Poll the step queue for completed steps from environment workers until we
        # retrieve one or more, which we then return as EnvironmentSteps.
        while len(worker_steps) < 1:
            try:
                while True:
                    step = self.step_queue.get_nowait()
                    if step.name == "env_close":
                        raise UnityCommunicationException(
                            "At least one of the environments has closed."
                        )
                    self.env_workers[step.worker_id].waiting = False
                    if step.worker_id not in step_workers:
                        worker_steps.append(step)
                        step_workers.add(step.worker_id)
            except EmptyQueueException:
                pass

        step_infos = self._postprocess_steps(worker_steps)
        return step_infos

    def reset(self, config: Optional[Dict] = None) -> List[EnvironmentStep]:
        while any(ew.waiting for ew in self.env_workers):
            if not self.step_queue.empty():
                step = self.step_queue.get_nowait()
                self.env_workers[step.worker_id].waiting = False
        # First enqueue reset commands for all workers so that they reset in parallel
        for ew in self.env_workers:
            ew.send("reset", config)
        # Next (synchronously) collect the reset observations from each worker in sequence
        for ew in self.env_workers:
            ew.previous_step = EnvironmentStep(None, ew.recv().payload, None)
        return list(map(lambda ew: ew.previous_step, self.env_workers))

    @property
    def external_brains(self) -> Dict[str, BrainParameters]:
        self.env_workers[0].send("external_brains")
        return self.env_workers[0].recv().payload

    @property
    def get_properties(self) -> Dict[str, float]:
        self.env_workers[0].send("get_properties")
        return self.env_workers[0].recv().payload

    def close(self) -> None:
        logger.debug("SubprocessEnvManager closing.")
        self.step_queue.close()
        self.step_queue.join_thread()
        for env_worker in self.env_workers:
            env_worker.close()

    def _postprocess_steps(
        self, env_steps: List[EnvironmentResponse]
    ) -> List[EnvironmentStep]:
        step_infos = []
        timer_nodes = []
        for step in env_steps:
            payload: StepResponse = step.payload
            env_worker = self.env_workers[step.worker_id]
            new_step = EnvironmentStep(
                env_worker.previous_step.current_all_brain_info,
                payload.all_brain_info,
                env_worker.previous_all_action_info,
            )
            step_infos.append(new_step)
            env_worker.previous_step = new_step

            if payload.timer_root:
                timer_nodes.append(payload.timer_root)

        if timer_nodes:
            with hierarchical_timer("workers") as main_timer_node:
                for worker_timer_node in timer_nodes:
                    main_timer_node.merge(
                        worker_timer_node, root_name="worker_root", is_parallel=True
                    )

        return step_infos

    @timed
    def _take_step(self, last_step: EnvironmentStep) -> Dict[str, ActionInfo]:
        all_action_info: Dict[str, ActionInfo] = {}
        for brain_name, brain_info in last_step.current_all_brain_info.items():
            if brain_name in self.policies:
                all_action_info[brain_name] = self.policies[brain_name].get_action(
                    brain_info
                )
        return all_action_info
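

# --- Usage sketch (illustrative only, not part of the module) ----------------
# A minimal sketch of how a trainer might drive this manager. The factory
# signature matches what `worker` expects (a worker_id plus a list of side
# channels); the executable name "3DBall" and the `engine_config` value are
# hypothetical placeholders, not values defined in this file.
#
#   def make_env(worker_id, side_channels):
#       return UnityEnvironment(
#           file_name="3DBall", worker_id=worker_id, side_channels=side_channels
#       )
#
#   manager = SubprocessEnvManager(make_env, engine_config, n_env=4)
#   manager.reset(config={})          # resets all workers in parallel
#   step_infos = manager.step()       # blocks until at least one worker reports
#   manager.close()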