您最多选择25个主题
主题必须以中文或者字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
111 行
3.9 KiB
111 行
3.9 KiB
import numpy as np
|
|
|
|
history_keys = ['states', 'observations', 'actions', 'rewards', 'action_probs', 'epsilons',
|
|
'value_estimates', 'advantages', 'discounted_returns']
|
|
|
|
|
|
def discount_rewards(r, gamma=0.99, value_next=0.0):
|
|
"""
|
|
Computes discounted sum of future rewards for use in updating value estimate.
|
|
:param r: List of rewards.
|
|
:param gamma: Discount factor.
|
|
:param value_next: T+1 value estimate for returns calculation.
|
|
:return: discounted sum of future rewards as list.
|
|
"""
|
|
discounted_r = np.zeros_like(r)
|
|
running_add = value_next
|
|
for t in reversed(range(0, r.size)):
|
|
running_add = running_add * gamma + r[t]
|
|
discounted_r[t] = running_add
|
|
return discounted_r
|
|
|
|
|
|
def get_gae(rewards, value_estimates, value_next=0.0, gamma=0.99, lambd=0.95):
|
|
"""
|
|
Computes generalized advantage estimate for use in updating policy.
|
|
:param rewards: list of rewards for time-steps t to T.
|
|
:param value_next: Value estimate for time-step T+1.
|
|
:param value_estimates: list of value estimates for time-steps t to T.
|
|
:param gamma: Discount factor.
|
|
:param lambd: GAE weighing factor.
|
|
:return: list of advantage estimates for time-steps t to T.
|
|
"""
|
|
value_estimates = np.asarray(value_estimates.tolist() + [value_next])
|
|
delta_t = rewards + gamma * value_estimates[1:] - value_estimates[:-1]
|
|
advantage = discount_rewards(r=delta_t, gamma=gamma*lambd)
|
|
return advantage
|
|
|
|
|
|
def empty_local_history(agent_dict):
|
|
"""
|
|
Empties the experience history for a single agent.
|
|
:param agent_dict: Dictionary of agent experience history.
|
|
:return: Emptied dictionary (except for cumulative_reward and episode_steps).
|
|
"""
|
|
for key in history_keys:
|
|
agent_dict[key] = []
|
|
return agent_dict
|
|
|
|
|
|
def vectorize_history(agent_dict):
|
|
"""
|
|
Converts dictionary of lists into dictionary of numpy arrays.
|
|
:param agent_dict: Dictionary of agent experience history.
|
|
:return: dictionary of numpy arrays.
|
|
"""
|
|
for key in history_keys:
|
|
agent_dict[key] = np.array(agent_dict[key])
|
|
return agent_dict
|
|
|
|
|
|
def empty_all_history(agent_info):
|
|
"""
|
|
Clears all agent histories and resets reward and episode length counters.
|
|
:param agent_info: a BrainInfo object.
|
|
:return: an emptied history dictionary.
|
|
"""
|
|
history_dict = {}
|
|
for agent in agent_info.agents:
|
|
history_dict[agent] = {}
|
|
history_dict[agent] = empty_local_history(history_dict[agent])
|
|
history_dict[agent]['cumulative_reward'] = 0
|
|
history_dict[agent]['episode_steps'] = 0
|
|
return history_dict
|
|
|
|
|
|
def append_history(global_buffer, local_buffer=None):
|
|
"""
|
|
Appends agent experience history to global history buffer.
|
|
:param global_buffer: Global buffer for all agents experiences.
|
|
:param local_buffer: Local history for individual agents experiences.
|
|
:return: Global buffer with new experiences added.
|
|
"""
|
|
for key in history_keys:
|
|
global_buffer[key] = np.concatenate([global_buffer[key], local_buffer[key]], axis=0)
|
|
return global_buffer
|
|
|
|
|
|
def set_history(global_buffer, local_buffer=None):
|
|
"""
|
|
Creates new global_buffer from existing local_buffer
|
|
:param global_buffer: Global buffer for all agents experiences.
|
|
:param local_buffer: Local history for individual agents experiences.
|
|
:return: Global buffer with new experiences.
|
|
"""
|
|
for key in history_keys:
|
|
global_buffer[key] = np.copy(local_buffer[key])
|
|
return global_buffer
|
|
|
|
|
|
def shuffle_buffer(global_buffer):
|
|
"""
|
|
Randomizes experiences in global_buffer
|
|
:param global_buffer: training_buffer to randomize.
|
|
:return: Randomized buffer
|
|
"""
|
|
s = np.arange(global_buffer[history_keys[2]].shape[0])
|
|
np.random.shuffle(s)
|
|
for key in history_keys:
|
|
if len(global_buffer[key]) > 0:
|
|
global_buffer[key] = global_buffer[key][s]
|
|
return global_buffer
|