
Only send previous action and current BrainInfo (#3187)

This PR makes it so that the env_manager only sends one current BrainInfo and the previous actions (if any) to the AgentManager. The list of agents was added to ActionInfo and is used wherever the stored actions need to be matched back to the agents they were computed for.
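For orientation, here is a minimal sketch of the resulting interface, using simplified stand-in types rather than the real mlagents classes (BrainInfo is reduced to Any, and ActionInfoOutputs to a plain dict):

```python
from typing import Any, Dict, List, NamedTuple

BrainInfo = Any  # stand-in for mlagents.trainers.brain.BrainInfo


class ActionInfo(NamedTuple):
    action: Any
    value: Any
    outputs: Dict[str, Any]  # the policy's raw outputs (ActionInfoOutputs in the real code)
    agents: List[str]        # new in this PR: the agent ids the action was computed for


# The AgentManager entry point changes from
#     add_experiences(curr_info, next_info, take_action_outputs)
# to
#     add_experiences(curr_info: BrainInfo, previous_action: ActionInfo)
# so the env_manager ships a single current BrainInfo, and the action that led to it
# travels inside previous_action, with previous_action.agents saying who it was for.
```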
/asymm-envs
GitHub, 5 years ago
Current commit
4c241a80
11 files changed, 100 insertions(+), 79 deletions(-)
  1. ml-agents/mlagents/trainers/action_info.py (3 changes)
  2. ml-agents/mlagents/trainers/agent_processor.py (76 changes)
  3. ml-agents/mlagents/trainers/env_manager.py (3 changes)
  4. ml-agents/mlagents/trainers/simple_env_manager.py (10 changes)
  5. ml-agents/mlagents/trainers/subprocess_env_manager.py (8 changes)
  6. ml-agents/mlagents/trainers/tests/test_agent_processor.py (12 changes)
  7. ml-agents/mlagents/trainers/tests/test_policy.py (9 changes)
  8. ml-agents/mlagents/trainers/tests/test_subprocess_env_manager.py (4 changes)
  9. ml-agents/mlagents/trainers/tests/test_trainer_controller.py (14 changes)
 10. ml-agents/mlagents/trainers/tf_policy.py (7 changes)
 11. ml-agents/mlagents/trainers/trainer_controller.py (33 changes)

ml-agents/mlagents/trainers/action_info.py (3 changes)

-from typing import NamedTuple, Any, Dict
+from typing import NamedTuple, Any, Dict, List
 import numpy as np
 ActionInfoOutputs = Dict[str, np.ndarray]

     action: Any
     value: Any
     outputs: ActionInfoOutputs
+    agents: List[str]
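Pieced together from the hunk above, the full module after this change presumably reads as follows (the `class ActionInfo(NamedTuple):` line itself is not shown in the excerpt and is inferred from the PR description and the tests):

```python
from typing import NamedTuple, Any, Dict, List
import numpy as np

ActionInfoOutputs = Dict[str, np.ndarray]


class ActionInfo(NamedTuple):
    action: Any
    value: Any
    outputs: ActionInfoOutputs
    agents: List[str]
```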

ml-agents/mlagents/trainers/agent_processor.py (76 changes)

 from mlagents.trainers.brain import BrainInfo
 from mlagents.trainers.tf_policy import TFPolicy
 from mlagents.trainers.policy import Policy
-from mlagents.trainers.action_info import ActionInfoOutputs
+from mlagents.trainers.action_info import ActionInfo, ActionInfoOutputs
 from mlagents.trainers.stats import StatsReporter
 T = TypeVar("T")

"""
         self.experience_buffers: Dict[str, List[AgentExperience]] = defaultdict(list)
         self.last_brain_info: Dict[str, BrainInfo] = {}
+        # last_take_action_outputs stores the action a_t taken before the current observation s_(t+1), while
+        # grabbing previous_action from the policy grabs the action PRIOR to that, a_(t-1).
         # Note: this is needed until we switch to AgentExperiences as the data input type.
         # We still need some info from the policy (memories, previous actions)
         # that really should be gathered by the env-manager.
+        # Note: In the future this policy reference will be the policy of the env_manager and not the trainer.
+        # We can in that case just grab the action from the policy rather than having it passed in.
         self.policy = policy
         self.episode_steps: Counter = Counter()
         self.episode_rewards: Dict[str, float] = defaultdict(float)

         self.behavior_id = behavior_id
     def add_experiences(
-        self,
-        curr_info: BrainInfo,
-        next_info: BrainInfo,
-        take_action_outputs: ActionInfoOutputs,
+        self, curr_info: BrainInfo, previous_action: ActionInfo
-        :param next_info: next BrainInfo.
-        :param take_action_outputs: The outputs of the Policy's get_action method.
+        :param previous_action: The return value of the Policy's get_action method.
+        take_action_outputs = previous_action.outputs
-            self.stats_reporter.add_stat(
-                "Policy/Entropy", take_action_outputs["entropy"].mean()
-            )
+            for _entropy in take_action_outputs["entropy"]:
+                self.stats_reporter.add_stat("Policy/Entropy", _entropy)
-        for agent_id in curr_info.agents:
-            self.last_brain_info[agent_id] = curr_info
+        for agent_id in previous_action.agents:
-        tmp_environment_reward = next_info.rewards
+        tmp_environment_reward = curr_info.rewards
-        for next_idx, agent_id in enumerate(next_info.agents):
+        for agent_idx, agent_id in enumerate(curr_info.agents):
-            if stored_info is not None:
-                stored_take_action_outputs = self.last_take_action_outputs[agent_id]
-                idx = stored_info.agents.index(agent_id)
+            stored_take_action_outputs = self.last_take_action_outputs.get(
+                agent_id, None
+            )
+            if stored_info is not None and stored_take_action_outputs is not None:
+                prev_idx = stored_info.agents.index(agent_id)
-                if not stored_info.local_done[idx]:
+                if not stored_info.local_done[prev_idx]:
-                        obs.append(stored_info.visual_observations[i][idx])
+                        obs.append(stored_info.visual_observations[i][prev_idx])
-                        obs.append(stored_info.vector_observations[idx])
+                        obs.append(stored_info.vector_observations[prev_idx])
-                    done = next_info.local_done[next_idx]
-                    max_step = next_info.max_reached[next_idx]
+                    done = curr_info.local_done[agent_idx]
+                    max_step = curr_info.max_reached[agent_idx]
-                    action = stored_take_action_outputs["action"][idx]
+                    action = stored_take_action_outputs["action"][prev_idx]
-                        action_pre = stored_take_action_outputs["pre_action"][idx]
+                        action_pre = stored_take_action_outputs["pre_action"][prev_idx]
-                    action_probs = stored_take_action_outputs["log_probs"][idx]
-                    action_masks = stored_info.action_masks[idx]
+                    action_probs = stored_take_action_outputs["log_probs"][prev_idx]
+                    action_masks = stored_info.action_masks[prev_idx]
-                        reward=tmp_environment_reward[next_idx],
+                        reward=tmp_environment_reward[agent_idx],
                         done=done,
                         action=action,
                         action_probs=action_probs,

                     )
                     # Add the value outputs if needed
                     self.experience_buffers[agent_id].append(experience)
-                    self.episode_rewards[agent_id] += tmp_environment_reward[next_idx]
+                    self.episode_rewards[agent_id] += tmp_environment_reward[agent_idx]
-                    next_info.local_done[next_idx]
+                    curr_info.local_done[agent_idx]
                     or (
                         len(self.experience_buffers[agent_id])
                         >= self.max_trajectory_length

                     next_obs = []
-                    for i, _ in enumerate(next_info.visual_observations):
-                        next_obs.append(next_info.visual_observations[i][next_idx])
+                    for i, _ in enumerate(curr_info.visual_observations):
+                        next_obs.append(curr_info.visual_observations[i][agent_idx])
-                        next_obs.append(next_info.vector_observations[next_idx])
+                        next_obs.append(curr_info.vector_observations[agent_idx])
                     trajectory = Trajectory(
                         steps=self.experience_buffers[agent_id],
                         agent_id=agent_id,

                     for traj_queue in self.trajectory_queues:
                         traj_queue.put(trajectory)
                     self.experience_buffers[agent_id] = []
-                    if next_info.local_done[next_idx]:
+                    if curr_info.local_done[agent_idx]:
                         self.stats_reporter.add_stat(
                             "Environment/Cumulative Reward",
                             self.episode_rewards.get(agent_id, 0),

                         )
                         del self.episode_steps[agent_id]
                         del self.episode_rewards[agent_id]
-                elif not next_info.local_done[next_idx]:
+                elif not curr_info.local_done[agent_idx]:
+            self.last_brain_info[agent_id] = curr_info
-                curr_info.agents, take_action_outputs["action"]
+                previous_action.agents, take_action_outputs["action"]
             )

     def publish_trajectory_queue(
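The index renaming above is the heart of the change: `agent_idx` addresses the agent's slot in the BrainInfo that just arrived, while `prev_idx` addresses its slot in the BrainInfo stored on the previous call (the two can differ when the set of agents changes between steps). A small self-contained sketch of that lookup, with a hypothetical trimmed-down BrainInfo:

```python
from typing import Dict, List, NamedTuple, Optional, Tuple


class MiniBrainInfo(NamedTuple):
    """Hypothetical stand-in: just the fields needed to show the two indices."""
    agents: List[str]
    rewards: List[float]
    local_done: List[bool]


def locate_agent(
    agent_id: str,
    curr_info: MiniBrainInfo,
    last_brain_info: Dict[str, MiniBrainInfo],
) -> Tuple[int, Optional[int]]:
    # agent_idx: position of this agent in the *current* BrainInfo; used for
    # current-step fields such as rewards, local_done and max_reached.
    agent_idx = curr_info.agents.index(agent_id)
    # prev_idx: position of the same agent in the BrainInfo stored on the previous
    # call; used to index the stored observations and the stored action outputs
    # (action, pre_action, log_probs, action_masks).
    stored_info = last_brain_info.get(agent_id)
    prev_idx = stored_info.agents.index(agent_id) if stored_info is not None else None
    return agent_idx, prev_idx


# Example: agent "b" moved from slot 1 to slot 0 between two steps.
previous = MiniBrainInfo(agents=["a", "b"], rewards=[0.0, 0.0], local_done=[True, False])
current = MiniBrainInfo(agents=["b", "c"], rewards=[1.0, 0.0], local_done=[False, False])
assert locate_agent("b", current, {"b": previous}) == (0, 1)
```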

ml-agents/mlagents/trainers/env_manager.py (3 changes)

 class EnvironmentStep(NamedTuple):
-    previous_all_brain_info: AllBrainInfo

-        return self.brain_name_to_action_info.keys()
+        return self.current_all_brain_info.keys()

 class EnvManager(ABC):
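Based on this hunk, EnvironmentStep presumably ends up with just two fields, and name_behavior_ids is now derived from the current BrainInfo rather than the action dict (which is empty right after a reset). A minimal sketch, with AllBrainInfo assumed to be a brain-name-to-BrainInfo dict:

```python
from typing import Any, Dict, Iterable, NamedTuple

AllBrainInfo = Dict[str, Any]  # assumed alias: brain name -> BrainInfo


class EnvironmentStep(NamedTuple):
    current_all_brain_info: AllBrainInfo
    brain_name_to_action_info: Dict[str, Any]  # brain name -> ActionInfo

    @property
    def name_behavior_ids(self) -> Iterable[str]:
        # Keyed off the observations, so a freshly reset step (empty action dict)
        # still reports its behavior ids.
        return self.current_all_brain_info.keys()


# A post-reset step ("Walker" is a hypothetical behavior name): observations
# present, no previous actions yet.
reset_step = EnvironmentStep({"Walker": object()}, {})
assert list(reset_step.name_behavior_ids) == ["Walker"]
```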

ml-agents/mlagents/trainers/simple_env_manager.py (10 changes)

         super().__init__()
         self.shared_float_properties = float_prop_channel
         self.env = env
-        self.previous_step: EnvironmentStep = EnvironmentStep({}, {}, {})
+        self.previous_step: EnvironmentStep = EnvironmentStep({}, {})
         self.previous_all_action_info: Dict[str, ActionInfo] = {}
     def step(self) -> List[EnvironmentStep]:

         all_brain_info = self._generate_all_brain_info()
         step_brain_info = all_brain_info
-        step_info = EnvironmentStep(
-            self.previous_step.current_all_brain_info,
-            step_brain_info,
-            self.previous_all_action_info,
-        )
+        step_info = EnvironmentStep(step_brain_info, self.previous_all_action_info)
         self.previous_step = step_info
         return [step_info]

             self.shared_float_properties.set_property(k, v)
         self.env.reset()
         all_brain_info = self._generate_all_brain_info()
-        self.previous_step = EnvironmentStep({}, all_brain_info, {})
+        self.previous_step = EnvironmentStep(all_brain_info, {})
         return [self.previous_step]
     @property
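One subtlety worth spelling out: previous_all_brain_info disappears, but previous_all_action_info stays, because each returned step pairs the newly generated BrainInfo with the actions that produced it. A toy, self-contained version of that bookkeeping (the real manager also talks to the Unity environment, generates the BrainInfo, and handles side channels):

```python
from typing import Any, Dict, List, NamedTuple


class EnvironmentStep(NamedTuple):
    current_all_brain_info: Dict[str, Any]     # brain name -> BrainInfo
    brain_name_to_action_info: Dict[str, Any]  # brain name -> ActionInfo


class ToyEnvManager:
    """Hypothetical miniature of the SimpleEnvManager bookkeeping shown above."""

    def __init__(self) -> None:
        self.previous_step = EnvironmentStep({}, {})
        self.previous_all_action_info: Dict[str, Any] = {}

    def reset(self, all_brain_info: Dict[str, Any]) -> List[EnvironmentStep]:
        # No action has been taken yet, so the action half of the step is empty.
        self.previous_step = EnvironmentStep(all_brain_info, {})
        return [self.previous_step]

    def step(self, all_brain_info: Dict[str, Any]) -> List[EnvironmentStep]:
        # Pair the new observations with the actions that led to them.
        step_info = EnvironmentStep(all_brain_info, self.previous_all_action_info)
        self.previous_step = step_info
        return [step_info]
```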

ml-agents/mlagents/trainers/subprocess_env_manager.py (8 changes)

         self.process = process
         self.worker_id = worker_id
         self.conn = conn
-        self.previous_step: EnvironmentStep = EnvironmentStep({}, {}, {})
+        self.previous_step: EnvironmentStep = EnvironmentStep({}, {})
         self.previous_all_action_info: Dict[str, ActionInfo] = {}
         self.waiting = False

             ew.send("reset", config)
         # Next (synchronously) collect the reset observations from each worker in sequence
         for ew in self.env_workers:
-            ew.previous_step = EnvironmentStep({}, ew.recv().payload, {})
+            ew.previous_step = EnvironmentStep(ew.recv().payload, {})
         return list(map(lambda ew: ew.previous_step, self.env_workers))
     @property

             payload: StepResponse = step.payload
             env_worker = self.env_workers[step.worker_id]
             new_step = EnvironmentStep(
-                env_worker.previous_step.current_all_brain_info,
-                payload.all_brain_info,
-                env_worker.previous_all_action_info,
+                payload.all_brain_info, env_worker.previous_all_action_info
             )
             step_infos.append(new_step)
             env_worker.previous_step = new_step

ml-agents/mlagents/trainers/tests/test_agent_processor.py (12 changes)

     AgentManager,
     AgentManagerQueue,
 )
+from mlagents.trainers.action_info import ActionInfo
 from mlagents.trainers.trajectory import Trajectory
 from mlagents.trainers.stats import StatsReporter

         max_trajectory_length=5,
         stats_reporter=StatsReporter("testcat"),
     )
     fake_action_outputs = {
         "action": [0.1, 0.1],
         "entropy": np.array([1.0], dtype=np.float32),

         num_vector_acts=2,
         num_vis_observations=num_vis_obs,
     )
+    fake_action_info = ActionInfo(
+        action=[0.1, 0.1],
+        value=[0.1, 0.1],
+        outputs=fake_action_outputs,
+        agents=mock_braininfo.agents,
+    )
+    # This is like the initial state after the env reset
+    processor.add_experiences(mock_braininfo, ActionInfo([], [], {}, []))
-        processor.add_experiences(mock_braininfo, mock_braininfo, fake_action_outputs)
+        processor.add_experiences(mock_braininfo, fake_action_info)
     # Assert that two trajectories have been added to the Trainer
     assert len(tqueue.put.call_args_list) == 2
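The calling convention the updated test exercises, reduced to a runnable snippet with mocks standing in for the processor and the BrainInfo (agent ids are illustrative):

```python
from typing import Any, Dict, List, NamedTuple
from unittest import mock


class ActionInfo(NamedTuple):  # stand-in with the field layout added in this PR
    action: Any
    value: Any
    outputs: Dict[str, Any]
    agents: List[str]


processor = mock.MagicMock()  # stands in for AgentProcessor
braininfo = mock.MagicMock(agents=["agent-0", "agent-1"])

# First call after an environment reset: there is no previous action yet, so an
# empty ActionInfo is passed, including the new (empty) agents list.
processor.add_experiences(braininfo, ActionInfo([], [], {}, []))

# Subsequent calls pass the ActionInfo the policy returned on the previous step;
# its agents field records which agents those outputs were computed for.
previous_action = ActionInfo(
    action=[0.1, 0.1], value=[0.1, 0.1], outputs={}, agents=braininfo.agents
)
processor.add_experiences(braininfo, previous_action)
```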

ml-agents/mlagents/trainers/tests/test_policy.py (9 changes)

     policy = TFPolicy(test_seed, basic_mock_brain(), basic_params())
     no_agent_brain_info = BrainInfo([], [], [], agents=[])
     result = policy.get_action(no_agent_brain_info)
-    assert result == ActionInfo([], [], {})
+    assert result == ActionInfo([], [], {}, [])
 def test_take_action_returns_nones_on_missing_values():

         [], [], [], agents=["an-agent-id"], local_done=[False]
     )
     result = policy.get_action(brain_info_with_agents)
-    assert result == ActionInfo(None, None, {})
+    assert result == ActionInfo(None, None, {}, ["an-agent-id"])
 def test_take_action_returns_action_info_when_available():

     )
     result = policy.get_action(brain_info_with_agents)
     expected = ActionInfo(
-        policy_eval_out["action"], policy_eval_out["value"], policy_eval_out
+        policy_eval_out["action"],
+        policy_eval_out["value"],
+        policy_eval_out,
+        ["an-agent-id"],
     )
     assert result == expected

ml-agents/mlagents/trainers/tests/test_subprocess_env_manager.py (4 changes)

             self.assertEqual(
                 manager.env_workers[i].previous_step.current_all_brain_info, i
             )
-            self.assertEqual(
-                manager.env_workers[i].previous_step.previous_all_brain_info,
-                last_steps[i].current_all_brain_info,
-            )
         assert res == [
             manager.env_workers[0].previous_step,
             manager.env_workers[1].previous_step,

ml-agents/mlagents/trainers/tests/test_trainer_controller.py (14 changes)

     action_info_dict = {brain_name: MagicMock()}
     brain_info_dict = {brain_name: Mock()}
-    old_step_info = EnvironmentStep(brain_info_dict, brain_info_dict, action_info_dict)
-    new_step_info = EnvironmentStep(brain_info_dict, brain_info_dict, action_info_dict)
+    old_step_info = EnvironmentStep(brain_info_dict, action_info_dict)
+    new_step_info = EnvironmentStep(brain_info_dict, action_info_dict)
     trainer_mock._is_ready_update = MagicMock(return_value=True)
     env_mock = MagicMock()

     manager_mock = tc.managers[brain_name]
     manager_mock.add_experiences.assert_called_once_with(
-        new_step_info.previous_all_brain_info[brain_name],
-        new_step_info.brain_name_to_action_info[brain_name].outputs,
+        new_step_info.brain_name_to_action_info[brain_name],
     )
     trainer_mock.advance.assert_called_once()

     action_info_dict = {brain_name: MagicMock()}
     brain_info_dict = {brain_name: Mock()}
-    old_step_info = EnvironmentStep(brain_info_dict, brain_info_dict, action_info_dict)
-    new_step_info = EnvironmentStep(brain_info_dict, brain_info_dict, action_info_dict)
+    old_step_info = EnvironmentStep(brain_info_dict, action_info_dict)
+    new_step_info = EnvironmentStep(brain_info_dict, action_info_dict)
     trainer_mock._is_ready_update = MagicMock(return_value=False)

     env_mock.step.assert_called_once()
     manager_mock = tc.managers[brain_name]
     manager_mock.add_experiences.assert_called_once_with(
-        new_step_info.previous_all_brain_info[brain_name],
-        new_step_info.brain_name_to_action_info[brain_name].outputs,
+        new_step_info.brain_name_to_action_info[brain_name],
     )
     trainer_mock.advance.assert_called_once()

ml-agents/mlagents/trainers/tf_policy.py (7 changes)

         to be passed to add experiences
         """
         if len(brain_info.agents) == 0:
-            return ActionInfo([], [], {})
+            return ActionInfo([], [], {}, [])
         agents_done = [
             agent

         run_out = self.evaluate(brain_info)  # pylint: disable=assignment-from-no-return
         self.save_memories(brain_info.agents, run_out.get("memory_out"))
         return ActionInfo(
-            action=run_out.get("action"), value=run_out.get("value"), outputs=run_out
+            action=run_out.get("action"),
+            value=run_out.get("value"),
+            outputs=run_out,
+            agents=brain_info.agents,
         )

     def update(self, mini_batch, num_sequences):
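Spelled out, the three return shapes of get_action after this change (values are illustrative; they mirror the assertions in test_policy.py above, with a local stand-in for ActionInfo):

```python
from typing import Any, Dict, List, NamedTuple


class ActionInfo(NamedTuple):  # stand-in with the new agents field
    action: Any
    value: Any
    outputs: Dict[str, Any]
    agents: List[str]


# No agents in the BrainInfo: everything is empty, including the new agents list.
empty = ActionInfo([], [], {}, [])

# Agents present but evaluate() produced no action/value: action and value are None,
# yet the agent ids are still carried so the caller can line outputs up with agents.
missing_values = ActionInfo(None, None, {}, ["an-agent-id"])

# Normal case: the raw run_out dict plus the agents the action was computed for.
run_out = {"action": [0.1], "value": [0.2]}
normal = ActionInfo(
    action=run_out.get("action"),
    value=run_out.get("value"),
    outputs=run_out,
    agents=["an-agent-id"],
)
```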

ml-agents/mlagents/trainers/trainer_controller.py (33 changes)

 from mlagents.trainers.trainer import Trainer
 from mlagents.trainers.meta_curriculum import MetaCurriculum
 from mlagents.trainers.trainer_util import TrainerFactory
+from mlagents.trainers.action_info import ActionInfo
 from mlagents.trainers.agent_processor import AgentManager, AgentManagerQueue

         trainer.subscribe_trajectory_queue(agent_manager.trajectory_queue)
         self.managers[name_behavior_id] = agent_manager
+    def _create_trainers_and_managers(
+        self, env_manager: EnvManager, behavior_ids: Set[str]
+    ) -> None:
+        for behavior_id in behavior_ids:
+            self._create_trainer_and_manager(env_manager, behavior_id)
     def start_learning(self, env_manager: EnvManager) -> None:
         self._create_model_path(self.model_path)
         tf.reset_default_graph()

-            self._reset_env(env_manager)
+            initial_step = self._reset_env(env_manager)
+            # Create the initial set of trainers and managers
+            initial_brain_behaviors = set(env_manager.external_brains.keys())
+            self._create_trainers_and_managers(env_manager, initial_brain_behaviors)
+            last_brain_behavior_ids = initial_brain_behaviors
+            self._process_step_infos(initial_step)
-                for name_behavior_id in new_behavior_ids:
-                    self._create_trainer_and_manager(env_manager, name_behavior_id)
+                self._create_trainers_and_managers(env_manager, new_behavior_ids)
                 last_brain_behavior_ids = external_brain_behavior_ids
                 n_steps = self.advance(env_manager)
                 for _ in range(n_steps):

     def end_trainer_episodes(
         self, env: EnvManager, lessons_incremented: Dict[str, bool]
     ) -> None:
-        self._reset_env(env)
+        reset_step = self._reset_env(env)
+        self._process_step_infos(reset_step)
         # Reward buffers reset takes place only for curriculum learning
         # else no reset.
         for trainer in self.trainers.values():

         # Step the environment
         new_step_infos = env.step()
         # Add to AgentProcessor
-        for step_info in new_step_infos:
+        num_step_infos = self._process_step_infos(new_step_infos)
+        return num_step_infos
+    def _process_step_infos(self, step_infos: List[EnvironmentStep]) -> int:
+        for step_info in step_infos:
             for name_behavior_id in step_info.name_behavior_ids:
                 if name_behavior_id not in self.managers:
                     self.logger.warning(

                     )
                     continue
                 self.managers[name_behavior_id].add_experiences(
-                    step_info.previous_all_brain_info[name_behavior_id],
-                    step_info.brain_name_to_action_info[name_behavior_id].outputs,
+                    step_info.brain_name_to_action_info.get(
+                        name_behavior_id, ActionInfo([], [], {}, [])
+                    ),
-        return len(new_step_infos)
+        return len(step_infos)
     @timed
     def advance(self, env: EnvManager) -> int:
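A condensed, self-contained sketch of the new _process_step_infos flow (stand-in types again; the real method logs a warning before skipping behavior ids that have no manager yet):

```python
from typing import Any, Dict, List, NamedTuple


class ActionInfo(NamedTuple):
    action: Any
    value: Any
    outputs: Dict[str, Any]
    agents: List[str]


class EnvironmentStep(NamedTuple):
    current_all_brain_info: Dict[str, Any]
    brain_name_to_action_info: Dict[str, ActionInfo]


def process_step_infos(step_infos: List[EnvironmentStep], managers: Dict[str, Any]) -> int:
    for step_info in step_infos:
        for name_behavior_id in step_info.current_all_brain_info:
            if name_behavior_id not in managers:
                continue  # the real code warns about the missing manager first
            managers[name_behavior_id].add_experiences(
                # One current BrainInfo plus the previous ActionInfo; right after a
                # reset there is none, so an empty ActionInfo is substituted.
                step_info.current_all_brain_info[name_behavior_id],
                step_info.brain_name_to_action_info.get(
                    name_behavior_id, ActionInfo([], [], {}, [])
                ),
            )
    return len(step_infos)
```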
