
Test for team obs in agentprocessor

/develop/coma2/samenet
Ervin Teng, 4 years ago
Current commit: 44073593
2 files changed, 90 insertions(+), 14 deletions(-)
  1. ml-agents/mlagents/trainers/tests/mock_brain.py (4 changes)
  2. ml-agents/mlagents/trainers/tests/test_agent_processor.py (100 changes)

ml-agents/mlagents/trainers/tests/mock_brain.py (4 changes)


     observation_specs: List[ObservationSpec],
     action_spec: ActionSpec,
     done: bool = False,
+    grouped: bool = False,
 ) -> Tuple[DecisionSteps, TerminalSteps]:
     """
     Creates a mock Tuple[DecisionSteps, TerminalSteps] with observations.

     reward = np.array(num_agents * [1.0], dtype=np.float32)
     interrupted = np.array(num_agents * [False], dtype=np.bool)
     agent_id = np.arange(num_agents, dtype=np.int32)
-    group_id = np.array(num_agents * [0], dtype=np.int32)
+    _gid = 1 if grouped else 0
+    group_id = np.array(num_agents * [_gid], dtype=np.int32)
     group_reward = np.array(num_agents * [0.0], dtype=np.float32)
     behavior_spec = BehaviorSpec(observation_specs, action_spec)
     if done:
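
For orientation, a minimal usage sketch (not code from this commit) of how the new `grouped` flag is meant to behave when the tests call create_mock_steps: with grouped=True every mock agent gets the same nonzero group_id and so reads as a teammate. The import path of create_observation_specs_with_shapes and the group_id attribute access are assumptions based on how the test file uses these helpers.

# Hedged usage sketch of the new `grouped` flag; not part of this commit.
from mlagents_envs.base_env import ActionSpec
import mlagents.trainers.tests.mock_brain as mb
# NOTE: import path assumed; the test files pull this helper from shared test utilities.
from mlagents.trainers.tests.dummy_config import create_observation_specs_with_shapes

decision_steps, terminal_steps = mb.create_mock_steps(
    num_agents=4,
    observation_specs=create_observation_specs_with_shapes([(8,)]),
    action_spec=ActionSpec.create_continuous(2),
    grouped=True,
)
# With grouped=True all four mock agents share group_id 1 (teammates);
# with the default grouped=False they keep group_id 0 (ungrouped).
assert (decision_steps.group_id == 1).all()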

ml-agents/mlagents/trainers/tests/test_agent_processor.py (100 changes)


 from unittest import mock
 import pytest
+from typing import List
 import mlagents.trainers.tests.mock_brain as mb
 import numpy as np
 from mlagents.trainers.agent_processor import (

     return mock_policy
+def _create_action_info(num_agents: int, agent_ids: List[str]) -> ActionInfo:
+    fake_action_outputs = {
+        "action": ActionTuple(
+            continuous=np.array([[0.1]] * num_agents, dtype=np.float32)
+        ),
+        "entropy": np.array([1.0], dtype=np.float32),
+        "learning_rate": 1.0,
+        "log_probs": LogProbsTuple(
+            continuous=np.array([[0.1]] * num_agents, dtype=np.float32)
+        ),
+    }
+    fake_action_info = ActionInfo(
+        action=ActionTuple(continuous=np.array([[0.1]] * num_agents, dtype=np.float32)),
+        env_action=ActionTuple(
+            continuous=np.array([[0.1]] * num_agents, dtype=np.float32)
+        ),
+        value=[0.1] * num_agents,
+        outputs=fake_action_outputs,
+        agent_ids=agent_ids,
+    )
+    return fake_action_info
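
As a quick illustrative check (not part of the commit), the helper above returns per-agent action and log-prob arrays of shape (num_agents, 1); the agent id strings here are hypothetical placeholders.

# Illustrative sanity check of _create_action_info; the ids are made up.
info = _create_action_info(2, ["agent-0", "agent-1"])
assert info.action.continuous.shape == (2, 1)
assert info.env_action.continuous.shape == (2, 1)
assert info.outputs["log_probs"].continuous.shape == (2, 1)
assert info.agent_ids == ["agent-0", "agent-1"]
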
 @pytest.mark.parametrize("num_vis_obs", [0, 1, 2], ids=["vec", "1 viz", "2 viz"])
 def test_agentprocessor(num_vis_obs):
     policy = create_mock_policy()

         stats_reporter=StatsReporter("testcat"),
     )
-    fake_action_outputs = {
-        "action": ActionTuple(continuous=np.array([[0.1], [0.1]])),
-        "entropy": np.array([1.0], dtype=np.float32),
-        "learning_rate": 1.0,
-        "log_probs": LogProbsTuple(continuous=np.array([[0.1], [0.1]])),
-    }
     mock_decision_steps, mock_terminal_steps = mb.create_mock_steps(
         num_agents=2,
         observation_specs=create_observation_specs_with_shapes(

     )
-    fake_action_info = ActionInfo(
-        action=ActionTuple(continuous=np.array([[0.1], [0.1]])),
-        env_action=ActionTuple(continuous=np.array([[0.1], [0.1]])),
-        value=[0.1, 0.1],
-        outputs=fake_action_outputs,
-        agent_ids=mock_decision_steps.agent_id,
-    )
+    fake_action_info = _create_action_info(2, mock_decision_steps.agent_id)
     processor.publish_trajectory_queue(tqueue)
     # This is like the initial state after the env reset
     processor.add_experiences(

     # Assert that the trajectory is of length 5
     trajectory = tqueue.put.call_args_list[0][0][0]
     assert len(trajectory.steps) == 5
+    # Make sure ungrouped agents don't have team obs
+    for step in trajectory.steps:
+        assert len(step.group_status) == 0
     # Assert that the AgentProcessor is empty
     assert len(processor.experience_buffers[0]) == 0

     )
     # Assert that the AgentProcessor is still empty
     assert len(processor.experience_buffers[0]) == 0
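
For readers following along outside the mocked setup, a hedged sketch of driving the same flow with a real trajectory queue. It assumes AgentManagerQueue is importable from mlagents.trainers.agent_processor (as the rest of this test file does) and that it exposes empty()/get_nowait(); this is an illustration, not code from the commit.

# Hedged sketch: use a real AgentManagerQueue instead of mock.Mock().
from mlagents.trainers.agent_processor import AgentProcessor, AgentManagerQueue
from mlagents.trainers.stats import StatsReporter

queue = AgentManagerQueue("test_brain_name")  # assumed constructor signature
processor = AgentProcessor(
    create_mock_policy(),
    "test_brain_name",
    max_trajectory_length=5,
    stats_reporter=StatsReporter("testcat"),
)
processor.publish_trajectory_queue(queue)
# ... feed steps via processor.add_experiences(...) as in the tests above ...
while not queue.empty():
    trajectory = queue.get_nowait()  # completed Trajectory objects land here
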
+def test_group_statuses():
+    policy = create_mock_policy()
+    tqueue = mock.Mock()
+    name_behavior_id = "test_brain_name"
+    processor = AgentProcessor(
+        policy,
+        name_behavior_id,
+        max_trajectory_length=5,
+        stats_reporter=StatsReporter("testcat"),
+    )
+    mock_decision_steps, mock_terminal_steps = mb.create_mock_steps(
+        num_agents=4,
+        observation_specs=create_observation_specs_with_shapes([(8,)]),
+        action_spec=ActionSpec.create_continuous(2),
+        grouped=True,
+    )
+    fake_action_info = _create_action_info(4, mock_decision_steps.agent_id)
+    processor.publish_trajectory_queue(tqueue)
+    # This is like the initial state after the env reset
+    processor.add_experiences(
+        mock_decision_steps, mock_terminal_steps, 0, ActionInfo.empty()
+    )
+    for _ in range(2):
+        processor.add_experiences(
+            mock_decision_steps, mock_terminal_steps, 0, fake_action_info
+        )
+    # Make terminal steps for some dead agents
+    mock_decision_steps_2, mock_terminal_steps_2 = mb.create_mock_steps(
+        num_agents=2,
+        observation_specs=create_observation_specs_with_shapes([(8,)]),
+        action_spec=ActionSpec.create_continuous(2),
+        done=True,
+        grouped=True,
+    )
+    processor.add_experiences(
+        mock_decision_steps_2, mock_terminal_steps_2, 0, fake_action_info
+    )
+    fake_action_info = _create_action_info(4, mock_decision_steps.agent_id)
+    for _ in range(3):
+        processor.add_experiences(
+            mock_decision_steps, mock_terminal_steps, 0, fake_action_info
+        )
+    # Assert that four trajectories have been added to the Trainer
+    assert len(tqueue.put.call_args_list) == 4
+    # Last trajectory should be the longest
+    trajectory = tqueue.put.call_args_list[0][0][-1]
+    # Make sure trajectory has the right Groupmate Experiences
+    for step in trajectory.steps[0:3]:
+        assert len(step.group_status) == 3
+    # After 2 agents have died
+    for step in trajectory.steps[3:]:
+        assert len(step.group_status) == 1
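
The expected group_status sizes in the assertions above come from simple counting; the sketch below spells it out, under the assumption the test relies on, namely that group_status lists the other live members of an agent's group rather than the agent itself.

# Worked counting behind the assertions above.
num_agents = 4                                # all created with grouped=True
teammates_before = num_agents - 1             # 3 groupmate entries per step
num_dead = 2                                  # two agents get terminal steps mid-trajectory
teammates_after = num_agents - num_dead - 1   # 1 groupmate entry afterwards
assert (teammates_before, teammates_after) == (3, 1)
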
 def test_agent_deletion():
