        vis_obs_size=VIS_OBS_SIZE,
        vec_obs_size=OBS_SIZE,
        action_size=1,
        obs_spec_type="normal",  # normal: (x, y); rich1: (x+y, x-y, x*y); rich2: (x*y, 2x+y, 2x-y)
        goal_type="hard",  # easy: goal is 1 or -1; hard: each goal coordinate is uniform in [-1, 1]
    ):
        super().__init__()
        self.discrete = use_discrete
        self.num_visual = num_visual
        self.num_vector = num_vector
        self.vis_obs_size = vis_obs_size
        self.vec_obs_size = vec_obs_size
        self.obs_spec_type = obs_spec_type
        self.goal_type = goal_type
        action_type = ActionType.DISCRETE if use_discrete else ActionType.CONTINUOUS
        self.behavior_spec = BehaviorSpec(
            self._make_obs_spec(),
            action_type,
            # one size-2 branch per discrete action, or action_size continuous actions
            tuple(2 for _ in range(action_size)) if use_discrete else action_size,
        )
        self.action_size = action_size
        self.names = brain_names
        self.positions: Dict[str, List[float]] = {}
        self.step_count: Dict[str, float] = {}
        self.random = random.Random(str(self.behavior_spec))
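        # seeding the RNG from the behavior-spec string keeps goal sampling
        # reproducible for a fixed environment configuration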
        self.goal: Dict[str, Any] = {}  # easy: a scalar in {-1, 1}; hard: a list of floats
        self.num_steps: Dict[str, int] = {}
        self.horizon: Dict[str, int] = {}
        self.action = {}
        self.rewards: Dict[str, float] = {}
        self.final_rewards: Dict[str, List[float]] = {}
        self.agent_id: Dict[str, int] = {}

        for name in self.names:
            self.agent_id[name] = 0
            # sample a per-agent goal: "easy" is a single +/-1 target direction,
            # "hard" draws one target coordinate per vector observation
            if self.goal_type == "easy":
                self.goal[name] = self.random.choice([-1, 1])
            elif self.goal_type == "hard":
                self.goal[name] = []
                for _ in range(self.num_vector):
                    self.goal[name].append(self.random.uniform(-1, 1))
            self.step_count[name] = 0
            self.horizon[name] = 5000
        print(self.goal)
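        # example usage (class name and earlier constructor parameters assumed):
        #   env = SimpleEnvironment(["test"], use_discrete=False, num_vector=2,
        #                           goal_type="hard", obs_spec_type="rich2")
        # each agent then receives a random goal in [-1, 1]^2 and observes the
        # composed features (x*y, 2x+y, 2x-y)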

    def _make_obs_spec(self) -> List[Any]:
        obs_spec: List[Any] = []
        # one vector observation per position coordinate
        for _ in range(self.num_vector):
            obs_spec.append((self.vec_obs_size,))
        # composed position
        if "rich" in self.obs_spec_type:
            # the rich variants add one extra composed observation (see _make_obs)
            obs_spec.append((self.vec_obs_size,))
        for _ in range(self.num_visual):
            obs_spec.append(self.vis_obs_size)
        return obs_spec

    def _make_obs(self, value: List[float]) -> List[np.ndarray]:
        obs: List[np.ndarray] = []
        if self.obs_spec_type == "normal":
            for i in range(self.num_vector):
                obs.append(np.ones((1, self.vec_obs_size), dtype=np.float32) * value[i])
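                # e.g. value = [0.5, -0.2] yields one constant 0.5 vector and one
                # constant -0.2 vector, each of length vec_obs_size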
        elif self.obs_spec_type == "rich1":
            # composed observations (x+y, x-y, x*y)
            for name in self.names:
                i = self.positions[name][0]
                j = self.positions[name][1]
                obs.append(np.ones((1, self.vec_obs_size), dtype=np.float32) * (i + j))
                obs.append(np.ones((1, self.vec_obs_size), dtype=np.float32) * (i - j))
                obs.append(np.ones((1, self.vec_obs_size), dtype=np.float32) * (i * j))
        elif self.obs_spec_type == "rich2":
            # composed observations (x*y, 2x+y, 2x-y)
            for name in self.names:
                i = self.positions[name][0]
                j = self.positions[name][1]
                obs.append(np.ones((1, self.vec_obs_size), dtype=np.float32) * (i * j))
                obs.append(np.ones((1, self.vec_obs_size), dtype=np.float32) * (2 * i + j))
                obs.append(np.ones((1, self.vec_obs_size), dtype=np.float32) * (2 * i - j))
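                # e.g. at position (x, y) = (0.5, -0.2) these are constant vectors
                # filled with -0.1, 0.8 and 1.2 respectively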
        # visual observations (used only when num_visual > 0) are constant images
        for _ in range(self.num_visual):
            obs.append(np.ones((1,) + self.vis_obs_size, dtype=np.float32) * value)
        return obs

            self.positions[name][i] = clamp(self.positions[name][i], -1, 1)
            self.step_count[name] += 1
            # easy: done when every coordinate reaches +/-1; hard: done when every
            # coordinate is within 0.1 of its goal; both stop at the horizon
            # print(self.positions)
            # print(self.positions[name], end="")
            if self.goal_type == "easy":
                done = (
                    all(pos >= 1.0 or pos <= -1.0 for pos in self.positions[name])
                    or self.step_count[name] >= self.horizon[name]
                )
            elif self.goal_type == "hard":
                # done = self.step_count[name] >= self.horizon[name]
                done = (
                    all(
                        abs(pos - goal) <= 0.1
                        for pos, goal in zip(self.positions[name], self.goal[name])
                    )
                    or self.step_count[name] >= self.horizon[name]
                )
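                # e.g. with goal (0.3, -0.7), positions (0.25, -0.65) are both within
                # 0.1 of their targets, so the episode ends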
            # if done:
            #     print(self.positions[name], end=" done ")
        return done

    def _generate_mask(self):
        # no actions are masked in this environment
        action_mask = None
        return action_mask

    def _compute_reward(self, name: str, done: bool) -> float:
        # reward = 0.0
        # for _pos in self.positions[name]:
        #     if abs(_pos - self.goal[name]) < 0.1:
        #         reward += SUCCESS_REWARD
        #     else:
        #         reward -= TIME_PENALTY
        #     reward += np.exp(-abs(_pos - self.goal[name]))
        if done:
            reward = SUCCESS_REWARD
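            # alternative distance-scaled reward formulas, kept commented out below: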
            # for _pos in self.positions[name]:
            #     if self.goal_type == "easy":
            #         reward += (SUCCESS_REWARD * _pos * self.goal[name]) / len(
            #             self.positions[name]
            #         )
            #     elif self.goal_type == "hard":
            #         reward += np.exp(-abs(_pos - self.goal[name]))
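            # e.g. under the commented "easy" formula, a goal of +1 with final
            # positions (1.0, 1.0) averages to exactly SUCCESS_REWARD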
        else:
            reward = -TIME_PENALTY
        return reward

    def _reset_agent(self, name):
        # draw a fresh goal for the new episode
        if self.goal_type == "easy":
            self.goal[name] = self.random.choice([-1, 1])
        elif self.goal_type == "hard":
            self.goal[name] = []
            for _ in range(self.num_vector):
                self.goal[name].append(self.random.uniform(-1, 1))
        # print("new goal:", self.goal[name])

    def _make_batched_step(
        self, name: str, done: bool, reward: float
|
|