using System;
using System.Threading;
using Unity.Collections;
using Unity.Collections.LowLevel.Unsafe;
using Unity.Burst;
using System.Collections.Generic;
using System.Diagnostics;
using Unity.Networking.Transport.Protocols;
using Unity.Networking.Transport.Utilities;
using System.Runtime.InteropServices;

namespace Unity.Networking.Transport
{
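    /// <summary>
    /// Buffer passed down the send path of a pipeline. <c>bufferWithHeaders</c> points at the
    /// full allocation, with <c>headerPadding</c> bytes reserved at the front for stage headers;
    /// <c>buffer</c> points at the payload that follows that reserved space.
    /// </summary>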
    public unsafe struct InboundSendBuffer
    {
        public byte* buffer;
        public byte* bufferWithHeaders;
        public int bufferLength;
        public int bufferWithHeadersLength;
        public int headerPadding;

        public void SetBufferFrombufferWithHeaders()
        {
#if ENABLE_UNITY_COLLECTIONS_CHECKS
            if (bufferWithHeadersLength < headerPadding)
                throw new IndexOutOfRangeException("Buffer is too small to fit headers");
#endif
            buffer = bufferWithHeaders + headerPadding;
            bufferLength = bufferWithHeadersLength - headerPadding;
        }
    }

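    /// <summary>
    /// Buffer passed up the receive path of a pipeline. <see cref="Slice"/> returns a view
    /// into the same memory, advanced by <c>offset</c> bytes.
    /// </summary>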
    public unsafe struct InboundRecvBuffer
    {
        public byte* buffer;
        public int bufferLength;

        public InboundRecvBuffer Slice(int offset)
        {
#if ENABLE_UNITY_COLLECTIONS_CHECKS
            if (bufferLength < offset)
                throw new ArgumentOutOfRangeException("Buffer does not contain enough data");
#endif
            InboundRecvBuffer slice;
            slice.buffer = buffer + offset;
            slice.bufferLength = bufferLength - offset;
            return slice;
        }
    }

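    /// <summary>
    /// Working context handed to a stage's send/receive functions. It exposes three buffers:
    /// static per-stage-type state (shared by all connections), shared per-connection state
    /// (visible to both send and receive), and the stage's private per-connection process
    /// buffer.
    /// </summary>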
    public unsafe struct NetworkPipelineContext
    {
        public byte* staticInstanceBuffer;
        public byte* internalSharedProcessBuffer;
        public byte* internalProcessBuffer;
        public DataStreamWriter header;
        public long timestamp;
        public int staticInstanceBufferLength;
        public int internalSharedProcessBufferLength;
        public int internalProcessBufferLength;
        public int accumulatedHeaderCapacity;
    }

    public unsafe interface INetworkPipelineStage
    {
        NetworkPipelineStage StaticInitialize(byte* staticInstanceBuffer, int staticInstanceBufferLength, INetworkParameter[] param);
        int StaticSize { get; }
    }

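    /// <summary>
    /// Runtime description of a pipeline stage: the Burst-compatible function pointers invoked
    /// on send, receive and connection initialization, plus the buffer capacities the processor
    /// must reserve for the stage. A <c>PayloadCapacity</c> of 0 means the stage accepts
    /// payloads of any size.
    /// </summary>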
    public unsafe struct NetworkPipelineStage
    {
        public NetworkPipelineStage(TransportFunctionPointer<ReceiveDelegate> Receive,
            TransportFunctionPointer<SendDelegate> Send,
            TransportFunctionPointer<InitializeConnectionDelegate> InitializeConnection,
            int ReceiveCapacity,
            int SendCapacity,
            int HeaderCapacity,
            int SharedStateCapacity,
            int PayloadCapacity = 0) // 0 means any size
        {
            this.Receive = Receive;
            this.Send = Send;
            this.InitializeConnection = InitializeConnection;
            this.ReceiveCapacity = ReceiveCapacity;
            this.SendCapacity = SendCapacity;
            this.HeaderCapacity = HeaderCapacity;
            this.SharedStateCapacity = SharedStateCapacity;
            this.PayloadCapacity = PayloadCapacity;
            StaticStateStart = StaticStateCapacity = 0;
        }

        [Flags]
        public enum Requests
        {
            None = 0,
            Resume = 1,
            Update = 2,
            SendUpdate = 4,
            Error = 8
        }

        [UnmanagedFunctionPointer(CallingConvention.Cdecl)]
        public delegate void ReceiveDelegate(ref NetworkPipelineContext ctx, ref InboundRecvBuffer inboundBuffer, ref Requests requests);

        [UnmanagedFunctionPointer(CallingConvention.Cdecl)]
        public delegate int SendDelegate(ref NetworkPipelineContext ctx, ref InboundSendBuffer inboundBuffer, ref Requests requests);

        [UnmanagedFunctionPointer(CallingConvention.Cdecl)]
        public delegate void InitializeConnectionDelegate(byte* staticInstanceBuffer, int staticInstanceBufferLength,
            byte* sendProcessBuffer, int sendProcessBufferLength, byte* recvProcessBuffer, int recvProcessBufferLength,
            byte* sharedProcessBuffer, int sharedProcessBufferLength);

        public TransportFunctionPointer<ReceiveDelegate> Receive;
        public TransportFunctionPointer<SendDelegate> Send;
        public TransportFunctionPointer<InitializeConnectionDelegate> InitializeConnection;

        public readonly int ReceiveCapacity;
        public readonly int SendCapacity;
        public readonly int HeaderCapacity;
        public readonly int SharedStateCapacity;
        public readonly int PayloadCapacity;

        internal int StaticStateStart;
        internal int StaticStateCapacity;
    }

    public struct NetworkPipelineStageId
    {
        internal int Index;
        internal int IsValid;
    }

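    /// <summary>
    /// Global registry of pipeline stage types. Stages must be registered before any pipeline
    /// that uses them is created; the built-in stages are registered automatically in the
    /// static constructor below. A usage sketch, assuming a user-defined
    /// MyCustomPipelineStage (hypothetical type implementing INetworkPipelineStage):
    /// <code>
    /// NetworkPipelineStageCollection.RegisterPipelineStage(new MyCustomPipelineStage());
    /// var stageId = NetworkPipelineStageCollection.GetStageId(typeof(MyCustomPipelineStage));
    /// </code>
    /// </summary>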
    public static class NetworkPipelineStageCollection
    {
        static NetworkPipelineStageCollection()
        {
            m_stages = new List<INetworkPipelineStage>();
            RegisterPipelineStage(new NullPipelineStage());
            RegisterPipelineStage(new FragmentationPipelineStage());
            RegisterPipelineStage(new ReliableSequencedPipelineStage());
            RegisterPipelineStage(new UnreliableSequencedPipelineStage());
            RegisterPipelineStage(new SimulatorPipelineStage());
            RegisterPipelineStage(new SimulatorPipelineStageInSend());
        }

        public static void RegisterPipelineStage(INetworkPipelineStage stage)
        {
            for (int i = 0; i < m_stages.Count; ++i)
            {
                if (m_stages[i].GetType() == stage.GetType())
                {
                    // TODO: should this be an error?
                    m_stages[i] = stage;
                    return;
                }
            }
            m_stages.Add(stage);
        }

        public static NetworkPipelineStageId GetStageId(Type stageType)
        {
            for (int i = 0; i < m_stages.Count; ++i)
            {
                if (stageType == m_stages[i].GetType())
                    return new NetworkPipelineStageId{Index = i, IsValid = 1};
            }
            throw new InvalidOperationException($"Pipeline stage {stageType} is not registered");
        }

        internal static List<INetworkPipelineStage> m_stages;
    }

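    /// <summary>
    /// Handle identifying a pipeline on a driver. Id 0 is reserved for <see cref="Null"/>,
    /// which sends packets without any pipeline processing.
    /// </summary>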
    public struct NetworkPipeline
    {
        internal int Id;
        public static NetworkPipeline Null => default(NetworkPipeline);

        public static bool operator ==(NetworkPipeline lhs, NetworkPipeline rhs)
        {
            return lhs.Id == rhs.Id;
        }

        public static bool operator !=(NetworkPipeline lhs, NetworkPipeline rhs)
        {
            return lhs.Id != rhs.Id;
        }

        public override bool Equals(object compare)
        {
            return this == (NetworkPipeline) compare;
        }

        public override int GetHashCode()
        {
            return Id;
        }

        public bool Equals(NetworkPipeline connection)
        {
            return connection.Id == Id;
        }
    }

    public struct NetworkPipelineParams : INetworkParameter
    {
        public int initialCapacity;

        [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS")]
        public static void ValidateParameters(params INetworkParameter[] param)
        {
            foreach (var parameter in param)
            {
                if (parameter is NetworkPipelineParams @params && @params.initialCapacity < 0)
                    throw new ArgumentException("Value for NetworkPipelineParams.initialCapacity must be greater than or equal to zero.");
            }
        }
    }

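    /// <summary>
    /// Owns the per-stage-type and per-connection buffers for every pipeline created on a
    /// driver and runs packets through the stages on send and receive. Per-stage buffer
    /// regions are rounded up to <see cref="Alignment"/> bytes; the recurring expression
    /// (size + AlignmentMinusOne) masked by ~AlignmentMinusOne rounds a size up to the next
    /// multiple of 8.
    /// </summary>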
    internal struct NetworkPipelineProcessor : IDisposable
    {
        public const int Alignment = 8;
        public const int AlignmentMinusOne = Alignment - 1;

        public int PayloadCapacity(NetworkPipeline pipeline)
        {
            if (pipeline.Id > 0)
            {
                var p = m_Pipelines[pipeline.Id - 1];
                return p.payloadCapacity;
            }
            return NetworkParameterConstants.MTU;
        }

        public Concurrent ToConcurrent()
        {
            var concurrent = new Concurrent
            {
                m_StageCollection = m_StageCollection,
                m_StaticInstanceBuffer = m_StaticInstanceBuffer,
                m_Pipelines = m_Pipelines,
                m_StageList = m_StageList,
                m_AccumulatedHeaderCapacity = m_AccumulatedHeaderCapacity,
                m_SendStageNeedsUpdateWrite = m_SendStageNeedsUpdateRead.AsParallelWriter(),
                sizePerConnection = sizePerConnection,
                sendBuffer = m_SendBuffer,
                sharedBuffer = m_SharedBuffer,
                m_timestamp = m_timestamp
            };
            return concurrent;
        }

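        /// <summary>
        /// Job-safe view of the processor used from send jobs. The [ReadOnly] attributes on
        /// the mutable buffers below deliberately sidestep the safety system (see the TODO);
        /// actual exclusion is enforced per connection by the spinlock taken in Send.
        /// </summary>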
        public struct Concurrent
        {
            [ReadOnly] internal NativeArray<NetworkPipelineStage> m_StageCollection;
            [ReadOnly] internal NativeArray<byte> m_StaticInstanceBuffer;
            [ReadOnly] internal NativeList<PipelineImpl> m_Pipelines;
            [ReadOnly] internal NativeList<int> m_StageList;
            [ReadOnly] internal NativeList<int> m_AccumulatedHeaderCapacity;
            internal NativeQueue<UpdatePipeline>.ParallelWriter m_SendStageNeedsUpdateWrite;
            [ReadOnly] internal NativeArray<int> sizePerConnection;
            // TODO: not really read-only, just hacking the safety system
            [ReadOnly] internal NativeList<byte> sharedBuffer;
            [ReadOnly] internal NativeList<byte> sendBuffer;
            [ReadOnly] internal NativeArray<long> m_timestamp;

            public int SendHeaderCapacity(NetworkPipeline pipeline)
            {
                var p = m_Pipelines[pipeline.Id - 1];
                return p.headerCapacity;
            }

            public int PayloadCapacity(NetworkPipeline pipeline)
            {
                if (pipeline.Id > 0)
                {
                    var p = m_Pipelines[pipeline.Id - 1];
                    return p.payloadCapacity;
                }
                return NetworkParameterConstants.MTU;
            }

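            // The first int of each connection's region in sendBuffer doubles as a spinlock
            // (see the CompareExchange below): it guarantees that a connection is only
            // processed by one job at a time, even when sends run in a parallel-for.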
            public unsafe int Send(NetworkDriver.Concurrent driver, NetworkPipeline pipeline, NetworkConnection connection, NetworkInterfaceSendHandle sendHandle, int headerSize)
            {
                if (sendHandle.data == IntPtr.Zero)
                {
                    return (int) Error.StatusCode.NetworkSendHandleInvalid;
                }

                var p = m_Pipelines[pipeline.Id - 1];
                var connectionId = connection.m_NetworkId;

                // TODO: not really read-only, just hacking the safety system
                NativeArray<byte> tmpBuffer = sendBuffer;
                int* sendBufferLock = (int*) tmpBuffer.GetUnsafeReadOnlyPtr();
                sendBufferLock += connectionId * sizePerConnection[SendSizeOffset] / 4;

                if (Interlocked.CompareExchange(ref *sendBufferLock, 1, 0) != 0)
                {
#if ENABLE_UNITY_COLLECTIONS_CHECKS
                    UnityEngine.Debug.LogError("The parallel network driver needs to process a single unique connection per job, processing a single connection multiple times in a parallel for is not supported.");
#endif
                    return (int) Error.StatusCode.NetworkDriverParallelForErr;
                }
                NativeList<UpdatePipeline> currentUpdates = new NativeList<UpdatePipeline>(128, Allocator.Temp);

                int retval = ProcessPipelineSend(driver, 0, pipeline, connection, sendHandle, headerSize, currentUpdates);

                Interlocked.Exchange(ref *sendBufferLock, 0);
                // Move the updates requested in this iteration to the concurrent queue so they can be read/parsed in the update routine
                for (int i = 0; i < currentUpdates.Length; ++i)
                    m_SendStageNeedsUpdateWrite.Enqueue(currentUpdates[i]);

                return retval;
            }

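            // Runs the send stages in order, starting at startStage. On the wire a packet
            // laid out by this method is:
            //   [protocol header (headerSize bytes)][pipeline id (1 byte)][stage headers][payload]
            // The pipeline id byte is written at sendHandle.data + initialHeaderSize below.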
            internal unsafe int ProcessPipelineSend(NetworkDriver.Concurrent driver, int startStage, NetworkPipeline pipeline, NetworkConnection connection,
                NetworkInterfaceSendHandle sendHandle, int headerSize, NativeList<UpdatePipeline> currentUpdates)
            {
                int initialHeaderSize = headerSize;
                int retval = sendHandle.size;
                NetworkPipelineContext ctx = default(NetworkPipelineContext);
                ctx.timestamp = m_timestamp[0];
                var p = m_Pipelines[pipeline.Id - 1];
                var connectionId = connection.m_NetworkId;

                var resumeQ = new NativeList<int>(16, Allocator.Temp);
                int resumeQStart = 0;

                // If the call comes from update, the sendHandle is set to default.
                var inboundBuffer = default(InboundSendBuffer);
                if (sendHandle.data != IntPtr.Zero)
                {
                    inboundBuffer.bufferWithHeaders = (byte*)sendHandle.data + initialHeaderSize + 1;
                    inboundBuffer.bufferWithHeadersLength = sendHandle.size - initialHeaderSize - 1;
                    inboundBuffer.buffer = inboundBuffer.bufferWithHeaders + p.headerCapacity;
                    inboundBuffer.bufferLength = inboundBuffer.bufferWithHeadersLength - p.headerCapacity;
                }

                while (true)
                {
                    headerSize = p.headerCapacity;

                    int internalBufferOffset = p.sendBufferOffset + sizePerConnection[SendSizeOffset] * connectionId;
                    int internalSharedBufferOffset = p.sharedBufferOffset + sizePerConnection[SharedSizeOffset] * connectionId;

                    // If this is not the first stage we need to fast forward the buffer offset to the correct place
                    if (startStage > 0)
                    {
                        if (inboundBuffer.bufferWithHeadersLength > 0)
                        {
                            UnityEngine.Debug.LogError("Can't start from a stage with a buffer");
                            return (int)Error.StatusCode.NetworkStateMismatch;
                        }
                        for (int i = 0; i < startStage; ++i)
                        {
                            internalBufferOffset += (m_StageCollection[m_StageList[p.FirstStageIndex + i]].SendCapacity + AlignmentMinusOne) & (~AlignmentMinusOne);
                            internalSharedBufferOffset += (m_StageCollection[m_StageList[p.FirstStageIndex + i]].SharedStateCapacity + AlignmentMinusOne) & (~AlignmentMinusOne);
                            headerSize -= m_StageCollection[m_StageList[p.FirstStageIndex + i]].HeaderCapacity;
                        }
                    }

                    for (int i = startStage; i < p.NumStages; ++i)
                    {
                        int stageHeaderCapacity = m_StageCollection[m_StageList[p.FirstStageIndex + i]].HeaderCapacity;
#if ENABLE_UNITY_COLLECTIONS_CHECKS
                        if (stageHeaderCapacity > headerSize)
                            throw new InvalidOperationException("The stage does not contain enough header space to send the message");
#endif
                        inboundBuffer.headerPadding = headerSize;
                        headerSize -= stageHeaderCapacity;
                        if (stageHeaderCapacity > 0 && inboundBuffer.bufferWithHeadersLength > 0)
                        {
                            var headerArray = NativeArrayUnsafeUtility.ConvertExistingDataToNativeArray<byte>(inboundBuffer.bufferWithHeaders + headerSize, stageHeaderCapacity, Allocator.Invalid);
#if ENABLE_UNITY_COLLECTIONS_CHECKS
                            NativeArrayUnsafeUtility.SetAtomicSafetyHandle(ref headerArray, AtomicSafetyHandle.GetTempMemoryHandle());
#endif
                            ctx.header = new DataStreamWriter(headerArray);
                        }
                        else
                            ctx.header = new DataStreamWriter(stageHeaderCapacity, Allocator.Temp);
                        var prevInbound = inboundBuffer;
                        NetworkPipelineStage.Requests requests = NetworkPipelineStage.Requests.None;

                        var sendResult = ProcessSendStage(i, internalBufferOffset, internalSharedBufferOffset, p, ref resumeQ, ref ctx, ref inboundBuffer, ref requests);

                        if ((requests & NetworkPipelineStage.Requests.Update) != 0)
                            AddSendUpdate(connection, i, pipeline, currentUpdates);

                        if (inboundBuffer.bufferWithHeadersLength == 0)
                        {
                            if ((requests & NetworkPipelineStage.Requests.Error) != 0 && sendHandle.data != IntPtr.Zero)
                                retval = sendResult;
                            break;
                        }

#if ENABLE_UNITY_COLLECTIONS_CHECKS
                        if (inboundBuffer.headerPadding != prevInbound.headerPadding)
                            throw new InvalidOperationException("Changing the header padding in a pipeline is not supported");
#endif
                        if (inboundBuffer.buffer != prevInbound.buffer)
                        {
#if ENABLE_UNITY_COLLECTIONS_CHECKS
                            if (inboundBuffer.buffer != inboundBuffer.bufferWithHeaders + inboundBuffer.headerPadding ||
                                inboundBuffer.bufferLength + inboundBuffer.headerPadding > inboundBuffer.bufferWithHeadersLength)
                                throw new InvalidOperationException("When creating an internal buffer in pipelines the buffer must be a subset of the buffer with headers");
#endif
                            // Copy header to new buffer so it is part of the payload
                            UnsafeUtility.MemCpy(inboundBuffer.bufferWithHeaders + headerSize, ctx.header.AsNativeArray().GetUnsafeReadOnlyPtr(), ctx.header.Length);
                        }
#if ENABLE_UNITY_COLLECTIONS_CHECKS
                        else
                        {
                            if (inboundBuffer.bufferWithHeaders != prevInbound.bufferWithHeaders)
                                throw new InvalidOperationException("Changing the send buffer with headers without changing the buffer is not supported");
                        }
#endif
                        if (ctx.header.Length < stageHeaderCapacity)
                        {
                            int wastedSpace = stageHeaderCapacity - ctx.header.Length;
                            // Remove wasted space in the header
                            UnsafeUtility.MemMove(inboundBuffer.buffer - wastedSpace, inboundBuffer.buffer, inboundBuffer.bufferLength);
                        }

                        // Update the inbound buffer for the next iteration
                        inboundBuffer.buffer = inboundBuffer.bufferWithHeaders + headerSize;
                        inboundBuffer.bufferLength = ctx.header.Length + inboundBuffer.bufferLength;

                        internalBufferOffset += (ctx.internalProcessBufferLength + AlignmentMinusOne) & (~AlignmentMinusOne);
                        internalSharedBufferOffset += (ctx.internalSharedProcessBufferLength + AlignmentMinusOne) & (~AlignmentMinusOne);
                    }

                    if (inboundBuffer.bufferLength != 0)
                    {
                        if (sendHandle.data != IntPtr.Zero && inboundBuffer.bufferWithHeaders == (byte*)sendHandle.data + initialHeaderSize + 1)
                        {
                            // Actually send the data - after collapsing it again
                            if (inboundBuffer.buffer != inboundBuffer.bufferWithHeaders)
                            {
                                UnsafeUtility.MemMove(inboundBuffer.bufferWithHeaders, inboundBuffer.buffer, inboundBuffer.bufferLength);
                                inboundBuffer.buffer = inboundBuffer.bufferWithHeaders;
                            }
                            ((byte*)sendHandle.data)[initialHeaderSize] = (byte)pipeline.Id;
                            int sendSize = initialHeaderSize + 1 + inboundBuffer.bufferLength;
#if ENABLE_UNITY_COLLECTIONS_CHECKS
                            if (sendSize > sendHandle.size)
                                throw new InvalidOperationException("Pipeline increased the data in the buffer, this is not allowed");
#endif
                            sendHandle.size = sendSize;
                            if ((retval = driver.CompleteSend(connection, sendHandle, true)) < 0)
                            {
                                UnityEngine.Debug.LogWarning(FixedString.Format("CompleteSend failed with the following error code: {0}", retval));
                            }
                            sendHandle = default;
                        }
                        else
                        {
                            // TODO: This sends the packet directly, bypassing the pipeline process. The problem is that this way
                            // we can't set the hasPipeline flag in the headers. There is a workaround for now.
                            // Sending without pipeline, the correct pipeline will be added by the default flags when this is called
                            if (driver.BeginSend(connection, out var writer) == 0)
                            {
                                writer.WriteByte((byte)pipeline.Id);
                                writer.WriteBytes(inboundBuffer.buffer, inboundBuffer.bufferLength);
                                if (writer.HasFailedWrites)
                                    driver.AbortSend(writer);
                                else
                                {
                                    if ((retval = driver.EndSend(writer)) <= 0)
                                    {
                                        UnityEngine.Debug.Log(FixedString.Format("An error occurred during EndSend. ErrorCode: {0}", retval));
                                    }
                                }
                            }
                        }
                    }

                    if (resumeQStart >= resumeQ.Length)
                    {
                        break;
                    }

                    startStage = resumeQ[resumeQStart++];

                    inboundBuffer = default(InboundSendBuffer);
                }
                if (sendHandle.data != IntPtr.Zero)
                    driver.AbortSend(sendHandle);
                return retval;
            }

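            // Points ctx at this stage's slices of the static, send and shared buffers, then
            // invokes the stage's Burst-compiled Send function pointer. A Resume request queues
            // the stage to run again once the rest of the pipeline has been processed.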
            private unsafe int ProcessSendStage(int startStage, int internalBufferOffset, int internalSharedBufferOffset,
                PipelineImpl p, ref NativeList<int> resumeQ, ref NetworkPipelineContext ctx, ref InboundSendBuffer inboundBuffer, ref NetworkPipelineStage.Requests requests)
            {
                var stageIndex = p.FirstStageIndex + startStage;
                var pipelineStage = m_StageCollection[m_StageList[stageIndex]];
                ctx.accumulatedHeaderCapacity = m_AccumulatedHeaderCapacity[stageIndex];
                ctx.staticInstanceBuffer = (byte*)m_StaticInstanceBuffer.GetUnsafeReadOnlyPtr() + pipelineStage.StaticStateStart;
                ctx.staticInstanceBufferLength = pipelineStage.StaticStateCapacity;
                ctx.internalProcessBuffer = (byte*)sendBuffer.GetUnsafeReadOnlyPtr() + internalBufferOffset;
                ctx.internalProcessBufferLength = pipelineStage.SendCapacity;

                ctx.internalSharedProcessBuffer = (byte*)sharedBuffer.GetUnsafeReadOnlyPtr() + internalSharedBufferOffset;
                ctx.internalSharedProcessBufferLength = pipelineStage.SharedStateCapacity;

                requests = NetworkPipelineStage.Requests.None;
                var retval = pipelineStage.Send.Ptr.Invoke(ref ctx, ref inboundBuffer, ref requests);
                if ((requests & NetworkPipelineStage.Requests.Resume) != 0)
                    resumeQ.Add(startStage);
                return retval;
            }
        }

        private NativeArray<NetworkPipelineStage> m_StageCollection;
        private NativeArray<byte> m_StaticInstanceBuffer;
        private NativeList<int> m_StageList;
        private NativeList<int> m_AccumulatedHeaderCapacity;
        private NativeList<PipelineImpl> m_Pipelines;
        private NativeList<byte> m_ReceiveBuffer;
        private NativeList<byte> m_SendBuffer;
        private NativeList<byte> m_SharedBuffer;
        private NativeList<UpdatePipeline> m_ReceiveStageNeedsUpdate;
        private NativeList<UpdatePipeline> m_SendStageNeedsUpdate;
        private NativeQueue<UpdatePipeline> m_SendStageNeedsUpdateRead;

        private NativeArray<int> sizePerConnection;

        private NativeArray<long> m_timestamp;

        // Indices into sizePerConnection, which stores the per-connection size of the send,
        // receive and shared buffer regions respectively.
        private const int SendSizeOffset = 0;
        private const int ReceiveSizeOffset = 1;
        private const int SharedSizeOffset = 2;

        internal struct PipelineImpl
        {
            public int FirstStageIndex;
            public int NumStages;

            public int receiveBufferOffset;
            public int sendBufferOffset;
            public int sharedBufferOffset;
            public int headerCapacity;
            public int payloadCapacity;
        }

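        // Lays out one static-state block per registered stage type (16-byte aligned) and
        // calls each stage's StaticInitialize into its slice of m_StaticInstanceBuffer.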
        public unsafe NetworkPipelineProcessor(params INetworkParameter[] param)
        {
            NetworkPipelineParams config = default(NetworkPipelineParams);
            for (int i = 0; i < param.Length; ++i)
            {
                if (param[i] is NetworkPipelineParams)
                    config = (NetworkPipelineParams)param[i];
            }

            int staticBufferSize = 0;
            for (int i = 0; i < NetworkPipelineStageCollection.m_stages.Count; ++i)
            {
                staticBufferSize += NetworkPipelineStageCollection.m_stages[i].StaticSize;
                staticBufferSize = (staticBufferSize + 15) & (~15);
            }
            m_StaticInstanceBuffer = new NativeArray<byte>(staticBufferSize, Allocator.Persistent);
            m_StageCollection = new NativeArray<NetworkPipelineStage>(NetworkPipelineStageCollection.m_stages.Count, Allocator.Persistent);
            staticBufferSize = 0;
            for (int i = 0; i < NetworkPipelineStageCollection.m_stages.Count; ++i)
            {
                var stageStruct = NetworkPipelineStageCollection.m_stages[i].StaticInitialize((byte*)m_StaticInstanceBuffer.GetUnsafePtr() + staticBufferSize, NetworkPipelineStageCollection.m_stages[i].StaticSize, param);
                stageStruct.StaticStateStart = staticBufferSize;
                stageStruct.StaticStateCapacity = NetworkPipelineStageCollection.m_stages[i].StaticSize;
                m_StageCollection[i] = stageStruct;
                staticBufferSize += NetworkPipelineStageCollection.m_stages[i].StaticSize;
                staticBufferSize = (staticBufferSize + 15) & (~15);
            }

            m_StageList = new NativeList<int>(16, Allocator.Persistent);
            m_AccumulatedHeaderCapacity = new NativeList<int>(16, Allocator.Persistent);
            m_Pipelines = new NativeList<PipelineImpl>(16, Allocator.Persistent);
            m_ReceiveBuffer = new NativeList<byte>(config.initialCapacity, Allocator.Persistent);
            m_SendBuffer = new NativeList<byte>(config.initialCapacity, Allocator.Persistent);
            m_SharedBuffer = new NativeList<byte>(config.initialCapacity, Allocator.Persistent);
            sizePerConnection = new NativeArray<int>(3, Allocator.Persistent);
            // Store an int for the spinlock first in each connection's send buffer, round up to alignment of 8
            sizePerConnection[SendSizeOffset] = Alignment;
            m_ReceiveStageNeedsUpdate = new NativeList<UpdatePipeline>(128, Allocator.Persistent);
            m_SendStageNeedsUpdate = new NativeList<UpdatePipeline>(128, Allocator.Persistent);
            m_SendStageNeedsUpdateRead = new NativeQueue<UpdatePipeline>(Allocator.Persistent);
            m_timestamp = new NativeArray<long>(1, Allocator.Persistent);
        }

        public void Dispose()
        {
            m_StageList.Dispose();
            m_AccumulatedHeaderCapacity.Dispose();
            m_ReceiveBuffer.Dispose();
            m_SendBuffer.Dispose();
            m_SharedBuffer.Dispose();
            m_Pipelines.Dispose();
            sizePerConnection.Dispose();
            m_ReceiveStageNeedsUpdate.Dispose();
            m_SendStageNeedsUpdate.Dispose();
            m_SendStageNeedsUpdateRead.Dispose();
            m_timestamp.Dispose();
            m_StageCollection.Dispose();
            m_StaticInstanceBuffer.Dispose();
        }

        public long Timestamp
        {
            get { return m_timestamp[0]; }
            internal set { m_timestamp[0] = value; }
        }

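        // Grows the per-connection buffers to cover this connection's id, zeroes its regions
        // and lets every stage of every pipeline initialize its per-connection state.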
        public unsafe void initializeConnection(NetworkConnection con)
        {
            var requiredReceiveSize = (con.m_NetworkId + 1) * sizePerConnection[ReceiveSizeOffset];
            var requiredSendSize = (con.m_NetworkId + 1) * sizePerConnection[SendSizeOffset];
            var requiredSharedSize = (con.m_NetworkId + 1) * sizePerConnection[SharedSizeOffset];
            if (m_ReceiveBuffer.Length < requiredReceiveSize)
                m_ReceiveBuffer.ResizeUninitialized(requiredReceiveSize);
            if (m_SendBuffer.Length < requiredSendSize)
                m_SendBuffer.ResizeUninitialized(requiredSendSize);
            if (m_SharedBuffer.Length < requiredSharedSize)
                m_SharedBuffer.ResizeUninitialized(requiredSharedSize);

            UnsafeUtility.MemClear((byte*)m_ReceiveBuffer.GetUnsafePtr() + con.m_NetworkId * sizePerConnection[ReceiveSizeOffset], sizePerConnection[ReceiveSizeOffset]);
            UnsafeUtility.MemClear((byte*)m_SendBuffer.GetUnsafePtr() + con.m_NetworkId * sizePerConnection[SendSizeOffset], sizePerConnection[SendSizeOffset]);
            UnsafeUtility.MemClear((byte*)m_SharedBuffer.GetUnsafePtr() + con.m_NetworkId * sizePerConnection[SharedSizeOffset], sizePerConnection[SharedSizeOffset]);

            InitializeStages(con.m_NetworkId);
        }

        unsafe void InitializeStages(int networkId)
        {
            var connectionId = networkId;

            for (int i = 0; i < m_Pipelines.Length; i++)
            {
                var pipeline = m_Pipelines[i];

                int recvBufferOffset = pipeline.receiveBufferOffset + sizePerConnection[ReceiveSizeOffset] * connectionId;
                int sendBufferOffset = pipeline.sendBufferOffset + sizePerConnection[SendSizeOffset] * connectionId;
                int sharedBufferOffset = pipeline.sharedBufferOffset + sizePerConnection[SharedSizeOffset] * connectionId;

                for (int stage = pipeline.FirstStageIndex;
                     stage < pipeline.FirstStageIndex + pipeline.NumStages;
                     stage++)
                {
                    var pipelineStage = m_StageCollection[m_StageList[stage]];
                    var sendProcessBuffer = (byte*)m_SendBuffer.GetUnsafePtr() + sendBufferOffset;
                    var sendProcessBufferLength = pipelineStage.SendCapacity;
                    var recvProcessBuffer = (byte*)m_ReceiveBuffer.GetUnsafePtr() + recvBufferOffset;
                    var recvProcessBufferLength = pipelineStage.ReceiveCapacity;
                    var sharedProcessBuffer = (byte*)m_SharedBuffer.GetUnsafePtr() + sharedBufferOffset;
                    var sharedProcessBufferLength = pipelineStage.SharedStateCapacity;

                    var staticInstanceBuffer = (byte*)m_StaticInstanceBuffer.GetUnsafePtr() + pipelineStage.StaticStateStart;
                    var staticInstanceBufferLength = pipelineStage.StaticStateCapacity;
                    pipelineStage.InitializeConnection.Ptr.Invoke(staticInstanceBuffer, staticInstanceBufferLength,
                        sendProcessBuffer, sendProcessBufferLength, recvProcessBuffer, recvProcessBufferLength,
                        sharedProcessBuffer, sharedProcessBufferLength);

                    sendBufferOffset += (sendProcessBufferLength + AlignmentMinusOne) & (~AlignmentMinusOne);
                    recvBufferOffset += (recvProcessBufferLength + AlignmentMinusOne) & (~AlignmentMinusOne);
                    sharedBufferOffset += (sharedProcessBufferLength + AlignmentMinusOne) & (~AlignmentMinusOne);
                }
            }
        }

        /// <summary>
        /// Create a new NetworkPipeline.
        /// </summary>
        /// <param name="stages">The stages we want the pipeline to contain.</param>
        /// <returns>A valid pipeline.</returns>
        /// <exception cref="InvalidOperationException">Thrown if you try to create more than 255 pipelines.</exception>
        /// <exception cref="InvalidOperationException">Thrown if you try to use an invalid pipeline stage.</exception>
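        /// <example>
        /// Typically reached through the driver rather than called directly. A minimal sketch,
        /// assuming a NetworkDriver that forwards its CreatePipeline call here:
        /// <code>
        /// var pipeline = driver.CreatePipeline(
        ///     typeof(FragmentationPipelineStage),
        ///     typeof(ReliableSequencedPipelineStage));
        /// </code>
        /// </example>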
        public NetworkPipeline CreatePipeline(params Type[] stages)
        {
#if ENABLE_UNITY_COLLECTIONS_CHECKS
            if (m_Pipelines.Length > 255)
                throw new InvalidOperationException("Cannot create more than 255 pipelines on a single driver");
#endif
            var receiveCap = 0;
            var sharedCap = 0;
            var sendCap = 0;
            var headerCap = 0;
            var payloadCap = 0;
            var pipeline = new PipelineImpl();
            pipeline.FirstStageIndex = m_StageList.Length;
            pipeline.NumStages = stages.Length;
            for (int i = 0; i < stages.Length; i++)
            {
                var stageId = NetworkPipelineStageCollection.GetStageId(stages[i]).Index;
#if ENABLE_UNITY_COLLECTIONS_CHECKS
                if (stageId < 0)
                    throw new InvalidOperationException("Trying to create pipeline with invalid stage " + stages[i]);
#endif
                m_StageList.Add(stageId);
                m_AccumulatedHeaderCapacity.Add(headerCap); // For every stage, compute how much header space has already been used by other stages when sending
                // Make sure all data buffers are aligned
                receiveCap += (m_StageCollection[stageId].ReceiveCapacity + AlignmentMinusOne) & (~AlignmentMinusOne);
                sendCap += (m_StageCollection[stageId].SendCapacity + AlignmentMinusOne) & (~AlignmentMinusOne);
                headerCap += m_StageCollection[stageId].HeaderCapacity;
                sharedCap += (m_StageCollection[stageId].SharedStateCapacity + AlignmentMinusOne) & (~AlignmentMinusOne);
                if (payloadCap == 0)
                {
                    payloadCap = m_StageCollection[stageId].PayloadCapacity; // The first non-zero stage determines the pipeline capacity
                }
            }

            pipeline.receiveBufferOffset = sizePerConnection[ReceiveSizeOffset];
            sizePerConnection[ReceiveSizeOffset] = sizePerConnection[ReceiveSizeOffset] + receiveCap;

            pipeline.sendBufferOffset = sizePerConnection[SendSizeOffset];
            sizePerConnection[SendSizeOffset] = sizePerConnection[SendSizeOffset] + sendCap;

            pipeline.sharedBufferOffset = sizePerConnection[SharedSizeOffset];
            sizePerConnection[SharedSizeOffset] = sizePerConnection[SharedSizeOffset] + sharedCap;

            pipeline.headerCapacity = headerCap;
            // If no stage explicitly supports more than the MTU, the pipeline as a whole does not support more than one MTU
            pipeline.payloadCapacity = (payloadCap != 0) ? payloadCap : NetworkParameterConstants.MTU;

            m_Pipelines.Add(pipeline);
            return new NetworkPipeline {Id = m_Pipelines.Length};
        }

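        // Returns this stage's slices of the receive, send and shared buffers for one
        // connection, by walking the stage list and accumulating the aligned capacities of
        // the stages in front of it (the same layout used by InitializeStages above).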
        public void GetPipelineBuffers(NetworkPipeline pipelineId, NetworkPipelineStageId stageId, NetworkConnection connection,
            out NativeArray<byte> readProcessingBuffer, out NativeArray<byte> writeProcessingBuffer,
            out NativeArray<byte> sharedBuffer)
        {
            if (pipelineId.Id < 1)
                throw new InvalidOperationException("The specified pipeline is not valid");
            if (stageId.IsValid == 0)
                throw new InvalidOperationException("The specified pipeline stage is not valid");
            var pipeline = m_Pipelines[pipelineId.Id - 1];

            int recvBufferOffset = pipeline.receiveBufferOffset + sizePerConnection[ReceiveSizeOffset] * connection.InternalId;
            int sendBufferOffset = pipeline.sendBufferOffset + sizePerConnection[SendSizeOffset] * connection.InternalId;
            int sharedBufferOffset = pipeline.sharedBufferOffset + sizePerConnection[SharedSizeOffset] * connection.InternalId;

            int stageIndexInList;
            bool stageNotFound = true;
            for (stageIndexInList = pipeline.FirstStageIndex;
                 stageIndexInList < pipeline.FirstStageIndex + pipeline.NumStages;
                 stageIndexInList++)
            {
                if (m_StageList[stageIndexInList] == stageId.Index)
                {
                    stageNotFound = false;
                    break;
                }
                sendBufferOffset += (m_StageCollection[m_StageList[stageIndexInList]].SendCapacity + AlignmentMinusOne) & (~AlignmentMinusOne);
                recvBufferOffset += (m_StageCollection[m_StageList[stageIndexInList]].ReceiveCapacity + AlignmentMinusOne) & (~AlignmentMinusOne);
                sharedBufferOffset += (m_StageCollection[m_StageList[stageIndexInList]].SharedStateCapacity + AlignmentMinusOne) & (~AlignmentMinusOne);
            }

            if (stageNotFound)
            {
#if ENABLE_UNITY_COLLECTIONS_CHECKS
                throw new InvalidOperationException($"Could not find stage ID {stageId}; make sure the type for this stage ID is added when the pipeline is created.");
#else
                writeProcessingBuffer = default;
                readProcessingBuffer = default;
                sharedBuffer = default;
                return;
#endif
            }

            writeProcessingBuffer = ((NativeArray<byte>)m_SendBuffer).GetSubArray(sendBufferOffset, m_StageCollection[m_StageList[stageIndexInList]].SendCapacity);
            readProcessingBuffer = ((NativeArray<byte>)m_ReceiveBuffer).GetSubArray(recvBufferOffset, m_StageCollection[m_StageList[stageIndexInList]].ReceiveCapacity);
            sharedBuffer = ((NativeArray<byte>)m_SharedBuffer).GetSubArray(sharedBufferOffset, m_StageCollection[m_StageList[stageIndexInList]].SharedStateCapacity);
        }

        internal struct UpdatePipeline
        {
            public NetworkPipeline pipeline;
            public int stage;
            public NetworkConnection connection;
        }

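        // Runs once per driver update: clears every connection's send lock (it could have
        // been left set if a send job threw), then replays all queued stage update requests
        // for connections that are still connected.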
        internal unsafe void UpdateSend(NetworkDriver.Concurrent driver, out int updateCount)
        {
            // Clear the send lock since it cannot be kept here and can be lost if there are exceptions in send
            NativeArray<byte> tmpBuffer = m_SendBuffer;
            int* sendBufferLock = (int*) tmpBuffer.GetUnsafePtr();
            for (int connectionOffset = 0; connectionOffset < m_SendBuffer.Length; connectionOffset += sizePerConnection[SendSizeOffset])
                sendBufferLock[connectionOffset / 4] = 0;

            NativeArray<UpdatePipeline> sendUpdates = new NativeArray<UpdatePipeline>(m_SendStageNeedsUpdateRead.Count + m_SendStageNeedsUpdate.Length, Allocator.Temp);

            UpdatePipeline updateItem;
            updateCount = 0;
            while (m_SendStageNeedsUpdateRead.TryDequeue(out updateItem))
            {
                if (driver.GetConnectionState(updateItem.connection) == NetworkConnection.State.Connected)
                    sendUpdates[updateCount++] = updateItem;
            }

            for (int i = 0; i < m_SendStageNeedsUpdate.Length; i++)
            {
                if (driver.GetConnectionState(m_SendStageNeedsUpdate[i].connection) == NetworkConnection.State.Connected)
                    sendUpdates[updateCount++] = m_SendStageNeedsUpdate[i];
            }

            NativeList<UpdatePipeline> currentUpdates = new NativeList<UpdatePipeline>(128, Allocator.Temp);
            // Move the updates requested in this iteration to the concurrent queue so they can be read/parsed in the update routine
            for (int i = 0; i < updateCount; ++i)
            {
                updateItem = sendUpdates[i];
                var result = ToConcurrent().ProcessPipelineSend(driver, updateItem.stage, updateItem.pipeline, updateItem.connection, default, 0, currentUpdates);
                if (result < 0)
                {
                    UnityEngine.Debug.LogWarning(FixedString.Format("ProcessPipelineSend failed with the following error code {0}.", result));
                }
            }
            for (int i = 0; i < currentUpdates.Length; ++i)
                m_SendStageNeedsUpdateRead.Enqueue(currentUpdates[i]);
        }

        private static void AddSendUpdate(NetworkConnection connection, int stageId, NetworkPipeline pipelineId, NativeList<UpdatePipeline> currentUpdates)
        {
            var newUpdate = new UpdatePipeline
                {connection = connection, stage = stageId, pipeline = pipelineId};
            bool uniqueItem = true;
            for (int j = 0; j < currentUpdates.Length; ++j)
            {
                if (currentUpdates[j].stage == newUpdate.stage &&
                    currentUpdates[j].pipeline.Id == newUpdate.pipeline.Id &&
                    currentUpdates[j].connection == newUpdate.connection)
                    uniqueItem = false;
            }
            if (uniqueItem)
                currentUpdates.Add(newUpdate);
        }

        public void UpdateReceive(NetworkDriver driver, out int updateCount)
        {
            NativeArray<UpdatePipeline> receiveUpdates = new NativeArray<UpdatePipeline>(m_ReceiveStageNeedsUpdate.Length, Allocator.Temp);

            // Move current update requests to a new queue
            updateCount = 0;
            for (int i = 0; i < m_ReceiveStageNeedsUpdate.Length; ++i)
            {
                if (driver.GetConnectionState(m_ReceiveStageNeedsUpdate[i].connection) == NetworkConnection.State.Connected)
                    receiveUpdates[updateCount++] = m_ReceiveStageNeedsUpdate[i];
            }
            m_ReceiveStageNeedsUpdate.Clear();

            // Process all current requested updates, new update requests will (possibly) be generated from the pipeline stages
            for (int i = 0; i < updateCount; ++i)
            {
                UpdatePipeline updateItem = receiveUpdates[i];
                ProcessReceiveStagesFrom(driver, updateItem.stage, updateItem.pipeline, updateItem.connection, default);
            }
        }

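        // Entry point for inbound packets. The first byte selects the pipeline (matching the
        // id byte written on the send path); the stages then run in reverse order, from the
        // last stage back to the first.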
        public unsafe void Receive(NetworkDriver driver, NetworkConnection connection, NativeArray<byte> buffer)
        {
            byte pipelineId = buffer[0];
            if (pipelineId == 0 || pipelineId > m_Pipelines.Length)
            {
                UnityEngine.Debug.LogError("Received a packet with an invalid pipeline.");
                return;
            }
            var p = m_Pipelines[pipelineId - 1];
            int startStage = p.NumStages - 1;

            InboundRecvBuffer inBuffer;
            inBuffer.buffer = (byte*)buffer.GetUnsafePtr() + 1;
            inBuffer.bufferLength = buffer.Length - 1;
            ProcessReceiveStagesFrom(driver, startStage, new NetworkPipeline{Id = pipelineId}, connection, inBuffer);
        }

        private unsafe void ProcessReceiveStagesFrom(NetworkDriver driver, int startStage, NetworkPipeline pipeline, NetworkConnection connection, InboundRecvBuffer buffer)
        {
            var p = m_Pipelines[pipeline.Id - 1];
            var connectionId = connection.m_NetworkId;
            var resumeQ = new NativeList<int>(16, Allocator.Temp);
            int resumeQStart = 0;

            NetworkPipelineContext ctx = default(NetworkPipelineContext);
            ctx.timestamp = Timestamp;
            var inboundBuffer = buffer;
            ctx.header = default(DataStreamWriter);

            while (true)
            {
                bool needsUpdate = false;
                bool needsSendUpdate = false;
                int internalBufferOffset = p.receiveBufferOffset + sizePerConnection[ReceiveSizeOffset] * connectionId;
                int internalSharedBufferOffset = p.sharedBufferOffset + sizePerConnection[SharedSizeOffset] * connectionId;

                // Adjust the offset to account for the stages in front of the starting stage, since we're parsing the stages in reverse order
                for (int st = 0; st < startStage; ++st)
                {
                    internalBufferOffset += (m_StageCollection[m_StageList[p.FirstStageIndex + st]].ReceiveCapacity + AlignmentMinusOne) & (~AlignmentMinusOne);
                    internalSharedBufferOffset += (m_StageCollection[m_StageList[p.FirstStageIndex + st]].SharedStateCapacity + AlignmentMinusOne) & (~AlignmentMinusOne);
                }

                for (int i = startStage; i >= 0; --i)
                {
                    ProcessReceiveStage(i, pipeline, internalBufferOffset, internalSharedBufferOffset, ref ctx, ref inboundBuffer, ref resumeQ, ref needsUpdate, ref needsSendUpdate);
                    if (needsUpdate)
                    {
                        var newUpdate = new UpdatePipeline
                            {connection = connection, stage = i, pipeline = pipeline};
                        bool uniqueItem = true;
                        for (int j = 0; j < m_ReceiveStageNeedsUpdate.Length; ++j)
                        {
                            if (m_ReceiveStageNeedsUpdate[j].stage == newUpdate.stage &&
                                m_ReceiveStageNeedsUpdate[j].pipeline.Id == newUpdate.pipeline.Id &&
                                m_ReceiveStageNeedsUpdate[j].connection == newUpdate.connection)
                                uniqueItem = false;
                        }
                        if (uniqueItem)
                            m_ReceiveStageNeedsUpdate.Add(newUpdate);
                    }

                    if (needsSendUpdate)
                        AddSendUpdate(connection, i, pipeline, m_SendStageNeedsUpdate);

                    if (inboundBuffer.bufferLength == 0)
                        break;

                    // The offset needs to be adjusted for the next stage (the one in front of this one)
                    if (i > 0)
                    {
                        internalBufferOffset -=
                            (m_StageCollection[m_StageList[p.FirstStageIndex + i - 1]].ReceiveCapacity + AlignmentMinusOne) & (~AlignmentMinusOne);
                        internalSharedBufferOffset -=
                            (m_StageCollection[m_StageList[p.FirstStageIndex + i - 1]].SharedStateCapacity + AlignmentMinusOne) & (~AlignmentMinusOne);
                    }

                    needsUpdate = false;
                }

                if (inboundBuffer.bufferLength != 0)
                    driver.PushDataEvent(connection, pipeline.Id, inboundBuffer.buffer, inboundBuffer.bufferLength);

                if (resumeQStart >= resumeQ.Length)
                {
                    return;
                }

                startStage = resumeQ[resumeQStart++];
                inboundBuffer = default;
            }
        }

        private unsafe void ProcessReceiveStage(int stage, NetworkPipeline pipeline, int internalBufferOffset, int internalSharedBufferOffset, ref NetworkPipelineContext ctx, ref InboundRecvBuffer inboundBuffer, ref NativeList<int> resumeQ, ref bool needsUpdate, ref bool needsSendUpdate)
        {
            var p = m_Pipelines[pipeline.Id - 1];

            var stageId = m_StageList[p.FirstStageIndex + stage];
            var pipelineStage = m_StageCollection[stageId];
            ctx.staticInstanceBuffer = (byte*)m_StaticInstanceBuffer.GetUnsafePtr() + pipelineStage.StaticStateStart;
            ctx.staticInstanceBufferLength = pipelineStage.StaticStateCapacity;
            ctx.internalProcessBuffer = (byte*)m_ReceiveBuffer.GetUnsafePtr() + internalBufferOffset;
            ctx.internalProcessBufferLength = pipelineStage.ReceiveCapacity;
            ctx.internalSharedProcessBuffer = (byte*)m_SharedBuffer.GetUnsafePtr() + internalSharedBufferOffset;
            ctx.internalSharedProcessBufferLength = pipelineStage.SharedStateCapacity;
            NetworkPipelineStage.Requests requests = NetworkPipelineStage.Requests.None;

            pipelineStage.Receive.Ptr.Invoke(ref ctx, ref inboundBuffer, ref requests);

            if ((requests & NetworkPipelineStage.Requests.Resume) != 0)
                resumeQ.Add(stage);
            needsUpdate = (requests & NetworkPipelineStage.Requests.Update) != 0;
            needsSendUpdate = (requests & NetworkPipelineStage.Requests.SendUpdate) != 0;
        }

        [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS")]
        public static void ValidateSendHandle(NetworkInterfaceSendHandle handle)
        {
            if (handle.data == IntPtr.Zero)
                throw new ArgumentException("NetworkInterfaceSendHandle.data must not be null.");
        }
    }
}