|
|
|
|
|
|
    Very simple "game": the agent has a position on [-1, 1] and a randomly chosen goal of -1 or 1. The position is
    incremented by the action amount (clamped to [-step_size, step_size]), and the episode ends when the position
    reaches 1 or -1, with a positive reward for stopping at the goal, a negative reward for stopping at the opposite
    end, and a small time penalty on every other step.
    """
|
|
|
|
|
|
|
|
def __init__(self, brain_names, use_discrete, step_size=STEP_SIZE): |
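        # brain_names: behavior names to simulate. use_discrete selects a single
        # 2-choice discrete branch instead of one continuous action, and step_size
        # bounds how far the position can move per step (smaller = harder).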
|
|
|
super().__init__() |
|
|
|
self.discrete = use_discrete |
|
|
|
        action_type = ActionType.DISCRETE if use_discrete else ActionType.CONTINUOUS
        # Assumed spec layout for this API version: one OBS_SIZE-long vector
        # observation and either a single 2-choice branch or 1 continuous action.
        self.group_spec = AgentGroupSpec(
            [(OBS_SIZE,)], action_type, (2,) if use_discrete else 1
        )
        self.names = brain_names
        self.position: Dict[str, float] = {}
        self.step_count: Dict[str, int] = {}
        self.goal: Dict[str, int] = {}
        self.action: Dict[str, np.ndarray] = {}
        self.random = random.Random(str(self.group_spec))
|
|
self.rewards: Dict[str, float] = {} |
|
|
|
self.final_rewards: Dict[str, List[float]] = {} |
|
|
|
self.step_result: Dict[str, BatchedStepResult] = {} |
|
|
|
self.step_size = step_size # defines the difficulty of the test |
|
|
|
|
|
|
|
        for name in self.names:
            self.goal[name] = self.random.choice([-1, 1])
            self.position[name] = 0.0
            self.step_count[name] = 0
            self.rewards[name] = 0.0
            self.final_rewards[name] = []
|
|
|
|
|
|
def get_step_result(self, name): |
|
|
|
return self.step_result[name] |
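
    def set_actions(self, name, action) -> None:
        # Minimal sketch of the action setter assumed by _take_action() below: the
        # trainer hands over a batched action array of shape (n_agents, action_size)
        # per behavior name, and it is stored until the next step() call. The
        # signature mirrors the low-level BaseEnv API of the BatchedStepResult era.
        self.action[name] = action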
|
|
|
|
|
|
|
def _take_action(self, name: str) -> bool: |
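        # Translate the stored action into a position delta: discrete action 1
        # moves right and 0 moves left, while a continuous action is used directly.
        # The delta is clamped to [-step_size, step_size], and the episode ends
        # once the (clamped) position reaches either boundary.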
|
|
|
if self.discrete: |
|
|
|
act = self.action[name][0][0] |
|
|
|
delta = 1 if act else -1 |
|
|
|
else: |
|
|
|
delta = self.action[name][0][0] |
|
|
|
delta = clamp(delta, -self.step_size, self.step_size) |
|
|
|
self.position[name] += delta |
|
|
|
self.position[name] = clamp(self.position[name], -1, 1) |
|
|
|
self.step_count[name] += 1 |
|
|
|
done = self.position[name] >= 1.0 or self.position[name] <= -1.0 |
|
|
|
return done |
|
|
|
|
|
|
|
def _compute_reward(self, name: str, done: bool) -> float: |
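        # At termination the position and goal are both +/-1, so the reward is
        # +SUCCESS_REWARD when the agent stopped at its goal and -SUCCESS_REWARD
        # otherwise; every non-terminal step costs a small time penalty.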
|
|
|
if done: |
|
|
|
reward = SUCCESS_REWARD * self.position[name] * self.goal[name] |
|
|
|
else: |
|
|
|
reward = -TIME_PENALTY |
|
|
|
return reward |
|
|
|
|
|
|
|
def _make_batched_step( |
|
|
|
self, name: str, done: bool, reward: float |
|
|
|
) -> BatchedStepResult: |
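        # Pack the single agent into the size-1 batched arrays that
        # BatchedStepResult expects; the observation is the goal value broadcast
        # over an OBS_SIZE-long vector.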
|
|
|
m_vector_obs = [np.ones((1, OBS_SIZE), dtype=np.float32) * self.goal[name]] |
|
|
|
m_reward = np.array([reward], dtype=np.float32) |
|
|
|
m_done = np.array([done], dtype=np.bool) |
|
|
|
m_agent_id = np.array([0], dtype=np.int32) |
|
|
|
action_mask = self._generate_mask() |
|
|
|
return BatchedStepResult( |
|
|
|
m_vector_obs, m_reward, m_done, m_done, m_agent_id, action_mask |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
    def step(self) -> None:
        # Advance every behavior by one step: apply the pending action, compute the
        # reward, and cache the BatchedStepResult returned by get_step_result().
        for name in self.names:
            done = self._take_action(name)
            reward = self._compute_reward(name, done)
            self.rewards[name] += reward
            self.step_result[name] = self._make_batched_step(name, done, reward)
            if done:
                # Record the episode return, then start a fresh episode.
                self.final_rewards[name].append(self.rewards[name])
                self._reset_agent(name)
|
|
|
|
|
|
|
    def _generate_mask(self):
        if self.discrete:
            # LL-Python API will return an empty dim if there is only 1 agent.
            # Nothing is masked: one all-False mask of shape (1, 2) for the single
            # 2-choice branch.
            return [np.expand_dims(np.array(2 * [False], dtype=np.bool), axis=0)]
        return None
|
|
def reset(self) -> None: # type: ignore |
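        # Start a new episode for every agent and publish an initial, zero-reward
        # step result.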
|
|
|
for name in self.names: |
|
|
|
self._reset_agent(name) |
|
|
|
|
|
|
|
|
|
|
self.step_result[name] = self._make_batched_step(name, False, 0.0) |
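
    def _reset_agent(self, name):
        # Minimal sketch of the per-agent episode reset used by reset() and step():
        # re-randomize the goal and clear the per-episode state.
        self.goal[name] = self.random.choice([-1, 1])
        self.position[name] = 0.0
        self.step_count[name] = 0
        self.rewards[name] = 0.0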
|
|
|
|
|
|
|
@property |
|
|
|
def reset_parameters(self) -> Dict[str, str]: |
|
|
|
|
|
|
        return {}
|
|
|
|
|
|
|
|
|
|
|
class Memory1DEnvironment(Simple1DEnvironment): |
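    """
    Variant of Simple1DEnvironment in which the goal is only shown in the
    observation for the first num_show_steps of each episode, so the agent has to
    remember it (e.g. with a recurrent policy) to finish the episode correctly.
    """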
|
|
|
def __init__(self, brain_names, use_discrete, step_size=0.2): |
|
|
|
        super().__init__(brain_names, use_discrete, step_size=step_size)
|
|
|
# Number of steps to reveal the goal for. Lower is harder. Should be |
|
|
|
        # less than 1/step_size to force the agent to use memory.
|
|
|
self.num_show_steps = 2 |
|
|
|
|
|
|
|
def _make_batched_step( |
|
|
|
self, name: str, done: bool, reward: float |
|
|
|
) -> BatchedStepResult: |
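        # Same batching as the parent class, but the goal is zeroed out of the
        # observation once the reveal window has passed.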
|
|
|
recurrent_obs_val = ( |
|
|
|
self.goal[name] if self.step_count[name] <= self.num_show_steps else 0 |
|
|
|
) |
|
|
|
m_vector_obs = [np.ones((1, OBS_SIZE), dtype=np.float32) * recurrent_obs_val] |
|
|
|
m_reward = np.array([reward], dtype=np.float32) |
|
|
|
m_done = np.array([done], dtype=np.bool) |
|
|
|
m_agent_id = np.array([0], dtype=np.int32) |
|
|
|
action_mask = self._generate_mask() |
|
|
|
return BatchedStepResult( |
|
|
|
m_vector_obs, m_reward, m_done, m_done, m_agent_id, action_mask |
|
|
|
) |
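

# A minimal usage sketch (not part of the original tests): drive the discrete
# environment with a constant "move right" action until the episode ends. The
# behavior name "1D" and this loop are illustrative assumptions only.
if __name__ == "__main__":
    env = Simple1DEnvironment(["1D"], use_discrete=True)
    env.reset()
    for _ in range(20):
        # Batched action of shape (n_agents, n_branches): always choose action 1.
        env.set_actions("1D", np.array([[1]], dtype=np.int32))
        env.step()
        result = env.get_step_result("1D")
        if result.done[0]:
            break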