|
|
|
|
|
|
from typing import Dict, NamedTuple, List, Any, Optional, Callable, Set |
|
|
|
import cloudpickle |
|
|
|
import enum |
|
|
|
import time |
|
|
|
|
|
|
|
from mlagents_envs.environment import UnityEnvironment |
|
|
|
from mlagents_envs.exception import ( |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logger = logging_util.get_logger(__name__) |
|
|
|
WORKER_SHUTDOWN_TIMEOUT_S = 10 |
|
|
|
|
|
|
|
|
|
|
|
class EnvironmentCommand(enum.Enum): |
|
|
|
|
|
|
RESET = 4 |
|
|
|
CLOSE = 5 |
|
|
|
ENV_EXITED = 6 |
|
|
|
CLOSED = 7 |
|
|
|
|
|
|
|
|
|
|
|
class EnvironmentRequest(NamedTuple): |
|
|
|
|
|
|
self.previous_step: EnvironmentStep = EnvironmentStep.empty(worker_id) |
|
|
|
self.previous_all_action_info: Dict[str, ActionInfo] = {} |
|
|
|
self.waiting = False |
|
|
|
self.closed = False |
|
|
|
|
|
|
|
def send(self, cmd: EnvironmentCommand, payload: Any = None) -> None: |
|
|
|
try: |
|
|
|
|
|
|
except (BrokenPipeError, EOFError): |
|
|
|
raise UnityCommunicationException("UnityEnvironment worker: recv failed.") |
|
|
|
|
|
|
|
def close(self): |
|
|
|
def request_close(self): |
|
|
|
try: |
|
|
|
self.conn.send(EnvironmentRequest(EnvironmentCommand.CLOSE)) |
|
|
|
except (BrokenPipeError, EOFError): |
|
|
|
|
|
|
pass |
|
|
|
logger.debug(f"UnityEnvWorker {self.worker_id} joining process.") |
|
|
|
self.process.join() |
|
|
|
|
|
|
|
|
|
|
|
def worker( |
|
|
|
|
|
|
EnvironmentResponse(EnvironmentCommand.ENV_EXITED, worker_id, ex) |
|
|
|
) |
|
|
|
_send_response(EnvironmentCommand.ENV_EXITED, ex) |
|
|
|
except Exception as ex: |
|
|
|
logger.error( |
|
|
|
f"UnityEnvironment worker {worker_id}: environment raised an unexpected exception." |
|
|
|
) |
|
|
|
step_queue.put( |
|
|
|
EnvironmentResponse(EnvironmentCommand.ENV_EXITED, worker_id, ex) |
|
|
|
) |
|
|
|
_send_response(EnvironmentCommand.ENV_EXITED, ex) |
|
|
|
# If this worker has put an item in the step queue that hasn't been processed by the EnvManager, the process |
|
|
|
# will hang until the item is processed. We avoid this behavior by using Queue.cancel_join_thread() |
|
|
|
# See https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue.cancel_join_thread for |
|
|
|
# more info. |
|
|
|
step_queue.cancel_join_thread() |
|
|
|
step_queue.close() |
|
|
|
parent_conn.close() |
|
|
|
step_queue.put(EnvironmentResponse(EnvironmentCommand.CLOSED, worker_id, None)) |
|
|
|
step_queue.close() |
|
|
|
|
|
|
|
|
|
|
|
class SubprocessEnvManager(EnvManager): |
|
|
|
|
|
|
super().__init__() |
|
|
|
self.env_workers: List[UnityEnvWorker] = [] |
|
|
|
self.step_queue: Queue = Queue() |
|
|
|
self.workers_alive = 0 |
|
|
|
for worker_idx in range(n_env): |
|
|
|
self.env_workers.append( |
|
|
|
self.create_worker( |
|
|
|
|
|
|
self.workers_alive += 1 |
|
|
|
|
|
|
|
@staticmethod |
|
|
|
def create_worker( |
|
|
|
|
|
|
|
|
|
|
def close(self) -> None: |
|
|
|
logger.debug("SubprocessEnvManager closing.") |
|
|
|
for env_worker in self.env_workers: |
|
|
|
env_worker.request_close() |
|
|
|
# Pull messages out of the queue until every worker has CLOSED or we time out. |
|
|
|
deadline = time.time() + WORKER_SHUTDOWN_TIMEOUT_S |
|
|
|
while self.workers_alive > 0 and time.time() < deadline: |
|
|
|
try: |
|
|
|
step: EnvironmentResponse = self.step_queue.get_nowait() |
|
|
|
env_worker = self.env_workers[step.worker_id] |
|
|
|
if step.cmd == EnvironmentCommand.CLOSED and not env_worker.closed: |
|
|
|
env_worker.closed = True |
|
|
|
self.workers_alive -= 1 |
|
|
|
# Discard all other messages. |
|
|
|
except EmptyQueueException: |
|
|
|
pass |
|
|
|
# Sanity check to kill zombie workers and report an issue if they occur. |
|
|
|
if self.workers_alive > 0: |
|
|
|
logger.error("SubprocessEnvManager had workers that didn't signal shutdown") |
|
|
|
for env_worker in self.env_workers: |
|
|
|
if not env_worker.closed and env_worker.process.is_alive(): |
|
|
|
env_worker.process.terminate() |
|
|
|
logger.error( |
|
|
|
"A SubprocessEnvManager worker did not shut down correctly so it was forcefully terminated." |
|
|
|
) |
|
|
|
for env_worker in self.env_workers: |
|
|
|
env_worker.close() |
|
|
|
|
|
|
|
def _postprocess_steps( |
|
|
|
self, env_steps: List[EnvironmentResponse] |
|
|
|