浏览代码

Revert to get_nowait method in AgentManagerQueue

/develop/sac-apex
Ervin Teng 5 年前
当前提交
e90ef688
共有 7 个文件被更改,包括 15 次插入和 18 次删除
  1. 11
      ml-agents/mlagents/trainers/agent_processor.py
  2. 6
      ml-agents/mlagents/trainers/ghost/trainer.py
  3. 2
      ml-agents/mlagents/trainers/tests/test_agent_processor.py
  4. 2
      ml-agents/mlagents/trainers/tests/test_ghost.py
  5. 6
      ml-agents/mlagents/trainers/tests/test_rl_trainer.py
  6. 4
      ml-agents/mlagents/trainers/tests/test_sac.py
  7. 2
      ml-agents/mlagents/trainers/trainer/rl_trainer.py

11
ml-agents/mlagents/trainers/agent_processor.py


def empty(self) -> bool:
return self._queue.empty()
def get(self, block: bool = True, timeout: float = None) -> T:
def get_nowait(self, block: bool = True, timeout: float = None) -> T:
Gets the next item from the queue.
:param block: Block if the queue is empty. If False, exit immediately and
throw an AgentManagerQueue.Empty exception. (default = True)
:param timeout: Timeout for blocking get. If a positive float, wait timeout seconds
before throwing an AgentManagerQueue.Empty exception. (default = None)
Gets the next item from the queue, throwing an AgentManagerQueue.Empty exception
if the queue is empty.
return self._queue.get(block=block, timeout=timeout)
return self._queue.get_nowait()
except queue.Empty:
raise self.Empty("The AgentManagerQueue is empty.")

6
ml-agents/mlagents/trainers/ghost/trainer.py


# This ensures that even if the queue is being filled faster than it is
# being emptied, the trajectories in the queue are on-policy.
for _ in range(trajectory_queue.qsize()):
t = trajectory_queue.get(block=False)
t = trajectory_queue.get_nowait()
# adds to wrapped trainers queue
internal_trajectory_queue.put(t)
self._process_trajectory(t)

# Dump trajectories from non-learning policy
try:
for _ in range(trajectory_queue.qsize()):
t = trajectory_queue.get(block=False)
t = trajectory_queue.get_nowait()
# count ghost steps
self.ghost_step += len(t.steps)
except AgentManagerQueue.Empty:

for brain_name in self._internal_policy_queues:
internal_policy_queue = self._internal_policy_queues[brain_name]
try:
policy = cast(TFPolicy, internal_policy_queue.get(block=False))
policy = cast(TFPolicy, internal_policy_queue.get_nowait())
self.current_policy_snapshot[brain_name] = policy.get_weights()
except AgentManagerQueue.Empty:
pass

2
ml-agents/mlagents/trainers/tests/test_agent_processor.py


assert queue.empty()
queue.put(trajectory)
assert not queue.empty()
queue_traj = queue.get(block=False)
queue_traj = queue.get_nowait()
assert isinstance(queue_traj, Trajectory)
assert queue.empty()

2
ml-agents/mlagents/trainers/tests/test_ghost.py


trainer._swap_snapshots()
assert policy_queue0.empty() and not policy_queue1.empty()
# clear
policy_queue1.get(block=False)
policy_queue1.get_nowait()
mock_brain = mb.setup_mock_brain(
False,

6
ml-agents/mlagents/trainers/tests/test_rl_trainer.py


trajectory_queue.put(trajectory)
trainer.advance()
policy_queue.get(block=False)
policy_queue.get_nowait()
# Check that get_step is correct
assert trainer.get_step == time_horizon
# Check that we can turn off the trainer and that the buffer is cleared

# Check that there is stuff in the policy queue
policy_queue.get(block=False)
policy_queue.get_nowait()
# Check that if the policy doesn't update, we don't push it to the queue
trainer.set_is_policy_updating(False)

# Check that there nothing in the policy queue
with pytest.raises(AgentManagerQueue.Empty):
policy_queue.get(block=False)
policy_queue.get_nowait()
# Check that the buffer has been cleared
assert not trainer.should_still_train

4
ml-agents/mlagents/trainers/tests/test_sac.py


)
# Make sure there is a policy on the queue
policy_queue.get(block=False)
policy_queue.get_nowait()
# Add another trajectory. Since this is less than 20 steps total (enough for)
# two updates, there should NOT be a policy on the queue.

trajectory_queue.put(trajectory)
trainer.advance()
with pytest.raises(AgentManagerQueue.Empty):
policy_queue.get(block=False)
policy_queue.get_nowait()
def test_bad_config(dummy_config):

2
ml-agents/mlagents/trainers/trainer/rl_trainer.py


# being emptied, the trajectories in the queue are on-policy.
for _ in range(traj_queue.qsize()):
try:
t = traj_queue.get(block=False)
t = traj_queue.get_nowait()
self._process_trajectory(t)
except AgentManagerQueue.Empty:
break

正在加载...
取消
保存