from abc import ABC, abstractmethod
import numpy as np

from typing import List, Dict, NamedTuple, Iterable, Tuple
from mlagents_envs.base_env import (
    DecisionSteps,
    TerminalSteps,
    BehaviorSpec,
    BehaviorName,
    ActionTuple,
)
from mlagents_envs.side_channel.stats_side_channel import EnvironmentStats

from mlagents.trainers.policy import Policy
from mlagents.trainers.agent_processor import AgentManager, AgentManagerQueue
from mlagents.trainers.action_info import ActionInfo
from mlagents_envs.logging_util import get_logger
from mlagents_envs.exception import UnityActionException

AllStepResult = Dict[BehaviorName, Tuple[DecisionSteps, TerminalSteps]]
AllGroupSpec = Dict[BehaviorName, BehaviorSpec]
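# AllStepResult maps each behavior name to the (DecisionSteps, TerminalSteps)
# pair returned by the environment for that behavior on a single step;
# AllGroupSpec maps each behavior name to its BehaviorSpec.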

logger = get_logger(__name__)


class EnvironmentStep(NamedTuple):
    current_all_step_result: AllStepResult
    worker_id: int
    brain_name_to_action_info: Dict[BehaviorName, ActionInfo]
    environment_stats: EnvironmentStats

    @property
    def name_behavior_ids(self) -> Iterable[BehaviorName]:
        return self.current_all_step_result.keys()

    @staticmethod
    def empty(worker_id: int) -> "EnvironmentStep":
        return EnvironmentStep({}, worker_id, {}, {})
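
# An EnvironmentStep bundles everything one worker reports for a single
# environment step (or reset): the decision/terminal steps per behavior, the
# worker id, the actions that were taken, and any stats sent through the
# stats side channel. The EnvManager below produces and consumes these.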


class EnvManager(ABC):
    def __init__(self):
        self.policies: Dict[BehaviorName, Policy] = {}
        self.agent_managers: Dict[BehaviorName, AgentManager] = {}
        self.first_step_infos: List[EnvironmentStep] = []

    def set_policy(self, brain_name: BehaviorName, policy: Policy) -> None:
        self.policies[brain_name] = policy
        if brain_name in self.agent_managers:
            self.agent_managers[brain_name].policy = policy

    def set_agent_manager(
        self, brain_name: BehaviorName, manager: AgentManager
    ) -> None:
        self.agent_managers[brain_name] = manager
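
    # set_agent_manager() registers the AgentManager that will receive
    # experiences for a behavior; set_policy() updates both the policies dict
    # and, if a manager is already registered, the manager's policy reference,
    # so stepping always uses the most recently pushed policy.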

    @abstractmethod
    def _step(self) -> List[EnvironmentStep]:
        pass

    @abstractmethod
    def _reset_env(self, config: Dict = None) -> List[EnvironmentStep]:
        pass

    def reset(self, config: Dict = None) -> int:
        for manager in self.agent_managers.values():
            manager.end_episode()
        # Save the first step infos, after the reset.
        # They will be processed on the first advance().
        self.first_step_infos = self._reset_env(config)
        return len(self.first_step_infos)
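
    # reset() ends any in-flight episodes before asking the concrete
    # implementation (a subclass such as SubprocessEnvManager) to reset the
    # environments; the resulting first EnvironmentSteps are stashed and only
    # processed on the next get_steps(), once AgentManagers exist for the
    # behaviors involved.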

    @abstractmethod
    def set_env_parameters(self, config: Dict = None) -> None:
        """
        Sends environment parameter settings to C# via the
        EnvironmentParametersSideChannel.
        :param config: Dict of environment parameter keys and values
        """
        pass
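
    # Illustrative call, assuming a concrete EnvManager subclass instance
    # `env_manager` and an environment that exposes a "gravity" parameter
    # (both names are assumptions, not defined in this module):
    #
    #     env_manager.set_env_parameters({"gravity": -9.8})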

    @property
    @abstractmethod
    def training_behaviors(self) -> Dict[BehaviorName, BehaviorSpec]:
        pass

    @abstractmethod
    def close(self):
        pass

    def get_steps(self) -> List[EnvironmentStep]:
        """
        Updates the policies, steps the environments, and returns the step information from the environments.
        Calling code should pass the returned EnvironmentSteps to process_steps() after calling this.
        :return: The list of EnvironmentSteps
        """
        # If we had just reset, process the first EnvironmentSteps.
        # Note that we do it here instead of in reset() so that on the very first reset(),
        # we can create the needed AgentManagers before calling advance() and processing the EnvironmentSteps.
        if self.first_step_infos:
            self._process_step_infos(self.first_step_infos)
            self.first_step_infos = []
        # Get new policies if found. Always get the latest policy.
        for brain_name in self.agent_managers.keys():
            _policy = None
            try:
                # We make sure to empty the policy queue before continuing to produce steps.
                # This halts the trainers until the policy queue is empty.
                while True:
                    _policy = self.agent_managers[brain_name].policy_queue.get_nowait()
            except AgentManagerQueue.Empty:
                if _policy is not None:
                    # policy_queue contains Policy, but we need a TFPolicy here
                    self.set_policy(brain_name, _policy)  # type: ignore
        # Step the environments
        new_step_infos = self._step()
        return new_step_infos
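
    # Expected calling pattern from the training loop (the variable names
    # below are illustrative only, not part of this module):
    #
    #     new_steps = env_manager.get_steps()
    #     env_manager.process_steps(new_steps)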

    def process_steps(self, new_step_infos: List[EnvironmentStep]) -> int:
        # Add to AgentProcessor
        num_step_infos = self._process_step_infos(new_step_infos)
        return num_step_infos

    def _process_step_infos(self, step_infos: List[EnvironmentStep]) -> int:
        for step_info in step_infos:
            for name_behavior_id in step_info.name_behavior_ids:
                if name_behavior_id not in self.agent_managers:
                    logger.warning(
                        "Agent manager was not created for behavior id {}.".format(
                            name_behavior_id
                        )
                    )
                    continue
                decision_steps, terminal_steps = step_info.current_all_step_result[
                    name_behavior_id
                ]
                self.agent_managers[name_behavior_id].add_experiences(
                    decision_steps,
                    terminal_steps,
                    step_info.worker_id,
                    step_info.brain_name_to_action_info.get(
                        name_behavior_id, ActionInfo.empty()
                    ),
                )

                self.agent_managers[name_behavior_id].record_environment_stats(
                    step_info.environment_stats, step_info.worker_id
                )
        return len(step_infos)

    @staticmethod
    def action_tuple_from_numpy_dict(action_dict: Dict[str, np.ndarray]) -> ActionTuple:
        if "continuous_action" in action_dict:
            continuous = action_dict["continuous_action"]
            if "discrete_action" in action_dict:
                discrete = action_dict["discrete_action"]
                action_tuple = ActionTuple(continuous, discrete)
            else:
                action_tuple = ActionTuple.create_continuous(continuous)
        elif "discrete_action" in action_dict:
            discrete = action_dict["discrete_action"]
            action_tuple = ActionTuple.create_discrete(discrete)
        else:
            raise UnityActionException(
                "The action dict must contain entries for either continuous_action or discrete_action."
            )
        return action_tuple
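
# Illustrative use of EnvManager.action_tuple_from_numpy_dict. The agent count
# and action sizes below are assumptions for the example, not values defined
# in this module:
#
#     actions = {
#         "continuous_action": np.zeros((2, 3), dtype=np.float32),
#         "discrete_action": np.zeros((2, 1), dtype=np.int32),
#     }
#     action_tuple = EnvManager.action_tuple_from_numpy_dict(actions)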