|
|
|
|
|
|
import numpy as np |
|
|
|
import io |
|
|
|
|
|
|
|
from mlagents.envs.communicator_objects.agent_info_proto_pb2 import AgentInfoProto |
|
|
|
from mlagents.envs.communicator_objects.brain_parameters_proto_pb2 import ( |
|
|
|
BrainParametersProto, |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
class BrainParameters: |
|
|
|
def __init__( |
|
|
|
self, |
|
|
|
brain_name: str, |
|
|
|
vector_observation_space_size: int, |
|
|
|
num_stacked_vector_observations: int, |
|
|
|
camera_resolutions: List[Dict], |
|
|
|
vector_action_space_size: List[int], |
|
|
|
vector_action_descriptions: List[str], |
|
|
|
vector_action_space_type: int, |
|
|
|
): |
|
|
|
""" |
|
|
|
Contains all brain-specific parameters. |
|
|
|
""" |
|
|
|
self.brain_name = brain_name |
|
|
|
self.vector_observation_space_size = vector_observation_space_size |
|
|
|
self.num_stacked_vector_observations = num_stacked_vector_observations |
|
|
|
self.number_visual_observations = len(camera_resolutions) |
|
|
|
self.camera_resolutions = camera_resolutions |
|
|
|
self.vector_action_space_size = vector_action_space_size |
|
|
|
self.vector_action_descriptions = vector_action_descriptions |
|
|
|
self.vector_action_space_type = ["discrete", "continuous"][ |
|
|
|
vector_action_space_type |
|
|
|
] |
|
|
|
|
|
|
|
def __str__(self): |
|
|
|
return """Unity brain name: {} |
|
|
|
Number of Visual Observations (per agent): {} |
|
|
|
Vector Observation space size (per agent): {} |
|
|
|
Number of stacked Vector Observation: {} |
|
|
|
Vector Action space type: {} |
|
|
|
Vector Action space size (per agent): {} |
|
|
|
Vector Action descriptions: {}""".format( |
|
|
|
self.brain_name, |
|
|
|
str(self.number_visual_observations), |
|
|
|
str(self.vector_observation_space_size), |
|
|
|
str(self.num_stacked_vector_observations), |
|
|
|
self.vector_action_space_type, |
|
|
|
str(self.vector_action_space_size), |
|
|
|
", ".join(self.vector_action_descriptions), |
|
|
|
) |
|
|
|
|
|
|
|
@staticmethod |
|
|
|
def from_proto(brain_param_proto: BrainParametersProto) -> "BrainParameters": |
|
|
|
""" |
|
|
|
Converts brain parameter proto to BrainParameter object. |
|
|
|
:param brain_param_proto: protobuf object. |
|
|
|
:return: BrainParameter object. |
|
|
|
""" |
|
|
|
resolution = [ |
|
|
|
{"height": x.height, "width": x.width, "blackAndWhite": x.gray_scale} |
|
|
|
for x in brain_param_proto.camera_resolutions |
|
|
|
] |
|
|
|
brain_params = BrainParameters( |
|
|
|
brain_param_proto.brain_name, |
|
|
|
brain_param_proto.vector_observation_size, |
|
|
|
brain_param_proto.num_stacked_vector_observations, |
|
|
|
resolution, |
|
|
|
list(brain_param_proto.vector_action_size), |
|
|
|
list(brain_param_proto.vector_action_descriptions), |
|
|
|
brain_param_proto.vector_action_space_type, |
|
|
|
) |
|
|
|
return brain_params |
|
|
|
|
|
|
|
|
|
|
|
class BrainInfo: |
|
|
|
|
|
|
return s |
|
|
|
|
|
|
|
@staticmethod |
|
|
|
def from_agent_proto(worker_id: int, agent_info_list, brain_params): |
|
|
|
def from_agent_proto( |
|
|
|
worker_id: int, |
|
|
|
agent_info_list: List[AgentInfoProto], |
|
|
|
brain_params: BrainParameters, |
|
|
|
) -> "BrainInfo": |
|
|
|
""" |
|
|
|
Converts list of agent infos to BrainInfo. |
|
|
|
""" |
|
|
|
|
|
|
return copy |
|
|
|
|
|
|
|
|
|
|
|
def safe_concat_np_ndarray(a1: Optional[np.ndarray], a2: Optional[np.ndarray]): |
|
|
|
def safe_concat_np_ndarray( |
|
|
|
a1: Optional[np.ndarray], a2: Optional[np.ndarray] |
|
|
|
) -> Optional[np.ndarray]: |
|
|
|
if a1 is not None and a1.size != 0: |
|
|
|
if a2 is not None and a2.size != 0: |
|
|
|
return np.append(a1, a2, axis=0) |
|
|
|
|
|
|
|
|
|
|
# Renaming of dictionary of brain name to BrainInfo for clarity |
|
|
|
AllBrainInfo = Dict[str, BrainInfo] |
|
|
|
|
|
|
|
|
|
|
|
class BrainParameters: |
|
|
|
def __init__( |
|
|
|
self, |
|
|
|
brain_name: str, |
|
|
|
vector_observation_space_size: int, |
|
|
|
num_stacked_vector_observations: int, |
|
|
|
camera_resolutions: List[Dict], |
|
|
|
vector_action_space_size: List[int], |
|
|
|
vector_action_descriptions: List[str], |
|
|
|
vector_action_space_type: int, |
|
|
|
): |
|
|
|
""" |
|
|
|
Contains all brain-specific parameters. |
|
|
|
""" |
|
|
|
self.brain_name = brain_name |
|
|
|
self.vector_observation_space_size = vector_observation_space_size |
|
|
|
self.num_stacked_vector_observations = num_stacked_vector_observations |
|
|
|
self.number_visual_observations = len(camera_resolutions) |
|
|
|
self.camera_resolutions = camera_resolutions |
|
|
|
self.vector_action_space_size = vector_action_space_size |
|
|
|
self.vector_action_descriptions = vector_action_descriptions |
|
|
|
self.vector_action_space_type = ["discrete", "continuous"][ |
|
|
|
vector_action_space_type |
|
|
|
] |
|
|
|
|
|
|
|
def __str__(self): |
|
|
|
return """Unity brain name: {} |
|
|
|
Number of Visual Observations (per agent): {} |
|
|
|
Vector Observation space size (per agent): {} |
|
|
|
Number of stacked Vector Observation: {} |
|
|
|
Vector Action space type: {} |
|
|
|
Vector Action space size (per agent): {} |
|
|
|
Vector Action descriptions: {}""".format( |
|
|
|
self.brain_name, |
|
|
|
str(self.number_visual_observations), |
|
|
|
str(self.vector_observation_space_size), |
|
|
|
str(self.num_stacked_vector_observations), |
|
|
|
self.vector_action_space_type, |
|
|
|
str(self.vector_action_space_size), |
|
|
|
", ".join(self.vector_action_descriptions), |
|
|
|
) |
|
|
|
|
|
|
|
@staticmethod |
|
|
|
def from_proto(brain_param_proto): |
|
|
|
""" |
|
|
|
Converts brain parameter proto to BrainParameter object. |
|
|
|
:param brain_param_proto: protobuf object. |
|
|
|
:return: BrainParameter object. |
|
|
|
""" |
|
|
|
resolution = [ |
|
|
|
{"height": x.height, "width": x.width, "blackAndWhite": x.gray_scale} |
|
|
|
for x in brain_param_proto.camera_resolutions |
|
|
|
] |
|
|
|
brain_params = BrainParameters( |
|
|
|
brain_param_proto.brain_name, |
|
|
|
brain_param_proto.vector_observation_size, |
|
|
|
brain_param_proto.num_stacked_vector_observations, |
|
|
|
resolution, |
|
|
|
list(brain_param_proto.vector_action_size), |
|
|
|
list(brain_param_proto.vector_action_descriptions), |
|
|
|
brain_param_proto.vector_action_space_type, |
|
|
|
) |
|
|
|
return brain_params |