您最多选择25个主题
主题必须以中文或者字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
166 行
5.6 KiB
166 行
5.6 KiB
import random
|
|
from typing import Dict, List
|
|
import numpy as np
|
|
|
|
from mlagents_envs.base_env import (
|
|
BaseEnv,
|
|
AgentGroupSpec,
|
|
BatchedStepResult,
|
|
ActionType,
|
|
)
|
|
from mlagents_envs.rpc_utils import proto_from_batched_step_result
|
|
|
|
OBS_SIZE = 1
|
|
STEP_SIZE = 0.1
|
|
|
|
TIME_PENALTY = 0.001
|
|
MIN_STEPS = int(1.0 / STEP_SIZE) + 1
|
|
SUCCESS_REWARD = 1.0 + MIN_STEPS * TIME_PENALTY
|
|
|
|
|
|
def clamp(x, min_val, max_val):
|
|
return max(min_val, min(x, max_val))
|
|
|
|
|
|
class Simple1DEnvironment(BaseEnv):
|
|
"""
|
|
Very simple "game" - the agent has a position on [-1, 1], gets a reward of 1 if it reaches 1, and a reward of -1 if
|
|
it reaches -1. The position is incremented by the action amount (clamped to [-step_size, step_size]).
|
|
"""
|
|
|
|
def __init__(self, brain_names, use_discrete, step_size=STEP_SIZE):
|
|
super().__init__()
|
|
self.discrete = use_discrete
|
|
action_type = ActionType.DISCRETE if use_discrete else ActionType.CONTINUOUS
|
|
self.group_spec = AgentGroupSpec(
|
|
[(OBS_SIZE,)], action_type, (2,) if use_discrete else 1
|
|
)
|
|
self.names = brain_names
|
|
self.position: Dict[str, float] = {}
|
|
self.step_count: Dict[str, float] = {}
|
|
self.random = random.Random(str(self.group_spec))
|
|
self.goal: Dict[str, int] = {}
|
|
self.action = {}
|
|
self.rewards: Dict[str, float] = {}
|
|
self.final_rewards: Dict[str, List[float]] = {}
|
|
self.step_result: Dict[str, BatchedStepResult] = {}
|
|
self.step_size = step_size # defines the difficulty of the test
|
|
|
|
for name in self.names:
|
|
self.goal[name] = self.random.choice([-1, 1])
|
|
self.rewards[name] = 0
|
|
self.final_rewards[name] = []
|
|
self._reset_agent(name)
|
|
self.action[name] = None
|
|
self.step_result[name] = None
|
|
|
|
def get_agent_groups(self):
|
|
return self.names
|
|
|
|
def get_agent_group_spec(self, name):
|
|
return self.group_spec
|
|
|
|
def set_action_for_agent(self, name, id, data):
|
|
pass
|
|
|
|
def set_actions(self, name, data):
|
|
self.action[name] = data
|
|
|
|
def get_step_result(self, name):
|
|
return self.step_result[name]
|
|
|
|
def _take_action(self, name: str) -> bool:
|
|
if self.discrete:
|
|
act = self.action[name][0][0]
|
|
delta = 1 if act else -1
|
|
else:
|
|
delta = self.action[name][0][0]
|
|
delta = clamp(delta, -self.step_size, self.step_size)
|
|
self.position[name] += delta
|
|
self.position[name] = clamp(self.position[name], -1, 1)
|
|
self.step_count[name] += 1
|
|
done = self.position[name] >= 1.0 or self.position[name] <= -1.0
|
|
return done
|
|
|
|
def _compute_reward(self, name: str, done: bool) -> float:
|
|
if done:
|
|
reward = SUCCESS_REWARD * self.position[name] * self.goal[name]
|
|
else:
|
|
reward = -TIME_PENALTY
|
|
return reward
|
|
|
|
def _make_batched_step(
|
|
self, name: str, done: bool, reward: float
|
|
) -> BatchedStepResult:
|
|
m_vector_obs = [np.ones((1, OBS_SIZE), dtype=np.float32) * self.goal[name]]
|
|
m_reward = np.array([reward], dtype=np.float32)
|
|
m_done = np.array([done], dtype=np.bool)
|
|
m_agent_id = np.array([0], dtype=np.int32)
|
|
action_mask = self._generate_mask()
|
|
return BatchedStepResult(
|
|
m_vector_obs, m_reward, m_done, m_done, m_agent_id, action_mask
|
|
)
|
|
|
|
def step(self) -> None:
|
|
assert all(action is not None for action in self.action.values())
|
|
|
|
for name in self.names:
|
|
done = self._take_action(name)
|
|
|
|
reward = self._compute_reward(name, done)
|
|
self.rewards[name] += reward
|
|
self.step_result[name] = self._make_batched_step(name, done, reward)
|
|
if done:
|
|
self._reset_agent(name)
|
|
|
|
def _generate_mask(self):
|
|
if self.discrete:
|
|
# LL-Python API will return an empty dim if there is only 1 agent.
|
|
ndmask = np.array(2 * [False], dtype=np.bool)
|
|
ndmask = np.expand_dims(ndmask, axis=0)
|
|
action_mask = [ndmask]
|
|
else:
|
|
action_mask = None
|
|
return action_mask
|
|
|
|
def _reset_agent(self, name):
|
|
self.goal[name] = self.random.choice([-1, 1])
|
|
self.position[name] = 0.0
|
|
self.step_count[name] = 0
|
|
self.final_rewards[name].append(self.rewards[name])
|
|
self.rewards[name] = 0
|
|
|
|
def reset(self) -> None: # type: ignore
|
|
for name in self.names:
|
|
self._reset_agent(name)
|
|
self.step_result[name] = self._make_batched_step(name, False, 0.0)
|
|
|
|
@property
|
|
def reset_parameters(self) -> Dict[str, str]:
|
|
return {}
|
|
|
|
def close(self):
|
|
pass
|
|
|
|
|
|
class Memory1DEnvironment(Simple1DEnvironment):
|
|
def __init__(self, brain_names, use_discrete, step_size=0.2):
|
|
super().__init__(brain_names, use_discrete, step_size=0.2)
|
|
# Number of steps to reveal the goal for. Lower is harder. Should be
|
|
# less than 1/step_size to force agent to use memory
|
|
self.num_show_steps = 2
|
|
|
|
def _make_batched_step(
|
|
self, name: str, done: bool, reward: float
|
|
) -> BatchedStepResult:
|
|
recurrent_obs_val = (
|
|
self.goal[name] if self.step_count[name] <= self.num_show_steps else 0
|
|
)
|
|
m_vector_obs = [np.ones((1, OBS_SIZE), dtype=np.float32) * recurrent_obs_val]
|
|
m_reward = np.array([reward], dtype=np.float32)
|
|
m_done = np.array([done], dtype=np.bool)
|
|
m_agent_id = np.array([0], dtype=np.int32)
|
|
action_mask = self._generate_mask()
|
|
return BatchedStepResult(
|
|
m_vector_obs, m_reward, m_done, m_done, m_agent_id, action_mask
|
|
)
|