using System;
using System.Runtime.InteropServices;
using System.Threading;
using Unity.Collections;
using Unity.Collections.LowLevel.Unsafe;
namespace Unity.Networking.Transport
{
/// <summary>
/// A single queued network event. The struct uses an explicit layout so that
/// <see cref="status"/> and <see cref="offset"/> share the same four bytes.
/// </summary>
[StructLayout(LayoutKind.Explicit)]
public struct NetworkEvent
{
    /// <summary>
    /// NetworkEvent.Type enumerates available network events for this driver.
    /// </summary>
    public enum Type : short
    {
        /// <summary>No event.</summary>
        Empty = 0,
        /// <summary>An event that carries an <see cref="offset"/> and a <see cref="size"/>.</summary>
        Data = 1,
        /// <summary>A connect event.</summary>
        Connect = 2,
        /// <summary>A disconnect event; carries a <see cref="status"/>.</summary>
        Disconnect = 3
    }

    /// <summary>Kind of event stored in this entry (bytes 0-1).</summary>
    [FieldOffset(0)]
    public Type type;

    /// <summary>Pipeline identifier associated with the event (bytes 2-3).</summary>
    [FieldOffset(2)]
    public short pipelineId;

    /// <summary>Identifier of the connection the event belongs to (bytes 4-7).</summary>
    [FieldOffset(4)]
    public int connectionId;

    /// <summary>
    /// Status value (bytes 8-11). Occupies the same bytes as <see cref="offset"/>,
    /// so only one of the two is meaningful for a given event.
    /// </summary>
    [FieldOffset(8)]
    public int status;

    /// <summary>
    /// Data offset (bytes 8-11). Occupies the same bytes as <see cref="status"/>,
    /// so only one of the two is meaningful for a given event.
    /// </summary>
    [FieldOffset(8)]
    public int offset;

    /// <summary>Data size (bytes 12-15).</summary>
    [FieldOffset(12)]
    public int size;
}
/// <summary>
/// Stores pending network events in one flat list that is partitioned into
/// equally sized per-connection sub-queues, plus a master queue that records
/// the order in which events were pushed across all connections.
/// </summary>
/// <remarks>
/// Layout: the sub-queue of connection <c>c</c> starts at index
/// <c>c * MaxEvents</c> of the event list. For the same connection, element
/// <c>c * 2</c> of the head/tail list is the head (next index to pop) and
/// element <c>c * 2 + 1</c> is the tail (next index to write). Heads and tails
/// only move forward until <see cref="Clear"/> resets them to zero.
/// </remarks>
public struct NetworkEventQueue : IDisposable
{
    /// <summary>
    /// Current capacity (in events) of each per-connection sub-queue, derived
    /// from the event list length divided by the number of connections.
    /// </summary>
    private int MaxEvents
    {
        get { return m_ConnectionEventQ.Length / (m_ConnectionEventHeadTail.Length / 2); }
    }

    /// <summary>
    /// Creates a queue with room for a single connection (id 0).
    /// </summary>
    /// <param name="queueSizePerConnection">
    /// Initial number of event slots per connection. The queue grows on demand
    /// in <see cref="PushEvent"/>.
    /// </param>
    public NetworkEventQueue(int queueSizePerConnection)
    {
        m_MasterEventQ = new NativeQueue<SubQueueItem>(Allocator.Persistent);
        m_ConnectionEventQ = new NativeList<NetworkEvent>(queueSizePerConnection, Allocator.Persistent);
        m_ConnectionEventHeadTail = new NativeList<int>(2, Allocator.Persistent);
        m_ConnectionEventQ.ResizeUninitialized(queueSizePerConnection);
        // Head and tail of connection 0.
        m_ConnectionEventHeadTail.Add(0);
        m_ConnectionEventHeadTail.Add(0);
    }

    /// <summary>Disposes the three native containers owned by the queue.</summary>
    public void Dispose()
    {
        m_MasterEventQ.Dispose();
        m_ConnectionEventQ.Dispose();
        m_ConnectionEventHeadTail.Dispose();
    }

    // The returned stream is valid until PopEvent is called again or until the main driver updates
    /// <summary>
    /// Pops the oldest pending event across all connections, discarding the pipeline id.
    /// </summary>
    /// <param name="id">Connection id of the popped event, or -1 if nothing was popped.</param>
    /// <param name="offset">See <see cref="PopEventForConnection(int, out int, out int, out int)"/>.</param>
    /// <param name="size">See <see cref="PopEventForConnection(int, out int, out int, out int)"/>.</param>
    /// <returns>The type of the popped event, or <c>Empty</c> if none is pending.</returns>
    public NetworkEvent.Type PopEvent(out int id, out int offset, out int size)
    {
        return PopEvent(out id, out offset, out size, out _);
    }

    /// <summary>
    /// Pops the oldest pending event across all connections.
    /// </summary>
    /// <remarks>
    /// Master queue entries whose index does not match the current head of their
    /// connection are skipped (for example because the event was already consumed
    /// through <see cref="PopEventForConnection(int, out int, out int, out int)"/>,
    /// which advances the head without touching the master queue).
    /// </remarks>
    /// <param name="id">Connection id of the popped event, or -1 if nothing was popped.</param>
    /// <param name="offset">See <see cref="PopEventForConnection(int, out int, out int, out int)"/>.</param>
    /// <param name="size">See <see cref="PopEventForConnection(int, out int, out int, out int)"/>.</param>
    /// <param name="pipelineId">Pipeline id stored in the popped event, or 0.</param>
    /// <returns>The type of the popped event, or <c>Empty</c> if the master queue ran dry.</returns>
    public NetworkEvent.Type PopEvent(out int id, out int offset, out int size, out int pipelineId)
    {
        offset = 0;
        size = 0;
        id = -1;
        pipelineId = 0;
        while (m_MasterEventQ.TryDequeue(out SubQueueItem ev))
        {
            if (m_ConnectionEventHeadTail[ev.connection * 2] == ev.idx)
            {
                id = ev.connection;
                return PopEventForConnection(ev.connection, out offset, out size, out pipelineId);
            }
        }
        return NetworkEvent.Type.Empty;
    }

    /// <summary>
    /// Pops the next event of one connection, discarding the pipeline id.
    /// </summary>
    public NetworkEvent.Type PopEventForConnection(int connectionId, out int offset, out int size)
    {
        return PopEventForConnection(connectionId, out offset, out size, out _);
    }

    /// <summary>
    /// Pops the next event of one connection.
    /// </summary>
    /// <param name="connectionId">Connection whose sub-queue is read.</param>
    /// <param name="offset">
    /// For Data events, the event's offset. For Disconnect events whose status is
    /// not <c>Error.DisconnectReason.Default</c>, the negated status. Otherwise 0.
    /// </param>
    /// <param name="size">For Data events, the event's size. Otherwise 0.</param>
    /// <param name="pipelineId">Pipeline id stored in the event, or 0 if nothing was popped.</param>
    /// <returns>
    /// The type of the popped event, or <c>Empty</c> when the connection id is out
    /// of range or its sub-queue has no pending event.
    /// </returns>
    public NetworkEvent.Type PopEventForConnection(int connectionId, out int offset, out int size, out int pipelineId)
    {
        offset = 0;
        size = 0;
        pipelineId = 0;
        if (connectionId < 0 || connectionId >= m_ConnectionEventHeadTail.Length / 2)
        {
            return NetworkEvent.Type.Empty;
        }
        int idx = m_ConnectionEventHeadTail[connectionId * 2];
        if (idx >= m_ConnectionEventHeadTail[connectionId * 2 + 1])
        {
            return NetworkEvent.Type.Empty;
        }
        // Advance the head before reading the slot it pointed at.
        m_ConnectionEventHeadTail[connectionId * 2] = idx + 1;
        NetworkEvent ev = m_ConnectionEventQ[connectionId * MaxEvents + idx];
        return DecodeEvent(ev, out offset, out size, out pipelineId);
    }

    /// <summary>
    /// Returns the number of events still pending for a connection (tail minus
    /// head), or 0 when the connection id is out of range.
    /// </summary>
    public int GetCountForConnection(int connectionId)
    {
        if (connectionId < 0 || connectionId >= m_ConnectionEventHeadTail.Length / 2)
        {
            return 0;
        }
        return m_ConnectionEventHeadTail[connectionId * 2 + 1] - m_ConnectionEventHeadTail[connectionId * 2];
    }

    /// ::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
    /// internal helper functions ::::::::::::::::::::::::::::::::::::::::::

    /// <summary>
    /// Appends an event to the sub-queue of <c>ev.connectionId</c> and records it
    /// in the master queue. Grows the number of connections and/or the capacity
    /// per connection when needed.
    /// </summary>
    /// <param name="ev">Event to store; <c>connectionId</c> selects the sub-queue.</param>
    public void PushEvent(NetworkEvent ev)
    {
        int curMaxEvents = MaxEvents;
        if (ev.connectionId >= m_ConnectionEventHeadTail.Length / 2)
        {
            // Connection id out of range, grow the number of connections in the queue
            int oldSize = m_ConnectionEventHeadTail.Length;
            m_ConnectionEventHeadTail.ResizeUninitialized((ev.connectionId + 1) * 2);
            for (; oldSize < m_ConnectionEventHeadTail.Length; ++oldSize)
            {
                m_ConnectionEventHeadTail[oldSize] = 0;
            }
            // New sub-queues are appended at the end, so existing events keep their position.
            m_ConnectionEventQ.ResizeUninitialized((m_ConnectionEventHeadTail.Length / 2) * curMaxEvents);
        }

        int idx = m_ConnectionEventHeadTail[ev.connectionId * 2 + 1];
        if (idx >= curMaxEvents)
        {
            // Grow the max items per queue and remap the queues
            int oldMax = curMaxEvents;
            // A queue created with zero capacity can never grow by doubling
            // (0 * 2 == 0), which would spin forever; start from one slot instead.
            if (curMaxEvents < 1)
            {
                curMaxEvents = 1;
            }
            while (idx >= curMaxEvents)
            {
                curMaxEvents *= 2;
            }
            int maxConnections = m_ConnectionEventHeadTail.Length / 2;
            m_ConnectionEventQ.ResizeUninitialized(maxConnections * curMaxEvents);
            // Walk connections and slots from the back: every destination index is
            // at or beyond its source index, so moving back-to-front never
            // overwrites an event that has not been relocated yet.
            for (int con = maxConnections - 1; con >= 0; --con)
            {
                for (int i = m_ConnectionEventHeadTail[con * 2 + 1] - 1; i >= m_ConnectionEventHeadTail[con * 2]; --i)
                {
                    m_ConnectionEventQ[con * curMaxEvents + i] = m_ConnectionEventQ[con * oldMax + i];
                }
            }
        }

        m_ConnectionEventQ[ev.connectionId * curMaxEvents + idx] = ev;
        m_ConnectionEventHeadTail[ev.connectionId * 2 + 1] = idx + 1;
        m_MasterEventQ.Enqueue(new SubQueueItem {connection = ev.connectionId, idx = idx});
    }

    /// <summary>
    /// Drops all pending events: empties the master queue and resets every head
    /// and tail to zero. The event storage itself is not resized.
    /// </summary>
    internal void Clear()
    {
        m_MasterEventQ.Clear();
        for (int i = 0; i < m_ConnectionEventHeadTail.Length; ++i)
        {
            m_ConnectionEventHeadTail[i] = 0;
        }
    }

    /// <summary>
    /// Translates a stored event into the values handed back by the pop methods.
    /// Shared by the main queue and by <see cref="Concurrent"/>.
    /// </summary>
    private static NetworkEvent.Type DecodeEvent(NetworkEvent ev, out int offset, out int size, out int pipelineId)
    {
        offset = 0;
        size = 0;
        pipelineId = ev.pipelineId;
        if (ev.type == NetworkEvent.Type.Data)
        {
            offset = ev.offset;
            size = ev.size;
        }
        else if (ev.type == NetworkEvent.Type.Disconnect && ev.status != (int)Error.DisconnectReason.Default)
        {
            // Non-default disconnect reasons are reported as a negative offset.
            offset = -ev.status;
        }
        return ev.type;
    }

    /// <summary>
    /// Master queue entry: which connection received an event and at which
    /// sub-queue index it was stored.
    /// </summary>
    struct SubQueueItem
    {
        public int connection;
        public int idx;
    }

    // Push order of events across all connections.
    private NativeQueue<SubQueueItem> m_MasterEventQ;
    // Flat event storage, MaxEvents slots per connection.
    private NativeList<NetworkEvent> m_ConnectionEventQ;
    // Interleaved head/tail indices: [c * 2] is the head, [c * 2 + 1] the tail of connection c.
    private NativeList<int> m_ConnectionEventHeadTail;

    /// <summary>
    /// Creates a <see cref="Concurrent"/> view that shares this queue's event
    /// list and head/tail list.
    /// </summary>
    public Concurrent ToConcurrent()
    {
        Concurrent concurrent;
        concurrent.m_ConnectionEventQ = m_ConnectionEventQ;
        concurrent.m_ConnectionEventHeadTail = new Concurrent.ConcurrentConnectionQueue(m_ConnectionEventHeadTail);
        return concurrent;
    }

    /// <summary>
    /// View over the queue that pops per-connection events by advancing the head
    /// index with an atomic compare-exchange. It only exposes per-connection
    /// popping; the master queue is not part of this view.
    /// </summary>
    public struct Concurrent
    {
        /// <summary>
        /// Wraps the head/tail list through a raw pointer so heads can be advanced
        /// with <see cref="Interlocked.CompareExchange(ref int, int, int)"/>.
        /// </summary>
        [NativeContainer]
        [NativeContainerIsAtomicWriteOnly]
        internal unsafe struct ConcurrentConnectionQueue
        {
            // Untyped list data of the head/tail list; its Ptr is reinterpreted as int*.
            [NativeDisableUnsafePtrRestriction] private UnsafeList* m_ConnectionEventHeadTail;
#if ENABLE_UNITY_COLLECTIONS_CHECKS
            private AtomicSafetyHandle m_Safety;
#endif

            /// <summary>
            /// Captures the internal list data pointer of <paramref name="queue"/>
            /// and, when collection checks are enabled, its safety handle (verifying
            /// write access up front).
            /// </summary>
            public ConcurrentConnectionQueue(NativeList<int> queue)
            {
#if ENABLE_UNITY_COLLECTIONS_CHECKS
                m_Safety = NativeListUnsafeUtility.GetAtomicSafetyHandle(ref queue);
                AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
#endif
                m_ConnectionEventHeadTail = (UnsafeList*) NativeListUnsafeUtility.GetInternalListDataPtrUnchecked(ref queue);
            }

            /// <summary>Number of entries in the head/tail list (two per connection).</summary>
            public int Length
            {
                get { return m_ConnectionEventHeadTail->Length; }
            }

            /// <summary>
            /// Atomically claims the current head index of a connection's sub-queue.
            /// </summary>
            /// <param name="connectionId">Connection whose head is advanced.</param>
            /// <returns>
            /// The claimed sub-queue index, or -1 when the connection id is out of
            /// range or the sub-queue has no pending event.
            /// </returns>
            public int Dequeue(int connectionId)
            {
#if ENABLE_UNITY_COLLECTIONS_CHECKS
                AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
#endif
                int idx = -1;
                if (connectionId < 0 || connectionId >= m_ConnectionEventHeadTail->Length / 2)
                {
                    return -1;
                }
                while (idx < 0)
                {
                    idx = ((int*)m_ConnectionEventHeadTail->Ptr)[connectionId * 2];
                    if (idx >= ((int*)m_ConnectionEventHeadTail->Ptr)[connectionId * 2 + 1])
                    {
                        return -1;
                    }
                    // If the head changed between the read and the exchange, the
                    // claim failed; reset idx so the loop retries with a fresh read.
                    if (Interlocked.CompareExchange(ref ((int*)m_ConnectionEventHeadTail->Ptr)[connectionId * 2], idx + 1,
                            idx) != idx)
                    {
                        idx = -1;
                    }
                }
                return idx;
            }
        }

        /// <summary>
        /// Current capacity (in events) of each per-connection sub-queue.
        /// </summary>
        private int MaxEvents
        {
            get { return m_ConnectionEventQ.Length / (m_ConnectionEventHeadTail.Length / 2); }
        }

        /// <summary>
        /// Pops the next event of one connection, discarding the pipeline id.
        /// </summary>
        public NetworkEvent.Type PopEventForConnection(int connectionId, out int offset, out int size)
        {
            return PopEventForConnection(connectionId, out offset, out size, out _);
        }

        /// <summary>
        /// Pops the next event of one connection, claiming its slot atomically.
        /// </summary>
        /// <param name="connectionId">Connection whose sub-queue is read.</param>
        /// <param name="offset">
        /// For Data events, the event's offset. For Disconnect events whose status is
        /// not <c>Error.DisconnectReason.Default</c>, the negated status. Otherwise 0.
        /// </param>
        /// <param name="size">For Data events, the event's size. Otherwise 0.</param>
        /// <param name="pipelineId">Pipeline id stored in the event, or 0 if nothing was popped.</param>
        /// <returns>
        /// The type of the popped event, or <c>Empty</c> when the connection id is
        /// out of range or its sub-queue has no pending event.
        /// </returns>
        public NetworkEvent.Type PopEventForConnection(int connectionId, out int offset, out int size, out int pipelineId)
        {
            offset = 0;
            size = 0;
            pipelineId = 0;
            int idx = m_ConnectionEventHeadTail.Dequeue(connectionId);
            if (idx < 0)
            {
                return NetworkEvent.Type.Empty;
            }
            NetworkEvent ev = m_ConnectionEventQ[connectionId * MaxEvents + idx];
            return DecodeEvent(ev, out offset, out size, out pipelineId);
        }

        // Event storage shared with the owning queue; only read through this view.
        [ReadOnly] internal NativeList<NetworkEvent> m_ConnectionEventQ;
        // Head/tail indices shared with the owning queue.
        internal ConcurrentConnectionQueue m_ConnectionEventHeadTail;
    }
}
}