from typing import Dict, Tuple

import numpy as np

from mlagents_envs.base_env import DecisionSteps, TerminalSteps


# ... (end of the per-agent reset helper: a fresh episode gets a fresh agent id)
self.agent_id[name] = self.agent_id[name] + 1

def _make_batched_step(
    self, name: str, done: bool, reward: float, group_reward: float
) -> Tuple[DecisionSteps, TerminalSteps]:
    m_vector_obs = self._make_obs(self.goal[name])
    m_reward = np.array([reward], dtype=np.float32)
    m_agent_id = np.array([self.agent_id[name]], dtype=np.int32)
    # A single agent per behavior: it always sits in group 0, and the group
    # reward passed by the caller is forwarded unchanged.
    m_group_id = np.array([0], dtype=np.int32)
    m_group_reward = np.array([group_reward], dtype=np.float32)
    action_mask = self._generate_mask()
    decision_step = DecisionSteps(
        m_vector_obs, m_reward, m_agent_id, action_mask, m_group_id, m_group_reward
    )
    terminal_step = TerminalSteps.empty(self.behavior_spec)
    if done:
        self.final_rewards[name].append(self.rewards[name])
        self._reset_agent(name)
        new_vector_obs = self._make_obs(self.goal[name])
        (
            new_reward,
            new_done,
            new_agent_id,
            new_action_mask,
            new_group_id,
            new_group_reward,
        ) = self._construct_reset_step(name)
        decision_step = DecisionSteps(
            new_vector_obs,
            new_reward,
            new_agent_id,
            new_action_mask,
            new_group_id,
            new_group_reward,
        )
        terminal_step = TerminalSteps(
            m_vector_obs,
            m_reward,
            np.array([False], dtype=np.bool),
            m_agent_id,
            m_group_id,
            m_group_reward,
        )
    return (decision_step, terminal_step)


def _construct_reset_step(
    self, name: str
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    new_reward = np.array([0.0], dtype=np.float32)
    new_done = np.array([False], dtype=np.bool)
    new_agent_id = np.array([self.agent_id[name]], dtype=np.int32)
    new_action_mask = self._generate_mask()
    new_group_id = np.array([0], dtype=np.int32)
    new_group_reward = np.array([0.0], dtype=np.float32)
    return (
        new_reward,
        new_done,
        new_agent_id,
        new_action_mask,
        new_group_id,
        new_group_reward,
    )

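# A minimal sketch, not part of the original file, of how the same DecisionSteps
# constructor batches several agents that share a group. The shapes, the group id
# value of 1, and the helper name are illustrative assumptions; the test env above
# always emits a single agent in group 0 with a 0.0 group reward.
def _sketch_grouped_decision_steps() -> DecisionSteps:
    obs = [np.zeros((2, 1), dtype=np.float32)]  # one observation array, batch of two agents
    reward = np.array([0.1, -0.1], dtype=np.float32)  # per-agent rewards
    agent_id = np.array([0, 1], dtype=np.int32)
    group_id = np.array([1, 1], dtype=np.int32)  # both agents report the same group
    group_reward = np.array([0.5, 0.5], dtype=np.float32)  # shared team reward
    return DecisionSteps(obs, reward, agent_id, None, group_id, group_reward)
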
def step(self) -> None:
    assert all(action is not None for action in self.action.values())
    for name in self.names:
        done = self._take_action(name)
        reward = self._compute_reward(name, done)
        self.rewards[name] += reward
        # The per-agent reward is passed through; this env keeps the group reward at 0.0.
        self.step_result[name] = self._make_batched_step(name, done, reward, 0.0)


def reset(self) -> None:  # type: ignore
    for name in self.names:
        self._reset_agent(name)
        self.step_result[name] = self._make_batched_step(name, False, 0.0, 0.0)

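# A hedged usage sketch, not part of the original file: drive the env directly and
# check that the new group fields come back with their defaults. The class name,
# constructor arguments, and the "test" behavior name are assumptions about how the
# rest of this test module sets things up; the get_steps/set_actions calls assume the
# env implements the usual BaseEnv-style interface of the ML-Agents test envs.
def _sketch_step_through_env() -> None:
    env = SimpleEnvironment(["test"])
    env.reset()
    decision_steps, _ = env.get_steps("test")
    assert (decision_steps.group_id == 0).all()
    assert (decision_steps.group_reward == 0.0).all()
    # Take one random action and advance the env.
    action = env.behavior_spec.action_spec.random_action(len(decision_steps))
    env.set_actions("test", action)
    env.step()
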
@property
def reset_parameters(self) -> Dict[str, str]:
    return {}


# ... (from the memory variant's __init__: the goal is only visible for the
# first num_show_steps observations, so the agent has to remember it)
self.num_show_steps = 2

def _make_batched_step(
    self, name: str, done: bool, reward: float, group_reward: float
) -> Tuple[DecisionSteps, TerminalSteps]:
    # Only show the goal in the observation for the first num_show_steps steps.
    recurrent_obs_val = (
        self.goal[name] if self.step_count[name] <= self.num_show_steps else 0
    )
    m_vector_obs = self._make_obs(recurrent_obs_val)
    m_reward = np.array([reward], dtype=np.float32)
    m_agent_id = np.array([self.agent_id[name]], dtype=np.int32)
    m_group_id = np.array([0], dtype=np.int32)
    m_group_reward = np.array([group_reward], dtype=np.float32)
    action_mask = self._generate_mask()
    decision_step = DecisionSteps(
        m_vector_obs, m_reward, m_agent_id, action_mask, m_group_id, m_group_reward
    )
    terminal_step = TerminalSteps.empty(self.behavior_spec)
    if done:
        self.final_rewards[name].append(self.rewards[name])
        self._reset_agent(name)
        recurrent_obs_val = (
            self.goal[name] if self.step_count[name] <= self.num_show_steps else 0
        )
        new_vector_obs = self._make_obs(recurrent_obs_val)
        (
            new_reward,
            new_done,
            new_agent_id,
            new_action_mask,
            new_group_id,
            new_group_reward,
        ) = self._construct_reset_step(name)
        decision_step = DecisionSteps(
            new_vector_obs,
            new_reward,
            new_agent_id,
            new_action_mask,
            new_group_id,
            new_group_reward,
        )
        terminal_step = TerminalSteps(
            m_vector_obs,
            m_reward,
            np.array([False], dtype=np.bool),
            m_agent_id,
            m_group_id,
            m_group_reward,
        )
    return (decision_step, terminal_step)

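# A hedged sketch, not part of the original file, of how a multi-agent variant of this
# env could thread a real team reward through the new group_reward argument instead of
# the constant 0.0 used above. The helper name and the reward scheme are hypothetical.
def _sketch_step_with_group_reward(env, name: str) -> None:
    done = env._take_action(name)
    individual_reward = env._compute_reward(name, done)
    team_reward = 1.0 if done else 0.0  # hypothetical shared bonus when the episode ends
    env.rewards[name] += individual_reward
    env.step_result[name] = env._make_batched_step(name, done, individual_reward, team_reward)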