|
|
|
|
|
|
import logging |
|
|
|
from typing import Dict, NamedTuple, List, Any, Optional, Callable, Set |
|
|
|
import cloudpickle |
|
|
|
|
|
|
|
|
|
|
) |
|
|
|
from mlagents.envs.brain import AllBrainInfo, BrainParameters |
|
|
|
from mlagents.envs.action_info import ActionInfo |
|
|
|
|
|
|
|
logger = logging.getLogger("mlagents.envs") |
|
|
|
|
|
|
|
|
|
|
|
class EnvironmentCommand(NamedTuple): |
|
|
|
|
|
|
try: |
|
|
|
self.conn.send(EnvironmentCommand("close")) |
|
|
|
except (BrokenPipeError, EOFError): |
|
|
|
logger.debug( |
|
|
|
f"UnityEnvWorker {self.worker_id} got exception trying to close." |
|
|
|
) |
|
|
|
logger.debug(f"UnityEnvWorker {self.worker_id} joining process.") |
|
|
|
self.process.join() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print("UnityEnvironment worker: environment stopping.") |
|
|
|
step_queue.put(EnvironmentResponse("env_close", worker_id, None)) |
|
|
|
finally: |
|
|
|
# If this worker has put an item in the step queue that hasn't been processed by the EnvManager, the process |
|
|
|
# will hang until the item is processed. We avoid this behavior by using Queue.cancel_join_thread() |
|
|
|
# See https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue.cancel_join_thread for |
|
|
|
# more info. |
|
|
|
logger.debug(f"Worker {worker_id} closing.") |
|
|
|
step_queue.cancel_join_thread() |
|
|
|
logger.debug(f"Worker {worker_id} done.") |
|
|
|
|
|
|
|
|
|
|
|
class SubprocessEnvManager(EnvManager): |
|
|
|
|
|
|
return self.env_workers[0].recv().payload |
|
|
|
|
|
|
|
def close(self) -> None: |
|
|
|
logger.debug(f"SubprocessEnvManager closing.") |
|
|
|
self.step_queue.close() |
|
|
|
self.step_queue.join_thread() |
|
|
|
for env_worker in self.env_workers: |
|
|
|