浏览代码

Create a message system to handle more than just collab data.

/3.1
Tim Mowrer 5 年前
当前提交
a1c2fe9b
共有 13 个文件被更改,包括 447 次插入64 次删除
  1. 151
      Assets/Scenes/ARCollaborationData/TCPConnection.cs
  2. 4
      Assets/Scenes/ARCollaborationData/IMessage.cs
  3. 11
      Assets/Scenes/ARCollaborationData/IMessage.cs.meta
  4. 23
      Assets/Scenes/ARCollaborationData/MessageHeader.cs
  5. 11
      Assets/Scenes/ARCollaborationData/MessageHeader.cs.meta
  6. 6
      Assets/Scenes/ARCollaborationData/MessageType.cs
  7. 11
      Assets/Scenes/ARCollaborationData/MessageType.cs.meta
  8. 102
      Assets/Scenes/ARCollaborationData/NetworkBuffer.cs
  9. 11
      Assets/Scenes/ARCollaborationData/NetworkBuffer.cs.meta
  10. 84
      Assets/Scenes/ARCollaborationData/NetworkDataDecoder.cs
  11. 11
      Assets/Scenes/ARCollaborationData/NetworkDataDecoder.cs.meta
  12. 75
      Assets/Scenes/ARCollaborationData/NetworkDataEncoder.cs
  13. 11
      Assets/Scenes/ARCollaborationData/NetworkDataEncoder.cs.meta

151
Assets/Scenes/ARCollaborationData/TCPConnection.cs


if (collaborationData.valid)
{
// Serialize the collaboration data to a byte array
SerializedARCollaborationData serializedData;
using (var serializedData = collaborationData.ToSerialized())
SendData(stream, serializedData.bytes);
// ARCollaborationData can be diposed after being serialized to bytes.
serializedData = collaborationData.ToSerialized();
}
using (serializedData)
{
// Get the raw data as a NativeSlice
var collaborationBytes = serializedData.bytes;
// Construct the message header
var header = new MessageHeader
{
messageSize = collaborationBytes.Length,
messageType = MessageType.CollaborationData
};
// Send the header followed by the ARCollaborationData bytes
m_WriteBuffer.Send(stream, header);
m_WriteBuffer.Send(stream, collaborationBytes);
}
}

var stream = m_TcpClient.GetStream();
while (!m_ExitRequested)
{
if (stream.DataAvailable)
// Loop until there is data available
if (!stream.DataAvailable)
var lengthBytes = ReadBytes(stream, sizeof(int));
int expectedLength = IPAddress.NetworkToHostOrder(BitConverter.ToInt32(lengthBytes, 0));
Thread.Sleep(1);
continue;
}
if (expectedLength <= 0)
{
Logger.Log($"Warning: received data of length {expectedLength}. Ignoring.");
}
else
{
// Read incomming stream into byte arrary.
var collaborationBytes = ReadBytes(stream, expectedLength);
var collaborationData = new ARCollaborationData(collaborationBytes);
// Read the header
var messageHeader = ReadMessageHeader(stream);
// Handle the message
switch (messageHeader.messageType)
{
case MessageType.CollaborationData:
var collaborationData = ReadCollaborationData(stream, messageHeader.messageSize);
lock (m_CollaborationDataReadQueue)
{
m_CollaborationDataReadQueue.Enqueue(collaborationData);
}
// Only log critical data updates; optional updates can come every frame.
// Only store critical data updates; optional updates can come every frame.
Logger.Log($"Received {expectedLength} bytes from remote host.");
lock (m_CollaborationDataReadQueue)
{
m_CollaborationDataReadQueue.Enqueue(collaborationData);
}
Logger.Log($"Received {messageHeader.messageSize} bytes from remote host.");
Logger.Log($"Received {expectedLength} bytes from remote host, but the collaboration data was not valid.");
Logger.Log($"Received {messageHeader.messageSize} bytes from remote host, but the collaboration data was not valid.");
}
break;
default:
Logger.Log($"Unhandled message type {messageHeader.messageType}");
// We don't understand this message, but read it out anyway
// so we can process the next message
int bytesRemaining = messageHeader.messageSize;
while (bytesRemaining > 0)
{
m_ReadBuffer.Read(stream, 0, Mathf.Min(bytesRemaining, m_ReadBuffer.buffer.Length));
}
break;
Thread.Sleep(1);
// Check for new data and queue it
if (subsystem.collaborationDataCount > 0)
// Exit if no new data is available
if (subsystem.collaborationDataCount == 0)
return;
lock (m_CollaborationDataSendQueue)
CollaborationNetworkingIndicator.NotifyHasCollaborationData();
lock (m_CollaborationDataSendQueue)
// Enqueue all new collaboration data with critical priority
while (subsystem.collaborationDataCount > 0)
while (subsystem.collaborationDataCount > 0)
{
var collaborationData = subsystem.DequeueCollaborationData();
var collaborationData = subsystem.DequeueCollaborationData();
// As all data in this sample is sent over TCP, only send critical data
if (collaborationData.priority == ARCollaborationDataPriority.Critical)
{
m_CollaborationDataSendQueue.Enqueue(collaborationData);
}
// As all data in this sample is sent over TCP, only send critical data
if (collaborationData.priority == ARCollaborationDataPriority.Critical)
{
m_CollaborationDataSendQueue.Enqueue(collaborationData);
CollaborationNetworkingIndicator.NotifyHasCollaborationData();
}
}
}

{
using (var collaborationData = m_CollaborationDataReadQueue.Dequeue())
{
CollaborationNetworkingIndicator.NotifyIncomingDataReceived();
// Assume we only put in valid collaboration data into the queue.
subsystem.UpdateWithCollaborationData(collaborationData);
}

static byte[] ReadBytes(NetworkStream stream, int count)
{
var bytes = new byte[count];
int bytesRemaining = count;
int offset = 0;
const int k_BufferSize = 10240;
NetworkBuffer m_ReadBuffer = new NetworkBuffer(k_BufferSize);
while (bytesRemaining > 0)
{
int bytesRead = stream.Read(bytes, offset, bytesRemaining);
offset += bytesRead;
bytesRemaining -= bytesRead;
}
NetworkBuffer m_WriteBuffer = new NetworkBuffer(k_BufferSize);
return bytes;
MessageHeader ReadMessageHeader(NetworkStream stream)
{
int bytesRead = m_ReadBuffer.Read(stream, 0, MessageHeader.k_EncodedSize);
return new MessageHeader(m_ReadBuffer.buffer, bytesRead);
/// <summary>
/// Send message to other device using socket connection.
/// </summary>
static void SendData(NetworkStream stream, NativeArray<byte> bytes)
ARCollaborationData ReadCollaborationData(NetworkStream stream, int size)
var builder = new ARCollaborationDataBuilder();
var byteArray = bytes.ToArray();
int length = IPAddress.HostToNetworkOrder(byteArray.Length);
var lengthBytes = BitConverter.GetBytes(length);
stream.Write(lengthBytes, 0, lengthBytes.Length);
stream.Write(byteArray, 0, byteArray.Length);
CollaborationNetworkingIndicator.NotifyOutgoingDataSent();
int bytesRemaining = size;
while (bytesRemaining > 0)
{
int bytesRead = m_ReadBuffer.Read(stream, 0, Mathf.Min(bytesRemaining, m_ReadBuffer.buffer.Length));
builder.Append(m_ReadBuffer.buffer, 0, bytesRead);
bytesRemaining -= bytesRead;
}
Logger.Log($"Sent {byteArray.Length} bytes to remote.");
return builder.ToCollaborationData();
catch (SocketException socketException)
finally
Logger.Log("Socket exception: " + socketException);
builder.Dispose();
}
}
#endif

4
Assets/Scenes/ARCollaborationData/IMessage.cs


public interface IMessage
{
int EncodeTo(byte[] bytes);
}

11
Assets/Scenes/ARCollaborationData/IMessage.cs.meta


fileFormatVersion: 2
guid: 48716d5221a70425aa3409ef5a24a10d
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

23
Assets/Scenes/ARCollaborationData/MessageHeader.cs


public struct MessageHeader : IMessage
{
public int messageSize;
public MessageType messageType;
public int EncodeTo(byte[] bytes)
{
var encoder = new NetworkDataEncoder(bytes);
encoder.Encode(messageSize);
encoder.Encode((byte)messageType);
return encoder.length;
}
public const int k_EncodedSize = sizeof(int) + sizeof(byte);
public MessageHeader(byte[] bytes, int size)
{
var decoder = new NetworkDataDecoder(bytes, size);
messageSize = decoder.DecodeInt();
messageType = (MessageType)decoder.DecodeByte();
}
}

11
Assets/Scenes/ARCollaborationData/MessageHeader.cs.meta


fileFormatVersion: 2
guid: 392beb324d6f3400ab0b5ffe1ae43b70
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

6
Assets/Scenes/ARCollaborationData/MessageType.cs


public enum MessageType : byte
{
None,
CollaborationData
}

11
Assets/Scenes/ARCollaborationData/MessageType.cs.meta


fileFormatVersion: 2
guid: b16d841d0e5dd48d0a70c79beff77499
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

102
Assets/Scenes/ARCollaborationData/NetworkBuffer.cs


using System;
using System.Net.Sockets;
using UnityEngine;
using Unity.Collections;
using Unity.Collections.LowLevel.Unsafe;
public struct NetworkBuffer
{
byte[] m_Buffer;
public NetworkBuffer(int size)
{
m_Buffer = new byte[size];
}
public byte[] buffer => m_Buffer;
public int Read(NetworkStream stream, int offset, int size)
{
ValidateAndThrow(stream);
if (offset < 0)
throw new ArgumentOutOfRangeException(nameof(offset), offset, $"{nameof(offset)} must be greater than or equal to zero.");
if (size < 0)
throw new ArgumentOutOfRangeException(nameof(size), size, $"{nameof(size)} must be greater than or equal to zero.");
if (offset + size > m_Buffer.Length)
throw new InvalidOperationException($"Reading {size} bytes starting at offset {offset} would read past the end of the buffer (buffer length = {m_Buffer.Length}).");
int bytesRemaining = size;
while (bytesRemaining > 0)
{
int bytesRead = stream.Read(m_Buffer, offset, size);
CollaborationNetworkingIndicator.NotifyIncomingDataReceived();
offset += bytesRead;
bytesRemaining -= bytesRead;
}
return size;
}
public void Send(NetworkStream stream, int offset, int size)
{
ValidateAndThrow(stream);
if (offset + size > m_Buffer.Length)
throw new InvalidOperationException($"Writing {size} bytes starting at offset {offset} would write past the end of the buffer (buffer length = {m_Buffer.Length}).");
try
{
stream.Write(m_Buffer, offset, size);
CollaborationNetworkingIndicator.NotifyOutgoingDataSent();
}
catch (SocketException socketException)
{
Logger.Log($"Socket exception: {socketException}");
}
}
public unsafe void Send(NetworkStream stream, NativeSlice<byte> bytes)
{
ValidateAndThrow(stream);
var basePtr = new IntPtr(bytes.GetUnsafeReadOnlyPtr());
int bytesRemaining = bytes.Length;
int offset = 0;
while (bytesRemaining > 0)
{
// Memcpy next chunk into destinationBuffer
int size = Mathf.Min(m_Buffer.Length, bytesRemaining);
fixed(byte* dst = m_Buffer)
{
var src = basePtr + offset;
UnsafeUtility.MemCpy(dst, (void*)src, size);
}
bytesRemaining -= size;
offset += size;
Send(stream, 0, size);
}
}
public void Send<T>(NetworkStream stream, T message) where T : struct, IMessage
{
ValidateAndThrow(stream);
int size = message.EncodeTo(m_Buffer);
Send(stream, 0, size);
}
void ValidateAndThrow(NetworkStream stream)
{
if (stream == null)
throw new ArgumentNullException(nameof(stream));
if (m_Buffer == null)
throw new InvalidOperationException($"{nameof(NetworkBuffer)} has not been initialized.");
}
}

11
Assets/Scenes/ARCollaborationData/NetworkBuffer.cs.meta


fileFormatVersion: 2
guid: 6159b84b308ec4356bdc9ae17d26964b
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

84
Assets/Scenes/ARCollaborationData/NetworkDataDecoder.cs


using System;
using System.Net;
public struct NetworkDataDecoder
{
byte[] m_Buffer;
int m_Offset;
int m_Length;
public NetworkDataDecoder(byte[] buffer, int size)
{
if (buffer == null)
throw new ArgumentNullException(nameof(buffer));
if (size > buffer.Length)
throw new ArgumentOutOfRangeException(nameof(size), size, $"'{nameof(size)}' is greater than the length of {nameof(buffer)} ({buffer.Length}).");
m_Buffer = buffer;
m_Offset = 0;
m_Length = size;
}
public unsafe float DecodeFloat()
{
var value = DecodeInt();
return *(float*)&value;
}
public unsafe double DecodeDouble()
{
var value = DecodeLong();
return *(double*)&value;
}
public uint DecodeUInt() => (uint)DecodeInt();
public ulong DecodeULong() => (ulong)DecodeLong();
public byte DecodeByte()
{
if (m_Offset >= m_Length)
throw new InvalidOperationException("Buffer is exhausted. Cannot decode more data.");
return m_Buffer[m_Offset++];
}
public unsafe short DecodeShort()
{
if (m_Offset + 2 > m_Length)
throw new InvalidOperationException("Buffer is exhausted. Cannot decode more data.");
fixed(byte* ptr = &m_Buffer[m_Offset])
{
m_Offset += 2;
return IPAddress.NetworkToHostOrder(*(short*)ptr);
}
}
public unsafe int DecodeInt()
{
if (m_Offset + 4 > m_Length)
throw new InvalidOperationException("Buffer is exhausted. Cannot decode more data.");
fixed(byte* ptr = &m_Buffer[m_Offset])
{
m_Offset += 4;
return IPAddress.NetworkToHostOrder(*(int*)ptr);
}
}
public unsafe long DecodeLong()
{
if (m_Offset + 8 > m_Length)
throw new InvalidOperationException("Buffer is exhausted. Cannot decode more data.");
fixed(byte* ptr = &m_Buffer[m_Offset])
{
m_Offset += 8;
return IPAddress.NetworkToHostOrder(*(long*)ptr);
}
}
}

11
Assets/Scenes/ARCollaborationData/NetworkDataDecoder.cs.meta


fileFormatVersion: 2
guid: d3331be3e25f44589a8a9895cdb60f02
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

75
Assets/Scenes/ARCollaborationData/NetworkDataEncoder.cs


using System;
using System.Net;
public struct NetworkDataEncoder
{
byte[] m_Buffer;
int m_Offset;
public NetworkDataEncoder(byte[] buffer)
{
if (buffer == null)
throw new ArgumentNullException(nameof(buffer));
m_Buffer = buffer;
m_Offset = 0;
}
public int length => m_Offset;
public unsafe void Encode(float value) => Encode(*(int*)&value);
public unsafe void Encode(double value) => Encode(*(long*)&value);
public void Encode(uint value) => Encode((int)value);
public void Encode(ulong value) => Encode((long)value);
public void Encode(byte value)
{
if (m_Offset + 1 > m_Buffer.Length)
throw new InvalidOperationException("Buffer is full. Cannot write more data.");
m_Buffer[m_Offset++] = value;
}
public unsafe void Encode(short value)
{
int newOffset = m_Offset + 2;
if (newOffset > m_Buffer.Length)
throw new InvalidOperationException("Buffer is full. Cannot write more data.");
fixed(byte* ptr = &m_Buffer[m_Offset])
{
*(short*)ptr = IPAddress.HostToNetworkOrder(value);
}
m_Offset = newOffset;
}
public unsafe void Encode(int value)
{
int newOffset = m_Offset + 4;
if (newOffset > m_Buffer.Length)
throw new InvalidOperationException("Buffer is full. Cannot write more data.");
fixed(byte* ptr = &m_Buffer[m_Offset])
{
*(int*)ptr = IPAddress.HostToNetworkOrder(value);
}
m_Offset = newOffset;
}
public unsafe void Encode(long value)
{
int newOffset = m_Offset + 8;
if (newOffset > m_Buffer.Length)
throw new InvalidOperationException("Buffer is full. Cannot write more data.");
fixed(byte* ptr = &m_Buffer[m_Offset])
{
*(long*)ptr = IPAddress.HostToNetworkOrder(value);
}
m_Offset = newOffset;
}
}

11
Assets/Scenes/ARCollaborationData/NetworkDataEncoder.cs.meta


fileFormatVersion: 2
guid: 03435c1a2d82b46f98fe7980c199a46f
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:
正在加载...
取消
保存