from typing import Dict, List, Tuple, Optional
from collections import defaultdict

import torch

from mlagents.trainers.settings import (
    AgentParameterSettings,
    ParameterRandomizationSettings,
)
from mlagents.trainers.training_status import GlobalTrainingStatus, StatusType
# The sampler's import path is an assumption; point it at wherever
# ActiveLearningTaskSampler lives in this fork.
from mlagents.trainers.active_learning import ActiveLearningTaskSampler
from mlagents_envs.logging_util import get_logger

logger = get_logger(__name__)
|
|
|
class ActiveLearningTaskManager:
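    """
    Manages the task parameters of each behavior in a training session and proposes
    new tasks with an ActiveLearningTaskSampler. The sampler is assumed (from its use
    below) to expose get_design_points(num_points, time), returning a tensor of shape
    (num_points, num_params), and update_model(X, Y, refit).
    """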
|
|
|
    def __init__(
        self,
        settings: Optional[Dict[str, AgentParameterSettings]] = None,
        run_seed: int = -1,
        restore: bool = False,
    ):
        """
        :param settings: A dictionary from behavior name to AgentParameterSettings.
        :param run_seed: When the seed is not provided for an environment parameter,
        this seed will be used instead.
        :param restore: If true, the manager will use the GlobalTrainingStatus to try
        and reload the lesson status of each environment parameter.
        """
        if settings is None:
            settings = {}
        self._dict_settings = settings
        # Restore or initialize the lesson state of each parameter.
        for parameter_name in self._dict_settings.keys():
            initial_lesson = GlobalTrainingStatus.get_parameter_state(
                parameter_name, StatusType.LESSON_NUM
            )
            if initial_lesson is None or not restore:
                GlobalTrainingStatus.set_parameter_state(
                    parameter_name, StatusType.LESSON_NUM, 0
                )
        self._smoothed_values: Dict[str, float] = defaultdict(float)
        for key in self._dict_settings.keys():
            self._smoothed_values[key] = 0.0
        # Update the seeds of the samplers
        self._set_sampler_seeds(run_seed)
        # Build one task sampler per behavior over the box defined by each
        # parameter's (min_value, max_value) range.
        self.behavior_names = list(self._dict_settings.keys())
        self.param_names = {
            name: list(self._dict_settings[name].parameters.keys())
            for name in self.behavior_names
        }
        self._taskSamplers = {}
        for behavior_name in self.behavior_names:
            lows = []
            highs = []
            parameters = self._dict_settings[behavior_name].parameters
            for parameter_name in self.param_names[behavior_name]:
                lows.append(parameters[parameter_name].min_value)
                highs.append(parameters[parameter_name].max_value)
            # Shape (num_params, 2): one (low, high) row per parameter.
            task_ranges = torch.tensor([lows, highs]).float().T
            self._taskSamplers[behavior_name] = ActiveLearningTaskSampler(task_ranges)
        # Per-behavior observation counter, used as the time input to the sampler.
        self.t = {name: 0.0 for name in self.behavior_names}

    def _set_sampler_seeds(self, seed):
        """
        Sets the seeds for the samplers (if no seed was already present). Samplers
        without a seed are given distinct, consecutive seeds derived from the
        provided seed.
        """
        offset = 0
        for settings in self._dict_settings.values():
            for lesson in settings.curriculum:
                if lesson.value.seed == -1:
                    lesson.value.seed = seed + offset
                    offset += 1
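    # A minimal construction sketch (the AgentParameterSettings contents here are
    # assumptions drawn from how `.parameters` and `.curriculum` are used above):
    #   settings = {"MyBehavior": AgentParameterSettings(...)}
    #   manager = ActiveLearningTaskManager(settings, run_seed=1337)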
|
|
|
    def get_minimum_reward_buffer_size(self, behavior_name: str) -> int:
        """
        Calculates the minimum size of the reward buffer a behavior must use. This
        method uses the 'min_lesson_length' sampler_parameter to determine this value.
        :param behavior_name: The name of the behavior the minimum reward buffer
        size corresponds to.
        """
        result = 1
        for settings in self._dict_settings.values():
            for lesson in settings.curriculum:
                if lesson.completion_criteria is not None:
                    if lesson.completion_criteria.behavior == behavior_name:
                        result = max(
                            result, lesson.completion_criteria.min_lesson_length
                        )
        return result
|
|
|
    def _make_task(self, behavior_name, tau):
        """Converts a sampled design point back into a parameter-name -> value dict."""
        task = {}
        for i, name in enumerate(self.param_names[behavior_name]):
            task[name] = tau[i]
        return task
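    # For example, param_names == ["speed", "mass"] and tau == [0.5, 2.0] (a
    # hypothetical two-parameter task) produce {"speed": 0.5, "mass": 2.0}.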
|
|
|
    def get_current_samplers(self) -> Dict[str, ParameterRandomizationSettings]:
        """
        Creates a dictionary from environment parameter name to their corresponding
        ParameterRandomizationSettings. If curriculum is used, the
        ParameterRandomizationSettings corresponds to the sampler of the current lesson.
        """
        samplers: Dict[str, ParameterRandomizationSettings] = {}
        for param_name, settings in self._dict_settings.items():
            lesson_num = GlobalTrainingStatus.get_parameter_state(
                param_name, StatusType.LESSON_NUM
            )
            lesson = settings.curriculum[lesson_num]
            samplers[param_name] = lesson.value
        return samplers
|
|
|
    def _build_tau(self, behavior_name, task, time):
        """Flattens a task dict into a sampler input: parameter values, then time."""
        tau = []
        for name in self.param_names[behavior_name]:
            tau.append(task[name])
        tau.append(time)
        return torch.tensor(tau).float()
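    # E.g. task == {"speed": 0.5, "mass": 2.0} at time 3.0 becomes
    # tensor([0.5, 2.0, 3.0]); the trailing time coordinate presumably lets the
    # sampler's model track performance drift over training.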
|
|
|
    def get_current_lesson_number(self) -> Dict[str, int]:
        """
        Creates a dictionary from environment parameter to the current lesson number.
        If not using curriculum, this number is always 0 for that environment parameter.
        """
        result: Dict[str, int] = {}
        for parameter_name in self._dict_settings.keys():
            result[parameter_name] = GlobalTrainingStatus.get_parameter_state(
                parameter_name, StatusType.LESSON_NUM
            )
        return result

    def get_tasks(self, behavior_name, num_samples) -> List[Dict[str, float]]:
        """
        Asks the behavior's task sampler for a batch of design points and converts
        them into task dictionaries.
        :param behavior_name: The name of the behavior to sample tasks for.
        :param num_samples: The number of tasks to sample.
        """
        behavior_name = [
            bname for bname in self.behavior_names if bname in behavior_name
        ][0]  # TODO make work with actual behavior names
        current_time = self.t[behavior_name] + 1
        taus = (
            self._taskSamplers[behavior_name]
            .get_design_points(num_points=num_samples, time=current_time)
            .data.numpy()
            .tolist()
        )
        tasks = [self._make_task(behavior_name, tau) for tau in taus]
        return tasks
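    # Sketch of the proposal step (behavior name and batch size are hypothetical):
    #   tasks = manager.get_tasks("MyBehavior", num_samples=8)
    #   # -> [{"speed": 0.4, "mass": 2.1}, ...], one dict per sampled task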
|
|
|
    def update_lessons(
        self,
        trainer_steps: Dict[str, int],
        trainer_max_steps: Dict[str, int],
        trainer_reward_buffer: Dict[str, List[float]],
    ) -> Tuple[bool, bool]:
        """
        Given progress metrics, calculates if at least one environment parameter is
        in a new lesson and if at least one environment parameter requires the env
        to reset.
        :param trainer_steps: A dictionary from behavior_name to the number of training
        steps this behavior's trainer has performed.
        :param trainer_max_steps: A dictionary from behavior_name to the maximum number
        of training steps this behavior's trainer will perform.
        :param trainer_reward_buffer: A dictionary from behavior_name to the list of
        the most recent episode returns for this behavior's trainer.
        :returns: A tuple of two booleans : (True if any lesson has changed, True if
        environment needs to reset)
        """
        must_reset = False
        updated = False
        for param_name, settings in self._dict_settings.items():
            lesson_num = GlobalTrainingStatus.get_parameter_state(
                param_name, StatusType.LESSON_NUM
            )
            lesson = settings.curriculum[lesson_num]
            if (
                lesson.completion_criteria is not None
                and len(settings.curriculum) > lesson_num + 1
            ):
                behavior_to_consider = lesson.completion_criteria.behavior
                if behavior_to_consider in trainer_steps:
                    must_increment, new_smoothing = lesson.completion_criteria.need_increment(
                        float(trainer_steps[behavior_to_consider])
                        / float(trainer_max_steps[behavior_to_consider]),
                        trainer_reward_buffer[behavior_to_consider],
                        self._smoothed_values[param_name],
                    )
                    self._smoothed_values[param_name] = new_smoothing
                    if must_increment:
                        GlobalTrainingStatus.set_parameter_state(
                            param_name, StatusType.LESSON_NUM, lesson_num + 1
                        )
                        new_lesson_name = settings.curriculum[lesson_num + 1].name
                        logger.info(
                            f"Parameter '{param_name}' has changed. Now in lesson '{new_lesson_name}'"
                        )
                        updated = True
                        if lesson.completion_criteria.require_reset:
                            must_reset = True
        return updated, must_reset

    def update(self, behavior_name: str, task_perfs: List[Tuple[Dict, float]]):
        """
        Refits the behavior's task sampler with observed (task, performance) pairs.
        :param behavior_name: The name of the behavior whose sampler to update.
        :param task_perfs: A list of (task, episodic performance) tuples.
        """
        behavior_name = [
            bname for bname in self.behavior_names if bname in behavior_name
        ][0]  # TODO make work with actual behavior names
        taus = []
        perfs = []
        for task, perf in task_perfs:
            perfs.append(perf)
            # Advance the behavior's clock once per observation so each point
            # gets a distinct time coordinate.
            self.t[behavior_name] = self.t[behavior_name] + 1
            tau = self._build_tau(behavior_name, task, self.t[behavior_name])
            taus.append(tau)
        X = torch.stack(taus, dim=0)
        Y = torch.tensor(perfs).float()
        self._taskSamplers[behavior_name].update_model(X, Y, refit=True)
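# Sketch of one active-learning iteration from the trainer side (the names and the
# episode-rollout helper are assumptions, not part of this module):
#   tasks = manager.get_tasks("MyBehavior", num_samples=8)
#   returns = [run_episode(task) for task in tasks]  # hypothetical rollout helper
#   manager.update("MyBehavior", list(zip(tasks, returns)))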