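Sigh. I suspect most fellow cnblogs members are more drawn to the words "high performance", so the title has to feature them to catch the eye. The title I would actually have preferred is "猴赛雷 (Awesome)! Fancy Ways to Write a TCP Service in C#".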
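This article is about the different ways of implementing a high-performance TCP service in .NET/C#, including but not limited to the APM, TAP, SAEA, and RIO approaches.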
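In .NET/C#, Socket support is a wrapper over Windows I/O Completion Ports, and different non-blocking wrapper structures serve different programming needs. All of the approaches above are fully implemented in Cowboy.Sockets, and the APM and TAP approaches are already used in real projects. Cowboy.Sockets is still evolving and improving, so please point out any problems you find.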
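Although there are this many implementations, viewed abstractly they are all the same and can be described with two loops: an Accept Loop and a Read Loop, as shown in the figure below. ("Loop" here means a looping pattern, not specifically the while/for keywords.)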
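If the Accept Loop blocks, connections cannot be established quickly, the server's pending backlog fills up, and clients end up with connect-timeout errors. If the Read Loop blocks, data sent by the client is not received in time, the client's send buffer fills up, and the client can no longer send data.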
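To make the two loops concrete, here is a minimal sketch built directly on TcpListener. The class name TwoLoopEchoServer and the echo behaviour are illustrative assumptions and are not taken from Cowboy.Sockets. The point it tries to show is that the Accept Loop hands each connection off without awaiting it, so a slow session cannot stall new connections.

```csharp
using System;
using System.IO;
using System.Net.Sockets;
using System.Threading.Tasks;

// Hypothetical illustration of the Accept Loop / Read Loop split.
public static class TwoLoopEchoServer
{
    // Accept Loop: accept a connection, hand it off, go straight back to accepting.
    public static async Task AcceptLoopAsync(TcpListener listener)
    {
        while (true)
        {
            TcpClient client;
            try
            {
                client = await listener.AcceptTcpClientAsync();
            }
            catch (InvalidOperationException) { break; } // listener stopped or disposed
            catch (SocketException) { break; }           // pending accept aborted by Stop()

            // Deliberately not awaited: the session runs its own Read Loop.
            _ = ReadLoopAsync(client);
        }
    }

    // Read Loop: read until the peer closes. The "application logic" here just echoes.
    public static async Task ReadLoopAsync(TcpClient client)
    {
        using (client)
        {
            try
            {
                var stream = client.GetStream();
                var buffer = new byte[4096];
                while (true)
                {
                    int count = await stream.ReadAsync(buffer, 0, buffer.Length);
                    if (count == 0) break; // remote side closed the connection
                    await stream.WriteAsync(buffer, 0, count);
                }
            }
            catch (IOException) { }             // connection reset
            catch (ObjectDisposedException) { } // connection closed locally
        }
    }
}
```

Anything slow inside the Read Loop (parsing, deserialization, business logic) delays the next read on that connection only, while anything slow inside the Accept Loop delays every new client.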
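In terms of implementation details, there are six points at which the service can block. Points 1-2 involve the Accept process and establishing the Connection, points 3-4 involve handling the receive buffer, and points 5-6 involve the application-logic side.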
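Netty, the well-known Java networking library, took a completely new approach to buffers starting with version 4.0: a design called ByteBuf that achieves buffer zero copy, reducing the performance loss and GC pressure caused by buffer copying under high concurrency. Projects such as DotNetty, Orleans, and Helios are attempting similar ByteBuf implementations in C#.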
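As a rough illustration of the idea (not Netty's or DotNetty's actual API), the sketch below keeps a reader index and a writer index over one backing array and hands out ArraySegment views instead of copying bytes. The type name SliceBuffer and all of its members are hypothetical.

```csharp
using System;

// Hypothetical, much-simplified take on the reader/writer-index idea behind ByteBuf.
public sealed class SliceBuffer
{
    private readonly byte[] _data;
    private int _readerIndex;
    private int _writerIndex;

    public SliceBuffer(int capacity)
    {
        _data = new byte[capacity];
    }

    public int ReadableBytes => _writerIndex - _readerIndex;
    public int WritableBytes => _data.Length - _writerIndex;

    // Region a socket read can fill directly, so no intermediate buffer is needed.
    public ArraySegment<byte> WritableSegment =>
        new ArraySegment<byte>(_data, _writerIndex, WritableBytes);

    // Record that 'count' bytes were written into WritableSegment.
    public void Advance(int count)
    {
        if (count < 0 || count > WritableBytes)
            throw new ArgumentOutOfRangeException(nameof(count));
        _writerIndex += count;
    }

    // Return a view over the next 'length' readable bytes without copying them.
    public ArraySegment<byte> ReadSlice(int length)
    {
        if (length < 0 || length > ReadableBytes)
            throw new ArgumentOutOfRangeException(nameof(length));
        var slice = new ArraySegment<byte>(_data, _readerIndex, length);
        _readerIndex += length;
        return slice;
    }
}
```

A slice shares memory with the buffer, so it is only valid until that region is reused. A real ByteBuf design handles this with reference counting and pooling, which this sketch leaves out.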
TcpSocketServer is a further wrapper around the TcpListener and TcpClient that ship with the .NET Framework, implemented with the APM-based BeginXXX and EndXXX interfaces.
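In TcpSocketServer, the Accept Loop is the repeated accepting of new connections on the listener. Each successfully established Connection is handled by a TcpSocketSession, so TcpSocketSession contains the Read Loop.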
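With APM there is no while statement. Each loop is a chain of Begin/End calls in which the completion callback starts the next operation. The following is a minimal sketch of that shape. ApmEchoServer and its nested Session are hypothetical names, and the echo logic is only a placeholder for real message handling. It is not the TcpSocketServer source.

```csharp
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;

// Hypothetical APM-style server: both loops are callback chains.
public sealed class ApmEchoServer
{
    private readonly TcpListener _listener;

    public ApmEchoServer(IPEndPoint endPoint)
    {
        _listener = new TcpListener(endPoint);
    }

    public int Port => ((IPEndPoint)_listener.LocalEndpoint).Port;

    public void Listen()
    {
        _listener.Start();
        _listener.BeginAcceptTcpClient(OnAccepted, null); // first turn of the Accept Loop
    }

    public void Shutdown()
    {
        _listener.Stop();
    }

    private void OnAccepted(IAsyncResult ar)
    {
        TcpClient client;
        try
        {
            client = _listener.EndAcceptTcpClient(ar);
            // Re-arm the accept before doing any per-connection work.
            _listener.BeginAcceptTcpClient(OnAccepted, null);
        }
        catch (InvalidOperationException) { return; } // listener stopped or disposed
        catch (SocketException) { return; }

        new Session(client).ContinueRead();
    }

    private sealed class Session
    {
        private readonly TcpClient _client;
        private readonly NetworkStream _stream;
        private readonly byte[] _buffer = new byte[4096];

        public Session(TcpClient client)
        {
            _client = client;
            _stream = client.GetStream();
        }

        // One turn of the Read Loop: start an asynchronous read.
        public void ContinueRead()
        {
            try { _stream.BeginRead(_buffer, 0, _buffer.Length, OnRead, null); }
            catch (IOException) { _client.Close(); }
            catch (ObjectDisposedException) { }
        }

        private void OnRead(IAsyncResult ar)
        {
            try
            {
                int count = _stream.EndRead(ar);
                if (count == 0) { _client.Close(); return; } // peer closed
                _stream.Write(_buffer, 0, count);            // echo (synchronous for brevity)
            }
            catch (IOException) { _client.Close(); return; }
            catch (ObjectDisposedException) { return; }

            ContinueRead();
        }
    }
}
```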
TcpSocketServer exposes events to signal when a Connection is established or closed and when data is received.
    event EventHandler<TcpClientConnectedEventArgs> ClientConnected;
    event EventHandler<TcpClientDisconnectedEventArgs> ClientDisconnected;
    event EventHandler<TcpClientDataReceivedEventArgs> ClientDataReceived;
Usage is equally simple and direct: just subscribe to the events.
    private static void StartServer()
    {
        _server = new TcpSocketServer(22222);
        _server.ClientConnected += server_ClientConnected;
        _server.ClientDisconnected += server_ClientDisconnected;
        _server.ClientDataReceived += server_ClientDataReceived;
        _server.Listen();
    }

    static void server_ClientConnected(object sender, TcpClientConnectedEventArgs e)
    {
        Console.WriteLine(string.Format("TCP client {0} has connected {1}.", e.Session.RemoteEndPoint, e.Session));
    }

    static void server_ClientDisconnected(object sender, TcpClientDisconnectedEventArgs e)
    {
        Console.WriteLine(string.Format("TCP client {0} has disconnected.", e.Session));
    }

    static void server_ClientDataReceived(object sender, TcpClientDataReceivedEventArgs e)
    {
        var text = Encoding.UTF8.GetString(e.Data, e.DataOffset, e.DataLength);
        Console.Write(string.Format("Client : {0} {1} --> ", e.Session.RemoteEndPoint, e.Session));
        Console.WriteLine(string.Format("{0}", text));
        _server.Broadcast(Encoding.UTF8.GetBytes(text));
    }
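AsyncTcpSocketServer is a further wrapper around the TcpListener and TcpClient that ship with the .NET Framework, implemented with the TAP-based XXXAsync interfaces and async/await.
In practice, however, XXXAsync does not do anything magical: internally it merely converts the APM methods into TAP-style calls.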
    //************* Task-based async public methods *************************
    [HostProtection(ExternalThreading = true)]
    public Task<Socket> AcceptSocketAsync()
    {
        return Task<Socket>.Factory.FromAsync(BeginAcceptSocket, EndAcceptSocket, null);
    }

    [HostProtection(ExternalThreading = true)]
    public Task<TcpClient> AcceptTcpClientAsync()
    {
        return Task<TcpClient>.Factory.FromAsync(BeginAcceptTcpClient, EndAcceptTcpClient, null);
    }
The Accept Loop in AsyncTcpSocketServer is:
    while (IsListening)
    {
        var tcpClient = await _listener.AcceptTcpClientAsync();
    }
Each successfully established Connection is handled by an AsyncTcpSocketSession, so AsyncTcpSocketSession contains the Read Loop:
    while (State == TcpSocketConnectionState.Connected)
    {
        int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length);
    }
To keep async/await asynchronous all the way down, the interface exposed by AsyncTcpSocketServer is awaitable as well.
    public interface IAsyncTcpSocketServerMessageDispatcher
    {
        Task OnSessionStarted(AsyncTcpSocketSession session);
        Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count);
        Task OnSessionClosed(AsyncTcpSocketSession session);
    }
To use it, simply inject an object that implements this interface into the AsyncTcpSocketServer constructor.
    public class SimpleMessageDispatcher : IAsyncTcpSocketServerMessageDispatcher
    {
        public async Task OnSessionStarted(AsyncTcpSocketSession session)
        {
            Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
            await Task.CompletedTask;
        }

        public async Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count)
        {
            var text = Encoding.UTF8.GetString(data, offset, count);
            Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
            Console.WriteLine(string.Format("{0}", text));

            await session.SendAsync(Encoding.UTF8.GetBytes(text));
        }

        public async Task OnSessionClosed(AsyncTcpSocketSession session)
        {
            Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
            await Task.CompletedTask;
        }
    }
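Of course, implementing the interface is not mandatory; the method implementations can also be injected directly through the constructor.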
    public AsyncTcpSocketServer(
        IPEndPoint listenedEndPoint,
        Func<AsyncTcpSocketSession, byte[], int, int, Task> onSessionDataReceived = null,
        Func<AsyncTcpSocketSession, Task> onSessionStarted = null,
        Func<AsyncTcpSocketSession, Task> onSessionClosed = null,
        AsyncTcpSocketServerConfiguration configuration = null)
    {}
SAEA is short for SocketAsyncEventArgs, a mechanism for high-performance socket communication that has been supported since .NET Framework 3.5. Its main advantage over the APM approach is described as follows:
The main feature of these enhancements is the avoidance of the repeated allocation and synchronization of objects during high-volume asynchronous socket I/O. The Begin/End design pattern currently implemented by the Socket class for asynchronous socket I/O requires a System.IAsyncResult object be allocated for each asynchronous socket operation.
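In other words, the advantage is that there is no need to create an IAsyncResult or similar object for every call, which brings it a little closer to native sockets.
The recommended way of using SocketAsyncEventArgs centers on pooling. The purpose of pooling is to reuse objects and reduce the pressure of runtime allocation and garbage collection.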
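The usual pattern for SocketAsyncEventArgs is: take a context object from a pool, set its properties, call the xxxAsync method, handle both the synchronous and the asynchronous completion path, then return the context to the pool. The following is a minimal sketch of that pattern. SaeaPoolSketch and its members are hypothetical names, and this is not the pool implementation used by Cowboy.Sockets.

```csharp
using System;
using System.Collections.Concurrent;
using System.Net.Sockets;

// Hypothetical pool of SocketAsyncEventArgs, each with its own reusable buffer.
public sealed class SaeaPoolSketch
{
    private readonly ConcurrentQueue<SocketAsyncEventArgs> _items =
        new ConcurrentQueue<SocketAsyncEventArgs>();
    private readonly int _bufferSize;

    public SaeaPoolSketch(int bufferSize)
    {
        _bufferSize = bufferSize;
    }

    // Reuse a pooled instance if one is free; allocate only when the pool is empty.
    public SocketAsyncEventArgs Take()
    {
        if (_items.TryDequeue(out var saea)) return saea;

        saea = new SocketAsyncEventArgs();
        saea.SetBuffer(new byte[_bufferSize], 0, _bufferSize);
        saea.Completed += (sender, e) => Complete(e); // attached once per instance
        return saea;
    }

    public void Return(SocketAsyncEventArgs saea)
    {
        saea.UserToken = null;
        saea.SetBuffer(0, _bufferSize); // reset offset and count, keep the same array
        _items.Enqueue(saea);
    }

    // Start one receive. The callback gets the error code, the buffer, and the byte count.
    // The buffer belongs to the pool and is only valid for the duration of the callback.
    public void Receive(Socket socket, Action<SocketError, byte[], int> onReceived)
    {
        var saea = Take();
        saea.UserToken = onReceived;

        // ReceiveAsync returns false when the operation completed synchronously.
        // In that case the Completed event is not raised, so finish the work here.
        if (!socket.ReceiveAsync(saea))
            Complete(saea);
    }

    private void Complete(SocketAsyncEventArgs saea)
    {
        var callback = (Action<SocketError, byte[], int>)saea.UserToken;
        try
        {
            callback(saea.SocketError, saea.Buffer, saea.BytesTransferred);
        }
        finally
        {
            Return(saea); // back to the pool for the next operation
        }
    }
}
```

Callers that care about allocations can cache the callback delegate instead of creating a new lambda for every call.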
TcpSocketSaeaServer applies and wraps SocketAsyncEventArgs and implements pooling. Its centerpiece is the SaeaAwaitable class: a SaeaAwaitable holds a SocketAsyncEventArgs and supports async/await by returning a SaeaAwaiter from GetAwaiter. In addition, the SaeaExtensions extension methods provide awaitable versions of the SocketAsyncEventArgs operations.
    public static SaeaAwaitable AcceptAsync(this Socket socket, SaeaAwaitable awaitable)
    public static SaeaAwaitable ConnectAsync(this Socket socket, SaeaAwaitable awaitable)
    public static SaeaAwaitable DisonnectAsync(this Socket socket, SaeaAwaitable awaitable)
    public static SaeaAwaitable ReceiveAsync(this Socket socket, SaeaAwaitable awaitable)
    public static SaeaAwaitable SendAsync(this Socket socket, SaeaAwaitable awaitable)
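To show how a SocketAsyncEventArgs can be made awaitable at all, here is a minimal sketch based on the general custom-awaiter pattern (a type with GetAwaiter, IsCompleted, OnCompleted, and GetResult). SimpleSaeaAwaitable and SimpleSaeaExtensions are hypothetical names. The real SaeaAwaitable/SaeaAwaiter in Cowboy.Sockets may be structured differently.

```csharp
using System;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical reusable awaitable around one SocketAsyncEventArgs.
// For brevity it acts as its own awaiter.
public sealed class SimpleSaeaAwaitable : INotifyCompletion
{
    private static readonly Action Sentinel = () => { };
    private Action _continuation;
    private bool _isCompleted;

    public SimpleSaeaAwaitable()
    {
        Saea = new SocketAsyncEventArgs();
        Saea.Completed += (sender, e) =>
        {
            // If a continuation is already registered, run it. Otherwise leave the
            // sentinel behind so that OnCompleted knows the operation has finished.
            var previous = _continuation
                ?? Interlocked.CompareExchange(ref _continuation, Sentinel, null);
            previous?.Invoke();
        };
    }

    public SocketAsyncEventArgs Saea { get; }

    public SimpleSaeaAwaitable GetAwaiter() => this;

    public bool IsCompleted => _isCompleted;

    public void OnCompleted(Action continuation)
    {
        // The operation may have completed before the continuation was registered.
        if (_continuation == Sentinel ||
            Interlocked.CompareExchange(ref _continuation, continuation, null) == Sentinel)
        {
            Task.Run(continuation);
        }
    }

    // The result of awaiting is the socket error code of the finished operation.
    public SocketError GetResult() => Saea.SocketError;

    internal void Reset()
    {
        _isCompleted = false;
        _continuation = null;
    }

    internal void MarkCompletedSynchronously()
    {
        _isCompleted = true;
    }
}

public static class SimpleSaeaExtensions
{
    public static SimpleSaeaAwaitable ReceiveAsync(this Socket socket, SimpleSaeaAwaitable awaitable)
    {
        awaitable.Reset();
        // false means the receive finished synchronously and no Completed event follows.
        if (!socket.ReceiveAsync(awaitable.Saea))
            awaitable.MarkCompletedSynchronously();
        return awaitable;
    }
}
```

Because the awaitable is reused rather than recreated, awaiting a receive in a loop allocates neither an IAsyncResult nor a Task per operation. The trade-off is that only one operation at a time may be in flight on a given instance.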
SaeaPool, in turn, is a QueuedObjectPool.
The Accept Loop in TcpSocketSaeaServer is:
    while (IsListening)
    {
        var saea = _acceptSaeaPool.Take();

        var socketError = await _listener.AcceptAsync(saea);
        if (socketError == SocketError.Success)
        {
            var acceptedSocket = saea.Saea.AcceptSocket;
        }

        _acceptSaeaPool.Return(saea);
    }
Each successfully established Connection is handled by a TcpSocketSaeaSession, so TcpSocketSaeaSession contains the Read Loop:
    var saea = _saeaPool.Take();
    saea.Saea.SetBuffer(_receiveBuffer, 0, _receiveBuffer.Length);

    while (State == TcpSocketConnectionState.Connected)
    {
        saea.Saea.SetBuffer(0, _receiveBuffer.Length);

        var socketError = await _socket.ReceiveAsync(saea);
        if (socketError != SocketError.Success)
            break;

        var receiveCount = saea.Saea.BytesTransferred;
        if (receiveCount == 0)
            break;
    }
Likewise, the interface that TcpSocketSaeaServer exposes is awaitable too.
    public interface ITcpSocketSaeaServerMessageDispatcher
    {
        Task OnSessionStarted(TcpSocketSaeaSession session);
        Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count);
        Task OnSessionClosed(TcpSocketSaeaSession session);
    }
Using it is just as simple and direct:
    public class SimpleMessageDispatcher : ITcpSocketSaeaServerMessageDispatcher
    {
        public async Task OnSessionStarted(TcpSocketSaeaSession session)
        {
            Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
            await Task.CompletedTask;
        }

        public async Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count)
        {
            var text = Encoding.UTF8.GetString(data, offset, count);
            Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
            Console.WriteLine(string.Format("{0}", text));

            await session.SendAsync(Encoding.UTF8.GetBytes(text));
        }

        public async Task OnSessionClosed(TcpSocketSaeaSession session)
        {
            Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
            await Task.CompletedTask;
        }
    }
Starting with Windows 8 / Windows Server 2012, Microsoft introduced the Registered I/O Networking Extensions, RIO for short, to support high-performance socket services.
The following functions are supported for Windows Store apps on Windows 8.1, Windows Server 2012 R2, and later. Microsoft Visual Studio 2013 Update 3 or later is required for Windows Store apps.
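So far the .NET Framework has not shipped support for RIO, so implementing RIO in C# is only possible through P/Invoke. RioSharp is a fairly complete implementation among the open-source projects.
Cowboy.Sockets includes the RioSharp source code directly, placed under the Cowboy.Sockets.Experimental namespace, for experimentation and testing.
Likewise, TcpSocketRioServer implements the Accept Loop: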
    _listener.OnAccepted = (acceptedSocket) =>
    {
        Task.Run(async () =>
        {
            await Process(acceptedSocket);
        })
        .Forget();
    };
and TcpSocketRioSession handles the Read Loop:
    while (State == TcpSocketConnectionState.Connected)
    {
        int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length);
        if (receiveCount == 0)
            break;
    }
The test code is, as ever, much the same:
    public class SimpleMessageDispatcher : ITcpSocketRioServerMessageDispatcher
    {
        public async Task OnSessionStarted(TcpSocketRioSession session)
        {
            //Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
            Console.WriteLine(string.Format("TCP session has connected {0}.", session));
            await Task.CompletedTask;
        }

        public async Task OnSessionDataReceived(TcpSocketRioSession session, byte[] data, int offset, int count)
        {
            var text = Encoding.UTF8.GetString(data, offset, count);
            //Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
            Console.Write(string.Format("Client : --> "));
            Console.WriteLine(string.Format("{0}", text));

            await session.SendAsync(Encoding.UTF8.GetBytes(text));
        }

        public async Task OnSessionClosed(TcpSocketRioSession session)
        {
            Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
            await Task.CompletedTask;
        }
    }
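This article, "Multiple Ways to Implement a High-Performance TCP Service in C#" (《C#高性能TCP服务的多种实现方式》), was published by Dennis Gao on his personal blog at cnblogs (博客园). Reproduction in any form without the author's consent is prohibited, and any reposting by automated or manual crawlers is simply bad behavior.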