ホームページ  >  記事  >  バックエンド開発  >  C# での高性能 TCP サービスの複数の実装

C# での高性能 TCP サービスの複数の実装

PHPz
PHPzオリジナル
2017-03-12 16:47:332573ブラウズ

この記事の主な目的は、.NET/C#を使用してTCPの高性能サービスを実装するためのさまざまな方法です。これには以下が含まれますが、これらに限定されません:

  • APMメソッド、つまりAsynchr onous ProgrammingModel

  • TAPメソッド、つまりタスクベースの非同期パターン

  • SAEAメソッド、つまりSocketAsyncEventArgs

  • RIOメソッド、つまりRegistered I/O

.NET/C# で S ソケットをサポート これらはすべて Windows I/O 完了ポートに基づいてポート テクノロジのパッケージ化を完了し、さまざまなノンブロッキング パッケージ構造を通じてさまざまな プログラミング ニーズに対応します。上記のメソッドは Cowboy.Sockets に完全に実装されており、APM メソッドと TAP メソッドは実際のプロジェクトに適用されています。 Cowboy.Sockets はまだ進化し、改善されています。問題がある場合は、すぐに修正してください。

非常に多くの実装メソッドがありますが、抽象的に見るとそれらは同じであり、以下の図に示すように、

Accept LoopRead Loop の 2 つのループで説明できます。 (ここで言う「Loop」は、具体的に while/for やその他のキーワードではなく、 ループ メソッド を指します。)

  • TCP サーバー実装には、クライアントから接続要求を受信して​​ TCP 接続を確立するための Accept Socket Loop。

  • TCP サーバーの実装では、クライアントによって書き込まれたデータを受信するための読み取りソケット ループが必要です。

Accept ループがブロックされている場合、接続はすぐに確立できず、サーバーの P

ending バックログがいっぱいになり、クライアントが Connect Timeout 例外を受け取ることになります。読み取りループがブロックされると、明らかにクライアントからデータを時間内に受信できなくなり、クライアントの送信バッファがいっぱいになり、データを送信できなくなります。

実装の詳細の観点から見ると、サービスのブロックを引き起こす可能性がある場所は次のとおりです。

  1. 新しいソケットへの受け入れ。新しい接続の構築にはさまざまなリソースの割り当てが必要ですが、リソースの割り当てには時間がかかります。新しいソケット Socket、次の Accept が時間内にトリガーされません。

  2. 新しいバッファに読み取り、ペイロード メッセージの長さを決定します。決定プロセスは長いです。

  3. 新しいバッファに読み取り、ペイロードが検出されます。収集されていないため、読み続けてください。その後、「バッファ

    コピー

    が発生する可能性があります。
  4. ペイロードが受信され、認識可能なプロトコル メッセージに逆シリアル化され、逆シリアル化が遅くなります。

  5. は」対応する Protocol Message を処理するために Business

    Module

    によって処理されます。処理プロセスは低速です。
  6. 1-2 には Accept プロセスと Connection 確立プロセスが含まれ、3-4 には ReceiveBuffer 処理プロセスが含まれます。アプリケーションロジック側の実装。

    Java の有名な Netty ネットワーク ライブラリは、バージョン 4.0 以降、バッファ部分で新しい試みを行っています。ByteBuf と呼ばれる設計を採用して、バッファ ゼロ コピーを実装し、同時実行性が高い条件下でバッファ コピーによって引き起こされるパフォーマンスの損失と GC プレッシャーを軽減します。 。 DotNetty、Orleans、Helios などのプロジェクトは、同様の ByteBuf を C# で実装しようとしています。
APM メソッド: TcpSocketServer

TcpSocketServer の実装は、.NET Framework に付属する TcpListener と TcpClient のさらなるカプセル化に基づいており、APM ベースの BeginXXX および EndXXX

インターフェイス

を使用して実装されます。

TcpSocketServer の Accept ループは、

BeginAccept -> EndAccept -> BeginAccept -> を参照します。 Session

を処理するため、TcpSocketSession には読み取りループが含まれます。
  • BeginRead -> EndRead -> BeginRead -> ...

TcpSocketServer は、イベントを公開することで接続の確立と切断、およびデータ受信の通知を実装します。


  event EventHandler<TcpClientConnectedEventArgs> ClientConnected;  event EventHandler<TcpClientDisconnectedEventArgs> ClientDisconnected;  event EventHandler<TcpClientDataReceivedEventArgs> ClientDataReceived;

もシンプルで使いやすく、イベントの通知を直接購読するだけです。


  private static void StartServer()
  {
      _server = new TcpSocketServer(22222);
      _server.ClientConnected += server_ClientConnected;
      _server.ClientDisconnected += server_ClientDisconnected;
      _server.ClientDataReceived += server_ClientDataReceived;
      _server.Listen();
  }  
  static void server_ClientConnected(object sender, TcpClientConnectedEventArgs e)
  {
      Console.WriteLine(string.Format("TCP client {0} has connected {1}.", e.Session.RemoteEndPoint, e.Session));
  }  
  static void server_ClientDisconnected(object sender, TcpClientDisconnectedEventArgs e)
  {
      Console.WriteLine(string.Format("TCP client {0} has disconnected.", e.Session));
  }  
  static void server_ClientDataReceived(object sender, TcpClientDataReceivedEventArgs e)
  {      var text = Encoding.UTF8.GetString(e.Data, e.DataOffset, e.DataLength);
      Console.Write(string.Format("Client : {0} {1} --> ", e.Session.RemoteEndPoint, e.Session));
      Console.WriteLine(string.Format("{0}", text));
      _server.Broadcast(Encoding.UTF8.GetBytes(text));
  }
TAP メソッド: AsyncTcpSocketServer

AsyncTcpSocketServer の実装は、.NET Framework に付属する TcpListener と TcpClient のさらなるカプセル化に基づいており、TAP に基づく async/await の XXXAsync インターフェイスを使用して実装されます。

ただし、実際には、XXXAsync は魔法のような効果を生み出すわけではなく、APM メソッドを TAP 呼び出しメソッドに変換するだけです。


  //************* Task-based async public methods *************************
  [HostProtection(ExternalThreading = true)]  public Task<Socket> AcceptSocketAsync()
  {      return Task<Socket>.Factory.FromAsync(BeginAcceptSocket, EndAcceptSocket, null);
  }
  
  [HostProtection(ExternalThreading = true)]  public Task<TcpClient> AcceptTcpClientAsync()
  {      return Task<TcpClient>.Factory.FromAsync(BeginAcceptTcpClient, EndAcceptTcpClient, null);
  }
AsyncTcpSocketServer の Accept ループは、


  while (IsListening)
  {      var tcpClient = await _listener.AcceptTcpClientAsync();
  }

正常に確立されたすべての Connection が AsyncTcpSocketSession によって処理されるため、 Async TcpSocketSession には読み取りループが含まれます


  while (State == TcpSocketConnectionState.Connected)
  {      int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length);
  }
async/awaitをあくまで非同期にするために、AsyncTcpSocketServerで公開するインターフェースもAwaitableにしています。


  public interface IAsyncTcpSocketServerMessageDispatcher
  {
      Task OnSessionStarted(AsyncTcpSocketSession session);
      Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count);
      Task OnSessionClosed(AsyncTcpSocketSession session);
  }
使用する場合は、このインターフェースを実装する

オブジェクトをAsyncTcpSocketServerのコンストラクターに注入するだけです。


  public class SimpleMessageDispatcher : IAsyncTcpSocketServerMessageDispatcher
  {      public async Task OnSessionStarted(AsyncTcpSocketSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));          await Task.CompletedTask;
      }  
      public async Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count)
      {          var text = Encoding.UTF8.GetString(data, offset, count);
          Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.WriteLine(string.Format("{0}", text));  
          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }  
      public async Task OnSessionClosed(AsyncTcpSocketSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));          await Task.CompletedTask;
      }
  }
もちろん、インターフェースの実装は必須ではなく、メソッドの実装をコンストラクターに直接注入することもできます。


  public AsyncTcpSocketServer(
      IPEndPoint listenedEndPoint,
      Func<AsyncTcpSocketSession, byte[], int, int, Task> onSessionDataReceived = null,
      Func<AsyncTcpSocketSession, Task> onSessionStarted = null,
      Func<AsyncTcpSocketSession, Task> onSessionClosed = null,
      AsyncTcpSocketServerConfiguration configuration = null)
  {}
SAEAメソッド: TcpSocketSaeaServer

SAEAはSocketAsyncEventArgsの略称です。 SocketAsyncEventArgs は、.NET Framework 3.5 以降の高性能ソケット通信をサポートする実装です。 APM メソッドと比較した SocketAsyncEventArgs の主な利点は、次のように説明できます。

これらの機能強化の

主な特徴は、大量の非同期ソケット I 中に繰り返される すべての位置とオブジェクトの同期を回避することです。 /O. Begin/End デザイン パターンは、現在、非同期ソケット I/O 用の Socket クラスによって実装されており、それぞれの 非同期ソケット操作に System.IAsyncResult オブジェクトを割り当てる必要があります利点は、各呼び出しでネイティブ ソケットに近い IAsyncResult などのオブジェクトを生成する必要がないことです。 SocketAsyncEventArgs を使用するための推奨手順は次のとおりです:

新しい SocketAsyncEventArgs コンテキスト オブジェクトを割り当てるか、アプリケーション プールから空きオブジェクトを取得します。

  1. 実行しようとしている操作にコンテキスト オブジェクトのプロパティを設定します。 (コールバックデリゲートメソッドとデータバッファーなど)。

  2. 適切なソケットメソッド (xxxAsync) を呼び出して、非同期操作を開始します。

  3. If

    非同期ソケットメソッド (xxxAsync) がコールバックで true を返した場合、完了ステータスについてコンテキスト プロパティを照会します。
  4. 非同期ソケット メソッド (xxxAsync) がコールバックで false を返した場合、操作は同期的に完了し、操作結果についてコンテキスト プロパティを照会できます。

  5. コンテキストを再利用します。別の操作の場合は、プールに戻すか、破棄してください。

  6. プーリングの目的は、再利用し、ランタイム割り当てとガベージ コレクションの負荷を軽減することです。

    TcpSocketSaeaServer は、SocketAsyncEventArgs のアプリケーションおよびカプセル化であり、プーリング テクノロジーを実装しています。 TcpSocketSaeaServer の焦点は SaeaAwaitable クラスであり、SaeaAwaitable には組み込みの SocketAsyncEventArgs があり、非同期/待機操作をサポートするために GetAwaiter を通じて SaeaAwaiter を返します。同時に、SocketAsyncEventArgs の Awaitable 実装は、SaeaExtensions 拡張メソッド ペアを通じて拡張されます。

  public static SaeaAwaitable AcceptAsync(this Socket socket, SaeaAwaitable awaitable)  public static SaeaAwaitable ConnectAsync(this Socket socket, SaeaAwaitable awaitable)  public static SaeaAwaitable DisonnectAsync(this Socket socket, SaeaAwaitable awaitable)  public static SaeaAwaitable ReceiveAsync(this Socket socket, SaeaAwaitable awaitable)  public static SaeaAwaitable SendAsync(this Socket socket, SaeaAwaitable awaitable)

SaeaPool は QueuedObjectPool の派生実装であり、SaeaAwaitable インスタンスをプールするために使用されます。同時に、TcpSocketSaeaSession の構築プロセスを削減するために、SessionPool、つまり QueuedObjectPool も実装されます。

TcpSocketSaeaServer 中的 Accept Loop 指的就是,


  while (IsListening)
  {      var saea = _acceptSaeaPool.Take();  
      var socketError = await _listener.AcceptAsync(saea);      if (socketError == SocketError.Success)
      {          var acceptedSocket = saea.Saea.AcceptSocket;
      }
  
      _acceptSaeaPool.Return(saea);
  }

每一个建立成功的 Connection 由 TcpSocketSaeaSession 来处理,所以 TcpSocketSaeaSession 中会包含 Read Loop,


  var saea = _saeaPool.Take();
  saea.Saea.SetBuffer(_receiveBuffer, 0, _receiveBuffer.Length);  
  while (State == TcpSocketConnectionState.Connected)
  {
      saea.Saea.SetBuffer(0, _receiveBuffer.Length);  
      var socketError = await _socket.ReceiveAsync(saea);      if (socketError != SocketError.Success)          break;  
      var receiveCount = saea.Saea.BytesTransferred;      if (receiveCount == 0)          break;
  }

同样,TcpSocketSaeaServer 对外所暴露的接口也同样是 Awaitable 的。


  public interface ITcpSocketSaeaServerMessageDispatcher
  {
      Task OnSessionStarted(TcpSocketSaeaSession session);
      Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count);
      Task OnSessionClosed(TcpSocketSaeaSession session);
  }

使用起来也是简单直接:


  public class SimpleMessageDispatcher : ITcpSocketSaeaServerMessageDispatcher
  {      public async Task OnSessionStarted(TcpSocketSaeaSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));          await Task.CompletedTask;
      }  
      public async Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count)
      {          var text = Encoding.UTF8.GetString(data, offset, count);
          Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.WriteLine(string.Format("{0}", text));  
          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }  
      public async Task OnSessionClosed(TcpSocketSaeaSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));          await Task.CompletedTask;
      }
  }

RIO 方式:TcpSocketRioServer

从 Windows 8.1 / Windows Server 2012 R2 开始,微软推出了 Registered I/O Networking Extensions 来支持高性能 Socket 服务的实现,简称 RIO。

The following functions are supported for Windows Store apps on Windows 8.1, Windows Server 2012 R2, and later. Microsoft Visual Studio 2013 Update 3 or later is required for Windows Store apps.

  • RIOCloseCompletionQueue

  • RIOCreateCompletionQueue

  • RIOCreateRequestQueue

  • RIODequeueCompletion

  • RIODeregisterBuffer

  • RIONotify

  • RIOReceive

  • RIOReceiveEx

  • RIORegisterBuffer

  • RIOResizeCompletionQueue

  • RIOResizeRequestQueue

  • RIOSend

  • RIOSendEx

到目前为止,.NET Framework 还没有推出对 RIO 的支持,所以若想在 C# 中实现 RIO 则只能通过 P/Invoke 方式,RioSharp 是开源项目中的一个比较完整的实现。

Cowboy.Sockets 直接引用了 RioSharp 的源代码,放置在 Cowboy.Sockets.Experimental 名空间下,以供实验和测试使用。

同样,通过 TcpSocketRioServer 来实现 Accept Loop,


_listener.OnAccepted = (acceptedSocket) =>{
    Task.Run(async () =>
    {        await Process(acceptedSocket);
    })
    .Forget();
};

通过 TcpSocketRioSession 来处理 Read Loop,


  while (State == TcpSocketConnectionState.Connected)
  {      int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length);      if (receiveCount == 0)          break;
  }

测试代码一如既往的类似:


  public class SimpleMessageDispatcher : ITcpSocketRioServerMessageDispatcher
  {      public async Task OnSessionStarted(TcpSocketRioSession session)
      {          //Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
          Console.WriteLine(string.Format("TCP session has connected {0}.", session));          await Task.CompletedTask;
      }  
      public async Task OnSessionDataReceived(TcpSocketRioSession session, byte[] data, int offset, int count)
      {          var text = Encoding.UTF8.GetString(data, offset, count);          //Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.Write(string.Format("Client : --> "));
          Console.WriteLine(string.Format("{0}", text));  
          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }  
      public async Task OnSessionClosed(TcpSocketRioSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));          await Task.CompletedTask;
      }
  }

参考资料

  • Asynchronous Programming Model (APM)

  • Task-based Asynchronous Pattern (TAP)

  • Event-based Asynchronous Pattern (EAP)

  • SocketAsyncEventArgs

  • Registered I/O

  • Netty: Reference counted objects

  • Socket Performance Enhancements in Version 3.5

  • What's New for Windows Sockets for Windows 8.1 and Windows Server 2012 R2

  • RIO_EXTENSION_FUNCTION_TABLE structure

  • Windows 8 Registered I/O Networking Extensions

本篇文章《C#高性能TCP服务的多种实现方式》由 Dennis Gao 发表自博客园个人博客,未经作者本人同意禁止以任何的形式转载,任何自动的或人为的爬虫转载行为均为耍流氓。

以上がC# での高性能 TCP サービスの複数の実装の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。