|
|
|
|
|
|
DEFAULT_EDITOR_PORT = 5004 |
|
|
|
|
|
|
|
def __init__( |
|
|
|
self, |
|
|
|
file_name: Optional[str] = None, |
|
|
|
worker_id: int = 0, |
|
|
|
base_port: int = 5005, |
|
|
|
seed: int = 0, |
|
|
|
docker_training: bool = False, |
|
|
|
no_graphics: bool = False, |
|
|
|
timeout_wait: int = 60, |
|
|
|
args: Optional[List[str]] = None, |
|
|
|
side_channels: Optional[List[SideChannel]] = None, |
|
|
|
self, |
|
|
|
file_name: Optional[str] = None, |
|
|
|
worker_id: int = 0, |
|
|
|
base_port: int = 5005, |
|
|
|
seed: int = 0, |
|
|
|
docker_training: bool = False, |
|
|
|
no_graphics: bool = False, |
|
|
|
timeout_wait: int = 60, |
|
|
|
args: Optional[List[str]] = None, |
|
|
|
side_channels: Optional[List[SideChannel]] = None, |
|
|
|
): |
|
|
|
""" |
|
|
|
Starts a new unity environment and establishes a connection with the environment. |
|
|
|
|
|
|
# Strip out executable extensions if passed |
|
|
|
env_path = ( |
|
|
|
env_path.strip() |
|
|
|
.replace(".app", "") |
|
|
|
.replace(".exe", "") |
|
|
|
.replace(".x86_64", "") |
|
|
|
.replace(".x86", "") |
|
|
|
.replace(".app", "") |
|
|
|
.replace(".exe", "") |
|
|
|
.replace(".x86_64", "") |
|
|
|
.replace(".x86", "") |
|
|
|
) |
|
|
|
true_filename = os.path.basename(os.path.normpath(env_path)) |
|
|
|
logger.debug("The true file name is {}".format(true_filename)) |
|
|
|
|
|
|
self._close() |
|
|
|
raise UnityEnvironmentException( |
|
|
|
"Couldn't launch the {0} environment. " |
|
|
|
"Provided filename does not match any environments.".format( |
|
|
|
file_name |
|
|
|
) |
|
|
|
"Provided filename does not match any environments.".format(file_name) |
|
|
|
) |
|
|
|
else: |
|
|
|
logger.debug("This is the launch string {}".format(launch_string)) |
|
|
|
|
|
|
self._env_actions[agent_group] = action |
|
|
|
|
|
|
|
def set_action_for_agent( |
|
|
|
self, agent_group: AgentGroup, agent_id: AgentId, action: np.ndarray |
|
|
|
self, agent_group: AgentGroup, agent_id: AgentId, action: np.ndarray |
|
|
|
) -> None: |
|
|
|
self._assert_group_exists(agent_group) |
|
|
|
if agent_group not in self._env_state: |
|
|
|
|
|
|
|
|
|
|
@staticmethod |
|
|
|
def _parse_side_channel_message( |
|
|
|
side_channels: Dict[int, SideChannel], data: bytes |
|
|
|
side_channels: Dict[int, SideChannel], data: bytes |
|
|
|
) -> None: |
|
|
|
offset = 0 |
|
|
|
while offset < len(data): |
|
|
|
|
|
|
message_data = data[offset : offset + message_len] |
|
|
|
message_data = data[offset: offset + message_len] |
|
|
|
offset = offset + message_len |
|
|
|
except Exception: |
|
|
|
raise UnityEnvironmentException( |
|
|
|
|
|
|
|
|
|
|
@timed |
|
|
|
def _generate_step_input( |
|
|
|
self, vector_action: Dict[str, np.ndarray] |
|
|
|
self, vector_action: Dict[str, np.ndarray] |
|
|
|
) -> UnityInputProto: |
|
|
|
rl_in = UnityRLInputProto() |
|
|
|
for b in vector_action: |
|
|
|
|
|
|
return self.wrap_unity_input(rl_in) |
|
|
|
|
|
|
|
def send_academy_parameters( |
|
|
|
self, init_parameters: UnityRLInitializationInputProto |
|
|
|
self, init_parameters: UnityRLInitializationInputProto |
|
|
|
) -> UnityOutputProto: |
|
|
|
inputs = UnityInputProto() |
|
|
|
inputs.rl_initialization_input.CopyFrom(init_parameters) |
|
|
|