浏览代码

Updated to latest Relay Packages, and removed Candidates dependency.

Included the Transport Package locally until it's ready.
/main/staging/Open_beta_Update/open_beta_relay
当前提交
e1d78150
共有 201 个文件被更改,包括 10421 次插入31 次删除
  1. 12
      Packages/manifest.json
  2. 9
      Packages/packages-lock.json
  3. 24
      ProjectSettings/PackageManagerSettings.asset
  4. 90
      Packages/com.unity.transport/.gitattributes
  5. 3
      Packages/com.unity.transport/.gitmodules
  6. 250
      Packages/com.unity.transport/CHANGELOG.md
  7. 7
      Packages/com.unity.transport/CHANGELOG.md.meta
  8. 20
      Packages/com.unity.transport/DESIGN.md
  9. 7
      Packages/com.unity.transport/DESIGN.md.meta
  10. 20
      Packages/com.unity.transport/Documentation~/TableOfContents.md
  11. 17
      Packages/com.unity.transport/Documentation~/connection-state-machine.md
  12. 60
      Packages/com.unity.transport/Documentation~/event-consumption.md
  13. 94
      Packages/com.unity.transport/Documentation~/images/Pipeline-stages-diagram.png
  14. 117
      Packages/com.unity.transport/Documentation~/images/com.unity.transport.connection.png
  15. 78
      Packages/com.unity.transport/Documentation~/images/com.unity.transport.driver.png
  16. 90
      Packages/com.unity.transport/Documentation~/images/console-view.PNG
  17. 16
      Packages/com.unity.transport/Documentation~/images/game-object.PNG
  18. 58
      Packages/com.unity.transport/Documentation~/images/inspector.PNG
  19. 56
      Packages/com.unity.transport/Documentation~/images/layercake.png
  20. 44
      Packages/com.unity.transport/Documentation~/index.md
  21. 25
      Packages/com.unity.transport/Documentation~/install.md
  22. 136
      Packages/com.unity.transport/Documentation~/pipelines-usage.md
  23. 68
      Packages/com.unity.transport/Documentation~/samples/clientbehaviour.cs.md
  24. 95
      Packages/com.unity.transport/Documentation~/samples/jobifiedclientbehaviour.cs.md
  25. 115
      Packages/com.unity.transport/Documentation~/samples/jobifiedserverbehaviour.cs.md
  26. 81
      Packages/com.unity.transport/Documentation~/samples/serverbehaviour.cs.md
  27. 20
      Packages/com.unity.transport/Documentation~/update-flow.md
  28. 416
      Packages/com.unity.transport/Documentation~/workflow-client-server-jobs.md
  29. 408
      Packages/com.unity.transport/Documentation~/workflow-client-server.md
  30. 5
      Packages/com.unity.transport/LICENSE.md
  31. 7
      Packages/com.unity.transport/LICENSE.md.meta
  32. 10
      Packages/com.unity.transport/Pipfile
  33. 7
      Packages/com.unity.transport/Pipfile.meta
  34. 46
      Packages/com.unity.transport/README.md
  35. 7
      Packages/com.unity.transport/README.md.meta
  36. 8
      Packages/com.unity.transport/Runtime.meta
  37. 5
      Packages/com.unity.transport/Runtime/AssemblyInfo.cs
  38. 11
      Packages/com.unity.transport/Runtime/AssemblyInfo.cs.meta
  39. 89
      Packages/com.unity.transport/Runtime/AtomicFreeList.cs
  40. 11
      Packages/com.unity.transport/Runtime/AtomicFreeList.cs.meta
  41. 112
      Packages/com.unity.transport/Runtime/Base64.cs
  42. 3
      Packages/com.unity.transport/Runtime/Base64.cs.meta
  43. 91
      Packages/com.unity.transport/Runtime/BaselibNetworkArray.cs
  44. 11
      Packages/com.unity.transport/Runtime/BaselibNetworkArray.cs.meta
  45. 590
      Packages/com.unity.transport/Runtime/BaselibNetworkInterface.cs
  46. 11
      Packages/com.unity.transport/Runtime/BaselibNetworkInterface.cs.meta
  47. 1001
      Packages/com.unity.transport/Runtime/DataStream.cs
  48. 11
      Packages/com.unity.transport/Runtime/DataStream.cs.meta
  49. 57
      Packages/com.unity.transport/Runtime/HMACSHA256.cs
  50. 3
      Packages/com.unity.transport/Runtime/HMACSHA256.cs.meta
  51. 153
      Packages/com.unity.transport/Runtime/INetworkInterface.cs
  52. 11
      Packages/com.unity.transport/Runtime/INetworkInterface.cs.meta
  53. 164
      Packages/com.unity.transport/Runtime/IPCManager.cs
  54. 11
      Packages/com.unity.transport/Runtime/IPCManager.cs.meta
  55. 197
      Packages/com.unity.transport/Runtime/IPCNetworkInterface.cs
  56. 11
      Packages/com.unity.transport/Runtime/IPCNetworkInterface.cs.meta
  57. 195
      Packages/com.unity.transport/Runtime/NetworkCompressionModel.cs
  58. 11
      Packages/com.unity.transport/Runtime/NetworkCompressionModel.cs.meta
  59. 142
      Packages/com.unity.transport/Runtime/NetworkConnection.cs
  60. 11
      Packages/com.unity.transport/Runtime/NetworkConnection.cs.meta
  61. 1001
      Packages/com.unity.transport/Runtime/NetworkDriver.cs
  62. 11
      Packages/com.unity.transport/Runtime/NetworkDriver.cs.meta
  63. 403
      Packages/com.unity.transport/Runtime/NetworkEndPoint.cs
  64. 11
      Packages/com.unity.transport/Runtime/NetworkEndPoint.cs.meta
  65. 274
      Packages/com.unity.transport/Runtime/NetworkEventQueue.cs
  66. 11
      Packages/com.unity.transport/Runtime/NetworkEventQueue.cs.meta
  67. 62
      Packages/com.unity.transport/Runtime/NetworkParams.cs
  68. 11
      Packages/com.unity.transport/Runtime/NetworkParams.cs.meta
  69. 978
      Packages/com.unity.transport/Runtime/NetworkPipeline.cs
  70. 11
      Packages/com.unity.transport/Runtime/NetworkPipeline.cs.meta
  71. 271
      Packages/com.unity.transport/Runtime/NetworkProtocol.cs
  72. 11
      Packages/com.unity.transport/Runtime/NetworkProtocol.cs.meta
  73. 31
      Packages/com.unity.transport/Runtime/NetworkProtocols.cs
  74. 11
      Packages/com.unity.transport/Runtime/NetworkProtocols.cs.meta
  75. 8
      Packages/com.unity.transport/Runtime/Pipelines.meta
  76. 226
      Packages/com.unity.transport/Runtime/Pipelines/FragmentationPipelineStage.cs
  77. 11
      Packages/com.unity.transport/Runtime/Pipelines/FragmentationPipelineStage.cs.meta
  78. 11
      Packages/com.unity.transport/Runtime/Pipelines/FragmentationUtility.cs
  79. 11
      Packages/com.unity.transport/Runtime/Pipelines/FragmentationUtility.cs.meta
  80. 47
      Packages/com.unity.transport/Runtime/Pipelines/NullPipelineStage.cs
  81. 11
      Packages/com.unity.transport/Runtime/Pipelines/NullPipelineStage.cs.meta
  82. 187
      Packages/com.unity.transport/Runtime/Pipelines/ReliableSequencedPipelineStage.cs
  83. 11
      Packages/com.unity.transport/Runtime/Pipelines/ReliableSequencedPipelineStage.cs.meta
  84. 761
      Packages/com.unity.transport/Runtime/Pipelines/ReliableUtility.cs
  85. 11
      Packages/com.unity.transport/Runtime/Pipelines/ReliableUtility.cs.meta
  86. 214
      Packages/com.unity.transport/Runtime/Pipelines/SimulatorPipelineStage.cs
  87. 11
      Packages/com.unity.transport/Runtime/Pipelines/SimulatorPipelineStage.cs.meta
  88. 284
      Packages/com.unity.transport/Runtime/Pipelines/SimulatorUtility.cs
  89. 11
      Packages/com.unity.transport/Runtime/Pipelines/SimulatorUtility.cs.meta
  90. 75
      Packages/com.unity.transport/Runtime/Pipelines/UnreliableSequencedPipelineStage.cs
  91. 11
      Packages/com.unity.transport/Runtime/Pipelines/UnreliableSequencedPipelineStage.cs.meta
  92. 8
      Packages/com.unity.transport/Runtime/Relay.meta
  93. 8
      Packages/com.unity.transport/Runtime/Relay/Messages.meta

12
Packages/manifest.json


"com.unity.services.authentication": "1.0.0-pre.4",
"com.unity.services.core": "1.0.0",
"com.unity.services.lobby": "1.0.0-pre.4",
"com.unity.services.relay": "1.0.0-pre.4",
"com.unity.services.relay": "1.0.1-pre.1",
"com.unity.sysroot.linux-x86_64": "0.1.15-preview",
"com.unity.test-framework": "1.1.27",
"com.unity.textmeshpro": "3.0.6",

"com.unity.modules.wind": "1.0.0",
"com.unity.modules.xr": "1.0.0"
},
"scopedRegistries": [
{
"name": "Candidates",
"url": "https://artifactory.prd.cds.internal.unity3d.com/artifactory/api/npm/upm-candidates",
"scopes": [
"com.unity.transport"
]
}
]
"scopedRegistries": []
}

9
Packages/packages-lock.json


"url": "https://packages.unity.com"
},
"com.unity.services.relay": {
"version": "1.0.0-pre.4",
"version": "1.0.1-pre.1",
"depth": 0,
"source": "registry",
"dependencies": {

"url": "https://packages.unity.com"
},
"com.unity.transport": {
"version": "1.0.0-pre.1",
"version": "file:com.unity.transport",
"source": "registry",
"source": "embedded",
},
"url": "https://artifactory.prd.cds.internal.unity3d.com/artifactory/api/npm/upm-candidates"
}
},
"com.unity.ugui": {
"version": "1.0.0",

24
ProjectSettings/PackageManagerSettings.asset


m_Scopes: []
m_IsDefault: 1
m_Capabilities: 7
- m_Id: scoped:Candidates
m_Name: Candidates
m_Url: https://artifactory.prd.cds.internal.unity3d.com/artifactory/api/npm/upm-candidates
m_Scopes:
- com.unity.transport
m_IsDefault: 0
m_Capabilities: 0
m_UserSelectedRegistryName: Candidates
m_UserSelectedRegistryName:
m_Id: scoped:Candidates
m_Name: Candidates
m_Url: https://artifactory.prd.cds.internal.unity3d.com/artifactory/api/npm/upm-candidates
m_Scopes:
- com.unity.transport
m_Id:
m_Name:
m_Url:
m_Scopes: []
m_Name: Candidates
m_Url: https://artifactory.prd.cds.internal.unity3d.com/artifactory/api/npm/upm-candidates
m_Name:
m_Url:
- com.unity.transport
-
m_SelectedScopeIndex: 0

90
Packages/com.unity.transport/.gitattributes


* text=auto eol=lf
**/HavokNative.framework/HavokNative filter=lfs diff=lfs merge=lfs -text
**/cwebp filter=lfs diff=lfs merge=lfs -text
**/moz-cjpeg filter=lfs diff=lfs merge=lfs -text
**/pngcrush filter=lfs diff=lfs merge=lfs -text
*.3[dS][sS] filter=lfs diff=lfs merge=lfs -text
*.DOC diff=astextplain
*.DOCX diff=astextplain
*.DOT diff=astextplain
*.RTF diff=astextplain
*.[aA] filter=lfs diff=lfs merge=lfs -text
*.[aA][iI][fF][cC] filter=lfs diff=lfs merge=lfs -text
*.[aA][iI][fF][fF]? filter=lfs diff=lfs merge=lfs -text
*.[aA][pP][kK] filter=lfs diff=lfs merge=lfs -text
*.[aA][vV][iI] filter=lfs diff=lfs merge=lfs -text
*.[bB][lL][eE][nN][dD] filter=lfs diff=lfs merge=lfs -text
*.[cC][oO][lL][lL][aA][dD][dD][aA] filter=lfs diff=lfs merge=lfs -text
*.[dD][lL][lL] filter=lfs diff=lfs merge=lfs -text
*.[dD][yY][lL][iI][bB] filter=lfs diff=lfs merge=lfs -text
*.[eE][xX][eE] filter=lfs diff=lfs merge=lfs -text
*.[fF][bB][xX] filter=lfs diff=lfs merge=lfs -text
*.[fF][lL][aA] filter=lfs diff=lfs merge=lfs -text
*.[fF][lL][aA][cC] filter=lfs diff=lfs merge=lfs -text
*.[fF][lL][vV] filter=lfs diff=lfs merge=lfs -text
*.[gG][iI][fF] filter=lfs diff=lfs merge=lfs -text
*.[gG][zZ][iI][pP] filter=lfs diff=lfs merge=lfs -text
*.[iI][pP][aA] filter=lfs diff=lfs merge=lfs -text
*.[jJ][aA][rR] filter=lfs diff=lfs merge=lfs -text
*.[jJ][pP][gG] filter=lfs diff=lfs merge=lfs -text
*.[mM]4[vV] filter=lfs diff=lfs merge=lfs -text
*.[mM][kK][vV] filter=lfs diff=lfs merge=lfs -text
*.[mM][oO][vV] filter=lfs diff=lfs merge=lfs -text
*.[mM][pP]3 filter=lfs diff=lfs merge=lfs -text
*.[mM][pP][2-4]? filter=lfs diff=lfs merge=lfs -text
*.[mM][pP][eE]?[gG] filter=lfs diff=lfs merge=lfs -text
*.[oO][bB][jJ] filter=lfs diff=lfs merge=lfs -text
*.[oO][gG][gG] filter=lfs diff=lfs merge=lfs -text
*.[oO][gG][vV] filter=lfs diff=lfs merge=lfs -text
*.[oO][tT][fF] filter=lfs diff=lfs merge=lfs -text
*.[pP][dD][fF] filter=lfs diff=lfs merge=lfs -text
*.[pP][nN][gG] filter=lfs diff=lfs merge=lfs -text
*.[pP][sS][dD] filter=lfs diff=lfs merge=lfs -text
*.[rR][aA][rR] filter=lfs diff=lfs merge=lfs -text
*.[sS][oO] filter=lfs diff=lfs merge=lfs -text
*.[sS][tT][lL] filter=lfs diff=lfs merge=lfs -text
*.[sS][wW][fF] filter=lfs diff=lfs merge=lfs -text
*.[tT][aA][rR] filter=lfs diff=lfs merge=lfs -text
*.[tT][gG][zZ] filter=lfs diff=lfs merge=lfs -text
*.[tT][tT][fF] filter=lfs diff=lfs merge=lfs -text
*.[wW][aA][vV] filter=lfs diff=lfs merge=lfs -text
*.[wW][eE][bB][mM] filter=lfs diff=lfs merge=lfs -text
*.[zZ][iI][pP] filter=lfs diff=lfs merge=lfs -text
*.aif filter=lfs diff=lfs merge=lfs -text
*.anim filter=lfs diff=lfs merge=lfs -text
*.api eol=lf text
*.asset text
*.bundle filter=lfs diff=lfs merge=lfs -text
*.cginc text
*.compute text
*.compute text
*.cs diff=csharp text
*.dat filter=lfs diff=lfs merge=lfs -text
*.doc diff=astextplain
*.docx diff=astextplain
*.dot diff=astextplain
*.entities filter=lfs diff=lfs merge=lfs -text
*.entityheader filter=lfs diff=lfs merge=lfs -text
*.exr filter=lfs diff=lfs merge=lfs -text
*.gradle text eol=lf
*.mat text
*.md text
*.md5 text
*.meta text
*.pdb filter=lfs diff=lfs merge=lfs -text
*.prefab text
*.psb filter=lfs diff=lfs merge=lfs -text
*.raw filter=lfs diff=lfs merge=lfs -text
*.rtf diff=astextplain
*.shader text
*.tga filter=lfs diff=lfs merge=lfs -text
*.tif filter=lfs diff=lfs merge=lfs -text
*.tt eol=crlf text
*.txt text
*.unitypackage filter=lfs diff=lfs merge=lfs -text
*.yaml eol=lf
*.yml eol=lf
*/ProjectSettings/*.asset text
Havok.Vdb.dll filter=lfs diff=lfs merge=lfs -text
HavokNative.dll filter=lfs diff=lfs merge=lfs -text
HavokVisualDebugger.exe filter=lfs diff=lfs merge=lfs -text

3
Packages/com.unity.transport/.gitmodules


[submodule "Tools/recipe-engine"]
path = Tools~/recipe-engine
url = ../recipe-engine.git

250
Packages/com.unity.transport/CHANGELOG.md


# Change log
## [0.9.0] - 2021-05-10
### New features
* Added support for long serialization and delta compression.
* Upgraded collections to 1.0.0
* Added a new network interface for WebSockets, can be used in both native and web builds.
### Changes
* Minimum required Unity version has changed to 2020.3.0f1.
* The transport package can be compiled with the tiny c# profile and for WebGL, but WebGL builds only support IPC - not sockets.
### Fixes
### Upgrade guide
## [0.8.0] - 2021-03-23
### New features
* Added overloads of `PopEvent` and `PopEventForConnection` which return the pipeline used as an out parameter.
### Changes
### Fixes
* Fixed some compatility issues with tiny.
* Fixed a crash when sending messages slightly less than one MTU using the fragmentation pipeline.
* Fixed a bug causing `NetworkDriver.RemoteEndPoint` to return an invalid value when using the default network interface.
### Upgrade guide
## [0.7.0] - 2021-02-05
### New features
* Added `DataStreamWriter.WriteRawbits` and `DataStreamWriter.ReadRawBits` for reading and writing raw bits from a data stream.
### Changes
* Optimized the `NetworkCompressionModel` to find buckets in constant time.
* Changed the error behavior of `DataStreamReader` to be consistent between the editor and players.
### Fixes
* Fixed a crash when receiving a packet with an invalid pipeline identifier.
### Upgrade guide
## [0.6.0] - 2020-11-26
### New features
* An error handling pass has been made and `Error.StatusCode` have been added to indicate more specific errors.
* `Error.DisconnectReason` has been added, so when NetworkDriver.PopEvent returns a `NetworkEvent.Type.Disconnect` the reader returned contains 1 byte of data indicating the reason.
### Changes
* The function signature for NetworkDriver.BeginSend has changed. It now returns an `int` value indicating if the function succeeded or not and the DataStreamWriter now instead is returned as a `out` parameter.
* The function signature for INetworkInterface.Initialize has changed. It now requires you to return an `int` value indicating if the function succeeded or not.
* The function signature for INetworkInterface.CreateInterfaceEndPoint has changed. It now requires you to return an `int` value indicating if the function succeeded or not, and NetworkInterfaceEndPoint is now returned as a `out` parameter.
### Fixes
* Fixed a potential crash when receiving a malformated packet.
* Fixed an issue where the DataStream could sometimes fail writing packet uints before the buffer was full.
### Upgrade guide
* `NetworkDriver.BeginSend` now returns an `int` indicating a `Error.StatusCode`, and the `DataStreamWriter` is passed as an `out` parameter.
## [0.5.0] - 2020-10-01
### New features
### Changes
### Fixes
* Fixed display of ipv6 addresses as strings
### Upgrade guide
## [0.4.1] - 2020-09-10
### New features
* Added `NetworkDriver.GetEventQueueSizeForConnection` which allows you to check how many pending events a connection has.
### Changes
### Fixes
* Fixed a compatibility isue with DOTS Runtime.
### Upgrade guide
## [0.4.0-preview.3] - 2020-08-21
### New features
* Added a new fragmentation pipeline which allows you to send messages larger than one MTU. If the `FragmentationPipelineStage` is part of the pipeline you are trying to send with the `NetworkDriver` will allow a `requiredPayloadSize` larger than one MTU to be specified and split the message into multiple packages.
### Changes
* The methods to read and write strings in the `DataStreamReader`/`DataStreamWriter` have been changed to use `FixedString<N>` instead of `NativeString<N>`. The name of the methods have also changed from `ReadString` to `ReadFixedString64` - and similar changes for write and the packed version of the calls. The calls support `FixedString32`, `FixedString64`, `FixedString128`, `FixedString512` and `FixedString4096`.
* Minimum required Unity version has changed to 2020.1.2.
### Fixes
### Upgrade guide
The data stream methods for reading and writing strings have changed, they now take `FixedString64` instead of `NativeString64` and the names have changed as follows:
* `DataStreamReader.ReadString` -> `DataStreamReader.ReadFixedString64`
* `DataStreamReader.ReadPackedStringDelta` -> `DataStreamReader.ReadPackedFixedString64Delta`
* `DataStreamWriter.WriteString` -> `DataStreamWriter.WriteFixedString64`
* `DataStreamWriter.WritePackedStringDelta` -> `DataStreamWriter.WritePackedFixedString64Delta`
The transport now requires Unity 2020.1.2.
## [0.3.1-preview.4] - 2020-06-05
### New features
### Changes
* Added a new `requiredPayloadSize` parameter to `BeginSend`. The required size cannot be larger than `NetworkParameterConstants.MTU`.
* Added errorcode parameter to a `network_set_nonblocking`, `network_set_send_buffer_size` and `network_set_receive_buffer_size` in `NativeBindings`.
* Additional APIs added to `NativeBindings`: `network_set_blocking`, `network_get_send_buffer_size`, `network_get_receive_buffer_size`, `network_set_receive_timeout`, `network_set_send_timeout`.
* Implemented `NetworkEndPoint.AddressAsString`.
### Fixes
* Fixed an issue in the reliable pipeline which would cause it to not recover if one end did not receive packages for a while.
* Fixed `NetworkInterfaceEndPoint` and `NetworkEndPoint` `GetHashCode` implementation.
* Fixed invalid use of strings when specifying the size of socket buffers in the native bindings.
### Upgrade guide
## [0.3.0-preview.6] - 2020-02-24
### New features
### Changes
* Pipelines are now registered by calling `NetworkPipelineStageCollection.RegisterPipelineStage` before creating a `NetworkDriver`. The built-in pipelines do not require explicit registration. The interface for implementing pipelines has been changed to support this.
* NetworkDriver is no longer a generic type. You pass it an interface when creating the `NetworkDriver`, which means you can switch between backends without modifying all usage of the driver. There is a new `NetworkDriver.Create` which creates a driver with the default `NetworkInterface`. It is also possible to create a `new NetworkDriver` by passing a `NetworkInterface` instance as the first argument.
* `NetworkDriver.Send` is replaced by `BeginSend` and `EndSend`. This allows us to do less data copying when sending messages. The interface for implementing new network interfaces has been changed to support this.
* `DataStreamReader` and `DataStreamWriter` no longer owns any memory. They are just reading/writing the data of a `NativeArray<byte>`.
* `DataStreamWriter` has explicit types for all Write methods.
* `DataStreamReader.Context` has been removed.
* Error handling for `DataStreamWriter` has been improved, on failure it returns false and sets `DataStreamWriter.HasFailedWrites` to true. `DataStreamReader` returns a default value and sets `DataStreamReader.HasFailedReads` to true. `DataStreamReader` will throw an exception instead of returning a default value in the editor.
* IPCManager is no longer public, it is still possible to create a `NetworkDriver` with a `IPCNetworkInterface`.
* Added `NetworkDriver.ScheduleFlushSend` which must be called to guarantee that messages are send before next call to `NetworkDriver.ScheduleUpdate`.
* Added `NetworkDriver.LastUpdateTime` to get the update time the `NetworkDriver` used for the most recent update.
* Removed the IPC address family, use a IPv4 localhost address instead.
### Fixes
* Fixed a memory overflow in the reliability pipeline.
* Made the soaker report locale independent.
### Upgrade guide
Creation and type of `NetworkDriver` has changed, use `NetworkDriver.Create` or pass an instance of a `NetworkInterface` to the `NetworkDriver` constructor.
`NetworkDriver.Send` has been replaced by a pair of `NetworkDriver.BeginSend` and `NetworkDriver.EndSend`. Calling `BeginSend` will return a `DataStreamWriter` to which you write the data. The `DataStreamWriter` is then passed to `EndSend`.
All write calls in `DataStreamWriter` need an explicit type, for example `Write(0)` should be replaced by `WriteInt(0)`.
`DataStreamWriter` no longer shares current position between copies, if you call a method which writes you must pass it by ref for the modifications to apply.
`DataStreamWriter` no longer returns a DeferedWriter, you need to take a copy of the writer at the point you want to make modifications and use the copy to overwrite data later.
`DataStreamWriter` is no longer disposable. If you use the allocating constructor you need to use `Allocator.Temp`, if you pass a `NativeArray<byte>` to the constructor the `NativeArray` owns the memory.
`DataStreamReader.Context` no longer exists, you need to pass the `DataStreamReader` itself by ref if you read in a different function.
The interface for network pipelines has been changed.
The interface for network interfaces has been changed.
## [0.2.3-preview.0] - 2019-12-12
### New features
### Changes
* Added reading and write methods for NativeString64 to DataStream.
### Fixes
### Upgrade guide
## [0.2.2-preview.2] - 2019-12-05
### New features
### Changes
* Added a stress test for parallel sending of data.
* Upgraded collections to 0.3.0.
### Fixes
* Fixed a race condition in IPCNetworkInterface.
* Changed NetworkEventQueue to use UnsafeList to get some type safety.
* Fixed an out-of-bounds access in the reliable sequenced pipeline.
* Fixed spelling and broken links in the documentation.
### Upgrade guide
## [0.2.1-preview.1] - 2019-11-28
### New features
### Changes
### Fixes
* Added missing bindings for Linux and Android.
### Upgrade guide
## [0.2.0-preview.4] - 2019-11-26
### New features
### Changes
* Added support for unquantized floats to `DataStream` class.
* Added `NetworkConfigParameter.maxFrameTimeMS` so you to allow longer frame times when debugging to prevent disconnections due to timeout.
* Allow "1.1.1.1:1234" strings when parsing the IP string in the NetworkEndPoint class, it will use the port part when it's present.
* Reliable pipeline now doesn't require parameters passed in (uses default window size of 32)
* Added Read/Write of ulong to `DataStream`.
* Made it possible to get connection state from the parallel NetworkDriver.
* Added `LengthInBits` to the `DataStreamWriter`.
### Fixes
* Do not push data events to disconnected connections. Fixes an error about resetting the queue with pending messages.
* Made the endian checks in `DataStream` compatible with latest version of burst.
### Upgrade guide
## [0.1.2-preview.1] - 2019-07-17
### New features
* Added a new *Ping-Multiplay* sample based on the *Ping* sample.
* Created to be the main sample for demonstrating Multiplay compatibility and best practices (SQP usage, IP binding, etc.).
* Contains both client and server code. Additional details in readme in `/Assets/Samples/Ping-Multiplay/`.
* **DedicatedServerConfig**: Added arguments for `-fps` and `-timeout`.
* **NetworkEndPoint**: Added a `TryParse()` method which returns false if parsing fails
* Note: The `Parse()` method returns a default IP / Endpoint if parsing fails, but a method that could report failure was needed for the Multiplay sample.
* **CommandLine**:
* Added a `HasArgument()` method which returns true if an argument is present.
* Added a `PrintArgsToLog()` method which is a simple way to print launch args to logs.
* Added a `TryUpdateVariableWithArgValue()` method which updates a ref var only if an arg was found and successfully parsed.
### Changes
* Deleted existing SQP code and added reference to SQP Package (now in staging).
* Removed SQP server usage from basic *Ping* sample.
* Note: The SQP server was only needed for Multiplay compatibility, so the addition of *Ping-Multiplay* allowed us to remove SQP from *Ping*.
### Fixes
* **DedicatedServerConfig**: Vsync is now disabled programmatically if requesting an FPS different from the current screen refresh rate.
### Upgrade guide
## [0.1.1-preview.1] - 2019-06-05
### New features
* Moved MatchMaking to a package and supporting code to a separate folder.
### Fixes
* Fixed an issue with the reliable pipeline not resending when completely idle.
### Upgrade guide
## [0.1.0-preview.1] - 2019-04-16
### New features
* Added network pipelines to enable processing of outgoing and incomming packets. The available pipeline stages are `ReliableSequencedPipelineStage` for reliable UDP messages and `SimulatorPipelineStage` for emulating network conditions such as high latency and packet loss. See [the pipeline documentation](com.unity.transport/Documentation~/pipelines-usage.md) for more information.
* Added reading and writing of packed signed and unsigned integers to `DataStream`. These new methods use huffman encoding to reduce the size of transfered data for small numbers.
### Changes
* Enable Burst compilation for most jobs.
* Made it possible to get the remote endpoint for a connection.
* Replacing EndPoint parsing with custom code to avoid having a dependency on `System.Net`.
* Change the ping sample command-line parameters for server to `-port` and `-query_port`.
* For matchmaking, use an Assignment object containing the `ConnectionString`, the `Roster`, and an `AssignmentError` string instead of just the `ConnectionString`.
### Fixes
* Fixed an issue with building iOS on Windows.
* Fixed inconsistent error handling between platforms when the network buffer is full.
### Upgrade guide
Unity 2019.1 is now required.
`BasicNetworkDriver` has been renamed to `GenericNetworkDriver` and a new `UdpNetworkDriver` helper class is also available.
`System.Net` EndPoints can no longer be used as addresses, use the new NetworkEndpoint struct instead.

7
Packages/com.unity.transport/CHANGELOG.md.meta


fileFormatVersion: 2
guid: 2f10a9a833f64e641a5733d67911a362
TextScriptImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

20
Packages/com.unity.transport/DESIGN.md


# Unity Transport Design Rules
## All features are optional
Unity transport is conceptually a thin layer on UDP adding a connection concept. All additional features on top of UDP + connection are optional, when not used they have zero performance or complexity overhead. If possible features are implemented as pipeline stages.
Features that have a limited audience are implemented outside the package - either in game code or other packages.
## Full control over processing time and when packets are sent/received
UTP is optimized for making games. It can be used without creating any additional threads - only using the JobSystem. The layer on top has full control over when the transport schedules jobs. The layer on top also has full control over when packets are sent on the wire. There are no internal buffers delaying messages (except possibly in pipelines).
There is generally no need to continuously poll for messages since incoming data needs to be read right before simulation starts, and we cannot start using new data in the middle of the simulation
## Written in HPC#
All code is jobified and burst compiled, there is no garbage collection. The transport does not spend any processing time outside setup on the main thread, and it allows the layer on top to not sync on the main thread.
## Follows the DOTS principles, is usable in DOTS Runtime and always compatible with the latest versions of the DOTS packages
There should always be a version compatible with the latest verions of the DOTS dependencies such as Unity Collections.
## The protocol is well defined and documented
Other implementations can communicate with games written with Unity Transport, without reverse engineering or reading the transport source code

7
Packages/com.unity.transport/DESIGN.md.meta


fileFormatVersion: 2
guid: 99b333ad37d614e42b1a4776de09e34e
TextScriptImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

20
Packages/com.unity.transport/Documentation~/TableOfContents.md


# Unity Transport manual
* **Introduction**
* [Unity Transport overview](index.md)
* [Installation guide](install.md)
* **Workflows**
* [Creating a minimal client and server](workflow-client-server.md)
* [Creating a jobified client and server](workflow-client-server-jobs.md)
* [Using pipelines](pipelines-usage.md)
* **Background information**
* [Understanding the Update flow](update-flow.md)
* [Understanding the Connection State Machine](connection-state-machine.md)
* [Event consumption](event-consumption.md)
* **Samples**
* [ClientBehaviour](samples/clientbehaviour.cs.md)
* [ServerBehaviour](samples/serverbehaviour.cs.md)
* [JobifiedClientBehaviour](samples/jobifiedclientbehaviour.cs.md)
* [JobifiedServerBehaviour](samples/jobifiedserverbehaviour.cs.md)
* [Source Project for Workflows](https://oc.unity3d.com/index.php/s/PHaNZP79Va2YOLT)

17
Packages/com.unity.transport/Documentation~/connection-state-machine.md


# Understanding the Connection State Machine
It's important to at least understand how transitions occur in the connection state machine so you make decisions depending on what triggered each state. And to understand the subtle differences depending if you are `Connecting` to another host or if you simply want to Listen for incoming connections. As you can see below the state machine for the `NetworkConnection` is pretty simple.
![ConnectionState](images/com.unity.transport.connection.png)
All connections start in `Disconnected` state.
- Depending what state the `NetworkDriver` is in, the `Listening (Passive)` state might be triggered. This is when the driver acts like a server listening for incoming connections and data requests. And secondly you could try to use the driver to connect to a remote endpoint and then we would invoke another flow of the state machine.
So to give a overview we have two standard scenarios. Either you listen for incoming connections or you use and outgoing connection to connect to someone else.
In our [client/server workflow](workflow-client-server.md) we use the ServerBehaviour to `Listen` and the ClientBehaviour to `Connect`.
[Back to table of contents](TableOfContents.md)

60
Packages/com.unity.transport/Documentation~/event-consumption.md


# Event consumption
There are currently 4 types of events supplied by the `NetworkDriver`
```c#
public enum Type
{
Empty = 0,
Data,
Connect,
Disconnect
}
```
As mentioned, there are a few subtle differences running the driver as a host or client. Mainly when it comes to consumption of events.
Both your client and your server loop will want to consume the events that are produced by the `NetworkDriver`. And you do so by either calling `PopEvent` on each `NetworkConnection` similar to how we did before.
```c#
DataStreamReader strm;
NetworkEvent.Type cmd;
while ((cmd = m_Connection.PopEvent(driver, out strm)) != NetworkEvent.Type.Empty)
; // Handle Event
```
You can try calling the `PopEventForConnection` on the `NetworkDriver` as we did in the ServerBehaviour example:
```c#
DataStreamReader strm;
NetworkEvent.Type cmd;
while ((cmd = m_Driver.PopEventForConnection(m_Connections[i], out strm)) != NetworkEvent.Type.Empty)
; // Handle Event
```
There is no real difference between these calls, both calls will do the same thing. Its just how you want to phrase yourself when writing the code.
And finally to receive a new `NetworkConnection` on the Driver while Listening you can call `Accept`
```c#
NetworkConnection c;
while ((c = m_Driver.Accept()) != default(NetworkConnection))
; // Handle Connection Event.
```
| Event | Description |
| ---------- | ------------------------------------------------------------ |
| Empty | The `Empty` event signals that there are no more messages in our event queue to handle this frame. |
| Data | The `Data` event signals that we have received data from a connected endpoint. |
| Connect | The `Connect` event signals that a new connection has been established.<br> **Note**: this event is only available if the `NetworkDriver` is **not** in the `Listening` state. |
| Disconnect | The `Disconnect` event is received if;<br> 1. `Disconnect` packet was received (calling `NetworkConnection::Disconnect` will trigger this.)<br> 2. A *socket timeout* occurred.<br> 3. Maximum connect attempts on the `NetworkConnection` exceeded. <br> **Note:** That if you call `Disconnect` on your `NetworkConnection` this will **NOT** trigger an `Disconnect` event on your local `NetworkDriver`. |
Looking at this table we see that there are 2 things that stand out.
- The first thing is that the `Connect` event is only available if the `NetworkDriver` is **NOT** `Listening`
- In order to receive any `Connect` events on a `NetworkDriver` that is in the `Listening` state we need to call the special function `Accept` just as we did in the *Creating a Server* section in the [Creating a minimal client and server](workflow-client-server.md) workflow page.
- The second thing to notice is that if you call `Disconnect` on a `NetworkConnection` this will not trigger an event inside your own driver.
[Back to table of contents](TableOfContents.md)

94
Packages/com.unity.transport/Documentation~/images/Pipeline-stages-diagram.png

之前 之后
宽度: 520  |  高度: 224  |  大小: 14 KiB

117
Packages/com.unity.transport/Documentation~/images/com.unity.transport.connection.png

之前 之后
宽度: 916  |  高度: 391  |  大小: 35 KiB

78
Packages/com.unity.transport/Documentation~/images/com.unity.transport.driver.png

之前 之后
宽度: 866  |  高度: 382  |  大小: 35 KiB

90
Packages/com.unity.transport/Documentation~/images/console-view.PNG

之前 之后
宽度: 650  |  高度: 244  |  大小: 25 KiB

16
Packages/com.unity.transport/Documentation~/images/game-object.PNG

之前 之后
宽度: 514  |  高度: 210  |  大小: 4.9 KiB

58
Packages/com.unity.transport/Documentation~/images/inspector.PNG

之前 之后
宽度: 605  |  高度: 275  |  大小: 24 KiB

56
Packages/com.unity.transport/Documentation~/images/layercake.png

之前 之后
宽度: 849  |  高度: 838  |  大小: 24 KiB

44
Packages/com.unity.transport/Documentation~/index.md


# About Unity.Networking.Transport
Use the `com.unity.transport` package to add multiplayer / network features to your project.
## Overview
![Transport Overview](images/layercake.png)
# Installing Unity.Networking.Transport
For installation instructions on the `com.unity.transport` package, follow the [installation guide](install.md).
# Using Unity.Networking.Transport
To learn how to use the `com.unity.transport` package in your own project, read the [manual](TableOfContents.md).
> **Note**: Once you have installed the package, it is recommended that you read through the client and server workflows first.
# Technical details
## Requirements
This version of `com.unity.transport` is compatible with the following versions of the Unity Editor:
* 2020.1.2 and later
* All platforms supported by Unity are supported by the transport
## Package contents
The following table shows the package folder contents.
|Location|Description|
|---|---|
|`Documentation`|Contains manual part of the documentation (script reference is inline using xmldoc).|
|`Runtime`|Contains the implementation.|
|`Tests`|Contains all the tests.|
## Document revision history
|Date|Reason|
|---|---|
|Oct 22, 2018|Documentation reorganised and proofread by Technical Writer.|
|Jul 18, 2018|Document created. Matches package version 0.1.0.|

25
Packages/com.unity.transport/Documentation~/install.md


# Installing com.unity.transport
1. Open the Unity Editor.
2. Create a new Project.
1. Open the Unity Package Manager by navigating to **Window** > **Package Manager** on Unity’s main menu.
1. Click Add in the status bar.
You have the following options:
- Add package from disk...
- Add package from tarball...
- Add package from git URL...
1. Select **Add package from git URL...**
1. Enter the Git URL to the release package: For the latest version use this link:
```html
com.unity.transport@0.6.0-preview.7
```
> Note: If you encounter errors, please [report an issue](https://unity3d.com/unity/qa/bug-reporting) with the Unity Bug Reporter or on the forums.
[Back to table of contents](TableOfContents.md)

136
Packages/com.unity.transport/Documentation~/pipelines-usage.md


# Using pipelines
Pipelines are a feature which offers layers of functionality on top of the default socket implementation behaviour. In the case of the UDP socket this makes it possible to have additional functionality on top of the standard unreliable datagram, such as Quality of Service features like sequencing, reliability, fragmentation and so on. This could work with any type of socket interface which has been implemented for use in the driver.
## How it works
The way it works is that you can add any number of pipeline stages to your transport driver. So when you send a packet it will go to the first stage, then the next and so on until it's sent on the wire. On the receiving side the stages are then processed in reverse order, so the packet is correctly "unpacked" by the stages.
For example the first stage might compress a packet and a second stage could add a sequence number (just the packets header). When receiving the packet is first passed through the sequence stage and then decompressed. The sequence stage could drop the packet if it's out of order in which case it leaves the pipeline and doesn't continue to the decompression.
![PipelineStagesDiagram](images/Pipeline-stages-diagram.png)
The pipeline stages are gathered together in a collection. This is the interface between the pipeline processor in the driver to the pipeline stages you might be using. Here the pipeline stages are initialized and so on. There is a default collection provided in the driver which has all the built in pipeline stages already configured. It's possible to just use that and use a custom collection if you have your own pipeline stage you need to add to the collection.
## Example usage
The example below shows how the driver can create a new pipeline with 2 pipeline stages present (sequencer and simulator). The driver is created with the default pipeline collection and the pipeline parameters can be passed to the collection there. Multiple pipeline parameters can be passed in this way and the collection itself takes care of assigning them to the right pipeline stage.
When sending packets the pipeline can then be specified as a parameter, so the packet is passed through it, it's then automatically processed the right way on the receiving end. It's therefore important both the client and server set up their pipelines in exactly the same way. One exception is with pipeline stages which do not manipulate the packet payload or header, these do not need to be symmetric. For example the simulator stage here is only keeping packets on hold for a certain time and then releases them unmodified or drops them altogether, it can therefore be set up to only run on the client.
```c#
using Unity.Collections;
using Unity.Networking.Transport;
using Unity.Networking.Transport.Utilities;
public class Client
{
NetworkDriver m_DriverHandle;
NetworkPipeline m_Pipeline;
const int k_PacketSize = 256;
// Connection establishment omitted
public NetworkConnection m_ConnectionToServer;
public void Configure()
{
// Driver can be used as normal
m_DriverHandle = NetworkDriver.Create(new SimulatorUtility.Parameters {MaxPacketSize = k_PacketSize, MaxPacketCount = 30, PacketDelayMs = 100});
// Driver now knows about this pipeline and can explicitly be asked to send packets through it (by default it sends directly)
m_Pipeline = m_DriverHandle.CreatePipeline(typeof(UnreliableSequencedPipelineStage), typeof(SimulatorPipelineStage));
}
public void SendMessage(NativeArray<byte> someData)
{
// Send using the pipeline created in Configure()
var writer = m_DriverHandle.BeginSend(m_Pipeline, m_ConnectionToServer);
writer.WriteBytes(someData);
m_DriverHandle.EndSend(writer);
}
}
```
## Simulator Pipeline
The simulator pipeline stage could be added on either the client or server to simulate bad network conditions. It's best to add it as the last stage in the pipeline, then it will either drop the packet or add a delay right before it would go on the wire.
### Using the simulator
Nothing needs to be done after configuring the pipline, it can be set up like this when the driver is created:
```c#
m_DriverHandle = NetworkDriver.Create(new SimulatorUtility.Parameters {MaxPacketSize = NetworkParameterConstants.MTU, MaxPacketCount = 30, PacketDelayMs = 25, PacketDropPercentage = 10});
m_Pipeline = m_DriverHandle.CreatePipeline(typeof(SimulatorPipelineStage));
```
This would create a simulator pipeline stage which can delay up to 30 packets of a size up to the MTU size constant. Each packets gets a 25 ms delay applied and 10% of packets received will be dropped. SimulatorPipelineStage processes packets on the Receive stage of the pipeline.
### Debug information
To get information about internal state in the simulator you can check the SimulatorUtility.Context structure, stored in the pipeline stage shared buffer. This tracks how many packets have been set, PacketCount, and how many of those were dropped, PacketDropCount. ReadyPackets and WaitingPackets shows what packets are now ready to be sent (delay time expired) and how many are stored by the simulator. StatsTime and NextPacketTime show the last time the simulator ran and when the next packet is due to be released.
```c#
public unsafe void DumpSimulatorStatistics()
{
var simulatorStageId = NetworkPipelineStageCollection.GetStageId(typeof(SimulatorPipelineStage));
driver.GetPipelineBuffers(pipeline, simulatorStageId, connection[0], out var receiveBuffer, out var sendBuffer, out var sharedBuffer);
var simCtx = (SimulatorUtility.Context*)sharedBuffer.GetUnsafeReadOnlyPtr();
UnityEngine.Debug.Log("Simulator stats\n" +
"PacketCount: " + simCtx->PacketCount + "\n" +
"PacketDropCount: " + simCtx->PacketDropCount + "\n" +
"ReadyPackets: " + simCtx->ReadyPackets + "\n" +
"WaitingPackets: " + simCtx->WaitingPackets + "\n" +
"NextPacketTime: " + simCtx->NextPacketTime + "\n" +
"StatsTime: " + simCtx->StatsTime);
}
```
## Reliability pipeline
The reliability pipeline makes sure all packets are delivered and in order. It adds header information to all packets sent and tracks their state internally to make this happen. Whenever a packet is sent, it is given a sequence ID and then stored in the send processing buffer along with timing information (send time). The packet is then sent with that sequence ID added to the packet header. All packet headers also include information about what remote sequence IDs have been seen, so the receiver of the packet can know the delivery state of the packets it sent. This way there is always information about delivery state flowing between the two endpoints who make up a connection. If a certain time interval expires without an acknowledgement for a particular sequence ID the packet is resent and the timers reset.
Reliability packet header looks like this:
```c#
public struct PacketHeader
{
public ushort Type;
public ushort ProcessingTime;
public ushort SequenceId;
public ushort AckedSequenceId;
public uint AckMask;
}
```
Where the type could be either a payload or ack packet, which is an empty packet with only this header. Processing time is time which passed between receiving a particular sequence ID and sending an acknowledgement for it, this is used for Round Trip Time (RTT) calculations. Then there is the sequence ID of this packet (not used in ack packets) and what remote sequence ID is being acknowledged. The AckMask is the history of acknowledgements we know about (up to the window size) so you can acknowledge multiple packets in a single header.
The ack packet type is used when a certain amount of time has passed and nothing has been sent to the remote endpoint. We then check if we need to send a pending acknowledgement to him, or else the last packet will be assumed lost and a resend will take place. If a message is sent on every update call these kinds of packets never need to be sent.
### Using the reliability pipeline
```c#
m_ServerDriver = NetworkDriver.Create(new ReliableUtility.Parameters { WindowSize = 32 });
m_Pipeline = m_ServerDriver.CreatePipeline(typeof(ReliableSequencedPipelineStage));
```
This would create a pipeline with just the reliability pipeline stage present, and initialize it to a window size of 32 (so it can keep track of 32 reliable packets at a one time). The maximum value for this is 32.
Because only 32 packets can be tracked at a time there can't be more than 32 packets in flight at any one time, trying to send a 33rd packet will result in an error and it will not be reliable (no guarantee of delivery). It's possible to check for such errors by checking the error code in the reliability internal state:
```c#
// Get a reference to the internal state or shared context of the reliability
var reliableStageId = NetworkPipelineStageCollection.GetStageId(typeof(ReliableSequencedPipelineStage));
m_ServerDriver.GetPipelineBuffers(serverPipe, reliableStageId, serverToClient, out var tmpReceiveBuffer, out var tmpSendBuffer, out var serverReliableBuffer);
var serverReliableCtx = (ReliableUtility.SharedContext*) serverReliableBuffer.GetUnsafePtr();
var strm = m_ServerDriver.BeginSend(serverPipe, serverToClient);
m_ServerDriver.EndSend(strm);
if (serverReliableCtx->errorCode != 0)
{
// Failed to send with reliability, error code will be ReliableUtility.ErrorCodes.OutgoingQueueIsFull if no buffer space is left to store the packet
}
```
It's possible to run into the OutgoingQueueIsFull error when packets are being sent too frequently for the latency and quality of the connection. High packet loss means packets need to stay for multiple RTTs in the queue and if the RTT is high then that time can end up being longer than the send rate + window size permit. For example with 60 packets sent per second a packet will go out every 16 ms, if the RTT is 250 ms about 16 packets will be in the queue at any one time. With a packet drop the total time will go up to 500 ms and the packet will be in the last slot when it's finally freed.
It best suited to use the reliability pipeline for event type messages (door opened), Remote Procedure Calls (RPCs) or slow frequency messages like chat.
### Debug information
More internal state information can be gathered using GetPipelineBuffers as shown above. The soaker test gathers a lot of statistics as seen in the SoakCommon.cs file, in the GatherReliabilityStats function. There it checks what the RTT used internally is determined to be and how many packets have been sent, received, dropped, duplicated and resent.

68
Packages/com.unity.transport/Documentation~/samples/clientbehaviour.cs.md


```c#
using UnityEngine;
using Unity.Networking.Transport;
public class ClientBehaviour : MonoBehaviour
{
public NetworkDriver m_Driver;
public NetworkConnection m_Connection;
public bool m_Done;
void Start ()
{
m_Driver = NetworkDriver.Create();
m_Connection = default(NetworkConnection);
var endpoint = NetworkEndPoint.LoopbackIpv4;
endpoint.Port = 9000;
m_Connection = m_Driver.Connect(endpoint);
}
public void OnDestroy()
{
m_Driver.Dispose();
}
void Update()
{
m_Driver.ScheduleUpdate().Complete();
if (!m_Connection.IsCreated)
{
if (!m_Done)
Debug.Log("Something went wrong during connect");
return;
}
DataStreamReader stream;
NetworkEvent.Type cmd;
while ((cmd = m_Connection.PopEvent(m_Driver, out stream)) != NetworkEvent.Type.Empty)
{
if (cmd == NetworkEvent.Type.Connect)
{
Debug.Log("We are now connected to the server");
uint value = 1;
var writer = m_Driver.BeginSend(m_Connection);
writer.WriteUInt(value);
m_Driver.EndSend(writer);
}
else if (cmd == NetworkEvent.Type.Data)
{
uint value = stream.ReadUInt();
Debug.Log("Got the value = " + value + " back from the server");
m_Done = true;
m_Connection.Disconnect(m_Driver);
m_Connection = default(NetworkConnection);
}
else if (cmd == NetworkEvent.Type.Disconnect)
{
Debug.Log("Client got disconnected from server");
m_Connection = default(NetworkConnection);
}
}
}
}
```

95
Packages/com.unity.transport/Documentation~/samples/jobifiedclientbehaviour.cs.md


```c#
using UnityEngine;
using Unity.Jobs;
using Unity.Collections;
using Unity.Networking.Transport;
struct ClientUpdateJob : IJob
{
public NetworkDriver driver;
public NativeArray<NetworkConnection> connection;
public NativeArray<byte> done;
public void Execute()
{
if (!connection[0].IsCreated)
{
if (done[0] != 1)
Debug.Log("Something went wrong during connect");
return;
}
DataStreamReader stream;
NetworkEvent.Type cmd;
while ((cmd = connection[0].PopEvent(driver, out stream)) != NetworkEvent.Type.Empty)
{
if (cmd == NetworkEvent.Type.Connect)
{
Debug.Log("We are now connected to the server");
uint value = 1;
var writer = driver.BeginSend(connection[0]);
writer.WriteUInt(value);
driver.EndSend(writer);
}
else if (cmd == NetworkEvent.Type.Data)
{
uint value = stream.ReadUInt();
Debug.Log("Got the value = " + value + " back from the server");
// And finally change the `done[0]` to `1`
done[0] = 1;
connection[0].Disconnect(driver);
connection[0] = default(NetworkConnection);
}
else if (cmd == NetworkEvent.Type.Disconnect)
{
Debug.Log("Client got disconnected from server");
connection[0] = default(NetworkConnection);
}
}
}
}
public class JobifiedClientBehaviour : MonoBehaviour
{
public NetworkDriver m_Driver;
public NativeArray<NetworkConnection> m_Connection;
public NativeArray<byte> m_Done;
public JobHandle ClientJobHandle;
void Start ()
{
m_Driver = NetworkDriver.Create();
m_Connection = new NativeArray<NetworkConnection>(1, Allocator.Persistent);
m_Done = new NativeArray<byte>(1, Allocator.Persistent);
var endpoint = NetworkEndPoint.LoopbackIpv4;
endpoint.Port = 9000;
m_Connection[0] = m_Driver.Connect(endpoint);
}
public void OnDestroy()
{
ClientJobHandle.Complete();
m_Connection.Dispose();
m_Driver.Dispose();
m_Done.Dispose();
}
void Update()
{
ClientJobHandle.Complete();
var job = new ClientUpdateJob
{
driver = m_Driver,
connection = m_Connection,
done = m_Done
};
ClientJobHandle = m_Driver.ScheduleUpdate();
ClientJobHandle = job.Schedule(ClientJobHandle);
}
}
```

115
Packages/com.unity.transport/Documentation~/samples/jobifiedserverbehaviour.cs.md


```c#
using UnityEngine;
using UnityEngine.Assertions;
using Unity.Jobs;
using Unity.Collections;
using Unity.Networking.Transport;
struct ServerUpdateConnectionsJob : IJob
{
public NetworkDriver driver;
public NativeList<NetworkConnection> connections;
public void Execute()
{
// CleanUpConnections
for (int i = 0; i < connections.Length; i++)
{
if (!connections[i].IsCreated)
{
connections.RemoveAtSwapBack(i);
--i;
}
}
// AcceptNewConnections
NetworkConnection c;
while ((c = driver.Accept()) != default(NetworkConnection))
{
connections.Add(c);
Debug.Log("Accepted a connection");
}
}
}
struct ServerUpdateJob : IJobParallelForDefer
{
public NetworkDriver.Concurrent driver;
public NativeArray<NetworkConnection> connections;
public void Execute(int index)
{
DataStreamReader stream;
Assert.IsTrue(connections[index].IsCreated);
NetworkEvent.Type cmd;
while ((cmd = driver.PopEventForConnection(connections[index], out stream)) != NetworkEvent.Type.Empty)
{
if (cmd == NetworkEvent.Type.Data)
{
uint number = stream.ReadUInt();
Debug.Log("Got " + number + " from the Client adding + 2 to it.");
number +=2;
var writer = driver.BeginSend(connections[index]);
writer.WriteUInt(number);
driver.EndSend(writer);
}
else if (cmd == NetworkEvent.Type.Disconnect)
{
Debug.Log("Client disconnected from server");
connections[index] = default(NetworkConnection);
}
}
}
}
public class JobifiedServerBehaviour : MonoBehaviour
{
public NetworkDriver m_Driver;
public NativeList<NetworkConnection> m_Connections;
private JobHandle ServerJobHandle;
void Start ()
{
m_Connections = new NativeList<NetworkConnection>(16, Allocator.Persistent);
m_Driver = NetworkDriver.Create();
var endpoint = NetworkEndPoint.AnyIpv4;
endpoint.Port = 9000;
if (m_Driver.Bind(endpoint) != 0)
Debug.Log("Failed to bind to port 9000");
else
m_Driver.Listen();
}
public void OnDestroy()
{
// Make sure we run our jobs to completion before exiting.
ServerJobHandle.Complete();
m_Connections.Dispose();
m_Driver.Dispose();
}
void Update ()
{
ServerJobHandle.Complete();
var connectionJob = new ServerUpdateConnectionsJob
{
driver = m_Driver,
connections = m_Connections
};
var serverUpdateJob = new ServerUpdateJob
{
driver = m_Driver.ToConcurrent(),
connections = m_Connections.AsDeferredJobArray()
};
ServerJobHandle = m_Driver.ScheduleUpdate();
ServerJobHandle = connectionJob.Schedule(ServerJobHandle);
ServerJobHandle = serverUpdateJob.Schedule(m_Connections, 1, ServerJobHandle);
}
}
```

81
Packages/com.unity.transport/Documentation~/samples/serverbehaviour.cs.md


```c#
using UnityEngine;
using UnityEngine.Assertions;
using Unity.Collections;
using Unity.Networking.Transport;
public class ServerBehaviour : MonoBehaviour
{
public NetworkDriver m_Driver;
private NativeList<NetworkConnection> m_Connections;
void Start ()
{
m_Driver = NetworkDriver.Create();
var endpoint = NetworkEndPoint.AnyIpv4;
endpoint.Port = 9000;
if (m_Driver.Bind(endpoint) != 0)
Debug.Log("Failed to bind to port 9000");
else
m_Driver.Listen();
m_Connections = new NativeList<NetworkConnection>(16, Allocator.Persistent);
}
public void OnDestroy()
{
m_Driver.Dispose();
m_Connections.Dispose();
}
void Update ()
{
m_Driver.ScheduleUpdate().Complete();
// CleanUpConnections
for (int i = 0; i < m_Connections.Length; i++)
{
if (!m_Connections[i].IsCreated)
{
m_Connections.RemoveAtSwapBack(i);
--i;
}
}
// AcceptNewConnections
NetworkConnection c;
while ((c = m_Driver.Accept()) != default(NetworkConnection))
{
m_Connections.Add(c);
Debug.Log("Accepted a connection");
}
DataStreamReader stream;
for (int i = 0; i < m_Connections.Length; i++)
{
Assert.IsTrue(m_Connections[i].IsCreated);
NetworkEvent.Type cmd;
while ((cmd = m_Driver.PopEventForConnection(m_Connections[i], out stream)) != NetworkEvent.Type.Empty)
{
if (cmd == NetworkEvent.Type.Data)
{
uint number = stream.ReadUInt();
Debug.Log("Got " + number + " from the Client adding + 2 to it.");
number +=2;
var writer = m_Driver.BeginSend(NetworkPipeline.Null, m_Connections[i]);
writer.WriteUInt(number);
m_Driver.EndSend(writer);
}
else if (cmd == NetworkEvent.Type.Disconnect)
{
Debug.Log("Client disconnected from server");
m_Connections[i] = default(NetworkConnection);
}
}
}
}
}
```

20
Packages/com.unity.transport/Documentation~/update-flow.md


# Understanding the Update flow
We call the driver's `ScheduleUpdate` method on every frame. This is so we can update the state of each connection we have active to make sure we read all data that we have received and finally produce events that the user can react to using `PopEvent` and `PopEventForConnection`.
The `Update` loop of the driver is really simple, it might look daunting at first glance but if you strip out all of the job system dependencies you will see we only do three things here:
![FlowchartUpdate](images/com.unity.transport.driver.png)
1. We start by calling our `InternalUpdate`, this call is where we clean up any stale connections, we clear our buffers and we finally check timeouts on our connections.
2. The second thing in the chain is running the `ReceiveJob` for reading and parsing the data from the socket.
3. Finally for each new message we receive on the socket we call a `AppendPacket` function that parses each packet received and either creates an event for it or discards it.
That's it, we clean up, we populate our buffers and we push new events.
You could almost view the `NetworkDriver` as a Control System for the State Machine handling
`NetworkConnection`.
[Back to table of contents](TableOfContents.md)

416
Packages/com.unity.transport/Documentation~/workflow-client-server-jobs.md


# Jobyfiying our Example
In the workflow [Creating a minimal client and server](workflow-client-server.md), our client should look like this [code example](samples/clientbehaviour.cs.md).
> **Note**: It is recommended, before reading this workflow, to refresh your memory on how the [C# Job System](https://docs.unity3d.com/Manual/JobSystem.html) works.
## Creating a Jobified Client
Start by creating a client job to handle your inputs from the network. As you only handle one client at a time we will use the [IJob](https://docs.unity3d.com/ScriptReference/Unity.Jobs.IJob.html) as our job type. You need to pass the driver and the connection to the job so you can handle updates within the `Execute` method of the job.
```c#
struct ClientUpdateJob: IJob
{
public NetworkDriver driver;
public NativeArray<NetworkConnection> connection;
public NativeArray<byte> done;
public void Execute() { ... }
}
```
> **Note**: The data inside the ClientUpdateJob is **copied**. If you want to use the data after the job is completed, you need to have your data in a shared container, such as a [NativeContainer](https://docs.unity3d.com/Manual/JobSystemNativeContainer.html).
Since you might want to update the `NetworkConnection` and the `done` variables inside your job (we might receive a disconnect message), you need to make sure you can share the data between the job and the caller. In this case, you can use a [NativeArray](https://docs.unity3d.com/ScriptReference/Unity.Collections.NativeArray_1.html).
> Note: You can only use [blittable types](https://docs.microsoft.com/en-us/dotnet/framework/interop/blittable-and-non-blittable-types) in a `NativeContainer`. In this case, instead of a `bool` you need to use a `byte`, as its a blittable type.
In your `Execute` method, move over your code from the `Update` method that you have already in place from [_ClientBehaviour.cs_](samples/clientbehaviour.cs.md) and you are done.
You need to change any call to `m_Connection` to `connection[0]` to refer to the first element inside your `NativeArray`. The same goes for your `done` variable, you need to call `done[0]` when you refer to the `done` variable. See the code below:
```c#
public void Execute()
{
if (!connection[0].IsCreated)
{
// Remember that its not a bool anymore.
if (done[0] != 1)
Debug.Log("Something went wrong during connect");
return;
}
DataStreamReader stream;
NetworkEvent.Type cmd;
while ((cmd = connection[0].PopEvent(driver, out stream)) != NetworkEvent.Type.Empty)
{
if (cmd == NetworkEvent.Type.Connect)
{
Debug.Log("We are now connected to the server");
var value = 1;
var writer = driver.BeginSend(connection[0]);
writer.WriteUInt(value);
driver.EndSend(writer);
}
else if (cmd == NetworkEvent.Type.Data)
{
uint value = stream.ReadUInt();
Debug.Log("Got the value = " + value + " back from the server");
// And finally change the `done[0]` to `1`
done[0] = 1;
connection[0].Disconnect(driver);
connection[0] = default(NetworkConnection);
}
else if (cmd == NetworkEvent.Type.Disconnect)
{
Debug.Log("Client got disconnected from server");
connection[0] = default(NetworkConnection);
}
}
}
```
### Updating the client MonoBehaviour
When you have a job, you need to make sure that you can execute the job. To do this, you need to make some changes to your ClientBehaviour:
```c#
public class JobifiedClientBehaviour : MonoBehaviour
{
public NetworkDriver m_Driver;
public NativeArray<NetworkConnection> m_Connection;
public NativeArray<byte> m_Done;
public JobHandle ClientJobHandle;
public void OnDestroy() { ... }
public void Start() { ... }
public void Update() { ... }
}
```
Both `m_Done` and `m_Connection` in the code above, have been changed to type `NativeArray`. We also added a [JobHandle](https://docs.unity3d.com/Manual/JobSystemJobDependencies.html) so you can track your ongoing jobs.
#### Start method
```c#
void Start () {
m_Driver = NetworkDriver.Create();
m_Connection = new NativeArray<NetworkConnection>(1, Allocator.Persistent);
m_Done = new NativeArray<byte>(1, Allocator.Persistent);
var endpoint = NetworkEndPoint.LoopbackIpv4;
endpoint.Port = 9000;
m_Connection[0] = m_Driver.Connect(endpoint);
}
```
The `Start` method looks pretty similar to before, the major update here is to make sure you create your `NativeArray`.
#### OnDestroy method
```c#
public void OnDestroy()
{
ClientJobHandle.Complete();
m_Connection.Dispose();
m_Driver.Dispose();
m_Done.Dispose();
}
```
Same goes for the `OnDestroy` method. Make sure you dispose all your `NativeArray` objects. A new addition is the `ClientJobHandle.Complete()` call. This makes sure your jobs complete before cleaning up and destroying the data they might be using.
#### Client Update loop
Finally you need to update your core game loop:
```c#
void Update()
{
ClientJobHandle.Complete();
...
}
```
You want to make sure (again) that before you start running your new frame, we check that the last frame is complete. Instead of calling `m_Driver.ScheduleUpdate().Complete()`, use the `JobHandle` and call `ClientJobHandle.Complete()`.
To chain your job, start by creating a job struct:
```c#
var job = new ClientUpdateJob
{
driver = m_Driver,
connection = m_Connection,
done = m_Done
};
```
To schedule the job, you need to pass the `JobHandle` dependency that was returned from the `m_Driver.ScheduleUpdate` call in the `Schedule` function of your `IJob`. Start by invoking the `m_Driver.ScheduleUpdate` without a call to `Complete`, and pass the returning `JobHandle` to your saved `ClientJobHandle`.
```c#
ClientJobHandle = m_Driver.ScheduleUpdate();
ClientJobHandle = job.Schedule(ClientJobHandle);
```
As you can see in the code above, you pass the returned `ClientJobHandle` to your own job, returning a newly updated `ClientJobHandle`.
You now have a *JobifiedClientBehaviour* that looks like [this](samples/jobifiedclientbehaviour.cs.md).
## Creating a Jobified Server
The server side is pretty similar to start with. You create the jobs you need and then you update the usage code.
Consider this: you know that the `NetworkDriver` has a `ScheduleUpdate` method that returns a `JobHandle`. The job as you saw above populates the internal buffers of the `NetworkDriver` and lets us call `PopEvent`/`PopEventForConnection` method. What if you create a job that will fan out and run the processing code for all connected clients in parallel? If you look at the documentation for the C# Job System, you can see that there is a [IJobParallelFor](https://docs.unity3d.com/Manual/JobSystemParallelForJobs.html) job type that can handle this scenario
> Note: Because we do not now how many requests we might receive or how many connections we might need to process at any one time, there is another `IJobPrarallelFor` job type that we can use namely: `IJobParallelForDefer`
```c#
struct ServerUpdateJob : IJobParallelForDefer
{
public void Execute(int index)
{
throw new System.NotImplementedException();
}
}
```
However, we can’t run all of our code in parallel.
In the client example above, we started off by cleaning up closed connections and accepting new ones, this can't be done in parallel. You need to create a connection job as well;
Start by creating a `ServerUpdateConnectionJob` job. You know you need to pass both the `driver` and `connections` to our connection job. Then you want your job to "Clean up connections" and "Accept new connections":
```c#
struct ServerUpdateConnectionsJob : IJob
{
public NetworkDriver driver;
public NativeList<NetworkConnection> connections;
public void Execute()
{
// Clean up connections
for (int i = 0; i < connections.Length; i++)
{
if (!connections[i].IsCreated)
{
connections.RemoveAtSwapBack(i);
--i;
}
}
// Accept new connections
NetworkConnection c;
while ((c = driver.Accept()) != default(NetworkConnection))
{
connections.Add(c);
Debug.Log("Accepted a connection");
}
}
}
```
The code above should be almost identical to your old non-jobified code.
With the `ServerUpdateConnectionsJob` done, lets look at how to implement the `ServerUpdateJob` using `IJobParallelFor`.
```c#
struct ServerUpdateJob : IJobParallelForDefer
{
public NetworkDriver.Concurrent driver;
public NativeArray<NetworkConnection> connections;
public void Execute(int index)
{
...
}
}
```
There are **two** major differences here compared with our other `job`. First off we are using the `NetworkDriver.Concurrent` type, this allows you to call the `NetworkDriver` from multiple threads, precisely what you need for the `IParallelForJobDefer`. Secondly, you are now passing a `NativeArray` of type `NetworkConnection` instead of a `NativeList`. The `IParallelForJobDefer` does not accept any other `Unity.Collections` type than a `NativeArray` (more on this later).
### Execute method
```c#
public void Execute(int index)
{
DataStreamReader stream;
Assert.IsTrue(connections[index].IsCreated);
NetworkEvent.Type cmd;
while ((cmd = driver.PopEventForConnection(connections[index], out stream)) !=
NetworkEvent.Type.Empty)
{
if (cmd == NetworkEvent.Type.Data)
{
uint number = stream.ReadUInt();
Debug.Log("Got " + number + " from the Client adding + 2 to it.");
number +=2;
var writer = driver.BeginSend(connections[index]);
writer.WriteUInt(number);
driver.EndSend(writer);
}
else if (cmd == NetworkEvent.Type.Disconnect)
{
Debug.Log("Client disconnected from server");
connections[index] = default(NetworkConnection);
}
}
}
```
The only difference between our old code and our jobified example is that you remove the top level `for` loop that you had in your code: `for (int i = 0; i < m_Connections.Length; i++)`. This is removed because the `Execute` function on this job will be called for each connection, and the `index` to that a available connection will be passed in. You can see this `index` in use in the top level `while` loop:
```
while ((cmd = driver.PopEventForConnection(connections[index], out stream)) != NetworkEvent.Type.Empty`
```
> **Note**: You are using the `index` that was passed into your `Execute` method to iterate over all the `connections`.
You now have 2 jobs:
- The first job is to update your connection status.
- Add new connections
- Remove old / stale connections
- The second job is to parse `NetworkEvent` on each connected client.
### Updating the server MonoBehaviour
With this we can now go back to our [MonoBehaviour](https://docs.unity3d.com/ScriptReference/MonoBehaviour.html) and start updating the server.
```c#
public class JobifiedServerBehaviour : MonoBehaviour
{
public NetworkDriver m_Driver;
public NativeList<NetworkConnection> m_Connections;
private JobHandle ServerJobHandle;
void Start () { ... }
public void OnDestroy() { ... }
void Update () { ... }
}
```
The only change made in your variable declaration is that you have once again added a `JobHandle` so you can keep track of your ongoing jobs.
#### Start method
You do not need to change your `Start` method as it should look the same:
```c#
void Start ()
{
m_Connections = new NativeList<NetworkConnection>(16, Allocator.Persistent);
m_Driver = new NetworkDriver.Create();
var endpoint = NetworkEndPoint.AnyIpv4;
endpoint.Port = 9000;
if (m_Driver.Bind(endpoint) != 0)
Debug.Log("Failed to bind to port 9000");
else
m_Driver.Listen();
}
```
#### OnDestroy method
You need to remember to call `ServerJobHandle.Complete` in your `OnDestroy` method so you can properly clean up after yourself:
```c#
public void OnDestroy()
{
// Make sure we run our jobs to completion before exiting.
ServerJobHandle.Complete();
m_Connections.Dispose();
m_Driver.Dispose();
}
```
#### Server update loop
In your `Update` method, call `Complete`on the `JobHandle`. This will force the jobs to complete before we start a new frame:
```c#
void Update ()
{
ServerJobHandle.Complete();
var connectionJob = new ServerUpdateConnectionsJob
{
driver = m_Driver,
connections = m_Connections
};
var serverUpdateJob = new ServerUpdateJob
{
driver = m_Driver.ToConcurrent(),
connections = m_Connections.ToDeferredJobArray()
};
ServerJobHandle = m_Driver.ScheduleUpdate();
ServerJobHandle = connectionJob.Schedule(ServerJobHandle);
ServerJobHandle = serverUpdateJob.Schedule(m_Connections, 1, ServerJobHandle);
}
```
To chain the jobs, you want to following to happen:
`NetworkDriver.Update` -> `ServerUpdateConnectionsJob` -> `ServerUpdateJob`.
Start by populating your `ServerUpdateConnectionsJob`:
```c#
var connectionJob = new ServerUpdateConnectionsJob
{
driver = m_Driver,
connections = m_Connections
};
```
Then create your `ServerUpdateJob`. Remember to use the `ToConcurrent` call on your driver, to make sure you are using a concurrent driver for the `IParallelForJobDefer`:
```c#
var serverUpdateJob = new ServerUpdateJob
{
driver = m_Driver.ToConcurrent(),
connections = m_Connections.ToDeferredJobArray()
};
```
The final step is to make sure the `NativeArray` is populated to the correct size. This
can be done using a `DeferredJobArray`. It makes sure that, when the job is executed, that the connections array is populated with the correct number of items that you have in your list. Since we will run the `ServerUpdateConnectionsJob` first, this might change the **size** of the list.
Create your job chain and call `Scheduele` as follows:
```
ServerJobHandle = m_Driver.ScheduleUpdate();
ServerJobHandle = connectionJob.Schedule(ServerJobHandle);
ServerJobHandle = serverUpdateJob.Schedule(m_Connections, 1, ServerJobHandle);
```
In the code above, you have:
- Scheduled the `NetworkDriver` job.
- Add the `JobHandle` returned as a dependency on the `ServerUpdateConnectionJob`.
- The final link in the chain is the `ServerUpdateJob` that needs to run after the `ServerUpdateConnectionsJob`. In this line of code, there is a trick to invoke the `IJobParallelForDeferExtensions`. As you can see, `m_Connections` `NativeList` is passed to the `Schedule` method, this updates the count of connections before starting the job. It's here that it will fan out and run all the `ServerUpdateConnectionJobs` in parallel.
> **Note**: If you are having trouble with the `serverUpdateJob.Schedule(m_Connections, 1, ServerJobHandle);` call, you might need to add `"com.unity.jobs": "0.0.7-preview.5"` to your `manifest.json` file, inside the _/Packages_ folder.
You should now have a fully functional [jobified server](samples/jobifiedserverbehaviour.cs.md).
You can download all examples from [here](https://oc.unity3d.com/index.php/s/PHaNZP79Va2YOLT).
[Back to table of contents](TableOfContents.md)

408
Packages/com.unity.transport/Documentation~/workflow-client-server.md


# Workflow: Creating a minimal client and server
## Table of contents
* [Introduction](#introduction)
* [Creating a Server](#creating-a-server)
* [Creating a Client](#creating-a-client)
## Introduction
This workflow helps you create a sample project that highlights how to use the `com.unity.transport` API to:
- Configure
- Connect
- Send data
- Receive data
- Close a connection
- Disconnect
- Timeout a connection
> **Note**: This workflow covers all aspects of the Unity.Networking.Transport package.
The goal is to make a remote `add` function. The flow will be: a client connects to the server, and sends a number, this number is then received by the server that adds another number to it and sends it back to the client. The client, upon receiving the number, disconnects and quits.
Using the `NetworkDriver` to write client and server code is pretty similar between clients and servers, there are a few subtle differences that you can see demonstrated below.
## Creating a Server
A server is an endpoint that listens for incoming connection requests and sends and receives messages.
Start by creating a C# script in the Unity Editor.
Filename: [_Assets\Scripts\ServerBehaviour.cs_](samples/serverbehaviour.cs.md)
```c#
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
public class ServerBehaviour : MonoBehaviour {
// Use this for initialization
void Start () {
}
// Update is called once per frame
void Update () {
}
}
```
### Boilerplate code
As the `com.unity.transport` package is a low level API, there is a bit of boiler plate code you might want to setup. This is an architecture design Unity chose to make sure that you always have full control.
> **Note**: As development on the `com.unity.transport` package evolves, more abstractions may be created to reduce your workload on a day-to-day basis.
The next step is to clean up the dependencies and add our boilerplate code:
**Filename**: [_Assets\Scripts\ServerBehaviour.cs_](samples/serverbehaviour.cs.md)
```c#
using UnityEngine;
using UnityEngine.Assertions;
using Unity.Collections;
using Unity.Networking.Transport;
...
```
#### Code walkthrough
### ServerBehaviour.cs
Adding the members we need the following code:
**Filename**: [_Assets\Scripts\ServerBehaviour.cs_](samples/serverbehaviour.cs.md)
```c#
using ...
public class ServerBehaviour : MonoBehaviour {
public NetworkDriver m_Driver;
private NativeList<NetworkConnection> m_Connections;
void Start () {
}
void OnDestroy() {
}
void Update () {
}
```
#### Code walkthrough
```
public NetworkDriver m_Driver;
private NativeList<NetworkConnection> m_Connections;
```
You need to declare a `NetworkDriver`. You also need to create a [NativeList](http://native-list-info) to hold our connections.
### Start method
**Filename**: [_Assets\Scripts\ServerBehaviour.cs_](samples/serverbehaviour.cs.md)
```c#
void Start ()
{
m_Driver = NetworkDriver.Create();
var endpoint = NetworkEndPoint.AnyIpv4;
endpoint.Port = 9000;
if (m_Driver.Bind(endpoint) != 0)
Debug.Log("Failed to bind to port 9000");
else
m_Driver.Listen();
m_Connections = new NativeList<NetworkConnection>(16, Allocator.Persistent);
}
```
#### Code walkthrough
The first line of code, `m_Driver = NetworkDriver.Create();` , just makes sure you are creating your driver without any parameters.
```c#
if (m_Driver.Bind(endpoint) != 0)
Debug.Log("Failed to bind to port 9000");
else
m_Driver.Listen();
```
Then we try to bind our driver to a specific network address and port, and if that does not fail, we call the `Listen` method.
> **Important**: the call to the `Listen` method sets the `NetworkDriver` to the `Listen` state. This means that the `NetworkDriver` will now actively listen for incoming connections.
` m_Connections = new NativeList<NetworkConnection>(16, Allocator.Persistent);`
Finally we create a `NativeList` to hold all the connections.
### OnDestroy method
Both `NetworkDriver` and `NativeList` allocate unmanaged memory and need to be disposed. To make sure this happens we can simply call the `Dispose` method when we are done with both of them.
Add the following code to the `OnDestroy` method on your [MonoBehaviour](https://docs.unity3d.com/ScriptReference/MonoBehaviour.html):
**Filename**: [_Assets\Scripts\ServerBehaviour.cs_](samples/serverbehaviour.cs.md)
```c#
public void OnDestroy()
{
m_Driver.Dispose();
m_Connections.Dispose();
}
```
### Server Update loop
As the `com.unity.transport` package uses the [Unity C# Job System](https://docs.unity3d.com/Manual/JobSystem.html) internally, the `m_Driver` has a `ScheduleUpdate` method call. Inside our `Update` loop you need to make sure to call the `Complete` method on the [JobHandle](https://docs.unity3d.com/Manual/JobSystemJobDependencies.html) that is returned, in order to know when you are ready to process any updates.
```c#
void Update () {
m_Driver.ScheduleUpdate().Complete();
```
> **Note**: In this example, we are forcing a synchronization on the main thread in order to update and handle our data later in the `MonoBehaviour::Update` call. The workflow [Creating a jobified client and server](workflow-client-server-jobs.md) shows you how to use the Transport package with the C# Job System.
The first thing we want to do, after you have updated your `m_Driver`, is to handle your connections. Start by cleaning up any old stale connections from the list before processing any new ones. This cleanup ensures that, when we iterate through the list to check what new events we have gotten, we dont have any old connections laying around.
Inside the "Clean up connections" block below, we iterate through our connection list and just simply remove any stale connections.
```c#
// Clean up connections
for (int i = 0; i < m_Connections.Length; i++)
{
if (!m_Connections[i].IsCreated)
{
m_Connections.RemoveAtSwapBack(i);
--i;
}
}
```
Under "Accept new connections" below, we add a connection while there are new connections to accept.
```c#
// Accept new connections
NetworkConnection c;
while ((c = m_Driver.Accept()) != default(NetworkConnection))
{
m_Connections.Add(c);
Debug.Log("Accepted a connection");
}
```
Now we have an up-to-date connection list. You can now start querying the driver for events that might have happened since the last update.
```c#
DataStreamReader stream;
for (int i = 0; i < m_Connections.Length; i++)
{
if (!m_Connections[i].IsCreated)
continue;
```
Begin by defining a `DataStreamReader`. This will be used in case any `Data` event was received. Then we just start looping through all our connections.
For each connection we want to call `PopEventForConnection` while there are more events still needing to get processed.
```c#
NetworkEvent.Type cmd;
while ((cmd = m_Driver.PopEventForConnection(m_Connections[i], out stream)) != NetworkEvent.Type.Empty)
{
```
> **Note**: There is also the `NetworkEvent.Type PopEvent(out NetworkConnection con, out DataStreamReader slice)` method call, that returns the first available event, the `NetworkConnection` that its for and possibly a `DataStreamReader`.
We are now ready to process events. Lets start with the `Data` event.
```c#
if (cmd == NetworkEvent.Type.Data)
{
```
Next, we try to read a `uint` from the stream and output what we have received:
```c#
uint number = stream.ReadUInt();
Debug.Log("Got " + number + " from the Client adding + 2 to it.");
```
When this is done we simply add two to the number we received and send it back. To send anything with the `NetworkDriver` we need a instance of a `DataStreamWriter`. A `DataStreamWriter` is a new type that comes with the `com.unity.transport` package. You get a `DataStreamWriter` when you start sending a message by calling `BeginSend`.
After you have written your updated number to your stream, you call the `EndSend` method on the driver and off it goes:
```c#
number +=2;
var writer = m_Driver.BeginSend(NetworkPipeline.Null, m_Connections[i]);
writer.WriteUInt(number);
m_Driver.EndSend(writer);
}
```
> One thing to note here is that we are `NetworkPipeline.Null`, to the `BeginSend` function. This way we say to the driver to use the unreliable pipeline to send our data. It is also possible to not specify a pipeline.
Finally, you need to handle the disconnect case. This is pretty straight forward, if you receive a disconnect message you need to reset that connection to a `default(NetworkConnection)`. As you might remember, the next time the `Update` loop runs you will clean up after yourself.
```c#
else if (cmd == NetworkEvent.Type.Disconnect)
{
Debug.Log("Client disconnected from server");
m_Connections[i] = default(NetworkConnection);
}
}
}
}
```
That's the whole server. Here is the full source code to [_ServerBehaviour.cs_](samples/serverbehaviour.cs.md).
## Creating a Client
The client code looks pretty similar to the server code at first glance, but there are a few subtle differences. This part of the workflow covers the differences between them, and not so much the similarities.
### ClientBehaviour.cs
You still define a `NetworkDriver` but instead of having a list of connections we now only have one. There is a `Done` flag to indicate when we are done, or in case you have issues with a connection, you can exit quick.
**Filename**: [_Assets\Scripts\ClientBehaviour.cs_](samples/clientbehaviour.cs.md)
```c#
using ...
public class ClientBehaviour : MonoBehaviour {
public NetworkDriver m_Driver;
public NetworkConnection m_Connection;
public bool Done;
void Start () { ... }
public void OnDestroy() { ... }
void Update() { ... }
}
```
### Creating and Connecting a Client
Start by creating a driver for the client and an address for the server.
```c#
void Start () {
m_Driver = NetworkDriver.Create();
m_Connection = default(NetworkConnection);
var endpoint = NetworkEndPoint.LoopbackIpv4;
endpoint.Port = 9000;
m_Connection = m_Driver.Connect(endpoint);
}
```
Then call the `Connect` method on your driver.
Cleaning up this time is a bit easier because you don’t need a `NativeList` to hold your connections, so it simply just becomes:
```c#
public void OnDestroy()
{
m_Driver.Dispose();
}
```
### Client Update loop
You start the same way as you did in the server by calling `m_Driver.ScheduleUpdate().Complete();` and make sure that the connection worked.
```c#
void Update()
{
m_Driver.ScheduleUpdate().Complete();
if (!m_Connection.IsCreated)
{
if (!Done)
Debug.Log("Something went wrong during connect");
return;
}
```
You should recognize the code below, but if you look closely you can see that the call to `m_Driver.PopEventForConnection` was switched out with a call to `m_Connection.PopEvent`. This is technically the same method, it just makes it a bit clearer that you are handling a single connection.
```c#
DataStreamReader stream;
NetworkEvent.Type cmd;
while ((cmd = m_Connection.PopEvent(m_Driver, out stream)) != NetworkEvent.Type.Empty)
{
```
Now you encounter a new event you have not seen yet: a `NetworkEvent.Type.Connect` event.
This event tells you that you have received a `ConnectionAccept` message and you are now connected to the remote peer.
> **Note**: In this case, the server that is listening on port `9000` on `NetworkEndPoint.LoopbackIpv4` is more commonly known as `127.0.0.1`.
```
if (cmd == NetworkEvent.Type.Connect)
{
Debug.Log("We are now connected to the server");
uint value = 1;
var writer = m_Driver.BeginSend(m_Connection);
writer.WriteUInt(value);
m_Driver.EndSend(writer);
}
```
When you establish a connection between the client and the server, you send a number (that you want the server to increment by two). The use of the `BeginSend` / `EndSend` pattern together with the `DataStreamWriter`, where we set `value` to one, write it into the stream, and finally send it out on the network.
When the `NetworkEvent` type is `Data`, as below, you read the `value` back that you received from the server and then call the `Disconnect` method.
> **Note**: A good pattern is to always set your `NetworkConnection` to `default(NetworkConnection)` to avoid stale references.
```c#
else if (cmd == NetworkEvent.Type.Data)
{
uint value = stream.ReadUInt();
Debug.Log("Got the value = " + value + " back from the server");
Done = true;
m_Connection.Disconnect(m_Driver);
m_Connection = default(NetworkConnection);
}
```
Lastly we just want to make sure we handle the case that a server disconnects us for some reason.
```c#
else if (cmd == NetworkEvent.Type.Disconnect)
{
Debug.Log("Client got disconnected from server");
m_Connection = default(NetworkConnection);
}
}
}
```
Here is the full source code for the [_ClientBehaviour.cs_](samples/clientbehaviour.cs.md).
## Putting it all together.
To take this for a test run, you can simply add a new empty [GameObject](https://docs.unity3d.com/ScriptReference/GameObject.html) to our **Scene**.
![GameObject Added](images/game-object.PNG)
Add add both of our behaviours to it.
![Inspector](images/inspector.PNG)
Now when we press __Play__ we should see five log messages show up in your __Console__ window. Similar to this:
![Console](images/console-view.PNG)
[Back to table of contents](TableOfContents.md)

5
Packages/com.unity.transport/LICENSE.md


Unity Transport Package © 2018 Unity Technologies
Licensed under the Unity Companion License for Unity-dependent projects (see https://unity3d.com/legal/licenses/unity_companion_license).
Unless expressly provided otherwise, the Software under this license is made available strictly on an “AS IS” BASIS WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED. Please review the license for details on these and other terms and conditions.

7
Packages/com.unity.transport/LICENSE.md.meta


fileFormatVersion: 2
guid: dee4a72ac87dbbc43b3543eeca58fbe4
TextScriptImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

10
Packages/com.unity.transport/Pipfile


[[source]]
name = "pypi-artifactory"
url = "https://artifactory.prd.it.unity3d.com/artifactory/api/pypi/pypi/simple"
verify_ssl = true
[packages]
requests = "*"
[dev-packages]

7
Packages/com.unity.transport/Pipfile.meta


fileFormatVersion: 2
guid: 4fe67ec7a45b4ab0a586ab0216efe681
TextScriptImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

46
Packages/com.unity.transport/README.md


# Welcome
Welcome to the Unity Transport repository!
The new Unity Transport Package which will replace the UNet low-level API.
The preview of the transport package supports establishing connections and sending messages to a
remote host. It also contains utilities for serializing data streams to send
over the network.
## Transport CI summary
[![](https://badge-proxy.cds.internal.unity3d.com/c59df3b8-7f64-4158-9ef7-4c99748185cb)](https://badges.cds.internal.unity3d.com/packages/com.unity.transport/build-info?branch=master) [![](https://badge-proxy.cds.internal.unity3d.com/65a2af76-0337-4ec3-a20c-5f9a09ed62eb)](https://badges.cds.internal.unity3d.com/packages/com.unity.transport/dependencies-info?branch=master) [![](https://badge-proxy.cds.internal.unity3d.com/5cd5fb42-a61f-4502-b75a-b8d80deb41f2)](https://badges.cds.internal.unity3d.com/packages/com.unity.transport/dependants-info) [![](https://badge-proxy.cds.internal.unity3d.com/cad278d5-2dba-4434-aac2-1466a4bd2ea6)](https://badges.cds.internal.unity3d.com/packages/com.unity.transport/warnings-info?branch=master) ![ReleaseBadge](https://badge-proxy.cds.internal.unity3d.com/f2096d78-45e6-4402-978b-0058b1e3277c) ![ReleaseBadge](https://badge-proxy.cds.internal.unity3d.com/fb5e4d88-0b2f-4883-ad0d-1b69b33e7861)
## Documentation
For more information about the Transport package, please see the [Unity Transport Documentation](https://docs-multiplayer.unity3d.com/transport/introduction). The site includes guides, API reference, and release notes.
A [changelog](CHANGELOG.md) is also available in the package.
## Connect
See the [Multiplayer forum](https://forum.unity.com/forums/multiplayer.26/) to ask questions and connect with Transport.
# Samples
All samples are in */TransportSamples~*.
## Ping
The ping sample is a good starting point for learning about all the parts included
in the transport package. The ping client establishes a connection to the ping server,
sends a ping message and receives a pong reply. Once pong is received the client
will disconnect.
It is a simple example showing you how to use the new Unity Transport Package.
Ping consists of multiple scenes, all found in `sampleproject/Assets/Scenes/`.
- `PingMainThread.unity` - A main-thread only implementation of ping.
- `Ping.unity` - A fully jobified version of the ping client and server.
- `PingClient.unity` - The same jobified client code as `Ping.unity`, but without the server.
- `PingServer.unity` - The dedicated server version of the jobified ping. A headless (or Server Build in 2019.1) Linux 64 bit build of this scene is what should be deployed to Multiplay.
- `PingECS.unity` - An ECS version of the jobified ping sample.
## Soaker
A stress test which will create a set number of clients and a server in the same process. Each client will send messages at the specified rate with the specified size and measure statistics.
## Pipeline
An example of the pipelines feature that offers layers of functionality on top of the default socket implementation behaviour.

7
Packages/com.unity.transport/README.md.meta


fileFormatVersion: 2
guid: dfb79a61342174be1ba04fa4efdfdc6d
TextScriptImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

8
Packages/com.unity.transport/Runtime.meta


fileFormatVersion: 2
guid: d547784248f27e348aeac1860989bf5d
folderAsset: yes
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

5
Packages/com.unity.transport/Runtime/AssemblyInfo.cs


using System.Runtime.CompilerServices;
[assembly: InternalsVisibleTo("Unity.Networking.Transport.EditorTests")]
[assembly: InternalsVisibleTo("Unity.Multiplayer.Transport.UTP")]

11
Packages/com.unity.transport/Runtime/AssemblyInfo.cs.meta


fileFormatVersion: 2
guid: 4231ec648d4054f72a4a9cf3adb42c64
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

89
Packages/com.unity.transport/Runtime/AtomicFreeList.cs


using System;
using System.Threading;
using Unity.Collections;
using Unity.Collections.LowLevel.Unsafe;
namespace Unity.Networking.Transport.Utilities.LowLevel.Unsafe
{
internal unsafe struct UnsafeAtomicFreeList : IDisposable
{
// used count
// free list size
// free indices...
[NativeDisableUnsafePtrRestriction]
private int* m_Buffer;
private int m_Length;
private Allocator m_Allocator;
public int Capacity => m_Length;
public int InUse => m_Buffer[0] - m_Buffer[1];
public bool IsCreated => m_Buffer != null;
/// <summary>
/// Initializes a new instance of the AtomicFreeList struct.
/// </summary>
/// <param name="capacity">The number of elements the free list can store.</param>
/// <param name="allocator">The <see cref="Allocator"/> used to allocate the memory.</param>
public UnsafeAtomicFreeList(int capacity, Allocator allocator)
{
m_Allocator = allocator;
m_Length = capacity;
var size = UnsafeUtility.SizeOf<int>() * (capacity + 2);
m_Buffer = (int*)UnsafeUtility.Malloc(size, UnsafeUtility.AlignOf<int>(), allocator);
UnsafeUtility.MemClear(m_Buffer, size);
}
public void Dispose()
{
if (IsCreated)
UnsafeUtility.Free(m_Buffer, m_Allocator);
}
/// <summary>
/// Inserts an item on top of the stack.
/// </summary>
/// <param name="item">The item to push onto the stack.</param>
public unsafe void Push(int item)
{
int* buffer = m_Buffer;
int idx = Interlocked.Increment(ref buffer[1]) - 1;
while (Interlocked.CompareExchange(ref buffer[idx + 2], item + 1, 0) != 0)
{
}
}
/// <summary>
/// Remove and return a value from the top of the stack
/// </summary>
/// <remarks>
/// <value>The removed value from the top of the stack.</value>
public unsafe int Pop()
{
int* buffer = m_Buffer;
int idx = buffer[1] - 1;
while (idx >= 0 && Interlocked.CompareExchange(ref buffer[1], idx, idx + 1) != idx + 1)
idx = buffer[1] - 1;
if (idx >= 0)
{
int val = 0;
while (val == 0)
{
val = Interlocked.Exchange(ref buffer[2 + idx], 0);
}
return val - 1;
}
idx = Interlocked.Increment(ref buffer[0]) - 1;
if (idx >= Capacity)
{
Interlocked.Decrement(ref buffer[0]);
return -1;
}
return idx;
}
}
}

11
Packages/com.unity.transport/Runtime/AtomicFreeList.cs.meta


fileFormatVersion: 2
guid: babb432cd74d9eb4e8bf3a339ca4418b
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

112
Packages/com.unity.transport/Runtime/Base64.cs


using System;
using Unity.Collections.LowLevel.Unsafe;
using UnityEngine.Assertions;
namespace Unity.Networking.Transport
{
public static class Base64
{
/// <summary>
/// Decode characters representing a Base64 encoding into bytes. Will throw ArgumentException on any error
/// </summary>
/// <param name="startInputPtr">Pointer to first input char in UTF16 string (C# strings)</param>
/// <param name="inputLength">Number of input chars</param>
/// <param name="startDestPtr">Pointer to location for teh first result byte</param>
/// <param name="destLength">Max length of the preallocated result buffer</param>
/// <returns>Actually written bytes to startDestPtr that is less or equal than destLength</returns>
private static unsafe int FromBase64_Decode_UTF16(byte* startInputPtr, int inputLength, byte* startDestPtr, int destLength)
{
if (inputLength == 0)
return 0;
const int sizeCharUTF16 = 2;
// 3 bytes == 4 chars in the input base64 string
if (inputLength % 4 != 0)
throw new ArgumentException("Base64 string's length must be multiple of 4");
if (destLength < inputLength / 4 * 3 - 2)
throw new ArgumentException("Dest array is too small");
var originalStartDestPtr = startDestPtr;
var n = inputLength / 4;
const string table = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
var lookup = stackalloc byte[256];
UnsafeUtility.MemSet(lookup, 0xFF, 256);
for (byte i = 0; i < table.Length; i++)
lookup[table[i]] = i;
lookup['='] = 0;
// skip last 4 chars
for (var i = 0; i < n - 1; i++)
{
byte a = lookup[startInputPtr[0 * sizeCharUTF16]];
byte b = lookup[startInputPtr[1 * sizeCharUTF16]];
byte c = lookup[startInputPtr[2 * sizeCharUTF16]];
byte d = lookup[startInputPtr[3 * sizeCharUTF16]];
if (a == 0xFF || b == 0xFF || c == 0xFF || d == 0xFF)
throw new ArgumentException("Invalid Base64 symbol");
*startDestPtr++ = (byte) ((a << 2) | (b >> 4));
*startDestPtr++ = (byte) ((b << 4) | (c >> 2));
*startDestPtr++ = (byte) ((c << 6) | d);
startInputPtr += 4 * sizeCharUTF16;
}
// last 4 chars
var cc = startInputPtr[2 * sizeCharUTF16];
var dd = startInputPtr[3 * sizeCharUTF16];
var la = lookup[startInputPtr[0 * sizeCharUTF16]];
var lb = lookup[startInputPtr[1 * sizeCharUTF16]];
var lc = lookup[cc];
var ld = lookup[dd];
if (la == 0xFF || lb == 0xFF || lc == 0xFF || ld == 0xFF)
throw new ArgumentException("Invalid Base64 symbol");
*startDestPtr++ = (byte) ((la << 2) | (lb >> 4));
if (cc != '=') // == means 4 chars == 1 byte, we already wrote that
{
if (dd == '=') // = means 4 chars == 2 bytes, 1 more
{
if (destLength < inputLength / 4 * 3 - 1)
throw new ArgumentException("Dest array is too small");
*startDestPtr++ = (byte) ((lb << 4) | (lc >> 2));
}
else // no padding, 4 chars == 3 bytes, 2 more
{
if (destLength < inputLength / 4 * 3)
throw new ArgumentException("Dest array is too small");
*startDestPtr++ = (byte) ((lb << 4) | (lc >> 2));
*startDestPtr++ = (byte) ((lc << 6) | ld);
}
}
return (int) (startDestPtr - originalStartDestPtr);
}
/// <summary>
/// Decodes base64 string and writes binary data into dest
/// </summary>
/// <param name="base64">Input base64 string to decode</param>
/// <param name="dest">Decoded base64 will be written here</param>
/// <param name="destMaxLength">Max length that dest can handle. Will throw if not enough</param>
/// <returns>Actual length of data that was written to dest. Less or equal than destLength</returns>
public static unsafe int FromBase64String(string base64, byte* dest, int destMaxLength)
{
fixed (char* ptr = base64)
{
return FromBase64_Decode_UTF16((byte*)ptr, base64.Length, dest, destMaxLength);
}
}
}
}

3
Packages/com.unity.transport/Runtime/Base64.cs.meta


fileFormatVersion: 2
guid: 834303e1da63424091c3a7d11d764c2a
timeCreated: 1623792607

91
Packages/com.unity.transport/Runtime/BaselibNetworkArray.cs


using System;
using Unity.Baselib;
using Unity.Baselib.LowLevel;
using Unity.Mathematics;
using Unity.Collections;
using Unity.Collections.LowLevel.Unsafe;
using ErrorState = Unity.Baselib.LowLevel.Binding.Baselib_ErrorState;
using ErrorCode = Unity.Baselib.LowLevel.Binding.Baselib_ErrorCode;
namespace Unity.Networking.Transport
{
using size_t = UIntPtr;
internal unsafe struct UnsafeBaselibNetworkArray : IDisposable
{
[NativeDisableUnsafePtrRestriction] Binding.Baselib_RegisteredNetwork_Buffer* m_Buffer;
/// <summary>
/// Initializes a new instance of the UnsafeBaselibNetworkArray struct.
/// </summary>
/// <param name="capacity"></param>
/// <exception cref="ArgumentOutOfRangeException">Thrown if the capacity is less then 0 or if the value exceeds <see cref="int.MaxValue"/> </exception>
public UnsafeBaselibNetworkArray(int capacity)
{
var totalSize = (long)capacity;
#if ENABLE_UNITY_COLLECTIONS_CHECKS
if (capacity < 0)
throw new ArgumentOutOfRangeException(nameof(capacity), "Capacity must be >= 0");
if (totalSize > int.MaxValue)
throw new ArgumentOutOfRangeException(nameof(capacity), $"Capacity * sizeof(T) cannot exceed {int.MaxValue} bytes");
#endif
var pageInfo = stackalloc Binding.Baselib_Memory_PageSizeInfo[1];
Binding.Baselib_Memory_GetPageSizeInfo(pageInfo);
var defaultPageSize = (ulong)pageInfo->defaultPageSize;
var pageCount = (ulong)1;
if ((ulong) totalSize > defaultPageSize)
{
pageCount = (ulong)math.ceil(totalSize / (double) defaultPageSize);
}
var error = default(ErrorState);
var pageAllocation = Binding.Baselib_Memory_AllocatePages(
pageInfo->defaultPageSize,
pageCount,
1,
Binding.Baselib_Memory_PageState.ReadWrite,
&error);
if (error.code != ErrorCode.Success)
throw new Exception();
UnsafeUtility.MemSet((void*)pageAllocation.ptr, 0, (long)(pageAllocation.pageCount * pageAllocation.pageSize));
m_Buffer = (Binding.Baselib_RegisteredNetwork_Buffer*)UnsafeUtility.Malloc(UnsafeUtility.SizeOf<Binding.Baselib_RegisteredNetwork_Buffer>(), UnsafeUtility.AlignOf<Binding.Baselib_RegisteredNetwork_Buffer>(), Allocator.Persistent);
*m_Buffer = Binding.Baselib_RegisteredNetwork_Buffer_Register(pageAllocation, &error);
if (error.code != (int)ErrorCode.Success)
{
Binding.Baselib_Memory_ReleasePages(pageAllocation, &error);
*m_Buffer = default;
throw new Exception();
}
}
public void Dispose()
{
var error = default(ErrorState);
var pageAllocation = m_Buffer->allocation;
Binding.Baselib_RegisteredNetwork_Buffer_Deregister(*m_Buffer);
Binding.Baselib_Memory_ReleasePages(pageAllocation, &error);
UnsafeUtility.Free(m_Buffer, Allocator.Persistent);
}
/// <summary>
/// Gets a element at the specified index, with the size of <see cref="elementSize">.
/// </summary>
/// <value>A <see cref="Binding.Baselib_RegisteredNetwork_BufferSlice"> pointing to the index supplied.</value>
public Binding.Baselib_RegisteredNetwork_BufferSlice AtIndexAsSlice(int index, uint elementSize)
{
var offset = elementSize * (uint)index;
Binding.Baselib_RegisteredNetwork_BufferSlice slice;
slice.id = m_Buffer->id;
slice.data = (IntPtr)((byte*) m_Buffer->allocation.ptr + offset);
slice.offset = offset;
slice.size = elementSize;
return slice;
}
}
}

11
Packages/com.unity.transport/Runtime/BaselibNetworkArray.cs.meta


fileFormatVersion: 2
guid: 699137ce9825846c981a2c997973c26a
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

590
Packages/com.unity.transport/Runtime/BaselibNetworkInterface.cs


#if !UNITY_WEBGL
using System;
using System.Collections.Generic;
using System.Diagnostics;
using Unity.Baselib.LowLevel;
using Unity.Burst;
using Unity.Collections;
using Unity.Collections.LowLevel.Unsafe;
using Unity.Jobs;
using Unity.Networking.Transport.Utilities.LowLevel.Unsafe;
using Unity.Networking.Transport.Protocols;
using ErrorState = Unity.Baselib.LowLevel.Binding.Baselib_ErrorState;
using ErrorCode = Unity.Baselib.LowLevel.Binding.Baselib_ErrorCode;
namespace Unity.Networking.Transport
{
using NetworkRequest = Binding.Baselib_RegisteredNetwork_Request;
using NetworkEndpoint = Binding.Baselib_RegisteredNetwork_Endpoint;
using NetworkSocket = Binding.Baselib_RegisteredNetwork_Socket_UDP;
public struct BaselibNetworkParameter : INetworkParameter
{
public int receiveQueueCapacity;
public int sendQueueCapacity;
public uint maximumPayloadSize;
}
[BurstCompile]
public struct BaselibNetworkInterface : INetworkInterface
{
public static BaselibNetworkParameter DefaultParameters = new BaselibNetworkParameter
{
receiveQueueCapacity = k_defaultRxQueueSize,
sendQueueCapacity = k_defaultTxQueueSize,
maximumPayloadSize = NetworkParameterConstants.MTU
};
#if ENABLE_UNITY_COLLECTIONS_CHECKS
private class SocketList
{
public struct SocketId
{
public NetworkSocket socket;
}
public List<SocketId> OpenSockets = new List<SocketId>();
~SocketList()
{
foreach (var socket in OpenSockets)
{
Binding.Baselib_RegisteredNetwork_Socket_UDP_Close(socket.socket);
}
}
}
private static SocketList AllSockets = new SocketList();
#endif
struct Payloads : IDisposable
{
public UnsafeAtomicFreeList m_Handles;
public UnsafeBaselibNetworkArray m_PayloadArray;
public UnsafeBaselibNetworkArray m_EndpointArray;
private uint m_PayloadSize;
public int InUse => m_Handles.InUse;
public int Capacity => m_Handles.Capacity;
public Payloads(int capacity, uint maxPayloadSize)
{
m_PayloadSize = maxPayloadSize;
m_Handles = new UnsafeAtomicFreeList(capacity, Allocator.Persistent);
m_PayloadArray = new UnsafeBaselibNetworkArray(capacity * (int)maxPayloadSize);
m_EndpointArray = new UnsafeBaselibNetworkArray(capacity * (int)Binding.Baselib_RegisteredNetwork_Endpoint_MaxSize);
}
public bool IsCreated => m_Handles.IsCreated;
public void Dispose()
{
m_Handles.Dispose();
m_PayloadArray.Dispose();
m_EndpointArray.Dispose();
}
public NetworkRequest GetRequestFromHandle(int handle)
{
return new NetworkRequest {payload = m_PayloadArray.AtIndexAsSlice(handle, m_PayloadSize),
remoteEndpoint = new NetworkEndpoint{slice = m_EndpointArray.AtIndexAsSlice(handle, (uint)Binding.Baselib_RegisteredNetwork_Endpoint_MaxSize)}};
}
public int AcquireHandle()
{
return m_Handles.Pop();
}
public void ReleaseHandle(int handle)
{
m_Handles.Push(handle);
}
}
private BaselibNetworkParameter configuration;
private const int k_defaultRxQueueSize = 64;
private const int k_defaultTxQueueSize = 64;
unsafe struct BaselibData
{
public NetworkSocket m_Socket;
public Payloads m_PayloadsTx;
}
[ReadOnly]
private NativeArray<BaselibData> m_Baselib;
[NativeDisableContainerSafetyRestriction]
private Payloads m_PayloadsRx;
[NativeDisableContainerSafetyRestriction]
private Payloads m_PayloadsTx;
private UnsafeBaselibNetworkArray m_LocalAndTempEndpoint;
/// <summary>
/// Returns the local endpoint.
/// </summary>
/// <value>NetworkInterfaceEndPoint</value>
public unsafe NetworkInterfaceEndPoint LocalEndPoint
{
// error handling: handle the errors...
get
{
var error = default(ErrorState);
Binding.Baselib_NetworkAddress local;
Binding.Baselib_RegisteredNetwork_Socket_UDP_GetNetworkAddress(m_Baselib[0].m_Socket, &local, &error);
var ep = default(NetworkInterfaceEndPoint);
if (error.code != ErrorCode.Success)
return ep;
ep.dataLength = UnsafeUtility.SizeOf<Binding.Baselib_NetworkAddress>();
UnsafeUtility.MemCpy(ep.data, &local, ep.dataLength);
return ep;
}
}
public bool IsCreated => m_Baselib.IsCreated;
/// <summary>
/// Creates a interface endpoint.
/// </summary>
/// <value>NetworkInterfaceEndPoint</value>
public unsafe int CreateInterfaceEndPoint(NetworkEndPoint address, out NetworkInterfaceEndPoint endpoint)
{
var slice = m_LocalAndTempEndpoint.AtIndexAsSlice(0, (uint)Binding.Baselib_RegisteredNetwork_Endpoint_MaxSize);
var error = default(ErrorState);
endpoint = default(NetworkInterfaceEndPoint);
NetworkEndpoint local;
local = Binding.Baselib_RegisteredNetwork_Endpoint_Create(
(Binding.Baselib_NetworkAddress*)&address.rawNetworkAddress,
slice,
&error);
if (error.code != ErrorCode.Success)
return (int)error.code;
endpoint.dataLength = (int)local.slice.size;
fixed (void* ptr = endpoint.data)
{
UnsafeUtility.MemCpy(ptr, (void*)local.slice.data, endpoint.dataLength);
}
return (int) Error.StatusCode.Success;
}
public unsafe NetworkEndPoint GetGenericEndPoint(NetworkInterfaceEndPoint endpoint)
{
// Set to a valid address so length is set correctly
var address = NetworkEndPoint.LoopbackIpv4;
var error = default(ErrorState);
var slice = m_LocalAndTempEndpoint.AtIndexAsSlice(0, (uint)Binding.Baselib_RegisteredNetwork_Endpoint_MaxSize);
NetworkEndpoint local;
local.slice = slice;
local.slice.size = (uint)endpoint.dataLength;
UnsafeUtility.MemCpy((void*)local.slice.data, endpoint.data, endpoint.dataLength);
Binding.Baselib_RegisteredNetwork_Endpoint_GetNetworkAddress(local, &address.rawNetworkAddress, &error);
if (error.code != ErrorCode.Success)
return default;
return address;
}
/// <summary>
/// Initializes a instance of the BaselibNetworkInterface struct.
/// </summary>
/// <param name="param">An array of INetworkParameter. There is currently only <see cref="BaselibNetworkParameter"/> that can be passed.</param>
public unsafe int Initialize(params INetworkParameter[] param)
{
if (!TryExtractParameters(out configuration, param))
{
configuration = DefaultParameters;
}
m_Baselib = new NativeArray<BaselibData>(1, Allocator.Persistent);
var baselib = default(BaselibData);
m_PayloadsTx = new Payloads(configuration.sendQueueCapacity, configuration.maximumPayloadSize);
m_PayloadsRx = new Payloads(configuration.receiveQueueCapacity, configuration.maximumPayloadSize);
m_LocalAndTempEndpoint = new UnsafeBaselibNetworkArray(2 * (int)Binding.Baselib_RegisteredNetwork_Endpoint_MaxSize);
baselib.m_PayloadsTx = m_PayloadsTx;
m_Baselib[0] = baselib;
var ep = default(NetworkInterfaceEndPoint);
var result = 0;
if ((result = CreateInterfaceEndPoint(NetworkEndPoint.AnyIpv4, out ep)) != (int)Error.StatusCode.Success)
return result;
return Bind(ep);
}
public void Dispose()
{
if (m_Baselib[0].m_Socket.handle != IntPtr.Zero)
{
#if ENABLE_UNITY_COLLECTIONS_CHECKS
AllSockets.OpenSockets.Remove(new SocketList.SocketId
{socket = m_Baselib[0].m_Socket});
#endif
Binding.Baselib_RegisteredNetwork_Socket_UDP_Close(m_Baselib[0].m_Socket);
}
m_LocalAndTempEndpoint.Dispose();
if (m_PayloadsTx.IsCreated)
m_PayloadsTx.Dispose();
if (m_PayloadsRx.IsCreated)
m_PayloadsRx.Dispose();
m_Baselib.Dispose();
}
#region ReceiveJob
[BurstCompile]
struct FlushSendJob : IJob
{
public Payloads Tx;
[NativeDisableContainerSafetyRestriction]
public NativeArray<BaselibData> Baselib;
public unsafe void Execute()
{
var error = default(ErrorState);
var pollCount = 0;
while(Binding.Baselib_RegisteredNetwork_Socket_UDP_ProcessSend(Baselib[0].m_Socket, &error) == Binding.Baselib_RegisteredNetwork_ProcessStatus.Pending && pollCount++ < k_defaultTxQueueSize){}
int count;
// InUse is not thread safe, needs to be called in a single threaded flush job
var inFlight = Tx.InUse;
if (inFlight > 0)
{
var results = stackalloc Binding.Baselib_RegisteredNetwork_CompletionResult[inFlight];
count = (int)Binding.Baselib_RegisteredNetwork_Socket_UDP_DequeueSend(Baselib[0].m_Socket, results, (uint)inFlight, &error);
if (error.code != ErrorCode.Success)
{
// copy recv flow? e.g. pass
return;
}
for (int i = 0; i < count; ++i)
{
// return results[i].status through userdata, mask? or allocate space at beginning?
// pass through a new NetworkPacketSender.?
Tx.ReleaseHandle((int)results[i].requestUserdata - 1);
}
}
}
}
[BurstCompile]
struct ReceiveJob : IJob
{
public NetworkPacketReceiver Receiver;
public Payloads Rx;
[NativeDisableContainerSafetyRestriction]
public NativeArray<BaselibData> Baselib;
public unsafe void Execute()
{
var count = 0;
var outstanding = Rx.InUse;
var error = default(ErrorState);
var requests = stackalloc Binding.Baselib_RegisteredNetwork_Request[Rx.Capacity];
if (outstanding > 0)
{
var pollCount = 0;
while (Binding.Baselib_RegisteredNetwork_Socket_UDP_ProcessRecv(Baselib[0].m_Socket, &error) == Binding.Baselib_RegisteredNetwork_ProcessStatus.Pending && pollCount++ < k_defaultRxQueueSize) {}
var results = stackalloc Binding.Baselib_RegisteredNetwork_CompletionResult[outstanding];
// Pop Completed Requests off the CompletionQ
count = (int)Binding.Baselib_RegisteredNetwork_Socket_UDP_DequeueRecv(Baselib[0].m_Socket, results, (uint)outstanding, &error);
if (error.code != ErrorCode.Success)
{
Receiver.ReceiveErrorCode = (int) error.code;
return;
}
// Copy and run Append on each Packet.
var stream = Receiver.GetDataStream();
var headerLength = UnsafeUtility.SizeOf<UdpCHeader>();
var address = default(NetworkInterfaceEndPoint);
var indicies = stackalloc int[count];
for (int i = 0; i < count; i++)
{
if (results[i].status == Binding.Baselib_RegisteredNetwork_CompletionStatus.Failed)
{
continue;
}
var receivedBytes = (int) results[i].bytesTransferred;
var index = (int)results[i].requestUserdata - 1;
var packet = Rx.GetRequestFromHandle(index);
indicies[i] = index;
outstanding--;
var payloadLen = receivedBytes;
int dataStreamSize = Receiver.GetDataStreamSize();
if (Receiver.DynamicDataStreamSize())
{
while (dataStreamSize + payloadLen >= stream.Length)
stream.ResizeUninitialized(stream.Length*2);
}
else if (dataStreamSize + payloadLen > stream.Length)
{
Receiver.ReceiveErrorCode = 10040;//(int)ErrorCode.OutOfMemory;
continue;
}
UnsafeUtility.MemCpy(
(byte*)stream.GetUnsafePtr() + dataStreamSize,
(byte*)packet.payload.data,
payloadLen);
var remote = packet.remoteEndpoint.slice;
address.dataLength = (int)remote.size;
UnsafeUtility.MemCpy(address.data, (void*)remote.data, (int)remote.size);
Receiver.ReceiveCount += Receiver.AppendPacket(address, receivedBytes);
}
// Reuse the requests after they have been processed.
for (int i = 0; i < count; i++)
{
requests[i] = Rx.GetRequestFromHandle(indicies[i]);
requests[i].requestUserdata = (IntPtr)indicies[i] + 1;
}
}
while (Rx.InUse < Rx.Capacity)
{
int handle = Rx.AcquireHandle();
requests[count] = Rx.GetRequestFromHandle(handle);
requests[count].requestUserdata = (IntPtr)handle + 1;
++count;
}
if (count > 0)
{
count = (int) Binding.Baselib_RegisteredNetwork_Socket_UDP_ScheduleRecv(
Baselib[0].m_Socket,
requests,
(uint)count,
&error);
if (error.code != ErrorCode.Success)
Receiver.ReceiveErrorCode = (int) error.code;
}
}
}
#endregion
public JobHandle ScheduleReceive(NetworkPacketReceiver receiver, JobHandle dep)
{
var job = new ReceiveJob
{
Baselib = m_Baselib,
Rx = m_PayloadsRx,
Receiver = receiver
};
return job.Schedule(dep);
}
public JobHandle ScheduleSend(NativeQueue<QueuedSendMessage> sendQueue, JobHandle dep)
{
var job = new FlushSendJob
{
Baselib = m_Baselib,
Tx = m_PayloadsTx
};
return job.Schedule(dep);
}
/// <summary>
/// Binds the BaselibNetworkInterface to the endpoint passed.
/// </summary>
/// <param name="endpoint">A valid ipv4 or ipv6 address</param>
/// <value>int</value>
public unsafe int Bind(NetworkInterfaceEndPoint endpoint)
{
var baselib = m_Baselib[0];
if (m_Baselib[0].m_Socket.handle != IntPtr.Zero)
{
#if ENABLE_UNITY_COLLECTIONS_CHECKS
AllSockets.OpenSockets.Remove(new SocketList.SocketId
{socket = m_Baselib[0].m_Socket});
#endif
Binding.Baselib_RegisteredNetwork_Socket_UDP_Close(m_Baselib[0].m_Socket);
baselib.m_Socket.handle = IntPtr.Zero;
// Recreate the payloads to make sure we do not loose any items from the queue
m_PayloadsRx.Dispose();
m_PayloadsRx = new Payloads(configuration.receiveQueueCapacity, configuration.maximumPayloadSize);
}
var slice = m_LocalAndTempEndpoint.AtIndexAsSlice(0, (uint)Binding.Baselib_RegisteredNetwork_Endpoint_MaxSize);
UnsafeUtility.MemCpy((void*)slice.data, endpoint.data, endpoint.dataLength);
var error = default(ErrorState);
NetworkEndpoint local;
local.slice = slice;
Binding.Baselib_NetworkAddress localAddress;
Binding.Baselib_RegisteredNetwork_Endpoint_GetNetworkAddress(local, &localAddress, &error);
baselib.m_Socket = Binding.Baselib_RegisteredNetwork_Socket_UDP_Create(
&localAddress,
Binding.Baselib_NetworkAddress_AddressReuse.Allow,
checked((uint)configuration.sendQueueCapacity),
checked((uint)configuration.receiveQueueCapacity),
&error);
if (error.code != ErrorCode.Success)
{
m_Baselib[0] = baselib;
return (int) error.code == -1 ? -1 : -(int) error.code;
}
// Schedule receive right away so we do not loose packets received before the first call to update
int count = 0;
var requests = stackalloc Binding.Baselib_RegisteredNetwork_Request[m_PayloadsRx.Capacity];
while (m_PayloadsRx.InUse < m_PayloadsRx.Capacity)
{
int handle = m_PayloadsRx.AcquireHandle();
requests[count] = m_PayloadsRx.GetRequestFromHandle(handle);
requests[count].requestUserdata = (IntPtr)handle + 1;
++count;
}
if (count > 0)
{
Binding.Baselib_RegisteredNetwork_Socket_UDP_ScheduleRecv(
baselib.m_Socket,
requests,
(uint)count,
&error);
// how should this be handled? what are the cases?
if (error.code != ErrorCode.Success)
return (int) error.code == -1 ? -1 : -(int) error.code;
}
#if ENABLE_UNITY_COLLECTIONS_CHECKS
AllSockets.OpenSockets.Add(new SocketList.SocketId
{socket = baselib.m_Socket});
#endif
m_Baselib[0] = baselib;
return 0;
}
public int Listen()
{
return 0;
}
static TransportFunctionPointer<NetworkSendInterface.BeginSendMessageDelegate> BeginSendMessageFunctionPointer = new TransportFunctionPointer<NetworkSendInterface.BeginSendMessageDelegate>(BeginSendMessage);
static TransportFunctionPointer<NetworkSendInterface.EndSendMessageDelegate> EndSendMessageFunctionPointer = new TransportFunctionPointer<NetworkSendInterface.EndSendMessageDelegate>(EndSendMessage);
static TransportFunctionPointer<NetworkSendInterface.AbortSendMessageDelegate> AbortSendMessageFunctionPointer = new TransportFunctionPointer<NetworkSendInterface.AbortSendMessageDelegate>(AbortSendMessage);
public unsafe NetworkSendInterface CreateSendInterface()
{
return new NetworkSendInterface
{
BeginSendMessage = BeginSendMessageFunctionPointer,
EndSendMessage = EndSendMessageFunctionPointer,
AbortSendMessage = AbortSendMessageFunctionPointer,
UserData = (IntPtr)m_Baselib.GetUnsafePtr()
};
}
[BurstCompile(DisableDirectCall = true)]
[AOT.MonoPInvokeCallback(typeof(NetworkSendInterface.BeginSendMessageDelegate))]
private static unsafe int BeginSendMessage(out NetworkInterfaceSendHandle handle, IntPtr userData, int requiredPayloadSize)
{
var baselib = (BaselibData*)userData;
handle = default(NetworkInterfaceSendHandle);
int index = baselib->m_PayloadsTx.AcquireHandle();
if (index < 0)
return (int)Error.StatusCode.NetworkSendQueueFull;
var message = baselib->m_PayloadsTx.GetRequestFromHandle(index);
if ((int) message.payload.size < requiredPayloadSize)
{
baselib->m_PayloadsTx.ReleaseHandle(index);
return (int)Error.StatusCode.NetworkPacketOverflow;
}
handle.id = index;
handle.size = 0;
handle.data = (IntPtr)message.payload.data;
handle.capacity = (int) message.payload.size;
return (int)Error.StatusCode.Success;
}
[BurstCompile(DisableDirectCall = true)]
[AOT.MonoPInvokeCallback(typeof(NetworkSendInterface.EndSendMessageDelegate))]
private static unsafe int EndSendMessage(ref NetworkInterfaceSendHandle handle, ref NetworkInterfaceEndPoint address, IntPtr userData, ref NetworkSendQueueHandle sendQueueHandle)
{
var baselib = (BaselibData*)userData;
int index = handle.id;
var message = baselib->m_PayloadsTx.GetRequestFromHandle(index);
message.requestUserdata = (IntPtr) (index + 1);
message.payload.size = (uint)handle.size;
var addr = address;
UnsafeUtility.MemCpy((void*)message.remoteEndpoint.slice.data, addr.data, address.dataLength);
NetworkRequest* messagePtr = &message;
var error = default(ErrorState);
var count = (int) Binding.Baselib_RegisteredNetwork_Socket_UDP_ScheduleSend(
baselib->m_Socket,
messagePtr,
1u,
&error);
if (error.code != ErrorCode.Success)
{
baselib->m_PayloadsTx.ReleaseHandle(index);
return (int) error.code == -1 ? -1 : -(int) error.code;
}
return handle.size;
}
[BurstCompile(DisableDirectCall = true)]
[AOT.MonoPInvokeCallback(typeof(NetworkSendInterface.AbortSendMessageDelegate))]
private static unsafe void AbortSendMessage(ref NetworkInterfaceSendHandle handle, IntPtr userData)
{
var baselib = (BaselibData*)userData;
var id = handle.id;
baselib->m_PayloadsTx.ReleaseHandle(id);
}
bool ValidateParameters(BaselibNetworkParameter param)
{
if (param.receiveQueueCapacity <= 0)
{
#if ENABLE_UNITY_COLLECTIONS_CHECKS
UnityEngine.Debug.LogWarning("Value for receiveQueueCapacity must be larger then zero.");
#endif
return false;
}
if (param.sendQueueCapacity <= 0)
{
#if ENABLE_UNITY_COLLECTIONS_CHECKS
UnityEngine.Debug.LogWarning("Value for sendQueueCapacity must be larger then zero.");
#endif
return false;
}
return true;
}
/// <summary>
/// Tries to extract the BaselibNetworkParameter from the param's passed.
/// </summary>
/// <param name="config"></param>
/// <param name="param"></param>
/// <value>boolean indicating if the extration was successful or not.</value>
bool TryExtractParameters(out BaselibNetworkParameter config, params INetworkParameter[] param)
{
for (int i = 0; i < param.Length; ++i)
{
if (param[i] is BaselibNetworkParameter && ValidateParameters((BaselibNetworkParameter) param[i]))
{
config = (BaselibNetworkParameter) param[i];
return true;
}
}
config = default;
return false;
}
}
}
#endif // !UNITY_WEBGL

11
Packages/com.unity.transport/Runtime/BaselibNetworkInterface.cs.meta


fileFormatVersion: 2
guid: 700170a841a1d4c06914f899d021cd3a
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

1001
Packages/com.unity.transport/Runtime/DataStream.cs
文件差异内容过多而无法显示
查看文件

11
Packages/com.unity.transport/Runtime/DataStream.cs.meta


fileFormatVersion: 2
guid: a006f31a7b465d946a601f1d26b9d13c
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

57
Packages/com.unity.transport/Runtime/HMACSHA256.cs


using Unity.Collections;
namespace Unity.Networking.Transport
{
public static class HMACSHA256
{
/// <summary>
/// Writes 32 bytes to result using key and message
/// </summary>
/// <param name="keyValue">Key data</param>
/// <param name="keyArrayLength">Length of the key data</param>
/// <param name="messageBytes">Message to hash</param>
/// <param name="messageLength">Length of the message</param>
/// <param name="result">Where to write resulting 32 bytes hash</param>
public static unsafe void ComputeHash(byte* keyValue, int keyArrayLength, byte* messageBytes, int messageLength, byte* result)
{
const int B = 64;
const int sha256SizeBytes = 32;
const byte ipad = 0x36;
const byte opad = 0x5C;
var shorterKey = stackalloc byte[sha256SizeBytes];
var sha256State = SHA256.SHA256State.Create();
if (keyArrayLength > B)
{
sha256State.Update(keyValue, keyArrayLength);
sha256State.Final(shorterKey);
keyValue = shorterKey;
keyArrayLength = sha256SizeBytes;
}
var kx = stackalloc byte[B];
for (var i = 0; i < keyArrayLength; i++)
kx[i] = (byte) (ipad ^ keyValue[i]);
for (var i = keyArrayLength; i < B; i++)
kx[i] = ipad;
sha256State = SHA256.SHA256State.Create();
sha256State.Update(kx, B);
sha256State.Update(messageBytes, messageLength);
sha256State.Final(result);
for (var i = 0; i < keyArrayLength; i++)
kx[i] = (byte) (opad ^ keyValue[i]);
for (var i = keyArrayLength; i < B; i++)
kx[i] = opad;
sha256State = SHA256.SHA256State.Create();
sha256State.Update(kx, B);
sha256State.Update(result, sha256SizeBytes);
sha256State.Final(result);
}
}
}

3
Packages/com.unity.transport/Runtime/HMACSHA256.cs.meta


fileFormatVersion: 2
guid: b9cc5fb706844312b1315cf642b8c45c
timeCreated: 1623799100

153
Packages/com.unity.transport/Runtime/INetworkInterface.cs


using System;
using Unity.Networking.Transport.Protocols;
using Unity.Collections;
using Unity.Jobs;
using Unity.Burst;
using Unity.Collections.LowLevel.Unsafe;
using System.Runtime.InteropServices;
namespace Unity.Networking.Transport
{
/// <summary>
/// The NetworkPacketReceiver is an interface for handling received packets, needed by the <see cref="INetworkInterface"/>
/// </summary>
public struct NetworkPacketReceiver
{
public int ReceiveCount { get {return m_Driver.ReceiveCount;} set{m_Driver.ReceiveCount = value;} }
/// <summary>
/// AppendPacket is where we parse the data from the network into easy to handle events.
/// </summary>
/// <param name="address">The address of the endpoint we received data from.</param>
/// <param name="header">The header data indicating what type of packet it is. <see cref="UdpCHeader"/> for more information.</param>
/// <param name="dataLen">The size of the payload, if any.</param>
/// <returns></returns>
public int AppendPacket(NetworkInterfaceEndPoint address, int dataLen)
{
return m_Driver.AppendPacket(address, dataLen);
}
/// <summary>
/// Get the datastream associated with this Receiver.
/// </summary>
/// <returns>Returns a NativeList of bytes</returns>
public NativeList<byte> GetDataStream()
{
return m_Driver.GetDataStream();
}
public int GetDataStreamSize()
{
return m_Driver.GetDataStreamSize();
}
/// <summary>
/// Check if the DataStreamWriter uses dynamic allocations to automatically resize the buffers or not.
/// </summary>
/// <returns>True if its dynamically resizing the DataStreamWriter</returns>
public bool DynamicDataStreamSize()
{
return m_Driver.DynamicDataStreamSize();
}
/// <summary>
/// Check if an address is currently assosiated with a valid connection.
/// This is mostly useful to keep interface internal lists of connections in sync with the correct state.
/// </summary>
public bool IsAddressUsed(NetworkInterfaceEndPoint address)
{
return m_Driver.IsAddressUsed(address);
}
public long LastUpdateTime => m_Driver.LastUpdateTime;
public int ReceiveErrorCode { set{m_Driver.ReceiveErrorCode = value;} }
internal NetworkDriver m_Driver;
}
[Flags]
public enum SendHandleFlags
{
AllocatedByDriver = 1 << 0
}
public struct NetworkInterfaceSendHandle
{
public IntPtr data;
public int capacity;
public int size;
public int id;
public SendHandleFlags flags;
}
public struct NetworkSendQueueHandle
{
private IntPtr handle;
internal static unsafe NetworkSendQueueHandle ToTempHandle(NativeQueue<QueuedSendMessage>.ParallelWriter sendQueue)
{
void* ptr = UnsafeUtility.Malloc(UnsafeUtility.SizeOf<NativeQueue<QueuedSendMessage>.ParallelWriter>(), UnsafeUtility.AlignOf<NativeQueue<QueuedSendMessage>.ParallelWriter>(), Allocator.Temp);
UnsafeUtility.WriteArrayElement(ptr, 0, sendQueue);
return new NetworkSendQueueHandle { handle = (IntPtr)ptr };
}
public unsafe NativeQueue<QueuedSendMessage>.ParallelWriter FromHandle()
{
void* ptr = (void*)handle;
return UnsafeUtility.ReadArrayElement<NativeQueue<QueuedSendMessage>.ParallelWriter>(ptr, 0);
}
}
public struct NetworkSendInterface
{
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate int BeginSendMessageDelegate(out NetworkInterfaceSendHandle handle, IntPtr userData, int requiredPayloadSize);
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate int EndSendMessageDelegate(ref NetworkInterfaceSendHandle handle, ref NetworkInterfaceEndPoint address, IntPtr userData, ref NetworkSendQueueHandle sendQueue);
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void AbortSendMessageDelegate(ref NetworkInterfaceSendHandle handle, IntPtr userData);
public TransportFunctionPointer<BeginSendMessageDelegate> BeginSendMessage;
public TransportFunctionPointer<EndSendMessageDelegate> EndSendMessage;
public TransportFunctionPointer<AbortSendMessageDelegate> AbortSendMessage;
[NativeDisableUnsafePtrRestriction] public IntPtr UserData;
}
public interface INetworkInterface : IDisposable
{
NetworkInterfaceEndPoint LocalEndPoint { get; }
int Initialize(params INetworkParameter[] param);
/// <summary>
/// Schedule a ReceiveJob. This is used to read data from your supported medium and pass it to the AppendData function
/// supplied by <see cref="NetworkDriver"/>
/// </summary>
/// <param name="receiver">A <see cref="NetworkDriver"/> used to parse the data received.</param>
/// <param name="dep">A <see cref="JobHandle"/> to any dependency we might have.</param>
/// <returns>A <see cref="JobHandle"/> to our newly created ScheduleReceive Job.</returns>
JobHandle ScheduleReceive(NetworkPacketReceiver receiver, JobHandle dep);
/// <summary>
/// Schedule a SendJob. This is used to flush send queues to your supported medium
/// </summary>
/// <param name="sendQueue">The send queue which can be used to emulate parallel send.</param>
/// <param name="dep">A <see cref="JobHandle"/> to any dependency we might have.</param>
/// <returns>A <see cref="JobHandle"/> to our newly created ScheduleSend Job.</returns>
JobHandle ScheduleSend(NativeQueue<QueuedSendMessage> sendQueue, JobHandle dep);
/// <summary>
/// Binds the medium to a specific endpoint.
/// </summary>
/// <param name="endpoint">
/// A valid <see cref="NetworkInterfaceEndPoint"/>.
/// </param>
/// <returns>0 on Success</returns>
int Bind(NetworkInterfaceEndPoint endpoint);
/// <summary>
/// Start listening for incoming connections. This is normally a no-op for real UDP sockets.
/// </summary>
/// <returns>0 on Success</returns>
int Listen();
NetworkSendInterface CreateSendInterface();
int CreateInterfaceEndPoint(NetworkEndPoint address, out NetworkInterfaceEndPoint endpoint);
NetworkEndPoint GetGenericEndPoint(NetworkInterfaceEndPoint endpoint);
}
}

11
Packages/com.unity.transport/Runtime/INetworkInterface.cs.meta


fileFormatVersion: 2
guid: 7b096ed7046c56e419b59c93c44656ea
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

164
Packages/com.unity.transport/Runtime/IPCManager.cs


using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
using Unity.Collections;
using Unity.Networking.Transport.Utilities;
using Unity.Collections.LowLevel.Unsafe;
using Unity.Jobs;
using Unity.Networking.Transport.Protocols;
using Random = Unity.Mathematics.Random;
namespace Unity.Networking.Transport
{
internal struct IPCManager
{
public static IPCManager Instance = new IPCManager();
[StructLayout(LayoutKind.Explicit)]
internal unsafe struct IPCData
{
[FieldOffset(0)] public int from;
[FieldOffset(4)] public int length;
[FieldOffset(8)] public fixed byte data[NetworkParameterConstants.MTU];
}
private NativeMultiQueue<IPCData> m_IPCQueue;
private NativeHashMap<ushort, int> m_IPCChannels;
internal static JobHandle ManagerAccessHandle;
public bool IsCreated => m_IPCQueue.IsCreated;
private int m_RefCount;
public void AddRef()
{
if (m_RefCount == 0)
{
m_IPCQueue = new NativeMultiQueue<IPCData>(128);
m_IPCChannels = new NativeHashMap<ushort, int>(64, Allocator.Persistent);
}
++m_RefCount;
}
public void Release()
{
--m_RefCount;
if (m_RefCount == 0)
{
ManagerAccessHandle.Complete();
m_IPCQueue.Dispose();
m_IPCChannels.Dispose();
}
}
internal unsafe void Update(NetworkInterfaceEndPoint local, NativeQueue<QueuedSendMessage> queue)
{
QueuedSendMessage val;
while (queue.TryDequeue(out val))
{
var ipcData = new IPCData();
UnsafeUtility.MemCpy(ipcData.data, val.Data, val.DataLength);
ipcData.length = val.DataLength;
ipcData.from = *(int*)local.data;
m_IPCQueue.Enqueue(*(int*)val.Dest.data, ipcData);
}
}
public unsafe NetworkInterfaceEndPoint CreateEndPoint(ushort port)
{
ManagerAccessHandle.Complete();
int id = 0;
if (port == 0)
{
while (id == 0)
{
port = RandomHelpers.GetRandomUShort();
if (!m_IPCChannels.TryGetValue(port, out _))
{
id = m_IPCChannels.Count() + 1;
m_IPCChannels.TryAdd(port, id);
}
}
}
else
{
if (!m_IPCChannels.TryGetValue(port, out id))
{
id = m_IPCChannels.Count() + 1;
m_IPCChannels.TryAdd(port, id);
}
}
var endpoint = default(NetworkInterfaceEndPoint);
endpoint.dataLength = 4;
*(int*) endpoint.data = id;
return endpoint;
}
public unsafe bool GetEndPointPort(NetworkInterfaceEndPoint ep, out ushort port)
{
ManagerAccessHandle.Complete();
int id = *(int*) ep.data;
var values = m_IPCChannels.GetValueArray(Allocator.Temp);
var keys = m_IPCChannels.GetKeyArray(Allocator.Temp);
port = 0;
for (var i = 0; i < m_IPCChannels.Count(); ++i)
{
if (values[i] == id)
{
port = keys[i];
return true;
}
}
return false;
}
public unsafe int PeekNext(NetworkInterfaceEndPoint local, void* slice, out int length, out NetworkInterfaceEndPoint from)
{
ManagerAccessHandle.Complete();
IPCData data;
from = default(NetworkInterfaceEndPoint);
length = 0;
if (m_IPCQueue.Peek(*(int*)local.data, out data))
{
UnsafeUtility.MemCpy(slice, data.data, data.length);
length = data.length;
}
GetEndPointByHandle(data.from, out from);
return length;
}
public unsafe int ReceiveMessageEx(NetworkInterfaceEndPoint local, void* payloadData, int payloadLen, ref NetworkInterfaceEndPoint remote)
{
IPCData data;
if (!m_IPCQueue.Peek(*(int*)local.data, out data))
return 0;
GetEndPointByHandle(data.from, out remote);
var totalLength = Math.Min(payloadLen, data.length);
UnsafeUtility.MemCpy(payloadData, data.data, totalLength);
if (totalLength < data.length)
return -1;
m_IPCQueue.Dequeue(*(int*)local.data, out data);
return totalLength;
}
private unsafe void GetEndPointByHandle(int handle, out NetworkInterfaceEndPoint endpoint)
{
var temp = default(NetworkInterfaceEndPoint);
temp.dataLength = 4;
*(int*)temp.data = handle;
endpoint = temp;
}
}
}

11
Packages/com.unity.transport/Runtime/IPCManager.cs.meta


fileFormatVersion: 2
guid: 0b251b44841f75a46be8967c77f9e8f8
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

197
Packages/com.unity.transport/Runtime/IPCNetworkInterface.cs


using System;
using AOT;
using Unity.Burst;
using Unity.Collections;
using Unity.Collections.LowLevel.Unsafe;
using Unity.Jobs;
using Unity.Networking.Transport.Protocols;
namespace Unity.Networking.Transport
{
[BurstCompile]
public struct IPCNetworkInterface : INetworkInterface
{
[ReadOnly] private NativeArray<NetworkInterfaceEndPoint> m_LocalEndPoint;
public NetworkInterfaceEndPoint LocalEndPoint => m_LocalEndPoint[0];
public int CreateInterfaceEndPoint(NetworkEndPoint address, out NetworkInterfaceEndPoint endpoint)
{
if (!address.IsLoopback && !address.IsAny)
{
#if ENABLE_UNITY_COLLECTIONS_CHECKS
throw new ArgumentException("IPC network driver can only handle loopback addresses");
#else
endpoint = default(NetworkInterfaceEndPoint);
return (int)Error.StatusCode.NetworkArgumentMismatch;
#endif
}
endpoint = IPCManager.Instance.CreateEndPoint(address.Port);
return (int)Error.StatusCode.Success;
}
public NetworkEndPoint GetGenericEndPoint(NetworkInterfaceEndPoint endpoint)
{
if (!IPCManager.Instance.GetEndPointPort(endpoint, out var port))
return default;
return NetworkEndPoint.LoopbackIpv4.WithPort(port);
}
public int Initialize(params INetworkParameter[] param)
{
IPCManager.Instance.AddRef();
m_LocalEndPoint = new NativeArray<NetworkInterfaceEndPoint>(1, Allocator.Persistent);
var ep = default(NetworkInterfaceEndPoint);
var result = 0;
if ((result = CreateInterfaceEndPoint(NetworkEndPoint.LoopbackIpv4, out ep)) != (int)Error.StatusCode.Success)
return result;
m_LocalEndPoint[0] = ep;
return 0;
}
public void Dispose()
{
m_LocalEndPoint.Dispose();
IPCManager.Instance.Release();
}
[BurstCompile]
struct SendUpdate : IJob
{
public IPCManager ipcManager;
public NativeQueue<QueuedSendMessage> ipcQueue;
[ReadOnly] public NativeArray<NetworkInterfaceEndPoint> localEndPoint;
public void Execute()
{
ipcManager.Update(localEndPoint[0], ipcQueue);
}
}
[BurstCompile]
struct ReceiveJob : IJob
{
public NetworkPacketReceiver receiver;
public IPCManager ipcManager;
public NetworkInterfaceEndPoint localEndPoint;
public unsafe void Execute()
{
var stream = receiver.GetDataStream();
receiver.ReceiveCount = 0;
receiver.ReceiveErrorCode = 0;
while (true)
{
int dataStreamSize = receiver.GetDataStreamSize();
if (receiver.DynamicDataStreamSize())
{
while (dataStreamSize+NetworkParameterConstants.MTU >= stream.Length)
stream.ResizeUninitialized(stream.Length*2);
}
else if (dataStreamSize >= stream.Length)
return;
var endpoint = default(NetworkInterfaceEndPoint);
var result = NativeReceive((byte*)stream.GetUnsafePtr() + dataStreamSize,
Math.Min(NetworkParameterConstants.MTU, stream.Length - dataStreamSize), ref endpoint);
if (result <= 0)
{
// FIXME: handle error
if (result < 0)
receiver.ReceiveErrorCode = 10040;
return;
}
receiver.ReceiveCount += receiver.AppendPacket(endpoint, result);
}
}
unsafe int NativeReceive(void* data, int length, ref NetworkInterfaceEndPoint address)
{
#if ENABLE_UNITY_COLLECTIONS_CHECKS
if (length <= 0)
throw new ArgumentException("Can't receive into 0 bytes or less of buffer memory");
#endif
return ipcManager.ReceiveMessageEx(localEndPoint, data, length, ref address);
}
}
public JobHandle ScheduleReceive(NetworkPacketReceiver receiver, JobHandle dep)
{
var job = new ReceiveJob
{receiver = receiver, ipcManager = IPCManager.Instance, localEndPoint = LocalEndPoint};
dep = job.Schedule(JobHandle.CombineDependencies(dep, IPCManager.ManagerAccessHandle));
IPCManager.ManagerAccessHandle = dep;
return dep;
}
public JobHandle ScheduleSend(NativeQueue<QueuedSendMessage> sendQueue, JobHandle dep)
{
var sendJob = new SendUpdate {ipcManager = IPCManager.Instance, ipcQueue = sendQueue, localEndPoint = m_LocalEndPoint};
dep = sendJob.Schedule(JobHandle.CombineDependencies(dep, IPCManager.ManagerAccessHandle));
IPCManager.ManagerAccessHandle = dep;
return dep;
}
public unsafe int Bind(NetworkInterfaceEndPoint endpoint)
{
#if ENABLE_UNITY_COLLECTIONS_CHECKS
if (endpoint.dataLength != 4 || *(int*)endpoint.data == 0)
throw new InvalidOperationException();
#endif
m_LocalEndPoint[0] = endpoint;
return 0;
}
public int Listen()
{
return 0;
}
static TransportFunctionPointer<NetworkSendInterface.BeginSendMessageDelegate> BeginSendMessageFunctionPointer = new TransportFunctionPointer<NetworkSendInterface.BeginSendMessageDelegate>(BeginSendMessage);
static TransportFunctionPointer<NetworkSendInterface.EndSendMessageDelegate> EndSendMessageFunctionPointer = new TransportFunctionPointer<NetworkSendInterface.EndSendMessageDelegate>(EndSendMessage);
static TransportFunctionPointer<NetworkSendInterface.AbortSendMessageDelegate> AbortSendMessageFunctionPointer = new TransportFunctionPointer<NetworkSendInterface.AbortSendMessageDelegate>(AbortSendMessage);
public NetworkSendInterface CreateSendInterface()
{
return new NetworkSendInterface
{
BeginSendMessage = BeginSendMessageFunctionPointer,
EndSendMessage = EndSendMessageFunctionPointer,
AbortSendMessage = AbortSendMessageFunctionPointer,
};
}
[BurstCompile(DisableDirectCall = true)]
[AOT.MonoPInvokeCallback(typeof(NetworkSendInterface.BeginSendMessageDelegate))]
private static unsafe int BeginSendMessage(out NetworkInterfaceSendHandle handle, IntPtr userData, int requiredPayloadSize)
{
handle.id = 0;
handle.size = 0;
handle.capacity = requiredPayloadSize;
handle.data = (IntPtr)UnsafeUtility.Malloc(handle.capacity, 8, Allocator.Temp);
handle.flags = default;
return 0;
}
[BurstCompile(DisableDirectCall = true)]
[AOT.MonoPInvokeCallback(typeof(NetworkSendInterface.EndSendMessageDelegate))]
private static unsafe int EndSendMessage(ref NetworkInterfaceSendHandle handle, ref NetworkInterfaceEndPoint address, IntPtr userData, ref NetworkSendQueueHandle sendQueueHandle)
{
var sendQueue = sendQueueHandle.FromHandle();
var msg = default(QueuedSendMessage);
msg.Dest = address;
msg.DataLength = handle.size;
UnsafeUtility.MemCpy(msg.Data, (void*)handle.data, handle.size);
sendQueue.Enqueue(msg);
return handle.size;
}
[BurstCompile(DisableDirectCall = true)]
[AOT.MonoPInvokeCallback(typeof(NetworkSendInterface.AbortSendMessageDelegate))]
private static void AbortSendMessage(ref NetworkInterfaceSendHandle handle, IntPtr userData)
{
}
}
}

11
Packages/com.unity.transport/Runtime/IPCNetworkInterface.cs.meta


fileFormatVersion: 2
guid: 137e5edeb5e8eea4ca03b69eb129165c
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

195
Packages/com.unity.transport/Runtime/NetworkCompressionModel.cs


using System;
using Unity.Collections;
using Unity.Mathematics;
namespace Unity.Networking.Transport
{
public unsafe struct NetworkCompressionModel : IDisposable
{
internal static readonly byte[] k_BucketSizes =
{
0, 0, 1, 2, 3, 4, 6, 8, 10, 12, 15, 18, 21, 24, 27, 32
};
internal static readonly uint[] k_BucketOffsets =
{
0, 1, 2, 4, 8, 16, 32, 96, 352, 1376, 5472, 38240, 300384, 2397536, 19174752, 153392480
};
internal static readonly int[] k_FirstBucketCandidate =
{
// 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
15, 15, 15, 15, 14, 14, 14, 13, 13, 13, 12, 12, 12, 11, 11, 11, 10, 10, 10, 9, 9, 8, 8, 7, 7, 6, 5, 4, 3, 2, 1, 1, 0
};
internal static readonly byte[] k_DefaultModelData = { 16, // 16 symbols
2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6, 6, 6, 6, 6,
0, 0 }; // no contexts
internal const int k_AlphabetSize = 16;
internal const int k_MaxHuffmanSymbolLength = 6;
internal const int k_MaxContexts = 1;
public void Dispose()
{
}
public NetworkCompressionModel(Allocator allocator)
{
for (int i = 0; i < k_AlphabetSize; ++i)
{
bucketSizes[i] = k_BucketSizes[i];
bucketOffsets[i] = k_BucketOffsets[i];
}
byte[] modelData = k_DefaultModelData;
//int numContexts = NetworkConfig.maxContexts;
int numContexts = 1;
byte[,] symbolLengths = new byte[numContexts, k_AlphabetSize];
int readOffset = 0;
{
// default model
int defaultModelAlphabetSize = modelData[readOffset++];
#if ENABLE_UNITY_COLLECTIONS_CHECKS
if (defaultModelAlphabetSize != k_AlphabetSize)
throw new InvalidOperationException("The alphabet size of compression models must be " + k_AlphabetSize);
#endif
for (int i = 0; i < k_AlphabetSize; i++)
{
byte length = modelData[readOffset++];
for (int context = 0; context < numContexts; context++)
{
symbolLengths[context, i] = length;
}
}
// other models
int numModels = modelData[readOffset] | (modelData[readOffset + 1] << 8);
readOffset += 2;
for (int model = 0; model < numModels; model++)
{
int context = modelData[readOffset] | (modelData[readOffset + 1] << 8);
readOffset += 2;
int modelAlphabetSize = modelData[readOffset++];
#if ENABLE_UNITY_COLLECTIONS_CHECKS
if (modelAlphabetSize != k_AlphabetSize)
throw new InvalidOperationException("The alphabet size of compression models must be " + k_AlphabetSize);
#endif
for (int i = 0; i < k_AlphabetSize; i++)
{
byte length = modelData[readOffset++];
symbolLengths[context, i] = length;
}
}
}
// generate tables
var tmpSymbolLengths = new byte[k_AlphabetSize];
var tmpSymbolDecodeTable = new ushort[1 << k_MaxHuffmanSymbolLength];
var symbolCodes = new byte[k_AlphabetSize];
for (int context = 0; context < numContexts; context++)
{
for (int i = 0; i < k_AlphabetSize; i++)
tmpSymbolLengths[i] = symbolLengths[context, i];
GenerateHuffmanCodes(symbolCodes, 0, tmpSymbolLengths, 0, k_AlphabetSize, k_MaxHuffmanSymbolLength);
GenerateHuffmanDecodeTable(tmpSymbolDecodeTable, 0, tmpSymbolLengths, symbolCodes, k_AlphabetSize, k_MaxHuffmanSymbolLength);
for (int i = 0; i < k_AlphabetSize; i++)
{
encodeTable[context * k_AlphabetSize + i] = (ushort)((symbolCodes[i] << 8) | symbolLengths[context, i]);
}
for (int i = 0; i < (1 << k_MaxHuffmanSymbolLength); i++)
{
decodeTable[context * (1 << k_MaxHuffmanSymbolLength) + i] = tmpSymbolDecodeTable[i];
}
}
}
private static void GenerateHuffmanCodes(byte[] symboLCodes, int symbolCodesOffset, byte[] symbolLengths, int symbolLengthsOffset, int alphabetSize, int maxCodeLength)
{
#if ENABLE_UNITY_COLLECTIONS_CHECKS
if (alphabetSize > 256 || maxCodeLength > 8)
throw new InvalidOperationException("Can only generate huffman codes up to alphabet size 256 and maximum code length 8");
#endif
var lengthCounts = new byte[maxCodeLength + 1];
var symbolList = new byte[maxCodeLength + 1, alphabetSize];
//byte[] symbol_list[(MAX_HUFFMAN_CODE_LENGTH + 1u) * MAX_NUM_HUFFMAN_SYMBOLS];
for (int symbol = 0; symbol < alphabetSize; symbol++)
{
int length = symbolLengths[symbol + symbolLengthsOffset];
#if ENABLE_UNITY_COLLECTIONS_CHECKS
if (length > maxCodeLength)
throw new InvalidOperationException("Maximum code length exceeded");
#endif
symbolList[length, lengthCounts[length]++] = (byte)symbol;
}
uint nextCodeWord = 0;
for (int length = 1; length <= maxCodeLength; length++)
{
int length_count = lengthCounts[length];
for (int i = 0; i < length_count; i++)
{
int symbol = symbolList[length, i];
#if ENABLE_UNITY_COLLECTIONS_CHECKS
if (symbolLengths[symbol + symbolLengthsOffset] != length)
throw new InvalidOperationException("Incorrect symbol length");
#endif
symboLCodes[symbol + symbolCodesOffset] = (byte)ReverseBits(nextCodeWord++, length);
}
nextCodeWord <<= 1;
}
}
private static uint ReverseBits(uint value, int num_bits)
{
value = ((value & 0x55555555u) << 1) | ((value & 0xAAAAAAAAu) >> 1);
value = ((value & 0x33333333u) << 2) | ((value & 0xCCCCCCCCu) >> 2);
value = ((value & 0x0F0F0F0Fu) << 4) | ((value & 0xF0F0F0F0u) >> 4);
value = ((value & 0x00FF00FFu) << 8) | ((value & 0xFF00FF00u) >> 8);
value = (value << 16) | (value >> 16);
return value >> (32 - num_bits);
}
// decode table entries: (symbol << 8) | length
private static void GenerateHuffmanDecodeTable(ushort[] decodeTable, int decodeTableOffset, byte[] symbolLengths, byte[] symbolCodes, int alphabetSize, int maxCodeLength)
{
#if ENABLE_UNITY_COLLECTIONS_CHECKS
if (alphabetSize > 256 || maxCodeLength > 8)
throw new InvalidOperationException("Can only generate huffman codes up to alphabet size 256 and maximum code length 8");
#endif
uint maxCode = 1u << maxCodeLength;
for (int symbol = 0; symbol < alphabetSize; symbol++)
{
int length = symbolLengths[symbol];
#if ENABLE_UNITY_COLLECTIONS_CHECKS
if (length > maxCodeLength)
throw new InvalidOperationException("Maximum code length exceeded");
#endif
if (length > 0)
{
uint code = symbolCodes[symbol];
uint step = 1u << length;
do
{
decodeTable[decodeTableOffset + code] = (ushort)(symbol << 8 | length);
code += step;
} while (code < maxCode);
}
}
}
public fixed ushort encodeTable[k_MaxContexts * k_AlphabetSize];
public fixed ushort decodeTable[k_MaxContexts * (1 << k_MaxHuffmanSymbolLength)];
public fixed byte bucketSizes[k_AlphabetSize];
public fixed uint bucketOffsets[k_AlphabetSize];
public int CalculateBucket(uint value)
{
int bucketIndex = k_FirstBucketCandidate[math.lzcnt(value)];
if (bucketIndex + 1 < k_AlphabetSize && value >= bucketOffsets[bucketIndex + 1])
bucketIndex++;
return bucketIndex;
}
}
}

11
Packages/com.unity.transport/Runtime/NetworkCompressionModel.cs.meta


fileFormatVersion: 2
guid: 8f1beb626cc4d46cea7be796acb882b8
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

142
Packages/com.unity.transport/Runtime/NetworkConnection.cs


using Unity.Collections;
namespace Unity.Networking.Transport
{
namespace Error
{
/// <summary>
/// DisconnectReason enumerates all disconnect reasons.
/// </summary>
public enum DisconnectReason : byte
{
/// <summary>Indicates a normal disconnection as a result of calling Disconnect on the connection.</summary>
Default, // don't assign explicit values
/// <summary>Indicates the connection timed out.</summary>
Timeout,
/// <summary>Indicates the connection failed to establish a connection after <see cref="NetworkConfigParameter.maxConnectAttempts"/>.</summary>
MaxConnectionAttempts,
/// <summary>Indicates the connection was closed remotely.</summary>
ClosedByRemote,
/// <summary>Used only for count. Keep last and don't assign explicit values</summary>
Count
}
public enum StatusCode
{
Success = 0,
NetworkIdMismatch = -1,
NetworkVersionMismatch = -2,
NetworkStateMismatch = -3,
NetworkPacketOverflow = -4,
NetworkSendQueueFull = -5,
NetworkHeaderInvalid = -6,
NetworkDriverParallelForErr = -7,
NetworkSendHandleInvalid = -8,
NetworkArgumentMismatch = -9,
}
}
/// <summary>
/// The NetworkConnection is a struct that hold all information needed by the driver to link it with a virtual
/// connection. The NetworkConnection is a public representation of a connection.
/// </summary>
public struct NetworkConnection
{
internal int m_NetworkId;
internal int m_NetworkVersion;
/// <summary>
/// ConnectionState enumerates available connection states a connection can have.
/// </summary>
public enum State
{
/// <summary>Indicates the connection is disconnected</summary>
Disconnected,
/// <summary>Indicates the connection is trying to connect.</summary>
Connecting,
/// <summary>Indicates the connection is waiting for a connection response. </summary>
AwaitingResponse,
/// <summary>Indicates the connection is connected.. </summary>
Connected
}
/// <summary>
/// Disconnects a virtual connection and marks it for deletion. This connection will be removed on next the next frame.
/// </summary>
/// <param name="driver">The driver that owns the virtual connection.</param>
public int Disconnect(NetworkDriver driver)
{
return driver.Disconnect(this);
}
/// <summary>
/// Receive an event for this specific connection. Should be called until it returns <see cref="NetworkEvent.Type.Empty"/>, even if the socket is disconnected.
/// </summary>
/// <param name="driver">The driver that owns the virtual connection.</param>
/// <param name="strm">A DataStreamReader, that will only be populated if a <see cref="NetworkEvent.Type.Data"/>
/// event was received.
/// </param>
public NetworkEvent.Type PopEvent(NetworkDriver driver, out DataStreamReader stream)
{
return driver.PopEventForConnection(this, out stream);
}
public NetworkEvent.Type PopEvent(NetworkDriver driver, out DataStreamReader stream, out NetworkPipeline pipeline)
{
return driver.PopEventForConnection(this, out stream, out pipeline);
}
/// <summary>
/// Close an active NetworkConnection, similar to <see cref="Disconnect{T}"/>.
/// </summary>
/// <param name="driver">The driver that owns the virtual connection.</param>
public int Close(NetworkDriver driver)
{
if (m_NetworkId >= 0)
return driver.Disconnect(this);
return -1;
}
/// <summary>
/// Check to see if a NetworkConnection is Created.
/// </summary>
/// <returns>`true` if the NetworkConnection has been correctly created by a call to
/// <see cref="NetworkDriver.Accept"/> or <see cref="NetworkDriver.Connect"/></returns>
public bool IsCreated
{
get { return m_NetworkVersion != 0; }
}
public State GetState(NetworkDriver driver)
{
return driver.GetConnectionState(this);
}
public static bool operator ==(NetworkConnection lhs, NetworkConnection rhs)
{
return lhs.m_NetworkId == rhs.m_NetworkId && lhs.m_NetworkVersion == rhs.m_NetworkVersion;
}
public static bool operator !=(NetworkConnection lhs, NetworkConnection rhs)
{
return lhs.m_NetworkId != rhs.m_NetworkId || lhs.m_NetworkVersion != rhs.m_NetworkVersion;
}
public override bool Equals(object o)
{
return this == (NetworkConnection)o;
}
public bool Equals(NetworkConnection o)
{
return this == o;
}
public override int GetHashCode()
{
return (m_NetworkId << 8) ^ m_NetworkVersion;
}
public int InternalId => m_NetworkId;
}
}

11
Packages/com.unity.transport/Runtime/NetworkConnection.cs.meta


fileFormatVersion: 2
guid: 6823092a5f3f3ce4b858282568abac60
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

1001
Packages/com.unity.transport/Runtime/NetworkDriver.cs
文件差异内容过多而无法显示
查看文件

11
Packages/com.unity.transport/Runtime/NetworkDriver.cs.meta


fileFormatVersion: 2
guid: 38c540f496ff7824c96d4ba0d3bac37e
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

403
Packages/com.unity.transport/Runtime/NetworkEndPoint.cs


using System;
using System.Runtime.InteropServices;
using Unity.Collections;
using Unity.Collections.LowLevel.Unsafe;
using Unity.Mathematics;
using Unity.Baselib;
using Unity.Baselib.LowLevel;
using ErrorState = Unity.Baselib.LowLevel.Binding.Baselib_ErrorState;
namespace Unity.Networking.Transport
{
/// <summary>
/// NetworkFamily indicates what type of underlying medium we are using.
/// </summary>
public enum NetworkFamily
{
Invalid = 0,
Ipv4 = 2,
Ipv6 = 23
}
[StructLayout(LayoutKind.Sequential)]
public unsafe struct NetworkEndPoint
{
enum AddressType { Any = 0, Loopback = 1 }
private const int rawIpv4Length = 4;
private const int rawIpv6Length = 16;
private const int rawDataLength = 16; // Maximum space needed to hold a IPv6 Address
#if !UNITY_2021_1_OR_NEWER && !UNITY_DOTSRUNTIME
private const int rawLength = rawDataLength + 4; // SizeOf<Baselib_NetworkAddress>
#else
private const int rawLength = rawDataLength + 8; // SizeOf<Baselib_NetworkAddress>
#endif
private static readonly bool IsLittleEndian = true;
internal Binding.Baselib_NetworkAddress rawNetworkAddress;
public int length;
static NetworkEndPoint()
{
uint test = 1;
byte* test_b = (byte*) &test;
IsLittleEndian = test_b[0] == 1;
}
public ushort Port
{
get => (ushort) (rawNetworkAddress.port1 | (rawNetworkAddress.port0 << 8));
set
{
rawNetworkAddress.port0 = (byte)((value >> 8) & 0xff);
rawNetworkAddress.port1 = (byte)(value & 0xff);
}
}
public NetworkFamily Family
{
get => FromBaselibFamily((Binding.Baselib_NetworkAddress_Family)rawNetworkAddress.family);
set => rawNetworkAddress.family = (byte)ToBaselibFamily(value);
}
public NativeArray<byte> GetRawAddressBytes()
{
if (Family == NetworkFamily.Ipv4)
{
var bytes = new NativeArray<byte>(rawIpv4Length, Allocator.Temp);
UnsafeUtility.MemCpy(bytes.GetUnsafePtr(), UnsafeUtility.AddressOf(ref rawNetworkAddress), rawIpv4Length);
return bytes;
}
else if (Family == NetworkFamily.Ipv6)
{
var bytes = new NativeArray<byte>(rawIpv6Length, Allocator.Temp);
UnsafeUtility.MemCpy(bytes.GetUnsafePtr(), UnsafeUtility.AddressOf(ref rawNetworkAddress), rawIpv6Length);
return bytes;
}
return default;
}
public void SetRawAddressBytes(NativeArray<byte> bytes, NetworkFamily family = NetworkFamily.Ipv4)
{
if (family == NetworkFamily.Ipv4)
{
if (bytes.Length != rawIpv4Length)
throw new InvalidOperationException($"Bad input length, a ipv4 address is 4 bytes long not {bytes.Length}");
UnsafeUtility.MemCpy(UnsafeUtility.AddressOf(ref rawNetworkAddress), bytes.GetUnsafeReadOnlyPtr(), rawIpv4Length);
Family = family;
}
else if (family == NetworkFamily.Ipv6)
{
if (bytes.Length != rawIpv6Length)
throw new InvalidOperationException($"Bad input length, a ipv6 address is 16 bytes long not {bytes.Length}");
UnsafeUtility.MemCpy(UnsafeUtility.AddressOf(ref rawNetworkAddress), bytes.GetUnsafeReadOnlyPtr(), rawIpv6Length);
Family = family;
}
}
public ushort RawPort
{
get
{
ushort *port = (ushort*)((byte*) UnsafeUtility.AddressOf(ref rawNetworkAddress) + rawDataLength);
return *port;
}
set
{
ushort *port = (ushort*)((byte*) UnsafeUtility.AddressOf(ref rawNetworkAddress) + rawDataLength);
*port = value;
}
}
public string Address => AddressAsString();
public bool IsValid => Family != 0;
public static NetworkEndPoint AnyIpv4 => CreateAddress(0);
public static NetworkEndPoint LoopbackIpv4 => CreateAddress(0, AddressType.Loopback);
public static NetworkEndPoint AnyIpv6 => CreateAddress(0, AddressType.Any, NetworkFamily.Ipv6);
public static NetworkEndPoint LoopbackIpv6 => CreateAddress(0, AddressType.Loopback, NetworkFamily.Ipv6);
public NetworkEndPoint WithPort(ushort port)
{
var ep = this;
ep.Port = port;
return ep;
}
public bool IsLoopback => (this == LoopbackIpv4.WithPort(Port)) || (this == LoopbackIpv6.WithPort(Port));
public bool IsAny => (this == AnyIpv4.WithPort(Port)) || (this == AnyIpv6.WithPort(Port));
// Returns true if we can fully parse the input and return a valid endpoint
public static bool TryParse(string address, ushort port, out NetworkEndPoint endpoint, NetworkFamily family = NetworkFamily.Ipv4)
{
UnsafeUtility.SizeOf<Binding.Baselib_NetworkAddress>();
endpoint = default(NetworkEndPoint);
var nullTerminator = '\0';
var errorState = default(ErrorState);
var ipBytes = System.Text.Encoding.UTF8.GetBytes(address + nullTerminator);
fixed (byte* ipBytesPtr = ipBytes)
fixed (Binding.Baselib_NetworkAddress* rawAddress = &endpoint.rawNetworkAddress)
{
Binding.Baselib_NetworkAddress_Encode(
rawAddress,
ToBaselibFamily(family),
ipBytesPtr,
(ushort) port,
&errorState);
}
if (errorState.code != Binding.Baselib_ErrorCode.Success)
{
return false;
}
return endpoint.IsValid;
}
// Returns a default address if parsing fails
public static NetworkEndPoint Parse(string address, ushort port, NetworkFamily family = NetworkFamily.Ipv4)
{
if (TryParse(address, port, out var endpoint, family))
return endpoint;
return default;
}
public static bool operator ==(NetworkEndPoint lhs, NetworkEndPoint rhs)
{
return lhs.Compare(rhs);
}
public static bool operator !=(NetworkEndPoint lhs, NetworkEndPoint rhs)
{
return !lhs.Compare(rhs);
}
public override bool Equals(object other)
{
return this == (NetworkEndPoint) other;
}
public override int GetHashCode()
{
var p = (byte*) UnsafeUtility.AddressOf(ref rawNetworkAddress);
unchecked
{
var result = 0;
for (int i = 0; i < rawLength; i++)
{
result = (result * 31) ^ (int)p[i];
}
return result;
}
}
bool Compare(NetworkEndPoint other)
{
var p = (byte*) UnsafeUtility.AddressOf(ref rawNetworkAddress);
var p1 = (byte*) UnsafeUtility.AddressOf(ref other.rawNetworkAddress);
return UnsafeUtility.MemCmp(p, p1, rawLength) == 0;
}
static void AppendHex(ref FixedString128 str, ushort val)
{
int shamt = 12;
// Find the first non-zero nibble
while (shamt > 0)
{
if (((val>>shamt)&0xf) != 0)
break;
shamt -= 4;
}
while (shamt >= 0)
{
var nibble = (val>>shamt)&0xf;
if (nibble >= 10)
str.Add((byte)('a' + nibble - 10));
else
str.Add((byte)('0' + nibble));
shamt -= 4;
}
}
internal static FixedString128 AddressToString(in Binding.Baselib_NetworkAddress rawNetworkAddress)
{
FixedString128 str = default;
FixedString32 dot = ".";
FixedString32 colon = ":";
FixedString32 opensqb = "[";
FixedString32 closesqb = "]";
switch ((Binding.Baselib_NetworkAddress_Family)rawNetworkAddress.family)
{
case Binding.Baselib_NetworkAddress_Family.IPv4:
// TODO(steve): Update to use ipv4_0 ... 3 when its available.
str.Append(rawNetworkAddress.data0);
str.Append(dot);
str.Append(rawNetworkAddress.data1);
str.Append(dot);
str.Append(rawNetworkAddress.data2);
str.Append(dot);
str.Append(rawNetworkAddress.data3);
str.Append(colon);
str.Append((ushort) (rawNetworkAddress.port1 | (rawNetworkAddress.port0 << 8)));
break;
case Binding.Baselib_NetworkAddress_Family.IPv6:
// TODO(steve): Include scope and handle leading zeros
// TODO(steve): Update to use ipv6_0 ... 15 when its available.
str.Append(opensqb);
AppendHex(ref str, (ushort)(rawNetworkAddress.data1 | (rawNetworkAddress.data0 << 8)));
str.Append(colon);
AppendHex(ref str, (ushort)(rawNetworkAddress.data3 | (rawNetworkAddress.data2 << 8)));
str.Append(colon);
AppendHex(ref str, (ushort)(rawNetworkAddress.data5 | (rawNetworkAddress.data4 << 8)));
str.Append(colon);
AppendHex(ref str, (ushort)(rawNetworkAddress.data7 | (rawNetworkAddress.data6 << 8)));
str.Append(colon);
AppendHex(ref str, (ushort)(rawNetworkAddress.data9 | (rawNetworkAddress.data8 << 8)));
str.Append(colon);
AppendHex(ref str, (ushort)(rawNetworkAddress.data11 | (rawNetworkAddress.data10 << 8)));
str.Append(colon);
AppendHex(ref str, (ushort)(rawNetworkAddress.data13 | (rawNetworkAddress.data12 << 8)));
str.Append(colon);
AppendHex(ref str, (ushort)(rawNetworkAddress.data15 | (rawNetworkAddress.data14 << 8)));
str.Append(colon);
str.Append(closesqb);
str.Append(colon);
str.Append((ushort) (rawNetworkAddress.port1 | (rawNetworkAddress.port0 << 8)));
break;
default:
// TODO(steve): Throw an exception?
break;
}
return str;
}
private string AddressAsString()
{
return AddressToString(rawNetworkAddress).ToString();
}
private static ushort ByteSwap(ushort val)
{
return (ushort) (((val & 0xff) << 8) | (val >> 8));
}
private static uint ByteSwap(uint val)
{
return (uint) (((val & 0xff) << 24) | ((val & 0xff00) << 8) | ((val >> 8) & 0xff00) | (val >> 24));
}
static NetworkEndPoint CreateAddress(ushort port, AddressType type = AddressType.Any, NetworkFamily family = NetworkFamily.Ipv4)
{
#if ENABLE_UNITY_COLLECTIONS_CHECKS
UnityEngine.Debug.Assert(UnsafeUtility.SizeOf<Binding.Baselib_NetworkAddress>() == rawLength);
#endif
if (family == NetworkFamily.Invalid)
return default;
uint ipv4Loopback = (127 << 24) | 1;
if (IsLittleEndian)
{
port = ByteSwap(port);
ipv4Loopback = ByteSwap(ipv4Loopback);
}
var ep = new NetworkEndPoint
{
Family = family,
RawPort = port,
length = rawLength
};
if (type == AddressType.Loopback)
{
if (family == NetworkFamily.Ipv4)
{
*(uint*) UnsafeUtility.AddressOf(ref ep.rawNetworkAddress) = ipv4Loopback;
}
else if (family == NetworkFamily.Ipv6)
{
ep.rawNetworkAddress.data15 = 1;
}
}
return ep;
}
static NetworkFamily FromBaselibFamily(Binding.Baselib_NetworkAddress_Family family)
{
if (family == Binding.Baselib_NetworkAddress_Family.IPv4)
return NetworkFamily.Ipv4;
if (family == Binding.Baselib_NetworkAddress_Family.IPv6)
return NetworkFamily.Ipv6;
return NetworkFamily.Invalid;
}
static Binding.Baselib_NetworkAddress_Family ToBaselibFamily(NetworkFamily family)
{
if (family == NetworkFamily.Ipv4)
return Binding.Baselib_NetworkAddress_Family.IPv4;
if (family == NetworkFamily.Ipv6)
return Binding.Baselib_NetworkAddress_Family.IPv6;
return Binding.Baselib_NetworkAddress_Family.Invalid;
}
}
public unsafe struct NetworkInterfaceEndPoint : IEquatable<NetworkInterfaceEndPoint>
{
public const int k_MaxLength = 56;
public int dataLength;
public fixed byte data[k_MaxLength];
public bool IsValid => dataLength != 0;
public static bool operator ==(NetworkInterfaceEndPoint lhs, NetworkInterfaceEndPoint rhs)
{
return lhs.Equals(rhs);
}
public static bool operator !=(NetworkInterfaceEndPoint lhs, NetworkInterfaceEndPoint rhs)
{
return !lhs.Equals(rhs);
}
public override bool Equals(object other)
{
return Equals((NetworkInterfaceEndPoint) other);
}
public override int GetHashCode()
{
fixed (byte* p = data)
unchecked
{
var result = 0;
for (int i = 0; i < dataLength; i++)
{
result = (result * 31) ^ (int)p[i];
}
return result;
}
}
public bool Equals(NetworkInterfaceEndPoint other)
{
// baselib doesn't return consistent lengths under posix, so lengths can
// only be used as a shortcut if only one addresses a blank.
if (dataLength != other.dataLength && (dataLength <= 0 || other.dataLength <= 0))
return false;
fixed (void* p = this.data)
{
return UnsafeUtility.MemCmp(p, other.data, math.min(dataLength, other.dataLength)) == 0;
}
}
}
}

11
Packages/com.unity.transport/Runtime/NetworkEndPoint.cs.meta


fileFormatVersion: 2
guid: f4fec373e8214414a96bbb300a2dcaac
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

274
Packages/com.unity.transport/Runtime/NetworkEventQueue.cs


using System;
using System.Runtime.InteropServices;
using System.Threading;
using Unity.Collections;
using Unity.Collections.LowLevel.Unsafe;
namespace Unity.Networking.Transport
{
[StructLayout(LayoutKind.Explicit)]
public struct NetworkEvent
{
/// <summary>
/// NetworkEvent.Type enumerates available network events for this driver.
/// </summary>
public enum Type : short
{
Empty = 0,
Data,
Connect,
Disconnect
}
[FieldOffset(0)] public Type type;
[FieldOffset(2)] public short pipelineId;
[FieldOffset(4)] public int connectionId;
[FieldOffset(8)] public int status;
[FieldOffset(8)] public int offset;
[FieldOffset(12)] public int size;
}
public struct NetworkEventQueue : IDisposable
{
private int MaxEvents {
get { return m_ConnectionEventQ.Length / (m_ConnectionEventHeadTail.Length/2); }
}
public NetworkEventQueue(int queueSizePerConnection)
{
m_MasterEventQ = new NativeQueue<SubQueueItem>(Allocator.Persistent);
m_ConnectionEventQ = new NativeList<NetworkEvent>(queueSizePerConnection, Allocator.Persistent);
m_ConnectionEventHeadTail = new NativeList<int>(2, Allocator.Persistent);
m_ConnectionEventQ.ResizeUninitialized(queueSizePerConnection);
m_ConnectionEventHeadTail.Add(0);
m_ConnectionEventHeadTail.Add(0);
}
public void Dispose()
{
m_MasterEventQ.Dispose();
m_ConnectionEventQ.Dispose();
m_ConnectionEventHeadTail.Dispose();
}
// The returned stream is valid until PopEvent is called again or until the main driver updates
public NetworkEvent.Type PopEvent(out int id, out int offset, out int size)
{
return PopEvent(out id, out offset, out size, out var _);
}
public NetworkEvent.Type PopEvent(out int id, out int offset, out int size, out int pipelineId)
{
offset = 0;
size = 0;
id = -1;
pipelineId = 0;
while (true)
{
SubQueueItem ev;
if (!m_MasterEventQ.TryDequeue(out ev))
{
return NetworkEvent.Type.Empty;
}
if (m_ConnectionEventHeadTail[ev.connection * 2] == ev.idx)
{
id = ev.connection;
return PopEventForConnection(ev.connection, out offset, out size, out pipelineId);
}
}
}
public NetworkEvent.Type PopEventForConnection(int connectionId, out int offset, out int size)
{
return PopEventForConnection(connectionId, out offset, out size, out var _);
}
public NetworkEvent.Type PopEventForConnection(int connectionId, out int offset, out int size, out int pipelineId)
{
offset = 0;
size = 0;
pipelineId = 0;
if (connectionId < 0 || connectionId >= m_ConnectionEventHeadTail.Length / 2)
return NetworkEvent.Type.Empty;
int idx = m_ConnectionEventHeadTail[connectionId * 2];
if (idx >= m_ConnectionEventHeadTail[connectionId * 2 + 1])
return NetworkEvent.Type.Empty;
m_ConnectionEventHeadTail[connectionId * 2] = idx + 1;
NetworkEvent ev = m_ConnectionEventQ[connectionId * MaxEvents + idx];
pipelineId = ev.pipelineId;
if (ev.type == NetworkEvent.Type.Data)
{
offset = ev.offset;
size = ev.size;
}
else if (ev.type == NetworkEvent.Type.Disconnect && ev.status != (int)Error.DisconnectReason.Default)
{
offset = -ev.status;
}
return ev.type;
}
public int GetCountForConnection(int connectionId)
{
if (connectionId < 0 || connectionId >= m_ConnectionEventHeadTail.Length / 2)
return 0;
return m_ConnectionEventHeadTail[connectionId * 2 + 1] - m_ConnectionEventHeadTail[connectionId * 2];
}
/// ::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
/// internal helper functions ::::::::::::::::::::::::::::::::::::::::::
public void PushEvent(NetworkEvent ev)
{
int curMaxEvents = MaxEvents;
if (ev.connectionId >= m_ConnectionEventHeadTail.Length / 2)
{
// Connection id out of range, grow the number of connections in the queue
int oldSize = m_ConnectionEventHeadTail.Length;
m_ConnectionEventHeadTail.ResizeUninitialized((ev.connectionId + 1)*2);
for (;oldSize < m_ConnectionEventHeadTail.Length; ++oldSize)
m_ConnectionEventHeadTail[oldSize] = 0;
m_ConnectionEventQ.ResizeUninitialized((m_ConnectionEventHeadTail.Length / 2) * curMaxEvents);
}
int idx = m_ConnectionEventHeadTail[ev.connectionId * 2 + 1];
if (idx >= curMaxEvents)
{
// Grow the max items per queue and remap the queues
int oldMax = curMaxEvents;
while (idx >= curMaxEvents)
curMaxEvents *= 2;
int maxConnections = m_ConnectionEventHeadTail.Length / 2;
m_ConnectionEventQ.ResizeUninitialized(maxConnections * curMaxEvents);
for (int con = maxConnections-1; con >= 0; --con)
{
for (int i = m_ConnectionEventHeadTail[con*2+1]-1; i >= m_ConnectionEventHeadTail[con * 2]; --i)
{
m_ConnectionEventQ[con * curMaxEvents + i] = m_ConnectionEventQ[con * oldMax + i];
}
}
}
m_ConnectionEventQ[ev.connectionId * curMaxEvents + idx] = ev;
m_ConnectionEventHeadTail[ev.connectionId * 2 + 1] = idx + 1;
m_MasterEventQ.Enqueue(new SubQueueItem {connection = ev.connectionId, idx = idx});
}
internal void Clear()
{
m_MasterEventQ.Clear();
for (int i = 0; i < m_ConnectionEventHeadTail.Length; ++i)
{
m_ConnectionEventHeadTail[i] = 0;
}
}
struct SubQueueItem
{
public int connection;
public int idx;
}
private NativeQueue<SubQueueItem> m_MasterEventQ;
private NativeList<NetworkEvent> m_ConnectionEventQ;
private NativeList<int> m_ConnectionEventHeadTail;
public Concurrent ToConcurrent()
{
Concurrent concurrent;
concurrent.m_ConnectionEventQ = m_ConnectionEventQ;
concurrent.m_ConnectionEventHeadTail = new Concurrent.ConcurrentConnectionQueue(m_ConnectionEventHeadTail);
return concurrent;
}
public struct Concurrent
{
[NativeContainer]
[NativeContainerIsAtomicWriteOnly]
internal unsafe struct ConcurrentConnectionQueue
{
[NativeDisableUnsafePtrRestriction] private UnsafeList* m_ConnectionEventHeadTail;
#if ENABLE_UNITY_COLLECTIONS_CHECKS
private AtomicSafetyHandle m_Safety;
#endif
public ConcurrentConnectionQueue(NativeList<int> queue)
{
#if ENABLE_UNITY_COLLECTIONS_CHECKS
m_Safety = NativeListUnsafeUtility.GetAtomicSafetyHandle(ref queue);
AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
#endif
m_ConnectionEventHeadTail = (UnsafeList*) NativeListUnsafeUtility.GetInternalListDataPtrUnchecked(ref queue);
}
public int Length
{
get { return m_ConnectionEventHeadTail->Length; }
}
public int Dequeue(int connectionId)
{
#if ENABLE_UNITY_COLLECTIONS_CHECKS
AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
#endif
int idx = -1;
if (connectionId < 0 || connectionId >= m_ConnectionEventHeadTail->Length / 2)
return -1;
while (idx < 0)
{
idx = ((int*)m_ConnectionEventHeadTail->Ptr)[connectionId * 2];
if (idx >= ((int*)m_ConnectionEventHeadTail->Ptr)[connectionId * 2 + 1])
return -1;
if (Interlocked.CompareExchange(ref ((int*)m_ConnectionEventHeadTail->Ptr)[connectionId * 2], idx + 1,
idx) != idx)
idx = -1;
}
return idx;
}
}
private int MaxEvents {
get { return m_ConnectionEventQ.Length / (m_ConnectionEventHeadTail.Length/2); }
}
public NetworkEvent.Type PopEventForConnection(int connectionId, out int offset, out int size)
{
return PopEventForConnection(connectionId, out offset, out size, out var _);
}
public NetworkEvent.Type PopEventForConnection(int connectionId, out int offset, out int size, out int pipelineId)
{
offset = 0;
size = 0;
pipelineId = 0;
int idx = m_ConnectionEventHeadTail.Dequeue(connectionId);
if (idx < 0)
return NetworkEvent.Type.Empty;
NetworkEvent ev = m_ConnectionEventQ[connectionId * MaxEvents + idx];
pipelineId = ev.pipelineId;
if (ev.type == NetworkEvent.Type.Data)
{
offset = ev.offset;
size = ev.size;
}
else if (ev.type == NetworkEvent.Type.Disconnect && ev.status != (int)Error.DisconnectReason.Default)
{
offset = -ev.status;
}
return ev.type;
}
[ReadOnly] internal NativeList<NetworkEvent> m_ConnectionEventQ;
internal ConcurrentConnectionQueue m_ConnectionEventHeadTail;
}
}
}

11
Packages/com.unity.transport/Runtime/NetworkEventQueue.cs.meta


fileFormatVersion: 2
guid: 24080868f8aea4078b8027bb059f03a2
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

62
Packages/com.unity.transport/Runtime/NetworkParams.cs


namespace Unity.Networking.Transport
{
/// <summary>
/// The interface for NetworkParameters
/// </summary>
public interface INetworkParameter
{
}
/// <summary>
/// Default NetworkParameter Constants.
/// </summary>
public struct NetworkParameterConstants
{
/// <summary>The default size of the event queue.</summary>
public const int InitialEventQueueSize = 100;
public const int InvalidConnectionId = -1;
/// <summary>
/// The default size of the DataStreamWriter. This value can be overridden using the <see cref="NetworkConfigParameter"/>.
/// </summary>
public const int DriverDataStreamSize = 64 * 1024;
/// <summary>The default connection timeout value. This value can be overridden using the <see cref="NetworkConfigParameter"/></summary>
public const int ConnectTimeoutMS = 1000;
/// <summary>The default max connection attempts value. This value can be overridden using the <see cref="NetworkConfigParameter"/></summary>
public const int MaxConnectAttempts = 60;
/// <summary>The default disconnect timeout attempts value. This value can be overridden using the <see cref="NetworkConfigParameter"/></summary>
public const int DisconnectTimeoutMS = 30 * 1000;
public const int MTU = 1400;
}
/// <summary>
/// The NetworkDataStreamParameter is used to set a fixed data stream size.
/// </summary>
/// <remarks>The <see cref="DataStreamWriter"/> will grow on demand if the size is set to zero. </remarks>
public struct NetworkDataStreamParameter : INetworkParameter
{
/// <summary>Size of the default <see cref="DataStreamWriter"/></summary>
public int size;
}
/// <summary>
/// The NetworkConfigParameter is used to set specific parameters that the driver uses.
/// </summary>
public struct NetworkConfigParameter : INetworkParameter
{
/// <summary>A timeout in milliseconds indicating how long we will wait until we send a new connection attempt.</summary>
public int connectTimeoutMS;
/// <summary>The maximum amount of connection attempts we will try before disconnecting.</summary>
public int maxConnectAttempts;
/// <summary>A timeout in milliseconds indicating how long we will wait for a socket event, before we disconnect the socket.</summary>
/// <remarks>The connection needs to receive data from the connected endpoint within this timeout.</remarks>
public int disconnectTimeoutMS;
/// <summary>The maximum amount of time a single frame can advance timeout values.</summary>
/// <remarks>The main use for this parameter is to not get disconnects at frame spikes when both endpoints lives in the same process.</remarks>
public int maxFrameTimeMS;
/// <summary>A fixed amount of time to use for an interval between ScheduleUpdate. This is used instead of a clock.</summary>
/// <remarks>The main use for this parameter is tests where determinism is more important than correctness.</remarks>
public int fixedFrameTimeMS;
}
}

11
Packages/com.unity.transport/Runtime/NetworkParams.cs.meta


fileFormatVersion: 2
guid: 727aa4ae963216b4aa40606d66cb2df1
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

978
Packages/com.unity.transport/Runtime/NetworkPipeline.cs


using System;
using System.Threading;
using Unity.Collections;
using Unity.Collections.LowLevel.Unsafe;
using Unity.Burst;
using System.Collections.Generic;
using System.Diagnostics;
using Unity.Networking.Transport.Protocols;
using Unity.Networking.Transport.Utilities;
using System.Runtime.InteropServices;
namespace Unity.Networking.Transport
{
public unsafe struct InboundSendBuffer
{
public byte* buffer;
public byte* bufferWithHeaders;
public int bufferLength;
public int bufferWithHeadersLength;
public int headerPadding;
public void SetBufferFrombufferWithHeaders()
{
#if ENABLE_UNITY_COLLECTIONS_CHECKS
if (bufferWithHeadersLength < headerPadding)
throw new IndexOutOfRangeException("Buffer is too small to fit headers");
#endif
buffer = bufferWithHeaders + headerPadding;
bufferLength = bufferWithHeadersLength - headerPadding;
}
}
public unsafe struct InboundRecvBuffer
{
public byte* buffer;
public int bufferLength;
public InboundRecvBuffer Slice(int offset)
{
#if ENABLE_UNITY_COLLECTIONS_CHECKS
if (bufferLength < offset)
throw new ArgumentOutOfRangeException("Buffer does not contain enough data");
#endif
InboundRecvBuffer slice;
slice.buffer = buffer + offset;
slice.bufferLength = bufferLength - offset;
return slice;
}
}
public unsafe struct NetworkPipelineContext
{
public byte* staticInstanceBuffer;
public byte* internalSharedProcessBuffer;
public byte* internalProcessBuffer;
public DataStreamWriter header;
public long timestamp;
public int staticInstanceBufferLength;
public int internalSharedProcessBufferLength;
public int internalProcessBufferLength;
public int accumulatedHeaderCapacity;
}
public unsafe interface INetworkPipelineStage
{
NetworkPipelineStage StaticInitialize(byte* staticInstanceBuffer, int staticInstanceBufferLength, INetworkParameter[] param);
int StaticSize { get; }
}
public unsafe struct NetworkPipelineStage
{
public NetworkPipelineStage(TransportFunctionPointer<ReceiveDelegate> Receive,
TransportFunctionPointer<SendDelegate> Send,
TransportFunctionPointer<InitializeConnectionDelegate> InitializeConnection,
int ReceiveCapacity,
int SendCapacity,
int HeaderCapacity,
int SharedStateCapacity,
int PayloadCapacity = 0) // 0 means any size
{
this.Receive = Receive;
this.Send = Send;
this.InitializeConnection = InitializeConnection;
this.ReceiveCapacity = ReceiveCapacity;
this.SendCapacity = SendCapacity;
this.HeaderCapacity = HeaderCapacity;
this.SharedStateCapacity = SharedStateCapacity;
this.PayloadCapacity = PayloadCapacity;
StaticStateStart = StaticStateCapcity = 0;
}
[Flags]
public enum Requests
{
None = 0,
Resume = 1,
Update = 2,
SendUpdate = 4,
Error = 8
}
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void ReceiveDelegate(ref NetworkPipelineContext ctx, ref InboundRecvBuffer inboundBuffer, ref Requests requests);
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate int SendDelegate(ref NetworkPipelineContext ctx, ref InboundSendBuffer inboundBuffer, ref Requests requests);
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void InitializeConnectionDelegate(byte* staticInstanceBuffer, int staticInstanceBufferLength,
byte* sendProcessBuffer, int sendProcessBufferLength, byte* recvProcessBuffer, int recvProcessBufferLength,
byte* sharedProcessBuffer, int sharedProcessBufferLength);
public TransportFunctionPointer<ReceiveDelegate> Receive;
public TransportFunctionPointer<SendDelegate> Send;
public TransportFunctionPointer<InitializeConnectionDelegate> InitializeConnection;
public readonly int ReceiveCapacity;
public readonly int SendCapacity;
public readonly int HeaderCapacity;
public readonly int SharedStateCapacity;
public readonly int PayloadCapacity;
internal int StaticStateStart;
internal int StaticStateCapcity;
}
public struct NetworkPipelineStageId
{
internal int Index;
internal int IsValid;
}
public static class NetworkPipelineStageCollection
{
static NetworkPipelineStageCollection()
{
m_stages = new List<INetworkPipelineStage>();
RegisterPipelineStage(new NullPipelineStage());
RegisterPipelineStage(new FragmentationPipelineStage());
RegisterPipelineStage(new ReliableSequencedPipelineStage());
RegisterPipelineStage(new UnreliableSequencedPipelineStage());
RegisterPipelineStage(new SimulatorPipelineStage());
RegisterPipelineStage(new SimulatorPipelineStageInSend());
}
public static void RegisterPipelineStage(INetworkPipelineStage stage)
{
for (int i = 0; i < m_stages.Count; ++i)
{
if (m_stages[i].GetType() == stage.GetType())
{
// TODO: should this be an error?
m_stages[i] = stage;
return;
}
}
m_stages.Add(stage);
}
public static NetworkPipelineStageId GetStageId(Type stageType)
{
for (int i = 0; i < m_stages.Count; ++i)
{
if (stageType == m_stages[i].GetType())
return new NetworkPipelineStageId{Index=i, IsValid = 1};
}
throw new InvalidOperationException($"Pipeline stage {stageType} is not registered");
}
internal static List<INetworkPipelineStage> m_stages;
}
public struct NetworkPipeline
{
internal int Id;
public static NetworkPipeline Null => default(NetworkPipeline);
public static bool operator ==(NetworkPipeline lhs, NetworkPipeline rhs)
{
return lhs.Id == rhs.Id;
}
public static bool operator !=(NetworkPipeline lhs, NetworkPipeline rhs)
{
return lhs.Id != rhs.Id;
}
public override bool Equals(object compare)
{
return this == (NetworkPipeline) compare;
}
public override int GetHashCode()
{
return Id;
}
public bool Equals(NetworkPipeline connection)
{
return connection.Id == Id;
}
}
public struct NetworkPipelineParams : INetworkParameter
{
public int initialCapacity;
[Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS")]
public static void ValidateParameters(params INetworkParameter[] param)
{
foreach (var parameter in param)
{
if (parameter is NetworkPipelineParams @params && @params.initialCapacity < 0)
throw new ArgumentException($"Value for NetworkPipelineParams.initialCapacity must be larger then zero.");
}
}
}
internal struct NetworkPipelineProcessor : IDisposable
{
public const int Alignment = 8;
public const int AlignmentMinusOne = Alignment-1;
public int PayloadCapacity(NetworkPipeline pipeline)
{
if (pipeline.Id > 0)
{
var p = m_Pipelines[pipeline.Id - 1];
return p.payloadCapacity;
}
return NetworkParameterConstants.MTU;
}
public Concurrent ToConcurrent()
{
var concurrent = new Concurrent
{
m_StageCollection = m_StageCollection,
m_StaticInstanceBuffer = m_StaticInstanceBuffer,
m_Pipelines = m_Pipelines,
m_StageList = m_StageList,
m_AccumulatedHeaderCapacity = m_AccumulatedHeaderCapacity,
m_SendStageNeedsUpdateWrite = m_SendStageNeedsUpdateRead.AsParallelWriter(),
sizePerConnection = sizePerConnection,
sendBuffer = m_SendBuffer,
sharedBuffer = m_SharedBuffer,
m_timestamp = m_timestamp
};
return concurrent;
}
public struct Concurrent
{
[ReadOnly] internal NativeArray<NetworkPipelineStage> m_StageCollection;
[ReadOnly] internal NativeArray<byte> m_StaticInstanceBuffer;
[ReadOnly] internal NativeList<PipelineImpl> m_Pipelines;
[ReadOnly] internal NativeList<int> m_StageList;
[ReadOnly] internal NativeList<int> m_AccumulatedHeaderCapacity;
internal NativeQueue<UpdatePipeline>.ParallelWriter m_SendStageNeedsUpdateWrite;
[ReadOnly] internal NativeArray<int> sizePerConnection;
// TODO: not really read-only, just hacking the safety system
[ReadOnly] internal NativeList<byte> sharedBuffer;
[ReadOnly] internal NativeList<byte> sendBuffer;
[ReadOnly] internal NativeArray<long> m_timestamp;
public int SendHeaderCapacity(NetworkPipeline pipeline)
{
var p = m_Pipelines[pipeline.Id-1];
return p.headerCapacity;
}
public int PayloadCapacity(NetworkPipeline pipeline)
{
if (pipeline.Id > 0)
{
var p = m_Pipelines[pipeline.Id - 1];
return p.payloadCapacity;
}
return NetworkParameterConstants.MTU;
}
public unsafe int Send(NetworkDriver.Concurrent driver, NetworkPipeline pipeline, NetworkConnection connection, NetworkInterfaceSendHandle sendHandle, int headerSize)
{
if (sendHandle.data == IntPtr.Zero)
{
return (int) Error.StatusCode.NetworkSendHandleInvalid;
}
var p = m_Pipelines[pipeline.Id-1];
var connectionId = connection.m_NetworkId;
// TODO: not really read-only, just hacking the safety system
NativeArray<byte> tmpBuffer = sendBuffer;
int* sendBufferLock = (int*) tmpBuffer.GetUnsafeReadOnlyPtr();
sendBufferLock += connectionId * sizePerConnection[SendSizeOffset] / 4;
if (Interlocked.CompareExchange(ref *sendBufferLock, 1, 0) != 0)
{
#if ENABLE_UNITY_COLLECTIONS_CHECKS
UnityEngine.Debug.LogError("The parallel network driver needs to process a single unique connection per job, processing a single connection multiple times in a parallel for is not supported.");
return (int) Error.StatusCode.NetworkDriverParallelForErr;
#else
return (int) Error.StatusCode.NetworkDriverParallelForErr;
#endif
}
NativeList<UpdatePipeline> currentUpdates = new NativeList<UpdatePipeline>(128, Allocator.Temp);
int retval = ProcessPipelineSend(driver, 0, pipeline, connection, sendHandle, headerSize, currentUpdates);
Interlocked.Exchange(ref *sendBufferLock, 0);
// Move the updates requested in this iteration to the concurrent queue so it can be read/parsed in update routine
for (int i = 0; i < currentUpdates.Length; ++i)
m_SendStageNeedsUpdateWrite.Enqueue(currentUpdates[i]);
return retval;
}
internal unsafe int ProcessPipelineSend(NetworkDriver.Concurrent driver, int startStage, NetworkPipeline pipeline, NetworkConnection connection,
NetworkInterfaceSendHandle sendHandle, int headerSize, NativeList<UpdatePipeline> currentUpdates)
{
int initialHeaderSize = headerSize;
int retval = sendHandle.size;
NetworkPipelineContext ctx = default(NetworkPipelineContext);
ctx.timestamp = m_timestamp[0];
var p = m_Pipelines[pipeline.Id-1];
var connectionId = connection.m_NetworkId;
var resumeQ = new NativeList<int>(16, Allocator.Temp);
int resumeQStart = 0;
// If the call comes from update, the sendHandle is set to default.
var inboundBuffer = default(InboundSendBuffer);
if (sendHandle.data != IntPtr.Zero)
{
inboundBuffer.bufferWithHeaders = (byte*)sendHandle.data + initialHeaderSize + 1;
inboundBuffer.bufferWithHeadersLength = sendHandle.size - initialHeaderSize - 1;
inboundBuffer.buffer = inboundBuffer.bufferWithHeaders + p.headerCapacity;
inboundBuffer.bufferLength = inboundBuffer.bufferWithHeadersLength - p.headerCapacity;
}
while (true)
{
headerSize = p.headerCapacity;
int internalBufferOffset = p.sendBufferOffset + sizePerConnection[SendSizeOffset] * connectionId;
int internalSharedBufferOffset = p.sharedBufferOffset + sizePerConnection[SharedSizeOffset] * connectionId;
// If this is not the first stage we need to fast forward the buffer offset to the correct place
if (startStage > 0)
{
if (inboundBuffer.bufferWithHeadersLength > 0)
{
UnityEngine.Debug.LogError("Can't start from a stage with a buffer");
return (int)Error.StatusCode.NetworkStateMismatch;
}
for (int i = 0; i < startStage; ++i)
{
internalBufferOffset += (m_StageCollection[m_StageList[p.FirstStageIndex + i]].SendCapacity + AlignmentMinusOne) & (~AlignmentMinusOne);
internalSharedBufferOffset += (m_StageCollection[m_StageList[p.FirstStageIndex + i]].SharedStateCapacity + AlignmentMinusOne) & (~AlignmentMinusOne);
headerSize -= m_StageCollection[m_StageList[p.FirstStageIndex + i]].HeaderCapacity;
}
}
for (int i = startStage; i < p.NumStages; ++i)
{
int stageHeaderCapacity = m_StageCollection[m_StageList[p.FirstStageIndex + i]].HeaderCapacity;
#if ENABLE_UNITY_COLLECTIONS_CHECKS
if (stageHeaderCapacity > headerSize)
throw new InvalidOperationException("The stage does not contain enough header space to send the message");
#endif
inboundBuffer.headerPadding = headerSize;
headerSize -= stageHeaderCapacity;
if (stageHeaderCapacity > 0 && inboundBuffer.bufferWithHeadersLength > 0)
{
var headerArray = NativeArrayUnsafeUtility.ConvertExistingDataToNativeArray<byte>(inboundBuffer.bufferWithHeaders + headerSize, stageHeaderCapacity, Allocator.Invalid);
#if ENABLE_UNITY_COLLECTIONS_CHECKS
NativeArrayUnsafeUtility.SetAtomicSafetyHandle(ref headerArray, AtomicSafetyHandle.GetTempMemoryHandle());
#endif
ctx.header = new DataStreamWriter(headerArray);
}
else
ctx.header = new DataStreamWriter(stageHeaderCapacity, Allocator.Temp);
var prevInbound = inboundBuffer;
NetworkPipelineStage.Requests requests = NetworkPipelineStage.Requests.None;
var sendResult = ProcessSendStage(i, internalBufferOffset, internalSharedBufferOffset, p, ref resumeQ, ref ctx, ref inboundBuffer, ref requests);
if ((requests & NetworkPipelineStage.Requests.Update) != 0)
AddSendUpdate(connection, i, pipeline, currentUpdates);
if (inboundBuffer.bufferWithHeadersLength == 0)
{
if ((requests & NetworkPipelineStage.Requests.Error) != 0 && sendHandle.data != IntPtr.Zero)
retval = sendResult;
break;
}
#if ENABLE_UNITY_COLLECTIONS_CHECKS
if (inboundBuffer.headerPadding != prevInbound.headerPadding)
throw new InvalidOperationException("Changing the header padding in a pipeline is not supported");
#endif
if (inboundBuffer.buffer != prevInbound.buffer)
{
#if ENABLE_UNITY_COLLECTIONS_CHECKS
if (inboundBuffer.buffer != inboundBuffer.bufferWithHeaders + inboundBuffer.headerPadding ||
inboundBuffer.bufferLength + inboundBuffer.headerPadding > inboundBuffer.bufferWithHeadersLength)
throw new InvalidOperationException("When creating an internal buffer in pipelines the buffer must be a subset of the buffer with headers");
#endif
// Copy header to new buffer so it is part of the payload
UnsafeUtility.MemCpy(inboundBuffer.bufferWithHeaders + headerSize, ctx.header.AsNativeArray().GetUnsafeReadOnlyPtr(), ctx.header.Length);
}
#if ENABLE_UNITY_COLLECTIONS_CHECKS
else
{
if (inboundBuffer.bufferWithHeaders != prevInbound.bufferWithHeaders)
throw new InvalidOperationException("Changing the send buffer with headers without changing the buffer is not supported");
}
#endif
if (ctx.header.Length < stageHeaderCapacity)
{
int wastedSpace = stageHeaderCapacity - ctx.header.Length;
// Remove wasted space in the header
UnsafeUtility.MemMove(inboundBuffer.buffer - wastedSpace, inboundBuffer.buffer, inboundBuffer.bufferLength);
}
// Update the inbound buffer for next iteration
inboundBuffer.buffer = inboundBuffer.bufferWithHeaders + headerSize;
inboundBuffer.bufferLength = ctx.header.Length + inboundBuffer.bufferLength;
internalBufferOffset += (ctx.internalProcessBufferLength + AlignmentMinusOne) & (~AlignmentMinusOne);
internalSharedBufferOffset += (ctx.internalSharedProcessBufferLength + AlignmentMinusOne) & (~AlignmentMinusOne);
}
if (inboundBuffer.bufferLength != 0)
{
if (sendHandle.data != IntPtr.Zero && inboundBuffer.bufferWithHeaders == (byte*)sendHandle.data + initialHeaderSize + 1)
{
// Actually send the data - after collapsing it again
if (inboundBuffer.buffer != inboundBuffer.bufferWithHeaders)
{
UnsafeUtility.MemMove(inboundBuffer.bufferWithHeaders, inboundBuffer.buffer, inboundBuffer.bufferLength);
inboundBuffer.buffer = inboundBuffer.bufferWithHeaders;
}
((byte*)sendHandle.data)[initialHeaderSize] = (byte)pipeline.Id;
int sendSize = initialHeaderSize + 1 + inboundBuffer.bufferLength;
#if ENABLE_UNITY_COLLECTIONS_CHECKS
if (sendSize > sendHandle.size)
throw new InvalidOperationException("Pipeline increased the data in the buffer, this is not allowed");
#endif
sendHandle.size = sendSize;
if ((retval = driver.CompleteSend(connection, sendHandle, true)) < 0)
{
UnityEngine.Debug.LogWarning(FixedString.Format("CompleteSend failed with the following error code: {0}", retval));
}
sendHandle = default;
}
else
{
// TODO: This sends the packet directly, bypassing the pipeline process. The problem is that in that way
// we can't set the hasPipeline flag in the headers. There is a workaround for now.
// Sending without pipeline, the correct pipeline will be added by the default flags when this is called
if (driver.BeginSend(connection, out var writer) == 0)
{
writer.WriteByte((byte)pipeline.Id);
writer.WriteBytes(inboundBuffer.buffer, inboundBuffer.bufferLength);
if (writer.HasFailedWrites)
driver.AbortSend(writer);
else
{
if ((retval = driver.EndSend(writer)) <= 0)
{
UnityEngine.Debug.Log(FixedString.Format("An error occurred during EndSend. ErrorCode: {0}", retval));
}
}
}
}
}
if (resumeQStart >= resumeQ.Length)
{
break;
}
startStage = resumeQ[resumeQStart++];
inboundBuffer = default(InboundSendBuffer);
}
if (sendHandle.data != IntPtr.Zero)
driver.AbortSend(sendHandle);
return retval;
}
private unsafe int ProcessSendStage(int startStage, int internalBufferOffset, int internalSharedBufferOffset,
PipelineImpl p, ref NativeList<int> resumeQ, ref NetworkPipelineContext ctx, ref InboundSendBuffer inboundBuffer, ref NetworkPipelineStage.Requests requests)
{
var stageIndex = p.FirstStageIndex + startStage;
var pipelineStage = m_StageCollection[m_StageList[stageIndex]];
ctx.accumulatedHeaderCapacity = m_AccumulatedHeaderCapacity[stageIndex];
ctx.staticInstanceBuffer = (byte*)m_StaticInstanceBuffer.GetUnsafeReadOnlyPtr() + pipelineStage.StaticStateStart;
ctx.staticInstanceBufferLength = pipelineStage.StaticStateCapcity;
ctx.internalProcessBuffer = (byte*)sendBuffer.GetUnsafeReadOnlyPtr() + internalBufferOffset;
ctx.internalProcessBufferLength = pipelineStage.SendCapacity;
ctx.internalSharedProcessBuffer = (byte*)sharedBuffer.GetUnsafeReadOnlyPtr() + internalSharedBufferOffset;
ctx.internalSharedProcessBufferLength = pipelineStage.SharedStateCapacity;
requests = NetworkPipelineStage.Requests.None;
var retval = pipelineStage.Send.Ptr.Invoke(ref ctx, ref inboundBuffer, ref requests);
if ((requests & NetworkPipelineStage.Requests.Resume) != 0)
resumeQ.Add(startStage);
return retval;
}
}
private NativeArray<NetworkPipelineStage> m_StageCollection;
private NativeArray<byte> m_StaticInstanceBuffer;
private NativeList<int> m_StageList;
private NativeList<int> m_AccumulatedHeaderCapacity;
private NativeList<PipelineImpl> m_Pipelines;
private NativeList<byte> m_ReceiveBuffer;
private NativeList<byte> m_SendBuffer;
private NativeList<byte> m_SharedBuffer;
private NativeList<UpdatePipeline> m_ReceiveStageNeedsUpdate;
private NativeList<UpdatePipeline> m_SendStageNeedsUpdate;
private NativeQueue<UpdatePipeline> m_SendStageNeedsUpdateRead;
private NativeArray<int> sizePerConnection;
private NativeArray<long> m_timestamp;
private const int SendSizeOffset = 0;
private const int RecveiveSizeOffset = 1;
private const int SharedSizeOffset = 2;
internal struct PipelineImpl
{
public int FirstStageIndex;
public int NumStages;
public int receiveBufferOffset;
public int sendBufferOffset;
public int sharedBufferOffset;
public int headerCapacity;
public int payloadCapacity;
}
public unsafe NetworkPipelineProcessor(params INetworkParameter[] param)
{
NetworkPipelineParams config = default(NetworkPipelineParams);
for (int i = 0; i < param.Length; ++i)
{
if (param[i] is NetworkPipelineParams)
config = (NetworkPipelineParams)param[i];
}
int staticBufferSize = 0;
for (int i = 0; i < NetworkPipelineStageCollection.m_stages.Count; ++i)
{
staticBufferSize += NetworkPipelineStageCollection.m_stages[i].StaticSize;
staticBufferSize = (staticBufferSize+15)&(~15);
}
m_StaticInstanceBuffer = new NativeArray<byte>(staticBufferSize, Allocator.Persistent);
m_StageCollection = new NativeArray<NetworkPipelineStage>(NetworkPipelineStageCollection.m_stages.Count, Allocator.Persistent);
staticBufferSize = 0;
for (int i = 0; i < NetworkPipelineStageCollection.m_stages.Count; ++i)
{
var stageStruct = NetworkPipelineStageCollection.m_stages[i].StaticInitialize((byte*)m_StaticInstanceBuffer.GetUnsafePtr() + staticBufferSize, NetworkPipelineStageCollection.m_stages[i].StaticSize, param);
stageStruct.StaticStateStart = staticBufferSize;
stageStruct.StaticStateCapcity = NetworkPipelineStageCollection.m_stages[i].StaticSize;
m_StageCollection[i] = stageStruct;
staticBufferSize += NetworkPipelineStageCollection.m_stages[i].StaticSize;
staticBufferSize = (staticBufferSize+15)&(~15);
}
m_StageList = new NativeList<int>(16, Allocator.Persistent);
m_AccumulatedHeaderCapacity = new NativeList<int>(16, Allocator.Persistent);
m_Pipelines = new NativeList<PipelineImpl>(16, Allocator.Persistent);
m_ReceiveBuffer = new NativeList<byte>(config.initialCapacity, Allocator.Persistent);
m_SendBuffer = new NativeList<byte>(config.initialCapacity, Allocator.Persistent);
m_SharedBuffer = new NativeList<byte>(config.initialCapacity, Allocator.Persistent);
sizePerConnection = new NativeArray<int>(3, Allocator.Persistent);
// Store an int for the spinlock first in each connections send buffer, round up to alignment of 8
sizePerConnection[SendSizeOffset] = Alignment;
m_ReceiveStageNeedsUpdate = new NativeList<UpdatePipeline>(128, Allocator.Persistent);
m_SendStageNeedsUpdate = new NativeList<UpdatePipeline>(128, Allocator.Persistent);
m_SendStageNeedsUpdateRead = new NativeQueue<UpdatePipeline>(Allocator.Persistent);
m_timestamp = new NativeArray<long>(1, Allocator.Persistent);
}
public void Dispose()
{
m_StageList.Dispose();
m_AccumulatedHeaderCapacity.Dispose();
m_ReceiveBuffer.Dispose();
m_SendBuffer.Dispose();
m_SharedBuffer.Dispose();
m_Pipelines.Dispose();
sizePerConnection.Dispose();
m_ReceiveStageNeedsUpdate.Dispose();
m_SendStageNeedsUpdate.Dispose();
m_SendStageNeedsUpdateRead.Dispose();
m_timestamp.Dispose();
m_StageCollection.Dispose();
m_StaticInstanceBuffer.Dispose();
}
public long Timestamp
{
get { return m_timestamp[0]; }
internal set { m_timestamp[0] = value; }
}
public unsafe void initializeConnection(NetworkConnection con)
{
var requiredReceiveSize = (con.m_NetworkId + 1) * sizePerConnection[RecveiveSizeOffset];
var requiredSendSize = (con.m_NetworkId + 1) * sizePerConnection[SendSizeOffset];
var requiredSharedSize = (con.m_NetworkId + 1) * sizePerConnection[SharedSizeOffset];
if (m_ReceiveBuffer.Length < requiredReceiveSize)
m_ReceiveBuffer.ResizeUninitialized(requiredReceiveSize);
if (m_SendBuffer.Length < requiredSendSize)
m_SendBuffer.ResizeUninitialized(requiredSendSize);
if (m_SharedBuffer.Length < requiredSharedSize)
m_SharedBuffer.ResizeUninitialized(requiredSharedSize);
UnsafeUtility.MemClear((byte*)m_ReceiveBuffer.GetUnsafePtr() + con.m_NetworkId * sizePerConnection[RecveiveSizeOffset], sizePerConnection[RecveiveSizeOffset]);
UnsafeUtility.MemClear((byte*)m_SendBuffer.GetUnsafePtr() + con.m_NetworkId * sizePerConnection[SendSizeOffset], sizePerConnection[SendSizeOffset]);
UnsafeUtility.MemClear((byte*)m_SharedBuffer.GetUnsafePtr() + con.m_NetworkId * sizePerConnection[SharedSizeOffset], sizePerConnection[SharedSizeOffset]);
InitializeStages(con.m_NetworkId);
}
unsafe void InitializeStages(int networkId)
{
var connectionId = networkId;
for (int i = 0; i < m_Pipelines.Length; i++)
{
var pipeline = m_Pipelines[i];
int recvBufferOffset = pipeline.receiveBufferOffset + sizePerConnection[RecveiveSizeOffset] * connectionId;
int sendBufferOffset = pipeline.sendBufferOffset + sizePerConnection[SendSizeOffset] * connectionId;
int sharedBufferOffset = pipeline.sharedBufferOffset + sizePerConnection[SharedSizeOffset] * connectionId;
for (int stage = pipeline.FirstStageIndex;
stage < pipeline.FirstStageIndex + pipeline.NumStages;
stage++)
{
var pipelineStage = m_StageCollection[m_StageList[stage]];
var sendProcessBuffer = (byte*)m_SendBuffer.GetUnsafePtr() + sendBufferOffset;
var sendProcessBufferLength = pipelineStage.SendCapacity;
var recvProcessBuffer = (byte*)m_ReceiveBuffer.GetUnsafePtr() + recvBufferOffset;
var recvProcessBufferLength = pipelineStage.ReceiveCapacity;
var sharedProcessBuffer = (byte*)m_SharedBuffer.GetUnsafePtr() + sharedBufferOffset;
var sharedProcessBufferLength = pipelineStage.SharedStateCapacity;
var staticInstanceBuffer = (byte*)m_StaticInstanceBuffer.GetUnsafePtr() + pipelineStage.StaticStateStart;
var staticInstanceBufferLength = pipelineStage.StaticStateCapcity;
pipelineStage.InitializeConnection.Ptr.Invoke(staticInstanceBuffer, staticInstanceBufferLength,
sendProcessBuffer, sendProcessBufferLength, recvProcessBuffer, recvProcessBufferLength,
sharedProcessBuffer, sharedProcessBufferLength);
sendBufferOffset += (sendProcessBufferLength + AlignmentMinusOne) & (~AlignmentMinusOne);
recvBufferOffset += (recvProcessBufferLength + AlignmentMinusOne) & (~AlignmentMinusOne);
sharedBufferOffset += (sharedProcessBufferLength + AlignmentMinusOne) & (~AlignmentMinusOne);
}
}
}
/// <summary>
/// Create a new NetworkPipeline.
/// </summary>
/// <param name="stages">The stages we want the pipeline to contain.</param>
/// <value>A valid pipeline is returned.</value>
/// <exception cref="InvalidOperationException">Thrown if you try to create more then 255 pipelines.</exception>
/// <exception cref="InvalidOperationException">Thrown if you try to use a invalid pipeline stage.</exception>
public NetworkPipeline CreatePipeline(params Type[] stages)
{
#if ENABLE_UNITY_COLLECTIONS_CHECKS
if (m_Pipelines.Length > 255)
throw new InvalidOperationException("Cannot create more than 255 pipelines on a single driver");
#endif
var receiveCap = 0;
var sharedCap = 0;
var sendCap = 0;
var headerCap = 0;
var payloadCap = 0;
var pipeline = new PipelineImpl();
pipeline.FirstStageIndex = m_StageList.Length;
pipeline.NumStages = stages.Length;
for (int i = 0; i < stages.Length; i++)
{
var stageId = NetworkPipelineStageCollection.GetStageId(stages[i]).Index;
#if ENABLE_UNITY_COLLECTIONS_CHECKS
if (stageId < 0)
throw new InvalidOperationException("Trying to create pipeline with invalid stage " + stages[i]);
#endif
m_StageList.Add(stageId);
m_AccumulatedHeaderCapacity.Add(headerCap); // For every stage, compute how much header space has already bee used by other stages when sending
// Make sure all data buffers are aligned
receiveCap += (m_StageCollection[stageId].ReceiveCapacity + AlignmentMinusOne) & (~AlignmentMinusOne);
sendCap += (m_StageCollection[stageId].SendCapacity + AlignmentMinusOne) & (~AlignmentMinusOne);
headerCap += m_StageCollection[stageId].HeaderCapacity;
sharedCap += (m_StageCollection[stageId].SharedStateCapacity + AlignmentMinusOne) & (~AlignmentMinusOne);
if (payloadCap == 0)
{
payloadCap = m_StageCollection[stageId].PayloadCapacity; // The first non-zero stage determines the pipeline capacity
}
}
pipeline.receiveBufferOffset = sizePerConnection[RecveiveSizeOffset];
sizePerConnection[RecveiveSizeOffset] = sizePerConnection[RecveiveSizeOffset] + receiveCap;
pipeline.sendBufferOffset = sizePerConnection[SendSizeOffset];
sizePerConnection[SendSizeOffset] = sizePerConnection[SendSizeOffset] + sendCap;
pipeline.sharedBufferOffset = sizePerConnection[SharedSizeOffset];
sizePerConnection[SharedSizeOffset] = sizePerConnection[SharedSizeOffset] + sharedCap;
pipeline.headerCapacity = headerCap;
// If no stage explicitly supports more tha MTU the pipeline as a whole does not support more than one MTU
pipeline.payloadCapacity = (payloadCap!=0) ? payloadCap : NetworkParameterConstants.MTU;
m_Pipelines.Add(pipeline);
return new NetworkPipeline {Id = m_Pipelines.Length};
}
public void GetPipelineBuffers(NetworkPipeline pipelineId, NetworkPipelineStageId stageId, NetworkConnection connection,
out NativeArray<byte> readProcessingBuffer, out NativeArray<byte> writeProcessingBuffer,
out NativeArray<byte> sharedBuffer)
{
if (pipelineId.Id < 1)
throw new InvalidOperationException("The specified pipeline is not valid");
if (stageId.IsValid == 0)
throw new InvalidOperationException("The specified pipeline stage is not valid");
var pipeline = m_Pipelines[pipelineId.Id-1];
int recvBufferOffset = pipeline.receiveBufferOffset + sizePerConnection[RecveiveSizeOffset] * connection.InternalId;
int sendBufferOffset = pipeline.sendBufferOffset + sizePerConnection[SendSizeOffset] * connection.InternalId;
int sharedBufferOffset = pipeline.sharedBufferOffset + sizePerConnection[SharedSizeOffset] * connection.InternalId;
int stageIndexInList;
bool stageNotFound = true;
for (stageIndexInList = pipeline.FirstStageIndex;
stageIndexInList < pipeline.FirstStageIndex + pipeline.NumStages;
stageIndexInList++)
{
if (m_StageList[stageIndexInList] == stageId.Index)
{
stageNotFound = false;
break;
}
sendBufferOffset += (m_StageCollection[m_StageList[stageIndexInList]].SendCapacity + AlignmentMinusOne) & (~AlignmentMinusOne);
recvBufferOffset += (m_StageCollection[m_StageList[stageIndexInList]].ReceiveCapacity + AlignmentMinusOne) & (~AlignmentMinusOne);
sharedBufferOffset += (m_StageCollection[m_StageList[stageIndexInList]].SharedStateCapacity + AlignmentMinusOne) & (~AlignmentMinusOne);
}
if (stageNotFound)
{
#if ENABLE_UNITY_COLLECTIONS_CHECKS
throw new InvalidOperationException($"Could not find stage ID {stageId} make sure the type for this stage ID is added when the pipeline is created.");
#else
writeProcessingBuffer = default;
readProcessingBuffer = default;
sharedBuffer = default;
return;
#endif
}
writeProcessingBuffer = ((NativeArray<byte>)m_SendBuffer).GetSubArray(sendBufferOffset, m_StageCollection[m_StageList[stageIndexInList]].SendCapacity);
readProcessingBuffer = ((NativeArray<byte>)m_ReceiveBuffer).GetSubArray(recvBufferOffset, m_StageCollection[m_StageList[stageIndexInList]].ReceiveCapacity);
sharedBuffer = ((NativeArray<byte>)m_SharedBuffer).GetSubArray(sharedBufferOffset, m_StageCollection[m_StageList[stageIndexInList]].SharedStateCapacity);
}
internal struct UpdatePipeline
{
public NetworkPipeline pipeline;
public int stage;
public NetworkConnection connection;
}
internal unsafe void UpdateSend(NetworkDriver.Concurrent driver, out int updateCount)
{
// Clear the send lock since it cannot be kept here and can be lost if there are exceptions in send
NativeArray<byte> tmpBuffer = m_SendBuffer;
int* sendBufferLock = (int*) tmpBuffer.GetUnsafePtr();
for (int connectionOffset = 0; connectionOffset < m_SendBuffer.Length; connectionOffset += sizePerConnection[SendSizeOffset])
sendBufferLock[connectionOffset / 4] = 0;
NativeArray<UpdatePipeline> sendUpdates = new NativeArray<UpdatePipeline>(m_SendStageNeedsUpdateRead.Count + m_SendStageNeedsUpdate.Length, Allocator.Temp);
UpdatePipeline updateItem;
updateCount = 0;
while (m_SendStageNeedsUpdateRead.TryDequeue(out updateItem))
{
if (driver.GetConnectionState(updateItem.connection) == NetworkConnection.State.Connected)
sendUpdates[updateCount++] = updateItem;
}
int startLength = updateCount;
for (int i = 0; i < m_SendStageNeedsUpdate.Length; i++)
{
if (driver.GetConnectionState(m_SendStageNeedsUpdate[i].connection) == NetworkConnection.State.Connected)
sendUpdates[updateCount++] = m_SendStageNeedsUpdate[i];
}
NativeList<UpdatePipeline> currentUpdates = new NativeList<UpdatePipeline>(128, Allocator.Temp);
// Move the updates requested in this iteration to the concurrent queue so it can be read/parsed in update routine
for (int i = 0; i < updateCount; ++i)
{
updateItem = sendUpdates[i];
var result = ToConcurrent().ProcessPipelineSend(driver, updateItem.stage, updateItem.pipeline, updateItem.connection, default, 0, currentUpdates);
if (result < 0)
{
UnityEngine.Debug.LogWarning(FixedString.Format("ProcessPipelineSend failed with the following error code {0}.", result));
}
}
for (int i = 0; i < currentUpdates.Length; ++i)
m_SendStageNeedsUpdateRead.Enqueue(currentUpdates[i]);
}
private static void AddSendUpdate(NetworkConnection connection, int stageId, NetworkPipeline pipelineId, NativeList<UpdatePipeline> currentUpdates)
{
var newUpdate = new UpdatePipeline
{connection = connection, stage = stageId, pipeline = pipelineId};
bool uniqueItem = true;
for (int j = 0; j < currentUpdates.Length; ++j)
{
if (currentUpdates[j].stage == newUpdate.stage &&
currentUpdates[j].pipeline.Id == newUpdate.pipeline.Id &&
currentUpdates[j].connection == newUpdate.connection)
uniqueItem = false;
}
if (uniqueItem)
currentUpdates.Add(newUpdate);
}
public void UpdateReceive(NetworkDriver driver, out int updateCount)
{
NativeArray<UpdatePipeline> receiveUpdates = new NativeArray<UpdatePipeline>(m_ReceiveStageNeedsUpdate.Length, Allocator.Temp);
// Move current update requests to a new queue
updateCount = 0;
for (int i = 0; i < m_ReceiveStageNeedsUpdate.Length; ++i)
{
if (driver.GetConnectionState(m_ReceiveStageNeedsUpdate[i].connection) == NetworkConnection.State.Connected)
receiveUpdates[updateCount++] = m_ReceiveStageNeedsUpdate[i];
}
m_ReceiveStageNeedsUpdate.Clear();
// Process all current requested updates, new update requests will (possibly) be generated from the pipeline stages
for (int i = 0; i < updateCount; ++i)
{
UpdatePipeline updateItem = receiveUpdates[i];
ProcessReceiveStagesFrom(driver, updateItem.stage, updateItem.pipeline, updateItem.connection, default);
}
}
public unsafe void Receive(NetworkDriver driver, NetworkConnection connection, NativeArray<byte> buffer)
{
byte pipelineId = buffer[0];
if (pipelineId == 0 || pipelineId > m_Pipelines.Length)
{
UnityEngine.Debug.LogError("Received a packet with an invalid pipeline.");
return;
}
var p = m_Pipelines[pipelineId-1];
int startStage = p.NumStages - 1;
InboundRecvBuffer inBuffer;
inBuffer.buffer = (byte*)buffer.GetUnsafePtr() + 1;
inBuffer.bufferLength = buffer.Length - 1;
ProcessReceiveStagesFrom(driver, startStage, new NetworkPipeline{Id = pipelineId}, connection, inBuffer);
}
private unsafe void ProcessReceiveStagesFrom(NetworkDriver driver, int startStage, NetworkPipeline pipeline, NetworkConnection connection, InboundRecvBuffer buffer)
{
var p = m_Pipelines[pipeline.Id-1];
var connectionId = connection.m_NetworkId;
var resumeQ = new NativeList<int>(16, Allocator.Temp);
int resumeQStart = 0;
NetworkPipelineContext ctx = default(NetworkPipelineContext);
ctx.timestamp = Timestamp;
var inboundBuffer = buffer;
ctx.header = default(DataStreamWriter);
NativeList<UpdatePipeline> sendUpdates = new NativeList<UpdatePipeline>(128, Allocator.Temp);
while (true)
{
bool needsUpdate = false;
bool needsSendUpdate = false;
int internalBufferOffset = p.receiveBufferOffset + sizePerConnection[RecveiveSizeOffset] * connectionId;
int internalSharedBufferOffset = p.sharedBufferOffset + sizePerConnection[SharedSizeOffset] * connectionId;
// Adjust offset accounting for stages in front of the starting stage, since we're parsing the stages in reverse order
for (int st = 0; st < startStage; ++st)
{
internalBufferOffset += (m_StageCollection[m_StageList[p.FirstStageIndex+st]].ReceiveCapacity + AlignmentMinusOne) & (~AlignmentMinusOne);
internalSharedBufferOffset += (m_StageCollection[m_StageList[p.FirstStageIndex+st]].SharedStateCapacity + AlignmentMinusOne) & (~AlignmentMinusOne);
}
for (int i = startStage; i >= 0; --i)
{
ProcessReceiveStage(i, pipeline, internalBufferOffset, internalSharedBufferOffset, ref ctx, ref inboundBuffer, ref resumeQ, ref needsUpdate, ref needsSendUpdate);
if (needsUpdate)
{
var newUpdate = new UpdatePipeline
{connection = connection, stage = i, pipeline = pipeline};
bool uniqueItem = true;
for (int j = 0; j < m_ReceiveStageNeedsUpdate.Length; ++j)
{
if (m_ReceiveStageNeedsUpdate[j].stage == newUpdate.stage &&
m_ReceiveStageNeedsUpdate[j].pipeline.Id == newUpdate.pipeline.Id &&
m_ReceiveStageNeedsUpdate[j].connection == newUpdate.connection)
uniqueItem = false;
}
if (uniqueItem)
m_ReceiveStageNeedsUpdate.Add(newUpdate);
}
if (needsSendUpdate)
AddSendUpdate(connection, i, pipeline, m_SendStageNeedsUpdate);
if (inboundBuffer.bufferLength == 0)
break;
// Offset needs to be adjusted for the next pipeline (the one in front of this one)
if (i > 0)
{
internalBufferOffset -=
(m_StageCollection[m_StageList[p.FirstStageIndex + i - 1]].ReceiveCapacity + AlignmentMinusOne) & (~AlignmentMinusOne);
internalSharedBufferOffset -=
(m_StageCollection[m_StageList[p.FirstStageIndex + i - 1]].SharedStateCapacity + AlignmentMinusOne) & (~AlignmentMinusOne);
}
needsUpdate = false;
}
if (inboundBuffer.bufferLength != 0)
driver.PushDataEvent(connection, pipeline.Id, inboundBuffer.buffer, inboundBuffer.bufferLength);
if (resumeQStart >= resumeQ.Length)
{
return;
}
startStage = resumeQ[resumeQStart++];
inboundBuffer = default;
}
}
private unsafe void ProcessReceiveStage(int stage, NetworkPipeline pipeline, int internalBufferOffset, int internalSharedBufferOffset, ref NetworkPipelineContext ctx, ref InboundRecvBuffer inboundBuffer, ref NativeList<int> resumeQ, ref bool needsUpdate, ref bool needsSendUpdate)
{
var p = m_Pipelines[pipeline.Id-1];
var stageId = m_StageList[p.FirstStageIndex + stage];
var pipelineStage = m_StageCollection[stageId];
ctx.staticInstanceBuffer = (byte*)m_StaticInstanceBuffer.GetUnsafePtr() + pipelineStage.StaticStateStart;
ctx.staticInstanceBufferLength = pipelineStage.StaticStateCapcity;
ctx.internalProcessBuffer = (byte*)m_ReceiveBuffer.GetUnsafePtr() + internalBufferOffset;
ctx.internalProcessBufferLength = pipelineStage.ReceiveCapacity;
ctx.internalSharedProcessBuffer = (byte*)m_SharedBuffer.GetUnsafePtr() + internalSharedBufferOffset;
ctx.internalSharedProcessBufferLength = pipelineStage.SharedStateCapacity;
NetworkPipelineStage.Requests requests = NetworkPipelineStage.Requests.None;
pipelineStage.Receive.Ptr.Invoke(ref ctx, ref inboundBuffer, ref requests);
if ((requests & NetworkPipelineStage.Requests.Resume) != 0)
resumeQ.Add(stage);
needsUpdate = (requests & NetworkPipelineStage.Requests.Update) != 0;
needsSendUpdate = (requests & NetworkPipelineStage.Requests.SendUpdate) != 0;
}
[Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS")]
public static void ValidateSendHandle(NetworkInterfaceSendHandle handle)
{
if (handle.data == IntPtr.Zero)
throw new ArgumentException($"Value for NetworkDataStreamParameter.size must be larger then zero.");
}
}
}

11
Packages/com.unity.transport/Runtime/NetworkPipeline.cs.meta


fileFormatVersion: 2
guid: 261474cc1fff8d34e9e95e2f2dfdd7f5
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

271
Packages/com.unity.transport/Runtime/NetworkProtocol.cs


using System;
using System.Runtime.InteropServices;
using Unity.Collections.LowLevel.Unsafe;
namespace Unity.Networking.Transport
{
internal interface INetworkProtocol : IDisposable
{
/// <summary>
/// This is call when initializing the NetworkDriver. If the protocol requires custom paramters, they can be passed
/// to the NetworkDriver initialization.
/// </summary>
void Initialize(INetworkParameter[] parameters);
/// <summary>
/// Returns a burst compatible NetworkProtocol struct containing the function pointers and custom UserData for the protocol.
/// </summary>
NetworkProtocol CreateProtocolInterface();
/// <summary>
/// This method should bind the NetworkInterface to the local endpoint and perform any
/// custom binding behaviour for the protocol.
/// </summary>
int Bind(INetworkInterface networkInterface, ref NetworkInterfaceEndPoint localEndPoint);
/// <summary>
/// Create a new connection address for the endPoint using the passed NetworkInterface.
/// Some protocols - as relay - could decide to use virtual addressed that not necessarily
/// maps 1 - 1 to a endpoint.
/// </summary>
int Connect(INetworkInterface networkInterface, NetworkEndPoint endPoint, out NetworkInterfaceEndPoint address);
NetworkEndPoint GetRemoteEndPoint(INetworkInterface networkInterface, NetworkInterfaceEndPoint address);
}
/// <summary>
/// This is a Burst compatible struct that contains all the function pointers that the NetworkDriver
/// uses for processing messages with a particular protocol.
/// </summary>
internal struct NetworkProtocol
{
/// <summary>
/// Computes the size required for allocating a packet for the connection with this protocol. The dataCapacity received
/// can be modified to reflect the resulting payload capacity of the packet, if it gets reduced the NetworkDriver will
/// return a NetworkPacketOverflow error. The payloadOffset return value is the possition where the payload data needs
/// to be stored in the packet.
/// </summary>
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate int ComputePacketAllocationSizeDelegate(ref NetworkDriver.Connection connection, ref int dataCapacity, out int payloadOffset);
/// <summary>
/// Process a receiving packet and returns a ProcessPacketCommand that will indicate to the NetworkDriver what actions
/// need to be performed and what to do with the message.
/// </summary>
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void ProcessReceiveDelegate(IntPtr stream, ref NetworkInterfaceEndPoint address, int size, ref NetworkSendInterface sendInterface, ref NetworkSendQueueHandle queueHandle, IntPtr userData, ref ProcessPacketCommand command);
/// <summary>
/// Process a sending packet. When this method is called, the packet is ready to be sent through the sendInterface.
/// Here the protocol could perform some final steps as, for instance, filling some header fields.
/// </summary>
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate int ProcessSendDelegate(ref NetworkDriver.Connection connection, bool hasPipeline, ref NetworkSendInterface sendInterface, ref NetworkInterfaceSendHandle sendHandle, ref NetworkSendQueueHandle queueHandle, IntPtr userData);
/// <summary>
/// This method should send a protocol specific connect confirmation message from a server to a client using the connection.
/// </summary>
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void ProcessSendConnectionAcceptDelegate(ref NetworkDriver.Connection connection, ref NetworkSendInterface sendInterface, ref NetworkSendQueueHandle queueHandle, IntPtr userData);
/// <summary>
/// This method should send a protocol specific connect request message from a client to a server.
/// </summary>
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void ProcessSendConnectionRequestDelegate(ref NetworkDriver.Connection connection, ref NetworkSendInterface sendInterface, ref NetworkSendQueueHandle queueHandle, IntPtr userData);
/// <summary>
/// This method should send a protocol specific disconnect request message from a client to a server.
/// </summary>
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void ProcessSendDisconnectDelegate(ref NetworkDriver.Connection connection, ref NetworkSendInterface sendInterface, ref NetworkSendQueueHandle queueHandle, IntPtr userData);
/// <summary>
/// This method is called every NetworkDriver tick and can be used for performing protocol update operations.
/// One common case is sending protocol specific packets for keeping the connections alive or retrying failed ones.
/// </summary>
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void UpdateDelegate(long updateTime, ref NetworkSendInterface sendInterface, ref NetworkSendQueueHandle queueHandle, IntPtr userData);
public TransportFunctionPointer<ComputePacketAllocationSizeDelegate> ComputePacketAllocationSize;
public TransportFunctionPointer<ProcessReceiveDelegate> ProcessReceive;
public TransportFunctionPointer<ProcessSendDelegate> ProcessSend;
public TransportFunctionPointer<ProcessSendConnectionAcceptDelegate> ProcessSendConnectionAccept;
public TransportFunctionPointer<ProcessSendConnectionRequestDelegate> ProcessSendConnectionRequest;
public TransportFunctionPointer<ProcessSendDisconnectDelegate> ProcessSendDisconnect;
public TransportFunctionPointer<UpdateDelegate> Update;
/// <summary>
/// Raw pointer that is going to be passed to the function pointer and that can contain protocol specific data.
/// </summary>
[NativeDisableUnsafePtrRestriction] public IntPtr UserData;
/// <summary>
/// The maximun size of the header of a data packet for this protocol.
/// </summary>
public int MaxHeaderSize;
/// <summary>
/// The maximun size of the footer of a data packet for this protocol.
/// </summary>
public int MaxFooterSize;
/// <summary>
/// The maximun amount of bytes that are not payload data for a packet for this protocol.
/// </summary>
public int PaddingSize => MaxHeaderSize + MaxFooterSize;
/// <summary>
/// If true - UpdateDelegate will be called
/// </summary>
public bool NeedsUpdate;
public NetworkProtocol(
TransportFunctionPointer<ComputePacketAllocationSizeDelegate> computePacketAllocationSize,
TransportFunctionPointer<ProcessReceiveDelegate> processReceive,
TransportFunctionPointer<ProcessSendDelegate> processSend,
TransportFunctionPointer<ProcessSendConnectionAcceptDelegate> processSendConnectionAccept,
TransportFunctionPointer<ProcessSendConnectionRequestDelegate> processSendConnectionRequest,
TransportFunctionPointer<ProcessSendDisconnectDelegate> processSendDisconnect,
TransportFunctionPointer<UpdateDelegate> update,
bool needsUpdate,
IntPtr userData,
int maxHeaderSize,
int maxFooterSize
) {
ComputePacketAllocationSize = computePacketAllocationSize;
ProcessReceive = processReceive;
ProcessSend = processSend;
ProcessSendConnectionAccept = processSendConnectionAccept;
ProcessSendConnectionRequest = processSendConnectionRequest;
ProcessSendDisconnect = processSendDisconnect;
Update = update;
NeedsUpdate = needsUpdate;
UserData = userData;
MaxHeaderSize = maxHeaderSize;
MaxFooterSize = maxFooterSize;
}
}
/// <summary>
/// The type of commands that the NetworkDriver can process from a received packet after it is proccessed
/// by the protocol.
/// </summary>
public enum ProcessPacketCommandType : byte
{
/// <summary>
/// Do not perform any extra action.
/// </summary>
Drop = 0, // keep Drop = 0 to make it the default.
/// <summary>
/// Find and update the address for a connection.
/// </summary>
AddressUpdate,
/// <summary>
/// Complete the binding proccess.
/// </summary>
BindAccept,
/// <summary>
/// The connection has been accepted by the server and can be completed.
/// </summary>
ConnectionAccept,
/// <summary>
/// The connection has been rejected by the server.
/// </summary>
ConnectionReject,
/// <summary>
/// A connection request comming from a client has been received by the server.
/// </summary>
ConnectionRequest,
/// <summary>
/// A Data message has been received for a well stablished connection.
/// </summary>
Data,
/// <summary>
/// The connection is requesting to disconnect.
/// </summary>
Disconnect,
/// <summary>
/// A simultanious Data + Connection accept command.
/// </summary>
DataWithImplicitConnectionAccept,
}
/// <summary>
/// Contains the command type and command data required by the NetworkDriver to process a packet.
/// </summary>
[StructLayout(LayoutKind.Explicit)]
internal unsafe struct ProcessPacketCommand
{
/// <summary>
/// The type of the command to proccess
/// </summary>
[FieldOffset(0)] public ProcessPacketCommandType Type;
// The following fields behaves like a C++ union. All command types data should start with the Address field.
[FieldOffset(1)] public NetworkInterfaceEndPoint ConnectionAddress;
[FieldOffset(1)] public ProcessPacketCommandAddressUpdate AsAddressUpdate;
[FieldOffset(1)] public ProcessPacketCommandConnectionAccept AsConnectionAccept;
[FieldOffset(1)] public ProcessPacketCommandConnectionRequest AsConnectionRequest;
[FieldOffset(1)] public ProcessPacketCommandData AsData;
[FieldOffset(1)] public ProcessPacketCommandDataWithImplicitConnectionAccept AsDataWithImplicitConnectionAccept;
[FieldOffset(1)] public ProcessPacketCommandDisconnect AsDisconnect;
}
internal struct ProcessPacketCommandAddressUpdate
{
public NetworkInterfaceEndPoint Address;
public NetworkInterfaceEndPoint NewAddress;
public ushort SessionToken;
}
internal struct ProcessPacketCommandConnectionRequest
{
public NetworkInterfaceEndPoint Address;
public ushort SessionId;
}
internal struct ProcessPacketCommandConnectionAccept
{
public NetworkInterfaceEndPoint Address;
public ushort SessionId;
public ushort ConnectionToken;
}
internal struct ProcessPacketCommandDisconnect
{
public NetworkInterfaceEndPoint Address;
public ushort SessionId;
}
internal struct ProcessPacketCommandData
{
public NetworkInterfaceEndPoint Address;
public ushort SessionId;
public int Offset;
public int Length;
public byte HasPipelineByte;
public bool HasPipeline => HasPipelineByte != 0;
}
internal struct ProcessPacketCommandDataWithImplicitConnectionAccept
{
public NetworkInterfaceEndPoint Address;
public ushort SessionId;
public int Offset;
public int Length;
public byte HasPipelineByte;
public ushort ConnectionToken;
public bool HasPipeline => HasPipelineByte != 0;
}
}

11
Packages/com.unity.transport/Runtime/NetworkProtocol.cs.meta


fileFormatVersion: 2
guid: 92570cf71531f6e4f88558d4869992b3
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

31
Packages/com.unity.transport/Runtime/NetworkProtocols.cs


using System;
using System.Runtime.InteropServices;
namespace Unity.Networking.Transport.Protocols
{
public enum UdpCProtocol
{
ConnectionRequest,
ConnectionReject,
ConnectionAccept,
Disconnect,
Data
}
[StructLayout(LayoutKind.Explicit)]
public unsafe struct UdpCHeader
{
[Flags]
public enum HeaderFlags : byte
{
HasConnectToken = 0x1,
HasPipeline = 0x2
}
public const int Length = 4;
[FieldOffset(0)] public fixed byte Data[Length];
[FieldOffset(0)] public byte Type;
[FieldOffset(1)] public HeaderFlags Flags;
[FieldOffset(2)] public ushort SessionToken;
}
}

11
Packages/com.unity.transport/Runtime/NetworkProtocols.cs.meta


fileFormatVersion: 2
guid: baae74db3a3900e47bdd9e2a930406c6
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

8
Packages/com.unity.transport/Runtime/Pipelines.meta


fileFormatVersion: 2
guid: 2c439af55be04483ba9d0917fdc6eb98
folderAsset: yes
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

226
Packages/com.unity.transport/Runtime/Pipelines/FragmentationPipelineStage.cs


using System;
using AOT;
using Unity.Burst;
using Unity.Collections;
using Unity.Collections.LowLevel.Unsafe;
using Unity.Networking.Transport.Protocols;
using Unity.Networking.Transport.Utilities;
namespace Unity.Networking.Transport
{
[BurstCompile]
public unsafe struct FragmentationPipelineStage : INetworkPipelineStage
{
public struct FragContext
{
public int startIndex;
public int endIndex;
public int sequence;
public bool packetError;
}
[Flags]
enum FragFlags
{
First = 1 << 15,
Last = 1 << 14,
SeqMask = Last - 1
}
#if FRAGMENTATION_DEBUG
const int FragHeaderCapacity = 2 + 4; // 2 bits for First/Last flags, 14 bits sequence number
#else
const int FragHeaderCapacity = 2; // 2 bits for First/Last flags, 14 bits sequence number
#endif
[BurstCompile(DisableDirectCall = true)]
[MonoPInvokeCallback(typeof(NetworkPipelineStage.SendDelegate))]
private static int Send(ref NetworkPipelineContext ctx, ref InboundSendBuffer inboundBuffer, ref NetworkPipelineStage.Requests requests)
{
var fragContext = (FragContext*) ctx.internalProcessBuffer;
var dataBuffer = ctx.internalProcessBuffer + sizeof(FragContext);
var param = (FragmentationUtility.Parameters*)ctx.staticInstanceBuffer;
FragFlags flags = FragFlags.First;
int headerCapacity = ctx.header.Capacity;
var systemHeaderCapacity = sizeof(UdpCHeader) + 1 + 2; // Extra byte is for pipeline id, two bytes for footer
var maxBlockLength = NetworkParameterConstants.MTU - systemHeaderCapacity - inboundBuffer.headerPadding;
var maxBlockLengthFirstPacket = maxBlockLength - ctx.accumulatedHeaderCapacity; // The first packet has the headers for all pipeline stages before this one
if (fragContext->endIndex > fragContext->startIndex)
{
var isResume = 0 == inboundBuffer.bufferLength;
if (!isResume)
throw new InvalidOperationException("Internal error: we encountered data in the fragmentation buffer, but this is not a resume call.");
// We have data left over from a previous call
flags &= ~FragFlags.First;
var blockLength = fragContext->endIndex - fragContext->startIndex;
if (blockLength > maxBlockLength)
{
blockLength = maxBlockLength;
}
var blockStart = dataBuffer + fragContext->startIndex;
inboundBuffer.buffer = blockStart;
inboundBuffer.bufferWithHeaders = blockStart - inboundBuffer.headerPadding;
inboundBuffer.bufferLength = blockLength;
inboundBuffer.bufferWithHeadersLength = blockLength + inboundBuffer.headerPadding;
fragContext->startIndex += blockLength;
}
else if (inboundBuffer.bufferLength > maxBlockLengthFirstPacket)
{
var payloadCapacity = param->PayloadCapacity;
var excessLength = inboundBuffer.bufferLength - maxBlockLengthFirstPacket;
var excessStart = inboundBuffer.buffer + maxBlockLengthFirstPacket;
if (excessLength + inboundBuffer.headerPadding > payloadCapacity)
{
throw new InvalidOperationException($"Fragmentation capacity exceeded. Capacity:{payloadCapacity} Payload:{excessLength + inboundBuffer.headerPadding}");
}
UnsafeUtility.MemCpy(dataBuffer + inboundBuffer.headerPadding, excessStart, excessLength);
fragContext->startIndex = inboundBuffer.headerPadding; // Leaving room for header
fragContext->endIndex = excessLength + inboundBuffer.headerPadding;
inboundBuffer.bufferWithHeadersLength -= excessLength;
inboundBuffer.bufferLength -= excessLength;
}
if (fragContext->endIndex > fragContext->startIndex)
{
requests |= NetworkPipelineStage.Requests.Resume;
}
else
{
flags |= FragFlags.Last;
}
var sequence = fragContext->sequence++;
var combined = (sequence & (int)FragFlags.SeqMask) | (int)flags; // lower 14 bits sequence, top 2 bits flags
ctx.header.WriteShort((short)combined);
#if FRAGMENTATION_DEBUG
// For debugging - this allows WireShark to identify fragmentation packets
ctx.header.WriteByte((byte) '@');
ctx.header.WriteByte((byte) '@');
ctx.header.WriteByte((byte) '@');
ctx.header.WriteByte((byte) '@');
#endif
return (int)Error.StatusCode.Success;
}
[BurstCompile(DisableDirectCall = true)]
[MonoPInvokeCallback(typeof(NetworkPipelineStage.ReceiveDelegate))]
private static void Receive(ref NetworkPipelineContext ctx, ref InboundRecvBuffer inboundBuffer, ref NetworkPipelineStage.Requests requests)
{
var fragContext = (FragContext*) ctx.internalProcessBuffer;
var dataBuffer = ctx.internalProcessBuffer + sizeof(FragContext);
var param = (FragmentationUtility.Parameters*)ctx.staticInstanceBuffer;
var inboundArray = NativeArrayUnsafeUtility.ConvertExistingDataToNativeArray<byte>(inboundBuffer.buffer, inboundBuffer.bufferLength, Allocator.Invalid);
#if ENABLE_UNITY_COLLECTIONS_CHECKS
var safetyHandle = AtomicSafetyHandle.GetTempMemoryHandle();
NativeArrayUnsafeUtility.SetAtomicSafetyHandle(ref inboundArray, safetyHandle);
#endif
var reader = new DataStreamReader(inboundArray);
var combined = reader.ReadShort();
var foundSequence = combined & (int)FragFlags.SeqMask;
var flags = (FragFlags)combined & ~FragFlags.SeqMask;
inboundBuffer = inboundBuffer.Slice(FragHeaderCapacity);
var expectedSequence = fragContext->sequence;
var isFirst = 0 != (flags & FragFlags.First);
var isLast = 0 != (flags & FragFlags.Last);
if (isFirst)
{
expectedSequence = foundSequence;
fragContext->packetError = false;
fragContext->endIndex = 0;
}
if (foundSequence != expectedSequence)
{
// We've missed a packet.
fragContext->packetError = true;
fragContext->endIndex = 0; // Discard data we have already collected
}
if (!fragContext->packetError)
{
if (!isLast || fragContext->endIndex > 0)
{
if (fragContext->endIndex + inboundBuffer.bufferLength > param->PayloadCapacity)
{
throw new InvalidOperationException($"Fragmentation capacity exceeded");
}
// Append the data to the end
UnsafeUtility.MemCpy(dataBuffer + fragContext->endIndex, inboundBuffer.buffer, inboundBuffer.bufferLength);
fragContext->endIndex += inboundBuffer.bufferLength;
}
if (isLast && fragContext->endIndex > 0)
{
// Data is complete
inboundBuffer = new InboundRecvBuffer
{
buffer = dataBuffer,
bufferLength = fragContext->endIndex
};
}
}
if (!isLast || fragContext->packetError)
{
// No output if we expect more data, or if data is incomplete due to packet loss
inboundBuffer = default;
}
fragContext->sequence = (foundSequence + 1) & (int)FragFlags.SeqMask;
}
[BurstCompile(DisableDirectCall = true)]
[MonoPInvokeCallback(typeof(NetworkPipelineStage.InitializeConnectionDelegate))]
private static void InitializeConnection(byte* staticInstanceBuffer, int staticInstanceBufferLength,
byte* sendProcessBuffer, int sendProcessBufferLength, byte* recvProcessBuffer, int recvProcessBufferLength,
byte* sharedProcessBuffer, int sharedProcessBufferLength)
{
}
static TransportFunctionPointer<NetworkPipelineStage.ReceiveDelegate> ReceiveFunctionPointer = new TransportFunctionPointer<NetworkPipelineStage.ReceiveDelegate>(Receive);
static TransportFunctionPointer<NetworkPipelineStage.SendDelegate> SendFunctionPointer = new TransportFunctionPointer<NetworkPipelineStage.SendDelegate>(Send);
static TransportFunctionPointer<NetworkPipelineStage.InitializeConnectionDelegate> InitializeConnectionFunctionPointer = new TransportFunctionPointer<NetworkPipelineStage.InitializeConnectionDelegate>(InitializeConnection);
public NetworkPipelineStage StaticInitialize(byte* staticInstanceBuffer, int staticInstanceBufferLength, INetworkParameter[] netParams)
{
FragmentationUtility.Parameters param = default;
foreach (var netParam in netParams)
{
if (netParam is FragmentationUtility.Parameters)
param = (FragmentationUtility.Parameters)netParam;
}
var payloadCapacity = param.PayloadCapacity;
if (payloadCapacity == 0)
payloadCapacity = 4 * 1024;
if (payloadCapacity <= NetworkParameterConstants.MTU)
throw new InvalidOperationException($"Please specify a FragmentationUtility.Parameters with a PayloadCapacity greater than MTU, which is {NetworkParameterConstants.MTU}");
param.PayloadCapacity = payloadCapacity;
UnsafeUtility.MemCpy(staticInstanceBuffer, &param, UnsafeUtility.SizeOf<FragmentationUtility.Parameters>());
return new NetworkPipelineStage(
Receive: ReceiveFunctionPointer,
Send: SendFunctionPointer,
InitializeConnection: InitializeConnectionFunctionPointer,
ReceiveCapacity: sizeof(FragContext) + payloadCapacity,
SendCapacity: sizeof(FragContext) + payloadCapacity,
HeaderCapacity: FragHeaderCapacity,
SharedStateCapacity: 0,
param.PayloadCapacity
);
}
public int StaticSize => UnsafeUtility.SizeOf<FragmentationUtility.Parameters>();
}
}

11
Packages/com.unity.transport/Runtime/Pipelines/FragmentationPipelineStage.cs.meta


fileFormatVersion: 2
guid: fde48fdaf9c384f9c8260f20718f8d36
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

11
Packages/com.unity.transport/Runtime/Pipelines/FragmentationUtility.cs


namespace Unity.Networking.Transport.Utilities
{
public struct FragmentationUtility
{
public struct Parameters : INetworkParameter
{
public int PayloadCapacity;
}
}
}

11
Packages/com.unity.transport/Runtime/Pipelines/FragmentationUtility.cs.meta


fileFormatVersion: 2
guid: 138012999bcee4107a7095b126b6bd89
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

47
Packages/com.unity.transport/Runtime/Pipelines/NullPipelineStage.cs


using AOT;
using Unity.Burst;
namespace Unity.Networking.Transport
{
[BurstCompile]
public unsafe struct NullPipelineStage : INetworkPipelineStage
{
[BurstCompile(DisableDirectCall = true)]
[MonoPInvokeCallback(typeof(NetworkPipelineStage.SendDelegate))]
private static int Send(ref NetworkPipelineContext ctx, ref InboundSendBuffer inboundBuffer, ref NetworkPipelineStage.Requests requests)
{
return (int)Error.StatusCode.Success;
}
[BurstCompile(DisableDirectCall = true)]
[MonoPInvokeCallback(typeof(NetworkPipelineStage.ReceiveDelegate))]
private static void Receive(ref NetworkPipelineContext ctx, ref InboundRecvBuffer inboundBuffer, ref NetworkPipelineStage.Requests requests)
{
}
[BurstCompile(DisableDirectCall = true)]
[MonoPInvokeCallback(typeof(NetworkPipelineStage.InitializeConnectionDelegate))]
private static void InitializeConnection(byte* staticInstanceBuffer, int staticInstanceBufferLength,
byte* sendProcessBuffer, int sendProcessBufferLength, byte* recvProcessBuffer, int recvProcessBufferLength,
byte* sharedProcessBuffer, int sharedProcessBufferLength)
{
}
static TransportFunctionPointer<NetworkPipelineStage.ReceiveDelegate> ReceiveFunctionPointer = new TransportFunctionPointer<NetworkPipelineStage.ReceiveDelegate>(Receive);
static TransportFunctionPointer<NetworkPipelineStage.SendDelegate> SendFunctionPointer = new TransportFunctionPointer<NetworkPipelineStage.SendDelegate>(Send);
static TransportFunctionPointer<NetworkPipelineStage.InitializeConnectionDelegate> InitializeConnectionFunctionPointer = new TransportFunctionPointer<NetworkPipelineStage.InitializeConnectionDelegate>(InitializeConnection);
public NetworkPipelineStage StaticInitialize(byte* staticInstanceBuffer, int staticInstanceBufferLength, INetworkParameter[] netParams)
{
return new NetworkPipelineStage(
Receive: ReceiveFunctionPointer,
Send: SendFunctionPointer,
InitializeConnection: InitializeConnectionFunctionPointer,
ReceiveCapacity: 0,
SendCapacity: 0,
HeaderCapacity: 0,
SharedStateCapacity: 0
);
}
public int StaticSize => 0;
}
}

11
Packages/com.unity.transport/Runtime/Pipelines/NullPipelineStage.cs.meta


fileFormatVersion: 2
guid: 299f00a4e7d7d1344a8e410a7b3e807c
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

187
Packages/com.unity.transport/Runtime/Pipelines/ReliableSequencedPipelineStage.cs


using AOT;
using Unity.Collections;
using Unity.Collections.LowLevel.Unsafe;
using Unity.Networking.Transport.Utilities;
using Unity.Burst;
namespace Unity.Networking.Transport
{
[BurstCompile]
public unsafe struct ReliableSequencedPipelineStage : INetworkPipelineStage
{
static TransportFunctionPointer<NetworkPipelineStage.ReceiveDelegate> ReceiveFunctionPointer = new TransportFunctionPointer<NetworkPipelineStage.ReceiveDelegate>(Receive);
static TransportFunctionPointer<NetworkPipelineStage.SendDelegate> SendFunctionPointer = new TransportFunctionPointer<NetworkPipelineStage.SendDelegate>(Send);
static TransportFunctionPointer<NetworkPipelineStage.InitializeConnectionDelegate> InitializeConnectionFunctionPointer = new TransportFunctionPointer<NetworkPipelineStage.InitializeConnectionDelegate>(InitializeConnection);
public NetworkPipelineStage StaticInitialize(byte* staticInstanceBuffer, int staticInstanceBufferLength, INetworkParameter[] netParams)
{
ReliableUtility.Parameters param = default;
foreach (var netParam in netParams)
{
if (netParam.GetType() == typeof(ReliableUtility.Parameters))
param = (ReliableUtility.Parameters)netParam;
}
if (param.WindowSize == 0)
param = new ReliableUtility.Parameters{WindowSize = ReliableUtility.ParameterConstants.WindowSize};
if (param.WindowSize <= 0 || param.WindowSize > 32)
throw new System.ArgumentOutOfRangeException("The reliability pipeline does not support negative WindowSize nor WindowSizes larger than 32");
UnsafeUtility.MemCpy(staticInstanceBuffer, &param, UnsafeUtility.SizeOf<ReliableUtility.Parameters>());
return new NetworkPipelineStage(
Receive: ReceiveFunctionPointer,
Send: SendFunctionPointer,
InitializeConnection: InitializeConnectionFunctionPointer,
ReceiveCapacity: ReliableUtility.ProcessCapacityNeeded(param),
SendCapacity: ReliableUtility.ProcessCapacityNeeded(param),
HeaderCapacity: UnsafeUtility.SizeOf<ReliableUtility.PacketHeader>(),
SharedStateCapacity: ReliableUtility.SharedCapacityNeeded(param),
NetworkParameterConstants.MTU
);
}
public int StaticSize => UnsafeUtility.SizeOf<ReliableUtility.Parameters>();
[BurstCompile(DisableDirectCall = true)]
[MonoPInvokeCallback(typeof(NetworkPipelineStage.ReceiveDelegate))]
private static void Receive(ref NetworkPipelineContext ctx, ref InboundRecvBuffer inboundBuffer, ref NetworkPipelineStage.Requests requests)
{
// Request a send update to see if a queued packet needs to be resent later or if an ack packet should be sent
requests = NetworkPipelineStage.Requests.SendUpdate;
bool needsResume = false;
var header = default(ReliableUtility.PacketHeader);
var slice = default(InboundRecvBuffer);
ReliableUtility.Context* reliable = (ReliableUtility.Context*) ctx.internalProcessBuffer;
ReliableUtility.SharedContext* shared = (ReliableUtility.SharedContext*) ctx.internalSharedProcessBuffer;
shared->errorCode = 0;
if (reliable->Resume == ReliableUtility.NullEntry)
{
if (inboundBuffer.bufferLength <= 0)
{
inboundBuffer = slice;
return;
}
var inboundArray = NativeArrayUnsafeUtility.ConvertExistingDataToNativeArray<byte>(inboundBuffer.buffer, inboundBuffer.bufferLength, Allocator.Invalid);
#if ENABLE_UNITY_COLLECTIONS_CHECKS
var safetyHandle = AtomicSafetyHandle.GetTempMemoryHandle();
NativeArrayUnsafeUtility.SetAtomicSafetyHandle(ref inboundArray, safetyHandle);
#endif
var reader = new DataStreamReader(inboundArray);
reader.ReadBytes((byte*)&header, UnsafeUtility.SizeOf<ReliableUtility.PacketHeader>());
if (header.Type == (ushort)ReliableUtility.PacketType.Ack)
{
ReliableUtility.ReadAckPacket(ctx, header);
inboundBuffer = default;
return;
}
var result = ReliableUtility.Read(ctx, header);
if (result >= 0)
{
var nextExpectedSequenceId = (ushort) (reliable->Delivered + 1);
if (result == nextExpectedSequenceId)
{
reliable->Delivered = result;
slice = inboundBuffer.Slice(UnsafeUtility.SizeOf<ReliableUtility.PacketHeader>());
if (needsResume = SequenceHelpers.GreaterThan16((ushort) shared->ReceivedPackets.Sequence,
(ushort) result))
{
reliable->Resume = (ushort)(result + 1);
}
}
else
{
ReliableUtility.SetPacket(ctx.internalProcessBuffer, result, inboundBuffer.Slice(UnsafeUtility.SizeOf<ReliableUtility.PacketHeader>()));
slice = ReliableUtility.ResumeReceive(ctx, reliable->Delivered + 1, ref needsResume);
}
}
}
else
{
slice = ReliableUtility.ResumeReceive(ctx, reliable->Resume, ref needsResume);
}
if (needsResume)
requests |= NetworkPipelineStage.Requests.Resume;
inboundBuffer = slice;
}
[BurstCompile(DisableDirectCall = true)]
[MonoPInvokeCallback(typeof(NetworkPipelineStage.SendDelegate))]
private static int Send(ref NetworkPipelineContext ctx, ref InboundSendBuffer inboundBuffer, ref NetworkPipelineStage.Requests requests)
{
// Request an update to see if a queued packet needs to be resent later or if an ack packet should be sent
requests = NetworkPipelineStage.Requests.Update;
bool needsResume = false;
var header = new ReliableUtility.PacketHeader();
var reliable = (ReliableUtility.Context*) ctx.internalProcessBuffer;
needsResume = ReliableUtility.ReleaseOrResumePackets(ctx);
if (needsResume)
requests |= NetworkPipelineStage.Requests.Resume;
if (inboundBuffer.bufferLength > 0)
{
reliable->LastSentTime = ctx.timestamp;
if (ReliableUtility.Write(ctx, inboundBuffer, ref header) < 0)
{
// We failed to store the packet for possible later resends, abort and report this as a send error
inboundBuffer = default;
requests |= NetworkPipelineStage.Requests.Error;
return (int)Error.StatusCode.NetworkSendQueueFull;
}
ctx.header.WriteBytes((byte*)&header, UnsafeUtility.SizeOf<ReliableUtility.PacketHeader>());
if (reliable->Resume != ReliableUtility.NullEntry)
requests |= NetworkPipelineStage.Requests.Resume;
reliable->PreviousTimestamp = ctx.timestamp;
return (int)Error.StatusCode.Success;
}
if (reliable->Resume != ReliableUtility.NullEntry)
{
reliable->LastSentTime = ctx.timestamp;
inboundBuffer = ReliableUtility.ResumeSend(ctx, out header, ref needsResume);
if (needsResume)
requests |= NetworkPipelineStage.Requests.Resume;
ctx.header.Clear();
ctx.header.WriteBytes((byte*)&header, UnsafeUtility.SizeOf<ReliableUtility.PacketHeader>());
reliable->PreviousTimestamp = ctx.timestamp;
return (int)Error.StatusCode.Success;
}
if (ReliableUtility.ShouldSendAck(ctx))
{
reliable->LastSentTime = ctx.timestamp;
ReliableUtility.WriteAckPacket(ctx, ref header);
ctx.header.WriteBytes((byte*)&header, UnsafeUtility.SizeOf<ReliableUtility.PacketHeader>());
reliable->PreviousTimestamp = ctx.timestamp;
// TODO: Sending dummy byte over since the pipeline won't send an empty payload (ignored on receive)
inboundBuffer.bufferWithHeadersLength = inboundBuffer.headerPadding + 1;
inboundBuffer.bufferWithHeaders = (byte*)UnsafeUtility.Malloc(inboundBuffer.bufferWithHeadersLength, 8, Allocator.Temp);
inboundBuffer.SetBufferFrombufferWithHeaders();
return (int)Error.StatusCode.Success;
}
reliable->PreviousTimestamp = ctx.timestamp;
return (int)Error.StatusCode.Success;
}
[BurstCompile(DisableDirectCall = true)]
[MonoPInvokeCallback(typeof(NetworkPipelineStage.InitializeConnectionDelegate))]
private static void InitializeConnection(byte* staticInstanceBuffer, int staticInstanceBufferLength,
byte* sendProcessBuffer, int sendProcessBufferLength, byte* recvProcessBuffer, int recvProcessBufferLength,
byte* sharedProcessBuffer, int sharedProcessBufferLength)
{
ReliableUtility.Parameters param;
UnsafeUtility.MemCpy(&param, staticInstanceBuffer, UnsafeUtility.SizeOf<ReliableUtility.Parameters>());
if (sharedProcessBufferLength >= ReliableUtility.SharedCapacityNeeded(param) &&
(sendProcessBufferLength + recvProcessBufferLength) >= ReliableUtility.ProcessCapacityNeeded(param) * 2)
{
ReliableUtility.InitializeContext(sharedProcessBuffer, sharedProcessBufferLength, sendProcessBuffer, sendProcessBufferLength, recvProcessBuffer, recvProcessBufferLength, param);
}
}
}
}

11
Packages/com.unity.transport/Runtime/Pipelines/ReliableSequencedPipelineStage.cs.meta


fileFormatVersion: 2
guid: d4a883e653375453c8a2dcaec90940a2
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

761
Packages/com.unity.transport/Runtime/Pipelines/ReliableUtility.cs


using System;
using System.Runtime.InteropServices;
using Unity.Collections;
using Unity.Collections.LowLevel.Unsafe;
using Unity.Mathematics;
namespace Unity.Networking.Transport.Utilities
{
public struct SequenceBufferContext
{
public int Sequence;
public int Acked;
public uint AckMask;
public uint LastAckMask;
}
public struct ReliableUtility
{
public struct Statistics
{
public int PacketsReceived;
public int PacketsSent;
public int PacketsDropped;
public int PacketsOutOfOrder;
public int PacketsDuplicated;
public int PacketsStale;
public int PacketsResent;
}
public struct RTTInfo
{
public int LastRtt;
public float SmoothedRtt;
public float SmoothedVariance;
public int ResendTimeout;
}
public const int NullEntry = -1;
// The least amount of time we'll wait until a packet resend is performed
// This is 4x16ms (assumes a 60hz update rate)
public const int DefaultMinimumResendTime = 64;
public const int MaximumResendTime = 200;
public enum ErrorCodes
{
Stale_Packet = -1,
Duplicated_Packet = -2,
OutgoingQueueIsFull = -7,
InsufficientMemory = -8
}
public enum PacketType : ushort
{
Payload = 0,
Ack = 1
}
public struct SharedContext
{
public int WindowSize;
public int MinimumResendTime;
/// <summary>
/// Context of sent packets, last sequence ID sent (-1), last ID of our sent packet acknowledged by
/// remote peer, ackmask of acknowledged packets. This is used when determining if a resend
/// is needed.
/// </summary>
public SequenceBufferContext SentPackets;
/// <summary>
/// Context of received packets, last sequence ID received, and ackmask of received packets. Acked is not used.
/// This is sent back to the remote peer in the header when sending.
/// </summary>
public SequenceBufferContext ReceivedPackets;
public Statistics stats;
public ErrorCodes errorCode;
// Timing information for calculating resend times for packets
public RTTInfo RttInfo;
public int TimerDataOffset;
public int TimerDataStride;
public int RemoteTimerDataOffset;
public int RemoteTimerDataStride;
}
public struct Context
{
public int Capacity;
public int Resume;
public int Delivered;
public int IndexStride;
public int IndexPtrOffset;
public int DataStride;
public int DataPtrOffset;
public long LastSentTime;
public long PreviousTimestamp;
}
public struct Parameters : INetworkParameter
{
public int WindowSize;
}
public struct ParameterConstants
{
public const int WindowSize = 32;
}
[StructLayout(LayoutKind.Sequential)]
public struct PacketHeader
{
public ushort Type;
public ushort ProcessingTime;
public ushort SequenceId;
public ushort AckedSequenceId;
public uint AckMask;
}
[StructLayout(LayoutKind.Sequential)]
public struct PacketInformation
{
public int SequenceId;
public ushort Size;
public ushort HeaderPadding;
public long SendTime;
}
// Header is inside the total packet length (Buffer size)
[StructLayout(LayoutKind.Explicit)]
public unsafe struct Packet
{
internal const int Length = NetworkParameterConstants.MTU;
[FieldOffset(0)] public PacketHeader Header;
[FieldOffset(0)] public fixed byte Buffer[Length];
}
[StructLayout(LayoutKind.Sequential)]
public struct PacketTimers
{
public ushort ProcessingTime;
public ushort Padding;
public int SequenceId;
public long SentTime;
public long ReceiveTime;
}
private static int AlignedSizeOf<T>() where T: struct
{
return (UnsafeUtility.SizeOf<T>() + NetworkPipelineProcessor.AlignmentMinusOne) & (~NetworkPipelineProcessor.AlignmentMinusOne);
}
public static int SharedCapacityNeeded(Parameters param)
{
int capacityNeeded;
unsafe
{
// Timers are stored for both remote packets (processing time) and local packets (round trip time)
// The amount of timestamps needed in the queues is the same as the window size capacity
var timerDataSize = AlignedSizeOf<PacketTimers>() * param.WindowSize * 2;
capacityNeeded = AlignedSizeOf<SharedContext>() + timerDataSize;
}
return capacityNeeded;
}
public static int ProcessCapacityNeeded(Parameters param)
{
int capacityNeeded;
unsafe
{
var infoSize = AlignedSizeOf<PacketInformation>();
var dataSize = (Packet.Length + UnsafeUtility.SizeOf<PacketHeader>() + NetworkPipelineProcessor.AlignmentMinusOne) & (~NetworkPipelineProcessor.AlignmentMinusOne);
infoSize *= param.WindowSize;
dataSize *= param.WindowSize;
capacityNeeded = AlignedSizeOf<Context>() + infoSize + dataSize;
}
return capacityNeeded;
}
public static unsafe SharedContext InitializeContext(byte* sharedBuffer, int sharedBufferLength,
byte* sendBuffer, int sendBufferLength, byte* recvBuffer, int recvBufferLength, Parameters param)
{
InitializeProcessContext(sendBuffer, sendBufferLength, param);
InitializeProcessContext(recvBuffer, recvBufferLength, param);
SharedContext* notifier = (SharedContext*) sharedBuffer;
*notifier = new SharedContext
{
WindowSize = param.WindowSize,
SentPackets = new SequenceBufferContext { Acked = NullEntry },
MinimumResendTime = DefaultMinimumResendTime,
ReceivedPackets = new SequenceBufferContext { Sequence = NullEntry },
RttInfo = new RTTInfo { SmoothedVariance = 5, SmoothedRtt = 50, ResendTimeout = 50, LastRtt = 50},
TimerDataOffset = AlignedSizeOf<SharedContext>(),
TimerDataStride = AlignedSizeOf<PacketTimers>(),
RemoteTimerDataOffset = AlignedSizeOf<SharedContext>() + AlignedSizeOf<PacketTimers>() * param.WindowSize,
RemoteTimerDataStride = AlignedSizeOf<PacketTimers>()
};
return *notifier;
}
public static unsafe int InitializeProcessContext(byte* buffer, int bufferLength, Parameters param)
{
int totalCapacity = ProcessCapacityNeeded(param);
if (bufferLength != totalCapacity)
{
return (int) ErrorCodes.InsufficientMemory;
}
Context* ctx = (Context*) buffer;
ctx->Capacity = param.WindowSize;
ctx->IndexStride = AlignedSizeOf<PacketInformation>();
ctx->IndexPtrOffset = AlignedSizeOf<Context>();
ctx->DataStride = (Packet.Length + UnsafeUtility.SizeOf<PacketHeader>() + NetworkPipelineProcessor.AlignmentMinusOne) & (~NetworkPipelineProcessor.AlignmentMinusOne);
ctx->DataPtrOffset = ctx->IndexPtrOffset + (ctx->IndexStride * ctx->Capacity);
ctx->Resume = NullEntry;
ctx->Delivered = NullEntry;
Release(buffer, 0, param.WindowSize);
return 0;
}
public static unsafe void SetPacket(byte* self, int sequence, InboundRecvBuffer data)
{
SetPacket(self, sequence, data.buffer, data.bufferLength);
}
public static unsafe void SetPacket(byte* self, int sequence, void* data, int length)
{
Context* ctx = (Context*) self;
if (length > ctx->DataStride)
#if ENABLE_UNITY_COLLECTIONS_CHECKS
throw new OverflowException();
#else
return;
#endif
var index = sequence % ctx->Capacity;
PacketInformation* info = GetPacketInformation(self, sequence);
info->SequenceId = sequence;
info->Size = (ushort)length;
info->HeaderPadding = 0; // Not used for packets queued for resume receive
info->SendTime = -1; // Not used for packets queued for resume receive
var offset = ctx->DataPtrOffset + (index * ctx->DataStride);
void* dataPtr = (self + offset);
UnsafeUtility.MemCpy(dataPtr, data, length);
}
/// <summary>
/// Write packet, packet header and tracking information to the given buffer space. This buffer
/// should contain the reliability Context at the front, that contains the capacity of the buffer
/// and pointer offsets needed to find the slots we can copy the packet to.
/// </summary>
/// <param name="self">Buffer space where we can store packets.</param>
/// <param name="sequence">The sequence ID of the packet, this is used to find a slot inside the buffer.</param>
/// <param name="header">The packet header which we'll store with the packet payload.</param>
/// <param name="data">The packet data which we're storing.</param>
/// <exception cref="OverflowException"></exception>
public static unsafe void SetHeaderAndPacket(byte* self, int sequence, PacketHeader header, InboundSendBuffer data, long timestamp)
{
Context* ctx = (Context*) self;
int totalSize = data.bufferLength + data.headerPadding;
if (totalSize + UnsafeUtility.SizeOf<PacketHeader>() > ctx->DataStride)
#if ENABLE_UNITY_COLLECTIONS_CHECKS
throw new OverflowException();
#else
return;
#endif
var index = sequence % ctx->Capacity;
PacketInformation* info = GetPacketInformation(self, sequence);
info->SequenceId = sequence;
info->Size = (ushort)totalSize;
info->HeaderPadding = (ushort)data.headerPadding;
info->SendTime = timestamp;
Packet* packet = GetPacket(self, sequence);
packet->Header = header;
var offset = (ctx->DataPtrOffset + (index * ctx->DataStride)) + UnsafeUtility.SizeOf<PacketHeader>();
void* dataPtr = (self + offset);
if (data.bufferLength > 0)
UnsafeUtility.MemCpy((byte*)dataPtr + data.headerPadding, data.buffer, data.bufferLength);
}
public static unsafe PacketInformation* GetPacketInformation(byte* self, int sequence)
{
Context* ctx = (Context*) self;
var index = sequence % ctx->Capacity;
return (PacketInformation*) ((self + ctx->IndexPtrOffset) + (index * ctx->IndexStride));
}
public static unsafe Packet* GetPacket(byte* self, int sequence)
{
Context* ctx = (Context*) self;
var index = sequence % ctx->Capacity;
var offset = ctx->DataPtrOffset + (index * ctx->DataStride);
return (Packet*) (self + offset);
}
public static unsafe bool TryAquire(byte* self, int sequence)
{
Context* ctx = (Context*) self;
var index = sequence % ctx->Capacity;
var currentSequenceId = GetIndex(self, index);
if (currentSequenceId == NullEntry)
{
SetIndex(self, index, sequence);
return true;
}
return false;
}
public static unsafe void Release(byte* self, int sequence)
{
Release(self, sequence, 1);
}
public static unsafe void Release(byte* self, int start_sequence, int count)
{
Context* ctx = (Context*) self;
for (int i = 0; i < count; i++)
{
SetIndex(self, (start_sequence + i) % ctx->Capacity, NullEntry);
}
}
static unsafe void SetIndex(byte* self, int index, int sequence)
{
Context* ctx = (Context*) self;
int* value = (int*) ((self + ctx->IndexPtrOffset) + (index * ctx->IndexStride));
*value = sequence;
}
static unsafe int GetIndex(byte* self, int index)
{
Context* ctx = (Context*) self;
int* value = (int*) ((self + ctx->IndexPtrOffset) + (index * ctx->IndexStride));
return *value;
}
/// <summary>
/// Acknowledge the reception of packets which have been sent. The reliability
/// shared context/state is updated when packets are received from the other end
/// of the connection. The other side will update it's ackmask with which packets
/// have been received (starting from last received sequence ID) each time it sends
/// a packet back. This checks the resend timers on each non-acknowledged packet
/// and notifies if it's time to resend yet.
/// </summary>
/// <param name="context">Pipeline context, contains the buffer slices this pipeline connection owns.</param>
/// <returns></returns>
public static unsafe bool ReleaseOrResumePackets(NetworkPipelineContext context)
{
SharedContext* reliable = (SharedContext*) context.internalSharedProcessBuffer;
Context* ctx = (Context*) context.internalProcessBuffer;
// Last sequence ID and ackmask we received from the remote peer, these are confirmed delivered packets
var lastReceivedAckMask = reliable->SentPackets.AckMask;
var lastOwnSequenceIdAckedByRemote = (ushort)reliable->SentPackets.Acked;
// To deal with wrapping, chop off the upper half of the sequence ID and multiply by window size, it
// will then never wrap but will map to the correct index in the packet storage, wrapping happens when
// sending low sequence IDs (since it checks sequence IDs backwards in time).
var sequence = (ushort)(reliable->WindowSize * ((1 - lastOwnSequenceIdAckedByRemote)>>15));
// Check each slot in the window, starting from the sequence ID calculated above (this isn't the
// latest sequence ID though as it was adjusted to avoid wrapping)
for (int i = 0; i < reliable->WindowSize; i++)
{
var info = GetPacketInformation(context.internalProcessBuffer, sequence);
if (info->SequenceId >= 0)
{
// Check the bit for this sequence ID against the ackmask. Bit 0 in the ackmask is the latest
// ackedSeqId, bit 1 latest ackedSeqId - 1 (one older) and so on. If bit X is 1 then ackedSeqId-X is acknowledged
var ackBits = 1 << (lastOwnSequenceIdAckedByRemote - info->SequenceId);
// Release if this seqId has been flipped on in the ackmask (so it's acknowledged)
// Ignore if sequence ID is out of window range of the last acknowledged id
if (SequenceHelpers.AbsDistance((ushort)lastOwnSequenceIdAckedByRemote, (ushort)info->SequenceId) < reliable->WindowSize && (ackBits & lastReceivedAckMask) != 0)
{
Release(context.internalProcessBuffer, info->SequenceId);
info->SendTime = -1;
sequence = (ushort) (sequence - 1);
continue;
}
var timeToResend = CurrentResendTime(context.internalSharedProcessBuffer);
if (context.timestamp > info->SendTime + timeToResend)
{
ctx->Resume = info->SequenceId;
}
}
sequence = (ushort) (sequence - 1);
}
return ctx->Resume != NullEntry;
}
/// <summary>
/// Resume or play back a packet we had received earlier out of order. When an out of order packet is received
/// it is stored since we need to first return the packet with the next sequence ID. When that packet finally
/// arrives it is returned but a pipeline resume is requested since we already have the next packet stored
/// and it can be processed immediately after.
/// </summary>
/// <param name="context">Pipeline context, we'll use both the shared reliability context and receive context.</param>
/// <param name="startSequence">The first packet which we need to retrieve now, there could be more after that.</param>
/// <param name="needsResume">Indicates if we need the pipeline to resume again.</param>
/// <returns></returns>
public static unsafe InboundRecvBuffer ResumeReceive(NetworkPipelineContext context, int startSequence, ref bool needsResume)
{
if (startSequence == NullEntry) return default;
SharedContext* shared = (SharedContext*) context.internalSharedProcessBuffer;
Context* reliable = (Context*)context.internalProcessBuffer;
reliable->Resume = NullEntry;
PacketInformation* info = GetPacketInformation(context.internalProcessBuffer, startSequence);
var latestReceivedPacket = shared->ReceivedPackets.Sequence;
if (info->SequenceId == startSequence)
{
var offset = reliable->DataPtrOffset + ((startSequence % reliable->Capacity) * reliable->DataStride);
InboundRecvBuffer inBuffer;
inBuffer.buffer = context.internalProcessBuffer + offset;
inBuffer.bufferLength = info->Size;
reliable->Delivered = startSequence;
if ((ushort)(startSequence + 1) <= latestReceivedPacket)
{
reliable->Resume = (ushort)(startSequence + 1);
needsResume = true;
}
return inBuffer;
}
return default;
}
/// <summary>
/// Resend a packet which we have not received an acknowledgement for in time. Pipeline resume
/// will be enabled if there are more packets which we need to resend. The send reliability context
/// will then also be updated to track the next packet we need to resume.
/// </summary>
/// <param name="context">Pipeline context, we'll use both the shared reliability context and send context.</param>
/// <param name="header">Packet header for the packet payload we're resending.</param>
/// <param name="needsResume">Indicates if a pipeline resume is needed again.</param>
/// <returns>Buffer slice to packet payload.</returns>
/// <exception cref="InvalidOperationException"></exception>
public static unsafe InboundSendBuffer ResumeSend(NetworkPipelineContext context, out PacketHeader header, ref bool needsResume)
{
SharedContext* reliable = (SharedContext*) context.internalSharedProcessBuffer;
Context* ctx = (Context*)context.internalProcessBuffer;
#if ENABLE_UNITY_COLLECTIONS_CHECKS
if (ctx->Resume == NullEntry)
throw new InvalidOperationException("This function should not be called unless there is data in resume");
#endif
var sequence = (ushort) ctx->Resume;
PacketInformation* information;
information = GetPacketInformation(context.internalProcessBuffer, sequence);
// Reset the resend timer
information->SendTime = context.timestamp;
Packet *packet = GetPacket(context.internalProcessBuffer, sequence);
header = packet->Header;
// Update acked/ackmask to latest values
header.AckedSequenceId = (ushort) reliable->ReceivedPackets.Sequence;
header.AckMask = reliable->ReceivedPackets.AckMask;
var offset = (ctx->DataPtrOffset + ((sequence % ctx->Capacity) * ctx->DataStride)) + UnsafeUtility.SizeOf<PacketHeader>();
var inbound = default(InboundSendBuffer);
inbound.bufferWithHeaders = context.internalProcessBuffer + offset;
inbound.bufferWithHeadersLength = information->Size;
inbound.headerPadding = information->HeaderPadding;
inbound.SetBufferFrombufferWithHeaders();
reliable->stats.PacketsResent++;
needsResume = false;
ctx->Resume = -1;
// Check if another packet needs to be resent right after this one
for (int i = sequence + 1; i < reliable->ReceivedPackets.Sequence + 1; i++)
{
var timeToResend = CurrentResendTime(context.internalSharedProcessBuffer);
information = GetPacketInformation(context.internalProcessBuffer, i);
if (information->SequenceId >= 0 && information->SendTime + timeToResend > context.timestamp)
{
needsResume = true;
ctx->Resume = i;
}
}
return inbound;
}
/// <summary>
/// Store the packet for possible later resends, and fill in the header we'll use to send it (populate with
/// sequence ID, last acknowledged ID from remote with ackmask.
/// </summary>
/// <param name="context">Pipeline context, the reliability shared state is used here.</param>
/// <param name="inboundBuffer">Buffer with packet data.</param>
/// <param name="header">Packet header which will be populated.</param>
/// <returns>Sequence ID assigned to this packet.</returns>
public static unsafe int Write(NetworkPipelineContext context, InboundSendBuffer inboundBuffer, ref PacketHeader header)
{
SharedContext* reliable = (SharedContext*) context.internalSharedProcessBuffer;
var sequence = (ushort) reliable->SentPackets.Sequence;
if (!TryAquire(context.internalProcessBuffer, sequence))
{
reliable->errorCode = ErrorCodes.OutgoingQueueIsFull;
return (int)ErrorCodes.OutgoingQueueIsFull;
}
reliable->stats.PacketsSent++;
header.SequenceId = sequence;
header.AckedSequenceId = (ushort) reliable->ReceivedPackets.Sequence;
header.AckMask = reliable->ReceivedPackets.AckMask;
reliable->ReceivedPackets.Acked = reliable->ReceivedPackets.Sequence;
reliable->ReceivedPackets.LastAckMask = header.AckMask;
// Attach our processing time of the packet we're acknowledging (time between receiving it and sending this ack)
header.ProcessingTime =
CalculateProcessingTime(context.internalSharedProcessBuffer, header.AckedSequenceId, context.timestamp);
reliable->SentPackets.Sequence = (ushort) (reliable->SentPackets.Sequence + 1);
SetHeaderAndPacket(context.internalProcessBuffer, sequence, header, inboundBuffer, context.timestamp);
StoreTimestamp(context.internalSharedProcessBuffer, sequence, context.timestamp);
return sequence;
}
/// <summary>
/// Write an ack packet, only the packet header is used and this doesn't advance the sequence ID.
/// The packet is not stored away for resend routine.
/// </summary>
/// <param name="context">Pipeline context, the reliability shared state is used here.</param>
/// <param name="header">Packet header which will be populated.</param>
/// <returns></returns>
public static unsafe void WriteAckPacket(NetworkPipelineContext context, ref PacketHeader header)
{
SharedContext* reliable = (SharedContext*) context.internalSharedProcessBuffer;
header.Type = (ushort)PacketType.Ack;
header.AckedSequenceId = (ushort) reliable->ReceivedPackets.Sequence;
header.AckMask = reliable->ReceivedPackets.AckMask;
header.ProcessingTime =
CalculateProcessingTime(context.internalSharedProcessBuffer, header.AckedSequenceId, context.timestamp);
reliable->ReceivedPackets.Acked = reliable->ReceivedPackets.Sequence;
reliable->ReceivedPackets.LastAckMask = header.AckMask;
}
public static unsafe void StoreTimestamp(byte* sharedBuffer, ushort sequenceId, long timestamp)
{
var timerData = GetLocalPacketTimer(sharedBuffer, sequenceId);
timerData->SequenceId = sequenceId;
timerData->SentTime = timestamp;
timerData->ProcessingTime = 0;
timerData->ReceiveTime = 0;
}
public static unsafe void StoreReceiveTimestamp(byte* sharedBuffer, ushort sequenceId, long timestamp, ushort processingTime)
{
var sharedCtx = (SharedContext*) sharedBuffer;
var rttInfo = sharedCtx->RttInfo;
var timerData = GetLocalPacketTimer(sharedBuffer, sequenceId);
if (timerData != null && timerData->SequenceId == sequenceId)
{
// Ignore the receive time if we've already received it (remote doesn't have new acks)
if (timerData->ReceiveTime > 0)
return;
timerData->ReceiveTime = timestamp;
timerData->ProcessingTime = processingTime;
rttInfo.LastRtt = (int)Math.Max(timerData->ReceiveTime - timerData->SentTime - timerData->ProcessingTime, 1);
var delta = rttInfo.LastRtt - rttInfo.SmoothedRtt;
rttInfo.SmoothedRtt += delta / 8;
rttInfo.SmoothedVariance += (math.abs(delta) - rttInfo.SmoothedVariance) / 4;
rttInfo.ResendTimeout = (int)(rttInfo.SmoothedRtt + 4 * rttInfo.SmoothedVariance);
sharedCtx->RttInfo = rttInfo;
}
}
public static unsafe void StoreRemoteReceiveTimestamp(byte* sharedBuffer, ushort sequenceId, long timestamp)
{
var timerData = GetRemotePacketTimer(sharedBuffer, sequenceId);
timerData->SequenceId = sequenceId;
timerData->ReceiveTime = timestamp;
}
static unsafe int CurrentResendTime(byte* sharedBuffer)
{
var sharedCtx = (SharedContext*) sharedBuffer;
if (sharedCtx->RttInfo.ResendTimeout > MaximumResendTime)
return MaximumResendTime;
return Math.Max(sharedCtx->RttInfo.ResendTimeout, sharedCtx->MinimumResendTime);
}
public static unsafe ushort CalculateProcessingTime(byte* sharedBuffer, ushort sequenceId, long timestamp)
{
// Look up previously recorded receive timestamp, subtract that from current timestamp and return as processing time
var timerData = GetRemotePacketTimer(sharedBuffer, sequenceId);
if (timerData != null && timerData->SequenceId == sequenceId)
return Math.Min((ushort) (timestamp - timerData->ReceiveTime), ushort.MaxValue);
return 0;
}
public static unsafe PacketTimers* GetLocalPacketTimer(byte* sharedBuffer, ushort sequenceId)
{
var sharedCtx = (SharedContext*) sharedBuffer;
var index = sequenceId % sharedCtx->WindowSize;
var timerPtr = (long)sharedBuffer + sharedCtx->TimerDataOffset + sharedCtx->TimerDataStride * index;
return (PacketTimers*) timerPtr;
}
public static unsafe PacketTimers* GetRemotePacketTimer(byte* sharedBuffer, ushort sequenceId)
{
var sharedCtx = (SharedContext*) sharedBuffer;
var index = sequenceId % sharedCtx->WindowSize;
var timerPtr = (long)sharedBuffer + sharedCtx->RemoteTimerDataOffset + sharedCtx->RemoteTimerDataStride * index;
return (PacketTimers*) timerPtr;
}
/// <summary>
/// Read header data and update reliability tracking information in the shared context.
/// - If the packets sequence ID is lower than the last received ID+1, then it's stale
/// - If the packets sequence ID is higher, then we'll process it and update tracking info in the shared context
/// </summary>
/// <param name="context">Pipeline context, the reliability shared state is used here.</param>
/// <param name="header">Packet header of a new received packet.</param>
/// <returns>Sequence ID of the received packet.</returns>
public static unsafe int Read(NetworkPipelineContext context, PacketHeader header)
{
SharedContext* reliable = (SharedContext*) context.internalSharedProcessBuffer;
reliable->stats.PacketsReceived++;
if (SequenceHelpers.StalePacket(
header.SequenceId,
(ushort) (reliable->ReceivedPackets.Sequence + 1),
(ushort) reliable->WindowSize))
{
reliable->stats.PacketsStale++;
return (int) ErrorCodes.Stale_Packet;
}
var window = reliable->WindowSize - 1;
if (SequenceHelpers.GreaterThan16((ushort) (header.SequenceId + 1), (ushort) reliable->ReceivedPackets.Sequence))
{
int distance = SequenceHelpers.AbsDistance(header.SequenceId, (ushort)reliable->ReceivedPackets.Sequence);
for (var i = 0; i < Math.Min(distance, window); ++i)
{
if ((reliable->ReceivedPackets.AckMask & 1 << (window - i)) == 0)
{
reliable->stats.PacketsDropped++;
}
}
if (distance > window)
{
reliable->stats.PacketsDropped += distance - window;
reliable->ReceivedPackets.AckMask = 1;
}
else
{
reliable->ReceivedPackets.AckMask <<= distance;
reliable->ReceivedPackets.AckMask |= 1;
}
reliable->ReceivedPackets.Sequence = header.SequenceId;
}
else if (SequenceHelpers.LessThan16(header.SequenceId, (ushort) reliable->ReceivedPackets.Sequence))
{
int distance = SequenceHelpers.AbsDistance(header.SequenceId, (ushort)reliable->ReceivedPackets.Sequence);
// If this is a resent packet the distance will seem very big and needs to be calculated again with adjustment for wrapping
if (distance >= ushort.MaxValue - reliable->WindowSize)
distance = reliable->ReceivedPackets.Sequence - header.SequenceId;
var ackBit = 1 << distance;
if ((ackBit & reliable->ReceivedPackets.AckMask) != 0)
{
reliable->stats.PacketsDuplicated++;
return (int) ErrorCodes.Duplicated_Packet;
}
reliable->stats.PacketsOutOfOrder++;
reliable->ReceivedPackets.AckMask |= (uint) ackBit;
}
// Store receive timestamp for remote sequence ID we just received
StoreRemoteReceiveTimestamp(context.internalSharedProcessBuffer, header.SequenceId, context.timestamp);
ReadAckPacket(context, header);
return header.SequenceId;
}
public static unsafe void ReadAckPacket(NetworkPipelineContext context, PacketHeader header)
{
SharedContext* reliable = (SharedContext*) context.internalSharedProcessBuffer;
// Store receive timestamp for our acked sequence ID with remote processing time
StoreReceiveTimestamp(context.internalSharedProcessBuffer, header.AckedSequenceId, context.timestamp, header.ProcessingTime);
// Check the distance of the acked seqId in the header, if it's too far away from last acked packet we
// can't process it and add it to the ack mask
if (SequenceHelpers.GreaterThan16((ushort) reliable->SentPackets.Acked, header.AckedSequenceId))
{
// No new acks;
return;
}
if (reliable->SentPackets.Acked == header.AckedSequenceId)
{
// If the current packet is the same as the last one we acked we do not know which one is newer, but it is safe to keep any packet acked by either ack since we never un-ack
reliable->SentPackets.AckMask |= header.AckMask;
}
else
{
reliable->SentPackets.Acked = header.AckedSequenceId;
reliable->SentPackets.AckMask = header.AckMask;
}
}
public static unsafe bool ShouldSendAck(NetworkPipelineContext ctx)
{
var reliable = (Context*) ctx.internalProcessBuffer;
var shared = (SharedContext*) ctx.internalSharedProcessBuffer;
// If more than one full frame (timestamp - prevTimestamp = one frame) has elapsed then send ack packet
// and if the last received sequence ID has not been acked yet, or the set of acked packet in the window
// changed without the sequence ID updating (can happen when receiving out of order packets)
if (reliable->LastSentTime < reliable->PreviousTimestamp &&
(shared->ReceivedPackets.Acked < shared->ReceivedPackets.Sequence ||
shared->ReceivedPackets.AckMask != shared->ReceivedPackets.LastAckMask))
return true;
return false;
}
public static unsafe void SetMinimumResendTime(int value, NetworkDriver driver,
NetworkPipeline pipeline, NetworkConnection con)
{
driver.GetPipelineBuffers(pipeline, NetworkPipelineStageCollection.GetStageId(typeof(ReliableSequencedPipelineStage)), con, out var receiveBuffer, out var sendBuffer, out var sharedBuffer);
var sharedCtx = (ReliableUtility.SharedContext*) sharedBuffer.GetUnsafePtr();
sharedCtx->MinimumResendTime = value;
}
}
}

11
Packages/com.unity.transport/Runtime/Pipelines/ReliableUtility.cs.meta


fileFormatVersion: 2
guid: d04421de89975435aa55643e2d609958
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

214
Packages/com.unity.transport/Runtime/Pipelines/SimulatorPipelineStage.cs


using AOT;
using Unity.Collections;
using Unity.Collections.LowLevel.Unsafe;
using Unity.Networking.Transport.Utilities;
using Unity.Burst;
namespace Unity.Networking.Transport
{
[BurstCompile]
public unsafe struct SimulatorPipelineStage : INetworkPipelineStage
{
static TransportFunctionPointer<NetworkPipelineStage.ReceiveDelegate> ReceiveFunctionPointer = new TransportFunctionPointer<NetworkPipelineStage.ReceiveDelegate>(Receive);
static TransportFunctionPointer<NetworkPipelineStage.SendDelegate> SendFunctionPointer = new TransportFunctionPointer<NetworkPipelineStage.SendDelegate>(Send);
static TransportFunctionPointer<NetworkPipelineStage.InitializeConnectionDelegate> InitializeConnectionFunctionPointer = new TransportFunctionPointer<NetworkPipelineStage.InitializeConnectionDelegate>(InitializeConnection);
public NetworkPipelineStage StaticInitialize(byte* staticInstanceBuffer, int staticInstanceBufferLength, INetworkParameter[] netParams)
{
SimulatorUtility.Parameters param = default;
foreach (var netParam in netParams)
{
if (netParam is SimulatorUtility.Parameters)
param = (SimulatorUtility.Parameters)netParam;
}
UnsafeUtility.MemCpy(staticInstanceBuffer, &param, UnsafeUtility.SizeOf<SimulatorUtility.Parameters>());
return new NetworkPipelineStage(
Receive: ReceiveFunctionPointer,
Send: SendFunctionPointer,
InitializeConnection: InitializeConnectionFunctionPointer,
ReceiveCapacity: param.MaxPacketCount * (param.MaxPacketSize+UnsafeUtility.SizeOf<SimulatorUtility.DelayedPacket>()),
SendCapacity: 0,
HeaderCapacity: 0,
SharedStateCapacity: UnsafeUtility.SizeOf<SimulatorUtility.Context>()
);
}
[BurstCompile(DisableDirectCall = true)]
[MonoPInvokeCallback(typeof(NetworkPipelineStage.InitializeConnectionDelegate))]
private static void InitializeConnection(byte* staticInstanceBuffer, int staticInstanceBufferLength,
byte* sendProcessBuffer, int sendProcessBufferLength, byte* recvProcessBuffer, int recvProcessBufferLength,
byte* sharedProcessBuffer, int sharedProcessBufferLength)
{
SimulatorUtility.Parameters param = default;
UnsafeUtility.MemCpy(&param, staticInstanceBuffer, UnsafeUtility.SizeOf<SimulatorUtility.Parameters>());
if (sharedProcessBufferLength >= UnsafeUtility.SizeOf<SimulatorUtility.Parameters>())
{
SimulatorUtility.InitializeContext(param, sharedProcessBuffer);
}
}
[BurstCompile(DisableDirectCall = true)]
[MonoPInvokeCallback(typeof(NetworkPipelineStage.SendDelegate))]
private static int Send(ref NetworkPipelineContext ctx, ref InboundSendBuffer inboundBuffer, ref NetworkPipelineStage.Requests requests)
{
return (int)Error.StatusCode.Success;
}
[BurstCompile(DisableDirectCall = true)]
[MonoPInvokeCallback(typeof(NetworkPipelineStage.ReceiveDelegate))]
private static void Receive(ref NetworkPipelineContext ctx, ref InboundRecvBuffer inboundBuffer, ref NetworkPipelineStage.Requests requests)
{
var context = (SimulatorUtility.Context*) ctx.internalSharedProcessBuffer;
var param = *(SimulatorUtility.Parameters*) ctx.staticInstanceBuffer;
var simulator = new SimulatorUtility(param.MaxPacketCount, param.MaxPacketSize, param.PacketDelayMs, param.PacketJitterMs);
if (inboundBuffer.bufferLength > param.MaxPacketSize)
{
//UnityEngine.Debug.LogWarning("Incoming packet too large for internal storage buffer. Passing through. [buffer=" + inboundBuffer.Length + " packet=" + param->MaxPacketSize + "]");
// TODO: Add error code for this
return;
}
var timestamp = ctx.timestamp;
// Inbound buffer is empty if this is a resumed receive
if (inboundBuffer.bufferLength > 0)
{
context->PacketCount++;
if (simulator.ShouldDropPacket(context, param, timestamp))
{
context->PacketDropCount++;
inboundBuffer = default;
return;
}
var bufferVec = default(InboundSendBuffer);
bufferVec.bufferWithHeaders = inboundBuffer.buffer;
bufferVec.bufferWithHeadersLength = inboundBuffer.bufferLength;
bufferVec.buffer = inboundBuffer.buffer;
bufferVec.bufferLength = inboundBuffer.bufferLength;
bufferVec.headerPadding = 0;
if (context->PacketDelayMs == 0 ||
!simulator.DelayPacket(ref ctx, bufferVec, ref requests, timestamp))
{
return;
}
}
InboundSendBuffer returnPacket = default;
if (simulator.GetDelayedPacket(ref ctx, ref returnPacket, ref requests, timestamp))
{
inboundBuffer.buffer = returnPacket.bufferWithHeaders;
inboundBuffer.bufferLength = returnPacket.bufferWithHeadersLength;
return;
}
inboundBuffer = default;
}
public int StaticSize => UnsafeUtility.SizeOf<SimulatorUtility.Parameters>();
}
[BurstCompile]
public unsafe struct SimulatorPipelineStageInSend : INetworkPipelineStage
{
static TransportFunctionPointer<NetworkPipelineStage.ReceiveDelegate> ReceiveFunctionPointer = new TransportFunctionPointer<NetworkPipelineStage.ReceiveDelegate>(Receive);
static TransportFunctionPointer<NetworkPipelineStage.SendDelegate> SendFunctionPointer = new TransportFunctionPointer<NetworkPipelineStage.SendDelegate>(Send);
static TransportFunctionPointer<NetworkPipelineStage.InitializeConnectionDelegate> InitializeConnectionFunctionPointer = new TransportFunctionPointer<NetworkPipelineStage.InitializeConnectionDelegate>(InitializeConnection);
public NetworkPipelineStage StaticInitialize(byte* staticInstanceBuffer, int staticInstanceBufferLength, INetworkParameter[] netParams)
{
SimulatorUtility.Parameters param = default;
foreach (var netParam in netParams)
{
if (netParam is SimulatorUtility.Parameters parameters)
param = parameters;
}
UnsafeUtility.MemCpy(staticInstanceBuffer, &param, UnsafeUtility.SizeOf<SimulatorUtility.Parameters>());
return new NetworkPipelineStage(
Receive: ReceiveFunctionPointer,
Send: SendFunctionPointer,
InitializeConnection: InitializeConnectionFunctionPointer,
ReceiveCapacity: 0,
SendCapacity: param.MaxPacketCount * (param.MaxPacketSize+UnsafeUtility.SizeOf<SimulatorUtility.DelayedPacket>()),
HeaderCapacity: 0,
SharedStateCapacity: UnsafeUtility.SizeOf<SimulatorUtility.Context>()
);
}
[BurstCompile(DisableDirectCall = true)]
[MonoPInvokeCallback(typeof(NetworkPipelineStage.InitializeConnectionDelegate))]
private static void InitializeConnection(byte* staticInstanceBuffer, int staticInstanceBufferLength,
byte* sendProcessBuffer, int sendProcessBufferLength, byte* recvProcessBuffer, int recvProcessBufferLength,
byte* sharedProcessBuffer, int sharedProcessBufferLength)
{
SimulatorUtility.Parameters param = default;
UnsafeUtility.MemCpy(&param, staticInstanceBuffer, UnsafeUtility.SizeOf<SimulatorUtility.Parameters>());
if (sharedProcessBufferLength >= UnsafeUtility.SizeOf<SimulatorUtility.Parameters>())
{
SimulatorUtility.InitializeContext(param, sharedProcessBuffer);
}
}
[BurstCompile(DisableDirectCall = true)]
[MonoPInvokeCallback(typeof(NetworkPipelineStage.SendDelegate))]
private static int Send(ref NetworkPipelineContext ctx, ref InboundSendBuffer inboundBuffer, ref NetworkPipelineStage.Requests requests)
{
var context = (SimulatorUtility.Context*) ctx.internalSharedProcessBuffer;
var param = *(SimulatorUtility.Parameters*) ctx.staticInstanceBuffer;
var simulator = new SimulatorUtility(param.MaxPacketCount, param.MaxPacketSize, param.PacketDelayMs, param.PacketJitterMs);
if (inboundBuffer.headerPadding+inboundBuffer.bufferLength > param.MaxPacketSize)
{
//UnityEngine.Debug.LogWarning("Incoming packet too large for internal storage buffer. Passing through. [buffer=" + (inboundBuffer.headerPadding+inboundBuffer.buffer.Length) + " packet=" + param.MaxPacketSize + "]");
return (int) Error.StatusCode.NetworkPacketOverflow;
}
var timestamp = ctx.timestamp;
if (inboundBuffer.bufferLength > 0)
{
context->PacketCount++;
if (simulator.ShouldDropPacket(context, param, timestamp))
{
context->PacketDropCount++;
inboundBuffer = default;
return (int)Error.StatusCode.Success;
}
if (context->FuzzFactor > 0)
{
simulator.FuzzPacket(context, ref inboundBuffer);
}
if (context->PacketDelayMs == 0 ||
!simulator.DelayPacket(ref ctx, inboundBuffer, ref requests, timestamp))
{
return (int)Error.StatusCode.Success;
}
}
InboundSendBuffer returnPacket = default;
if (simulator.GetDelayedPacket(ref ctx, ref returnPacket, ref requests, timestamp))
{
inboundBuffer = returnPacket;
return (int)Error.StatusCode.Success;
}
inboundBuffer = default;
return (int)Error.StatusCode.Success;
}
[BurstCompile(DisableDirectCall = true)]
[MonoPInvokeCallback(typeof(NetworkPipelineStage.ReceiveDelegate))]
private static void Receive(ref NetworkPipelineContext ctx, ref InboundRecvBuffer inboundBuffer,
ref NetworkPipelineStage.Requests requests)
{
}
public int StaticSize => UnsafeUtility.SizeOf<SimulatorUtility.Parameters>();
}
}

11
Packages/com.unity.transport/Runtime/Pipelines/SimulatorPipelineStage.cs.meta


fileFormatVersion: 2
guid: 6c94271ce32ea46fa9d8019f313443eb
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

284
Packages/com.unity.transport/Runtime/Pipelines/SimulatorUtility.cs


using System.Runtime.InteropServices;
using Unity.Collections.LowLevel.Unsafe;
using Random = Unity.Mathematics.Random;
namespace Unity.Networking.Transport.Utilities
{
public struct SimulatorUtility
{
private int m_PacketCount;
private int m_MaxPacketSize;
private int m_PacketDelayMs;
private int m_PacketJitterMs;
/// <summary>
/// Configuration parameters for the simulator pipeline stage.
/// </summary>
[StructLayout(LayoutKind.Sequential)]
public struct Parameters : INetworkParameter
{
/// <summary>
/// The maximum amount of packets the pipeline can keep track of. This used when a
/// packet is delayed, the packet is stored in the pipeline processing buffer and can
/// be later brought back.
/// </summary>
public int MaxPacketCount;
/// <summary>
/// The maximum size of a packet which the simulator stores. If a packet exceeds this size it will
/// bypass the simulator.
/// </summary>
public int MaxPacketSize;
/// <summary>
/// Fixed delay to apply to all packets which pass through.
/// </summary>
public int PacketDelayMs;
/// <summary>
/// Variable delay to apply to all packets which pass through, adds or subtracts amount from fixed delay.
/// </summary>
public int PacketJitterMs;
/// <summary>
/// Fixed interval to drop packets on. This is most suitable for tests where predictable
/// behaviour is desired, every Xth packet will be dropped. If PacketDropInterval is 5
/// every 5th packet is dropped.
/// </summary>
public int PacketDropInterval;
/// <summary>
/// Use a drop percentage when deciding when to drop packet. For every packet
/// a random number generator is used to determine if the packet should be dropped or not.
/// A percentage of 5 means approximately every 20th packet will be dropped.
/// </summary>
public int PacketDropPercentage;
/// <summary>
/// Use the fuzz factor when you want to fuzz a packet. For every packet
/// a random number generator is used to determine if the packet should have the internal bits flipped.
/// A percentage of 5 means approximately every 20th packet will be fuzzed, and that each bit in the packet
/// has a 5 percent chance to get flipped.
/// </summary>
public int FuzzFactor;
/// <summary>
/// Use the fuzz offset in conjunction with the fuzz factor, the fuzz offset will offset where we start
/// flipping bits. This is useful if you want to only fuzz a part of the packet.
/// </summary>
public int FuzzOffset;
/// <summary>
/// The random seed is used to set the initial seed of the random number generator. This is useful to get
/// deterministic runs in tests for example that are dependant on the random number generator.
/// </summary>
public uint RandomSeed;
}
[StructLayout(LayoutKind.Sequential)]
public struct Context
{
public int MaxPacketCount;
public int MaxPacketSize;
public int PacketDelayMs;
public int PacketJitterMs;
public int PacketDrop;
public int FuzzOffset;
public int FuzzFactor;
public uint RandomSeed;
public Random Random;
// Statistics
public int PacketCount;
public int PacketDropCount;
public int ReadyPackets;
public int WaitingPackets;
public long NextPacketTime;
public long StatsTime;
}
[StructLayout(LayoutKind.Sequential)]
public struct DelayedPacket
{
public int processBufferOffset;
public ushort packetSize;
public ushort packetHeaderPadding;
public long delayUntil;
}
public SimulatorUtility(int packetCount, int maxPacketSize, int packetDelayMs, int packetJitterMs)
{
m_PacketCount = packetCount;
m_MaxPacketSize = maxPacketSize;
m_PacketDelayMs = packetDelayMs;
m_PacketJitterMs = packetJitterMs;
}
public static unsafe void InitializeContext(Parameters param, byte* sharedProcessBuffer)
{
// Store parameters in the shared buffer space
Context* ctx = (Context*) sharedProcessBuffer;
ctx->MaxPacketCount = param.MaxPacketCount;
ctx->MaxPacketSize = param.MaxPacketSize;
ctx->PacketDelayMs = param.PacketDelayMs;
ctx->PacketJitterMs = param.PacketJitterMs;
ctx->PacketDrop = param.PacketDropInterval;
ctx->FuzzFactor = param.FuzzFactor;
ctx->FuzzOffset = param.FuzzOffset;
ctx->PacketCount = 0;
ctx->PacketDropCount = 0;
ctx->Random = new Random();
if (param.RandomSeed > 0)
{
ctx->Random.InitState(param.RandomSeed);
ctx->RandomSeed = param.RandomSeed;
}
else
ctx->Random.InitState();
}
public unsafe bool GetEmptyDataSlot(byte* processBufferPtr, ref int packetPayloadOffset,
ref int packetDataOffset)
{
var dataSize = UnsafeUtility.SizeOf<DelayedPacket>();
var packetPayloadStartOffset = m_PacketCount * dataSize;
bool foundSlot = false;
for (int i = 0; i < m_PacketCount; i++)
{
packetDataOffset = dataSize * i;
DelayedPacket* packetData = (DelayedPacket*) (processBufferPtr + packetDataOffset);
// Check if this slot is empty
if (packetData->delayUntil == 0)
{
foundSlot = true;
packetPayloadOffset = packetPayloadStartOffset + m_MaxPacketSize * i;
break;
}
}
return foundSlot;
}
public unsafe bool GetDelayedPacket(ref NetworkPipelineContext ctx, ref InboundSendBuffer delayedPacket,
ref NetworkPipelineStage.Requests requests, long currentTimestamp)
{
requests = NetworkPipelineStage.Requests.None;
var dataSize = UnsafeUtility.SizeOf<DelayedPacket>();
byte* processBufferPtr = (byte*) ctx.internalProcessBuffer;
var simCtx = (Context*) ctx.internalSharedProcessBuffer;
int oldestPacketIndex = -1;
long oldestTime = long.MaxValue;
int readyPackets = 0;
int packetsInQueue = 0;
for (int i = 0; i < m_PacketCount; i++)
{
DelayedPacket* packet = (DelayedPacket*) (processBufferPtr + dataSize * i);
if ((int) packet->delayUntil == 0) continue;
packetsInQueue++;
if (packet->delayUntil > currentTimestamp) continue;
readyPackets++;
if (oldestTime <= packet->delayUntil) continue;
oldestPacketIndex = i;
oldestTime = packet->delayUntil;
}
simCtx->ReadyPackets = readyPackets;
simCtx->WaitingPackets = packetsInQueue;
simCtx->NextPacketTime = oldestTime;
simCtx->StatsTime = currentTimestamp;
// If more than one item has expired timer we need to resume this pipeline stage
if (readyPackets > 1)
{
requests |= NetworkPipelineStage.Requests.Resume;
}
// If more than one item is present (but doesn't have expired timer) we need to re-run the pipeline
// in a later update call
else if (packetsInQueue > 0)
{
requests |= NetworkPipelineStage.Requests.Update;
}
if (oldestPacketIndex >= 0)
{
DelayedPacket* packet = (DelayedPacket*) (processBufferPtr + dataSize * oldestPacketIndex);
packet->delayUntil = 0;
delayedPacket.bufferWithHeaders = ctx.internalProcessBuffer + packet->processBufferOffset;
delayedPacket.bufferWithHeadersLength = packet->packetSize;
delayedPacket.headerPadding = packet->packetHeaderPadding;
delayedPacket.SetBufferFrombufferWithHeaders();
return true;
}
return false;
}
public unsafe void FuzzPacket(Context *ctx, ref InboundSendBuffer inboundBuffer)
{
int fuzzFactor = ctx->FuzzFactor;
int fuzzOffset = ctx->FuzzOffset;
int rand = ctx->Random.NextInt(0, 100);
if (rand > fuzzFactor)
return;
var length = inboundBuffer.bufferLength;
for (int i = fuzzOffset; i < length; ++i)
{
for (int j = 0; j < 8; ++j)
{
if (fuzzFactor > ctx->Random.NextInt(0, 100))
{
inboundBuffer.buffer[i] ^= (byte)(1 << j);
}
}
}
}
public unsafe bool DelayPacket(ref NetworkPipelineContext ctx, InboundSendBuffer inboundBuffer,
ref NetworkPipelineStage.Requests requests,
long timestamp)
{
// Find empty slot in bookkeeping data space to track this packet
int packetPayloadOffset = 0;
int packetDataOffset = 0;
var processBufferPtr = (byte*) ctx.internalProcessBuffer;
bool foundSlot = GetEmptyDataSlot(processBufferPtr, ref packetPayloadOffset, ref packetDataOffset);
if (!foundSlot)
{
//UnityEngine.Debug.LogWarning("No space left for delaying packet (" + m_PacketCount + " packets in queue)");
return false;
}
UnsafeUtility.MemCpy(ctx.internalProcessBuffer + packetPayloadOffset + inboundBuffer.headerPadding, inboundBuffer.buffer, inboundBuffer.bufferLength);
var param = (SimulatorUtility.Context*) ctx.internalSharedProcessBuffer;
// Add tracking for this packet so we can resurrect later
DelayedPacket packet;
packet.delayUntil = timestamp + m_PacketDelayMs + param->Random.NextInt(m_PacketJitterMs*2) - m_PacketJitterMs;
packet.processBufferOffset = packetPayloadOffset;
packet.packetSize = (ushort)(inboundBuffer.headerPadding + inboundBuffer.bufferLength);
packet.packetHeaderPadding = (ushort)inboundBuffer.headerPadding;
byte* packetPtr = (byte*) &packet;
UnsafeUtility.MemCpy(processBufferPtr + packetDataOffset, packetPtr, UnsafeUtility.SizeOf<DelayedPacket>());
// Schedule an update call so packet can be resurrected later
requests |= NetworkPipelineStage.Requests.Update;
return true;
}
public unsafe bool ShouldDropPacket(Context* ctx, Parameters param, long timestamp)
{
if (param.PacketDropInterval > 0 && (ctx->PacketCount - 1) % param.PacketDropInterval == 0)
return true;
if (param.PacketDropPercentage > 0)
{
//var packetLoss = new System.Random().NextDouble() * 100;
var packetLoss = ctx->Random.NextInt(0, 100);
if (packetLoss < param.PacketDropPercentage)
return true;
}
return false;
}
}
}

11
Packages/com.unity.transport/Runtime/Pipelines/SimulatorUtility.cs.meta


fileFormatVersion: 2
guid: b8e5e0ea1828b4b52bd56c023866f6a6
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

75
Packages/com.unity.transport/Runtime/Pipelines/UnreliableSequencedPipelineStage.cs


using AOT;
using Unity.Burst;
using Unity.Collections;
using Unity.Collections.LowLevel.Unsafe;
using Unity.Networking.Transport.Utilities;
namespace Unity.Networking.Transport
{
[BurstCompile]
public unsafe struct UnreliableSequencedPipelineStage : INetworkPipelineStage
{
static TransportFunctionPointer<NetworkPipelineStage.ReceiveDelegate> ReceiveFunctionPointer = new TransportFunctionPointer<NetworkPipelineStage.ReceiveDelegate>(Receive);
static TransportFunctionPointer<NetworkPipelineStage.SendDelegate> SendFunctionPointer = new TransportFunctionPointer<NetworkPipelineStage.SendDelegate>(Send);
static TransportFunctionPointer<NetworkPipelineStage.InitializeConnectionDelegate> InitializeConnectionFunctionPointer = new TransportFunctionPointer<NetworkPipelineStage.InitializeConnectionDelegate>(InitializeConnection);
public NetworkPipelineStage StaticInitialize(byte* staticInstanceBuffer, int staticInstanceBufferLength, INetworkParameter[] netParams)
{
return new NetworkPipelineStage(
Receive: ReceiveFunctionPointer,
Send: SendFunctionPointer,
InitializeConnection: InitializeConnectionFunctionPointer,
ReceiveCapacity: UnsafeUtility.SizeOf<int>(),
SendCapacity: UnsafeUtility.SizeOf<int>(),
HeaderCapacity: UnsafeUtility.SizeOf<ushort>(),
SharedStateCapacity: 0
);
}
public int StaticSize => 0;
[BurstCompile(DisableDirectCall = true)]
[MonoPInvokeCallback(typeof(NetworkPipelineStage.ReceiveDelegate))]
private static void Receive(ref NetworkPipelineContext ctx, ref InboundRecvBuffer inboundBuffer, ref NetworkPipelineStage.Requests requests)
{
var inboundArray = NativeArrayUnsafeUtility.ConvertExistingDataToNativeArray<byte>(inboundBuffer.buffer, inboundBuffer.bufferLength, Allocator.Invalid);
#if ENABLE_UNITY_COLLECTIONS_CHECKS
var safetyHandle = AtomicSafetyHandle.GetTempMemoryHandle();
NativeArrayUnsafeUtility.SetAtomicSafetyHandle(ref inboundArray, safetyHandle);
#endif
var reader = new DataStreamReader(inboundArray);
var oldSequenceId = (int*) ctx.internalProcessBuffer;
ushort sequenceId = reader.ReadUShort();
if (SequenceHelpers.GreaterThan16(sequenceId, (ushort)*oldSequenceId))
{
*oldSequenceId = sequenceId;
// Skip over the part of the buffer which contains the header
inboundBuffer = inboundBuffer.Slice(sizeof(ushort));
return;
}
inboundBuffer = default;
}
[BurstCompile(DisableDirectCall = true)]
[MonoPInvokeCallback(typeof(NetworkPipelineStage.SendDelegate))]
private static int Send(ref NetworkPipelineContext ctx, ref InboundSendBuffer inboundBuffer, ref NetworkPipelineStage.Requests requests)
{
var sequenceId = (int*) ctx.internalProcessBuffer;
ctx.header.WriteUShort((ushort)*sequenceId);
*sequenceId = (ushort)(*sequenceId + 1);
return (int)Error.StatusCode.Success;
}
[BurstCompile(DisableDirectCall = true)]
[MonoPInvokeCallback(typeof(NetworkPipelineStage.InitializeConnectionDelegate))]
private static void InitializeConnection(byte* staticInstanceBuffer, int staticInstanceBufferLength,
byte* sendProcessBuffer, int sendProcessBufferLength, byte* recvProcessBuffer, int recvProcessBufferLength,
byte* sharedProcessBuffer, int sharedProcessBufferLength)
{
if (recvProcessBufferLength > 0)
{
// The receive processing buffer contains the current sequence ID, initialize it to -1 as it will be incremented when used.
*(int*) recvProcessBuffer = -1;
}
}
}
}

11
Packages/com.unity.transport/Runtime/Pipelines/UnreliableSequencedPipelineStage.cs.meta


fileFormatVersion: 2
guid: 7e08c23b1aba74c94b04b54b8ea39e80
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

8
Packages/com.unity.transport/Runtime/Relay.meta


fileFormatVersion: 2
guid: 44f6fc6bdd501984f8e5a15129b51288
folderAsset: yes
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

8
Packages/com.unity.transport/Runtime/Relay/Messages.meta


fileFormatVersion: 2
guid: f5bc3869f1e541f48bc0d29a8c18533b
folderAsset: yes
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

部分文件因为文件数量过多而无法显示

正在加载...
取消
保存