import sys
import json
import logging
from typing import Dict , List , Optional , Set
from typing import Dict , Optional , Set
from mlagents.trainers.env_manager import EnvManager , EnvironmentStep
from mlagents.trainers.env_manager import EnvManager
from mlagents_envs.exception import (
UnityEnvironmentException ,
UnityCommunicationException ,
from mlagents.trainers.trainer import Trainer
from mlagents.trainers.meta_curriculum import MetaCurriculum
from mlagents.trainers.trainer_util import TrainerFactory
from mlagents.trainers.action_info import ActionInfo
from mlagents.trainers.agent_processor import AgentManager , AgentManagerQueue
from mlagents.trainers.agent_processor import AgentManager
class TrainerController ( object ) :
"""
self . trainers : Dict [ str , Trainer ] = { }
self . brain_name_to_identifier : Dict [ str , Set ] = defaultdict ( set )
self . managers : Dict [ str , AgentManager ] = { }
self . trainer_factory = trainer_factory
self . model_path = model_path
self . summaries_dir = summaries_dir
" permissions are set correctly. " . format ( model_path )
)
def _reset_env ( self , env : EnvManager ) - > List [ EnvironmentStep ] :
def _reset_env ( self , env : EnvManager ) - > None :
""" Resets the environment.
Returns :
self . meta_curriculum . get_config ( ) if self . meta_curriculum else { }
)
sampled_reset_param . update ( new_meta_curriculum_config )
return env . reset ( config = sampled_reset_param )
env . reset ( config = sampled_reset_param )
def _should_save_model ( self , global_step : int ) - > bool :
return (
policy = trainer . create_policy ( env_manager . external_brains [ name_behavior_id ] )
trainer . add_policy ( name_behavior_id , policy )
env_manager . set_policy ( name_behavior_id , policy )
self . brain_name_to_identifier [ brain_name ] . add ( name_behavior_id )
agent_manager = AgentManager (
policy ,
name_behavior_id ,
env_manager . set_agent_manager ( name_behavior_id , agent_manager )
env_manager . set_policy ( name_behavior_id , policy )
self . brain_name_to_identifier [ brain_name ] . add ( name_behavior_id )
self . managers [ name_behavior_id ] = agent_manager
def _create_trainers_and_managers (
self , env_manager : EnvManager , behavior_ids : Set [ str ]
global_step = 0
last_brain_behavior_ids : Set [ str ] = set ( )
try :
initial_step = self . _reset_env ( env_manager )
self . _process_step_infos ( initial_step )
self . _reset_env ( env_manager )
while self . _not_done_training ( ) :
external_brain_behavior_ids = set ( env_manager . external_brains . keys ( ) )
new_behavior_ids = external_brain_behavior_ids - last_brain_behavior_ids
def end_trainer_episodes (
self , env : EnvManager , lessons_incremented : Dict [ str , bool ]
) - > None :
reset_step = self . _reset_env ( env )
self . _process_step_infos ( reset_step )
self . _reset_env ( env )
# Reward buffers reset takes place only for curriculum learning
# else no reset.
for trainer in self . trainers . values ( ) :
if meta_curriculum_reset or generalization_reset :
self . end_trainer_episodes ( env , lessons_incremented )
def _get_and_process_experiences ( self , env : EnvManager ) - > int :
with hierarchical_timer ( " env_step " ) :
# Get new policies if found
for brain_name in self . trainers . keys ( ) :
for name_behavior_id in self . brain_name_to_identifier [ brain_name ] :
try :
_policy = self . managers [
name_behavior_id
] . policy_queue . get_nowait ( )
env . set_policy ( name_behavior_id , _policy )
except AgentManagerQueue . Empty :
pass
# Step the environment
new_step_infos = env . step ( )
# Add to AgentProcessor
num_step_infos = self . _process_step_infos ( new_step_infos )
return num_step_infos
def _process_step_infos ( self , step_infos : List [ EnvironmentStep ] ) - > int :
for step_info in step_infos :
for name_behavior_id in step_info . name_behavior_ids :
if name_behavior_id not in self . managers :
self . logger . warning (
" Agent manager was not created for behavior id {}. " . format (
name_behavior_id
)
)
continue
self . managers [ name_behavior_id ] . add_experiences (
step_info . current_all_step_result [ name_behavior_id ] ,
step_info . worker_id ,
step_info . brain_name_to_action_info . get (
name_behavior_id , ActionInfo ( [ ] , [ ] , { } , [ ] )
) ,
)
return len ( step_infos )
num_steps = self . _get_and_process_experiences ( env )
with hierarchical_timer ( " env_step " ) :
num_steps = env . advance ( )
# Report current lesson
if self . meta_curriculum :