Multiple implementations of high-performance TCP services in C#
The main purpose of this article is to implement high-performance TCP services in .NET/C# in several different ways, including but not limited to the following:
APM method, that is, Asynchronous Programming Model
TAP method, that is, Task-based Asynchronous Pattern
SAEA method, that is, SocketAsyncEventArgs
RIO method, that is, Registered I/O
Socket support in .NET/C# is built on Windows I/O Completion Ports (IOCP) and is exposed through several non-blocking programming models to meet different needs. All of the above methods are fully implemented in Cowboy.Sockets, and the APM and TAP methods have been used in real projects. Cowboy.Sockets is still evolving and improving; if you find any problems, corrections are welcome.
Although there are many implementation methods, in the abstract they are all the same and can be described by two loops: the Accept Loop and the Read Loop. ("Loop" here means a looping pattern, not specifically keywords such as while/for.)
Neither loop may block. If the Accept Loop is blocked, connections cannot be established quickly and the server's pending backlog fills up, which causes the client to receive a Connect Timeout exception. If the Read Loop is blocked, data from the client cannot be received in time, so the client's send buffer fills up and it can no longer send data.
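As a rough illustration of the two loops, here is a minimal echo-server sketch built on TcpListener. It is not Cowboy.Sockets code: the class name TwoLoopEchoServer and its method names are made up for this example, and error handling is reduced to the bare minimum.

```csharp
using System;
using System.IO;
using System.Net.Sockets;
using System.Threading.Tasks;

// Hypothetical example type, not part of Cowboy.Sockets.
public static class TwoLoopEchoServer
{
    // Accept Loop: accept a connection, hand it off, and go straight back to accepting.
    public static async Task AcceptLoopAsync(TcpListener listener)
    {
        while (true)
        {
            try
            {
                TcpClient client = await listener.AcceptTcpClientAsync();

                // Deliberately not awaited: awaiting the session here would let
                // one slow client stall every new connection.
                _ = ReadLoopAsync(client);
            }
            catch (ObjectDisposedException) { break; } // listener was stopped
            catch (SocketException) { break; }         // treated here as "listener stopped"
        }
    }

    // Read Loop: one per connection; echoes back whatever arrives.
    public static async Task ReadLoopAsync(TcpClient client)
    {
        using (client)
        {
            try
            {
                NetworkStream stream = client.GetStream();
                var buffer = new byte[8192];
                while (true)
                {
                    int count = await stream.ReadAsync(buffer, 0, buffer.Length);
                    if (count == 0) break; // remote side closed the connection
                    await stream.WriteAsync(buffer, 0, count);
                }
            }
            catch (IOException) { /* connection reset; end this session */ }
        }
    }
}
```

The point of the sketch is the hand-off: the Accept Loop never waits on per-connection work, and each Read Loop only waits on its own socket.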
In terms of implementation details, the places that can block the service include:
… Copy;
… Module to process the corresponding Protocol Message, where processing is slow;

APM method: TcpSocketServer
… interface implementation.
The Accept Loop in TcpSocketServer refers to …
Each successfully established Connection is handled by a TcpSocketSession, so TcpSocketSession contains the Read Loop:
BeginRead -> EndRead -> BeginRead -> EndRead -> BeginRead -> ...
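The BeginRead -> EndRead chain can be sketched as follows. This is only an illustration of the APM pattern on a generic Stream, not the TcpSocketSession implementation; the ApmReadLoop class and its two callbacks are hypothetical names introduced for the example.

```csharp
using System;
using System.IO;

// Hypothetical example type, not part of Cowboy.Sockets.
public sealed class ApmReadLoop
{
    private readonly Stream _stream;
    private readonly byte[] _buffer = new byte[8192];
    private readonly Action<byte[], int> _onData; // receives (buffer, count)
    private readonly Action _onClosed;

    public ApmReadLoop(Stream stream, Action<byte[], int> onData, Action onClosed)
    {
        _stream = stream ?? throw new ArgumentNullException(nameof(stream));
        _onData = onData ?? throw new ArgumentNullException(nameof(onData));
        _onClosed = onClosed ?? throw new ArgumentNullException(nameof(onClosed));
    }

    // Kick off the first BeginRead; every later read is issued from the callback.
    public void Start()
    {
        _stream.BeginRead(_buffer, 0, _buffer.Length, OnRead, null);
    }

    private void OnRead(IAsyncResult ar)
    {
        int count;
        try
        {
            count = _stream.EndRead(ar); // completes the pending read
        }
        catch (IOException) { _onClosed(); return; }
        catch (ObjectDisposedException) { _onClosed(); return; }

        if (count == 0) { _onClosed(); return; } // end of stream: remote side closed

        _onData(_buffer, count);

        try
        {
            // The "loop": issue the next BeginRead from inside the callback.
            _stream.BeginRead(_buffer, 0, _buffer.Length, OnRead, null);
        }
        catch (IOException) { _onClosed(); }
        catch (ObjectDisposedException) { _onClosed(); }
    }
}
```

Note that a slow _onData callback delays the next BeginRead, which is exactly the kind of Read Loop blocking described earlier. A production version would also need to guard against deep recursion when reads complete synchronously.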
event EventHandler<TcpClientConnectedEventArgs> ClientConnected;
event EventHandler<TcpClientDisconnectedEventArgs> ClientDisconnected;
event EventHandler<TcpClientDataReceivedEventArgs> ClientDataReceived;
It is also simple and straightforward to use: just subscribe to the event notifications.
private static void StartServer()
{
    _server = new TcpSocketServer(22222);
    _server.ClientConnected += server_ClientConnected;
    _server.ClientDisconnected += server_ClientDisconnected;
    _server.ClientDataReceived += server_ClientDataReceived;
    _server.Listen();
}

static void server_ClientConnected(object sender, TcpClientConnectedEventArgs e)
{
    Console.WriteLine(string.Format("TCP client {0} has connected {1}.", e.Session.RemoteEndPoint, e.Session));
}

static void server_ClientDisconnected(object sender, TcpClientDisconnectedEventArgs e)
{
    Console.WriteLine(string.Format("TCP client {0} has disconnected.", e.Session));
}

static void server_ClientDataReceived(object sender, TcpClientDataReceivedEventArgs e)
{
    var text = Encoding.UTF8.GetString(e.Data, e.DataOffset, e.DataLength);
    Console.Write(string.Format("Client : {0} {1} --> ", e.Session.RemoteEndPoint, e.Session));
    Console.WriteLine(string.Format("{0}", text));
    _server.Broadcast(Encoding.UTF8.GetBytes(text));
}

TAP method: AsyncTcpSocketServer
AsyncTcpSocketServer is a further encapsulation of the TcpListener and TcpClient classes that ship with the .NET Framework, implemented with TAP-based async/await through the XXXAsync interfaces. In fact, XXXAsync does nothing magical: internally it simply converts the APM methods into TAP-style calls.
//************* Task-based async public methods *************************
[HostProtection(ExternalThreading = true)]
public Task<Socket> AcceptSocketAsync()
{
    return Task<Socket>.Factory.FromAsync(BeginAcceptSocket, EndAcceptSocket, null);
}

[HostProtection(ExternalThreading = true)]
public Task<TcpClient> AcceptTcpClientAsync()
{
    return Task<TcpClient>.Factory.FromAsync(BeginAcceptTcpClient, EndAcceptTcpClient, null);
}

The Accept Loop in AsyncTcpSocketServer refers to:
while (IsListening)
{
    var tcpClient = await _listener.AcceptTcpClientAsync();
}
Each successfully established Connection is handled by an AsyncTcpSocketSession, so AsyncTcpSocketSession contains the Read Loop:
while (State == TcpSocketConnectionState.Connected)
{
    int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length);
}

To keep async/await asynchronous all the way through, the interface exposed by AsyncTcpSocketServer is also awaitable.
public interface IAsyncTcpSocketServerMessageDispatcher
{
    Task OnSessionStarted(AsyncTcpSocketSession session);
    Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count);
    Task OnSessionClosed(AsyncTcpSocketSession session);
}

To use it, you only need to inject an object that implements this interface into the constructor of AsyncTcpSocketServer.
public class SimpleMessageDispatcher : IAsyncTcpSocketServerMessageDispatcher
{
    public async Task OnSessionStarted(AsyncTcpSocketSession session)
    {
        Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
        await Task.CompletedTask;
    }

    public async Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count)
    {
        var text = Encoding.UTF8.GetString(data, offset, count);
        Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
        Console.WriteLine(string.Format("{0}", text));
        await session.SendAsync(Encoding.UTF8.GetBytes(text));
    }

    public async Task OnSessionClosed(AsyncTcpSocketSession session)
    {
        Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
        await Task.CompletedTask;
    }
}

Of course, implementing the interface is not mandatory; the method implementations can also be injected directly into the constructor.
public AsyncTcpSocketServer(
    IPEndPoint listenedEndPoint,
    Func<AsyncTcpSocketSession, byte[], int, int, Task> onSessionDataReceived = null,
    Func<AsyncTcpSocketSession, Task> onSessionStarted = null,
    Func<AsyncTcpSocketSession, Task> onSessionClosed = null,
    AsyncTcpSocketServerConfiguration configuration = null)
{}

SAEA method: TcpSocketSaeaServer
SAEA is short for SocketAsyncEventArgs. SocketAsyncEventArgs is an implementation that supports high-performance socket communication, available since .NET Framework 3.5. Its main advantages over the APM method are described as follows:
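The main feature of these enhancements is the avoidance of the repeated allocation and synchronization of objects during high-volume asynchronous socket I/O. The Begin/End design pattern currently implemented by the Socket class for asynchronous socket I/O requires a System.IAsyncResult object be allocated for each asynchronous socket operation.
In other words, the advantage is that there is no need to allocate an IAsyncResult or similar object for each call, which is closer to the native socket. The recommended steps for using SocketAsyncEventArgs are as follows:
1. Allocate a new SocketAsyncEventArgs context object, or get a free one from an application pool.
2. Set properties on the context object for the operation about to be performed (the completion callback and the data buffer, for example).
3. Call the appropriate socket method (xxxAsync) to initiate the asynchronous operation.
4. If the method returns true, query the context properties for the completion status in the callback.
5. If the method returns false, the operation completed synchronously, and the context properties can be queried for the result right away.
6. Reuse the context for another operation, put it back in the pool, or discard it.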
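A minimal sketch of the recommended SocketAsyncEventArgs usage pattern might look like the following. It uses only the standard Socket.ReceiveAsync(SocketAsyncEventArgs) API; the class SaeaReceiveSketch, its method names, and the onData/onClosed callbacks are assumptions made for this example and are not taken from Cowboy.Sockets.

```csharp
using System;
using System.Net.Sockets;

// Hypothetical example type, not part of Cowboy.Sockets.
public static class SaeaReceiveSketch
{
    // Starts receiving on a connected socket, reusing one context object for every read.
    public static void StartReceiving(Socket socket, Action<byte[], int, int> onData, Action onClosed)
    {
        var saea = new SocketAsyncEventArgs();   // allocate a context (or take one from a pool)
        saea.SetBuffer(new byte[8192], 0, 8192); // set the data buffer
        saea.Completed += (sender, e) =>         // set the completion callback
        {
            // Raised only when the operation completed asynchronously.
            if (ProcessReceive(e, onData, onClosed))
                ReceiveNext(socket, e, onData, onClosed);
        };
        ReceiveNext(socket, saea, onData, onClosed);
    }

    private static void ReceiveNext(Socket socket, SocketAsyncEventArgs saea,
        Action<byte[], int, int> onData, Action onClosed)
    {
        try
        {
            // ReceiveAsync returns false when the operation completed synchronously;
            // Completed is not raised in that case, so the result is handled inline.
            while (!socket.ReceiveAsync(saea))
            {
                if (!ProcessReceive(saea, onData, onClosed)) return;
            }
        }
        catch (ObjectDisposedException)
        {
            onClosed(); // the socket was closed locally
        }
    }

    // Returns true if the connection is still usable and another receive should be issued.
    private static bool ProcessReceive(SocketAsyncEventArgs saea,
        Action<byte[], int, int> onData, Action onClosed)
    {
        if (saea.SocketError != SocketError.Success || saea.BytesTransferred == 0)
        {
            saea.Dispose(); // discard the context (a pooled design would return it instead)
            onClosed();
            return false;
        }
        onData(saea.Buffer, saea.Offset, saea.BytesTransferred);
        return true;
    }
}
```

The same context object and buffer serve every receive on the connection, so no per-operation IAsyncResult is allocated.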
public static SaeaAwaitable AcceptAsync(this Socket socket, SaeaAwaitable awaitable)
public static SaeaAwaitable ConnectAsync(this Socket socket, SaeaAwaitable awaitable)
public static SaeaAwaitable DisonnectAsync(this Socket socket, SaeaAwaitable awaitable)
public static SaeaAwaitable ReceiveAsync(this Socket socket, SaeaAwaitable awaitable)
public static SaeaAwaitable SendAsync(this Socket socket, SaeaAwaitable awaitable)

SaeaPool is a derived implementation of QueuedObjectPool
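To make the pooling idea concrete, here is a small queue-backed pool sketch. It is not the SaeaPool or QueuedObjectPool code from Cowboy.Sockets: SimpleSaeaPool is a hypothetical class that pools plain SocketAsyncEventArgs objects, and the optional clean callback is an assumption about how a pool might reset an object before reuse.

```csharp
using System;
using System.Collections.Concurrent;
using System.Net.Sockets;

// Hypothetical example type, not part of Cowboy.Sockets.
public sealed class SimpleSaeaPool
{
    private readonly ConcurrentQueue<SocketAsyncEventArgs> _queue =
        new ConcurrentQueue<SocketAsyncEventArgs>();
    private readonly Action<SocketAsyncEventArgs> _clean;

    // initialCount objects are created up front; clean (optional) resets an object on return.
    public SimpleSaeaPool(int initialCount, Action<SocketAsyncEventArgs> clean = null)
    {
        if (initialCount < 0) throw new ArgumentOutOfRangeException(nameof(initialCount));
        _clean = clean;
        for (int i = 0; i < initialCount; i++)
            _queue.Enqueue(new SocketAsyncEventArgs());
    }

    // Number of idle objects currently held by the pool.
    public int Count => _queue.Count;

    // Hands out an idle object, or allocates a new one when the pool is empty.
    public SocketAsyncEventArgs Take()
    {
        return _queue.TryDequeue(out var saea) ? saea : new SocketAsyncEventArgs();
    }

    // Resets the object and makes it available for the next Take call.
    public void Return(SocketAsyncEventArgs saea)
    {
        if (saea == null) throw new ArgumentNullException(nameof(saea));
        _clean?.Invoke(saea);
        _queue.Enqueue(saea);
    }
}
```

A caller would pair every Take with a Return once the operation has completed, for example `var saea = pool.Take(); try { /* use it */ } finally { pool.Return(saea); }`, so that steady-state traffic allocates no new context objects.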
The Accept Loop in TcpSocketSaeaServer refers to:
while (IsListening)
{
    var saea = _acceptSaeaPool.Take();
    var socketError = await _listener.AcceptAsync(saea);
    if (socketError == SocketError.Success)
    {
        var acceptedSocket = saea.Saea.AcceptSocket;
    }
    _acceptSaeaPool.Return(saea);
}
Each successfully established Connection is handled by a TcpSocketSaeaSession, so TcpSocketSaeaSession contains the Read Loop:
var saea = _saeaPool.Take();
saea.Saea.SetBuffer(_receiveBuffer, 0, _receiveBuffer.Length);
while (State == TcpSocketConnectionState.Connected)
{
    saea.Saea.SetBuffer(0, _receiveBuffer.Length);
    var socketError = await _socket.ReceiveAsync(saea);
    if (socketError != SocketError.Success)
        break;
    var receiveCount = saea.Saea.BytesTransferred;
    if (receiveCount == 0)
        break;
}
Likewise, the interface exposed by TcpSocketSaeaServer is also awaitable.
public interface ITcpSocketSaeaServerMessageDispatcher
{
    Task OnSessionStarted(TcpSocketSaeaSession session);
    Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count);
    Task OnSessionClosed(TcpSocketSaeaSession session);
}
Using it is just as simple and direct:
public class SimpleMessageDispatcher : ITcpSocketSaeaServerMessageDispatcher
{
    public async Task OnSessionStarted(TcpSocketSaeaSession session)
    {
        Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
        await Task.CompletedTask;
    }

    public async Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count)
    {
        var text = Encoding.UTF8.GetString(data, offset, count);
        Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
        Console.WriteLine(string.Format("{0}", text));
        await session.SendAsync(Encoding.UTF8.GetBytes(text));
    }

    public async Task OnSessionClosed(TcpSocketSaeaSession session)
    {
        Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
        await Task.CompletedTask;
    }
}
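RIO method: TcpSocketRioServer
Starting with Windows 8.1 / Windows Server 2012 R2, Microsoft introduced the Registered I/O Networking Extensions, RIO for short, to support the implementation of high-performance socket services.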
The following functions are supported for Windows Store apps on Windows 8.1, Windows Server 2012 R2, and later. Microsoft Visual Studio 2013 Update 3 or later is required for Windows Store apps.
RIOCloseCompletionQueue
RIOCreateCompletionQueue
RIOCreateRequestQueue
RIODequeueCompletion
RIODeregisterBuffer
RIONotify
RIOReceive
RIOReceiveEx
RIORegisterBuffer
RIOResizeCompletionQueue
RIOResizeRequestQueue
RIOSend
RIOSendEx
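So far, the .NET Framework has not introduced support for RIO, so the only way to use RIO from C# is through P/Invoke. RioSharp is one of the more complete implementations among open-source projects.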
Cowboy.Sockets directly includes the RioSharp source code, placed under the Cowboy.Sockets.Experimental namespace, for experimentation and testing.
Likewise, the Accept Loop is implemented through TcpSocketRioServer:
_listener.OnAccepted = (acceptedSocket) =>
{
    Task.Run(async () =>
    {
        await Process(acceptedSocket);
    })
    .Forget();
};
The Read Loop is handled by TcpSocketRioSession:
while (State == TcpSocketConnectionState.Connected)
{
    int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length);
    if (receiveCount == 0)
        break;
}
The test code is, as always, similar:
public class SimpleMessageDispatcher : ITcpSocketRioServerMessageDispatcher
{
    public async Task OnSessionStarted(TcpSocketRioSession session)
    {
        //Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
        Console.WriteLine(string.Format("TCP session has connected {0}.", session));
        await Task.CompletedTask;
    }

    public async Task OnSessionDataReceived(TcpSocketRioSession session, byte[] data, int offset, int count)
    {
        var text = Encoding.UTF8.GetString(data, offset, count);
        //Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
        Console.Write(string.Format("Client : --> "));
        Console.WriteLine(string.Format("{0}", text));
        await session.SendAsync(Encoding.UTF8.GetBytes(text));
    }

    public async Task OnSessionClosed(TcpSocketRioSession session)
    {
        Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
        await Task.CompletedTask;
    }
}
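This article, "Multiple implementations of high-performance TCP services in C#", was published by Dennis Gao on his personal blog at cnblogs (博客园). Reproduction in any form without the author's consent is prohibited, and any automated or manual scraping and reposting is considered rogue behavior.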