Heim  >  Artikel  >  Backend-Entwicklung  >  Mehrere Implementierungen leistungsstarker TCP-Dienste in C#

Mehrere Implementierungen leistungsstarker TCP-Dienste in C#

PHPz
PHPzOriginal
2017-03-12 16:47:332532Durchsuche

Der Hauptzweck dieses Artikels besteht darin, .NET/C# zu verwenden, um TCP-Hochleistungs-Dienste auf verschiedene Arten zu implementieren, einschließlich, aber nicht beschränkt auf die folgenden:

  • APM-Methode, das heißt Asynchronous Programming Modell

  • TAP-Methode, das heißt, Aufgabenbasiertes asynchrones Muster

  • SAEEine Methode, also SocketAsyncEventArgs

  • RIO-Methode, also Registered I/O

Socket-Unterstützung in .NET/C# basiert auf Windows I/O Completion Ports, um die Kapselung der Port-Technologie zu vervollständigen, und verwendet verschiedene nicht blockierende Kapselungsstrukturen, um unterschiedliche Programmierung Bedürfnisse. Die oben genannten Methoden wurden vollständig in Cowboy.Sockets implementiert und die APM- und TAP-Methoden wurden in tatsächlichen Projekten angewendet. Cowboy.Sockets entwickelt sich weiter und verbessert sich. Sollten Probleme auftreten, beheben Sie diese bitte rechtzeitig.

Obwohl es so viele Implementierungsmethoden gibt, sind sie in abstrakter Sicht gleich, was durch zwei Schleifen beschrieben werden kann:

Accept Loop und Read Loop , as in der folgenden Abbildung dargestellt. (Die hier erwähnte „Schleife“ bezieht sich auf einen -Schleifen-Weg, nicht speziell auf while/für und andere Schlüsselwörter. )

  • In jeder TCP-Server-Implementierung muss eine Accept-Socket-Schleife zum Empfangen von Verbindungsanfragen vom Client vorhanden sein, um ein TCP einzurichten Verbindung.

  • In jeder TCP-Server-Implementierung muss eine Lese-Socket-Schleife vorhanden sein, um vom Client geschriebene Daten zu empfangen.

Wenn die Accept-Schleife blockiert ist, wird die Verbindung nicht schnell hergestellt und der P

Ending Backlog des Servers ist voll, was dazu führt, dass der Client dies tut Erhalten Sie eine Connect-TimeOut-Ausnahme. Wenn die Leseschleife blockiert ist, führt dies offensichtlich dazu, dass Daten nicht rechtzeitig vom Client empfangen werden können, was dazu führt, dass der Sendepuffer des Clients voll ist und keine Daten mehr senden können.

Aus Sicht der Implementierungsdetails kann der Ort, der zu einer Dienstblockierung führen kann, sein:

  1. Akzeptieren Sie einen neuen Socket und der Aufbau einer neuen Verbindung erfordert die Zuweisung verschiedener Ressourcen . Die Zuweisung von Ressourcen ist langsam;

  2. Akzeptieren in einen neuen Socket und das nächste Akzeptieren wird nicht rechtzeitig ausgelöst

  3. Lesen in einen neuen Puffer und bestimmen Sie die Länge der Nutzlastnachricht. Der Beurteilungsprozess ist langwierig.

  4. Lesen Sie den neuen Puffer und stellen Sie fest, dass die Nutzlast noch nicht erfasst wurde. kann zu einer Puffer-

    Kopie führen;

  5. Nachdem die Nutzlast empfangen wurde, wird eine De-Serialisierung durchgeführt und in eine erkennbare Protokollnachricht umgewandelt. Die Deserialisierung ist langsam;

  6. Vom Business

    Modul zur Verarbeitung der entsprechenden Protokollnachricht ist der Verarbeitungsprozess langsam

1-2; Der Akzeptanzprozess und der Verbindungsaufbauprozess, 3-4 umfasst den Verarbeitungsprozess von ReceiveBuffer, 5-6 umfasst die Implementierung der Anwendungslogikseite.

Die berühmte Netty-Netzwerkbibliothek in Java hat ab Version 4.0 einen neuen Versuch im Pufferteil unternommen und ein Design namens ByteBuf übernommen, um Buffer Zero Copy zu implementieren und die durch Pufferkopie unter Bedingungen hoher Parallelität verursachte Leistung zu reduzieren. Verluste und GC-Druck. Projekte wie DotNetty, Orleans und Helios versuchen, ähnliches ByteBuf in C# zu implementieren.

APM-Methode: TcpSocketServer

Die Implementierung von TcpSocketServer basiert auf der weiteren Kapselung von TcpListener und TcpClient, die mit dem .NET Framework geliefert wird, unter Verwendung der APM-basierten BeginXXX- und EndXXX-

Schnittstelle erreichen.

Die Accept-Schleife in TcpSocketServer bezieht sich auf:

  • BeginAccept -> EndAccept -> 🎜>

  • Jede erfolgreich hergestellte Verbindung wird von TcpSocket
Session

verarbeitet, sodass TcpSocketSession eine Leseschleife enthält,


ist ebenfalls einfach und unkompliziert zu verwenden. Abonnieren Sie einfach direkt die

Ereignisbenachrichtigung
  event EventHandler<TcpClientConnectedEventArgs> ClientConnected;  event EventHandler<TcpClientDisconnectedEventArgs> ClientDisconnected;  event EventHandler<TcpClientDataReceivedEventArgs> ClientDataReceived;
.

TAP-Methode: AsyncTcpSocketServer

Die Implementierung von AsyncTcpSocketServer basiert auf der weiteren Kapselung von TcpListener und TcpClient, die mit der . NET Framework, implementiert mithilfe der XXXAsync-Schnittstelle basierend auf TAPs async/await.
  private static void StartServer()
  {
      _server = new TcpSocketServer(22222);
      _server.ClientConnected += server_ClientConnected;
      _server.ClientDisconnected += server_ClientDisconnected;
      _server.ClientDataReceived += server_ClientDataReceived;
      _server.Listen();
  }  
  static void server_ClientConnected(object sender, TcpClientConnectedEventArgs e)
  {
      Console.WriteLine(string.Format("TCP client {0} has connected {1}.", e.Session.RemoteEndPoint, e.Session));
  }  
  static void server_ClientDisconnected(object sender, TcpClientDisconnectedEventArgs e)
  {
      Console.WriteLine(string.Format("TCP client {0} has disconnected.", e.Session));
  }  
  static void server_ClientDataReceived(object sender, TcpClientDataReceivedEventArgs e)
  {      var text = Encoding.UTF8.GetString(e.Data, e.DataOffset, e.DataLength);
      Console.Write(string.Format("Client : {0} {1} --> ", e.Session.RemoteEndPoint, e.Session));
      Console.WriteLine(string.Format("{0}", text));
      _server.Broadcast(Encoding.UTF8.GetBytes(text));
  }

Tatsächlich erzeugt XXXAsync jedoch keine magischen Effekte. Seine interne Implementierung wandelt lediglich APM-Methoden in TAP-Aufrufmethoden um.

Die Akzeptanzschleife in AsyncTcpSocketServer bezieht sich auf

  //************* Task-based async public methods *************************
  [HostProtection(ExternalThreading = true)]  public Task<Socket> AcceptSocketAsync()
  {      return Task<Socket>.Factory.FromAsync(BeginAcceptSocket, EndAcceptSocket, null);
  }
  
  [HostProtection(ExternalThreading = true)]  public Task<TcpClient> AcceptTcpClientAsync()
  {      return Task<TcpClient>.Factory.FromAsync(BeginAcceptTcpClient, EndAcceptTcpClient, null);
  }


Alle erfolgreich Die hergestellte Verbindung wird von Async

  while (IsListening)
  {      var tcpClient = await _listener.AcceptTcpClientAsync();
  }
TcpSocketSession

verarbeitet, daher enthält AsyncTcpSocketSession eine Leseschleife,

Um async/await bis zum Ende asynchron zu machen, ist die von AsyncTcpSocketServer bereitgestellte Schnittstelle auch Awaitable.

  while (State == TcpSocketConnectionState.Connected)
  {      int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length);
  }

Wenn Sie es verwenden, müssen Sie nur ein
Objekt

injizieren, das diese Schnittstelle in den
  public interface IAsyncTcpSocketServerMessageDispatcher
  {
      Task OnSessionStarted(AsyncTcpSocketSession session);
      Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count);
      Task OnSessionClosed(AsyncTcpSocketSession session);
  }
Konstruktor

von AsyncTcpSocketServer implementiert . Dürfen.

Natürlich ist die Implementierung der Schnittstelle nicht zwingend erforderlich, und die Implementierung der Methode kann auch direkt in den Konstruktor eingefügt werden.

  public class SimpleMessageDispatcher : IAsyncTcpSocketServerMessageDispatcher
  {      public async Task OnSessionStarted(AsyncTcpSocketSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));          await Task.CompletedTask;
      }  
      public async Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count)
      {          var text = Encoding.UTF8.GetString(data, offset, count);
          Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.WriteLine(string.Format("{0}", text));  
          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }  
      public async Task OnSessionClosed(AsyncTcpSocketSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));          await Task.CompletedTask;
      }
  }

SAEA-Methode: TcpSocketSaeaServer

SAEA ist die Abkürzung für SocketAsyncEventArgs. SocketAsyncEventArgs ist eine Implementierung, die leistungsstarke Socket-Kommunikation ab .NET Framework 3.5 unterstützt. Die Hauptvorteile von SocketAsyncEventArgs im Vergleich zur APM-Methode lassen sich wie folgt beschreiben:
  public AsyncTcpSocketServer(
      IPEndPoint listenedEndPoint,
      Func<AsyncTcpSocketSession, byte[], int, int, Task> onSessionDataReceived = null,
      Func<AsyncTcpSocketSession, Task> onSessionStarted = null,
      Func<AsyncTcpSocketSession, Task> onSessionClosed = null,
      AsyncTcpSocketServerConfiguration configuration = null)
  {}

Das

Hauptmerkmal

dieser Erweiterungen ist die
Vermeidung des wiederholten

alles Ort und Synchronisierung von Objektenbei asynchronen Socket-I/O-Vorgängen mit hohem Volumen aktuellwird von der Socket-Klasse für asynchrone Socket-I/O erfordert Es wird ein System.IAsyncResult-Objekt für jeden asynchronen Socket-Vorgang zugewiesen.Mit anderen Worten: Der Vorteil besteht darin, dass kein IAsyncResult und andere Objekte generiert werden müssen Jeder Anruf nähert sich dem nativen Socket.

Die empfohlenen Schritte für die Verwendung von SocketAsyncEventArgs lauten wie folgt:

Ordnen Sie ein neues SocketAsyncEventArgs-Kontextobjekt zu oder holen Sie sich ein kostenloses Objekt aus einem Anwendungspool.

  1. Legen Sie Eigenschaften für das Kontextobjekt auf den auszuführenden Vorgang fest (z. B. die Callback-Delegatenmethode und den Datenpuffer).

  2. Rufen Sie die entsprechende Socket-Methode auf (xxxAsync), um den asynchronen Vorgang zu initiieren.

  3. Wenn
  4. die asynchrone Socket-Methode (xxxAsync) im Rückruf „true“ zurückgibt, fragen Sie die Kontexteigenschaften nach dem Abschlussstatus ab.
  5. Wenn die asynchrone Socket-Methode (xxxAsync) im Rückruf false zurückgibt, wird der Vorgang synchron abgeschlossen. Die Kontexteigenschaften können nach dem Operationsergebnis abgefragt werden.

  6. Verwenden Sie den Kontext für eine andere Operation, legen Sie ihn wieder in den Pool oder verwerfen Sie ihn.

  7. Der Schwerpunkt liegt auf dem Pooling. Der Zweck des Poolings besteht darin, die Laufzeitzuweisung wiederzuverwenden und zu reduzieren und Müllsammeldruck.

  8. TcpSocketSaeaServer ist die Anwendung und Kapselung von SocketAsyncEventArgs und implementiert die Pooling-Technologie. Der Schwerpunkt von TcpSocketSaeaServer liegt auf der SaeaAwaitable-Klasse, die über ein integriertes SocketAsyncEventArgs verfügt und SaeaAwaiter über GetAwaiter zurückgibt, um asynchrone/wartende Vorgänge zu unterstützen. Gleichzeitig wird die Awaitable-Implementierung von SocketAsyncEventArgs durch das Erweiterungsmethodenpaar SaeaExtensions erweitert.

SaeaPool ist eine abgeleitete Implementierung von QueuedObjectPool, die zum Poolen von SaeaAwaitable-Instanzen verwendet wird. Um den Erstellungsprozess von TcpSocketSaeaSession zu verkürzen, wird gleichzeitig auch SessionPool, nämlich QueuedObjectPool, implementiert.

TcpSocketSaeaServer 中的 Accept Loop 指的就是,


  while (IsListening)
  {      var saea = _acceptSaeaPool.Take();  
      var socketError = await _listener.AcceptAsync(saea);      if (socketError == SocketError.Success)
      {          var acceptedSocket = saea.Saea.AcceptSocket;
      }
  
      _acceptSaeaPool.Return(saea);
  }

每一个建立成功的 Connection 由 TcpSocketSaeaSession 来处理,所以 TcpSocketSaeaSession 中会包含 Read Loop,


  var saea = _saeaPool.Take();
  saea.Saea.SetBuffer(_receiveBuffer, 0, _receiveBuffer.Length);  
  while (State == TcpSocketConnectionState.Connected)
  {
      saea.Saea.SetBuffer(0, _receiveBuffer.Length);  
      var socketError = await _socket.ReceiveAsync(saea);      if (socketError != SocketError.Success)          break;  
      var receiveCount = saea.Saea.BytesTransferred;      if (receiveCount == 0)          break;
  }

同样,TcpSocketSaeaServer 对外所暴露的接口也同样是 Awaitable 的。


  public interface ITcpSocketSaeaServerMessageDispatcher
  {
      Task OnSessionStarted(TcpSocketSaeaSession session);
      Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count);
      Task OnSessionClosed(TcpSocketSaeaSession session);
  }

使用起来也是简单直接:


  public class SimpleMessageDispatcher : ITcpSocketSaeaServerMessageDispatcher
  {      public async Task OnSessionStarted(TcpSocketSaeaSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));          await Task.CompletedTask;
      }  
      public async Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count)
      {          var text = Encoding.UTF8.GetString(data, offset, count);
          Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.WriteLine(string.Format("{0}", text));  
          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }  
      public async Task OnSessionClosed(TcpSocketSaeaSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));          await Task.CompletedTask;
      }
  }

RIO 方式:TcpSocketRioServer

从 Windows 8.1 / Windows Server 2012 R2 开始,微软推出了 Registered I/O Networking Extensions 来支持高性能 Socket 服务的实现,简称 RIO。

The following functions are supported for Windows Store apps on Windows 8.1, Windows Server 2012 R2, and later. Microsoft Visual Studio 2013 Update 3 or later is required for Windows Store apps.

  • RIOCloseCompletionQueue

  • RIOCreateCompletionQueue

  • RIOCreateRequestQueue

  • RIODequeueCompletion

  • RIODeregisterBuffer

  • RIONotify

  • RIOReceive

  • RIOReceiveEx

  • RIORegisterBuffer

  • RIOResizeCompletionQueue

  • RIOResizeRequestQueue

  • RIOSend

  • RIOSendEx

到目前为止,.NET Framework 还没有推出对 RIO 的支持,所以若想在 C# 中实现 RIO 则只能通过 P/Invoke 方式,RioSharp 是开源项目中的一个比较完整的实现。

Cowboy.Sockets 直接引用了 RioSharp 的源代码,放置在 Cowboy.Sockets.Experimental 名空间下,以供实验和测试使用。

同样,通过 TcpSocketRioServer 来实现 Accept Loop,


_listener.OnAccepted = (acceptedSocket) =>{
    Task.Run(async () =>
    {        await Process(acceptedSocket);
    })
    .Forget();
};

通过 TcpSocketRioSession 来处理 Read Loop,


  while (State == TcpSocketConnectionState.Connected)
  {      int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length);      if (receiveCount == 0)          break;
  }

测试代码一如既往的类似:


  public class SimpleMessageDispatcher : ITcpSocketRioServerMessageDispatcher
  {      public async Task OnSessionStarted(TcpSocketRioSession session)
      {          //Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
          Console.WriteLine(string.Format("TCP session has connected {0}.", session));          await Task.CompletedTask;
      }  
      public async Task OnSessionDataReceived(TcpSocketRioSession session, byte[] data, int offset, int count)
      {          var text = Encoding.UTF8.GetString(data, offset, count);          //Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.Write(string.Format("Client : --> "));
          Console.WriteLine(string.Format("{0}", text));  
          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }  
      public async Task OnSessionClosed(TcpSocketRioSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));          await Task.CompletedTask;
      }
  }

参考资料

  • Asynchronous Programming Model (APM)

  • Task-based Asynchronous Pattern (TAP)

  • Event-based Asynchronous Pattern (EAP)

  • SocketAsyncEventArgs

  • Registered I/O

  • Netty: Reference counted objects

  • Socket Performance Enhancements in Version 3.5

  • What's New for Windows Sockets for Windows 8.1 and Windows Server 2012 R2

  • RIO_EXTENSION_FUNCTION_TABLE structure

  • Windows 8 Registered I/O Networking Extensions

本篇文章《C#高性能TCP服务的多种实现方式》由 Dennis Gao 发表自博客园个人博客,未经作者本人同意禁止以任何的形式转载,任何自动的或人为的爬虫转载行为均为耍流氓。

Das obige ist der detaillierte Inhalt vonMehrere Implementierungen leistungsstarker TCP-Dienste in C#. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn