using Google.Protobuf;
using System.Net.Sockets;
using UnityEngine;
using MLAgents.CommunicatorObjects;
using System.Threading.Tasks;

#if UNITY_EDITOR
using UnityEditor;
#endif

namespace MLAgents
{
    /// <summary>
    /// An <see cref="ICommunicator"/> that exchanges length-prefixed protobuf
    /// <c>UnityMessage</c>s with External over a TCP socket connected to localhost.
    /// </summary>
    public class SocketCommunicator : ICommunicator
    {
        /// <summary>Seconds <see cref="Exchange"/> waits for a reply before throwing.</summary>
        private const float k_TimeOut = 10f;

        /// <summary>Size in bytes of the length prefix that precedes every message.</summary>
        private const int k_LengthPrefixSize = 4;

        Socket m_Sender;
        readonly byte[] m_LengthHolder = new byte[k_LengthPrefixSize];
        CommunicatorParameters m_CommunicatorParameters;

        public SocketCommunicator(CommunicatorParameters communicatorParameters)
        {
            m_CommunicatorParameters = communicatorParameters;
        }

        /// <summary>
        /// Initialize the communicator by sending the first UnityOutput and receiving the
        /// first UnityInput. The second UnityInput is stored in the unityInput argument.
        /// </summary>
        /// <returns>The first Unity Input.</returns>
        /// <param name="unityOutput">The first Unity Output.</param>
        /// <param name="unityInput">The second Unity input.</param>
        public UnityInput Initialize(UnityOutput unityOutput, out UnityInput unityInput)
        {
            var socket = new Socket(
                AddressFamily.InterNetwork,
                SocketType.Stream,
                ProtocolType.Tcp);
            try
            {
                socket.Connect("localhost", m_CommunicatorParameters.port);
            }
            catch
            {
                // Do not leak the OS handle when the connection attempt fails.
                socket.Close();
                throw;
            }
            m_Sender = socket;

            // Handshake order: read the first input, send our output, read the second input.
            var initializationInput = UnityMessage.Parser.ParseFrom(Receive());
            Send(WrapMessage(unityOutput, 200).ToByteArray());
            unityInput = UnityMessage.Parser.ParseFrom(Receive()).UnityInput;

#if UNITY_EDITOR
#if UNITY_2017_2_OR_NEWER
            EditorApplication.playModeStateChanged += HandleOnPlayModeChanged;
#else
            EditorApplication.playmodeStateChanged += HandleOnPlayModeChanged;
#endif
#endif
            return initializationInput.UnityInput;
        }

        /// <summary>
        /// Uses the socket to receive a byte[] from External. Reads the 4-byte length
        /// prefix, then reassembles the payload even if it arrives in several fragments.
        /// </summary>
        /// <returns>The byte[] sent by External.</returns>
        /// <exception cref="UnityAgentsException">
        /// The length prefix is negative, or the connection was closed before the whole
        /// message arrived.
        /// </exception>
        byte[] Receive()
        {
            ReceiveExactly(m_LengthHolder, k_LengthPrefixSize);
            var totalLength = System.BitConverter.ToInt32(m_LengthHolder, 0);
            if (totalLength < 0)
            {
                throw new UnityAgentsException(
                    "Received an invalid message length from External: " + totalLength);
            }

            var result = new byte[totalLength];
            ReceiveExactly(result, totalLength);
            return result;
        }

        /// <summary>
        /// Blocks until exactly <paramref name="count"/> bytes have been read from the
        /// socket into the start of <paramref name="buffer"/>.
        /// </summary>
        /// <param name="buffer">Destination buffer; must hold at least count bytes.</param>
        /// <param name="count">Number of bytes to read.</param>
        void ReceiveExactly(byte[] buffer, int count)
        {
            var offset = 0;
            while (offset < count)
            {
                // Never request more than what is still missing, so bytes that belong to
                // a following message are left in the socket.
                var read = m_Sender.Receive(buffer, offset, count - offset, SocketFlags.None);
                if (read == 0)
                {
                    // Receive returns 0 once the peer has shut the connection down;
                    // without this check the loop would never terminate.
                    throw new UnityAgentsException(
                        "The connection was closed before a complete message was received.");
                }
                offset += read;
            }
        }

        /// <summary>
        /// Send the specified input via socket to External, prefixed with its length
        /// encoded as a 4-byte integer.
        /// </summary>
        /// <param name="input">The byte[] to be sent.</param>
        void Send(byte[] input)
        {
            var newArray = new byte[input.Length + k_LengthPrefixSize];
            System.BitConverter.GetBytes(input.Length).CopyTo(newArray, 0);
            input.CopyTo(newArray, k_LengthPrefixSize);
            m_Sender.Send(newArray);
        }

        /// <summary>
        /// Close the communicator gracefully on both sides of the communication: sends a
        /// message with status 400 and then releases the socket. Calling it again, or
        /// before <see cref="Initialize"/>, does nothing.
        /// </summary>
        public void Close()
        {
            if (m_Sender == null)
            {
                return;
            }

#if UNITY_EDITOR
#if UNITY_2017_2_OR_NEWER
            EditorApplication.playModeStateChanged -= HandleOnPlayModeChanged;
#else
            EditorApplication.playmodeStateChanged -= HandleOnPlayModeChanged;
#endif
#endif
            try
            {
                Send(WrapMessage(null, 400).ToByteArray());
            }
            finally
            {
                // Release the socket even when the farewell message cannot be delivered.
                m_Sender.Close();
                m_Sender = null;
            }
        }

        /// <summary>
        /// Send a UnityOutput and receives a UnityInput.
        /// </summary>
        /// <returns>
        /// The next UnityInput, or null when the reply's header status is not 200.
        /// </returns>
        /// <param name="unityOutput">The UnityOutput to be sent.</param>
        /// <exception cref="UnityAgentsException">
        /// No reply arrived within <c>k_TimeOut</c> seconds.
        /// </exception>
        public UnityInput Exchange(UnityOutput unityOutput)
        {
            Send(WrapMessage(unityOutput, 200).ToByteArray());

            // The blocking receive runs on a worker so the wait can be bounded.
            // NOTE(review): after a timeout the worker is still blocked on the socket.
            Task<byte[]> task = Task.Run(() => Receive());
            if (!task.Wait(System.TimeSpan.FromSeconds(k_TimeOut)))
            {
                throw new UnityAgentsException(
                    "The communicator took too long to respond.");
            }

            var message = UnityMessage.Parser.ParseFrom(task.Result);
            if (message.Header.Status != 200)
            {
                return null;
            }
            return message.UnityInput;
        }

        /// <summary>
        /// Wraps the UnityOutput into a message with the appropriate status.
        /// </summary>
        /// <returns>The UnityMessage corresponding.</returns>
        /// <param name="content">The UnityOutput to be wrapped.</param>
        /// <param name="status">The status of the message.</param>
        private static UnityMessage WrapMessage(UnityOutput content, int status)
        {
            return new UnityMessage
            {
                Header = new Header { Status = status },
                UnityOutput = content
            };
        }

        /// <summary>
        /// When the Unity application quits, the communicator must be closed.
        /// </summary>
        // NOTE(review): nothing in this file calls this method; confirm it is still needed.
        private void OnApplicationQuit()
        {
            Close();
        }

#if UNITY_EDITOR
#if UNITY_2017_2_OR_NEWER
        /// <summary>
        /// When the editor exits, the communicator must be closed.
        /// </summary>
        /// <param name="state">State.</param>
        private void HandleOnPlayModeChanged(PlayModeStateChange state)
        {
            // This method is run whenever the playmode state is changed.
            if (state == PlayModeStateChange.ExitingPlayMode)
            {
                Close();
            }
        }
#else
        /// <summary>
        /// When the editor exits, the communicator must be closed.
        /// </summary>
        private void HandleOnPlayModeChanged()
        {
            // This method is run whenever the playmode state is changed.
            if (!EditorApplication.isPlayingOrWillChangePlaymode)
            {
                Close();
            }
        }
#endif
#endif
    }
}