import os
from typing import List, Optional

import numpy as np

from mlagents_envs.base_env import ActionTuple, BehaviorSpec, ActionSpec
from mlagents_envs.communicator_objects.agent_info_action_pair_pb2 import (
    AgentInfoActionPairProto,
)
from mlagents_envs.rpc_utils import steps_from_proto

from mlagents.trainers.demonstrations.demonstration_provider import (
    DemonstrationProvider,
    DemonstrationExperience,
    DemonstrationTrajectory,
)
from mlagents.trainers.demonstrations.demonstration_proto_utils import (
    load_demonstration,
)


class LocalDemonstrationProvider(DemonstrationProvider):
    """Provides demonstration trajectories loaded from local .demo files."""

    def __init__(self, file_path: str):
        self._behavior_spec: Optional[BehaviorSpec] = None
        self._info_action_pairs: List[AgentInfoActionPairProto] = []
        self._load(file_path)

    def _load(self, file_path: str) -> None:
        demo_paths = LocalDemonstrationProvider._get_demo_files(file_path)
        # load_demonstration is expected to return the behavior spec, the raw
        # AgentInfoActionPair protos, and the expected step count (unused here).
        behavior_spec, info_action_pairs, _ = load_demonstration(demo_paths)
        self._behavior_spec = behavior_spec
        self._info_action_pairs = info_action_pairs

    def get_behavior_spec(self) -> BehaviorSpec:
        return self._behavior_spec

    def pop_trajectories(self) -> List[DemonstrationTrajectory]:
        trajectories = LocalDemonstrationProvider._info_action_pairs_to_trajectories(
            self._behavior_spec, self._info_action_pairs
        )
        self._info_action_pairs = []
        return trajectories

    @staticmethod
    def _get_demo_files(path: str) -> List[str]:
        # Accept either a single .demo file or a directory containing .demo files.
        if os.path.isfile(path):
            return [path]
        return [
            os.path.join(path, name)
            for name in os.listdir(path)
            if name.endswith(".demo")
        ]

    @staticmethod
    def _info_action_pairs_to_trajectories(
        behavior_spec: BehaviorSpec, info_action_pairs: List[AgentInfoActionPairProto]
    ) -> List[DemonstrationTrajectory]:
        trajectories_out: List[DemonstrationTrajectory] = []
        current_experiences: List[DemonstrationExperience] = []
        previous_action = np.zeros(
            behavior_spec.action_spec.continuous_size, dtype=np.float32
        )  # TODO or discrete?
        for pair_index, pair in enumerate(info_action_pairs):
            # Extract the observations from the decision/terminal steps
            current_decision_step, current_terminal_step = steps_from_proto(
                [pair.agent_info], behavior_spec
            )
            if len(current_terminal_step) == 1:
                obs = list(current_terminal_step.values())[0].obs
            else:
                obs = list(current_decision_step.values())[0].obs

            action_tuple = LocalDemonstrationProvider._get_action_tuple(
                pair, behavior_spec.action_spec
            )
            action_mask = np.array(
                [bool(m) for m in pair.agent_info.action_mask], dtype=bool
            )

            exp = DemonstrationExperience(
                obs=obs,
                reward=pair.agent_info.reward,  # TODO next step's reward?
                done=pair.agent_info.done,
                action=action_tuple,
                action_probs=None,
                action_mask=action_mask,
                prev_action=previous_action,
                interrupted=pair.agent_info.max_step_reached,
                memory=None,
            )
            current_experiences.append(exp)
            previous_action = np.array(
                pair.action_info.vector_actions_deprecated, dtype=np.float32
            )
            # Close out the trajectory on episode end or at the last recorded pair.
            if pair.agent_info.done or pair_index == len(info_action_pairs) - 1:
                trajectories_out.append(
                    DemonstrationTrajectory(experiences=current_experiences)
                )
                current_experiences = []

        return trajectories_out

    @staticmethod
    def _get_action_tuple(
        pair: AgentInfoActionPairProto, action_spec: ActionSpec
    ) -> ActionTuple:
        continuous_actions = None
        discrete_actions = None
        if action_spec.continuous_size > 0:
            continuous_actions = pair.action_info.continuous_actions
        if action_spec.discrete_size > 0:
            discrete_actions = pair.action_info.discrete_actions

        # TODO 2D?
        continuous_np = (
            np.array(continuous_actions, dtype=np.float32)
            if continuous_actions
            else None
        )
        discrete_np = (
            np.array(discrete_actions, dtype=np.float32) if discrete_actions else None
        )
        return ActionTuple(continuous_np, discrete_np)
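

# Illustrative usage (a minimal sketch; the demonstration path is hypothetical):
#
#   provider = LocalDemonstrationProvider("Demos/3DBall.demo")
#   spec = provider.get_behavior_spec()
#   trajectories = provider.pop_trajectories()
#
# pop_trajectories() converts the buffered AgentInfoActionPair protos into
# DemonstrationTrajectory objects and clears the internal buffer, so a second
# call returns an empty list until the provider is reloaded.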