浏览代码

initial addition of active learning (incomplete)

/active-variablespeed
Scott Jordan 5 年前
当前提交
d695c044
共有 7 个文件被更改,包括 473 次插入4 次删除
  1. 6
      Project/Assets/ML-Agents/Examples/Walker/Scripts/WalkerAgent.cs
  2. 31
      ml-agents/mlagents/trainers/agent_processor.py
  3. 25
      ml-agents/mlagents/trainers/settings.py
  4. 23
      ml-agents/mlagents/trainers/subprocess_env_manager.py
  5. 8
      ml-agents/mlagents/trainers/trainer_controller.py
  6. 224
      ml-agents/mlagents/trainers/active_learning.py
  7. 160
      ml-agents/mlagents/trainers/active_learning_manager.py

6
Project/Assets/ML-Agents/Examples/Walker/Scripts/WalkerAgent.cs


UpdateOrientationObjects();
//Set our goal walking speed
targetWalkingSpeed =
randomizeWalkSpeedEachEpisode ? Random.Range(0.1f, m_maxWalkingSpeed) : targetWalkingSpeed;
// targetWalkingSpeed =
// randomizeWalkSpeedEachEpisode ? Random.Range(0.1f, m_maxWalkingSpeed) : targetWalkingSpeed;
SetResetParameters();
}

public void SetResetParameters()
{
targetWalkingSpeed = GetParameterWithDefault("target_walkingspeed", 10.0f);
// target_headheight = GetParameterWithDefault("target_height", 0.5497f);
SetTorsoMass();
}
}

31
ml-agents/mlagents/trainers/agent_processor.py


from collections import defaultdict, Counter
import queue
import numpy as np
from mlagents_envs.base_env import (
DecisionSteps,
DecisionStep,

self.policy = policy
self.episode_steps: Counter = Counter()
self.episode_rewards: Dict[str, float] = defaultdict(float)
self.episode_tasks: Dict[str, Dict[str, float]] = {}
self.task_queue: List[Dict[str, float]] = []
self.task_perf_queue: List[Tuple[Dict[str, float],float]] = []
self.task_to_set: Dict[str, List] = defaultdict([])
def add_experiences(
self,

[_gid], take_action_outputs["action"]
)
def _assign_task(self, worker_id:str, global_id: str, local_id: int):
task = self.task_queue.pop(0)
if len(self.task_queue) == 0 # if task queue is empty put a copy of this task on the queue so other agents don't miss out
self.task_queue.append(task)
self.episode_tasks[global_id] = task
self.task_to_set[worker_id].append((local_id, task))
# agent_params = AgentParametersChannel()
# for param, value in task.items():
# self.task_params_channel.set_float_parameter(local_id, param, value)
def _process_step(
self, step: Union[TerminalStep, DecisionStep], global_id: str, index: int
) -> None:

# Add the value outputs if needed
self.experience_buffers[global_id].append(experience)
self.episode_rewards[global_id] += step.reward
if not terminated:
self.episode_steps[global_id] += 1

for traj_queue in self.trajectory_queues:
traj_queue.put(trajectory)
self.experience_buffers[global_id] = []
self.publish_task_performance_queue(self.episode_tasks[global_id], self.episode_rewards[global_id])
if terminated:
# Record episode length.
self.stats_reporter.add_stat(

self._safe_delete(self.last_step_result, global_id)
self._safe_delete(self.episode_steps, global_id)
self._safe_delete(self.episode_rewards, global_id)
self._safe_delete(self.episode_tasks, global_id)
self.policy.remove_previous_action([global_id])
self.policy.remove_memories([global_id])

:param trajectory_queue: Trajectory queue to publish to.
"""
self.trajectory_queues.append(trajectory_queue)
def publish_task_performance_queue(self, task: Dict[str, float], performance: float):
"""
Adds the performance of a given task to the queue to be processed by the task manager
:param task: Dictionary of the mapping of task parameter name to its value
:param performance: scalar value representing the performance (return) of the agent while executing this task
"""
self.task_perf_queue.append((task, performance))
def end_episode(self) -> None:
"""

25
ml-agents/mlagents/trainers/settings.py


name: str
completion_criteria: Optional[CompletionCriteriaSettings] = attr.ib(default=None)
@attr.s(auto_attribs=True)
class AgentParameterSettings:
parameters: Dict[str, UniformSettings]
@staticmethod
def structure(d: Mapping, t: type) -> AgentParameterSettings:
"""
Helper method to structure a Dict of EnvironmentParameterSettings class. Meant
to be registered with cattr.register_structure_hook() and called with
cattr.structure().
"""
if not isinstance(d, Mapping):
raise TrainerConfigError(
f"Unsupported agent environment parameter settings {d}."
)
d_final: Dict[str, ] = {}
for agent_parameter, agent_parameter_config in d.items():
sampler = ParameterRandomizationSettings.structure(
agent_parameter_config, ParameterRandomizationSettings
)
d_final[agent_parameter] = sampler
print(agent_parameter)
settings = AgentParameterSettings(parameters=d_final)
return settings
@attr.s(auto_attribs=True)
class EnvironmentParameterSettings:

23
ml-agents/mlagents/trainers/subprocess_env_manager.py


from mlagents_envs.side_channel.environment_parameters_channel import (
EnvironmentParametersChannel,
)
from mlagents_envs.side_channel.agent_parameters_channel import AgentParametersChannel
from mlagents_envs.side_channel.engine_configuration_channel import (
EngineConfigurationChannel,
EngineConfig,

RESET = 4
CLOSE = 5
ENV_EXITED = 6
AGENT_PARAMETERS = 7
class EnvironmentRequest(NamedTuple):

[int, List[SideChannel]], UnityEnvironment
] = cloudpickle.loads(pickled_env_factory)
env_parameters = EnvironmentParametersChannel()
agent_parameters = AgentParametersChannel()
engine_configuration_channel = EngineConfigurationChannel()
engine_configuration_channel.set_configuration(engine_configuration)
stats_channel = StatsSideChannel()

try:
env = env_factory(
worker_id, [env_parameters, engine_configuration_channel, stats_channel]
worker_id, [env_parameters, agent_parameters, engine_configuration_channel, stats_channel]
)
while True:
req: EnvironmentRequest = parent_conn.recv()

for k, v in req.payload.items():
if isinstance(v, ParameterRandomizationSettings):
v.apply(k, env_parameters)
elif req.cmd == EnvironmentCommand.AGENT_PARAMETERS:
to_assign = req.payload
if isinstance(to_assign, List):
for local_id, task in to_assign:
for param, value in task.items():
agent_parameters.set_float_parameter(local_id, param, value)
elif req.cmd == EnvironmentCommand.RESET:
env.reset()
all_step_result = _generate_all_results()

"""
for ew in self.env_workers:
ew.send(EnvironmentCommand.ENVIRONMENT_PARAMETERS, config)
def set_agent_parameters(self) -> None:
"""
Sends environment parameter settings to C# via the
AgentParametersSidehannel for each worker.
:param config: Dict of environment parameter keys and values
"""
for worker_id, ew in enumerate(self.env_workers):
for brain_name in self.agent_managers.keys():
tasks = self.agent_managers[brain_name].task_to_set[worker_id]
ew.send(EnvironmentCommand.AGENT_PARAMETERS, tasks)
self.agent_managers[brain_name].task_to_set[worker_id].empty()
@property
def training_behaviors(self) -> Dict[BehaviorName, BehaviorSpec]:

8
ml-agents/mlagents/trainers/trainer_controller.py


A Data structure corresponding to the initial reset state of the
environment.
"""
new_config = self.param_manager.get_current_samplers()
new_config = self.param_manager.get_current_samplers() # TODO add parameter sample
env_manager.reset(config=new_config)
# Register any new behavior ids that were generated on the reset.
self._register_new_behaviors(env_manager, env_manager.first_step_infos)

reward_buff = {k: list(t.reward_buffer) for (k, t) in self.trainers.items()}
curr_step = {k: int(t.step) for (k, t) in self.trainers.items()}
max_step = {k: int(t.get_max_steps) for (k, t) in self.trainers.items()}
task_perf = {}
for k, v in env.agent_managers.items():
perfs = v.task_perf_queue
v.task_perf_queue.empty()
task_perf[k] = perfs
# Attempt to increment the lessons of the brains who
# were ready.
updated, param_must_reset = self.param_manager.update_lessons(

224
ml-agents/mlagents/trainers/active_learning.py


import torch
from torch import Tensor
from botorch import settings
from botorch.acquisition.monte_carlo import MCAcquisitionFunction
from botorch.acquisition.objective import ScalarizedObjective, IdentityMCObjective
from botorch.models.gpytorch import GPyTorchModel
from botorch.models.model import Model
from botorch.models import SingleTaskGP
from botorch.sampling.samplers import MCSampler, SobolQMCNormalSampler
from botorch.utils.transforms import concatenate_pending_points, t_batch_mode_transform
from botorch.fit import fit_gpytorch_model
from botorch.optim import optimize_acqf_cyclic, optimize_acqf
from botorch.optim.initializers import initialize_q_batch_nonneg
from gpytorch.likelihoods import GaussianLikelihood
from gpytorch.distributions import MultivariateNormal
from gpytorch.means import ConstantMean
from gpytorch.models import ExactGP
from gpytorch.mlls import ExactMarginalLogLikelihood
from gpytorch.kernels import ScaleKernel, RBFKernel, Kernel, ProductKernel, AdditiveKernel, GridInterpolationKernel, AdditiveStructureKernel, ProductStructureKernel
from gpytorch.utils.grid import choose_grid_size
from typing import Optional, Union
class qEISP(MCAcquisitionFunction):
def __init__(
self,
model: Model,
beta: Union[float, Tensor],
mc_points: Tensor,
sampler: Optional[MCSampler] = None,
objective: Optional[ScalarizedObjective] = None,
X_pending: Optional[Tensor] = None,
maximize: bool = True,
) -> None:
r"""q-Espected Improvement of Skill Performance.
Args:
model: A fitted model.
beta: value to trade off between upper confidence bound and mean of fantasized performance.
mc_points: A `batch_shape x N x d` tensor of points to use for
MC-integrating the posterior variance. Usually, these are qMC
samples on the whole design space, but biased sampling directly
allows weighted integration of the posterior variance.
sampler: The sampler used for drawing fantasy samples. In the basic setting
of a standard GP (default) this is a dummy, since the variance of the
model after conditioning does not actually depend on the sampled values.
objective: A ScalarizedObjective. Required for multi-output models.
X_pending: A `n' x d`-dim Tensor of `n'` design points that have
points that have been submitted for function evaluation but
have not yet been evaluated.
maximize: If true uses the UCB of performance scaled by beta, else it uses LCB
Docstring from BOTorch class and same with comments below
"""
super().__init__(model=model, objective=objective)
if sampler is None:
# If no sampler is provided, we use the following dummy sampler for the
# fantasize() method in forward. IMPORTANT: This assumes that the posterior
# variance does not depend on the samples y (only on x), which is true for
# standard GP models, but not in general (e.g. for other likelihoods or
# heteroskedastic GPs using a separate noise model fit on data).
sampler = SobolQMCNormalSampler(
num_samples=1, resample=False, collapse_batch_dims=True
)
if not torch.is_tensor(beta):
beta = torch.tensor(beta)
self.register_buffer("beta", beta)
self.sampler = sampler
self.X_pending = X_pending
self.register_buffer("mc_points", mc_points)
self.maximize = maximize
@concatenate_pending_points
@t_batch_mode_transform()
def forward(self, X: Tensor) -> Tensor:
self.beta = self.beta.to(X)
with settings.propagate_grads(True):
posterior = self.model.posterior(X=X)
batch_shape = X.shape[:-2]
mean = posterior.mean.view(*batch_shape, X.shape[-2], -1)
variance = posterior.variance.view(*batch_shape, X.shape[-2], -1)
delta = self.beta.expand_as(mean) * variance.sqrt()
if self.maximize:
Yhat = mean + delta
else:
Yhat = mean - delta
bdims = tuple(1 for _ in X.shape[:-2])
if self.model.num_outputs > 1:
# We use q=1 here b/c ScalarizedObjective currently does not fully exploit
# lazy tensor operations and thus may be slow / overly memory-hungry.
# TODO (T52818288): Properly use lazy tensors in scalarize_posterior
mc_points = self.mc_points.view(-1, *bdims, 1, X.size(-1))
else:
# While we only need marginal variances, we can evaluate for q>1
# b/c for GPyTorch models lazy evaluation can make this quite a bit
# faster than evaluting in t-batch mode with q-batch size of 1
mc_points = self.mc_points.view(*bdims, -1, X.size(-1))
Yhat = Yhat.view(*batch_shape, X.shape[-2], -1)
fantasy_model = self.model.condition_on_observations(X=X, Y=Yhat)
posterior1 = self.model.posterior(mc_points)
posterior2 = fantasy_model.posterior(mc_points)
# transform with the scalarized objective
posterior1 = self.objective(posterior1.mean)
posterior2 = self.objective(posterior2.mean)
improvement = posterior2 - posterior1
return improvement.mean(dim=-1)
class StandardActiveLearningGP(ExactGP, GPyTorchModel):
_num_outputs = 1 # to inform GPyTorchModel API
def __init__(self, train_X, train_Y, bounds=None):
# squeeze output dim before passing train_Y to ExactGP
super(StandardActiveLearningGP, self).__init__(train_X, train_Y.squeeze(-1), GaussianLikelihood())
self.mean_module = ConstantMean()
xdims = train_X.shape[-1]
self.Kspatial = ScaleKernel(RBFKernel(active_dims=torch.tensor(list(range(xdims-1)))))
self.Ktime = ScaleKernel(RBFKernel(active_dims=torch.tensor([xdims-1])))
# Kspatial = ScaleKernel(RBFKernel())
# Ktime = ScaleKernel(RBFKernel())
# self.covar_module = ScaleKernel(RBFKernel()) # AdditiveKernel(Kspatial, ProductKernel(Kspatial, Ktime))
self.covar_module = AdditiveKernel(self.Kspatial, ProductKernel(self.Kspatial, self.Ktime))
self.to(train_X) # make sure we're on the right device/dtype
def forward(self, x):
mean_x = self.mean_module(x)
covar_x = self.covar_module(x)
return MultivariateNormal(mean_x, covar_x)
class ActiveLearningTaskSampler(object):
def __init__(self,ranges):
self.ranges = ranges
self.xdim = ranges.shape[0] + 1
self.model = None
self.mll = None
self.Xdata = None
self.Ydata = None
self.bounds = torch.tensor(ranges)
self.bounds = torch.cat([self.bounds, torch.tensor([[0.0,1.0]])]).T
def update_model(self, new_X, new_Y, refit=False):
if self.model is not None:
new_X = new_X.to(self.X)
new_Y = new_Y.to(self.X)
self.X = torch.cat([self.X, new_X.to(self.X)])
self.Y = torch.cat([self.Y, new_Y.to(self.X)])
state_dict = self.model.state_dict()
else:
self.X = new_X.float()
self.Y = new_Y.float()
state_dict = None
T = 12*50
if self.X.shape[0] >= T:
self.X = self.X[-T:, :]
self.Y = self.Y[-T:, :]
if refit:
model = StandardActiveLearningGP(self.X, self.Y, bounds=self.bounds)
mll = ExactMarginalLogLikelihood(model.likelihood, model)
self.model = model
self.mll = mll
if state_dict is not None:
self.model.load_state_dict(state_dict)
fit_gpytorch_model(mll)
else:
self.model.set_train_data(self.X, self.Y)
# self.model = self.model.condition_on_observations(new_X, new_Y)
def get_design_points(self, num_points:int=1, time=None):
if not self.model or time < 30:
return sample_random_points(self.bounds, num_points)
if not time:
time = self.X[:, -1].max() + 1
bounds = self.bounds
bounds[:, -1] = time
num_mc = 500
mc_points = torch.rand(num_mc, bounds.size(1), device=self.X.device, dtype=self.X.dtype)
mc_points = bounds[0] + (bounds[1] - bounds[0]) * mc_points
qeisp = qEISP(self.model, mc_points=mc_points, beta=1.96)
try:
candidates, acq_value = optimize_acqf(
acq_function=qeisp,
bounds=bounds,
raw_samples=128,
q=num_points,
num_restarts=1,
return_best_only=True,
)
return candidates
except:
return sample_random_points(self.bounds, num_points)
def sample_random_points(bounds, num_points):
points = torch.rand(num_points, bounds.size(1), device=bounds.device, dtype=bounds.dtype)
points = bounds[0] + (bounds[1] - bounds[0]) * points
return points

160
ml-agents/mlagents/trainers/active_learning_manager.py


from typing import Dict, List, Tuple, Optional
from mlagents.trainers.settings import (
EnvironmentParameterSettings,
ParameterRandomizationSettings,
)
from collections import defaultdict
from mlagents.trainers.training_status import GlobalTrainingStatus, StatusType
from mlagents_envs.logging_util import get_logger
from mlagents.trainers.environment_parameter_manager import EnvironmentParameterManager
from mlagents.trainers.active_learning import ActiveLearningTaskSampler
logger = get_logger(__name__)
class ActiveLearningTaskManager(EnvironmentParameterManager):
def __init__(
self,
settings: Optional[Dict[str, AgentParameterSettings]] = None,
run_seed: int = -1,
restore: bool = False,
):
"""
EnvironmentParameterManager manages all the environment parameters of a training
session. It determines when parameters should change and gives access to the
current sampler of each parameter.
:param settings: A dictionary from environment parameter to
EnvironmentParameterSettings.
:param run_seed: When the seed is not provided for an environment parameter,
this seed will be used instead.
:param restore: If true, the EnvironmentParameterManager will use the
GlobalTrainingStatus to try and reload the lesson status of each environment
parameter.
"""
if settings is None:
settings = {}
self._dict_settings = settings
lows = []
highs = []
for parameter_name in self._dict_settings.keys():
self._dict_settings[parameter_name].
self._smoothed_values: Dict[str, float] = defaultdict(float)
for key in self._dict_settings.keys():
self._smoothed_values[key] = 0.0
# Update the seeds of the samplers
self._set_sampler_seeds(run_seed)
task_ranges = []
self._taskSampler = ActiveLearningTaskSampler(task_ranges)
def _set_sampler_seeds(self, seed):
"""
Sets the seeds for the samplers (if no seed was already present). Note that
using the provided seed.
"""
offset = 0
for settings in self._dict_settings.values():
for lesson in settings.curriculum:
if lesson.value.seed == -1:
lesson.value.seed = seed + offset
offset += 1
def get_minimum_reward_buffer_size(self, behavior_name: str) -> int:
"""
Calculates the minimum size of the reward buffer a behavior must use. This
method uses the 'min_lesson_length' sampler_parameter to determine this value.
:param behavior_name: The name of the behavior the minimum reward buffer
size corresponds to.
"""
result = 1
for settings in self._dict_settings.values():
for lesson in settings.curriculum:
if lesson.completion_criteria is not None:
if lesson.completion_criteria.behavior == behavior_name:
result = max(
result, lesson.completion_criteria.min_lesson_length
)
return result
def get_current_samplers(self) -> Dict[str, ParameterRandomizationSettings]:
"""
Creates a dictionary from environment parameter name to their corresponding
ParameterRandomizationSettings. If curriculum is used, the
ParameterRandomizationSettings corresponds to the sampler of the current lesson.
"""
samplers: Dict[str, ParameterRandomizationSettings] = {}
for param_name, settings in self._dict_settings.items():
lesson_num = GlobalTrainingStatus.get_parameter_state(
param_name, StatusType.LESSON_NUM
)
lesson = settings.curriculum[lesson_num]
samplers[param_name] = lesson.value
return samplers
def get_current_lesson_number(self) -> Dict[str, int]:
"""
Creates a dictionary from environment parameter to the current lesson number.
If not using curriculum, this number is always 0 for that environment parameter.
"""
result: Dict[str, int] = {}
for parameter_name in self._dict_settings.keys():
result[parameter_name] = GlobalTrainingStatus.get_parameter_state(
parameter_name, StatusType.LESSON_NUM
)
return result
def update_lessons(
self,
trainer_steps: Dict[str, int],
trainer_max_steps: Dict[str, int],
trainer_reward_buffer: Dict[str, List[float]],
) -> Tuple[bool, bool]:
"""
Given progress metrics, calculates if at least one environment parameter is
in a new lesson and if at least one environment parameter requires the env
to reset.
:param trainer_steps: A dictionary from behavior_name to the number of training
steps this behavior's trainer has performed.
:param trainer_max_steps: A dictionary from behavior_name to the maximum number
of training steps this behavior's trainer has performed.
:param trainer_reward_buffer: A dictionary from behavior_name to the list of
the most recent episode returns for this behavior's trainer.
:returns: A tuple of two booleans : (True if any lesson has changed, True if
environment needs to reset)
"""
must_reset = False
updated = False
for param_name, settings in self._dict_settings.items():
lesson_num = GlobalTrainingStatus.get_parameter_state(
param_name, StatusType.LESSON_NUM
)
lesson = settings.curriculum[lesson_num]
if (
lesson.completion_criteria is not None
and len(settings.curriculum) > lesson_num + 1
):
behavior_to_consider = lesson.completion_criteria.behavior
if behavior_to_consider in trainer_steps:
must_increment, new_smoothing = lesson.completion_criteria.need_increment(
float(trainer_steps[behavior_to_consider])
/ float(trainer_max_steps[behavior_to_consider]),
trainer_reward_buffer[behavior_to_consider],
self._smoothed_values[param_name],
)
self._smoothed_values[param_name] = new_smoothing
if must_increment:
GlobalTrainingStatus.set_parameter_state(
param_name, StatusType.LESSON_NUM, lesson_num + 1
)
new_lesson_name = settings.curriculum[lesson_num + 1].name
logger.info(
f"Parameter '{param_name}' has changed. Now in lesson '{new_lesson_name}'"
)
updated = True
if lesson.completion_criteria.require_reset:
must_reset = True
return updated, must_reset
正在加载...
取消
保存