|
|
|
|
|
|
:param agent_info: protobuf object. |
|
|
|
:return: BehaviorSpec object. |
|
|
|
""" |
|
|
|
sensor_specs = [] |
|
|
|
observation_specs = [] |
|
|
|
sensor_specs.append( |
|
|
|
observation_specs.append( |
|
|
|
SensorType(obs.sensor_type), |
|
|
|
ObservationType(obs.sensor_type), |
|
|
|
) |
|
|
|
) |
|
|
|
# proto from communicator < v1.3 does not set action spec, use deprecated fields instead |
|
|
|
|
|
|
action_spec_proto.num_continuous_actions, |
|
|
|
tuple(branch for branch in action_spec_proto.discrete_branch_sizes), |
|
|
|
) |
|
|
|
return BehaviorSpec(sensor_specs, action_spec) |
|
|
|
return BehaviorSpec(observation_specs, action_spec) |
|
|
|
|
|
|
|
|
|
|
|
class OffsetBytesIO: |
|
|
|
|
|
|
] |
|
|
|
decision_obs_list: List[np.ndarray] = [] |
|
|
|
terminal_obs_list: List[np.ndarray] = [] |
|
|
|
for obs_index, sensor_specs in enumerate(behavior_spec.observation_specs): |
|
|
|
is_visual = len(sensor_specs.shape) == 3 |
|
|
|
for obs_index, observation_specs in enumerate(behavior_spec.observation_specs): |
|
|
|
is_visual = len(observation_specs.shape) == 3 |
|
|
|
obs_shape = cast(Tuple[int, int, int], sensor_specs.shape) |
|
|
|
obs_shape = cast(Tuple[int, int, int], observation_specs.shape) |
|
|
|
decision_obs_list.append( |
|
|
|
_process_visual_observation( |
|
|
|
obs_index, obs_shape, decision_agent_info_list |
|
|
|
|
|
|
else: |
|
|
|
decision_obs_list.append( |
|
|
|
_process_vector_observation( |
|
|
|
obs_index, sensor_specs.shape, decision_agent_info_list |
|
|
|
obs_index, observation_specs.shape, decision_agent_info_list |
|
|
|
obs_index, sensor_specs.shape, terminal_agent_info_list |
|
|
|
obs_index, observation_specs.shape, terminal_agent_info_list |
|
|
|
) |
|
|
|
) |
|
|
|
decision_rewards = np.array( |
|
|
|