浏览代码

Don't block when disabling threading

/develop/sac-apex
Ervin Teng 4 年前
当前提交
971e4b2d
共有 7 个文件被更改,包括 12 次插入15 次删除
  1. 10
      ml-agents/mlagents/trainers/agent_processor.py
  2. 2
      ml-agents/mlagents/trainers/env_manager.py
  3. 8
      ml-agents/mlagents/trainers/ghost/trainer.py
  4. 2
      ml-agents/mlagents/trainers/tests/test_agent_processor.py
  5. 2
      ml-agents/mlagents/trainers/tests/test_ghost.py
  6. 1
      ml-agents/mlagents/trainers/tests/test_simple_rl.py
  7. 2
      ml-agents/mlagents/trainers/trainer/rl_trainer.py

10
ml-agents/mlagents/trainers/agent_processor.py


def empty(self) -> bool:
return self.queue.empty()
def get_nowait(self) -> T:
try:
return self.queue.get_nowait()
except queue.Empty:
raise self.Empty("The AgentManagerQueue is empty.")
def get(self, timeout: float) -> T:
def get(self, block: bool = True, timeout: float = None) -> T:
return self.queue.get(timeout=timeout)
return self.queue.get(block=block, timeout=timeout)
except queue.Empty:
raise self.Empty("The AgentManagerQueue is empty.")

2
ml-agents/mlagents/trainers/env_manager.py


# Get new policies if found
for brain_name in self.external_brains:
try:
_policy = self.agent_managers[brain_name].policy_queue.get_nowait()
_policy = self.agent_managers[brain_name].policy_queue.get(block=False)
self.set_policy(brain_name, _policy)
except AgentManagerQueue.Empty:
pass

8
ml-agents/mlagents/trainers/ghost/trainer.py


# This ensures that even if the queue is being filled faster than it is
# being emptied, the trajectories in the queue are on-policy.
for _ in range(trajectory_queue.maxlen):
t = trajectory_queue.get_nowait()
t = trajectory_queue.get(block=not empty_queue, timeout=0.05)
# adds to wrapped trainers queue
internal_trajectory_queue.put(t)
self._process_trajectory(t)

# Dump trajectories from non-learning policy
try:
for _ in range(trajectory_queue.maxlen):
t = trajectory_queue.get_nowait()
t = trajectory_queue.get(block=not empty_queue, timeout=0.05)
if not empty_queue:
break
except AgentManagerQueue.Empty:
pass

for brain_name in self._internal_policy_queues:
internal_policy_queue = self._internal_policy_queues[brain_name]
try:
policy = cast(TFPolicy, internal_policy_queue.get_nowait())
policy = cast(TFPolicy, internal_policy_queue.get(block=False))
self.current_policy_snapshot[brain_name] = policy.get_weights()
except AgentManagerQueue.Empty:
pass

2
ml-agents/mlagents/trainers/tests/test_agent_processor.py


assert queue.empty()
queue.put(trajectory)
assert not queue.empty()
queue_traj = queue.get_nowait()
queue_traj = queue.get(block=False)
assert isinstance(queue_traj, Trajectory)
assert queue.empty()

2
ml-agents/mlagents/trainers/tests/test_ghost.py


trainer._swap_snapshots()
assert policy_queue0.empty() and not policy_queue1.empty()
# clear
policy_queue1.get_nowait()
policy_queue1.get(block=False)
mock_brain = mb.setup_mock_brain(
False,

1
ml-agents/mlagents/trainers/tests/test_simple_rl.py


sampler_manager=SamplerManager(None),
resampling_interval=None,
save_freq=save_freq,
threaded=False,
)
# Begin training

2
ml-agents/mlagents/trainers/trainer/rl_trainer.py


# being emptied, the trajectories in the queue are on-policy.
for _ in range(traj_queue.maxlen):
try:
t = traj_queue.get(0.05)
t = traj_queue.get(block=not empty_queue, timeout=0.05)
self._process_trajectory(t)
if not empty_queue:
break

正在加载...
取消
保存