您最多选择25个主题
主题必须以中文或者字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
784 行
31 KiB
784 行
31 KiB
using System;
|
|
using System.Collections;
|
|
using System.Collections.Generic;
|
|
using System.Runtime.CompilerServices;
|
|
using Unity.Collections;
|
|
using Unity.Collections.LowLevel.Unsafe;
|
|
using UnityEngine;
|
|
|
|
namespace Unity.Netcode
|
|
{
|
|
internal class HandlerNotRegisteredException : SystemException
|
|
{
|
|
public HandlerNotRegisteredException() { }
|
|
public HandlerNotRegisteredException(string issue) : base(issue) { }
|
|
}
|
|
|
|
internal class InvalidMessageStructureException : SystemException
|
|
{
|
|
public InvalidMessageStructureException() { }
|
|
public InvalidMessageStructureException(string issue) : base(issue) { }
|
|
}
|
|
|
|
internal class MessagingSystem : IDisposable
|
|
{
|
|
private struct ReceiveQueueItem
|
|
{
|
|
public FastBufferReader Reader;
|
|
public MessageHeader Header;
|
|
public ulong SenderId;
|
|
public float Timestamp;
|
|
public int MessageHeaderSerializedSize;
|
|
}
|
|
|
|
private struct SendQueueItem
|
|
{
|
|
public BatchHeader BatchHeader;
|
|
public FastBufferWriter Writer;
|
|
public readonly NetworkDelivery NetworkDelivery;
|
|
|
|
public SendQueueItem(NetworkDelivery delivery, int writerSize, Allocator writerAllocator, int maxWriterSize = -1)
|
|
{
|
|
Writer = new FastBufferWriter(writerSize, writerAllocator, maxWriterSize);
|
|
NetworkDelivery = delivery;
|
|
BatchHeader = default;
|
|
}
|
|
}
|
|
|
|
internal delegate void MessageHandler(FastBufferReader reader, ref NetworkContext context, MessagingSystem system);
|
|
internal delegate int VersionGetter();
|
|
|
|
private NativeList<ReceiveQueueItem> m_IncomingMessageQueue = new NativeList<ReceiveQueueItem>(16, Allocator.Persistent);
|
|
|
|
// These array will grow as we need more message handlers. 4 is just a starting size.
|
|
private MessageHandler[] m_MessageHandlers = new MessageHandler[4];
|
|
private Type[] m_ReverseTypeMap = new Type[4];
|
|
|
|
private Dictionary<Type, uint> m_MessageTypes = new Dictionary<Type, uint>();
|
|
private Dictionary<ulong, NativeList<SendQueueItem>> m_SendQueues = new Dictionary<ulong, NativeList<SendQueueItem>>();
|
|
|
|
// This is m_PerClientMessageVersion[clientId][messageType] = version
|
|
private Dictionary<ulong, Dictionary<Type, int>> m_PerClientMessageVersions = new Dictionary<ulong, Dictionary<Type, int>>();
|
|
private Dictionary<uint, Type> m_MessagesByHash = new Dictionary<uint, Type>();
|
|
private Dictionary<Type, int> m_LocalVersions = new Dictionary<Type, int>();
|
|
|
|
private List<INetworkHooks> m_Hooks = new List<INetworkHooks>();
|
|
|
|
private uint m_HighMessageType;
|
|
private object m_Owner;
|
|
private IMessageSender m_MessageSender;
|
|
private bool m_Disposed;
|
|
|
|
internal Type[] MessageTypes => m_ReverseTypeMap;
|
|
internal MessageHandler[] MessageHandlers => m_MessageHandlers;
|
|
|
|
internal uint MessageHandlerCount => m_HighMessageType;
|
|
|
|
internal uint GetMessageType(Type t)
|
|
{
|
|
return m_MessageTypes[t];
|
|
}
|
|
|
|
public const int NON_FRAGMENTED_MESSAGE_MAX_SIZE = 1300;
|
|
public const int FRAGMENTED_MESSAGE_MAX_SIZE = int.MaxValue;
|
|
|
|
internal struct MessageWithHandler
|
|
{
|
|
public Type MessageType;
|
|
public MessageHandler Handler;
|
|
public VersionGetter GetVersion;
|
|
}
|
|
|
|
internal List<MessageWithHandler> PrioritizeMessageOrder(List<MessageWithHandler> allowedTypes)
|
|
{
|
|
var prioritizedTypes = new List<MessageWithHandler>();
|
|
|
|
// first pass puts the priority message in the first indices
|
|
// Those are the messages that must be delivered in order to allow re-ordering the others later
|
|
foreach (var t in allowedTypes)
|
|
{
|
|
if (t.MessageType.FullName == typeof(ConnectionRequestMessage).FullName ||
|
|
t.MessageType.FullName == typeof(ConnectionApprovedMessage).FullName)
|
|
{
|
|
prioritizedTypes.Add(t);
|
|
}
|
|
}
|
|
|
|
foreach (var t in allowedTypes)
|
|
{
|
|
if (t.MessageType.FullName != typeof(ConnectionRequestMessage).FullName &&
|
|
t.MessageType.FullName != typeof(ConnectionApprovedMessage).FullName)
|
|
{
|
|
prioritizedTypes.Add(t);
|
|
}
|
|
}
|
|
|
|
return prioritizedTypes;
|
|
}
|
|
|
|
public MessagingSystem(IMessageSender messageSender, object owner, IMessageProvider provider = null)
|
|
{
|
|
try
|
|
{
|
|
m_MessageSender = messageSender;
|
|
m_Owner = owner;
|
|
|
|
if (provider == null)
|
|
{
|
|
provider = new ILPPMessageProvider();
|
|
}
|
|
var allowedTypes = provider.GetMessages();
|
|
|
|
allowedTypes.Sort((a, b) => string.CompareOrdinal(a.MessageType.FullName, b.MessageType.FullName));
|
|
allowedTypes = PrioritizeMessageOrder(allowedTypes);
|
|
foreach (var type in allowedTypes)
|
|
{
|
|
RegisterMessageType(type);
|
|
}
|
|
}
|
|
catch (Exception)
|
|
{
|
|
Dispose();
|
|
throw;
|
|
}
|
|
}
|
|
|
|
public unsafe void Dispose()
|
|
{
|
|
if (m_Disposed)
|
|
{
|
|
return;
|
|
}
|
|
|
|
// Can't just iterate SendQueues or SendQueues.Keys because ClientDisconnected removes
|
|
// from the queue.
|
|
foreach (var kvp in m_SendQueues)
|
|
{
|
|
CleanupDisconnectedClient(kvp.Key);
|
|
}
|
|
|
|
for (var queueIndex = 0; queueIndex < m_IncomingMessageQueue.Length; ++queueIndex)
|
|
{
|
|
// Avoid copies...
|
|
ref var item = ref m_IncomingMessageQueue.ElementAt(queueIndex);
|
|
item.Reader.Dispose();
|
|
}
|
|
|
|
m_IncomingMessageQueue.Dispose();
|
|
m_Disposed = true;
|
|
}
|
|
|
|
~MessagingSystem()
|
|
{
|
|
Dispose();
|
|
}
|
|
|
|
public void Hook(INetworkHooks hooks)
|
|
{
|
|
m_Hooks.Add(hooks);
|
|
}
|
|
|
|
public void Unhook(INetworkHooks hooks)
|
|
{
|
|
m_Hooks.Remove(hooks);
|
|
}
|
|
|
|
private void RegisterMessageType(MessageWithHandler messageWithHandler)
|
|
{
|
|
// if we are out of space, perform amortized linear growth
|
|
if (m_HighMessageType == m_MessageHandlers.Length)
|
|
{
|
|
Array.Resize(ref m_MessageHandlers, 2 * m_MessageHandlers.Length);
|
|
Array.Resize(ref m_ReverseTypeMap, 2 * m_ReverseTypeMap.Length);
|
|
}
|
|
|
|
m_MessageHandlers[m_HighMessageType] = messageWithHandler.Handler;
|
|
m_ReverseTypeMap[m_HighMessageType] = messageWithHandler.MessageType;
|
|
m_MessagesByHash[XXHash.Hash32(messageWithHandler.MessageType.FullName)] = messageWithHandler.MessageType;
|
|
m_MessageTypes[messageWithHandler.MessageType] = m_HighMessageType++;
|
|
m_LocalVersions[messageWithHandler.MessageType] = messageWithHandler.GetVersion();
|
|
}
|
|
|
|
public int GetLocalVersion(Type messageType)
|
|
{
|
|
return m_LocalVersions[messageType];
|
|
}
|
|
|
|
internal void HandleIncomingData(ulong clientId, ArraySegment<byte> data, float receiveTime)
|
|
{
|
|
unsafe
|
|
{
|
|
fixed (byte* nativeData = data.Array)
|
|
{
|
|
var batchReader =
|
|
new FastBufferReader(nativeData + data.Offset, Allocator.None, data.Count);
|
|
if (!batchReader.TryBeginRead(sizeof(BatchHeader)))
|
|
{
|
|
NetworkLog.LogWarning("Received a packet too small to contain a BatchHeader. Ignoring it.");
|
|
return;
|
|
}
|
|
|
|
batchReader.ReadValue(out BatchHeader batchHeader);
|
|
|
|
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
|
|
{
|
|
m_Hooks[hookIdx].OnBeforeReceiveBatch(clientId, batchHeader.BatchSize, batchReader.Length);
|
|
}
|
|
|
|
for (var messageIdx = 0; messageIdx < batchHeader.BatchSize; ++messageIdx)
|
|
{
|
|
|
|
var messageHeader = new MessageHeader();
|
|
var position = batchReader.Position;
|
|
try
|
|
{
|
|
ByteUnpacker.ReadValueBitPacked(batchReader, out messageHeader.MessageType);
|
|
ByteUnpacker.ReadValueBitPacked(batchReader, out messageHeader.MessageSize);
|
|
}
|
|
catch (OverflowException)
|
|
{
|
|
NetworkLog.LogWarning("Received a batch that didn't have enough data for all of its batches, ending early!");
|
|
throw;
|
|
}
|
|
|
|
var receivedHeaderSize = batchReader.Position - position;
|
|
|
|
if (!batchReader.TryBeginRead((int)messageHeader.MessageSize))
|
|
{
|
|
NetworkLog.LogWarning("Received a message that claimed a size larger than the packet, ending early!");
|
|
return;
|
|
}
|
|
m_IncomingMessageQueue.Add(new ReceiveQueueItem
|
|
{
|
|
Header = messageHeader,
|
|
SenderId = clientId,
|
|
Timestamp = receiveTime,
|
|
// Copy the data for this message into a new FastBufferReader that owns that memory.
|
|
// We can't guarantee the memory in the ArraySegment stays valid because we don't own it,
|
|
// so we must move it to memory we do own.
|
|
Reader = new FastBufferReader(batchReader.GetUnsafePtrAtCurrentPosition(), Allocator.TempJob, (int)messageHeader.MessageSize),
|
|
MessageHeaderSerializedSize = receivedHeaderSize,
|
|
});
|
|
batchReader.Seek(batchReader.Position + (int)messageHeader.MessageSize);
|
|
}
|
|
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
|
|
{
|
|
m_Hooks[hookIdx].OnAfterReceiveBatch(clientId, batchHeader.BatchSize, batchReader.Length);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
private bool CanReceive(ulong clientId, Type messageType, FastBufferReader messageContent, ref NetworkContext context)
|
|
{
|
|
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
|
|
{
|
|
if (!m_Hooks[hookIdx].OnVerifyCanReceive(clientId, messageType, messageContent, ref context))
|
|
{
|
|
return false;
|
|
}
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
internal Type GetMessageForHash(uint messageHash)
|
|
{
|
|
if (!m_MessagesByHash.ContainsKey(messageHash))
|
|
{
|
|
return null;
|
|
}
|
|
return m_MessagesByHash[messageHash];
|
|
}
|
|
|
|
internal void SetVersion(ulong clientId, uint messageHash, int version)
|
|
{
|
|
if (!m_MessagesByHash.ContainsKey(messageHash))
|
|
{
|
|
return;
|
|
}
|
|
var messageType = m_MessagesByHash[messageHash];
|
|
|
|
if (!m_PerClientMessageVersions.ContainsKey(clientId))
|
|
{
|
|
m_PerClientMessageVersions[clientId] = new Dictionary<Type, int>();
|
|
}
|
|
|
|
m_PerClientMessageVersions[clientId][messageType] = version;
|
|
}
|
|
|
|
internal void SetServerMessageOrder(NativeArray<uint> messagesInIdOrder)
|
|
{
|
|
var oldHandlers = m_MessageHandlers;
|
|
var oldTypes = m_MessageTypes;
|
|
m_ReverseTypeMap = new Type[messagesInIdOrder.Length];
|
|
m_MessageHandlers = new MessageHandler[messagesInIdOrder.Length];
|
|
m_MessageTypes = new Dictionary<Type, uint>();
|
|
|
|
for (var i = 0; i < messagesInIdOrder.Length; ++i)
|
|
{
|
|
if (!m_MessagesByHash.ContainsKey(messagesInIdOrder[i]))
|
|
{
|
|
continue;
|
|
}
|
|
var messageType = m_MessagesByHash[messagesInIdOrder[i]];
|
|
var oldId = oldTypes[messageType];
|
|
var handler = oldHandlers[oldId];
|
|
var newId = (uint)i;
|
|
m_MessageTypes[messageType] = newId;
|
|
m_MessageHandlers[newId] = handler;
|
|
m_ReverseTypeMap[newId] = messageType;
|
|
}
|
|
}
|
|
|
|
public void HandleMessage(in MessageHeader header, FastBufferReader reader, ulong senderId, float timestamp, int serializedHeaderSize)
|
|
{
|
|
if (header.MessageType >= m_HighMessageType)
|
|
{
|
|
Debug.LogWarning($"Received a message with invalid message type value {header.MessageType}");
|
|
reader.Dispose();
|
|
return;
|
|
}
|
|
var context = new NetworkContext
|
|
{
|
|
SystemOwner = m_Owner,
|
|
SenderId = senderId,
|
|
Timestamp = timestamp,
|
|
Header = header,
|
|
SerializedHeaderSize = serializedHeaderSize,
|
|
MessageSize = header.MessageSize,
|
|
};
|
|
|
|
var type = m_ReverseTypeMap[header.MessageType];
|
|
if (!CanReceive(senderId, type, reader, ref context))
|
|
{
|
|
reader.Dispose();
|
|
return;
|
|
}
|
|
|
|
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
|
|
{
|
|
m_Hooks[hookIdx].OnBeforeReceiveMessage(senderId, type, reader.Length + FastBufferWriter.GetWriteSize<MessageHeader>());
|
|
}
|
|
|
|
var handler = m_MessageHandlers[header.MessageType];
|
|
using (reader)
|
|
{
|
|
// This will also log an exception is if the server knows about a message type the client doesn't know
|
|
// about. In this case the handler will be null. It is still an issue the user must deal with: If the
|
|
// two connecting builds know about different messages, the server should not send a message to a client
|
|
// that doesn't know about it
|
|
if (handler == null)
|
|
{
|
|
Debug.LogException(new HandlerNotRegisteredException(header.MessageType.ToString()));
|
|
}
|
|
else
|
|
{
|
|
// No user-land message handler exceptions should escape the receive loop.
|
|
// If an exception is throw, the message is ignored.
|
|
// Example use case: A bad message is received that can't be deserialized and throws
|
|
// an OverflowException because it specifies a length greater than the number of bytes in it
|
|
// for some dynamic-length value.
|
|
try
|
|
{
|
|
handler.Invoke(reader, ref context, this);
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
Debug.LogException(e);
|
|
}
|
|
}
|
|
}
|
|
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
|
|
{
|
|
m_Hooks[hookIdx].OnAfterReceiveMessage(senderId, type, reader.Length + FastBufferWriter.GetWriteSize<MessageHeader>());
|
|
}
|
|
}
|
|
|
|
internal unsafe void ProcessIncomingMessageQueue()
|
|
{
|
|
for (var index = 0; index < m_IncomingMessageQueue.Length; ++index)
|
|
{
|
|
// Avoid copies...
|
|
ref var item = ref m_IncomingMessageQueue.ElementAt(index);
|
|
HandleMessage(item.Header, item.Reader, item.SenderId, item.Timestamp, item.MessageHeaderSerializedSize);
|
|
if (m_Disposed)
|
|
{
|
|
return;
|
|
}
|
|
}
|
|
|
|
m_IncomingMessageQueue.Clear();
|
|
}
|
|
|
|
internal void ClientConnected(ulong clientId)
|
|
{
|
|
if (m_SendQueues.ContainsKey(clientId))
|
|
{
|
|
return;
|
|
}
|
|
m_SendQueues[clientId] = new NativeList<SendQueueItem>(16, Allocator.Persistent);
|
|
}
|
|
|
|
internal void ClientDisconnected(ulong clientId)
|
|
{
|
|
if (!m_SendQueues.ContainsKey(clientId))
|
|
{
|
|
return;
|
|
}
|
|
CleanupDisconnectedClient(clientId);
|
|
m_SendQueues.Remove(clientId);
|
|
}
|
|
|
|
private void CleanupDisconnectedClient(ulong clientId)
|
|
{
|
|
var queue = m_SendQueues[clientId];
|
|
for (var i = 0; i < queue.Length; ++i)
|
|
{
|
|
queue.ElementAt(i).Writer.Dispose();
|
|
}
|
|
|
|
queue.Dispose();
|
|
}
|
|
|
|
internal void CleanupDisconnectedClients()
|
|
{
|
|
var removeList = new NativeList<ulong>(Allocator.Temp);
|
|
foreach (var clientId in m_PerClientMessageVersions.Keys)
|
|
{
|
|
if (!m_SendQueues.ContainsKey(clientId))
|
|
{
|
|
removeList.Add(clientId);
|
|
}
|
|
}
|
|
|
|
foreach (var clientId in removeList)
|
|
{
|
|
m_PerClientMessageVersions.Remove(clientId);
|
|
}
|
|
}
|
|
|
|
public static int CreateMessageAndGetVersion<T>() where T : INetworkMessage, new()
|
|
{
|
|
return new T().Version;
|
|
}
|
|
|
|
internal int GetMessageVersion(Type type, ulong clientId, bool forReceive = false)
|
|
{
|
|
if (!m_PerClientMessageVersions.TryGetValue(clientId, out var versionMap))
|
|
{
|
|
if (forReceive)
|
|
{
|
|
Debug.LogWarning($"Trying to receive {type.Name} from client {clientId} which is not in a connected state.");
|
|
|
|
}
|
|
else
|
|
{
|
|
Debug.LogWarning($"Trying to send {type.Name} to client {clientId} which is not in a connected state.");
|
|
}
|
|
return -1;
|
|
}
|
|
if (!versionMap.TryGetValue(type, out var messageVersion))
|
|
{
|
|
return -1;
|
|
}
|
|
|
|
return messageVersion;
|
|
}
|
|
|
|
public static void ReceiveMessage<T>(FastBufferReader reader, ref NetworkContext context, MessagingSystem system) where T : INetworkMessage, new()
|
|
{
|
|
var message = new T();
|
|
var messageVersion = 0;
|
|
// Special cases because these are the messages that carry the version info - thus the version info isn't
|
|
// populated yet when we get these. The first part of these messages always has to be the version data
|
|
// and can't change.
|
|
if (typeof(T) != typeof(ConnectionRequestMessage) && typeof(T) != typeof(ConnectionApprovedMessage) && typeof(T) != typeof(DisconnectReasonMessage))
|
|
{
|
|
messageVersion = system.GetMessageVersion(typeof(T), context.SenderId, true);
|
|
if (messageVersion < 0)
|
|
{
|
|
return;
|
|
}
|
|
}
|
|
if (message.Deserialize(reader, ref context, messageVersion))
|
|
{
|
|
for (var hookIdx = 0; hookIdx < system.m_Hooks.Count; ++hookIdx)
|
|
{
|
|
system.m_Hooks[hookIdx].OnBeforeHandleMessage(ref message, ref context);
|
|
}
|
|
|
|
message.Handle(ref context);
|
|
|
|
for (var hookIdx = 0; hookIdx < system.m_Hooks.Count; ++hookIdx)
|
|
{
|
|
system.m_Hooks[hookIdx].OnAfterHandleMessage(ref message, ref context);
|
|
}
|
|
}
|
|
}
|
|
|
|
private bool CanSend(ulong clientId, Type messageType, NetworkDelivery delivery)
|
|
{
|
|
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
|
|
{
|
|
if (!m_Hooks[hookIdx].OnVerifyCanSend(clientId, messageType, delivery))
|
|
{
|
|
return false;
|
|
}
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
internal int SendMessage<TMessageType, TClientIdListType>(ref TMessageType message, NetworkDelivery delivery, in TClientIdListType clientIds)
|
|
where TMessageType : INetworkMessage
|
|
where TClientIdListType : IReadOnlyList<ulong>
|
|
{
|
|
if (clientIds.Count == 0)
|
|
{
|
|
return 0;
|
|
}
|
|
|
|
var largestSerializedSize = 0;
|
|
var sentMessageVersions = new NativeHashSet<int>(clientIds.Count, Allocator.Temp);
|
|
for (var i = 0; i < clientIds.Count; ++i)
|
|
{
|
|
var messageVersion = 0;
|
|
// Special case because this is the message that carries the version info - thus the version info isn't
|
|
// populated yet when we get this. The first part of this message always has to be the version data
|
|
// and can't change.
|
|
if (typeof(TMessageType) != typeof(ConnectionRequestMessage))
|
|
{
|
|
messageVersion = GetMessageVersion(typeof(TMessageType), clientIds[i]);
|
|
if (messageVersion < 0)
|
|
{
|
|
// Client doesn't know this message exists, don't send it at all.
|
|
continue;
|
|
}
|
|
}
|
|
|
|
if (sentMessageVersions.Contains(messageVersion))
|
|
{
|
|
continue;
|
|
}
|
|
|
|
sentMessageVersions.Add(messageVersion);
|
|
|
|
var maxSize = delivery == NetworkDelivery.ReliableFragmentedSequenced ? FRAGMENTED_MESSAGE_MAX_SIZE : NON_FRAGMENTED_MESSAGE_MAX_SIZE;
|
|
|
|
using var tmpSerializer = new FastBufferWriter(NON_FRAGMENTED_MESSAGE_MAX_SIZE - FastBufferWriter.GetWriteSize<MessageHeader>(), Allocator.Temp, maxSize - FastBufferWriter.GetWriteSize<MessageHeader>());
|
|
|
|
message.Serialize(tmpSerializer, messageVersion);
|
|
|
|
var size = SendPreSerializedMessage(tmpSerializer, maxSize, ref message, delivery, clientIds, messageVersion);
|
|
largestSerializedSize = size > largestSerializedSize ? size : largestSerializedSize;
|
|
}
|
|
|
|
sentMessageVersions.Dispose();
|
|
|
|
return largestSerializedSize;
|
|
}
|
|
|
|
internal unsafe int SendPreSerializedMessage<TMessageType>(in FastBufferWriter tmpSerializer, int maxSize, ref TMessageType message, NetworkDelivery delivery, in IReadOnlyList<ulong> clientIds, int messageVersionFilter)
|
|
where TMessageType : INetworkMessage
|
|
{
|
|
using var headerSerializer = new FastBufferWriter(FastBufferWriter.GetWriteSize<MessageHeader>(), Allocator.Temp);
|
|
|
|
var header = new MessageHeader
|
|
{
|
|
MessageSize = (uint)tmpSerializer.Length,
|
|
MessageType = m_MessageTypes[typeof(TMessageType)],
|
|
};
|
|
BytePacker.WriteValueBitPacked(headerSerializer, header.MessageType);
|
|
BytePacker.WriteValueBitPacked(headerSerializer, header.MessageSize);
|
|
|
|
for (var i = 0; i < clientIds.Count; ++i)
|
|
{
|
|
var messageVersion = 0;
|
|
// Special case because this is the message that carries the version info - thus the version info isn't
|
|
// populated yet when we get this. The first part of this message always has to be the version data
|
|
// and can't change.
|
|
if (typeof(TMessageType) != typeof(ConnectionRequestMessage))
|
|
{
|
|
messageVersion = GetMessageVersion(typeof(TMessageType), clientIds[i]);
|
|
if (messageVersion < 0)
|
|
{
|
|
// Client doesn't know this message exists, don't send it at all.
|
|
continue;
|
|
}
|
|
|
|
if (messageVersion != messageVersionFilter)
|
|
{
|
|
continue;
|
|
}
|
|
}
|
|
|
|
var clientId = clientIds[i];
|
|
|
|
if (!CanSend(clientId, typeof(TMessageType), delivery))
|
|
{
|
|
continue;
|
|
}
|
|
|
|
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
|
|
{
|
|
m_Hooks[hookIdx].OnBeforeSendMessage(clientId, ref message, delivery);
|
|
}
|
|
|
|
var sendQueueItem = m_SendQueues[clientId];
|
|
if (sendQueueItem.Length == 0)
|
|
{
|
|
sendQueueItem.Add(new SendQueueItem(delivery, NON_FRAGMENTED_MESSAGE_MAX_SIZE, Allocator.TempJob,
|
|
maxSize));
|
|
sendQueueItem.ElementAt(0).Writer.Seek(sizeof(BatchHeader));
|
|
}
|
|
else
|
|
{
|
|
ref var lastQueueItem = ref sendQueueItem.ElementAt(sendQueueItem.Length - 1);
|
|
if (lastQueueItem.NetworkDelivery != delivery ||
|
|
lastQueueItem.Writer.MaxCapacity - lastQueueItem.Writer.Position
|
|
< tmpSerializer.Length + headerSerializer.Length)
|
|
{
|
|
sendQueueItem.Add(new SendQueueItem(delivery, NON_FRAGMENTED_MESSAGE_MAX_SIZE, Allocator.TempJob,
|
|
maxSize));
|
|
sendQueueItem.ElementAt(sendQueueItem.Length - 1).Writer.Seek(sizeof(BatchHeader));
|
|
}
|
|
}
|
|
|
|
ref var writeQueueItem = ref sendQueueItem.ElementAt(sendQueueItem.Length - 1);
|
|
writeQueueItem.Writer.TryBeginWrite(tmpSerializer.Length + headerSerializer.Length);
|
|
|
|
writeQueueItem.Writer.WriteBytes(headerSerializer.GetUnsafePtr(), headerSerializer.Length);
|
|
writeQueueItem.Writer.WriteBytes(tmpSerializer.GetUnsafePtr(), tmpSerializer.Length);
|
|
writeQueueItem.BatchHeader.BatchSize++;
|
|
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
|
|
{
|
|
m_Hooks[hookIdx].OnAfterSendMessage(clientId, ref message, delivery, tmpSerializer.Length + headerSerializer.Length);
|
|
}
|
|
}
|
|
|
|
return tmpSerializer.Length + headerSerializer.Length;
|
|
}
|
|
|
|
internal unsafe int SendPreSerializedMessage<TMessageType>(in FastBufferWriter tmpSerializer, int maxSize, ref TMessageType message, NetworkDelivery delivery, ulong clientId)
|
|
where TMessageType : INetworkMessage
|
|
{
|
|
var messageVersion = 0;
|
|
// Special case because this is the message that carries the version info - thus the version info isn't
|
|
// populated yet when we get this. The first part of this message always has to be the version data
|
|
// and can't change.
|
|
if (typeof(TMessageType) != typeof(ConnectionRequestMessage))
|
|
{
|
|
messageVersion = GetMessageVersion(typeof(TMessageType), clientId);
|
|
if (messageVersion < 0)
|
|
{
|
|
// Client doesn't know this message exists, don't send it at all.
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
ulong* clientIds = stackalloc ulong[] { clientId };
|
|
return SendPreSerializedMessage(tmpSerializer, maxSize, ref message, delivery, new PointerListWrapper<ulong>(clientIds, 1), messageVersion);
|
|
}
|
|
|
|
private struct PointerListWrapper<T> : IReadOnlyList<T>
|
|
where T : unmanaged
|
|
{
|
|
private unsafe T* m_Value;
|
|
private int m_Length;
|
|
|
|
internal unsafe PointerListWrapper(T* ptr, int length)
|
|
{
|
|
m_Value = ptr;
|
|
m_Length = length;
|
|
}
|
|
|
|
public int Count
|
|
{
|
|
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
|
get => m_Length;
|
|
}
|
|
|
|
public unsafe T this[int index]
|
|
{
|
|
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
|
get => m_Value[index];
|
|
}
|
|
|
|
public IEnumerator<T> GetEnumerator()
|
|
{
|
|
throw new NotImplementedException();
|
|
}
|
|
|
|
IEnumerator IEnumerable.GetEnumerator()
|
|
{
|
|
return GetEnumerator();
|
|
}
|
|
}
|
|
|
|
internal unsafe int SendMessage<T>(ref T message, NetworkDelivery delivery,
|
|
ulong* clientIds, int numClientIds)
|
|
where T : INetworkMessage
|
|
{
|
|
return SendMessage(ref message, delivery, new PointerListWrapper<ulong>(clientIds, numClientIds));
|
|
}
|
|
|
|
internal unsafe int SendMessage<T>(ref T message, NetworkDelivery delivery, ulong clientId)
|
|
where T : INetworkMessage
|
|
{
|
|
ulong* clientIds = stackalloc ulong[] { clientId };
|
|
return SendMessage(ref message, delivery, new PointerListWrapper<ulong>(clientIds, 1));
|
|
}
|
|
|
|
internal unsafe int SendMessage<T>(ref T message, NetworkDelivery delivery, in NativeArray<ulong> clientIds)
|
|
where T : INetworkMessage
|
|
{
|
|
return SendMessage(ref message, delivery, new PointerListWrapper<ulong>((ulong*)clientIds.GetUnsafePtr(), clientIds.Length));
|
|
}
|
|
|
|
internal unsafe void ProcessSendQueues()
|
|
{
|
|
foreach (var kvp in m_SendQueues)
|
|
{
|
|
var clientId = kvp.Key;
|
|
var sendQueueItem = kvp.Value;
|
|
for (var i = 0; i < sendQueueItem.Length; ++i)
|
|
{
|
|
ref var queueItem = ref sendQueueItem.ElementAt(i);
|
|
if (queueItem.BatchHeader.BatchSize == 0)
|
|
{
|
|
queueItem.Writer.Dispose();
|
|
continue;
|
|
}
|
|
|
|
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
|
|
{
|
|
m_Hooks[hookIdx].OnBeforeSendBatch(clientId, queueItem.BatchHeader.BatchSize, queueItem.Writer.Length, queueItem.NetworkDelivery);
|
|
}
|
|
|
|
queueItem.Writer.Seek(0);
|
|
#if UNITY_EDITOR || DEVELOPMENT_BUILD
|
|
// Skipping the Verify and sneaking the write mark in because we know it's fine.
|
|
queueItem.Writer.Handle->AllowedWriteMark = 2;
|
|
#endif
|
|
queueItem.Writer.WriteValue(queueItem.BatchHeader);
|
|
|
|
try
|
|
{
|
|
m_MessageSender.Send(clientId, queueItem.NetworkDelivery, queueItem.Writer);
|
|
|
|
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
|
|
{
|
|
m_Hooks[hookIdx].OnAfterSendBatch(clientId, queueItem.BatchHeader.BatchSize, queueItem.Writer.Length, queueItem.NetworkDelivery);
|
|
}
|
|
}
|
|
finally
|
|
{
|
|
queueItem.Writer.Dispose();
|
|
}
|
|
}
|
|
sendQueueItem.Clear();
|
|
}
|
|
}
|
|
}
|
|
}
|