#if UNITY_EDITOR || UNITY_STANDALONE_WIN || UNITY_STANDALONE_OSX || UNITY_STANDALONE_LINUX
using Grpc.Core;
#endif
#if UNITY_EDITOR
using UnityEditor;
#endif
using System;
using System.Collections.Generic;
using System.Linq;
using UnityEngine;
using MLAgents.CommunicatorObjects;
using System.IO;
using Google.Protobuf;
using MLAgents.Sensor;

namespace MLAgents
{
    /// Responsible for communication with External using gRPC.
    public class RpcCommunicator : ICommunicator
    {
        /// <summary>
        /// Pairs the identifier of an agent with the callback that is invoked
        /// with the action received for that agent.
        /// </summary>
        public struct IdCallbackPair
        {
            public int AgentId;
            public Action<AgentAction> Callback;
        }

        public event QuitCommandHandler QuitCommandReceived;
        public event ResetCommandHandler ResetCommandReceived;

        /// If true, the communication is active.
        bool m_IsOpen;

        /// The default number of agents in the scene
        const int k_NumAgents = 32;

        /// Keys of the brains that have been subscribed through SubscribeBrain.
        List<string> m_BehaviorNames = new List<string>();

        /// Set when observations were queued since the last exchange.
        bool m_NeedCommunicateThisStep;

        WriteAdapter m_WriteAdapter = new WriteAdapter();

        /// One sensor validator per brain key (only used in DEBUG builds).
        Dictionary<string, SensorShapeValidator> m_SensorShapeValidators =
            new Dictionary<string, SensorShapeValidator>();

        /// Per brain key, the callbacks to invoke once actions are received.
        /// Entries are in the same order as the agent infos queued for that brain.
        Dictionary<string, List<IdCallbackPair>> m_ActionCallbacks =
            new Dictionary<string, List<IdCallbackPair>>();

        /// The current UnityRLOutput to be sent when all the brains queried the communicator
        UnityRLOutputProto m_CurrentUnityRlOutput =
            new UnityRLOutputProto();

        /// Per brain key, the actions received during the last exchange, indexed by agent id.
        Dictionary<string, Dictionary<int, AgentAction>> m_LastActionsReceived =
            new Dictionary<string, Dictionary<int, AgentAction>>();

        // Brains that we have sent over the communicator with agents.
        HashSet<string> m_SentBrainKeys = new HashSet<string>();

        // Brains whose parameters still have to be sent over the communicator.
        Dictionary<string, BrainParameters> m_UnsentBrainKeys =
            new Dictionary<string, BrainParameters>();

#if UNITY_EDITOR || UNITY_STANDALONE_WIN || UNITY_STANDALONE_OSX || UNITY_STANDALONE_LINUX
        /// The Unity to External client.
        UnityToExternalProto.UnityToExternalProtoClient m_Client;
#endif

        /// The communicator parameters sent at construction
        CommunicatorInitParameters m_CommunicatorInitParameters;

        /// Registered side channels, indexed by their channel type.
        Dictionary<int, SideChannel> m_SideChannels = new Dictionary<int, SideChannel>();

        /// <summary>
        /// Initializes a new instance of the RPCCommunicator class.
        /// </summary>
        /// <param name="communicatorInitParameters">Communicator parameters.</param>
        public RpcCommunicator(CommunicatorInitParameters communicatorInitParameters)
        {
            m_CommunicatorInitParameters = communicatorInitParameters;
        }

        #region Initialization

        /// <summary>
        /// Sends the initialization parameters through the Communicator.
        /// Is used by the academy to send initialization parameters to the communicator.
        /// </summary>
        /// <returns>The External Initialization Parameters received.</returns>
        /// <param name="initParameters">The Unity Initialization Parameters to be sent.</param>
        /// <exception cref="UnityAgentsException">
        /// Thrown when the initial exchange fails for any reason.
        /// </exception>
        public UnityRLInitParameters Initialize(CommunicatorInitParameters initParameters)
        {
            var academyParameters = new UnityRLInitializationOutputProto
            {
                Name = initParameters.name,
                Version = initParameters.version
            };

            UnityInputProto input;
            UnityInputProto initializationInput;
            try
            {
                initializationInput = Initialize(
                    new UnityOutputProto
                    {
                        RlInitializationOutput = academyParameters
                    },
                    out input);
            }
            catch
            {
                var exceptionMessage = "The Communicator was unable to connect. Please make sure the External " +
                    "process is ready to accept communication with Unity.";

                // Check for common error condition and add details to the exception message.
                var httpProxy = Environment.GetEnvironmentVariable("HTTP_PROXY");
                var httpsProxy = Environment.GetEnvironmentVariable("HTTPS_PROXY");
                if (httpProxy != null || httpsProxy != null)
                {
                    // The trailing space after "the" keeps the two halves of the sentence apart.
                    exceptionMessage += " Try removing HTTP_PROXY and HTTPS_PROXY from the " +
                        "environment variables and try again.";
                }
                throw new UnityAgentsException(exceptionMessage);
            }

            UpdateEnvironmentWithInput(input.RlInput);
            return initializationInput.RlInitializationInput.ToUnityRLInitParameters();
        }

        /// <summary>
        /// Adds the brain to the list of brains which will be sending information to External.
        /// Subscribing a key that is already known is a no-op.
        /// </summary>
        /// <param name="brainKey">Brain key.</param>
        /// <param name="brainParameters">Brain parameters needed to send to the trainer.</param>
        public void SubscribeBrain(string brainKey, BrainParameters brainParameters)
        {
            if (m_BehaviorNames.Contains(brainKey))
            {
                return;
            }
            m_BehaviorNames.Add(brainKey);
            m_CurrentUnityRlOutput.AgentInfos.Add(
                brainKey,
                new UnityRLOutputProto.Types.ListAgentInfoProto()
            );
            CacheBrainParameters(brainKey, brainParameters);
        }

        /// <summary>
        /// Dispatches the side channel data of a received input to the registered
        /// side channels, then raises the event matching the received command.
        /// </summary>
        /// <param name="rlInput">The input received from External.</param>
        void UpdateEnvironmentWithInput(UnityRLInputProto rlInput)
        {
            ProcessSideChannelData(m_SideChannels, rlInput.SideChannel.ToArray());
            SendCommandEvent(rlInput.Command);
        }

        /// <summary>
        /// Creates the client on "localhost" at the port given at construction and
        /// performs two exchanges: the first one sends <paramref name="unityOutput"/>,
        /// the second one sends a message without content.
        /// </summary>
        /// <returns>The UnityInput received in answer to the first exchange.</returns>
        /// <param name="unityOutput">The initialization output to send.</param>
        /// <param name="unityInput">The UnityInput received in answer to the second exchange.</param>
        /// <exception cref="UnityAgentsException">
        /// Thrown on platforms other than the editor and the standalone desktop players.
        /// </exception>
        UnityInputProto Initialize(UnityOutputProto unityOutput,
            out UnityInputProto unityInput)
        {
#if UNITY_EDITOR || UNITY_STANDALONE_WIN || UNITY_STANDALONE_OSX || UNITY_STANDALONE_LINUX
            m_IsOpen = true;
            var channel = new Channel(
                "localhost:" + m_CommunicatorInitParameters.port,
                ChannelCredentials.Insecure);

            m_Client = new UnityToExternalProto.UnityToExternalProtoClient(channel);
            var result = m_Client.Exchange(WrapMessage(unityOutput, 200));
            unityInput = m_Client.Exchange(WrapMessage(null, 200)).UnityInput;
#if UNITY_EDITOR
            EditorApplication.playModeStateChanged += HandleOnPlayModeChanged;
#endif
            return result.UnityInput;
#else
            throw new UnityAgentsException(
                "You cannot perform training on this platform.");
#endif
        }

        #endregion

        #region Destruction

        /// <summary>
        /// Close the communicator gracefully on both sides of the communication.
        /// Does nothing when the communication is not open. A failure of the
        /// closing exchange is ignored and leaves the communicator marked as open.
        /// </summary>
        public void Dispose()
        {
#if UNITY_EDITOR || UNITY_STANDALONE_WIN || UNITY_STANDALONE_OSX || UNITY_STANDALONE_LINUX
            if (!m_IsOpen)
            {
                return;
            }

            try
            {
                // A content-less message with status 400 is what this class sends to close.
                m_Client.Exchange(WrapMessage(null, 400));
                m_IsOpen = false;
            }
            catch
            {
                // ignored
            }
#else
            throw new UnityAgentsException(
                "You cannot perform training on this platform.");
#endif
        }

        #endregion

        #region Sending Events

        /// <summary>
        /// Raises the event that corresponds to a received command.
        /// A reset command also drops every pending action callback first.
        /// Any command other than Quit and Reset is ignored.
        /// </summary>
        /// <param name="command">The command received from External.</param>
        void SendCommandEvent(CommandProto command)
        {
            switch (command)
            {
                case CommandProto.Quit:
                    {
                        QuitCommandReceived?.Invoke();
                        return;
                    }
                case CommandProto.Reset:
                    {
                        foreach (var callbacks in m_ActionCallbacks.Values)
                        {
                            callbacks.Clear();
                        }
                        ResetCommandReceived?.Invoke();
                        return;
                    }
                default:
                    {
                        return;
                    }
            }
        }

        #endregion

        #region Sending and retrieving data

        /// <summary>
        /// Sends the queued observations and applies the received actions,
        /// but only if observations were queued since the last exchange.
        /// </summary>
        public void DecideBatch()
        {
            if (!m_NeedCommunicateThisStep)
            {
                return;
            }
            m_NeedCommunicateThisStep = false;

            SendBatchedMessageHelper();
        }

        /// <summary>
        /// Sends the observations of one Agent.
        /// The observations are queued and sent on the next call to DecideBatch.
        /// </summary>
        /// <param name="brainKey">Batch Key.</param>
        /// <param name="info">Agent info.</param>
        /// <param name="sensors">Sensors whose observations are added to the agent info.</param>
        /// <param name="action">Callback invoked with the action received for this agent.</param>
        public void PutObservations(
            string brainKey, AgentInfo info, List<ISensor> sensors, Action<AgentAction> action)
        {
#if DEBUG
            SensorShapeValidator validator;
            if (!m_SensorShapeValidators.TryGetValue(brainKey, out validator))
            {
                validator = new SensorShapeValidator();
                m_SensorShapeValidators[brainKey] = validator;
            }
            validator.ValidateSensors(sensors);
#endif

            using (TimerStack.Instance.Scoped("AgentInfo.ToProto"))
            {
                var agentInfoProto = info.ToAgentInfoProto();

                using (TimerStack.Instance.Scoped("GenerateSensorData"))
                {
                    foreach (var sensor in sensors)
                    {
                        var obsProto = sensor.GetObservationProto(m_WriteAdapter);
                        agentInfoProto.Observations.Add(obsProto);
                    }
                }
                m_CurrentUnityRlOutput.AgentInfos[brainKey].Value.Add(agentInfoProto);
            }

            m_NeedCommunicateThisStep = true;

            List<IdCallbackPair> callbacks;
            if (!m_ActionCallbacks.TryGetValue(brainKey, out callbacks))
            {
                callbacks = new List<IdCallbackPair>();
                m_ActionCallbacks[brainKey] = callbacks;
            }
            callbacks.Add(
                new IdCallbackPair
                {
                    AgentId = info.episodeId,
                    Callback = action
                });
        }

        /// <summary>
        /// Helper method that sends the current UnityRLOutput, receives the next UnityInput and
        /// Applies the appropriate AgentAction to the agents.
        /// </summary>
        void SendBatchedMessageHelper()
        {
            var message = new UnityOutputProto
            {
                RlOutput = m_CurrentUnityRlOutput,
            };
            var tempUnityRlInitializationOutput = GetTempUnityRlInitializationOutput();
            if (tempUnityRlInitializationOutput != null)
            {
                message.RlInitializationOutput = tempUnityRlInitializationOutput;
            }

            byte[] messageAggregated = GetSideChannelMessage(m_SideChannels);
            message.RlOutput.SideChannel = ByteString.CopyFrom(messageAggregated);

            var input = Exchange(message);
            UpdateSentBrainParameters(tempUnityRlInitializationOutput);

            // The agent infos were sent (or the exchange failed): start the next batch empty.
            foreach (var k in m_CurrentUnityRlOutput.AgentInfos.Keys)
            {
                m_CurrentUnityRlOutput.AgentInfos[k].Value.Clear();
            }

            var rlInput = input?.RlInput;

            if (rlInput?.AgentActions == null)
            {
                return;
            }

            // This may raise the reset event, which empties the callback lists.
            UpdateEnvironmentWithInput(rlInput);

            m_LastActionsReceived.Clear();
            foreach (var brainName in rlInput.AgentActions.Keys)
            {
                // Skip brains without pending callbacks. Using TryGetValue also covers
                // brain names for which no callback list was ever created, which
                // would otherwise raise a KeyNotFoundException.
                List<IdCallbackPair> callbacks;
                if (!m_ActionCallbacks.TryGetValue(brainName, out callbacks) || callbacks.Count == 0)
                {
                    continue;
                }

                if (!rlInput.AgentActions[brainName].Value.Any())
                {
                    continue;
                }

                var agentActions = rlInput.AgentActions[brainName].ToAgentActionList();
                var numAgents = callbacks.Count;
                var agentActionDict = new Dictionary<int, AgentAction>(numAgents);
                m_LastActionsReceived[brainName] = agentActionDict;
                for (var i = 0; i < numAgents; i++)
                {
                    // The i-th received action is matched with the i-th registered callback.
                    var agentAction = agentActions[i];
                    var agentId = callbacks[i].AgentId;
                    agentActionDict[agentId] = agentAction;
                    callbacks[i].Callback.Invoke(agentAction);
                }
            }

            foreach (var callbacks in m_ActionCallbacks.Values)
            {
                callbacks.Clear();
            }
        }

        /// <summary>
        /// Returns the actions received during the last exchange for a brain.
        /// </summary>
        /// <returns>The received actions, indexed by agent id.</returns>
        /// <param name="key">The brain key.</param>
        /// <exception cref="KeyNotFoundException">
        /// Thrown when no actions were stored for <paramref name="key"/> during the last exchange.
        /// </exception>
        public Dictionary<int, AgentAction> GetActions(string key)
        {
            return m_LastActionsReceived[key];
        }

        /// <summary>
        /// Send a UnityOutput and receives a UnityInput.
        /// When the answer does not have status 200, or when the exchange throws,
        /// the communicator is marked as closed and the quit event is raised.
        /// </summary>
        /// <returns>
        /// The next UnityInput, or null when the communication is not open
        /// or the exchange threw.
        /// </returns>
        /// <param name="unityOutput">The UnityOutput to be sent.</param>
        UnityInputProto Exchange(UnityOutputProto unityOutput)
        {
#if UNITY_EDITOR || UNITY_STANDALONE_WIN || UNITY_STANDALONE_OSX || UNITY_STANDALONE_LINUX
            if (!m_IsOpen)
            {
                return null;
            }
            try
            {
                var message = m_Client.Exchange(
                    WrapMessage(unityOutput, 200));
                if (message.Header.Status == 200)
                {
                    return message.UnityInput;
                }

                m_IsOpen = false;
                // Not sure if the quit command is actually sent when a
                // non 200 message is received.  Notify that we are indeed
                // quitting.
                QuitCommandReceived?.Invoke();
                return message.UnityInput;
            }
            catch
            {
                m_IsOpen = false;
                QuitCommandReceived?.Invoke();
                return null;
            }
#else
            throw new UnityAgentsException(
                "You cannot perform training on this platform.");
#endif
        }

        /// <summary>
        /// Wraps the UnityOutput into a message with the appropriate status.
        /// </summary>
        /// <returns>The UnityMessage corresponding.</returns>
        /// <param name="content">The UnityOutput to be wrapped.</param>
        /// <param name="status">The status of the message.</param>
        static UnityMessageProto WrapMessage(UnityOutputProto content, int status)
        {
            return new UnityMessageProto
            {
                Header = new HeaderProto { Status = status },
                UnityOutput = content
            };
        }

        /// <summary>
        /// Remembers the parameters of a brain so that they are sent with a later
        /// batch, unless the parameters of that brain were already sent.
        /// </summary>
        /// <param name="brainKey">Brain key.</param>
        /// <param name="brainParameters">Brain parameters to send.</param>
        void CacheBrainParameters(string brainKey, BrainParameters brainParameters)
        {
            if (m_SentBrainKeys.Contains(brainKey))
            {
                return;
            }

            // TODO We should check that if m_unsentBrainKeys has brainKey, it equals brainParameters
            m_UnsentBrainKeys[brainKey] = brainParameters;
        }

        /// <summary>
        /// Builds an initialization output holding the parameters of every unsent
        /// brain that has an entry in the current UnityRLOutput.
        /// </summary>
        /// <returns>The initialization output, or null when there is nothing to send.</returns>
        UnityRLInitializationOutputProto GetTempUnityRlInitializationOutput()
        {
            UnityRLInitializationOutputProto output = null;
            foreach (var unsent in m_UnsentBrainKeys)
            {
                if (!m_CurrentUnityRlOutput.AgentInfos.ContainsKey(unsent.Key))
                {
                    continue;
                }

                // Created lazily so that null is returned when no brain qualifies.
                if (output == null)
                {
                    output = new UnityRLInitializationOutputProto();
                }

                output.BrainParameters.Add(unsent.Value.ToProto(unsent.Key, true));
            }

            return output;
        }

        /// <summary>
        /// Moves every brain contained in a sent initialization output from the
        /// unsent brains to the sent brains.
        /// </summary>
        /// <param name="output">The initialization output that was sent; may be null.</param>
        void UpdateSentBrainParameters(UnityRLInitializationOutputProto output)
        {
            if (output == null)
            {
                return;
            }

            foreach (var brainProto in output.BrainParameters)
            {
                m_SentBrainKeys.Add(brainProto.BrainName);
                m_UnsentBrainKeys.Remove(brainProto.BrainName);
            }
        }

        #endregion

        #region Handling side channels

        /// <summary>
        /// Registers a side channel to the communicator. The side channel will exchange
        /// messages with its Python equivalent.
        /// </summary>
        /// <param name="sideChannel">The side channel to be registered.</param>
        /// <exception cref="UnityAgentsException">
        /// Thrown when a side channel with the same channel type is already registered.
        /// </exception>
        public void RegisterSideChannel(SideChannel sideChannel)
        {
            var channelType = sideChannel.ChannelType();
            if (m_SideChannels.ContainsKey(channelType))
            {
                // The placeholder must be indexed ({0}) and given an argument:
                // an empty "{}" makes string.Format throw a FormatException.
                throw new UnityAgentsException(string.Format(
                    "A side channel with type index {0} is already registered. You cannot register multiple " +
                    "side channels of the same type.", channelType));
            }
            m_SideChannels.Add(channelType, sideChannel);
        }

        /// <summary>
        /// Grabs the messages that the registered side channels will send to Python at the current step
        /// into a single byte array. Each message is written as its channel type, its length and
        /// its content. The message queue of every side channel is emptied.
        /// </summary>
        /// <param name="sideChannels">A dictionary of channel type to channel.</param>
        /// <returns>The aggregated messages; empty when no message was queued.</returns>
        public static byte[] GetSideChannelMessage(Dictionary<int, SideChannel> sideChannels)
        {
            using (var memStream = new MemoryStream())
            {
                using (var binaryWriter = new BinaryWriter(memStream))
                {
                    foreach (var sideChannel in sideChannels.Values)
                    {
                        var messageList = sideChannel.MessageQueue;
                        foreach (var message in messageList)
                        {
                            binaryWriter.Write(sideChannel.ChannelType());
                            binaryWriter.Write(message.Count());
                            binaryWriter.Write(message);
                        }
                        sideChannel.MessageQueue.Clear();
                    }
                    return memStream.ToArray();
                }
            }
        }

        /// <summary>
        /// Separates the data received from Python into individual messages for each registered side channel.
        /// Messages for a channel type that is not registered are logged and dropped.
        /// </summary>
        /// <param name="sideChannels">A dictionary of channel type to channel.</param>
        /// <param name="dataReceived">The byte array of data received from Python.</param>
        /// <exception cref="UnityAgentsException">
        /// Thrown when reading a message from the data fails.
        /// </exception>
        public static void ProcessSideChannelData(Dictionary<int, SideChannel> sideChannels, byte[] dataReceived)
        {
            if (dataReceived.Length == 0)
            {
                return;
            }
            using (var memStream = new MemoryStream(dataReceived))
            {
                using (var binaryReader = new BinaryReader(memStream))
                {
                    while (memStream.Position < memStream.Length)
                    {
                        int channelType = 0;
                        byte[] message = null;
                        try
                        {
                            // Same layout as written by GetSideChannelMessage:
                            // channel type, message length, message content.
                            channelType = binaryReader.ReadInt32();
                            var messageLength = binaryReader.ReadInt32();
                            message = binaryReader.ReadBytes(messageLength);
                        }
                        catch (Exception ex)
                        {
                            throw new UnityAgentsException(
                                "There was a problem reading a message in a SideChannel. Please make sure the " +
                                "version of MLAgents in Unity is compatible with the Python version. Original error : "
                                + ex.Message);
                        }

                        SideChannel sideChannel;
                        if (sideChannels.TryGetValue(channelType, out sideChannel))
                        {
                            sideChannel.OnMessageReceived(message);
                        }
                        else
                        {
                            Debug.Log(string.Format(
                                "Unknown side channel data received. Channel type "
                                + ": {0}", channelType));
                        }
                    }
                }
            }
        }

        #endregion

#if UNITY_EDITOR
        /// <summary>
        /// When the editor exits, the communicator must be closed
        /// </summary>
        /// <param name="state">State.</param>
        void HandleOnPlayModeChanged(PlayModeStateChange state)
        {
            // This method is run whenever the playmode state is changed.
            if (state == PlayModeStateChange.ExitingPlayMode)
            {
                Dispose();
            }
        }

#endif
    }
}