浏览代码

Get timers from subprocess (#2268)

* Timer proof-of-concept

* micro optimizations

* add some timers

* cleanup, add asserts

* Cleanup (no start/end methods) and handle exceptions

* unit test and decorator

* move output code, add a decorator

* cleanup

* module docstring

* actually write the timings when done with training

* use __qualname__ instead

* add a few more timers

* fix mock import

* fix unit test

* get timers from worker process (WIP)

* clean up timer merging

* typo

* WIP

* cleanup merging code

* bad merge

* undo accidental change

* remove reset command

* fix style

* fix unit tests

* fix unit tests (they got overwritten in the merge)

* get timer root through a function

* timer around communicate
/develop-generalizationTraining-TrainerController
GitHub 5 年前
当前提交
f82f0f37
共有 4 个文件被更改,包括 114 次插入21 次删除
  1. 11
      ml-agents-envs/mlagents/envs/environment.py
  2. 42
      ml-agents-envs/mlagents/envs/subprocess_env_manager.py
  3. 11
      ml-agents-envs/mlagents/envs/tests/test_subprocess_env_manager.py
  4. 71
      ml-agents-envs/mlagents/envs/timers.py

11
ml-agents-envs/mlagents/envs/environment.py


from typing import *
from mlagents.envs.base_unity_environment import BaseUnityEnvironment
from mlagents.envs.timers import timed, hierarchical_timer
from .brain import AllBrainInfo, BrainInfo, BrainParameters
from .exception import (
UnityEnvironmentException,

else:
raise UnityEnvironmentException("No Unity environment is loaded.")
@timed
def step(
self,
vector_action=None,

)
)
outputs = self.communicator.exchange(
self._generate_step_input(
vector_action, memory, text_action, value, custom_action
)
step_input = self._generate_step_input(
vector_action, memory, text_action, value, custom_action
with hierarchical_timer("communicator.exchange"):
outputs = self.communicator.exchange(step_input)
if outputs is None:
raise KeyboardInterrupt
rl_output = outputs.rl_output

)
return _data, global_done
@timed
def _generate_step_input(
self, vector_action, memory, text_action, value, custom_action
) -> UnityInput:

42
ml-agents-envs/mlagents/envs/subprocess_env_manager.py


from queue import Empty as EmptyQueueException
from mlagents.envs.base_unity_environment import BaseUnityEnvironment
from mlagents.envs.env_manager import EnvManager, StepInfo
from mlagents.envs.timers import timed
from mlagents.envs import BrainParameters, ActionInfo
from mlagents.envs.timers import (
TimerNode,
timed,
hierarchical_timer,
reset_timers,
get_timer_root,
)
from mlagents.envs import AllBrainInfo, BrainParameters, ActionInfo
class EnvironmentCommand(NamedTuple):
    """A command message dispatched to a subprocess environment worker.

    ``name`` selects the operation — the worker dispatches on it (e.g. "step",
    "external_brains", "reset_parameters") — and ``payload`` carries the
    command-specific data. ``worker_id`` identifies the target worker.
    """

    name: str
    worker_id: int
    payload: Any
class StepResponse(NamedTuple):
    """The worker's reply to a "step" command.

    ``timer_root`` is the worker process's timer tree (or None when absent),
    allowing the main process to merge per-worker timings into its own tree.
    """

    all_brain_info: AllBrainInfo
    timer_root: Optional[TimerNode]
class UnityEnvWorker:

texts[brain_name] = action_info.text
values[brain_name] = action_info.value
all_brain_info = env.step(actions, memories, texts, values)
step_queue.put(EnvironmentResponse("step", worker_id, all_brain_info))
# The timers in this process are independent from all the processes and the "main" process
# So after we send back the root timer, we can safely clear them.
# Note that we could randomly return timers a fraction of the time if we wanted to reduce
# the data transferred.
step_response = StepResponse(all_brain_info, get_timer_root())
step_queue.put(EnvironmentResponse("step", worker_id, step_response))
reset_timers()
elif cmd.name == "external_brains":
_send_response("external_brains", env.external_brains)
elif cmd.name == "reset_parameters":

def close(self) -> None:
self.step_queue.close()
self.step_queue.join_thread()
for env in self.env_workers:
env.close()
for env_worker in self.env_workers:
env_worker.close()
timer_nodes = []
payload: StepResponse = step.payload
step.payload,
payload.all_brain_info,
if payload.timer_root:
timer_nodes.append(payload.timer_root)
if timer_nodes:
with hierarchical_timer("workers") as main_timer_node:
for worker_timer_node in timer_nodes:
main_timer_node.merge(
worker_timer_node, root_name="worker_root", is_parallel=True
)
return step_infos
@timed

11
ml-agents-envs/mlagents/envs/tests/test_subprocess_env_manager.py


EnvironmentResponse,
EnvironmentCommand,
worker,
StepResponse,
)
from mlagents.envs.base_unity_environment import BaseUnityEnvironment

# recv called twice to get step and close command
self.assertEqual(mock_parent_connection.recv.call_count, 2)
expected_step_response = StepResponse(
all_brain_info="reset_data", timer_root=mock.ANY
)
EnvironmentResponse("step", 0, "reset_data")
EnvironmentResponse("step", 0, expected_step_response)
)
def test_reset_passes_reset_params(self):

manager = SubprocessEnvManager(mock_env_factory, 3)
manager.step_queue = Mock()
manager.step_queue.get_nowait.side_effect = [
EnvironmentResponse("step", 0, 0),
EnvironmentResponse("step", 1, 1),
EnvironmentResponse("step", 0, StepResponse(0, None)),
EnvironmentResponse("step", 1, StepResponse(1, None)),
EmptyQueue(),
]
step_mock = Mock()

71
ml-agents-envs/mlagents/envs/timers.py


Represents the time spent in a block of code.
"""
__slots__ = ["children", "total", "count"]
__slots__ = ["children", "total", "count", "is_parallel"]
def __init__(self):
# Note that since dictionary keys are the node names, we don't explicitly store the name on the TimerNode.

self.is_parallel = False
def get_child(self, name: str) -> "TimerNode":
"""

self.total += elapsed
self.count += 1
def merge(self, other: "TimerNode", root_name: str = None, is_parallel=True):
    """
    Fold another timer tree into this one, recursively.

    The other node's total and count are accumulated onto the matching node
    here, and the same is done for every one of its children.
    :param other: The node whose timings should be merged into this tree.
    :param root_name: If given, merge into the child of that name rather than
        directly into this node.
    :param is_parallel: Whether the merged block ran in parallel.
    """
    target = self.get_child(root_name) if root_name else self
    target.total += other.total
    target.count += other.count
    target.is_parallel |= is_parallel
    for child_name, child_node in other.children.items():
        target.get_child(child_name).merge(child_node, is_parallel=is_parallel)
class TimerStack:
"""

self.stack = [self.root]
self.start_time = perf_counter()
def reset(self):
    """Discard all recorded timings and restart the root clock."""
    fresh_root = TimerNode()
    self.root = fresh_root
    self.stack = [fresh_root]
    self.start_time = perf_counter()
def push(self, name: str) -> TimerNode:
"""
Called when entering a new block of code that is timed (e.g. with a contextmanager).

"""
self.stack.pop()
def get_root(self) -> TimerNode:
    """
    Refresh the root node's elapsed time and count, then return it.

    The root spans the whole lifetime of the stack, so its total is always
    recomputed from start_time rather than accumulated.
    """
    root_node = self.root
    root_node.count = 1
    root_node.total = perf_counter() - self.start_time
    return root_node
res: Dict[str, Any] = {}
node = self.root
total_elapsed = perf_counter() - self.start_time
res = {"name": "root", "total": total_elapsed, "count": 1}
else:
res = {"total": node.total, "count": node.count}
node = self.get_root()
res["name"] = "root"
res["total"] = node.total
res["count"] = node.count
if node.is_parallel:
# Note when the block ran in parallel, so that it's less confusing that a timer is less than its children.
res["is_parallel"] = True
child_total = 0.0
child_list = []

try:
# The wrapped code block will run here.
yield
yield timer_node
finally:
# This will trigger either when the context manager exits, or an exception is raised.
# We'll accumulate the time, and the exception (if any) gets raised automatically.

"""
timer_stack = timer_stack or _global_timer_stack
return timer_stack.get_timing_tree()
def get_timer_root(timer_stack: TimerStack = None) -> TimerNode:
    """
    Return the root TimerNode of the given stack (the global TimerStack when
    none is supplied).
    """
    stack = timer_stack or _global_timer_stack
    return stack.get_root()
def reset_timers(timer_stack: TimerStack = None) -> None:
    """
    Clear all accumulated timings on the given stack (the global TimerStack
    when none is supplied).
    """
    (timer_stack or _global_timer_stack).reset()
正在加载...
取消
保存