import random
from typing import Any, Dict, List, Tuple

import numpy as np

from mlagents_envs.base_env import (
    BatchedStepResult,  # only name from this import that is visible in this excerpt
)
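
# `clamp` is used by the position updates below but is not defined in this excerpt;
# a minimal sketch of the assumed helper:
def clamp(x, min_val, max_val):
    # Keep x within the closed interval [min_val, max_val].
    return max(min_val, min(x, max_val))
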
        # Per-behavior bookkeeping, keyed by behavior name.
        self.rewards: Dict[str, float] = {}
        self.final_rewards: Dict[str, List[float]] = {}
        self.step_result: Dict[str, BatchedStepResult] = {}
        self.agent_id: Dict[str, int] = {}
        # The `for name in self.names:` header is assumed; this excerpt only shows
        # the per-name initialization it contains.
        for name in self.names:
            self.agent_id[name] = 0
            self.goal[name] = self.random.choice([-1, 1])
            self.rewards[name] = 0
            self.final_rewards[name] = []

        # Map the chosen action to a position delta: discrete actions become +/-1,
        # continuous actions are used directly. The `if self.discrete:` guard and the
        # `act = ...` binding are assumed; only the two branch bodies appear in this excerpt.
        if self.discrete:
            act = self.action[name][0][0]
            delta = 1 if act else -1
        else:
            delta = self.action[name][0][0]

        # Limit the move to the configured step size and keep the agent inside [-1, 1].
        delta = clamp(delta, -self.step_size, self.step_size)
        self.position[name] += delta
        self.position[name] = clamp(self.position[name], -1, 1)

    def _make_batched_step(
        self, name: str, done: bool, reward: float
    ) -> BatchedStepResult:
        # The method header, the `action_mask = self._generate_mask()` line, and the
        # `return BatchedStepResult(...)` wrapper are assumed from how these values are
        # used; they are not shown in this excerpt.
        m_vector_obs = self._make_obs(self.goal[name])
        m_reward = np.array([reward], dtype=np.float32)
        m_done = np.array([done], dtype=bool)
        m_agent_id = np.array([self.agent_id[name]], dtype=np.int32)
        action_mask = self._generate_mask()

        # When the episode ends, reset the agent and append the first step of the new
        # episode so the returned batch already carries a fresh observation.
        if done:
            self._reset_agent(name)
            new_vector_obs = self._make_obs(self.goal[name])
            (
                m_vector_obs,
                m_reward,
                m_done,
                m_agent_id,
                action_mask,
            ) = self._construct_reset_step(
                m_vector_obs,
                new_vector_obs,
                m_reward,
                m_done,
                m_agent_id,
                action_mask,
                name,
            )
        return BatchedStepResult(
            m_vector_obs,
            m_reward,
            m_done,
            np.zeros(m_done.shape, dtype=bool),  # max_step: never triggered in this env
            m_agent_id,
            action_mask,
        )

    def _construct_reset_step(
        self,
        vector_obs: List[np.ndarray],
        new_vector_obs: List[np.ndarray],
        reward: np.ndarray,
        done: np.ndarray,
        agent_id: np.ndarray,
        action_mask: List[np.ndarray],
        name: str,
    ) -> Tuple[List[np.ndarray], np.ndarray, np.ndarray, np.ndarray, List[np.ndarray]]:
        # Build the first step of the new episode and stack it onto the terminal step,
        # so a single BatchedStepResult carries both the old and the new agent id.
        new_reward = np.array([0.0], dtype=np.float32)
        new_done = np.array([False], dtype=bool)
        new_agent_id = np.array([self.agent_id[name]], dtype=np.int32)
        new_action_mask = self._generate_mask()

        m_vector_obs = [
            np.concatenate((old, new), axis=0)
            for old, new in zip(vector_obs, new_vector_obs)
        ]
        m_reward = np.concatenate((reward, new_reward), axis=0)
        m_done = np.concatenate((done, new_done), axis=0)
        m_agent_id = np.concatenate((agent_id, new_agent_id), axis=0)
        if action_mask is not None:
            action_mask = [
                np.concatenate((old, new), axis=0)
                for old, new in zip(action_mask, new_action_mask)
            ]
        return m_vector_obs, m_reward, m_done, m_agent_id, action_mask

    def _generate_mask(self):
        if self.discrete:
            # Assumed completion (the rest of this method is missing from the excerpt):
            # one agent with a single two-way action branch, matching the
            # `1 if act else -1` mapping above, and no actions masked out.
            return [np.zeros((1, 2), dtype=bool)]
        return None

    def _reset_agent(self, name):
        # The method header is assumed from the `self._reset_agent(name)` calls above;
        # only the body lines below appear in this excerpt.
        self.step_count[name] = 0
        self.final_rewards[name].append(self.rewards[name])
        self.rewards[name] = 0
        self.agent_id[name] = self.agent_id[name] + 1

    def reset(self) -> None:  # type: ignore
        for name in self.names:
            # Loop body assumed from context: re-initialize every behavior's agent.
            self._reset_agent(name)

    def _make_batched_step(
        self, name: str, done: bool, reward: float
    ) -> BatchedStepResult:
        # Recurrent variant: the goal only appears in the observation for the first
        # `num_show_steps` steps of an episode and is zeroed afterwards. The method
        # header, the initial `recurrent_obs_val` binding, the
        # `action_mask = self._generate_mask()` line, and the
        # `return BatchedStepResult(...)` wrapper are assumed from context.
        recurrent_obs_val = (
            self.goal[name] if self.step_count[name] <= self.num_show_steps else 0
        )
        m_vector_obs = self._make_obs(recurrent_obs_val)
        m_reward = np.array([reward], dtype=np.float32)
        m_done = np.array([done], dtype=bool)
        m_agent_id = np.array([self.agent_id[name]], dtype=np.int32)
        action_mask = self._generate_mask()

        # As above: on episode end, reset the agent and append the first step of the
        # new episode to the batch.
        if done:
            self._reset_agent(name)
            recurrent_obs_val = (
                self.goal[name] if self.step_count[name] <= self.num_show_steps else 0
            )
            new_vector_obs = self._make_obs(recurrent_obs_val)
            (
                m_vector_obs,
                m_reward,
                m_done,
                m_agent_id,
                action_mask,
            ) = self._construct_reset_step(
                m_vector_obs,
                new_vector_obs,
                m_reward,
                m_done,
                m_agent_id,
                action_mask,
                name,
            )
        return BatchedStepResult(
            m_vector_obs,
            m_reward,
            m_done,
            np.zeros(m_done.shape, dtype=bool),  # max_step: never triggered in this env
            m_agent_id,
            action_mask,
        )
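

# Illustrative sketch (not part of the original file): what _construct_reset_step
# produces when an episode ends. The terminal step and the first step of the new
# episode are stacked along axis 0, so every field in the batch has two entries.
if __name__ == "__main__":
    terminal_obs = [np.array([[0.5]], dtype=np.float32)]
    fresh_obs = [np.array([[-1.0]], dtype=np.float32)]
    stacked_obs = [
        np.concatenate((old, new), axis=0) for old, new in zip(terminal_obs, fresh_obs)
    ]
    stacked_reward = np.concatenate(
        (np.array([1.0], dtype=np.float32), np.array([0.0], dtype=np.float32))
    )
    stacked_done = np.concatenate((np.array([True]), np.array([False])))
    stacked_agent_id = np.concatenate(
        (np.array([0], dtype=np.int32), np.array([1], dtype=np.int32))
    )
    print(stacked_obs[0].shape, stacked_reward, stacked_done, stacked_agent_id)
    # -> (2, 1) [1. 0.] [ True False] [0 1]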