您最多选择25个主题
主题必须以中文或者字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
333 行
13 KiB
333 行
13 KiB
import random
|
|
from typing import Dict, List, Any, Tuple
|
|
import numpy as np
|
|
|
|
from mlagents_envs.base_env import (
|
|
BaseEnv,
|
|
BehaviorSpec,
|
|
DecisionSteps,
|
|
TerminalSteps,
|
|
ActionType,
|
|
BehaviorMapping,
|
|
)
|
|
from mlagents_envs.tests.test_rpc_utils import proto_from_steps_and_action
|
|
from mlagents_envs.communicator_objects.agent_info_action_pair_pb2 import (
|
|
AgentInfoActionPairProto,
|
|
)
|
|
|
|
OBS_SIZE = 1
|
|
VIS_OBS_SIZE = (20, 20, 3)
|
|
STEP_SIZE = 0.1
|
|
|
|
TIME_PENALTY = 0.01
|
|
MIN_STEPS = int(1.0 / STEP_SIZE) + 1
|
|
SUCCESS_REWARD = 1.0 + MIN_STEPS * TIME_PENALTY
|
|
EXTRA_OBS_SIZE = 10
|
|
|
|
|
|
def clamp(x, min_val, max_val):
|
|
return max(min_val, min(x, max_val))
|
|
|
|
|
|
class SimpleTransferEnvironment(BaseEnv):
|
|
"""
|
|
Very simple "game" - the agent has a position on [-1, 1], gets a reward of 1 if it reaches 1, and a reward of -1 if
|
|
it reaches -1. The position is incremented by the action amount (clamped to [-step_size, step_size]).
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
brain_names,
|
|
use_discrete,
|
|
step_size=STEP_SIZE,
|
|
num_visual=0,
|
|
num_vector=1,
|
|
vis_obs_size=VIS_OBS_SIZE,
|
|
vec_obs_size=OBS_SIZE,
|
|
action_size=1,
|
|
obs_spec_type="normal", # normal: (x,y); rich: (x+y, x-y, x*y); long: (x,y,1,...,1)
|
|
goal_type="hard", # easy: 1 or -1; hard: uniformly random
|
|
extra_obs_size=EXTRA_OBS_SIZE,
|
|
):
|
|
super().__init__()
|
|
self.discrete = use_discrete
|
|
self.num_visual = num_visual
|
|
self.num_vector = num_vector
|
|
self.vis_obs_size = vis_obs_size
|
|
self.vec_obs_size = vec_obs_size
|
|
self.obs_spec_type = obs_spec_type
|
|
self.extra_obs_size = extra_obs_size
|
|
self.goal_type = goal_type
|
|
action_type = ActionType.DISCRETE if use_discrete else ActionType.CONTINUOUS
|
|
self.behavior_spec = BehaviorSpec(
|
|
self._make_obs_spec(),
|
|
action_type,
|
|
tuple(2 for _ in range(action_size)) if use_discrete else action_size,
|
|
)
|
|
self.action_size = action_size
|
|
self.names = brain_names
|
|
self.positions: Dict[str, List[float]] = {}
|
|
self.step_count: Dict[str, float] = {}
|
|
self.random = random.Random(str(self.behavior_spec))
|
|
self.goal: Dict[str, List[float]] = {}
|
|
self.num_steps: Dict[str, int] = {}
|
|
self.horizon: Dict[str, int] = {}
|
|
self.action = {}
|
|
self.rewards: Dict[str, float] = {}
|
|
self.final_rewards: Dict[str, List[float]] = {}
|
|
self.step_result: Dict[str, Tuple[DecisionSteps, TerminalSteps]] = {}
|
|
self.agent_id: Dict[str, int] = {}
|
|
self.step_size = step_size # defines the difficulty of the test
|
|
|
|
|
|
for name in self.names:
|
|
self.agent_id[name] = 0
|
|
if self.goal_type == "easy":
|
|
self.goal[name] = []
|
|
for _ in range(self.num_vector):
|
|
self.goal[name].append(self.random.choice([-1, 1]))
|
|
elif self.goal_type == "hard":
|
|
self.goal[name] = []
|
|
for _ in range(self.num_vector):
|
|
self.goal[name].append(self.random.uniform(-1, 1))
|
|
self.rewards[name] = 0
|
|
self.final_rewards[name] = []
|
|
self._reset_agent(name)
|
|
self.action[name] = None
|
|
self.step_result[name] = None
|
|
self.step_count[name] = 0
|
|
self.horizon[name] = 1000
|
|
print(self.goal)
|
|
|
|
def _make_obs_spec(self) -> List[Any]:
|
|
obs_spec: List[Any] = []
|
|
# goal
|
|
for _ in range(self.num_vector):
|
|
obs_spec.append((self.vec_obs_size,))
|
|
for _ in range(self.num_visual):
|
|
obs_spec.append(self.vis_obs_size)
|
|
# position
|
|
if self.obs_spec_type == "normal":
|
|
for _ in range(self.num_vector):
|
|
obs_spec.append((self.vec_obs_size,))
|
|
# composed position
|
|
if "rich" in self.obs_spec_type:
|
|
for _ in range(self.num_vector + 1):
|
|
obs_spec.append((self.vec_obs_size,))
|
|
if "long" in self.obs_spec_type:
|
|
for _ in range(self.num_vector + self.extra_obs_size):
|
|
obs_spec.append((self.vec_obs_size,))
|
|
print("obs_spec:", obs_spec)
|
|
return obs_spec
|
|
|
|
def _make_obs(self, value: List[float]) -> List[np.ndarray]:
|
|
obs = []
|
|
for i in range(self.num_vector):
|
|
obs.append(np.ones((1, self.vec_obs_size), dtype=np.float32) * value[i])
|
|
if self.obs_spec_type == "normal":
|
|
for name in self.names:
|
|
for i in self.positions[name]:
|
|
obs.append(np.ones((1, self.vec_obs_size), dtype=np.float32) * i)
|
|
elif self.obs_spec_type == "rich1":
|
|
for name in self.names:
|
|
i = self.positions[name][0]
|
|
j = self.positions[name][1]
|
|
obs.append(np.ones((1, self.vec_obs_size), dtype=np.float32) * (i + j))
|
|
obs.append(np.ones((1, self.vec_obs_size), dtype=np.float32) * (i - j))
|
|
obs.append(np.ones((1, self.vec_obs_size), dtype=np.float32) * (i * j))
|
|
elif self.obs_spec_type == "rich2":
|
|
for name in self.names:
|
|
i = self.positions[name][0]
|
|
j = self.positions[name][1]
|
|
obs.append(np.ones((1, self.vec_obs_size), dtype=np.float32) * (i * j))
|
|
obs.append(
|
|
np.ones((1, self.vec_obs_size), dtype=np.float32) * (2 * i + j)
|
|
)
|
|
obs.append(
|
|
np.ones((1, self.vec_obs_size), dtype=np.float32) * (2 * i - j)
|
|
)
|
|
elif self.obs_spec_type == "long":
|
|
for name in self.names:
|
|
for i in self.positions[name]:
|
|
obs.append(np.ones((1, self.vec_obs_size), dtype=np.float32) * i)
|
|
for _ in range(self.extra_obs_size):
|
|
obs.append(np.ones((1, self.vec_obs_size), dtype=np.float32))
|
|
elif self.obs_spec_type == "longpre":
|
|
for name in self.names:
|
|
for _ in range(self.extra_obs_size):
|
|
obs.append(np.ones((1, self.vec_obs_size), dtype=np.float32))
|
|
for i in self.positions[name]:
|
|
obs.append(np.ones((1, self.vec_obs_size), dtype=np.float32) * i)
|
|
elif self.obs_spec_type == "long-n":
|
|
for name in self.names:
|
|
for i in self.positions[name]:
|
|
obs.append(np.ones((1, self.vec_obs_size), dtype=np.float32) * i)
|
|
for _ in range(self.extra_obs_size):
|
|
obs.append(np.random.randn(1, self.vec_obs_size))
|
|
elif self.obs_spec_type == "longpre-n":
|
|
for name in self.names:
|
|
for _ in range(self.extra_obs_size):
|
|
obs.append(np.random.randn(1, self.vec_obs_size))
|
|
for i in self.positions[name]:
|
|
obs.append(np.ones((1, self.vec_obs_size), dtype=np.float32) * i)
|
|
for _ in range(self.num_visual):
|
|
obs.append(np.ones((1,) + self.vis_obs_size, dtype=np.float32) * value)
|
|
# print(obs)
|
|
return obs
|
|
|
|
@property
|
|
def behavior_specs(self):
|
|
behavior_dict = {}
|
|
for n in self.names:
|
|
behavior_dict[n] = self.behavior_spec
|
|
return BehaviorMapping(behavior_dict)
|
|
|
|
def set_action_for_agent(self, behavior_name, agent_id, action):
|
|
pass
|
|
|
|
def set_actions(self, behavior_name, action):
|
|
self.action[behavior_name] = action
|
|
|
|
def get_steps(self, behavior_name):
|
|
return self.step_result[behavior_name]
|
|
|
|
def _take_action(self, name: str) -> bool:
|
|
deltas = []
|
|
for _act in self.action[name][0]:
|
|
if self.discrete:
|
|
deltas.append(1 if _act else -1)
|
|
else:
|
|
deltas.append(_act)
|
|
for i, _delta in enumerate(deltas):
|
|
_delta = clamp(_delta, -self.step_size, self.step_size)
|
|
self.positions[name][i] += _delta
|
|
self.positions[name][i] = clamp(self.positions[name][i], -1, 1)
|
|
self.step_count[name] += 1
|
|
# Both must be in 1.0 to be done
|
|
# print(self.positions[name], end="")
|
|
if self.goal_type == "easy":
|
|
done = (
|
|
all(pos >= 1.0 or pos <= -1.0 for pos in self.positions[name])
|
|
or self.step_count[name] >= self.horizon[name]
|
|
)
|
|
elif self.goal_type == "hard":
|
|
# done = self.step_count[name] >= self.horizon[name]
|
|
done = (
|
|
all(
|
|
abs(pos - goal) <= 0.1
|
|
for pos, goal in zip(self.positions[name], self.goal[name])
|
|
)
|
|
or self.step_count[name] >= self.horizon[name]
|
|
)
|
|
# if done:
|
|
# print(self.positions[name], end=" done ")
|
|
return done
|
|
|
|
def _generate_mask(self):
|
|
if self.discrete:
|
|
# LL-Python API will return an empty dim if there is only 1 agent.
|
|
ndmask = np.array(2 * self.action_size * [False], dtype=np.bool)
|
|
ndmask = np.expand_dims(ndmask, axis=0)
|
|
action_mask = [ndmask]
|
|
else:
|
|
action_mask = None
|
|
return action_mask
|
|
|
|
def _compute_reward(self, name: str, done: bool) -> float:
|
|
reward = -TIME_PENALTY
|
|
# for _pos, goal in zip(self.positions[name], self.goal[name]):
|
|
# if abs(_pos - self.goal[name]) < 0.1:
|
|
# reward += SUCCESS_REWARD
|
|
# else:
|
|
# reward -= TIME_PENALTY
|
|
# reward += 2 - abs(_pos - goal) #np.exp(-abs(_pos - goal))
|
|
|
|
# if done:
|
|
# reward = 0#SUCCESS_REWARD
|
|
# # for _pos in self.positions[name]:
|
|
# # if self.goal_type == "easy":
|
|
# # reward += (SUCCESS_REWARD * _pos * self.goal[name]) / len(
|
|
# # self.positions[name]
|
|
# # )
|
|
# # elif self.goal_type == "hard":
|
|
# # reward += np.exp(-abs(_pos - self.goal[name]))
|
|
# else:
|
|
# reward = -TIME_PENALTY
|
|
|
|
return reward
|
|
|
|
def _reset_agent(self, name):
|
|
if self.goal_type == "easy":
|
|
self.goal[name] = []
|
|
for _ in range(self.num_vector):
|
|
self.goal[name].append(self.random.choice([-1, 1]))
|
|
elif self.goal_type == "hard":
|
|
self.goal[name] = []
|
|
for _ in range(self.num_vector):
|
|
self.goal[name].append(self.random.uniform(-1, 1))
|
|
self.positions[name] = [
|
|
self.random.uniform(-1, 1) for _ in range(self.action_size)
|
|
]
|
|
self.step_count[name] = 0
|
|
self.rewards[name] = 0
|
|
self.agent_id[name] = self.agent_id[name] + 1
|
|
# print("new goal:", self.goal[name])
|
|
# print("new pos:", self.positions[name])
|
|
|
|
def _make_batched_step(
|
|
self, name: str, done: bool, reward: float
|
|
) -> Tuple[DecisionSteps, TerminalSteps]:
|
|
m_vector_obs = self._make_obs(self.goal[name])
|
|
m_reward = np.array([reward], dtype=np.float32)
|
|
m_agent_id = np.array([self.agent_id[name]], dtype=np.int32)
|
|
action_mask = self._generate_mask()
|
|
decision_step = DecisionSteps(m_vector_obs, m_reward, m_agent_id, action_mask)
|
|
terminal_step = TerminalSteps.empty(self.behavior_spec)
|
|
if done:
|
|
self.final_rewards[name].append(self.rewards[name])
|
|
self._reset_agent(name)
|
|
new_vector_obs = self._make_obs(self.goal[name])
|
|
(
|
|
new_reward,
|
|
new_done,
|
|
new_agent_id,
|
|
new_action_mask,
|
|
) = self._construct_reset_step(name)
|
|
|
|
decision_step = DecisionSteps(
|
|
new_vector_obs, new_reward, new_agent_id, new_action_mask
|
|
)
|
|
terminal_step = TerminalSteps(
|
|
m_vector_obs, m_reward, np.array([False], dtype=np.bool), m_agent_id
|
|
)
|
|
return (decision_step, terminal_step)
|
|
|
|
def _construct_reset_step(
|
|
self, name: str
|
|
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
|
|
new_reward = np.array([0.0], dtype=np.float32)
|
|
new_done = np.array([False], dtype=np.bool)
|
|
new_agent_id = np.array([self.agent_id[name]], dtype=np.int32)
|
|
new_action_mask = self._generate_mask()
|
|
return new_reward, new_done, new_agent_id, new_action_mask
|
|
|
|
def step(self) -> None:
|
|
assert all(action is not None for action in self.action.values())
|
|
for name in self.names:
|
|
|
|
done = self._take_action(name)
|
|
reward = self._compute_reward(name, done)
|
|
self.rewards[name] += reward
|
|
self.step_result[name] = self._make_batched_step(name, done, reward)
|
|
|
|
def reset(self) -> None: # type: ignore
|
|
for name in self.names:
|
|
self._reset_agent(name)
|
|
self.step_result[name] = self._make_batched_step(name, False, 0.0)
|
|
|
|
@property
|
|
def reset_parameters(self) -> Dict[str, str]:
|
|
return {}
|
|
|
|
def close(self):
|
|
pass
|