|
|
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
class ActionBuffers(NamedTuple): |
|
|
|
class ActionTuple(NamedTuple): |
|
|
|
continuous: np.ndarray # dims (n_agents, cont_size) |
|
|
|
discrete: np.ndarray # dims (n_agents, disc_size) |
|
|
|
continuous: np.ndarray # dims (n_agents, continuous_size) |
|
|
|
discrete: np.ndarray # dims (n_agents, discrete_size) |
|
|
|
|
|
|
|
|
|
|
|
class ActionSpec(NamedTuple): |
|
|
|
|
|
|
""" |
|
|
|
return len(self.discrete_branches) |
|
|
|
|
|
|
|
def empty_action(self, n_agents: int) -> ActionBuffers: |
|
|
|
def empty_action(self, n_agents: int) -> ActionTuple: |
|
|
|
Generates ActionBuffers corresponding to an empty action (all zeros) |
|
|
|
Generates ActionTuple corresponding to an empty action (all zeros) |
|
|
|
for a number of agents. |
|
|
|
:param n_agents: The number of agents that will have actions generated |
|
|
|
""" |
|
|
|
|
|
|
|
|
|
|
if self.discrete_size > 0: |
|
|
|
discrete = np.zeros((n_agents, self.discrete_size), dtype=np.int32) |
|
|
|
return ActionBuffers(continuous, discrete) |
|
|
|
return ActionTuple(continuous, discrete) |
|
|
|
def random_action(self, n_agents: int) -> ActionBuffers: |
|
|
|
def random_action(self, n_agents: int) -> ActionTuple: |
|
|
|
Generates ActionBuffers corresponding to a random action (either discrete |
|
|
|
Generates ActionTuple corresponding to a random action (either discrete |
|
|
|
or continuous) for a number of agents. |
|
|
|
:param n_agents: The number of agents that will have actions generated |
|
|
|
""" |
|
|
|
|
|
|
for i in range(self.discrete_size) |
|
|
|
] |
|
|
|
) |
|
|
|
return ActionBuffers(continuous, discrete) |
|
|
|
return ActionTuple(continuous, discrete) |
|
|
|
self, actions: ActionBuffers, n_agents: int, name: str |
|
|
|
) -> ActionBuffers: |
|
|
|
self, actions: ActionTuple, n_agents: int, name: str |
|
|
|
) -> ActionTuple: |
|
|
|
""" |
|
|
|
Validates that action has the correct action dim |
|
|
|
for the correct number of agents and ensures the type. |
|
|
|
|
|
|
f"{_expected_shape} for (<number of agents>, <action size>) but " |
|
|
|
f"received input of dimension {actions.continuous.shape}" |
|
|
|
) |
|
|
|
if actions.continuous.dtype != np.float32: |
|
|
|
actions.continuous = actions.continuous.astype(np.float32) |
|
|
|
if actions.continuous.dtype != np.float32: |
|
|
|
actions.continuous = actions.continuous.astype(np.float32) |
|
|
|
|
|
|
|
_expected_shape = (n_agents, self.discrete_size) |
|
|
|
if self.discrete_size > 0 and actions.discrete.shape != _expected_shape: |
|
|
|
|
|
|
f"received input of dimension {actions.discrete.shape}" |
|
|
|
) |
|
|
|
if actions.discrete.dtype != np.int32: |
|
|
|
actions.discrete = actions.discrete.astype(np.int32) |
|
|
|
if actions.discrete.dtype != np.int32: |
|
|
|
actions.discrete = actions.discrete.astype(np.int32) |
|
|
|
return actions |
|
|
|
|
|
|
|
@staticmethod |
|
|
|
|
|
|
""" |
|
|
|
|
|
|
|
@abstractmethod |
|
|
|
def set_actions(self, behavior_name: BehaviorName, action: ActionBuffers) -> None: |
|
|
|
def set_actions(self, behavior_name: BehaviorName, action: ActionTuple) -> None: |
|
|
|
:param action: ActionBuffers tuple of continuous and/or discrete action |
|
|
|
:param action: ActionTuple tuple of continuous and/or discrete action |
|
|
|
self, behavior_name: BehaviorName, agent_id: AgentId, action: ActionBuffers |
|
|
|
self, behavior_name: BehaviorName, agent_id: AgentId, action: ActionTuple |
|
|
|
) -> None: |
|
|
|
""" |
|
|
|
Sets the action for one of the agents in the simulation for the next |
|
|
|
|
|
|
:param action: ActionBuffers tuple of continuous and/or discrete action |
|
|
|
:param action: ActionTuple tuple of continuous and/or discrete action |
|
|
|
""" |
|
|
|
|
|
|
|
@abstractmethod |
|
|
|