using System;
using System.Diagnostics;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using Unity.Burst;
using Unity.Collections;
using Unity.Collections.LowLevel.Unsafe;
using Unity.Networking.Transport.Protocols;
using Unity.Jobs;
using Unity.Jobs.LowLevel.Unsafe;
using Unity.Mathematics;
using Unity.Networking.Transport.Error;
using Unity.Networking.Transport.Utilities;

namespace Unity.Networking.Transport
{
    /// <summary>
    /// Message record made of an MTU-sized fixed byte buffer, a destination endpoint and a data length.
    /// </summary>
    public unsafe struct QueuedSendMessage
    {
        public fixed byte Data[NetworkParameterConstants.MTU];
        public NetworkInterfaceEndPoint Dest;
        public int DataLength;
    }

    /// <summary>
    /// The NetworkDriver is an implementation of Virtual Connections over any transport.
    ///
    /// Basic usage:
    /// <code>
    /// var driver = NetworkDriver.Create();
    /// </code>
    /// </summary>
    public struct NetworkDriver : IDisposable
    {
        /// <summary>
        /// Create a Concurrent copy of the NetworkDriver, including a concurrent view of its event queue.
        /// </summary>
        /// <returns>A Concurrent struct populated from this driver's fields.</returns>
        public Concurrent ToConcurrent() => new Concurrent
        {
            m_NetworkSendInterface = m_NetworkSendInterface,
            m_NetworkProtocolInterface = m_NetworkProtocolInterface,
            m_EventQueue = m_EventQueue.ToConcurrent(),
            m_ConnectionList = m_ConnectionList,
            m_DataStream = m_DataStream,
            m_DisconnectReasons = m_DisconnectReasons,
            m_PipelineProcessor = m_PipelineProcessor.ToConcurrent(),
            m_DefaultHeaderFlags = m_DefaultHeaderFlags,
            m_ConcurrentParallelSendQueue = m_ParallelSendQueue.AsParallelWriter(),
#if ENABLE_UNITY_COLLECTIONS_CHECKS
            m_ThreadIndex = 0,
            m_PendingBeginSend = m_PendingBeginSend
#endif
        };

        /// <summary>
        /// Same as <see cref="ToConcurrent"/> except that the event queue is left at its default value,
        /// so the copy is only meant for the send path.
        /// </summary>
        private Concurrent ToConcurrentSendOnly()
        {
            var sendOnly = new Concurrent
            {
                m_NetworkSendInterface = m_NetworkSendInterface,
                m_NetworkProtocolInterface = m_NetworkProtocolInterface,
                m_EventQueue = default,
                m_ConnectionList = m_ConnectionList,
                m_DataStream = m_DataStream,
                m_DisconnectReasons = m_DisconnectReasons,
                m_PipelineProcessor = m_PipelineProcessor.ToConcurrent(),
                m_DefaultHeaderFlags = m_DefaultHeaderFlags,
                m_ConcurrentParallelSendQueue = m_ParallelSendQueue.AsParallelWriter(),
#if ENABLE_UNITY_COLLECTIONS_CHECKS
                m_ThreadIndex = 0,
                m_PendingBeginSend = m_PendingBeginSend
#endif
            };
            return sendOnly;
        }

        /// <summary>
        /// The Concurrent struct is used to create a Concurrent instance of the NetworkDriver.
        /// </summary>
        public struct Concurrent
        {
            /// <summary>
            /// Pops the next event for one connection, discarding the pipeline information.
            /// </summary>
            /// <param name="connectionId">The connection to pop an event for.</param>
            /// <param name="reader">Reader over the event payload, or default when there is none.</param>
            /// <returns>The type of the popped event.</returns>
            public NetworkEvent.Type PopEventForConnection(NetworkConnection connectionId, out DataStreamReader reader)
                => PopEventForConnection(connectionId, out reader, out var _);

            /// <summary>
            /// Pops the next event for one connection.
            /// </summary>
            /// <param name="connectionId">The connection to pop an event for.</param>
            /// <param name="reader">
            /// Reader over the disconnect reason (Disconnect events with a negative offset) or over the
            /// received data (events with a positive size); default otherwise.
            /// </param>
            /// <param name="pipeline">The pipeline id reported by the event queue.</param>
            /// <returns>
            /// NetworkEvent.Type.Empty when the id is out of range or the version does not match,
            /// otherwise the type returned by the event queue.
            /// </returns>
            public NetworkEvent.Type PopEventForConnection(NetworkConnection connectionId, out DataStreamReader reader, out NetworkPipeline pipeline)
            {
                reader = default(DataStreamReader);
                pipeline = default(NetworkPipeline);

                var networkId = connectionId.m_NetworkId;
                var idInRange = networkId >= 0 && networkId < m_ConnectionList.Length;
                if (!idInRange || m_ConnectionList[networkId].Version != connectionId.m_NetworkVersion)
                {
                    return NetworkEvent.Type.Empty;
                }

                var eventType = m_EventQueue.PopEventForConnection(networkId, out var offset, out var size, out var pipelineId);
                pipeline = new NetworkPipeline { Id = pipelineId };

                if (eventType == NetworkEvent.Type.Disconnect && offset < 0)
                {
                    // A negative offset indexes the disconnect-reason table instead of the data stream.
                    reader = new DataStreamReader(((NativeArray)m_DisconnectReasons).GetSubArray(math.abs(offset), 1));
                }
                else if (size > 0)
                {
                    reader = new DataStreamReader(((NativeArray)m_DataStream).GetSubArray(offset, size));
                }

                return eventType;
            }

            /// <summary>
            /// Computes the header space needed when sending through the given pipeline.
            /// </summary>
            /// <param name="pipe">The pipeline the data would be sent through.</param>
            /// <returns>
            /// The protocol padding size, plus the pipeline's send header capacity and one byte for the
            /// pipeline id when <paramref name="pipe"/> has a positive id.
            /// </returns>
            public int MaxHeaderSize(NetworkPipeline pipe)
            {
                var protocolPadding = m_NetworkProtocolInterface.PaddingSize;
                if (pipe.Id <= 0)
                {
                    return protocolPadding;
                }

                // All headers plus one byte for pipeline id
                return protocolPadding + m_PipelineProcessor.SendHeaderCapacity(pipe) + 1;
            }

            /// <summary>
            /// Bookkeeping stored behind DataStreamWriter.m_SendHandleData between BeginSend and EndSend/AbortSend.
            /// </summary>
            struct PendingSend
            {
                public NetworkPipeline Pipeline;
                public NetworkConnection Connection;
                public NetworkInterfaceSendHandle SendHandle;
                public int headerSize;
            }

            /// <summary>
            /// Acquires a DataStreamWriter for starting a asynchronous send.
            /// </summary>
            /// <param name="id">The NetworkConnection id to write through</param>
            /// <param name="writer">A DataStreamWriter to write to</param>
            /// <param name="requiredPayloadSize">If you require the payload to be of certain size</param>
            /// <returns>Error.StatusCode.Success (as int) on a successful acquire. Otherwise another Error.StatusCode value indicating the error.</returns>
/// <remarks>A connection that is still Connecting does not cause an exception: NetworkStateMismatch is returned (and an error is logged when collection checks are enabled).</remarks>
            public unsafe int BeginSend(NetworkConnection id, out DataStreamWriter writer, int requiredPayloadSize = 0)
            {
                return BeginSend(NetworkPipeline.Null, id, out writer, requiredPayloadSize);
            }

            /// <summary>
            /// Acquires a DataStreamWriter for starting a asynchronous send.
            /// </summary>
            /// <param name="pipe">The NetworkPipeline to write through</param>
            /// <param name="id">The NetworkConnection id to write through</param>
            /// <param name="writer">A DataStreamWriter to write to; left at default when an error code is returned</param>
            /// <param name="requiredPayloadSize">If you require the payload to be of certain size</param>
            /// <returns>
            /// Error.StatusCode.Success (as int) on a successful acquire. Otherwise one of
            /// NetworkIdMismatch (id out of range), NetworkVersionMismatch (stale connection version),
            /// NetworkStateMismatch (connection still Connecting) or NetworkPacketOverflow (the packet
            /// does not fit the pipeline/protocol/interface capacity).
            /// </returns>
            /// <remarks>
            /// No exception is thrown for a Connecting connection; an error is logged when
            /// ENABLE_UNITY_COLLECTIONS_CHECKS is defined and NetworkStateMismatch is returned.
            /// </remarks>
            public unsafe int BeginSend(NetworkPipeline pipe, NetworkConnection id, out DataStreamWriter writer, int requiredPayloadSize = 0)
            {
                writer = default;

                if (id.m_NetworkId < 0 || id.m_NetworkId >= m_ConnectionList.Length)
                    return (int)Error.StatusCode.NetworkIdMismatch;

                var connection = m_ConnectionList[id.m_NetworkId];
                if (connection.Version != id.m_NetworkVersion)
                    return (int)Error.StatusCode.NetworkVersionMismatch;

                if (connection.State == NetworkConnection.State.Connecting)
                {
#if ENABLE_UNITY_COLLECTIONS_CHECKS
                    UnityEngine.Debug.LogError("Cannot send data while connecting");
#endif
                    return (int)Error.StatusCode.NetworkStateMismatch;
                }

                // Pipeline stage headers plus one byte for the pipeline id (none without a pipeline).
                var pipelineHeader = (pipe.Id > 0) ? m_PipelineProcessor.SendHeaderCapacity(pipe) + 1 : 0;
                var payloadCapacity = requiredPayloadSize > 0 ? requiredPayloadSize + pipelineHeader : 0;

                // This will set the payloadCapacity value to the actual capacity provided by the protocol
                var packetAllocationSize = m_NetworkProtocolInterface.ComputePacketAllocationSize.Ptr.Invoke(ref connection, ref payloadCapacity, out var payloadOffset);

                payloadCapacity -= pipelineHeader;

                if (packetAllocationSize > m_PipelineProcessor.PayloadCapacity(pipe) || payloadCapacity < requiredPayloadSize)
                    return (int)Error.StatusCode.NetworkPacketOverflow;

                // When the interface cannot hand out a buffer, fall back to a temporary driver-owned one;
                // CompleteSend copies it into an interface buffer later.
                if (m_NetworkSendInterface.BeginSendMessage.Ptr.Invoke(out var sendHandle, m_NetworkSendInterface.UserData, packetAllocationSize) != 0)
                {
                    sendHandle.data = (IntPtr)UnsafeUtility.Malloc(packetAllocationSize, 8, Allocator.Temp);
                    sendHandle.capacity = packetAllocationSize;
                    sendHandle.id = 0;
                    sendHandle.size = 0;
                    sendHandle.flags = SendHandleFlags.AllocatedByDriver;
                }

                if (sendHandle.capacity < packetAllocationSize)
                {
                    // The interface handed out a buffer that is too small: give it back before bailing
                    // out so the handle is not leaked. AbortSend skips driver-allocated handles.
                    AbortSend(sendHandle);
                    return (int)Error.StatusCode.NetworkPacketOverflow;
                }

                // The writer only exposes the payload area, past the protocol and pipeline headers.
                var slice = NativeArrayUnsafeUtility.ConvertExistingDataToNativeArray((byte*) sendHandle.data + payloadOffset + pipelineHeader, payloadCapacity, Allocator.Invalid);

#if ENABLE_UNITY_COLLECTIONS_CHECKS
                var safety = AtomicSafetyHandle.GetTempMemoryHandle();
                NativeArrayUnsafeUtility.SetAtomicSafetyHandle(ref slice, safety);
#endif
                writer = new DataStreamWriter(slice);
                writer.m_SendHandleData = (IntPtr)UnsafeUtility.Malloc(UnsafeUtility.SizeOf(), UnsafeUtility.AlignOf(), Allocator.Temp);
                *(PendingSend*)writer.m_SendHandleData = new PendingSend
                {
                    Pipeline = pipe,
                    Connection = id,
                    SendHandle = sendHandle,
                    headerSize = payloadOffset,
                };
#if ENABLE_UNITY_COLLECTIONS_CHECKS
                // Per-thread counter of outstanding BeginSend calls, one slot per cache line.
                m_PendingBeginSend[m_ThreadIndex * JobsUtility.CacheLineSize/4] = m_PendingBeginSend[m_ThreadIndex * JobsUtility.CacheLineSize/4] + 1;
#endif
                return (int)Error.StatusCode.Success;
            }

            /// <summary>
            /// Ends a asynchronous send.
            /// </summary>
            /// <param name="writer">The DataStreamWriter that was acquired with BeginSend.</param>
            /// <returns>The length of the buffer sent if nothing went wrong; otherwise the non-positive value reported by the send.</returns>
/// <exception cref="InvalidOperationException">If EndSend is called without a matching BeginSend call.</exception>
            /// <exception cref="InvalidOperationException">If the connection got closed between the calls to BeginSend and EndSend.</exception>
            public unsafe int EndSend(DataStreamWriter writer)
            {
#if ENABLE_UNITY_COLLECTIONS_CHECKS
                // Just here to trigger a safety check on the writer
                if (writer.Capacity == 0)
                    throw new InvalidOperationException("EndSend without matching BeginSend");
#endif
                PendingSend* pendingSendPtr = (PendingSend*)writer.m_SendHandleData;
                if (pendingSendPtr == null || pendingSendPtr->Connection == default)
                    throw new InvalidOperationException("EndSend without matching BeginSend");

                if (m_ConnectionList[pendingSendPtr->Connection.m_NetworkId].Version != pendingSendPtr->Connection.m_NetworkVersion)
                    throw new InvalidOperationException("Connection closed between begin and end send");

                // Copy the pending send out, then clear the stored connection so the same writer
                // cannot be ended or aborted a second time.
                PendingSend pendingSend = *(PendingSend*)writer.m_SendHandleData;
                pendingSendPtr->Connection = default;
#if ENABLE_UNITY_COLLECTIONS_CHECKS
                m_PendingBeginSend[m_ThreadIndex * JobsUtility.CacheLineSize/4] = m_PendingBeginSend[m_ThreadIndex * JobsUtility.CacheLineSize/4] - 1;
#endif

                pendingSend.SendHandle.size = pendingSend.headerSize + writer.Length;
                int retval = 0;
                if (pendingSend.Pipeline.Id > 0)
                {
                    pendingSend.SendHandle.size += m_PipelineProcessor.SendHeaderCapacity(pendingSend.Pipeline) + 1;
                    var oldHeaderFlags = m_DefaultHeaderFlags;
                    m_DefaultHeaderFlags = UdpCHeader.HeaderFlags.HasPipeline;
                    retval = m_PipelineProcessor.Send(this, pendingSend.Pipeline, pendingSend.Connection, pendingSend.SendHandle, pendingSend.headerSize);
                    m_DefaultHeaderFlags = oldHeaderFlags;
                }
                else
                    // TODO: Is there a better way we could set the hasPipeline value correctly?
                    // this case is when the message is sent from the pipeline directly, "without a pipeline" so the hasPipeline flag is set in m_DefaultHeaderFlags
                    // allowing us to capture it here
                    retval = CompleteSend(pendingSend.Connection, pendingSend.SendHandle, (m_DefaultHeaderFlags & UdpCHeader.HeaderFlags.HasPipeline) != 0);
                if (retval <= 0)
                    return retval;
                return writer.Length;
            }

            /// <summary>
            /// Aborts a asynchronous send.
            /// </summary>
            /// <param name="writer">The DataStreamWriter that was acquired with BeginSend.</param>
            /// <exception cref="InvalidOperationException">If AbortSend is called without a matching BeginSend call.</exception>
            public unsafe void AbortSend(DataStreamWriter writer)
            {
#if ENABLE_UNITY_COLLECTIONS_CHECKS
                // Just here to trigger a safety check on the writer
                if (writer.Capacity == 0)
                    throw new InvalidOperationException("AbortSend without matching BeginSend");
#endif
                PendingSend* pendingSendPtr = (PendingSend*)writer.m_SendHandleData;
                if (pendingSendPtr == null || pendingSendPtr->Connection == default)
                    throw new InvalidOperationException("AbortSend without matching BeginSend");
                PendingSend pendingSend = *(PendingSend*)writer.m_SendHandleData;
                pendingSendPtr->Connection = default;
#if ENABLE_UNITY_COLLECTIONS_CHECKS
                m_PendingBeginSend[m_ThreadIndex * JobsUtility.CacheLineSize/4] = m_PendingBeginSend[m_ThreadIndex * JobsUtility.CacheLineSize/4] - 1;
#endif
                AbortSend(pendingSend.SendHandle);
            }

            /// <summary>
            /// Hands a filled send handle to the protocol for sending.
            /// </summary>
            /// <param name="sendConnection">The connection the packet is sent on.</param>
            /// <param name="sendHandle">The handle describing the packet buffer.</param>
            /// <param name="hasPipeline">Whether the packet carries a pipeline header.</param>
            /// <returns>
            /// The error code of BeginSendMessage when a driver-allocated buffer could not be moved into an
            /// interface buffer; otherwise the value returned by the protocol's ProcessSend.
            /// </returns>
            internal unsafe int CompleteSend(NetworkConnection sendConnection, NetworkInterfaceSendHandle sendHandle, bool hasPipeline)
            {
                if (0 != (sendHandle.flags & SendHandleFlags.AllocatedByDriver))
                {
                    // The buffer was allocated by the driver in BeginSend: acquire a real interface
                    // buffer now and copy the packet into it.
                    var ret = 0;
                    NetworkInterfaceSendHandle originalHandle = sendHandle;
                    if ((ret = m_NetworkSendInterface.BeginSendMessage.Ptr.Invoke(out sendHandle, m_NetworkSendInterface.UserData, originalHandle.size)) != 0)
                    {
                        return ret;
                    }
                    UnsafeUtility.MemCpy((void*) sendHandle.data, (void*) originalHandle.data, originalHandle.size);
                    sendHandle.size = originalHandle.size;
                }

                var connection = m_ConnectionList[sendConnection.m_NetworkId];
                var queueHandle = NetworkSendQueueHandle.ToTempHandle(m_ConcurrentParallelSendQueue);
                return m_NetworkProtocolInterface.ProcessSend.Ptr.Invoke(ref connection, hasPipeline, ref m_NetworkSendInterface, ref sendHandle, ref queueHandle, m_NetworkProtocolInterface.UserData);
            }

            /// <summary>
            /// Returns an interface-owned send handle to the network interface. Handles flagged
            /// AllocatedByDriver are ignored.
            /// </summary>
            /// <param name="sendHandle">The handle to abort.</param>
            internal void AbortSend(NetworkInterfaceSendHandle sendHandle)
            {
                if(0 == (sendHandle.flags & SendHandleFlags.AllocatedByDriver))
                {
                    m_NetworkSendInterface.AbortSendMessage.Ptr.Invoke(ref sendHandle, m_NetworkSendInterface.UserData);
                }
            }

            /// <summary>
            /// Looks up the state of a connection.
            /// </summary>
            /// <param name="id">The connection to query.</param>
            /// <returns>Disconnected when the id is out of range or the version does not match; otherwise the stored state.</returns>
            public NetworkConnection.State GetConnectionState(NetworkConnection id)
            {
                if (id.m_NetworkId < 0 || id.m_NetworkId >= m_ConnectionList.Length)
                    return NetworkConnection.State.Disconnected;
                var connection = m_ConnectionList[id.m_NetworkId];
                if (connection.Version != id.m_NetworkVersion)
                    return NetworkConnection.State.Disconnected;
                return connection.State;
            }

            internal NetworkSendInterface m_NetworkSendInterface;
            internal NetworkProtocol m_NetworkProtocolInterface;
            internal NetworkEventQueue.Concurrent m_EventQueue;
            internal NativeArray m_DisconnectReasons;

            [ReadOnly] internal NativeList m_ConnectionList;
            [ReadOnly] internal NativeList m_DataStream;
            internal NetworkPipelineProcessor.Concurrent m_PipelineProcessor;
            internal UdpCHeader.HeaderFlags m_DefaultHeaderFlags;
            internal NativeQueue.ParallelWriter m_ConcurrentParallelSendQueue;
#if ENABLE_UNITY_COLLECTIONS_CHECKS
            [NativeSetThreadIndex] internal int m_ThreadIndex;
            [NativeDisableParallelForRestriction] internal NativeArray m_PendingBeginSend;
#endif
        }

        /// <summary>
        /// Per-connection record kept in the driver's connection list.
        /// Equality compares Id, Version and Address only.
        /// </summary>
        internal struct Connection
        {
            public NetworkInterfaceEndPoint Address;
            public long LastAttempt;
            public int Id;
            public int Version;
            public int Attempts;
            public NetworkConnection.State State;
            public ushort ReceiveToken;
            public ushort SendToken;
            public byte DidReceiveData;

            public static bool operator ==(Connection lhs, Connection rhs)
            {
                return lhs.Id == rhs.Id && lhs.Version == rhs.Version && lhs.Address == rhs.Address;
            }

            public static bool operator !=(Connection lhs, Connection rhs)
            {
                return lhs.Id != rhs.Id || lhs.Version != rhs.Version || lhs.Address != rhs.Address;
            }

            /// <summary>
            /// Compares against an arbitrary object; anything that is not a Connection (including null)
            /// is simply unequal instead of raising a cast exception.
            /// </summary>
            public override bool Equals(object compare)
            {
                return compare is Connection other && this == other;
            }

            public static Connection Null => new Connection() {Id = 0, Version = 0};

            public override int GetHashCode()
            {
                return Id;
            }

            public bool Equals(Connection connection)
            {
                return connection.Id == Id && connection.Version == Version && connection.Address == Address;
            }
        }

        // internal variables :::::::::::::::::::::::::::::::::::::::::::::::::

        // Interfaces and protocols are kept in static lists; a driver only stores its indices into them.
        static List s_NetworkInterfaces = new List();
        static List s_NetworkProtocols = new List();

        int m_NetworkInterfaceIndex;
        NetworkSendInterface m_NetworkSendInterface;

        int m_NetworkProtocolIndex;
        NetworkProtocol m_NetworkProtocolInterface;

        NativeQueue m_ParallelSendQueue;

#if ENABLE_UNITY_COLLECTIONS_CHECKS
        NativeArray m_PendingBeginSend;
#endif

        NetworkEventQueue m_EventQueue;
        private NativeArray m_DisconnectReasons;

        NativeQueue m_FreeList;
        NativeQueue m_NetworkAcceptQueue;
        NativeList m_ConnectionList;
        NativeArray m_InternalState;
        NativeQueue m_PendingFree;
        NativeArray m_SessionIdCounter;
        NativeArray m_ErrorCodes;

        enum ErrorCodeType
        {
            ReceiveError = 0,
            SendError = 1,
            NumErrorCodes
        }

#pragma warning disable 649
        struct Parameters
        {
            public NetworkDataStreamParameter dataStream;
            public NetworkConfigParameter config;

            /// <summary>
            /// Starts from the default config / data stream parameters and overrides them with any
            /// NetworkConfigParameter or NetworkDataStreamParameter found in <paramref name="param"/>
            /// (the last occurrence wins).
            /// </summary>
            public Parameters(params INetworkParameter[] param)
            {
                config = new NetworkConfigParameter
                {
                    maxConnectAttempts = NetworkParameterConstants.MaxConnectAttempts,
                    connectTimeoutMS = NetworkParameterConstants.ConnectTimeoutMS,
                    disconnectTimeoutMS = NetworkParameterConstants.DisconnectTimeoutMS,
                    maxFrameTimeMS = 0
                };
                dataStream = default(NetworkDataStreamParameter);

                for (int i = 0; i < param.Length; ++i)
                {
                    if (param[i] is NetworkConfigParameter configParameter)
                        config = configParameter;
                    else if (param[i] is NetworkDataStreamParameter dataStreamParameter)
                        dataStream = dataStreamParameter;
                }
            }
[Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS")]
            /// <summary>
            /// Rejects a negative data stream size. Zero is accepted. Only compiled into callers when
            /// ENABLE_UNITY_COLLECTIONS_CHECKS is defined.
            /// </summary>
            /// <exception cref="ArgumentException">If NetworkDataStreamParameter.size is negative.</exception>
            public static void ValidateParameters(Parameters param)
            {
                if (param.dataStream.size < 0)
                    throw new ArgumentException("Value for NetworkDataStreamParameter.size must be greater than or equal to zero.");
            }
        }
#pragma warning restore 649

        private Parameters m_NetworkParams;
        private NativeList m_DataStream;
        private NativeArray m_DataStreamSize;
        private NetworkPipelineProcessor m_PipelineProcessor;
        private UdpCHeader.HeaderFlags m_DefaultHeaderFlags;

        private long m_updateTime;
        private long m_updateTimeAdjustment;

        // properties :::::::::::::::::::::::::::::::::::::::::::::::::::::::::

        // Slots of m_InternalState.
        private const int InternalStateListening = 0;
        private const int InternalStateBound = 1;

        /// <summary>
        /// True when the listening slot of the internal state is non-zero.
        /// </summary>
        public bool Listening
        {
            get { return (m_InternalState[InternalStateListening] != 0); }
            internal set { m_InternalState[InternalStateListening] = value ? 1 : 0; }
        }

        // 0 = Unbound
        // 1 = Binding
        // 2 = Bound
        public bool Bound => m_InternalState[InternalStateBound] == 2;

        /// <summary>
        /// Helper function for creating a NetworkDriver.
        /// </summary>
        /// <param name="param">An optional array of INetworkParameter.</param>
/// <returns>A NetworkDriver using the IPC interface when UNITY_WEBGL is defined and the Baselib interface otherwise.</returns>
        public static NetworkDriver Create(params INetworkParameter[] param)
        {
#if UNITY_WEBGL
            return new NetworkDriver(new IPCNetworkInterface(), param);
#else
            return new NetworkDriver(new BaselibNetworkInterface(), param);
#endif
        }

        /// <summary>
        /// Stores <paramref name="element"/> in the first null slot of <paramref name="list"/>, or appends
        /// it when there is no free slot.
        /// </summary>
        /// <returns>The index the element was stored at.</returns>
        private static int InsertInAvailableIndex(List list, T element)
        {
            var index = -1;
            for (int i = 0; i < list.Count; ++i)
            {
                if (list[i] == null)
                {
                    index = i;
                    list[i] = element;
                    break;
                }
            }

            if (index < 0)
            {
                index = list.Count;
                list.Add(element);
            }

            return index;
        }

        /// <summary>
        /// Picks the protocol implementation: the relay protocol when a RelayNetworkParameter is present,
        /// the Unity transport protocol otherwise.
        /// </summary>
        private static INetworkProtocol GetProtocolForParameters(INetworkParameter[] parameters)
        {
            foreach (var parameter in parameters)
            {
                if (parameter is Unity.Networking.Transport.Relay.RelayNetworkParameter)
                    return new Unity.Networking.Transport.Relay.RelayNetworkProtocol();
            }

            return new UnityTransportProtocol();
        }

        /// <summary>
        /// Creates a driver on the given interface, choosing the protocol from the parameters.
        /// </summary>
        /// <param name="netIf">The network interface to use.</param>
        /// <param name="param">An array of INetworkParameter.</param>
        public NetworkDriver(INetworkInterface netIf, params INetworkParameter[] param)
            : this(netIf, GetProtocolForParameters(param), param)
        {
        }

        /// <summary>
        /// Constructor for NetworkDriver.
        /// </summary>
        /// <param name="netIf">The network interface to use.</param>
        /// <param name="netProtocol">The protocol implementation to use.</param>
        /// <param name="param">An array of INetworkParameter.</param>
        /// <exception cref="ArgumentException">Thrown (when collection checks are enabled) if the value for NetworkDataStreamParameter.size is smaller than zero.</exception>
        /// <exception cref="InvalidOperationException">Thrown if the network interface fails to initialize.</exception>
        internal NetworkDriver(INetworkInterface netIf, INetworkProtocol netProtocol, params INetworkParameter[] param)
        {
            m_NetworkParams = new Parameters(param);
            Parameters.ValidateParameters(m_NetworkParams);
            NetworkPipelineParams.ValidateParameters(param);

            // Protocol and interface are registered in the static lists; the driver keeps only indices.
            netProtocol.Initialize(param);
            m_NetworkProtocolIndex = InsertInAvailableIndex(s_NetworkProtocols, netProtocol);
            m_NetworkProtocolInterface = netProtocol.CreateProtocolInterface();

            m_NetworkInterfaceIndex = InsertInAvailableIndex(s_NetworkInterfaces, netIf);
            var result = netIf.Initialize(param);
            if (0 != result)
            {
                // NOTE(review): the entries registered above are not removed on this path — confirm whether that is intended.
                throw new InvalidOperationException($"Failed to initialize the NetworkInterface. Error Code: {result}.");
            }

            m_NetworkSendInterface = netIf.CreateSendInterface();

            m_PipelineProcessor = new NetworkPipelineProcessor(param);
            m_ParallelSendQueue = new NativeQueue(Allocator.Persistent);

#if ENABLE_UNITY_COLLECTIONS_CHECKS
            // One counter per job thread, spaced a cache line apart.
            m_PendingBeginSend = new NativeArray(JobsUtility.MaxJobThreadCount * JobsUtility.CacheLineSize/4, Allocator.Persistent);
#endif

            // Stopwatch ticks converted to milliseconds.
            var time = Stopwatch.GetTimestamp() / (Stopwatch.Frequency/1000);
            // With a fixed frame time the clock starts at 1 instead of the wall-clock time.
            m_updateTime = m_NetworkParams.config.fixedFrameTimeMS > 0 ? 1 : time;
            m_updateTimeAdjustment = 0;

            // A size of zero selects the default data stream size.
            int initialStreamSize = m_NetworkParams.dataStream.size;
            if (initialStreamSize == 0)
                initialStreamSize = NetworkParameterConstants.DriverDataStreamSize;

            m_DataStream = new NativeList(initialStreamSize, Allocator.Persistent);
            m_DataStream.ResizeUninitialized(initialStreamSize);
            m_DataStreamSize = new NativeArray(1, Allocator.Persistent);

            m_DefaultHeaderFlags = 0;

            m_NetworkAcceptQueue = new NativeQueue(Allocator.Persistent);

            m_ConnectionList = new NativeList(1, Allocator.Persistent);

            m_FreeList = new NativeQueue(Allocator.Persistent);
            m_EventQueue = new NetworkEventQueue(NetworkParameterConstants.InitialEventQueueSize);

            // Table where entry i holds the byte value i; disconnect events read their reason from it.
            const int reasons = (int)DisconnectReason.Count;
            m_DisconnectReasons = new NativeArray(reasons, Allocator.Persistent);
            for (var idx = 0; idx < reasons; ++idx)
                m_DisconnectReasons[idx] = (byte)idx;

            m_InternalState = new NativeArray(2, Allocator.Persistent);
            m_PendingFree = new NativeQueue(Allocator.Persistent);

            m_ReceiveCount = new NativeArray(1, Allocator.Persistent);
            m_SessionIdCounter = new NativeArray(1, Allocator.Persistent) {[0] = RandomHelpers.GetRandomUShort()};
            m_ErrorCodes = new NativeArray((int)ErrorCodeType.NumErrorCodes, Allocator.Persistent);
            ReceiveCount = 0;
            Listening = false;
        }

        // interface implementation :::::::::::::::::::::::::::::::::::::::::::

        /// <summary>
        /// Disposes the protocol and interface registered for this driver, frees their static slots and
        /// disposes every native container owned by the driver.
        /// </summary>
        public void Dispose()
        {
            s_NetworkProtocols[m_NetworkProtocolIndex].Dispose();
            s_NetworkProtocols[m_NetworkProtocolIndex] = null;
            s_NetworkInterfaces[m_NetworkInterfaceIndex].Dispose();
            s_NetworkInterfaces[m_NetworkInterfaceIndex] = null;
            m_NetworkProtocolIndex = -1;
            m_NetworkInterfaceIndex = -1;

            m_DataStream.Dispose();
            m_DataStreamSize.Dispose();
            m_PipelineProcessor.Dispose();

            m_EventQueue.Dispose();
            m_DisconnectReasons.Dispose();

            m_NetworkAcceptQueue.Dispose();
            m_ConnectionList.Dispose();
            m_FreeList.Dispose();
            m_InternalState.Dispose();
            m_PendingFree.Dispose();
            m_ReceiveCount.Dispose();
            m_SessionIdCounter.Dispose();
            m_ErrorCodes.Dispose();
            m_ParallelSendQueue.Dispose();
#if ENABLE_UNITY_COLLECTIONS_CHECKS
            m_PendingBeginSend.Dispose();
#endif
        }

        /// <summary>
        /// True while the driver's internal state array is created.
        /// </summary>
        public bool IsCreated => m_InternalState.IsCreated;

        /// <summary>
        /// Job that runs InternalUpdate on a copy of the driver struct.
        /// </summary>
        [BurstCompile]
        struct UpdateJob : IJob
        {
            public NetworkDriver driver;

            public void Execute()
            {
                driver.InternalUpdate();
            }
        }

        /// <summary>
        /// Job that resets the event queue and the used data stream size. With collection checks enabled
        /// it also reports events that were never popped and BeginSend calls that were never ended.
        /// </summary>
        struct ClearEventQueue : IJob
        {
            public NativeList dataStream;
            public NativeArray dataStreamSize;
            public NetworkEventQueue eventQueue;
#if ENABLE_UNITY_COLLECTIONS_CHECKS
            public NativeArray pendingSend;
            [ReadOnly] public NativeList connectionList;
            [ReadOnly] public NativeArray internalState;
#endif
            public void Execute()
            {
#if ENABLE_UNITY_COLLECTIONS_CHECKS
                // Report connections that still have unread events and are not disconnected.
                for (int i = 0; i < connectionList.Length; ++i)
                {
                    int conCount = eventQueue.GetCountForConnection(i);
                    if (conCount != 0 && connectionList[i].State != NetworkConnection.State.Disconnected)
                    {
                        UnityEngine.Debug.LogError($"Resetting event queue with pending events (Count={conCount}, ConnectionID={i}) Listening: {internalState[InternalStateListening]}");
                    }
                }
                // Reset every per-thread pending-send counter, logging the problem only once.
                bool didPrint = false;
                for (int i = 0; i < JobsUtility.MaxJobThreadCount; ++i)
                {
                    if (pendingSend[i * JobsUtility.CacheLineSize / 4] > 0)
                    {
                        pendingSend[i * JobsUtility.CacheLineSize / 4] = 0;
                        if (!didPrint)
                        {
                            UnityEngine.Debug.LogError(
                                "Missing EndSend, calling BeginSend without calling EndSend will result in a memory leak");
                            didPrint = true;
                        }
                    }
                }
#endif
                eventQueue.Clear();
                // NOTE(review): resizing to the current length looks like a no-op — confirm intent.
                dataStream.ResizeUninitialized(dataStream.Length);
                dataStreamSize[0] = 0;
            }
        }

        /// <summary>
        /// The timestamp computed by the most recent ScheduleUpdate call.
        /// </summary>
        public long LastUpdateTime => m_updateTime;

        /// <summary>
        /// Advances the driver clock and schedules, in order: the event queue clear, the internal update,
        /// the interface receive and the interface send.
        /// </summary>
        /// <param name="dep">Job handle the scheduled chain depends on.</param>
        /// <returns>The handle of the last job in the chain.</returns>
        public JobHandle ScheduleUpdate(JobHandle dep = default(JobHandle))
        {
            // Fixed frame time: advance by a constant step; otherwise use the adjusted stopwatch time in ms.
            long timeNow = m_NetworkParams.config.fixedFrameTimeMS > 0 ? m_updateTime + m_NetworkParams.config.fixedFrameTimeMS : Stopwatch.GetTimestamp() / (Stopwatch.Frequency/1000) - m_updateTimeAdjustment;
            // Clamp the step to maxFrameTimeMS and remember the excess so later frames stay consistent.
            if (m_NetworkParams.config.maxFrameTimeMS > 0 && (timeNow - m_updateTime) > m_NetworkParams.config.maxFrameTimeMS)
            {
                m_updateTimeAdjustment += (timeNow - m_updateTime) - m_NetworkParams.config.maxFrameTimeMS;
                timeNow = m_updateTime + m_NetworkParams.config.maxFrameTimeMS;
            }
            m_updateTime = timeNow;
            // The job copies the driver struct, so it must be created after m_updateTime is updated.
            var job = new UpdateJob {driver = this};
            JobHandle handle;
            var clearJob = new ClearEventQueue
            {
                dataStream = m_DataStream,
                dataStreamSize = m_DataStreamSize,
                eventQueue = m_EventQueue,
#if ENABLE_UNITY_COLLECTIONS_CHECKS
                pendingSend = m_PendingBeginSend,
                connectionList = m_ConnectionList,
                internalState = m_InternalState
#endif
            };
            handle = clearJob.Schedule(dep);
            handle = job.Schedule(handle);
            handle = s_NetworkInterfaces[m_NetworkInterfaceIndex].ScheduleReceive(new NetworkPacketReceiver{m_Driver = this}, handle);
            handle = s_NetworkInterfaces[m_NetworkInterfaceIndex].ScheduleSend(m_ParallelSendQueue, handle);
            return handle;
        }

        /// <summary>
        /// Schedules only the interface send job for the driver's send queue.
        /// </summary>
        /// <param name="dep">Job handle the send job depends on.</param>
        /// <returns>The handle of the scheduled send job.</returns>
        public JobHandle ScheduleFlushSend(JobHandle dep)
        {
            return s_NetworkInterfaces[m_NetworkInterfaceIndex].ScheduleSend(m_ParallelSendQueue, dep);
        }

        /// <summary>
        /// Per-update work executed by UpdateJob: recycles freed connection slots, checks timeouts,
        /// updates the protocol when it asks for it and runs the pipeline receive/send updates.
        /// </summary>
        void InternalUpdate()
        {
            m_PipelineProcessor.Timestamp = m_updateTime;
            // Recycle slots released by RemoveConnection; the version is bumped and never left at 0.
            while (m_PendingFree.TryDequeue(out var free))
            {
                int ver = m_ConnectionList[free].Version + 1;
                if (ver == 0)
                    ver = 1;
                m_ConnectionList[free] = new Connection {Id = free, Version = ver};
                m_FreeList.Enqueue(free);
            }

            CheckTimeouts();

            if (m_NetworkProtocolInterface.NeedsUpdate)
            {
                var queueHandle = NetworkSendQueueHandle.ToTempHandle(m_ParallelSendQueue.AsParallelWriter());
                m_NetworkProtocolInterface.Update.Ptr.Invoke(m_updateTime, ref m_NetworkSendInterface, ref queueHandle, m_NetworkProtocolInterface.UserData);
            }

            m_PipelineProcessor.UpdateReceive(this, out var updateCount);

            // TODO: Find a good way to establish a good limit (connections*pipelines/2?)
            if (updateCount > (m_ConnectionList.Length - m_FreeList.Count) * 64)
            {
                UnityEngine.Debug.LogWarning(
                    FixedString.Format("A lot of pipeline updates have been queued, possibly too many being scheduled in pipeline logic, queue count: {0}", updateCount));
            }

            // Sends issued by the pipeline update are flagged as pipeline packets; the flag is reset afterwards.
            m_DefaultHeaderFlags = UdpCHeader.HeaderFlags.HasPipeline;
            m_PipelineProcessor.UpdateSend(ToConcurrentSendOnly(), out updateCount);
            if (updateCount > (m_ConnectionList.Length - m_FreeList.Count) * 64)
            {
                UnityEngine.Debug.LogWarning(
                    FixedString.Format("A lot of pipeline updates have been queued, possibly too many being scheduled in pipeline logic, queue count: {0}", updateCount));
            }

            m_DefaultHeaderFlags = 0;
        }

        /// <summary>
        /// Create a new pipeline.
        /// </summary>
        /// <param name="stages">An array of stages the pipeline should contain.</param>
        /// <returns>The pipeline created by the pipeline processor.</returns>
        /// <exception cref="InvalidOperationException">If the driver is not created properly (collection checks only)</exception>
        /// <exception cref="InvalidOperationException">A connection has already been established (collection checks only)</exception>
        public NetworkPipeline CreatePipeline(params Type[] stages)
        {
#if ENABLE_UNITY_COLLECTIONS_CHECKS
            if (!m_InternalState.IsCreated)
                throw new InvalidOperationException(
                    "Driver must be constructed with a populated or empty INetworkParameter params list");
            if (m_ConnectionList.Length > 0)
                throw new InvalidOperationException(
                    "Pipelines cannot be created after establishing connections");
#endif
            return m_PipelineProcessor.CreatePipeline(stages);
        }

        /// <summary>
        /// Bind the driver to an endpoint.
        /// </summary>
        /// <param name="endpoint">The endpoint to bind to.</param>
        /// <returns>Returns 0 on success, and a negative value if an error occurred.</returns>
/// <exception cref="InvalidOperationException">If the driver is not created properly (collection checks only)</exception>
        /// <exception cref="InvalidOperationException">If bind is called more than once on the driver (collection checks only)</exception>
        /// <exception cref="InvalidOperationException">If bind is called after a connection has already been established (collection checks only)</exception>
        public int Bind(NetworkEndPoint endpoint)
        {
            var netInterface = s_NetworkInterfaces[m_NetworkInterfaceIndex];

            // Translate the generic endpoint into the interface-specific one first.
            var createResult = netInterface.CreateInterfaceEndPoint(endpoint, out var interfaceEndPoint);
            if (createResult != 0)
            {
                return -1;
            }

#if ENABLE_UNITY_COLLECTIONS_CHECKS
            if (!m_InternalState.IsCreated)
            {
                throw new InvalidOperationException(
                    "Driver must be constructed with a populated or empty INetworkParameter params list");
            }
            // question: should this really be an error?
            if (m_InternalState[InternalStateBound] != 0)
            {
                throw new InvalidOperationException(
                    "Bind can only be called once per NetworkDriver");
            }
            if (m_ConnectionList.Length > 0)
            {
                throw new InvalidOperationException(
                    "Bind cannot be called after establishing connections");
            }
#endif
            var bindState = s_NetworkProtocols[m_NetworkProtocolIndex].Bind(netInterface, ref interfaceEndPoint);
            if (bindState < 0)
            {
                return bindState;
            }

            // A non-negative protocol result is stored as the bound state of the driver.
            m_InternalState[InternalStateBound] = bindState;
            return 0;
        }

        /// <summary>
        /// Set the driver to Listen for incoming connections
        /// </summary>
        /// <returns>Returns 0 on success, -1 if the driver is not bound, otherwise the interface's error value.</returns>
        /// <exception cref="InvalidOperationException">If the driver is not created properly (collection checks only)</exception>
        /// <exception cref="InvalidOperationException">If listen is called more than once on the driver (collection checks only)</exception>
        /// <exception cref="InvalidOperationException">If bind has not been called before calling Listen (collection checks only)</exception>
        public int Listen()
        {
#if ENABLE_UNITY_COLLECTIONS_CHECKS
            if (!m_InternalState.IsCreated)
            {
                throw new InvalidOperationException(
                    "Driver must be constructed with a populated or empty INetworkParameter params list");
            }
            if (Listening)
            {
                throw new InvalidOperationException(
                    "Listen can only be called once per NetworkDriver");
            }
            if (!Bound)
            {
                throw new InvalidOperationException(
                    "Listen can only be called after a successful call to Bind");
            }
#endif
            if (!Bound)
            {
                return -1;
            }

            var listenResult = s_NetworkInterfaces[m_NetworkInterfaceIndex].Listen();
            if (listenResult != 0)
            {
                return listenResult;
            }

            Listening = true;
            return 0;
        }

        /// <summary>
        /// Checks to see if there are any new connections to Accept.
        /// </summary>
/// <returns>If accept fails it returns a default NetworkConnection.</returns>
        public NetworkConnection Accept()
        {
            // Nothing to accept unless the driver listens and a connection id is queued.
            if (!Listening || !m_NetworkAcceptQueue.TryDequeue(out var acceptedId))
            {
                return default(NetworkConnection);
            }

            return new NetworkConnection
            {
                m_NetworkId = acceptedId,
                m_NetworkVersion = m_ConnectionList[acceptedId].Version
            };
        }

        /// <summary>
        /// Connects the driver to a endpoint
        /// </summary>
        /// <param name="endpoint">The endpoint to connect to.</param>
        /// <returns>If connect fails it returns a default NetworkConnection.</returns>
        /// <exception cref="InvalidOperationException">If the driver is not created properly (collection checks only)</exception>
        public NetworkConnection Connect(NetworkEndPoint endpoint)
        {
            var connectResult = s_NetworkProtocols[m_NetworkProtocolIndex].Connect(
                s_NetworkInterfaces[m_NetworkInterfaceIndex], endpoint, out var remoteAddress);
            if (connectResult != 0)
            {
                return default;
            }

#if ENABLE_UNITY_COLLECTIONS_CHECKS
            if (!m_InternalState.IsCreated)
            {
                throw new InvalidOperationException(
                    "Driver must be constructed with a populated or empty INetworkParameter params list");
            }
#endif
            // Reuse a freed slot when one is available, otherwise grow the connection list.
            if (!m_FreeList.TryDequeue(out var slot))
            {
                slot = m_ConnectionList.Length;
                m_ConnectionList.Add(new Connection{Id = slot, Version = 1});
            }

            int slotVersion = m_ConnectionList[slot].Version;
            var sessionToken = m_SessionIdCounter[0];

            var pending = new Connection
            {
                Id = slot,
                Version = slotVersion,
                State = NetworkConnection.State.Connecting,
                Address = remoteAddress,
                Attempts = 1,
                LastAttempt = m_updateTime,
                SendToken = 0,
                ReceiveToken = sessionToken
            };

            // Each outgoing connection attempt consumes one session id.
            m_SessionIdCounter[0] = (ushort)(sessionToken + 1);
            m_ConnectionList[slot] = pending;

            var result = new NetworkConnection {m_NetworkId = slot, m_NetworkVersion = slotVersion};
            SendConnectionRequest(pending);
            m_PipelineProcessor.initializeConnection(result);

            return result;
        }

        /// <summary>
        /// Asks the protocol to send a connection request for the given connection.
        /// </summary>
        void SendConnectionRequest(Connection c)
        {
            var sendQueue = NetworkSendQueueHandle.ToTempHandle(m_ParallelSendQueue.AsParallelWriter());
            m_NetworkProtocolInterface.ProcessSendConnectionRequest.Ptr.Invoke(
                ref c, ref m_NetworkSendInterface, ref sendQueue, m_NetworkProtocolInterface.UserData);
        }

        /// <summary>
        /// Disconnects a NetworkConnection
        /// </summary>
        /// <param name="id">The NetworkConnection we want to Disconnect.</param>
        /// <returns>Return 0 on success.</returns>
public int Disconnect(NetworkConnection id)
        {
            var target = GetConnection(id);
            if (target == Connection.Null)
            {
                return 0;
            }

            // Only a fully connected peer is notified; the local entry is removed either way.
            if (target.State == NetworkConnection.State.Connected)
            {
                var sendQueue = NetworkSendQueueHandle.ToTempHandle(m_ParallelSendQueue.AsParallelWriter());
                m_NetworkProtocolInterface.ProcessSendDisconnect.Ptr.Invoke(
                    ref target, ref m_NetworkSendInterface, ref sendQueue, m_NetworkProtocolInterface.UserData);
            }

            RemoveConnection(target);
            return 0;
        }

        /// <summary>
        /// Returns the PipelineBuffers for a specific pipeline and stage.
        /// </summary>
        /// <param name="pipeline">The pipeline to query.</param>
        /// <param name="stageId">The stage within the pipeline.</param>
        /// <param name="connection">The connection the buffers belong to.</param>
        /// <param name="readProcessingBuffer">Receives the read processing buffer.</param>
        /// <param name="writeProcessingBuffer">Receives the write processing buffer.</param>
        /// <param name="sharedBuffer">Receives the shared buffer.</param>
        /// <exception cref="InvalidOperationException">If the connection is invalid.</exception>
        public void GetPipelineBuffers(NetworkPipeline pipeline, NetworkPipelineStageId stageId, NetworkConnection connection, out NativeArray readProcessingBuffer, out NativeArray writeProcessingBuffer, out NativeArray sharedBuffer)
        {
            var networkId = connection.m_NetworkId;
            var idInRange = networkId >= 0 && networkId < m_ConnectionList.Length;
            if (!idInRange || m_ConnectionList[networkId].Version != connection.m_NetworkVersion)
            {
                throw new InvalidOperationException("Invalid connection");
            }

            m_PipelineProcessor.GetPipelineBuffers(pipeline, stageId, connection, out readProcessingBuffer, out writeProcessingBuffer, out sharedBuffer);
        }

        /// <summary>
        /// Looks up the state of a connection; unknown or stale connections report Disconnected.
        /// </summary>
        public NetworkConnection.State GetConnectionState(NetworkConnection con)
        {
            var found = GetConnection(con);
            return found == Connection.Null
                ? NetworkConnection.State.Disconnected
                : found.State;
        }

        /// <summary>
        /// Resolves the remote endpoint of a connection through the protocol.
        /// </summary>
        /// <returns>A default NetworkEndPoint for a default, unknown or stale connection.</returns>
        public NetworkEndPoint RemoteEndPoint(NetworkConnection id)
        {
            if (id == default(NetworkConnection))
            {
                return default(NetworkEndPoint);
            }

            var found = GetConnection(id);
            if (found == Connection.Null)
            {
                return default(NetworkEndPoint);
            }

            return s_NetworkProtocols[m_NetworkProtocolIndex].GetRemoteEndPoint(s_NetworkInterfaces[m_NetworkInterfaceIndex], found.Address);
        }

        /// <summary>
        /// Returns the interface's local endpoint converted to a generic NetworkEndPoint.
        /// </summary>
        public NetworkEndPoint LocalEndPoint()
        {
            var localInterfaceEndPoint = s_NetworkInterfaces[m_NetworkInterfaceIndex].LocalEndPoint;
            return s_NetworkInterfaces[m_NetworkInterfaceIndex].GetGenericEndPoint(localInterfaceEndPoint);
        }

        // The send API below forwards to a send-only Concurrent copy of the driver.

        public int MaxHeaderSize(NetworkPipeline pipe)
            => ToConcurrentSendOnly().MaxHeaderSize(pipe);

        public int BeginSend(NetworkPipeline pipe, NetworkConnection id, out DataStreamWriter writer, int requiredPayloadSize = 0)
            => ToConcurrentSendOnly().BeginSend(pipe, id, out writer, requiredPayloadSize);

        public int BeginSend(NetworkConnection id, out DataStreamWriter writer, int requiredPayloadSize = 0)
            => ToConcurrentSendOnly().BeginSend(NetworkPipeline.Null, id, out writer, requiredPayloadSize);

        public int EndSend(DataStreamWriter writer)
            => ToConcurrentSendOnly().EndSend(writer);

        public void AbortSend(DataStreamWriter writer)
            => ToConcurrentSendOnly().AbortSend(writer);

        /// <summary>
        /// Pops an event
        /// </summary>
        /// <param name="con">Receives the connection the event belongs to.</param>
        /// <param name="reader">Receives a reader over the event payload.</param>
        /// <returns>Returns the type of event received, if the value is a Disconnect event
        /// then the DataStreamReader will contain the disconnect reason.</returns>
        public NetworkEvent.Type PopEvent(out NetworkConnection con, out DataStreamReader reader)
            => PopEvent(out con, out reader, out var _);

        /// <summary>
        /// Pops an event and also reports the pipeline id stored with it.
        /// </summary>
        /// <param name="con">Receives the connection the event belongs to, or default when the event id is negative.</param>
        /// <param name="reader">Receives a reader over the disconnect reason or the received data, default otherwise.</param>
        /// <param name="pipeline">Receives the pipeline id reported by the event queue.</param>
        /// <returns>The type of the popped event.</returns>
        public NetworkEvent.Type PopEvent(out NetworkConnection con, out DataStreamReader reader, out NetworkPipeline pipeline)
        {
            reader = default(DataStreamReader);

            var eventType = m_EventQueue.PopEvent(out var eventConnectionId, out var offset, out var size, out var pipelineId);
            pipeline = new NetworkPipeline { Id = pipelineId };

            if (eventType == NetworkEvent.Type.Disconnect && offset < 0)
            {
                // A negative offset indexes the disconnect-reason table instead of the data stream.
                reader = new DataStreamReader(((NativeArray)m_DisconnectReasons).GetSubArray(math.abs(offset), 1));
            }
            else if (size > 0)
            {
                reader = new DataStreamReader(((NativeArray)m_DataStream).GetSubArray(offset, size));
            }

            if (eventConnectionId < 0)
            {
                con = default(NetworkConnection);
            }
            else
            {
                con = new NetworkConnection {m_NetworkId = eventConnectionId, m_NetworkVersion = m_ConnectionList[eventConnectionId].Version};
            }

            return eventType;
        }

        /// <summary>
        /// Pops an event for a specific connection
        /// </summary>
        /// <param name="connectionId">The connection to pop an event for.</param>
        /// <param name="reader">Receives a reader over the event payload.</param>
        /// <returns>Returns the type of event received, if the value is a Disconnect event
        /// then the DataStreamReader will contain the disconnect reason.</returns>
public NetworkEvent.Type PopEventForConnection(NetworkConnection connectionId, out DataStreamReader reader)
{
    // The pipeline id is not needed by this overload, so it is discarded.
    return PopEventForConnection(connectionId, out reader, out var _);
}

/// <summary>
/// Pops an event for a specific connection and also reports the pipeline id stored with it.
/// </summary>
/// <param name="connectionId">The connection whose queued events are examined.</param>
/// <param name="reader">
/// For a Disconnect event with a negative offset, reads one element of the disconnect-reason
/// table at the absolute value of that offset. Otherwise, when the event size is positive, reads
/// the event's slice of the data stream. Left at its default value in all other cases.
/// </param>
/// <param name="pipeline">
/// Receives a pipeline handle built from the event's pipeline id, or a default handle when the
/// connection is invalid.
/// </param>
/// <returns>
/// The type of the popped event, or <see cref="NetworkEvent.Type.Empty"/> when the connection id
/// is out of range or its version does not match the stored connection.
/// </returns>
public NetworkEvent.Type PopEventForConnection(NetworkConnection connectionId, out DataStreamReader reader, out NetworkPipeline pipeline)
{
    reader = default(DataStreamReader);
    pipeline = default(NetworkPipeline);

    if (connectionId.m_NetworkId < 0 || connectionId.m_NetworkId >= m_ConnectionList.Length ||
        m_ConnectionList[connectionId.m_NetworkId].Version != connectionId.m_NetworkVersion)
    {
        // Return the enum member directly. The previous "(int)" cast only compiled because a
        // constant zero converts implicitly to any enum type.
        return NetworkEvent.Type.Empty;
    }

    var type = m_EventQueue.PopEventForConnection(connectionId.m_NetworkId, out var offset, out var size, out var pipelineId);
    pipeline = new NetworkPipeline { Id = pipelineId };

    if (type == NetworkEvent.Type.Disconnect && offset < 0)
        reader = new DataStreamReader(((NativeArray)m_DisconnectReasons).GetSubArray(math.abs(offset), 1));
    else if (size > 0)
        reader = new DataStreamReader(((NativeArray)m_DataStream).GetSubArray(offset, size));

    return type;
}

/// <summary>
/// Returns the size of the event queue for a specific connection.
/// </summary>
/// <param name="connectionId">The connection to query.</param>
/// <returns>If the connection is valid it returns the size of the event queue otherwise it returns 0.</returns>
public int GetEventQueueSizeForConnection(NetworkConnection connectionId)
{
    var index = connectionId.m_NetworkId;
    var inRange = index >= 0 && index < m_ConnectionList.Length;

    // The version is only compared once the id is known to be a valid list index.
    if (!inRange || m_ConnectionList[index].Version != connectionId.m_NetworkVersion)
    {
        return 0;
    }

    return m_EventQueue.GetCountForConnection(index);
}

// internal helper functions ::::::::::::::::::::::::::::::::::::::::::

/// <summary>
/// Pushes a Connect event for the given connection id onto the event queue.
/// </summary>
void AddConnection(int id)
{
    var connectEvent = new NetworkEvent {connectionId = id, type = NetworkEvent.Type.Connect};
    m_EventQueue.PushEvent(connectEvent);
}

/// <summary>
/// Pushes a Disconnect event for the given connection id onto the event queue, storing the
/// reason (cast to int) in the event's status field.
/// </summary>
void AddDisconnection(int id, Error.DisconnectReason reason = DisconnectReason.Default)
{
    var disconnectEvent = new NetworkEvent
    {
        connectionId = id,
        type = NetworkEvent.Type.Disconnect,
        status = (int)reason
    };
    m_EventQueue.PushEvent(disconnectEvent);
}

/// <summary>
/// Resolves a public connection handle to the stored connection.
/// </summary>
/// <returns>
/// The stored connection, or <c>Connection.Null</c> when the id is out of range or the stored
/// version differs from the handle's version.
/// </returns>
Connection GetConnection(NetworkConnection id)
{
    var index = id.m_NetworkId;
    if (index >= 0 && index < m_ConnectionList.Length)
    {
        var stored = m_ConnectionList[index];
        if (stored.Version == id.m_NetworkVersion)
        {
            return stored;
        }
    }

    return Connection.Null;
}

/// <summary>
/// Finds the first stored connection whose address equals <paramref name="address"/> and whose
/// ReceiveToken equals <paramref name="sessionId"/>.
/// </summary>
/// <returns>The matching connection, or <c>Connection.Null</c> when none matches.</returns>
Connection GetConnection(NetworkInterfaceEndPoint address, ushort sessionId)
{
    var total = m_ConnectionList.Length;
    for (var slot = 0; slot < total; ++slot)
    {
        var candidate = m_ConnectionList[slot];
        if (address == candidate.Address && candidate.ReceiveToken == sessionId)
        {
            return candidate;
        }
    }

    return Connection.Null;
}

/// <summary>
/// Finds the first stored connection whose address equals <paramref name="address"/> and whose
/// SendToken equals <paramref name="sessionId"/>.
/// </summary>
/// <returns>The matching connection, or <c>Connection.Null</c> when none matches.</returns>
Connection GetNewConnection(NetworkInterfaceEndPoint address, ushort sessionId)
{
    var total = m_ConnectionList.Length;
    for (var slot = 0; slot < total; ++slot)
    {
        var candidate = m_ConnectionList[slot];
        if (address == candidate.Address && candidate.SendToken == sessionId)
        {
            return candidate;
        }
    }

    return Connection.Null;
}

/// <summary>
/// Unconditionally stores the connection in the list slot given by its Id.
/// </summary>
void SetConnection(Connection connection) => m_ConnectionList[connection.Id] = connection;

/// <summary>
/// Marks the connection as Disconnected, writes it back to the list, and queues its id on
/// m_PendingFree.
/// </summary>
/// <returns>
/// True when the connection was removed. False when it was already Disconnected or does not
/// compare equal to the entry currently stored at its Id.
/// </returns>
bool RemoveConnection(Connection connection)
{
    // The state is checked first so the list is not indexed for an already-disconnected connection.
    if (connection.State == NetworkConnection.State.Disconnected || !(connection == m_ConnectionList[connection.Id]))
    {
        return false;
    }

    connection.State = NetworkConnection.State.Disconnected;
    m_ConnectionList[connection.Id] = connection;
    m_PendingFree.Enqueue(connection.Id);
    return true;
}
/// <summary>
/// Writes the connection back into the connection list, but only when it compares equal to the
/// entry currently stored at its Id.
/// </summary>
/// <returns>True when the entry was written, false otherwise.</returns>
bool UpdateConnection(Connection connection)
{
    if (connection == m_ConnectionList[connection.Id])
    {
        SetConnection(connection);
        return true;
    }

    return false;
}

/// <summary>
/// Walks every stored connection and applies the connect and disconnect timeouts from the
/// network parameters, measured against m_updateTime.
/// </summary>
void CheckTimeouts()
{
    for (int i = 0; i < m_ConnectionList.Length; ++i)
    {
        var connection = m_ConnectionList[i];
        if (connection == Connection.Null)
            continue;

        long now = m_updateTime;

        var netcon = new NetworkConnection {m_NetworkId = connection.Id, m_NetworkVersion = connection.Version};

        // Connecting or AwaitingResponse: retry after connectTimeoutMS, and give up once
        // maxConnectAttempts has been reached.
        if ((connection.State == NetworkConnection.State.Connecting ||
             connection.State == NetworkConnection.State.AwaitingResponse) &&
            now - connection.LastAttempt > m_NetworkParams.config.connectTimeoutMS)
        {
            if (connection.Attempts >= m_NetworkParams.config.maxConnectAttempts)
            {
                RemoveConnection(connection);
                AddDisconnection(connection.Id, DisconnectReason.MaxConnectionAttempts);
                continue;
            }

            // Plain increment; the previous form assigned the pre-incremented value back to itself.
            connection.Attempts++;
            connection.LastAttempt = now;
            SetConnection(connection);

            if (connection.State == NetworkConnection.State.Connecting)
                SendConnectionRequest(connection);
            else
            {
                var queueHandle = NetworkSendQueueHandle.ToTempHandle(m_ParallelSendQueue.AsParallelWriter());
                m_NetworkProtocolInterface.ProcessSendConnectionAccept.Ptr.Invoke(ref connection, ref m_NetworkSendInterface, ref queueHandle, m_NetworkProtocolInterface.UserData);
            }
        }

        // Connected: drop the connection once LastAttempt is older than disconnectTimeoutMS.
        if (connection.State == NetworkConnection.State.Connected &&
            now - connection.LastAttempt > m_NetworkParams.config.disconnectTimeoutMS)
        {
            Disconnect(netcon);
            AddDisconnection(connection.Id, DisconnectReason.Timeout);
        }
    }
}

/// <summary>
/// The value stored in the error-code table under ErrorCodeType.ReceiveError. Setting a
/// non-zero value logs an error through UnityEngine.Debug.LogError before the value is stored.
/// </summary>
public int ReceiveErrorCode
{
    get { return m_ErrorCodes[(int)ErrorCodeType.ReceiveError]; }
    internal set
    {
        if (value != 0)
        {
            UnityEngine.Debug.LogError(FixedString.Format("Error on receive, errorCode = {0}", value));
        }
        m_ErrorCodes[(int)ErrorCodeType.ReceiveError] = value;
    }
}

// Interface for receiving packages from a NetworkInterface

/// <summary>
/// Returns the driver's receive data stream list.
/// </summary>
internal NativeList GetDataStream()
{
    return m_DataStream;
}

/// <summary>
/// Returns the current used size of the data stream (element 0 of m_DataStreamSize).
/// </summary>
internal int GetDataStreamSize()
{
    return m_DataStreamSize[0];
}

// Backing storage for ReceiveCount; only element 0 is read or written in this section.
private NativeArray m_ReceiveCount;

/// <summary>
/// Reads or writes element 0 of m_ReceiveCount.
/// </summary>
internal int ReceiveCount
{
    get { return m_ReceiveCount[0]; }
    set { m_ReceiveCount[0] = value; }
}

/// <summary>
/// Returns true when the configured data stream size is 0. PushDataEvent grows the data stream
/// on demand in that case.
/// </summary>
internal bool DynamicDataStreamSize()
{
    return m_NetworkParams.dataStream.size == 0;
}

/// <summary>
/// Returns true when any entry in the connection list has the given address.
/// </summary>
internal bool IsAddressUsed(NetworkInterfaceEndPoint address)
{
    for (int i = 0; i < m_ConnectionList.Length; i++)
    {
        if (address == m_ConnectionList[i].Address)
            return true;
    }

    return false;
}

/// <summary>
/// Hands a received packet to the protocol's ProcessReceive callback and applies the command
/// the callback fills in. The packet bytes are addressed at the current end of the data stream
/// (base pointer plus m_DataStreamSize[0]).
/// </summary>
/// <param name="endpoint">The endpoint the packet was received from; passed by ref to the callback.</param>
/// <param name="dataLen">The packet length passed to the callback.</param>
/// <returns>
/// The number of times the local counter was incremented while applying the command (new
/// connections, disconnect packets for known connections, data events pushed). Several early
/// exits return 0 regardless of the counter, including the pipeline data paths.
/// </returns>
internal unsafe int AppendPacket(NetworkInterfaceEndPoint endpoint, int dataLen)
{
    var count = 0;
    var command = default(ProcessPacketCommand);
    byte* dataStream = (byte*)m_DataStream.GetUnsafePtr() + m_DataStreamSize[0];
    var queueHandle = NetworkSendQueueHandle.ToTempHandle(m_ParallelSendQueue.AsParallelWriter());
    m_NetworkProtocolInterface.ProcessReceive.Ptr.Invoke((IntPtr)dataStream, ref endpoint, dataLen, ref m_NetworkSendInterface, ref queueHandle, m_NetworkProtocolInterface.UserData, ref command);

    switch (command.Type)
    {
        case ProcessPacketCommandType.AddressUpdate:
        {
            // Rewrite the address in place on every connection matching the old address and token.
            for (int i = 0; i < m_ConnectionList.Length; i++)
            {
                if (command.AsAddressUpdate.Address == m_ConnectionList[i].Address &&
                    command.AsAddressUpdate.SessionToken == m_ConnectionList[i].ReceiveToken)
                    m_ConnectionList.ElementAt(i).Address = command.AsAddressUpdate.NewAddress;
            }
        } break;
        case ProcessPacketCommandType.BindAccept:
        {
            // NOTE(review): 2 presumably marks the bind as accepted; confirm against the other
            // uses of InternalStateBound.
            m_InternalState[InternalStateBound] = 2;
        } break;
        case ProcessPacketCommandType.ConnectionAccept:
        {
            Connection c = GetConnection(command.AsConnectionAccept.Address, command.AsConnectionAccept.SessionId);

            if (c != Connection.Null)
            {
                c.DidReceiveData = 1;

                if (c.State == NetworkConnection.State.Connected)
                {
                    //DebugLog("Dropping connect request for an already connected endpoint [" + address + "]");
                    return 0;
                }

                if (c.State == NetworkConnection.State.Connecting)
                {
                    // Adopt the token from the accept message, promote to Connected and raise a Connect event.
                    c.SendToken = command.AsConnectionAccept.ConnectionToken;

                    c.State = NetworkConnection.State.Connected;
                    UpdateConnection(c);
                    AddConnection(c.Id);
                    count++;
                }
            }
        } break;
        case ProcessPacketCommandType.ConnectionReject:
            break;
        case ProcessPacketCommandType.ConnectionRequest:
        {
            if (!Listening)
                return 0;

            Connection c = GetNewConnection(command.AsConnectionRequest.Address, command.AsConnectionRequest.SessionId);
            if (c == Connection.Null || c.State == NetworkConnection.State.Disconnected)
            {
                // No live connection for this address and session: allocate one, reusing a free
                // slot when available and appending a new entry with Version 1 otherwise.
                var sessionId = m_SessionIdCounter[0];
                m_SessionIdCounter[0] = (ushort) (m_SessionIdCounter[0] + 1);

                if (!m_FreeList.TryDequeue(out var id))
                {
                    id = m_ConnectionList.Length;
                    m_ConnectionList.Add(new Connection{Id = id, Version = 1});
                }

                int ver = m_ConnectionList[id].Version;
                c = new Connection
                {
                    Id = id,
                    Version = ver,
                    ReceiveToken = sessionId,
                    SendToken = command.AsConnectionRequest.SessionId,
                    State = NetworkConnection.State.Connected,
                    Address = command.AsConnectionRequest.Address,
                    Attempts = 1,
                    LastAttempt = m_updateTime
                };
                SetConnection(c);
                m_PipelineProcessor.initializeConnection(new NetworkConnection{m_NetworkId = id, m_NetworkVersion = c.Version});
                m_NetworkAcceptQueue.Enqueue(id);
                count++;
            }
            else
            {
                // Repeated request for an existing connection: record the attempt.
                c.Attempts++;
                c.LastAttempt = m_updateTime;
                SetConnection(c);
            }

            // The accept-send callback is invoked for both new and repeated requests.
            m_NetworkProtocolInterface.ProcessSendConnectionAccept.Ptr.Invoke(ref c, ref m_NetworkSendInterface, ref queueHandle, m_NetworkProtocolInterface.UserData);
        } break;
        case ProcessPacketCommandType.Disconnect:
        {
            Connection c = GetConnection(command.AsDisconnect.Address, command.AsDisconnect.SessionId);
            if (c != Connection.Null)
            {
                if (RemoveConnection(c))
                    AddDisconnection(c.Id, DisconnectReason.ClosedByRemote);
                // Counted even when RemoveConnection reports that nothing was removed.
                count++;
            }
        } break;
        case ProcessPacketCommandType.DataWithImplicitConnectionAccept:
        {
            Connection c = GetConnection(command.AsDataWithImplicitConnectionAccept.Address, command.AsDataWithImplicitConnectionAccept.SessionId);
            if (c == Connection.Null)
                return 0;

            c.DidReceiveData = 1;
            c.LastAttempt = m_updateTime;
            UpdateConnection(c);

            if (c.State == NetworkConnection.State.Connecting)
            {
                // Data arriving while still Connecting is treated as an accept.
                c.SendToken = command.AsDataWithImplicitConnectionAccept.ConnectionToken;

                c.State = NetworkConnection.State.Connected;
                UpdateConnection(c);
                UnityEngine.Assertions.Assert.IsTrue(!Listening);
                AddConnection(c.Id);
                count++;
            }

            if (c.State == NetworkConnection.State.Connected)
            {
                // Claim the payload region of the data stream by advancing the used size past it.
                var sliceOffset = m_DataStreamSize[0] + command.AsDataWithImplicitConnectionAccept.Offset;
                m_DataStreamSize[0] = sliceOffset + command.AsDataWithImplicitConnectionAccept.Length;

                if (command.AsDataWithImplicitConnectionAccept.HasPipeline)
                {
                    var netCon = new NetworkConnection {m_NetworkId = c.Id, m_NetworkVersion = c.Version};
                    m_PipelineProcessor.Receive(this, netCon, ((NativeArray)m_DataStream).GetSubArray(sliceOffset, command.AsDataWithImplicitConnectionAccept.Length));
                    // TODO: is this return 0 intended? shouldn't this be return count + 1
                    return 0;
                }

                m_EventQueue.PushEvent(new NetworkEvent
                {
                    connectionId = c.Id,
                    type = NetworkEvent.Type.Data,
                    offset = sliceOffset,
                    size = command.AsDataWithImplicitConnectionAccept.Length
                });
                count++;
            }
        } break;
        case ProcessPacketCommandType.Data:
        {
            Connection c = GetConnection(command.AsData.Address, command.AsData.SessionId);
            if (c == Connection.Null)
                return 0;

            c.DidReceiveData = 1;
            c.LastAttempt = m_updateTime;
            UpdateConnection(c);

            if (c.State == NetworkConnection.State.Connected)
            {
                // Claim the payload region of the data stream by advancing the used size past it.
                var sliceOffset = m_DataStreamSize[0] + command.AsData.Offset;
                m_DataStreamSize[0] = sliceOffset + command.AsData.Length;

                if (command.AsData.HasPipeline)
                {
                    var netCon = new NetworkConnection {m_NetworkId = c.Id, m_NetworkVersion = c.Version};
                    m_PipelineProcessor.Receive(this, netCon, ((NativeArray)m_DataStream).GetSubArray(sliceOffset, command.AsData.Length));
                    // TODO: is this return 0 intended? shouldn't this be return count + 1
                    return 0;
                }

                m_EventQueue.PushEvent(new NetworkEvent
                {
                    connectionId = c.Id,
                    type = NetworkEvent.Type.Data,
                    offset = sliceOffset,
                    size = command.AsData.Length
                });
                count++;
            }
        } break;
        case ProcessPacketCommandType.Drop:
            break;
    }

    return count;
}

// Interface for receiving data from a pipeline

/// <summary>
/// Pushes a Data event for a payload delivered by a pipeline. When the payload already lies
/// inside the used part of the data stream it is referenced in place; otherwise it is copied to
/// the end of the data stream first.
/// </summary>
/// <param name="con">The connection the event is attributed to (only its m_NetworkId is stored).</param>
/// <param name="pipelineId">The pipeline id stored on the event, narrowed to short.</param>
/// <param name="dataPtr">Pointer to the payload bytes.</param>
/// <param name="dataLength">Payload length in bytes.</param>
internal unsafe void PushDataEvent(NetworkConnection con, int pipelineId, byte* dataPtr, int dataLength)
{
    byte* streamBasePtr = (byte*)m_DataStream.GetUnsafePtr();
    int sliceOffset = 0;
    if (dataPtr >= streamBasePtr && dataPtr + dataLength <= streamBasePtr + m_DataStreamSize[0])
    {
        // Pointer is a subset of our receive buffer, no need to copy
        sliceOffset = (int)(dataPtr - streamBasePtr);
    }
    else
    {
        if (DynamicDataStreamSize())
        {
            // Double the stream until the payload fits after the currently used region.
            while (m_DataStreamSize[0] + dataLength >= m_DataStream.Length)
                m_DataStream.ResizeUninitialized(m_DataStream.Length * 2);
        }
        else if (m_DataStreamSize[0] + dataLength >= m_DataStream.Length)
            return; // FIXME: how do we signal this error?

        sliceOffset = m_DataStreamSize[0];
        // Re-fetch the base pointer after the possible resize above.
        streamBasePtr = (byte*)m_DataStream.GetUnsafePtr();
        UnsafeUtility.MemCpy(streamBasePtr + sliceOffset, dataPtr, dataLength);
        m_DataStreamSize[0] = sliceOffset + dataLength;
    }

    m_EventQueue.PushEvent(new NetworkEvent
    {
        pipelineId = (short)pipelineId,
        connectionId = con.m_NetworkId,
        type = NetworkEvent.Type.Data,
        offset = sliceOffset,
        size = dataLength
    });
}
}
}