import logging
import argparse
from multiprocessing import Process, Queue
import os
import glob
import shutil
from typing import Any, List, NamedTuple, Optional

import numpy as np

import mlagents_envs
from mlagents import tf_utils
from mlagents.trainers.trainer_controller import TrainerController
from mlagents.trainers.exception import TrainerError
from mlagents.trainers.meta_curriculum import MetaCurriculum
from mlagents.trainers.trainer_util import load_config, TrainerFactory
from mlagents.trainers.stats import TensorboardWriter, CSVWriter, StatsReporter


class CommandLineOptions(NamedTuple):
    debug: bool
    num_runs: int
    seed: int
    env_path: str
    run_id: str
    save_freq: int
    train_model: bool
    base_port: int
    num_envs: int
    curriculum_folder: Optional[str]
    trainer_config_path: str
    docker_target_name: Optional[str]
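
    @staticmethod
    def from_argparse(args: Any) -> "CommandLineOptions":
        # Assumed implementation of the helper used by parse_command_line below:
        # build the options tuple straight from the parsed argparse namespace.
        return CommandLineOptions(**vars(args))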


def parse_command_line(argv: Optional[List[str]] = None) -> CommandLineOptions:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--run-id",
        default="ppo",
        type=str,
        help="The directory name for model and summary statistics",
    )
    parser.add_argument(
        "--num-runs", default=1, type=int, help="Number of concurrent training sessions"
    )
    parser.add_argument(
        "--save-freq", default=50000, type=int, help="Frequency at which to save model"
    )
    # Additional flags (env path, seed, ports, curriculum folder, docker target, ...)
    # are declared in the same way.
    args = parser.parse_args(argv)
    return CommandLineOptions.from_argparse(args)


def run_training(
    sub_id: int, run_seed: int, options: CommandLineOptions, process_queue: Queue
) -> None:
    """
    Launches a training session.
    :param process_queue: Queue used to send signal back to main.
    :param sub_id: Unique id for training session.
    :param options: parsed command line arguments
    :param run_seed: Random seed used for training.
    """
    # Docker Parameters
    trainer_config_path = options.trainer_config_path
    curriculum_folder = options.curriculum_folder

    # Recognize and use docker volume if one is passed as an argument
    if not options.docker_target_name:
        model_path = "./models/{run_id}-{sub_id}".format(
            run_id=options.run_id, sub_id=sub_id
        )
        summaries_dir = "./summaries"
    else:
        trainer_config_path = "/{docker_target_name}/{trainer_config_path}".format(
            docker_target_name=options.docker_target_name,
            trainer_config_path=trainer_config_path,
        )
        if curriculum_folder is not None:
            curriculum_folder = "/{docker_target_name}/{curriculum_folder}".format(
                docker_target_name=options.docker_target_name,
                curriculum_folder=curriculum_folder,
            )
        model_path = "/{docker_target_name}/models/{run_id}-{sub_id}".format(
            docker_target_name=options.docker_target_name,
            run_id=options.run_id,
            sub_id=sub_id,
        )
        summaries_dir = "/{docker_target_name}/summaries".format(
            docker_target_name=options.docker_target_name
        )
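
    # Offset the base port so each concurrent run (sub_id) gets its own block of
    # num_envs ports and parallel runs do not collide.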
    port = options.base_port + (sub_id * options.num_envs)

    # Configure CSV, Tensorboard Writers and StatsReporter
    # We assume reward and episode length are needed in the CSV.
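    # A sketch of the writer wiring the comments above describe; the exact
    # required_fields strings are assumed, not taken from this excerpt.
    csv_writer = CSVWriter(
        summaries_dir,
        required_fields=[
            "Environment/Cumulative Reward",
            "Environment/Episode Length",
        ],
    )
    tb_writer = TensorboardWriter(summaries_dir)
    StatsReporter.add_writer(tb_writer)
    StatsReporter.add_writer(csv_writer)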

    # Create controller and begin training.
    tc = TrainerController(
        trainer_factory,
        model_path,
        summaries_dir,
        options.run_id + "-" + str(sub_id),
        options.save_freq,
        maybe_meta_curriculum,
        options.train_model,
    )
    # Signal that environment has been launched.
    process_queue.put(True)

    # Begin training
    try:
        tc.start_learning(env_manager)
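    finally:
        # Assumed cleanup: shut the environment manager down whether training
        # finishes normally or is interrupted.
        env_manager.close()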


def main():
    options = parse_command_line()
    trainer_logger = logging.getLogger("mlagents.trainers")
    if options.debug:
        trainer_logger.setLevel("DEBUG")
    else:
        # disable noisy warnings from tensorflow.
        tf_utils.set_warnings_enabled(False)

    if options.env_path is None and options.num_runs > 1:
        raise TrainerError(
            "It is not possible to launch more than one concurrent training session "
            "when training from the editor."
        )
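
    # Launch one training process per requested run; each run gets its own seed
    # and a queue it uses to signal that its environment came up successfully.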
    jobs = []
    run_seed = options.seed

    if options.num_runs == 1:
        if options.seed == -1:
            run_seed = np.random.randint(0, 10000)
        run_training(0, run_seed, options, Queue())
    else:
        for i in range(options.num_runs):
            if options.seed == -1:
                run_seed = np.random.randint(0, 10000)
            process_queue = Queue()
            p = Process(target=run_training, args=(i, run_seed, options, process_queue))
            jobs.append(p)
            p.start()
            # Wait for signal that environment has successfully launched
            while process_queue.get() is not True:
                continue

        # Wait for jobs to complete. Otherwise we'll have an extra
        # unhandled KeyboardInterrupt if we end early.
        try:
            for job in jobs:
                job.join()
        except KeyboardInterrupt:
            pass


# For python debugger to directly run this script
if __name__ == "__main__":
    main()