您最多选择25个主题
主题必须以中文或者字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
149 行
6.9 KiB
149 行
6.9 KiB
import logging
|
|
import numpy as np
|
|
import io
|
|
|
|
from typing import Dict
|
|
from PIL import Image
|
|
|
|
logger = logging.getLogger("mlagents.envs")
|
|
|
|
|
|
class BrainInfo:
|
|
def __init__(self, visual_observation, vector_observation, text_observations, memory=None,
|
|
reward=None, agents=None, local_done=None,
|
|
vector_action=None, text_action=None, max_reached=None, action_mask=None):
|
|
"""
|
|
Describes experience at current step of all agents linked to a brain.
|
|
"""
|
|
self.visual_observations = visual_observation
|
|
self.vector_observations = vector_observation
|
|
self.text_observations = text_observations
|
|
self.memories = memory
|
|
self.rewards = reward
|
|
self.local_done = local_done
|
|
self.max_reached = max_reached
|
|
self.agents = agents
|
|
self.previous_vector_actions = vector_action
|
|
self.previous_text_actions = text_action
|
|
self.action_masks = action_mask
|
|
|
|
@staticmethod
|
|
def process_pixels(image_bytes, gray_scale):
|
|
"""
|
|
Converts byte array observation image into numpy array, re-sizes it,
|
|
and optionally converts it to grey scale
|
|
:param gray_scale: Whether to convert the image to grayscale.
|
|
:param image_bytes: input byte array corresponding to image
|
|
:return: processed numpy array of observation from environment
|
|
"""
|
|
s = bytearray(image_bytes)
|
|
image = Image.open(io.BytesIO(s))
|
|
s = np.array(image) / 255.0
|
|
if gray_scale:
|
|
s = np.mean(s, axis=2)
|
|
s = np.reshape(s, [s.shape[0], s.shape[1], 1])
|
|
return s
|
|
|
|
@staticmethod
|
|
def from_agent_proto(agent_info_list, brain_params):
|
|
"""
|
|
Converts list of agent infos to BrainInfo.
|
|
"""
|
|
vis_obs = []
|
|
for i in range(brain_params.number_visual_observations):
|
|
obs = [BrainInfo.process_pixels(x.visual_observations[i],
|
|
brain_params.camera_resolutions[i]['blackAndWhite'])
|
|
for x in agent_info_list]
|
|
vis_obs += [np.array(obs)]
|
|
if len(agent_info_list) == 0:
|
|
memory_size = 0
|
|
else:
|
|
memory_size = max([len(x.memories) for x in agent_info_list])
|
|
if memory_size == 0:
|
|
memory = np.zeros((0, 0))
|
|
else:
|
|
[x.memories.extend([0] * (memory_size - len(x.memories))) for x in agent_info_list]
|
|
memory = np.array([x.memories for x in agent_info_list])
|
|
total_num_actions = sum(brain_params.vector_action_space_size)
|
|
mask_actions = np.ones((len(agent_info_list), total_num_actions))
|
|
for agent_index, agent_info in enumerate(agent_info_list):
|
|
if agent_info.action_mask is not None:
|
|
if len(agent_info.action_mask) == total_num_actions:
|
|
mask_actions[agent_index, :] = [
|
|
0 if agent_info.action_mask[k] else 1 for k in range(total_num_actions)]
|
|
if any([np.isnan(x.reward) for x in agent_info_list]):
|
|
logger.warning("An agent had a NaN reward for brain " + brain_params.brain_name)
|
|
if any([np.isnan(x.stacked_vector_observation).any() for x in agent_info_list]):
|
|
logger.warning("An agent had a NaN observation for brain " + brain_params.brain_name)
|
|
brain_info = BrainInfo(
|
|
visual_observation=vis_obs,
|
|
vector_observation=np.nan_to_num(
|
|
np.array([x.stacked_vector_observation for x in agent_info_list])),
|
|
text_observations=[x.text_observation for x in agent_info_list],
|
|
memory=memory,
|
|
reward=[x.reward if not np.isnan(x.reward) else 0 for x in agent_info_list],
|
|
agents=[x.id for x in agent_info_list],
|
|
local_done=[x.done for x in agent_info_list],
|
|
vector_action=np.array([x.stored_vector_actions for x in agent_info_list]),
|
|
text_action=[x.stored_text_actions for x in agent_info_list],
|
|
max_reached=[x.max_step_reached for x in agent_info_list],
|
|
action_mask=mask_actions
|
|
)
|
|
return brain_info
|
|
|
|
|
|
# Renaming of dictionary of brain name to BrainInfo for clarity
|
|
AllBrainInfo = Dict[str, BrainInfo]
|
|
|
|
|
|
class BrainParameters:
|
|
def __init__(self, brain_name, vector_observation_space_size, num_stacked_vector_observations,
|
|
camera_resolutions, vector_action_space_size,
|
|
vector_action_descriptions, vector_action_space_type):
|
|
"""
|
|
Contains all brain-specific parameters.
|
|
"""
|
|
self.brain_name = brain_name
|
|
self.vector_observation_space_size = vector_observation_space_size
|
|
self.num_stacked_vector_observations = num_stacked_vector_observations
|
|
self.number_visual_observations = len(camera_resolutions)
|
|
self.camera_resolutions = camera_resolutions
|
|
self.vector_action_space_size = vector_action_space_size
|
|
self.vector_action_descriptions = vector_action_descriptions
|
|
self.vector_action_space_type = ["discrete", "continuous"][vector_action_space_type]
|
|
|
|
def __str__(self):
|
|
return '''Unity brain name: {}
|
|
Number of Visual Observations (per agent): {}
|
|
Vector Observation space size (per agent): {}
|
|
Number of stacked Vector Observation: {}
|
|
Vector Action space type: {}
|
|
Vector Action space size (per agent): {}
|
|
Vector Action descriptions: {}'''.format(self.brain_name,
|
|
str(self.number_visual_observations),
|
|
str(self.vector_observation_space_size),
|
|
str(self.num_stacked_vector_observations),
|
|
self.vector_action_space_type,
|
|
str(self.vector_action_space_size),
|
|
', '.join(self.vector_action_descriptions))
|
|
|
|
@staticmethod
|
|
def from_proto(brain_param_proto):
|
|
"""
|
|
Converts brain parameter proto to BrainParameter object.
|
|
:param brain_param_proto: protobuf object.
|
|
:return: BrainParameter object.
|
|
"""
|
|
resolution = [{
|
|
"height": x.height,
|
|
"width": x.width,
|
|
"blackAndWhite": x.gray_scale
|
|
} for x in brain_param_proto.camera_resolutions]
|
|
brain_params = BrainParameters(brain_param_proto.brain_name,
|
|
brain_param_proto.vector_observation_size,
|
|
brain_param_proto.num_stacked_vector_observations,
|
|
resolution,
|
|
brain_param_proto.vector_action_size,
|
|
brain_param_proto.vector_action_descriptions,
|
|
brain_param_proto.vector_action_space_type)
|
|
return brain_params
|