浏览代码

Allow mypy to reject incomplete defs for mlagents-envs (#2585)

This wasn't working before because of several remaining partially defined
function definitions.
/develop-gpu-test
GitHub 5 年前
当前提交
e1d93a0e
共有 9 个文件被更改,包括 125 次插入,101 次删除
  1. 3
      .pre-commit-config.yaml
  2. 13
      ml-agents-envs/mlagents/envs/base_unity_environment.py
  3. 144
      ml-agents-envs/mlagents/envs/brain.py
  4. 4
      ml-agents-envs/mlagents/envs/env_manager.py
  5. 32
      ml-agents-envs/mlagents/envs/environment.py
  6. 13
      ml-agents-envs/mlagents/envs/sampler_class.py
  7. 9
      ml-agents-envs/mlagents/envs/subprocess_env_manager.py
  8. 2
      ml-agents-envs/mlagents/envs/tests/test_subprocess_env_manager.py
  9. 6
      ml-agents-envs/mlagents/envs/timers.py

3
.pre-commit-config.yaml


files: "ml-agents-envs/.*"
# Exclude protobuf files and don't follow them when imported
exclude: ".*_pb2.py"
# TODO get disallow-incomplete-defs working
args: [--ignore-missing-imports, --follow-imports=silent]
args: [--ignore-missing-imports, --disallow-incomplete-defs]
- id: mypy
name: mypy-gym-unity
files: "gym-unity/.*"

13
ml-agents-envs/mlagents/envs/base_unity_environment.py


from abc import ABC, abstractmethod
from typing import Dict
from typing import Dict, Optional, Any
from mlagents.envs.brain import AllBrainInfo, BrainParameters

def step(
self, vector_action=None, memory=None, text_action=None, value=None
self,
vector_action: Optional[Dict] = None,
memory: Optional[Dict] = None,
text_action: Optional[Dict] = None,
value: Optional[Dict] = None,
self, config=None, train_mode=True, custom_reset_parameters=None
self,
config: Optional[Dict] = None,
train_mode: bool = True,
custom_reset_parameters: Any = None,
) -> AllBrainInfo:
pass

144
ml-agents-envs/mlagents/envs/brain.py


import numpy as np
import io
from mlagents.envs.communicator_objects.agent_info_proto_pb2 import AgentInfoProto
from mlagents.envs.communicator_objects.brain_parameters_proto_pb2 import (
BrainParametersProto,
)
class BrainParameters:
    """Brain-specific parameters: observation sizes, camera setup, and action space."""

    def __init__(
        self,
        brain_name: str,
        vector_observation_space_size: int,
        num_stacked_vector_observations: int,
        camera_resolutions: List[Dict],
        vector_action_space_size: List[int],
        vector_action_descriptions: List[str],
        vector_action_space_type: int,
    ):
        """
        Contains all brain-specific parameters.
        """
        self.brain_name = brain_name
        self.vector_observation_space_size = vector_observation_space_size
        self.num_stacked_vector_observations = num_stacked_vector_observations
        # One visual observation per configured camera resolution.
        self.number_visual_observations = len(camera_resolutions)
        self.camera_resolutions = camera_resolutions
        self.vector_action_space_size = vector_action_space_size
        self.vector_action_descriptions = vector_action_descriptions
        # The proto encodes the space type as an index: 0 -> discrete, 1 -> continuous.
        self.vector_action_space_type = ["discrete", "continuous"][
            vector_action_space_type
        ]

    def __str__(self):
        """Return a human-readable multi-line summary of this brain's parameters."""
        template = """Unity brain name: {}
Number of Visual Observations (per agent): {}
Vector Observation space size (per agent): {}
Number of stacked Vector Observation: {}
Vector Action space type: {}
Vector Action space size (per agent): {}
Vector Action descriptions: {}"""
        return template.format(
            self.brain_name,
            str(self.number_visual_observations),
            str(self.vector_observation_space_size),
            str(self.num_stacked_vector_observations),
            self.vector_action_space_type,
            str(self.vector_action_space_size),
            ", ".join(self.vector_action_descriptions),
        )

    @staticmethod
    def from_proto(brain_param_proto: BrainParametersProto) -> "BrainParameters":
        """
        Converts brain parameter proto to BrainParameter object.
        :param brain_param_proto: protobuf object.
        :return: BrainParameter object.
        """
        resolutions = []
        for cam in brain_param_proto.camera_resolutions:
            resolutions.append(
                {"height": cam.height, "width": cam.width, "blackAndWhite": cam.gray_scale}
            )
        return BrainParameters(
            brain_param_proto.brain_name,
            brain_param_proto.vector_observation_size,
            brain_param_proto.num_stacked_vector_observations,
            resolutions,
            list(brain_param_proto.vector_action_size),
            list(brain_param_proto.vector_action_descriptions),
            brain_param_proto.vector_action_space_type,
        )
class BrainInfo:

return s
@staticmethod
def from_agent_proto(worker_id: int, agent_info_list, brain_params):
def from_agent_proto(
worker_id: int,
agent_info_list: List[AgentInfoProto],
brain_params: BrainParameters,
) -> "BrainInfo":
"""
Converts list of agent infos to BrainInfo.
"""

return copy
def safe_concat_np_ndarray(a1: Optional[np.ndarray], a2: Optional[np.ndarray]):
def safe_concat_np_ndarray(
a1: Optional[np.ndarray], a2: Optional[np.ndarray]
) -> Optional[np.ndarray]:
if a1 is not None and a1.size != 0:
if a2 is not None and a2.size != 0:
return np.append(a1, a2, axis=0)

# Renaming of dictionary of brain name to BrainInfo for clarity
AllBrainInfo = Dict[str, BrainInfo]
class BrainParameters:
    """Brain-specific parameters: observation sizes, camera setup, and action space."""

    def __init__(
        self,
        brain_name: str,
        vector_observation_space_size: int,
        num_stacked_vector_observations: int,
        camera_resolutions: List[Dict],
        vector_action_space_size: List[int],
        vector_action_descriptions: List[str],
        vector_action_space_type: int,
    ) -> None:
        """
        Contains all brain-specific parameters.
        """
        self.brain_name = brain_name
        self.vector_observation_space_size = vector_observation_space_size
        self.num_stacked_vector_observations = num_stacked_vector_observations
        # One visual observation per configured camera resolution.
        self.number_visual_observations = len(camera_resolutions)
        self.camera_resolutions = camera_resolutions
        self.vector_action_space_size = vector_action_space_size
        self.vector_action_descriptions = vector_action_descriptions
        # The proto encodes the space type as an index: 0 -> discrete, 1 -> continuous.
        self.vector_action_space_type = ["discrete", "continuous"][
            vector_action_space_type
        ]

    def __str__(self) -> str:
        """Return a human-readable multi-line summary of this brain's parameters."""
        return """Unity brain name: {}
Number of Visual Observations (per agent): {}
Vector Observation space size (per agent): {}
Number of stacked Vector Observation: {}
Vector Action space type: {}
Vector Action space size (per agent): {}
Vector Action descriptions: {}""".format(
            self.brain_name,
            str(self.number_visual_observations),
            str(self.vector_observation_space_size),
            str(self.num_stacked_vector_observations),
            self.vector_action_space_type,
            str(self.vector_action_space_size),
            ", ".join(self.vector_action_descriptions),
        )

    @staticmethod
    def from_proto(brain_param_proto: "BrainParametersProto") -> "BrainParameters":
        """
        Converts brain parameter proto to BrainParameter object.
        :param brain_param_proto: protobuf object.
        :return: BrainParameter object.
        """
        resolution = [
            {"height": x.height, "width": x.width, "blackAndWhite": x.gray_scale}
            for x in brain_param_proto.camera_resolutions
        ]
        brain_params = BrainParameters(
            brain_param_proto.brain_name,
            brain_param_proto.vector_observation_size,
            brain_param_proto.num_stacked_vector_observations,
            resolution,
            list(brain_param_proto.vector_action_size),
            list(brain_param_proto.vector_action_descriptions),
            brain_param_proto.vector_action_space_type,
        )
        return brain_params

4
ml-agents-envs/mlagents/envs/env_manager.py


pass
@abstractmethod
def reset(self, config=None, train_mode=True) -> List[EnvironmentStep]:
def reset(
self, config: Dict = None, train_mode: bool = True
) -> List[EnvironmentStep]:
pass
@property

32
ml-agents-envs/mlagents/envs/environment.py


import numpy as np
import os
import subprocess
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Any
from mlagents.envs.base_unity_environment import BaseUnityEnvironment
from mlagents.envs.timers import timed, hierarchical_timer

class UnityEnvironment(BaseUnityEnvironment):
SCALAR_ACTION_TYPES = (int, np.int32, np.int64, float, np.float32, np.float64)
SINGLE_BRAIN_ACTION_TYPES = SCALAR_ACTION_TYPES + (list, np.ndarray)
SINGLE_BRAIN_TEXT_TYPES = (str, list, np.ndarray)
SINGLE_BRAIN_TEXT_TYPES = list
def __init__(
self,

)
def reset(
self, config=None, train_mode=True, custom_reset_parameters=None
self,
config: Dict = None,
train_mode: bool = True,
custom_reset_parameters: Any = None,
) -> AllBrainInfo:
"""
Sends a signal to reset the unity environment.

@timed
def step(
self,
vector_action=None,
memory=None,
text_action=None,
value=None,
custom_action=None,
vector_action: Dict[str, np.ndarray] = None,
memory: Optional[Dict[str, np.ndarray]] = None,
text_action: Optional[Dict[str, List[str]]] = None,
value: Optional[Dict[str, np.ndarray]] = None,
custom_action: Dict[str, Any] = None,
) -> AllBrainInfo:
"""
Provides the environment with an action, moves the environment dynamics forward accordingly,

else:
if text_action[brain_name] is None:
text_action[brain_name] = [""] * n_agent
if isinstance(text_action[brain_name], str):
text_action[brain_name] = [text_action[brain_name]] * n_agent
if brain_name not in custom_action:
custom_action[brain_name] = [None] * n_agent
else:

self.proc1.kill()
@classmethod
def _flatten(cls, arr) -> List[float]:
def _flatten(cls, arr: Any) -> List[float]:
"""
Converts arrays to list.
:param arr: numpy vector.

@timed
def _generate_step_input(
self, vector_action, memory, text_action, value, custom_action
self,
vector_action: Dict[str, np.ndarray],
memory: Dict[str, np.ndarray],
text_action: Dict[str, list],
value: Dict[str, np.ndarray],
custom_action: Dict[str, list],
) -> UnityInput:
rl_in = UnityRLInput()
for b in vector_action:

return self.wrap_unity_input(rl_in)
def _generate_reset_input(
self, training, config, custom_reset_parameters
self, training: bool, config: Dict, custom_reset_parameters: Any
) -> UnityInput:
rl_in = UnityRLInput()
rl_in.is_training = training

13
ml-agents-envs/mlagents/envs/sampler_class.py


min_value: Union[int, float],
max_value: Union[int, float],
seed: Optional[int] = None,
**kwargs
) -> None:
):
"""
:param min_value: minimum value of the range to be sampled uniformly from
:param max_value: maximum value of the range to be sampled uniformly from

"""
def __init__(
self,
intervals: List[List[Union[int, float]]],
seed: Optional[int] = None,
**kwargs
) -> None:
self, intervals: List[List[Union[int, float]]], seed: Optional[int] = None
):
"""
:param intervals: List of intervals to draw uniform samples from
:param seed: Random seed used for making uniform draws from the specified intervals

mean: Union[float, int],
st_dev: Union[float, int],
seed: Optional[int] = None,
**kwargs
) -> None:
):
"""
:param mean: Specifies the mean of the gaussian distribution to draw from
:param st_dev: Specifies the standard devation of the gaussian distribution to draw from

9
ml-agents-envs/mlagents/envs/subprocess_env_manager.py


self.previous_all_action_info: Dict[str, ActionInfo] = {}
self.waiting = False
def send(self, name: str, payload=None):
def send(self, name: str, payload: Any = None) -> None:
try:
cmd = EnvironmentCommand(name, payload)
self.conn.send(cmd)

def worker(
parent_conn: Connection, step_queue: Queue, pickled_env_factory: str, worker_id: int
):
) -> None:
env_factory: Callable[[int], UnityEnvironment] = cloudpickle.loads(
pickled_env_factory
)

return step_infos
def reset(
self, config=None, train_mode=True, custom_reset_parameters=None
self,
config: Optional[Dict] = None,
train_mode: bool = True,
custom_reset_parameters: Any = None,
) -> List[EnvironmentStep]:
while any([ew.waiting for ew in self.env_workers]):
if not self.step_queue.empty():

2
ml-agents-envs/mlagents/envs/tests/test_subprocess_env_manager.py


from mlagents.envs.base_unity_environment import BaseUnityEnvironment
def mock_env_factory(worker_id: int):
def mock_env_factory(worker_id):
return mock.create_autospec(spec=BaseUnityEnvironment)

6
ml-agents-envs/mlagents/envs/timers.py


self.total += elapsed
self.count += 1
def merge(self, other: "TimerNode", root_name: str = None, is_parallel=True):
def merge(
self, other: "TimerNode", root_name: str = None, is_parallel: bool = True
) -> None:
"""
Add the other node to this node, then do the same recursively on its children.
:param other: The other node to merge

self.max_value = value
self.count = 1
def update(self, new_value: float):
def update(self, new_value: float) -> None:
self.min_value = min(self.min_value, new_value)
self.max_value = max(self.max_value, new_value)
self.value = new_value

正在加载...
取消
保存