#if UNITY_EDITOR || UNITY_STANDALONE_WIN || UNITY_STANDALONE_OSX || UNITY_STANDALONE_LINUX
using Grpc.Core;
#endif
#if UNITY_EDITOR
using UnityEditor;
#endif
using System;
using System.Collections.Generic;
using System.Linq;
using UnityEngine;
using MLAgents.CommunicatorObjects;

namespace MLAgents
{
    /// <summary>
    /// Responsible for communication with External using gRPC.
    /// </summary>
    /// <remarks>
    /// The gRPC client only exists in the Editor and on standalone desktop builds; on every
    /// other platform the methods that need it throw a <c>UnityAgentsException</c>.
    /// </remarks>
    public class RpcCommunicator : ICommunicator
    {
        public event QuitCommandHandler QuitCommandReceived;
        public event ResetCommandHandler ResetCommandReceived;
        public event RLInputReceivedHandler RLInputReceived;

        /// If true, the communication is active.
        bool m_IsOpen;

        /// The default number of agents in the scene (initial capacity of each per-brain agent list).
        private const int k_NumAgents = 32;

        // NOTE(review): the generic type arguments below (string / bool / Agent / AgentAction)
        // were reconstructed from how the fields are used in this class - confirm the
        // Agent and AgentAction type names against the rest of the project.

        /// Keeps track of which brains have data to send on the current step
        readonly Dictionary<string, bool> m_HasData =
            new Dictionary<string, bool>();

        /// Keeps track of which brains queried the communicator on the current step
        readonly Dictionary<string, bool> m_HasQueried =
            new Dictionary<string, bool>();

        /// Keeps track of the agents of each brain on the current step
        readonly Dictionary<string, List<Agent>> m_CurrentAgents =
            new Dictionary<string, List<Agent>>();

        /// The current UnityRLOutput to be sent when all the brains queried the communicator
        readonly UnityRLOutputProto m_CurrentUnityRlOutput =
            new UnityRLOutputProto();

        /// The actions received on the last exchange, keyed by brain name and then by agent.
        readonly Dictionary<string, Dictionary<Agent, AgentAction>> m_LastActionsReceived =
            new Dictionary<string, Dictionary<Agent, AgentAction>>();

        /// Brain parameters collected by SubscribeBrain since the last message was sent;
        /// null when there is nothing pending.
        private UnityRLInitializationOutputProto m_CurrentUnityRlInitializationOutput;

#if UNITY_EDITOR || UNITY_STANDALONE_WIN || UNITY_STANDALONE_OSX || UNITY_STANDALONE_LINUX
        /// The Unity to External client.
        UnityToExternalProto.UnityToExternalProtoClient m_Client;
#endif

        /// The communicator parameters sent at construction
        CommunicatorInitParameters m_CommunicatorInitParameters;

        /// <summary>
        /// Initializes a new instance of the RPCCommunicator class.
        /// </summary>
        /// <param name="communicatorInitParameters">Communicator parameters.</param>
        public RpcCommunicator(CommunicatorInitParameters communicatorInitParameters)
        {
            m_CommunicatorInitParameters = communicatorInitParameters;
        }

        #region Initialization

        /// <summary>
        /// Sends the initialization parameters through the Communicator.
        /// Is used by the academy to send initialization parameters to the communicator.
        /// </summary>
        /// <returns>The External Initialization Parameters received.</returns>
        /// <param name="initParameters">The Unity Initialization Parameters to be sent.</param>
        /// <exception cref="UnityAgentsException">
        /// Thrown when the initial exchange fails for any reason.
        /// </exception>
        public UnityRLInitParameters Initialize(CommunicatorInitParameters initParameters)
        {
            var academyParameters = new UnityRLInitializationOutputProto
            {
                Name = initParameters.name,
                Version = initParameters.version
            };

            academyParameters.EnvironmentParameters = new EnvironmentParametersProto();

            var resetParameters = initParameters.environmentResetParameters.resetParameters;
            foreach (var key in resetParameters.Keys)
            {
                academyParameters.EnvironmentParameters.FloatParameters.Add(key, resetParameters[key]);
            }

            UnityInputProto input;
            UnityInputProto initializationInput;
            try
            {
                initializationInput = Initialize(
                    new UnityOutputProto
                    {
                        RlInitializationOutput = academyParameters
                    },
                    out input);
            }
            catch
            {
                // Any failure of the first exchange is reported as a connection problem.
                var exceptionMessage = "The Communicator was unable to connect. Please make sure the External " +
                    "process is ready to accept communication with Unity.";

                // Check for common error condition and add details to the exception message.
                var httpProxy = Environment.GetEnvironmentVariable("HTTP_PROXY");
                var httpsProxy = Environment.GetEnvironmentVariable("HTTPS_PROXY");
                if (httpProxy != null || httpsProxy != null)
                {
                    // The trailing space after "the" keeps the two fragments from running together.
                    exceptionMessage += " Try removing HTTP_PROXY and HTTPS_PROXY from the " +
                        "environment variables and try again.";
                }
                throw new UnityAgentsException(exceptionMessage);
            }

            UpdateEnvironmentWithInput(input.RlInput);
            return initializationInput.RlInitializationInput.ToUnityRLInitParameters();
        }

        /// <summary>
        /// Adds the brain to the list of brains which will be sending information to External.
        /// </summary>
        /// <param name="brainKey">Brain key.</param>
        /// <param name="brainParameters">Brain parameters needed to send to the trainer.</param>
        public void SubscribeBrain(string brainKey, BrainParameters brainParameters)
        {
            m_HasQueried[brainKey] = false;
            m_HasData[brainKey] = false;
            m_CurrentAgents[brainKey] = new List<Agent>(k_NumAgents);
            m_CurrentUnityRlOutput.AgentInfos.Add(
                brainKey,
                new CommunicatorObjects.UnityRLOutputProto.Types.ListAgentInfoProto());

            // The pending initialization output is created lazily; it is reset to null
            // once it has been sent (see SendBatchedMessageHelper).
            if (m_CurrentUnityRlInitializationOutput == null)
            {
                m_CurrentUnityRlInitializationOutput = new UnityRLInitializationOutputProto();
            }
            m_CurrentUnityRlInitializationOutput.BrainParameters.Add(brainParameters.ToProto(brainKey, true));
        }

        /// <summary>
        /// Raises the RL-input and command events that correspond to a received input.
        /// </summary>
        /// <param name="rlInput">The RL input received from External.</param>
        void UpdateEnvironmentWithInput(UnityRLInputProto rlInput)
        {
            SendRLInputReceivedEvent(rlInput.IsTraining);
            SendCommandEvent(rlInput.Command, rlInput.EnvironmentParameters);
        }

        /// <summary>
        /// Opens the gRPC channel on localhost at the configured port and performs the two
        /// initial exchanges with External.
        /// </summary>
        /// <returns>The UnityInput answered to the first exchange.</returns>
        /// <param name="unityOutput">The initialization output to send.</param>
        /// <param name="unityInput">The UnityInput answered to the second (empty) exchange.</param>
        private UnityInputProto Initialize(UnityOutputProto unityOutput,
            out UnityInputProto unityInput)
        {
#if UNITY_EDITOR || UNITY_STANDALONE_WIN || UNITY_STANDALONE_OSX || UNITY_STANDALONE_LINUX
            m_IsOpen = true;
            var channel = new Channel(
                "localhost:" + m_CommunicatorInitParameters.port,
                ChannelCredentials.Insecure);

            m_Client = new UnityToExternalProto.UnityToExternalProtoClient(channel);
            // 200 is the status used for every regular message sent by this class.
            var result = m_Client.Exchange(WrapMessage(unityOutput, 200));
            unityInput = m_Client.Exchange(WrapMessage(null, 200)).UnityInput;
#if UNITY_EDITOR
#if UNITY_2017_2_OR_NEWER
            EditorApplication.playModeStateChanged += HandleOnPlayModeChanged;
#else
            EditorApplication.playmodeStateChanged += HandleOnPlayModeChanged;
#endif
#endif
            return result.UnityInput;
#else
            throw new UnityAgentsException(
                "You cannot perform training on this platform.");
#endif
        }

        #endregion

        #region Destruction

        /// <summary>
        /// Close the communicator gracefully on both sides of the communication.
        /// </summary>
        public void Dispose()
        {
#if UNITY_EDITOR || UNITY_STANDALONE_WIN || UNITY_STANDALONE_OSX || UNITY_STANDALONE_LINUX
            if (!m_IsOpen)
            {
                return;
            }

            try
            {
                // An empty message with status 400 is what this class sends on close.
                m_Client.Exchange(WrapMessage(null, 400));
                m_IsOpen = false;
            }
            catch
            {
                // Deliberately ignored: closing is best-effort. Note that m_IsOpen stays
                // true when the exchange fails.
            }
#else
            throw new UnityAgentsException(
                "You cannot perform training on this platform.");
#endif
        }

        #endregion

        #region Sending Events

        /// <summary>
        /// Raises the event matching the received command, if any.
        /// </summary>
        /// <param name="command">The command received from External.</param>
        /// <param name="environmentParametersProto">
        /// The environment parameters forwarded with a reset command.
        /// </param>
        private void SendCommandEvent(CommandProto command, EnvironmentParametersProto environmentParametersProto)
        {
            switch (command)
            {
                case CommandProto.Quit:
                {
                    QuitCommandReceived?.Invoke();
                    return;
                }
                case CommandProto.Reset:
                {
                    ResetCommandReceived?.Invoke(environmentParametersProto.ToEnvironmentResetParameters());
                    return;
                }
                default:
                {
                    return;
                }
            }
        }

        /// <summary>
        /// Raises <see cref="RLInputReceived"/> with the given training flag.
        /// </summary>
        /// <param name="isTraining">The training flag carried by the received RL input.</param>
        private void SendRLInputReceivedEvent(bool isTraining)
        {
            RLInputReceived?.Invoke(new UnityRLInputParameters { isTraining = isTraining });
        }

        #endregion

        #region Sending and retrieving data

        /// <summary>
        /// Sends the observations. If at least one brain has an agent in need of
        /// a decision or if the academy is done, the data is sent via
        /// Communicator. Else, a new step is realized. The data can only be
        /// sent once all the brains that were part of initialization have tried
        /// to send information.
        /// </summary>
        /// <param name="brainKey">Batch Key.</param>
        /// <param name="agents">Agent info.</param>
        public void PutObservations(string brainKey, ICollection<Agent> agents)
        {
            // The brain tried to send its observations, update m_HasQueried.
            m_HasQueried[brainKey] = true;

            // Populate the current agents of this brain.
            var currentAgents = m_CurrentAgents[brainKey];
            currentAgents.Clear();
            foreach (var agent in agents)
            {
                currentAgents.Add(agent);
            }

            // If at least one agent has data to send, then append data to
            // the message and update m_HasData.
            if (currentAgents.Count > 0)
            {
                var agentInfos = m_CurrentUnityRlOutput.AgentInfos[brainKey].Value;
                foreach (var agent in currentAgents)
                {
                    var agentInfoProto = agent.Info.ToProto();
                    agentInfos.Add(agentInfoProto);
                    // Avoid visual obs memory leak. This should be called AFTER we are done with the visual obs.
                    // e.g. after recording them to demo and using them for inference.
                    agent.ClearVisualObservations();
                }

                m_HasData[brainKey] = true;
            }

            // The message is only sent once every subscribed brain has queried,
            // and only if at least one of them has data.
            if (m_HasQueried.Values.All(x => x))
            {
                if (m_HasData.Values.Any(x => x))
                {
                    SendBatchedMessageHelper();
                }

                // The step is over for every brain: reset m_HasData and m_HasQueried.
                foreach (var k in m_CurrentAgents.Keys)
                {
                    m_HasData[k] = false;
                    m_HasQueried[k] = false;
                }
            }
        }

        /// <summary>
        /// Helper method that sends the current UnityRLOutput, receives the next UnityInput and
        /// Applies the appropriate AgentAction to the agents.
        /// </summary>
        void SendBatchedMessageHelper()
        {
            var message = new UnityOutputProto
            {
                RlOutput = m_CurrentUnityRlOutput,
            };
            if (m_CurrentUnityRlInitializationOutput != null)
            {
                message.RlInitializationOutput = m_CurrentUnityRlInitializationOutput;
            }

            var input = Exchange(message);
            // The pending brain parameters were part of this message; do not resend them.
            m_CurrentUnityRlInitializationOutput = null;

            foreach (var k in m_CurrentUnityRlOutput.AgentInfos.Keys)
            {
                m_CurrentUnityRlOutput.AgentInfos[k].Value.Clear();
            }

            // Exchange returns null when the communicator is closed or the call failed.
            var rlInput = input?.RlInput;

            if (rlInput?.AgentActions == null)
            {
                return;
            }

            UpdateEnvironmentWithInput(rlInput);

            m_LastActionsReceived.Clear();
            foreach (var brainName in rlInput.AgentActions.Keys)
            {
                var brainAgents = m_CurrentAgents[brainName];
                if (brainAgents.Count == 0)
                {
                    continue;
                }

                var receivedActions = rlInput.AgentActions[brainName];
                if (!receivedActions.Value.Any())
                {
                    continue;
                }

                var agentActions = receivedActions.ToAgentActionList();
                var numAgents = brainAgents.Count;
                var agentActionDict = new Dictionary<Agent, AgentAction>(numAgents);
                m_LastActionsReceived[brainName] = agentActionDict;
                // Actions are matched to agents by position in the list.
                for (var i = 0; i < numAgents; i++)
                {
                    var agent = brainAgents[i];
                    var agentAction = agentActions[i];
                    agentActionDict[agent] = agentAction;
                    agent.UpdateAgentAction(agentAction);
                }
            }
        }

        /// <summary>
        /// Returns the actions received on the last exchange for the given brain.
        /// </summary>
        /// <returns>The per-agent actions stored for <paramref name="key"/>.</returns>
        /// <param name="key">The brain key.</param>
        /// <exception cref="KeyNotFoundException">
        /// Thrown when no actions are stored for <paramref name="key"/>.
        /// </exception>
        public Dictionary<Agent, AgentAction> GetActions(string key)
        {
            return m_LastActionsReceived[key];
        }

        /// <summary>
        /// Send a UnityOutput and receives a UnityInput.
        /// </summary>
        /// <returns>
        /// The next UnityInput, or null when the communicator is closed or the call failed.
        /// </returns>
        /// <param name="unityOutput">The UnityOutput to be sent.</param>
        private UnityInputProto Exchange(UnityOutputProto unityOutput)
        {
#if UNITY_EDITOR || UNITY_STANDALONE_WIN || UNITY_STANDALONE_OSX || UNITY_STANDALONE_LINUX
            if (!m_IsOpen)
            {
                return null;
            }
            try
            {
                var message = m_Client.Exchange(WrapMessage(unityOutput, 200));
                if (message.Header.Status == 200)
                {
                    return message.UnityInput;
                }

                m_IsOpen = false;
                // Not sure if the quit command is actually sent when a
                // non 200 message is received. Notify that we are indeed
                // quitting.
                QuitCommandReceived?.Invoke();
                return message.UnityInput;
            }
            catch
            {
                // Any failure of the call closes the communicator and notifies listeners.
                m_IsOpen = false;
                QuitCommandReceived?.Invoke();
                return null;
            }
#else
            throw new UnityAgentsException(
                "You cannot perform training on this platform.");
#endif
        }

        /// <summary>
        /// Wraps the UnityOutput into a message with the appropriate status.
        /// </summary>
        /// <returns>The UnityMessage corresponding.</returns>
        /// <param name="content">The UnityOutput to be wrapped.</param>
        /// <param name="status">The status of the message.</param>
        private static UnityMessageProto WrapMessage(UnityOutputProto content, int status)
        {
            return new UnityMessageProto
            {
                Header = new HeaderProto { Status = status },
                UnityOutput = content
            };
        }

        #endregion

#if UNITY_EDITOR
#if UNITY_2017_2_OR_NEWER
        /// <summary>
        /// When the editor exits, the communicator must be closed
        /// </summary>
        /// <param name="state">State.</param>
        private void HandleOnPlayModeChanged(PlayModeStateChange state)
        {
            // This method is run whenever the playmode state is changed.
            if (state == PlayModeStateChange.ExitingPlayMode)
            {
                Dispose();
            }
        }

#else
        /// <summary>
        /// When the editor exits, the communicator must be closed
        /// </summary>
        private void HandleOnPlayModeChanged()
        {
            // This method is run whenever the playmode state is changed.
            if (!EditorApplication.isPlayingOrWillChangePlaymode)
            {
                // This class has no Close() method; Dispose() is the close operation.
                Dispose();
            }
        }

#endif
#endif
    }
}