Multiple implementations of high-performance TCP services in C#

PHPz (Original)
2017-03-12 16:47:33

This article uses .NET/C# to implement a high-performance TCP service in several different ways, including but not limited to the following:

  • APM method, that is, Asynchronous Programming Model

  • TAP method, that is, Task-based Asynchronous Pattern

  • SAEA method, that is, SocketAsyncEventArgs

  • RIO method, that is, Registered I/O

Socket support in .NET/C# is built on Windows I/O Completion Ports (IOCP) and wraps them in several different non-blocking programming models to suit different needs. All of the approaches above are fully implemented in Cowboy.Sockets, and the APM and TAP approaches have been used in real projects. Cowboy.Sockets is still evolving and improving; if you find any problems, corrections are welcome.

Although there are many implementation approaches, in the abstract they are all the same and can be described with two loops: an Accept Loop and a Read Loop. ("Loop" here means a looping pattern, not specifically keywords such as while or for.)

  • In any TCP Server implementation, there must be an Accept Socket Loop that receives Connect requests from Clients in order to establish TCP Connections.

  • In any TCP Server implementation, there must be a Read Socket Loop that receives the data written by the Client.

If the Accept loop is blocked, connections cannot be established quickly and the server's Pending Backlog fills up, which causes Clients to receive Connect Timeout exceptions. If the Read loop is blocked, data from the Client cannot be received in time, so the Client's Send Buffer fills up and it can no longer send data.
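The two loops can be sketched with nothing but TcpListener and async/await. This is a minimal illustration, not the Cowboy.Sockets implementation: the class name TwoLoopsSketch and the echo behavior are assumptions made for the example. The point is that the Accept Loop hands each connection to its own Read Loop without awaiting it, so a slow client never delays the next Accept.

```csharp
using System;
using System.IO;
using System.Net.Sockets;
using System.Threading.Tasks;

public static class TwoLoopsSketch
{
    // Accept Loop: accept a connection, hand it off, go straight back to accepting.
    public static async Task AcceptLoopAsync(TcpListener listener)
    {
        while (true)
        {
            TcpClient client;
            try { client = await listener.AcceptTcpClientAsync(); }
            catch (InvalidOperationException) { break; } // listener stopped or disposed
            catch (SocketException) { break; }           // pending accept aborted by Stop()

            // Deliberately not awaited: each connection runs its own Read Loop.
            _ = ReadLoopAsync(client);
        }
    }

    // Read Loop: one per connection; this sketch simply echoes what it receives.
    public static async Task ReadLoopAsync(TcpClient client)
    {
        using (client)
        {
            var stream = client.GetStream();
            var buffer = new byte[8192];
            try
            {
                while (true)
                {
                    int count = await stream.ReadAsync(buffer, 0, buffer.Length);
                    if (count == 0) break; // the peer closed the connection
                    await stream.WriteAsync(buffer, 0, count);
                }
            }
            catch (IOException) { /* connection reset: end this session */ }
        }
    }
}
```

To try it, start a TcpListener, call TwoLoopsSketch.AcceptLoopAsync(listener), and connect with any TCP client; calling listener.Stop() ends the Accept Loop.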

From the perspective of implementation details, the places where the service can block include:

  1. A new Socket is accepted, and building the new Connection requires allocating various resources, which is slow;

  2. A new Socket is accepted, but the next Accept is not triggered in time;

  3. New data is read into the Buffer, and determining the Payload message length takes a long time;

  4. New data is read into the Buffer, but the Payload is not yet complete; continuing to read "may" result in a Buffer Copy;

  5. After the Payload is received, it is deserialized into a recognizable Protocol Message, and the deserialization is slow;

  6. The Business Module processes the corresponding Protocol Message, and the processing is slow.

Items 1-2 concern the Accept process and Connection establishment, items 3-4 concern the handling of the receive buffer, and items 5-6 concern the application logic.
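Items 3 and 4 can be made concrete with a small framing check. The sketch below assumes a hypothetical wire format that the article does not specify: a 4-byte big-endian length header followed by the payload. The name LengthPrefixedFrame is illustrative only. The check is cheap, and when the payload is incomplete it reports "need more data" instead of copying anything.

```csharp
using System;

public static class LengthPrefixedFrame
{
    public const int HeaderSize = 4; // assumed: 4-byte big-endian payload length

    // Looks for one complete frame in buffer[offset .. offset + count).
    // Returns false when more bytes must be read first; nothing is copied.
    public static bool TryDecode(byte[] buffer, int offset, int count,
        out int payloadOffset, out int payloadLength, out int frameLength)
    {
        payloadOffset = 0;
        payloadLength = 0;
        frameLength = 0;

        if (count < HeaderSize) return false; // header not complete yet

        int length = (buffer[offset] << 24) | (buffer[offset + 1] << 16)
                   | (buffer[offset + 2] << 8) | buffer[offset + 3];
        if (length < 0) throw new InvalidOperationException("Negative frame length.");

        if (count - HeaderSize < length) return false; // payload not complete yet

        payloadOffset = offset + HeaderSize;
        payloadLength = length;
        frameLength = HeaderSize + length;
        return true;
    }
}
```

A Read Loop would call TryDecode after every read, dispatch the payload only when it returns true, and then advance its offset by frameLength.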

Netty, the well-known Java network library, took a new approach to buffers starting with version 4.0, adopting a design called ByteBuf to achieve zero-copy buffers and reduce the performance loss and GC pressure caused by buffer copying under high concurrency. Projects such as DotNetty, Orleans, and Helios are trying to implement similar ByteBuf designs in C#.
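The core idea behind avoiding buffer copies can be shown with the built-in ArraySegment type. This is only an illustration of "a view instead of a copy"; it is not Netty's ByteBuf, which adds pooling, reference counting, and separate reader and writer indexes. The name BufferSliceSketch is made up for the example.

```csharp
using System;

public static class BufferSliceSketch
{
    // Returns a view over part of an existing buffer; no bytes are copied.
    public static ArraySegment<byte> Slice(byte[] buffer, int offset, int count)
    {
        return new ArraySegment<byte>(buffer, offset, count);
    }
}
```

For example, after var payload = BufferSliceSketch.Slice(receiveBuffer, 2, 3), the slice and receiveBuffer share storage: payload.Array is the same array instance as receiveBuffer, and payload.Offset is 2.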

APM method: TcpSocketServer

TcpSocketServer is a further encapsulation of the TcpListener and TcpClient classes that come with the .NET Framework, implemented with the APM-style BeginXXX and EndXXX interfaces.

The Accept Loop in TcpSocketServer refers to,

  • BeginAccept -> EndAccept-> BeginAccept -> EndAccept -> BeginAccept -> ...

Each successfully established Connection is handled by a TcpSocketSession, so TcpSocketSession contains the Read Loop,

  • BeginRead -> EndRead -> BeginRead -> EndRead -> BeginRead -> ...
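The two Begin/End chains above can be sketched directly against TcpListener and NetworkStream. This is a simplified echo server written for illustration, not the actual TcpSocketServer; the names ApmEchoServer and Session are assumptions. Note how each callback re-arms the next Begin call, which is what turns the callbacks into a "loop".

```csharp
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;

public sealed class ApmEchoServer
{
    private readonly TcpListener _listener;

    public ApmEchoServer(IPEndPoint endPoint) { _listener = new TcpListener(endPoint); }

    public int Port => ((IPEndPoint)_listener.LocalEndpoint).Port; // valid after Start()

    public void Start()
    {
        _listener.Start();
        _listener.BeginAcceptTcpClient(OnAccept, null); // first BeginAccept
    }

    public void Stop() { _listener.Stop(); }

    private void OnAccept(IAsyncResult ar)
    {
        TcpClient client;
        try { client = _listener.EndAcceptTcpClient(ar); } // EndAccept
        catch (Exception e) when (e is InvalidOperationException || e is SocketException) { return; }

        // Re-arm the Accept Loop before doing any per-connection work.
        try { _listener.BeginAcceptTcpClient(OnAccept, null); }
        catch (InvalidOperationException) { /* listener stopped: serve this last client only */ }

        new Session(client).Start();
    }

    private sealed class Session
    {
        private readonly TcpClient _client;
        private readonly NetworkStream _stream;
        private readonly byte[] _buffer = new byte[8192];

        public Session(TcpClient client) { _client = client; _stream = client.GetStream(); }

        public void Start()
        {
            try { _stream.BeginRead(_buffer, 0, _buffer.Length, OnRead, null); }
            catch (Exception e) when (e is IOException || e is ObjectDisposedException) { _client.Close(); }
        }

        private void OnRead(IAsyncResult ar)
        {
            try
            {
                int count = _stream.EndRead(ar);                             // EndRead
                if (count == 0) { _client.Close(); return; }                 // peer closed
                _stream.Write(_buffer, 0, count);                            // synchronous echo keeps the sketch short
                _stream.BeginRead(_buffer, 0, _buffer.Length, OnRead, null); // next BeginRead
            }
            catch (Exception e) when (e is IOException || e is ObjectDisposedException)
            {
                _client.Close();
            }
        }
    }
}
```

Usage is new ApmEchoServer(new IPEndPoint(IPAddress.Loopback, 0)).Start(), after which Port reports the port that was actually bound.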

TcpSocketServer notifies callers of Connection establishment, disconnection, and data reception by exposing Events.


  event EventHandler<TcpClientConnectedEventArgs> ClientConnected;
  event EventHandler<TcpClientDisconnectedEventArgs> ClientDisconnected;
  event EventHandler<TcpClientDataReceivedEventArgs> ClientDataReceived;

Usage is equally simple and direct: just subscribe to the event notifications.


  private static void StartServer()
  {
      _server = new TcpSocketServer(22222);
      _server.ClientConnected += server_ClientConnected;
      _server.ClientDisconnected += server_ClientDisconnected;
      _server.ClientDataReceived += server_ClientDataReceived;
      _server.Listen();
  }

  static void server_ClientConnected(object sender, TcpClientConnectedEventArgs e)
  {
      Console.WriteLine(string.Format("TCP client {0} has connected {1}.", e.Session.RemoteEndPoint, e.Session));
  }

  static void server_ClientDisconnected(object sender, TcpClientDisconnectedEventArgs e)
  {
      Console.WriteLine(string.Format("TCP client {0} has disconnected.", e.Session));
  }

  static void server_ClientDataReceived(object sender, TcpClientDataReceivedEventArgs e)
  {
      var text = Encoding.UTF8.GetString(e.Data, e.DataOffset, e.DataLength);
      Console.Write(string.Format("Client : {0} {1} --> ", e.Session.RemoteEndPoint, e.Session));
      Console.WriteLine(string.Format("{0}", text));
      _server.Broadcast(Encoding.UTF8.GetBytes(text));
  }

TAP method: AsyncTcpSocketServer

AsyncTcpSocketServer is likewise a further encapsulation of the TcpListener and TcpClient classes that come with the .NET Framework, implemented with the TAP-style XXXAsync interfaces and async/await.

In fact, however, XXXAsync does not work any magic: internally, it merely converts the APM methods into TAP-style calls.


  //************* Task-based async public methods *************************
  [HostProtection(ExternalThreading = true)]
  public Task<Socket> AcceptSocketAsync()
  {
      return Task<Socket>.Factory.FromAsync(BeginAcceptSocket, EndAcceptSocket, null);
  }

  [HostProtection(ExternalThreading = true)]
  public Task<TcpClient> AcceptTcpClientAsync()
  {
      return Task<TcpClient>.Factory.FromAsync(BeginAcceptTcpClient, EndAcceptTcpClient, null);
  }
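The same FromAsync conversion can be applied to any Begin/End pair. As a small sketch, the helper below wraps Stream.BeginRead and Stream.EndRead into a Task; the name ApmToTapSketch is made up for the example, and in real code you would normally just call Stream.ReadAsync.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class ApmToTapSketch
{
    // Wraps the APM pair Stream.BeginRead/EndRead into a Task<int>,
    // the same way the framework code above wraps BeginAccept/EndAccept.
    public static Task<int> ReadViaFromAsync(Stream stream, byte[] buffer, int offset, int count)
    {
        return Task<int>.Factory.FromAsync(
            stream.BeginRead, stream.EndRead, buffer, offset, count, null);
    }
}
```

The result can be awaited like any other task: int n = await ApmToTapSketch.ReadViaFromAsync(stream, buffer, 0, buffer.Length);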
The Accept Loop in AsyncTcpSocketServer refers to,


  while (IsListening)
  {
      var tcpClient = await _listener.AcceptTcpClientAsync();
  }

Each successfully established Connection is handled by an AsyncTcpSocketSession, so AsyncTcpSocketSession contains the Read Loop,


  while (State == TcpSocketConnectionState.Connected)
  {
      int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length);
  }

To keep async/await asynchronous all the way through, the interface exposed by AsyncTcpSocketServer is also awaitable.


  public interface IAsyncTcpSocketServerMessageDispatcher
  {
      Task OnSessionStarted(AsyncTcpSocketSession session);
      Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count);
      Task OnSessionClosed(AsyncTcpSocketSession session);
  }

To use it, you only need to inject an object that implements this interface into the constructor of AsyncTcpSocketServer.


  public class SimpleMessageDispatcher : IAsyncTcpSocketServerMessageDispatcher
  {
      public async Task OnSessionStarted(AsyncTcpSocketSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
          await Task.CompletedTask;
      }

      public async Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count)
      {
          var text = Encoding.UTF8.GetString(data, offset, count);
          Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.WriteLine(string.Format("{0}", text));

          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }

      public async Task OnSessionClosed(AsyncTcpSocketSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
          await Task.CompletedTask;
      }
  }

Of course, implementing the interface is not mandatory; the handler methods can also be passed directly to the constructor.


  public AsyncTcpSocketServer(
      IPEndPoint listenedEndPoint,
      Func<AsyncTcpSocketSession, byte[], int, int, Task> onSessionDataReceived = null,
      Func<AsyncTcpSocketSession, Task> onSessionStarted = null,
      Func<AsyncTcpSocketSession, Task> onSessionClosed = null,
      AsyncTcpSocketServerConfiguration configuration = null)
  {}

SAEA method: TcpSocketSaeaServer

SAEA is short for SocketAsyncEventArgs, a class that has supported high-performance Socket communication since .NET Framework 3.5. Its main advantages over the APM method are described as follows:

The main feature of these enhancements is the avoidance of the repeated allocation and synchronization of objects during high-volume asynchronous socket I/O. The Begin/End design pattern currently implemented by the Socket class for asynchronous socket I/O requires a System.IAsyncResult object be allocated for each asynchronous socket operation.

In other words, the advantage is that there is no need to allocate an IAsyncResult and related objects for every call, which brings it somewhat closer to the native Socket.

The recommended steps for using SocketAsyncEventArgs are as follows:

  1. Allocate a new SocketAsyncEventArgs context object, or get a free one from an application pool.

  2. Set properties on the context object to the operation about to be performed (the callback delegate method and data buffer, for example).

  3. Call the appropriate socket method (xxxAsync) to initiate the asynchronous operation.

  4. If the asynchronous socket method (xxxAsync) returns true, in the callback, query the context properties for completion status.

  5. If the asynchronous socket method (xxxAsync) returns false, the operation completed synchronously. The context properties may be queried for the operation result.

  6. Reuse the context for another operation, put it back in the pool, or discard it.
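The six steps map onto Socket.AcceptAsync as in the following sketch. It is an illustration only: SaeaAcceptor and its onAccepted callback are hypothetical names, a single context object is reused instead of a pool, and the loop stops on any accept error, which is simpler than what a production server would do.

```csharp
using System;
using System.Net.Sockets;

public sealed class SaeaAcceptor
{
    private readonly Socket _listenSocket;
    private readonly Action<Socket> _onAccepted;

    public SaeaAcceptor(Socket listenSocket, Action<Socket> onAccepted)
    {
        _listenSocket = listenSocket;
        _onAccepted = onAccepted;
    }

    public void Start()
    {
        // Steps 1-2: allocate the context object and set the callback on it.
        var args = new SocketAsyncEventArgs();
        args.Completed += (sender, e) =>
        {
            // Step 4: the operation was pending and has now completed.
            if (ProcessAccept(e)) StartAccept(e);
        };
        StartAccept(args);
    }

    private void StartAccept(SocketAsyncEventArgs args)
    {
        while (true)
        {
            args.AcceptSocket = null; // Step 6: clear state so the context can be reused

            bool pending;
            try { pending = _listenSocket.AcceptAsync(args); } // Step 3
            catch (ObjectDisposedException) { return; }         // listening socket closed

            if (pending) return;              // the Completed event will fire later
            if (!ProcessAccept(args)) return; // Step 5: completed synchronously
        }
    }

    // Returns false when accepting should stop (simplification: any error stops the loop).
    private bool ProcessAccept(SocketAsyncEventArgs args)
    {
        if (args.SocketError != SocketError.Success) return false;
        _onAccepted(args.AcceptSocket);
        return true;
    }
}
```

The while loop in StartAccept handles synchronous completions without recursion, so a burst of already-queued connections cannot grow the stack.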

The focus is on pooling. The purpose of pooling is reuse, which reduces runtime allocation and garbage collection pressure.

TcpSocketSaeaServer applies and encapsulates SocketAsyncEventArgs and implements pooling. Its centerpiece is the SaeaAwaitable class: SaeaAwaitable holds a built-in SocketAsyncEventArgs and returns a SaeaAwaiter from GetAwaiter to support async/await. In addition, the SaeaExtensions extension methods provide awaitable versions of the SocketAsyncEventArgs-based Socket operations.


  public static SaeaAwaitable AcceptAsync(this Socket socket, SaeaAwaitable awaitable)
  public static SaeaAwaitable ConnectAsync(this Socket socket, SaeaAwaitable awaitable)
  public static SaeaAwaitable DisonnectAsync(this Socket socket, SaeaAwaitable awaitable)
  public static SaeaAwaitable ReceiveAsync(this Socket socket, SaeaAwaitable awaitable)
  public static SaeaAwaitable SendAsync(this Socket socket, SaeaAwaitable awaitable)
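How an awaitable wrapper around SocketAsyncEventArgs can work is sketched below. This is not the Cowboy.Sockets source: it follows the widely used pattern of a reusable awaiter that coordinates the Completed event with the continuation through an interlocked sentinel, and the names SocketAwaitable and SocketAwaitableExtensions are assumptions. Only ReceiveAsync is shown; the other operations would follow the same shape.

```csharp
using System;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public sealed class SocketAwaitable : INotifyCompletion
{
    private static readonly Action Sentinel = () => { };
    private Action _continuation;
    private bool _completed;

    public SocketAwaitable(SocketAsyncEventArgs eventArgs)
    {
        EventArgs = eventArgs;
        eventArgs.Completed += (sender, e) =>
        {
            // Run the registered continuation, or leave the sentinel behind
            // to signal that completion happened before OnCompleted was called.
            var previous = _continuation
                ?? Interlocked.CompareExchange(ref _continuation, Sentinel, null);
            previous?.Invoke();
        };
    }

    public SocketAsyncEventArgs EventArgs { get; }

    internal void Reset() { _completed = false; _continuation = null; }
    internal void MarkCompleted() { _completed = true; }

    // The object is its own awaiter, so awaiting allocates nothing extra.
    public SocketAwaitable GetAwaiter() => this;
    public bool IsCompleted => _completed;

    public void OnCompleted(Action continuation)
    {
        if (_continuation == Sentinel ||
            Interlocked.CompareExchange(ref _continuation, continuation, null) == Sentinel)
        {
            Task.Run(continuation); // the operation already finished
        }
    }

    public SocketError GetResult() => EventArgs.SocketError;
}

public static class SocketAwaitableExtensions
{
    public static SocketAwaitable ReceiveAsync(this Socket socket, SocketAwaitable awaitable)
    {
        awaitable.Reset(); // must happen before the operation is started
        if (!socket.ReceiveAsync(awaitable.EventArgs))
            awaitable.MarkCompleted(); // completed synchronously; Completed will not fire
        return awaitable;
    }
}
```

With this in place, a receive reads as var error = await socket.ReceiveAsync(awaitable); and the number of bytes is available from awaitable.EventArgs.BytesTransferred. An instance must not be used for two operations at the same time.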
SaeaPool derives from QueuedObjectPool and is used to pool SaeaAwaitable instances. To cut down the cost of constructing TcpSocketSaeaSession objects, a SessionPool, also based on QueuedObjectPool, is implemented as well.
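A queue-based object pool of this kind can be very small. The sketch below is a guess at the general shape rather than the actual QueuedObjectPool: the class name SimpleObjectPool, the create and reset delegates, and the preallocate parameter are all assumptions. Take and Return are named to match how the pools are used in the loops in this section.

```csharp
using System;
using System.Collections.Concurrent;

public sealed class SimpleObjectPool<T> where T : class
{
    private readonly ConcurrentQueue<T> _items = new ConcurrentQueue<T>();
    private readonly Func<T> _create;
    private readonly Action<T> _reset;

    public SimpleObjectPool(Func<T> create, Action<T> reset = null, int preallocate = 0)
    {
        _create = create ?? throw new ArgumentNullException(nameof(create));
        _reset = reset;
        for (int i = 0; i < preallocate; i++) _items.Enqueue(_create());
    }

    public int Count => _items.Count;

    // Reuses an idle instance when one is available; otherwise creates a new one.
    public T Take()
    {
        return _items.TryDequeue(out var item) ? item : _create();
    }

    // Cleans the instance (if a reset action was given) and makes it available again.
    public void Return(T item)
    {
        if (item == null) throw new ArgumentNullException(nameof(item));
        _reset?.Invoke(item);
        _items.Enqueue(item);
    }
}
```

This version never blocks and has no upper bound, so a burst of traffic simply allocates more instances; a bounded pool would trade that for back-pressure.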

The Accept Loop in TcpSocketSaeaServer refers to,


  while (IsListening)
  {
      var saea = _acceptSaeaPool.Take();

      var socketError = await _listener.AcceptAsync(saea);
      if (socketError == SocketError.Success)
      {
          var acceptedSocket = saea.Saea.AcceptSocket;
      }

      _acceptSaeaPool.Return(saea);
  }

Each successfully established Connection is handled by a TcpSocketSaeaSession, so TcpSocketSaeaSession contains the Read Loop,


  var saea = _saeaPool.Take();
  saea.Saea.SetBuffer(_receiveBuffer, 0, _receiveBuffer.Length);

  while (State == TcpSocketConnectionState.Connected)
  {
      saea.Saea.SetBuffer(0, _receiveBuffer.Length);

      var socketError = await _socket.ReceiveAsync(saea);
      if (socketError != SocketError.Success)
          break;

      var receiveCount = saea.Saea.BytesTransferred;
      if (receiveCount == 0)
          break;
  }

Likewise, the interface exposed by TcpSocketSaeaServer is also awaitable.


  public interface ITcpSocketSaeaServerMessageDispatcher
  {
      Task OnSessionStarted(TcpSocketSaeaSession session);
      Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count);
      Task OnSessionClosed(TcpSocketSaeaSession session);
  }

It is just as simple and direct to use:


  public class SimpleMessageDispatcher : ITcpSocketSaeaServerMessageDispatcher
  {
      public async Task OnSessionStarted(TcpSocketSaeaSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
          await Task.CompletedTask;
      }

      public async Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count)
      {
          var text = Encoding.UTF8.GetString(data, offset, count);
          Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.WriteLine(string.Format("{0}", text));

          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }

      public async Task OnSessionClosed(TcpSocketSaeaSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
          await Task.CompletedTask;
      }
  }

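RIO method: TcpSocketRioServer

Microsoft introduced the Registered I/O Networking Extensions, RIO for short, in Windows 8 / Windows Server 2012 to support high-performance Socket services. For Windows 8.1 / Windows Server 2012 R2, the documentation notes: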

The following functions are supported for Windows Store apps on Windows 8.1, Windows Server 2012 R2, and later. Microsoft Visual Studio 2013 Update 3 or later is required for Windows Store apps.

  • RIOCloseCompletionQueue

  • RIOCreateCompletionQueue

  • RIOCreateRequestQueue

  • RIODequeueCompletion

  • RIODeregisterBuffer

  • RIONotify

  • RIOReceive

  • RIOReceiveEx

  • RIORegisterBuffer

  • RIOResizeCompletionQueue

  • RIOResizeRequestQueue

  • RIOSend

  • RIOSendEx

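So far, the .NET Framework has not shipped support for RIO, so the only way to use RIO from C# is through P/Invoke. RioSharp is a fairly complete open-source implementation.

Cowboy.Sockets directly includes the RioSharp source code, placed under the Cowboy.Sockets.Experimental namespace, for experimentation and testing.

Likewise, TcpSocketRioServer implements the Accept Loop,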


  _listener.OnAccepted = (acceptedSocket) =>
  {
      Task.Run(async () =>
      {
          await Process(acceptedSocket);
      })
      .Forget();
  };
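The Forget() call above is not defined in the article. One plausible shape for such a fire-and-forget helper is sketched here as an assumption: an extension method that lets the task run unobserved while still routing any failure to an optional handler. The names TaskForgetSketch and onError are hypothetical.

```csharp
using System;
using System.Threading.Tasks;

public static class TaskForgetSketch
{
    // Fire-and-forget: the caller does not await the task, but a failure
    // is still observed and passed to the optional error handler.
    public static void Forget(this Task task, Action<Exception> onError = null)
    {
        task.ContinueWith(
            t => onError?.Invoke(t.Exception.GetBaseException()),
            TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously);
    }
}
```

The design choice is the same as in the Accept Loop discussion earlier in the article: the accept callback returns immediately, and each connection is processed on its own task.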

and TcpSocketRioSession handles the Read Loop,


  while (State == TcpSocketConnectionState.Connected)
  {
      int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length);
      if (receiveCount == 0)
          break;
  }

The test code is, as always, similar:


  public class SimpleMessageDispatcher : ITcpSocketRioServerMessageDispatcher
  {
      public async Task OnSessionStarted(TcpSocketRioSession session)
      {
          //Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
          Console.WriteLine(string.Format("TCP session has connected {0}.", session));
          await Task.CompletedTask;
      }

      public async Task OnSessionDataReceived(TcpSocketRioSession session, byte[] data, int offset, int count)
      {
          var text = Encoding.UTF8.GetString(data, offset, count);
          //Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.Write(string.Format("Client : --> "));
          Console.WriteLine(string.Format("{0}", text));

          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }

      public async Task OnSessionClosed(TcpSocketRioSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
          await Task.CompletedTask;
      }
  }

References

  • Asynchronous Programming Model (APM)

  • Task-based Asynchronous Pattern (TAP)

  • Event-based Asynchronous Pattern (EAP)

  • SocketAsyncEventArgs

  • Registered I/O

  • Netty: Reference counted objects

  • Socket Performance Enhancements in Version 3.5

  • What's New for Windows Sockets for Windows 8.1 and Windows Server 2012 R2

  • RIO_EXTENSION_FUNCTION_TABLE structure

  • Windows 8 Registered I/O Networking Extensions

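This article, "Multiple implementations of high-performance TCP services in C#", was published by Dennis Gao on his personal blog at cnblogs (博客园). Reposting in any form without the author's consent is prohibited, and any automated or manual crawler-based reposting is considered rogue behavior.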
