        else:
            raise UnityEnvironmentException("No Unity environment is loaded.")

    def step(self, vector_action=None, memory=None, text_action=None, value=None) -> AllBrainInfo:
        """
        Provides the environment with an action, moves the environment dynamics forward
        accordingly, and returns observation, state, and reward information to the agent.
        :param vector_action: Vector action to take for each brain, keyed by brain name
            (a bare value is accepted when there is a single external brain).
        :param memory: Recurrent memories to send for each brain, keyed by brain name.
        :param text_action: Text actions to send for each brain, keyed by brain name.
        :param value: Per-agent value estimates to send for each brain, keyed by brain name.
        :return: AllBrainInfo describing the new state of the environment.
        """
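        # Accept either per-brain dictionaries or bare values; the code below
        # assumes dictionaries keyed by brain name, so default missing inputs
        # to empty dictionaries first.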
        vector_action = {} if vector_action is None else vector_action
        memory = {} if memory is None else memory
        text_action = {} if text_action is None else text_action
        value = {} if value is None else value
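        # Step only when an environment is attached and the previous step did
        # not end the episode (_global_done is None before the first reset).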
        if self._loaded and not self._global_done and self._global_done is not None:
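            # A bare number, list, or array is unambiguous only when exactly one
            # external brain exists; otherwise inputs must be keyed by brain name.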
            if isinstance(vector_action, (int, np.int_, float, np.float_, list, np.ndarray)):
                if self._num_external_brains == 1:
                    vector_action = {self._external_brain_names[0]: vector_action}
                elif self._num_external_brains > 1:
                    raise UnityActionException(
                        "You have {0} brains, you need to feed a dictionary of brain names as keys "
                        "and vector actions as values".format(self._num_external_brains))
                else:
                    raise UnityActionException(
                        "There are no external brains in the environment, "
                        "step cannot take a vector_action input")
            if isinstance(value, (int, np.int_, float, np.float_, list, np.ndarray)):
                if self._num_external_brains == 1:
                    value = {self._external_brain_names[0]: value}
                elif self._num_external_brains > 1:
                    raise UnityActionException(
                        "You have {0} brains, you need to feed a dictionary of brain names as keys "
                        "and state/action value estimates as values".format(self._num_external_brains))
                else:
                    raise UnityActionException(
                        "There are no external brains in the environment, "
                        "step cannot take a value input")

            for brain_name in list(vector_action.keys()) + list(memory.keys()) + list(text_action.keys()):
                if brain_name not in self._external_brain_names:
                    raise UnityActionException(
                        "The name {0} does not correspond to an external brain "
                        "in the environment".format(brain_name))

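            # One communicator round-trip: send this step's actions and receive
            # the resulting observations and rewards.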
            outputs = self.communicator.exchange(
                self._generate_step_input(vector_action, memory, text_action, value)
            )
            if outputs is None:
                raise KeyboardInterrupt
            _data, global_done = self._get_state(
                outputs.rl_output
            )
            self._global_done = global_done
            return _data
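
    # Illustrative only: a caller-side sketch of the value-aware step() API.
    # The build name, the brain name "PlayerBrain", and the two-agent sizes
    # below are hypothetical assumptions, not taken from this file:
    #
    #   env = UnityEnvironment(file_name="my_build")      # hypothetical build
    #   all_brain_info = env.reset(train_mode=True)
    #   all_brain_info = env.step(
    #       vector_action={"PlayerBrain": [0.1, 0.2]},    # flat: 2 agents x 1 action
    #       value={"PlayerBrain": [0.0, 0.0]})            # one estimate per agent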

    def _generate_step_input(self, vector_action, memory, text_action, value) -> UnityRLInput:
        rl_in = UnityRLInput()
        for b in vector_action:
            n_agents = self._n_agents[b]
            if n_agents == 0:
                continue
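            # Actions and memories arrive as one flat vector per brain; compute
            # per-agent slice sizes so each agent receives its own chunk.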
            _a_s = len(vector_action[b]) // n_agents
            _m_s = len(memory[b]) // n_agents
            for i in range(n_agents):
                action = AgentActionProto(
                    vector_actions=vector_action[b][i * _a_s: (i + 1) * _a_s],
                    memories=memory[b][i * _m_s: (i + 1) * _m_s],
                    text_actions=text_action[b][i],
                )
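                # The value estimate is optional; attach it only when the caller
                # provided one for this brain.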
                if b in value:
                    action.value = value[b][i]
                rl_in.agent_actions[b].value.extend([action])
            rl_in.command = 0
        return self.wrap_unity_input(rl_in)