该项目的目的是同时测试和演示来自 Unity DOTS 技术堆栈的多个新包。
您最多选择25个主题 主题必须以中文或者字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
 
 
 
 

542 行
22 KiB

using System;
using System.Collections.Generic;
using System.IO;
using UnityEngine;
using NetworkCompression;
using Unity.Sample.Core;
using UnityEngine.Profiling;
public class NetworkConnectionCounters
{
public int bytesIn; // The number of user bytes received on this connection
public int bytesOut; // The number of user bytes sent on this connection
public int headerBitsIn; // The number of header bytes received on this connection
public int packagesIn; // The number of packages received on this connection (including package fragments)
public int packagesOut; // The number of packages sent on this connection (including package fragments)
public int packagesStaleIn; // The number of state packages we received
public int packagesDuplicateIn; // The number of duplicate packages we received
public int packagesOutOfOrderIn; // The number of packages we received out of order
public int packagesLostIn; // The number of incoming packages that was lost (i.e. holes in the package sequence)
public int packagesLostOut; // The number of outgoing packages that wasn't acked (either due to choke or network)
public int fragmentedPackagesIn; // The number of incoming packages that was fragmented
public int fragmentedPackagesOut; // The number of outgoing packages that was fragmented
public int fragmentedPackagesLostIn; // The number of incoming fragmented packages we couldn't reassemble
public int fragmentedPackagesLostOut; // The number of outgoing fragmented packages that wasn't acked
public int chokedPackagesOut; // The number of packages we dropped due to choke
public int eventsIn; // The total number of events received
public int eventsOut; // The total number of events sent
public int eventsLostOut; // The number of events that was lost
public int reliableEventsOut; // The number of reliable events sent
public int reliableEventResendOut; // The number of reliable events we had to resend
public Aggregator avgBytesIn = new Aggregator();
public Aggregator avgBytesOut = new Aggregator();
public Aggregator avgPackagesIn = new Aggregator();
public Aggregator avgPackagesOut = new Aggregator();
public Aggregator avgPackageSize = new Aggregator();
public void UpdateAverages()
{
avgBytesIn.Update(bytesIn);
avgBytesOut.Update(bytesOut);
avgPackagesIn.Update(packagesIn);
avgPackagesOut.Update(packagesOut);
}
}
public class PackageInfo
{
public long sentTime;
public bool fragmented;
public NetworkMessage content;
public List<NetworkEvent> events = new List<NetworkEvent>(10);
public virtual void Reset()
{
sentTime = 0;
fragmented = false;
content = 0;
foreach (var eventInfo in events)
eventInfo.Release();
events.Clear();
}
}
public class NetworkConnection<TCounters, TPackageInfo>
where TCounters : NetworkConnectionCounters, new()
where TPackageInfo : PackageInfo, new()
{
public int connectionId;
public INetworkTransport transport;
public TCounters counters = new TCounters();
public int rtt; // Round trip time (ping + time lost due to read/send frequencies)
public int inSequence; // The highest sequence of packages we have received
public ushort inSequenceAckMask; // The mask describing which of the last packages we have received relative to inSequence
public long inSequenceTime; // The time the last package was received
public int outSequence = 1; // The sequence of the next outgoing package
public int outSequenceAck; // The highest sequence of packages that have been acked
public ushort outSequenceAckMask; // The mask describing which of the last packaged have been acked related to outSequence
public NetworkConnection(int connectionId, INetworkTransport transport)
{
this.connectionId = connectionId;
this.transport = transport;
chokedTimeToNextPackage = 0;
}
public BinaryWriter debugSendStreamWriter;
/// <summary>
/// Called when the connection released (e.g. when the connection was disconnected)
/// unlike Reset, which can be called multiple times on the connection in order to reset any
/// state cached on the connection
/// </summary>
public virtual void Shutdown()
{
if (debugSendStreamWriter != null)
{
debugSendStreamWriter.Close();
debugSendStreamWriter.Dispose();
debugSendStreamWriter = null;
}
}
/// <summary>
/// Resets all cached connection state including reliable data pending acknowledgments
/// </summary>
public virtual void Reset()
{
}
protected bool CanSendPackage(ref BitOutputStream output)
{
if (!outstandingPackages.Available(outSequence)) // running out here means we hit 64 packs without any acks from client...
{
// We have too many outstanding packages. We need the other end to send something to us, so we know he
// is alive. This happens for example when we break the client in the debugger while the server is still
// sending messages but potentially it could also happen in extreme cases of congestion or package loss.
// We will try to send empty packages with low frequency to see if we can get the connection up and running again
if (Game.frameTime >= chokedTimeToNextPackage)
{
chokedTimeToNextPackage = Game.frameTime + NetworkConfig.netChokeSendInterval.FloatValue;
// Treat the last package as lost
int chokedSequence;
var info = outstandingPackages.TryGetByIndex(outSequence % outstandingPackages.Capacity, out chokedSequence);
GameDebug.Assert(info != null);
NotifyDelivered(chokedSequence, info, false);
counters.chokedPackagesOut++;
info.Reset();
outstandingPackages.Remove(chokedSequence);
// Send empty package
TPackageInfo emptyPackage;
BeginSendPackage(ref output, out emptyPackage);
CompleteSendPackage(emptyPackage, ref output);
}
return false;
}
return true;
}
// Returns the 'wide' packageSequenceNumber (i.e. 32 bit reconstructed from the 16bits sent over wire)
protected int ProcessPackageHeader(byte[] packageData, int packageSize, out NetworkMessage content, out byte[] assembledData, out int assembledSize, out int headerSize)
{
counters.packagesIn++;
assembledData = packageData;
assembledSize = packageSize;
headerSize = 0;
var input = new BitInputStream(packageData);
int headerStartInBits = input.GetBitPosition();
content = (NetworkMessage)input.ReadBits(8);
// TODO: Possible improvement is to ack on individual fragments not just entire message
if ((content & NetworkMessage.FRAGMENT) != 0)
{
// Package fragment
var fragmentPackageSequence = Sequence.FromUInt16((ushort)input.ReadBits(16), inSequence);
var numFragments = (int)input.ReadBits(8);
var fragmentIndex = (int)input.ReadBits(8);
var fragmentSize = (int)input.ReadBits(16);
FragmentReassemblyInfo assembly;
if (!m_FragmentReassembly.TryGetValue(fragmentPackageSequence, out assembly))
{
// If we run out of room in the reassembly buffer we will not be able to reassemble this package
if (!m_FragmentReassembly.Available(fragmentPackageSequence))
counters.fragmentedPackagesLostIn++;
GameDebug.Assert(numFragments <= NetworkConfig.maxFragments);
assembly = m_FragmentReassembly.Acquire(fragmentPackageSequence);
assembly.numFragments = numFragments;
assembly.receivedMask = 0;
assembly.receivedCount = 0;
}
GameDebug.Assert(assembly.numFragments == numFragments);
GameDebug.Assert(fragmentIndex < assembly.numFragments);
counters.headerBitsIn += input.GetBitPosition() - headerStartInBits;
if ((assembly.receivedMask & (1U << fragmentIndex)) != 0)
{
// Duplicate package fragment
counters.packagesDuplicateIn++;
return 0;
}
assembly.receivedMask |= 1U << fragmentIndex;
assembly.receivedCount++;
input.ReadBytes(assembly.data, fragmentIndex * NetworkConfig.packageFragmentSize, fragmentSize);
if (assembly.receivedCount < assembly.numFragments)
{
return 0; // Not fully assembled
}
// Continue processing package as we have now reassembled the package
assembledData = assembly.data;
assembledSize = fragmentIndex * NetworkConfig.packageFragmentSize + fragmentSize;
input.Initialize(assembledData);
headerStartInBits = 0;
content = (NetworkMessage)input.ReadBits(8);
}
var inSequenceNew = Sequence.FromUInt16((ushort)input.ReadBits(16), inSequence);
var outSequenceAckNew = Sequence.FromUInt16((ushort)input.ReadBits(16), outSequenceAck);
var outSequenceAckMaskNew = (ushort)input.ReadBits(16);
if (inSequenceNew > inSequence)
{
// If we have a hole in the package sequence that will fall off the ack mask that
// means the package (inSequenceNew-15 and before) will be considered lost (either it will never come or we will
// reject it as being stale if we get it at a later point in time)
var distance = inSequenceNew - inSequence;
for (var i = 0; i < Math.Min(distance, 15); ++i) // TODO : Fix this contant
{
if ((inSequenceAckMask & 1 << (15 - i)) == 0)
counters.packagesLostIn++;
}
// If there is a really big hole then those packages are considered lost as well
// Update the incoming ack mask.
if (distance > 15)
{
counters.packagesLostIn += distance - 15;
inSequenceAckMask = 1; // all is lost except current package
}
else
{
inSequenceAckMask <<= distance;
inSequenceAckMask |= 1;
}
inSequence = inSequenceNew;
inSequenceTime = NetworkUtils.stopwatch.ElapsedMilliseconds;
}
else if (inSequenceNew < inSequence)
{
// Package is out of order
// Check if the package is stale
// NOTE : We rely on the fact that we will reject packages that we cannot ack due to the size
// of the ack mask, so we don't have to worry about resending messages as long as we do that
// after the original package has fallen off the ack mask.
var distance = inSequence - inSequenceNew;
if (distance > 15) // TODO : Fix this constant
{
counters.packagesStaleIn++;
return 0;
}
// Check if the package is a duplicate
var ackBit = 1 << distance;
if ((ackBit & inSequenceAckMask) != 0)
{
// Duplicate package
counters.packagesDuplicateIn++;
return 0;
}
// Accept the package out of order
counters.packagesOutOfOrderIn++;
inSequenceAckMask |= (ushort)ackBit;
}
else
{
// Duplicate package
counters.packagesDuplicateIn++;
return 0;
}
if (inSequenceNew % 3 == 0)
{
var timeOnServer = (ushort)input.ReadBits(8);
TPackageInfo info;
if (outstandingPackages.TryGetValue(outSequenceAckNew, out info))
{
var now = NetworkUtils.stopwatch.ElapsedMilliseconds;
rtt = (int)(now - info.sentTime - timeOnServer);
}
}
// If the ack sequence is not higher we have nothing new to do
if (outSequenceAckNew <= outSequenceAck)
{
headerSize = input.Align();
return inSequenceNew;
}
// Find the sequence numbers that we have to consider lost
var seqsBeforeThisAlreadyNotifedAsLost = outSequenceAck - 15;
var seqsBeforeThisAreLost = outSequenceAckNew - 15;
for (int sequence = seqsBeforeThisAlreadyNotifedAsLost; sequence <= seqsBeforeThisAreLost; ++sequence)
{
// Handle conditions before first 15 packets
if (sequence < 0)
continue;
// If seqence covered by old ack mask, we may already have received it (and notified)
int bitnum = outSequenceAck - sequence;
var ackBit = bitnum >= 0 ? 1 << bitnum : 0;
var notNotified = (ackBit & outSequenceAckMask) == 0;
if (outstandingPackages.Exists(sequence) && notNotified)
{
var info = outstandingPackages[sequence];
NotifyDelivered(sequence, info, false);
counters.packagesLostOut++;
if (info.fragmented)
counters.fragmentedPackagesLostOut++;
info.Reset();
outstandingPackages.Remove(sequence);
}
}
outSequenceAck = outSequenceAckNew;
outSequenceAckMask = outSequenceAckMaskNew;
// Ack packages if they haven't been acked already
for (var sequence = Math.Max(outSequenceAck - 15, 0); sequence <= outSequenceAck; ++sequence)
{
var ackBit = 1 << outSequenceAck - sequence;
if (outstandingPackages.Exists(sequence) && (ackBit & outSequenceAckMask) != 0)
{
var info = outstandingPackages[sequence];
NotifyDelivered(sequence, info, true);
info.Reset();
outstandingPackages.Remove(sequence);
}
}
counters.headerBitsIn += input.GetBitPosition() - headerStartInBits;
headerSize = input.Align();
return inSequenceNew;
}
protected void BeginSendPackage(ref BitOutputStream output, out TPackageInfo info)
{
GameDebug.Assert(outstandingPackages.Available(outSequence), "NetworkConnection.BeginSendPackage : package info not available for sequence : {0}", outSequence);
output.WriteBits(0, 8); // Package content flags (will set later as we add messages)
output.WriteBits(Sequence.ToUInt16(outSequence), 16);
output.WriteBits(Sequence.ToUInt16(inSequence), 16);
output.WriteBits(inSequenceAckMask, 16);
// Send rtt info every 3th package. We calculate the RTT as the time from sending the package
// and receiving the ack for the package minus the time the package spent on the server
// TODO should this be sent from client to server?
if (outSequence % 3 == 0)
{
var now = NetworkUtils.stopwatch.ElapsedMilliseconds;
// TOULF Is 255 enough?
var timeOnServer = (byte)Math.Min(now - inSequenceTime, 255);
output.WriteBits(timeOnServer, 8);
}
info = outstandingPackages.Acquire(outSequence);
}
protected void AddMessageContentFlag(NetworkMessage message)
{
m_PackageBuffer[0] |= (byte)message;
}
protected int CompleteSendPackage(TPackageInfo info, ref BitOutputStream output)
{
Profiler.BeginSample("NetworkConnection.CompleteSendPackage()");
info.sentTime = NetworkUtils.stopwatch.ElapsedMilliseconds;
info.content = (NetworkMessage)m_PackageBuffer[0];
int packageSize = output.Flush();
GameDebug.Assert(packageSize < NetworkConfig.maxPackageSize, "packageSize < NetworkConfig.maxPackageSize");
if (debugSendStreamWriter != null)
{
debugSendStreamWriter.Write(m_PackageBuffer, 0, packageSize);
debugSendStreamWriter.Write((UInt32)0xedededed);
}
if (packageSize > NetworkConfig.packageFragmentSize)
{
// Package is too big and needs to be sent as fragments
var numFragments = packageSize / NetworkConfig.packageFragmentSize;
//GameDebug.Log("FRAGMENTING: " + connectionId + ": " + packageSize + " (" + numFragments + ")");
var lastFragmentSize = packageSize % NetworkConfig.packageFragmentSize;
if (lastFragmentSize != 0)
++numFragments;
else
lastFragmentSize = NetworkConfig.packageFragmentSize;
for (var i = 0; i < numFragments; ++i)
{
var fragmentSize = i < numFragments - 1 ? NetworkConfig.packageFragmentSize : lastFragmentSize;
var fragmentOutput = new BitOutputStream(m_FragmentBuffer);
fragmentOutput.WriteBits((uint)NetworkMessage.FRAGMENT, 8); // Package fragment identifier
fragmentOutput.WriteBits(Sequence.ToUInt16(outSequence), 16);
fragmentOutput.WriteBits((uint)numFragments, 8);
fragmentOutput.WriteBits((uint)i, 8);
fragmentOutput.WriteBits((uint)fragmentSize, 16);
fragmentOutput.WriteBytes(m_PackageBuffer, i * NetworkConfig.packageFragmentSize, fragmentSize);
int fragmentPackageSize = fragmentOutput.Flush();
transport.SendData(connectionId, m_FragmentBuffer, fragmentPackageSize);
counters.packagesOut++;
counters.bytesOut += fragmentPackageSize;
}
counters.fragmentedPackagesOut++;
}
else
{
transport.SendData(connectionId, m_PackageBuffer, packageSize);
counters.packagesOut++;
counters.bytesOut += packageSize;
}
++outSequence;
Profiler.EndSample();
return packageSize;
}
protected virtual void NotifyDelivered(int sequence, TPackageInfo info, bool madeIt)
{
if (madeIt)
{
// Release received reliable events
foreach (var eventInfo in info.events)
{
if (!ackedEventTypes.Contains(eventInfo.type))
ackedEventTypes.Add(eventInfo.type);
eventInfo.Release();
}
}
else
{
foreach (var eventInfo in info.events)
{
counters.eventsLostOut++;
if (eventInfo.reliable)
{
// Re-add dropped reliable events to outgoing events
counters.reliableEventResendOut++;
GameDebug.Log("Resending lost reliable event: " + ((GameNetworkEvents.EventType)eventInfo.type.typeId) + ":" + eventInfo.sequence);
eventsOut.Add(eventInfo);
}
else
eventInfo.Release();
}
}
info.events.Clear();
}
// Events handling
public void QueueEvent(NetworkEvent info)
{
eventsOut.Add(info);
info.AddRef();
}
public void ReadEvents<TInputStream>(ref TInputStream input, INetworkCallbacks networkConsumer) where TInputStream : NetworkCompression.IInputStream
{
//input.SetStatsType(NetworkCompressionReader.Type.Event);
var numEvents = NetworkEvent.ReadEvents(eventTypesIn, connectionId, ref input, networkConsumer);
counters.eventsIn += numEvents;
}
public void WriteEvents<TOutputStream>(TPackageInfo info, ref TOutputStream output) where TOutputStream : NetworkCompression.IOutputStream
{
if (eventsOut.Count == 0)
return;
foreach (var eventInfo in eventsOut)
{
counters.eventsOut++;
if (eventInfo.reliable)
counters.reliableEventsOut++;
}
AddMessageContentFlag(NetworkMessage.Events);
GameDebug.Assert(info.events.Count == 0);
NetworkEvent.WriteEvents(eventsOut, ackedEventTypes, ref output);
info.events.AddRange(eventsOut);
eventsOut.Clear();
}
double chokedTimeToNextPackage;
public SequenceBuffer<TPackageInfo> outstandingPackages = new SequenceBuffer<TPackageInfo>(64, () => new TPackageInfo());
class FragmentReassemblyInfo
{
public int numFragments;
public uint receivedMask;
public uint receivedCount;
public byte[] data = new byte[1024 * 64];
}
SequenceBuffer<FragmentReassemblyInfo> m_FragmentReassembly = new SequenceBuffer<FragmentReassemblyInfo>(8, () => new FragmentReassemblyInfo());
byte[] m_FragmentBuffer = new byte[2048];
public byte[] m_PackageBuffer = new byte[1024 * 64]; //TODO: fix this
// Events
Dictionary<ushort, NetworkEventType> eventTypesIn = new Dictionary<ushort, NetworkEventType>();
List<NetworkEventType> ackedEventTypes = new List<NetworkEventType>();
public List<NetworkEvent> eventsOut = new List<NetworkEvent>(); // TODO : Should be private (content calc issue)
}