ホームページ >php教程 >php手册 >C# の高パフォーマンス TCP サービスのさまざまな実装方法

C# の高パフォーマンス TCP サービスのさまざまな実装方法

WBOY
WBOYオリジナル
2016-07-06 13:30:321132ブラウズ

ねえ~~ ほとんどの庭師は、注目を集めるためには「高性能」という言葉にもっと興味を持つべきだと思います。実際、私が好むタイトルは「Monkey Sai Lei、TCP を書くための C# のコツ」です。サービス」

ねえ〜〜、ほとんどの庭師は「高性能」という言葉にもっと興味を持つべきだと思います。注目を集めるためには、タイトルが目立つ必要があります。実際、私が好むタイトルは「Monkey Sai Lei」という書き方です。 「C#」の姿勢での TCP サービス! 》

この記事の主な目的は、.NET/C# を使用して TCP 高パフォーマンス サービスを実装するためのさまざまな方法です。これには次のものが含まれますが、これらに限定されません。

    APM手法、すなわち非同期PRプログラミングモデル
  • TAP方式、すなわちタスクベースの非同期パターン
  • SAEA メソッド、つまり SocketAsyncEventArgs
  • RIO メソッド、つまり Registered I/O
.NET/C# のソケット サポートは、Windows I/O 完了ポートに基づいてポート テクノロジのカプセル化を完了し、さまざまなプログラミング ニーズを満たすためにさまざまなノンブロッキング カプセル化構造を使用します。上記のメソッドは Cowboy.Sockets に完全に実装されており、APM メソッドと TAP メソッドは実際のプロジェクトに適用されています。 Cowboy.Sockets はまだ進化し、改善されています。問題がある場合は、すぐに修正してください。

実装メソッドは非常に多くありますが、抽象的には同じであり、以下の図に示すように、

Accept ループRead ループ の 2 つのループで説明できます。 (ここで言う「ループ」はループメソッドを指し、特にwhile/forなどのキーワードを指すものではありません。)

    TCP サーバーの実装には、クライアントの接続リクエストを受信して​​ TCP 接続を確立するために使用される Accept Socket Loop が必要です。
  • TCP サーバーの実装には、クライアントによって書き込まれたデータを受信するための読み取りソケット ループが必要です。
Accept ループがブロックされている場合、接続はすぐに確立できず、サーバーの保留中のバックログがいっぱいになり、クライアントは接続タイムアウト例外を受け取ります。読み取りループがブロックされると、明らかにクライアントからデータを時間内に受信できなくなり、クライアントの送信バッファがいっぱいになり、データを送信できなくなります。

実装の詳細の観点から、サービスのブロックを引き起こす可能性のある場所は次のとおりです:

    新しいソケットを受け入れて新しい接続を構築するには、さまざまなリソースを割り当てる必要があり、リソースの割り当てが遅くなります。
  1. 新しいソケットに受け入れますが、次の受け入れが時間内にトリガーされません
  2. 新しいバッファを読み取り、ペイロード メッセージの長さを決定します。
  3. 決定プロセスは長いです。
  4. 新しいバッファに読み込むと、ペイロードがまだ収集されていないことがわかります。読み込みを続けると、バッファ コピーが発生する可能性があります。
  5. ペイロードを受信した後、シリアル化を実行し、認識可能なプロトコル メッセージに変換します。
  6. シリアル化は遅いです。
  7. 対応するプロトコル メッセージはビジネス モジュールによって処理され、処理プロセスが遅くなります。
  8. 1-2はAccept処理とConnection確立処理、3-4はReceiveBuffer処理処理、5-6はアプリケーションロジック側の実装です。
  9. Java の有名な Netty ネットワーク ライブラリは、バージョン 4.0 以降、バッファ部分で新しい試みを行い、ByteBuf と呼ばれる設計を採用してバッファ ゼロ コピーを実装し、高い同時実行条件下でのバッファ コピーによって引き起こされるパフォーマンスの損失と GC プレッシャーを軽減しました。 DotNetty、Orleans、Helios などのプロジェクトは、同様の ByteBuf を C# で実装しようとしています。

APMメソッド: TcpSocketServer

TcpSocketServer の実装は、.NET Framework に付属する TcpListener と TcpClient のさらなるカプセル化に基づいており、APM ベースの BeginXXX および EndXXX インターフェイスを使用して実装されます。

TcpSocketServer の Accept ループは、

を参照します。

開始受け入れ ->終了受け入れ ->

正常に確立された各接続は TcpSocketSession によって処理されるため、TcpSocketSession には読み取りループが含まれます。
  • 読み取り開始 ->読み取り終了 ->
TcpSocketServer は、Event を公開することで、接続の確立と切断、およびデータ受信の通知を実装します。

リーリー

    使い方もシンプルで簡単で、イベント通知を直接購読するだけです。
  • リーリー
TAPメソッド: AsyncTcpSocketServer

AsyncTcpSocketServer の実装は、.NET Framework に付属する TcpListener と TcpClient のさらなるカプセル化に基づいており、TAP に基づく async/await の XXXAsync インターフェイスを使用して実装されます。

ただし、実際には、XXXAsync は魔法のような効果を生み出すわけではなく、その内部実装は APM メソッドを TAP 呼び出しメソッドに変換するだけです。 リーリー

AsyncTcpSocketServer の Accept ループは、

を参照します。
  <span style="color: #0000ff;">while</span><span style="color: #000000;"> (IsListening)
  {
      </span><span style="color: #0000ff;">var</span> tcpClient = <span style="color: #0000ff;">await</span><span style="color: #000000;"> _listener.AcceptTcpClientAsync();
  }</span>

每一个建立成功的 Connection 由 AsyncTcpSocketSession 来处理,所以 AsyncTcpSocketSession 中会包含 Read Loop,

  <span style="color: #0000ff;">while</span> (State ==<span style="color: #000000;"> TcpSocketConnectionState.Connected)
  {
      </span><span style="color: #0000ff;">int</span> receiveCount = <span style="color: #0000ff;">await</span> _stream.ReadAsync(_receiveBuffer, <span style="color: #800080;">0</span><span style="color: #000000;">, _receiveBuffer.Length);
  }</span>

为了将 async/await 异步到底,AsyncTcpSocketServer 所暴露的接口也同样是 Awaitable 的。

  <span style="color: #0000ff;">public</span> <span style="color: #0000ff;">interface</span><span style="color: #000000;"> IAsyncTcpSocketServerMessageDispatcher
  {
      Task OnSessionStarted(AsyncTcpSocketSession session);
      Task OnSessionDataReceived(AsyncTcpSocketSession session, </span><span style="color: #0000ff;">byte</span>[] data, <span style="color: #0000ff;">int</span> offset, <span style="color: #0000ff;">int</span><span style="color: #000000;"> count);
      Task OnSessionClosed(AsyncTcpSocketSession session);
  }</span>

使用时仅需将一个实现了该接口的对象注入到 AsyncTcpSocketServer 的构造函数中即可。

  <span style="color: #0000ff;">public</span> <span style="color: #0000ff;">class</span><span style="color: #000000;"> SimpleMessageDispatcher : IAsyncTcpSocketServerMessageDispatcher
  {
      </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">async</span><span style="color: #000000;"> Task OnSessionStarted(AsyncTcpSocketSession session)
      {
          Console.WriteLine(</span><span style="color: #0000ff;">string</span>.Format(<span style="color: #800000;">"</span><span style="color: #800000;">TCP session {0} has connected {1}.</span><span style="color: #800000;">"</span><span style="color: #000000;">, session.RemoteEndPoint, session));
          </span><span style="color: #0000ff;">await</span><span style="color: #000000;"> Task.CompletedTask;
      }
  
      </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">async</span> Task OnSessionDataReceived(AsyncTcpSocketSession session, <span style="color: #0000ff;">byte</span>[] data, <span style="color: #0000ff;">int</span> offset, <span style="color: #0000ff;">int</span><span style="color: #000000;"> count)
      {
          </span><span style="color: #0000ff;">var</span> text =<span style="color: #000000;"> Encoding.UTF8.GetString(data, offset, count);
          Console.Write(</span><span style="color: #0000ff;">string</span>.Format(<span style="color: #800000;">"</span><span style="color: #800000;">Client : {0} --> </span><span style="color: #800000;">"</span><span style="color: #000000;">, session.RemoteEndPoint));
          Console.WriteLine(</span><span style="color: #0000ff;">string</span>.Format(<span style="color: #800000;">"</span><span style="color: #800000;">{0}</span><span style="color: #800000;">"</span><span style="color: #000000;">, text));
  
          </span><span style="color: #0000ff;">await</span><span style="color: #000000;"> session.SendAsync(Encoding.UTF8.GetBytes(text));
      }
  
      </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">async</span><span style="color: #000000;"> Task OnSessionClosed(AsyncTcpSocketSession session)
      {
          Console.WriteLine(</span><span style="color: #0000ff;">string</span>.Format(<span style="color: #800000;">"</span><span style="color: #800000;">TCP session {0} has disconnected.</span><span style="color: #800000;">"</span><span style="color: #000000;">, session));
          </span><span style="color: #0000ff;">await</span><span style="color: #000000;"> Task.CompletedTask;
      }
  }</span>

当然,对于接口的实现也不是强制了,也可以在构造函数中直接注入方法的实现。

  <span style="color: #0000ff;">public</span><span style="color: #000000;"> AsyncTcpSocketServer(
      ipEndPoint listenedEndPoint,
      Func</span><AsyncTcpSocketSession, <span style="color: #0000ff;">byte</span>[], <span style="color: #0000ff;">int</span>, <span style="color: #0000ff;">int</span>, Task> onSessionDataReceived = <span style="color: #0000ff;">null</span><span style="color: #000000;">,
      Func</span><AsyncTcpSocketSession, Task> onSessionStarted = <span style="color: #0000ff;">null</span><span style="color: #000000;">,
      Func</span><AsyncTcpSocketSession, Task> onSessionClosed = <span style="color: #0000ff;">null</span><span style="color: #000000;">,
      AsyncTcpSocketServerConfiguration configuration </span>= <span style="color: #0000ff;">null</span><span style="color: #000000;">)
  {}</span>

SAEA 方式:TcpSocketSaeaServer

SAEA 是 SocketAsyncEventArgs 的简写。SocketAsyncEventArgs 是 .NET Framework 3.5 开始支持的一种支持高性能 Socket 通信的实现。SocketAsyncEventArgs 相比于 APM 方式的主要优点可以描述如下:

The main feature of these enhancements is the avoidance of the repeated allocation and synchronization of objects during high-volume asynchronous socket I/O. The Begin/End design pattern currently implemented by the Socket class for asynchronous socket I/O requires a System.IAsyncResult object be allocated for each asynchronous socket Operation.

也就是说,优点就是无需为每次调用都生成 IAsyncResult 等对象,向原生 Socket 更靠近一些。

使用 SocketAsyncEventArgs 的推荐步骤如下:

  1. Allocate a new SocketAsyncEventArgs context object, or get a free one from an application pool.
  2. Set properties on the context object to the operation about to be performed (the callback delegate method and data buffer, for example).
  3. Call the appropriate socket method (xxxAsync) to initiate the asynchronous operation.
  4. If the asynchronous socket method (xxxAsync) returns true in the callback, query the context properties for completion status.
  5. If the asynchronous socket method (xxxAsync) returns false in the callback, the operation completed synchronously. The context properties may be queried for the operation result.
  6. Reuse the context for another operation, put it back in the pool, or discard it.

重点在于池化(Pooling),池化的目的就是为了重用和减少运行时分配和垃圾回收的压力。

TcpSocketSaeaServer 即是对 SocketAsyncEventArgs 的应用和封装,并实现了 Pooling 技术。TcpSocketSaeaServer 中的重点是 SaeaAwaitable 类,SaeaAwaitable 中内置了一个 SocketAsyncEventArgs,并通过 GetAwaiter 返回 SaeaAwaiter 来支持 async/await 操作。同时,通过 SaeaExtensions 扩展方法对来扩展 SocketAsyncEventArgs 的 Awaitable 实现。

  <span style="color: #0000ff;">public</span> <span style="color: #0000ff;">static</span> SaeaAwaitable AcceptAsync(<span style="color: #0000ff;">this</span><span style="color: #000000;"> Socket socket, SaeaAwaitable awaitable)
  </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">static</span> SaeaAwaitable ConnectAsync(<span style="color: #0000ff;">this</span><span style="color: #000000;"> Socket socket, SaeaAwaitable awaitable)
  </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">static</span> SaeaAwaitable DisonnectAsync(<span style="color: #0000ff;">this</span><span style="color: #000000;"> Socket socket, SaeaAwaitable awaitable)
  </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">static</span> SaeaAwaitable ReceiveAsync(<span style="color: #0000ff;">this</span><span style="color: #000000;"> Socket socket, SaeaAwaitable awaitable)
  </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">static</span> SaeaAwaitable SendAsync(<span style="color: #0000ff;">this</span> Socket socket, SaeaAwaitable awaitable)

SaeaPool 则是一个 QueuedObjectPool 的衍生实现,用于池化 SaeaAwaitable 实例。同时,为了减少 TcpSocketSaeaSession 的构建过程,也实现了 SessionPool 即 QueuedObjectPool

TcpSocketSaeaServer 中的 Accept Loop 指的就是,

  <span style="color: #0000ff;">while</span><span style="color: #000000;"> (IsListening)
  {
      </span><span style="color: #0000ff;">var</span> saea =<span style="color: #000000;"> _acceptSaeaPool.Take();
  
      </span><span style="color: #0000ff;">var</span> socketError = <span style="color: #0000ff;">await</span><span style="color: #000000;"> _listener.AcceptAsync(saea);
      </span><span style="color: #0000ff;">if</span> (socketError ==<span style="color: #000000;"> SocketError.Success)
      {
          </span><span style="color: #0000ff;">var</span> acceptedSocket =<span style="color: #000000;"> saea.Saea.AcceptSocket;
      }
  
      _acceptSaeaPool.Return(saea);
  }</span>

每一个建立成功的 Connection 由 TcpSocketSaeaSession 来处理,所以 TcpSocketSaeaSession 中会包含 Read Loop,

  <span style="color: #0000ff;">var</span> saea =<span style="color: #000000;"> _saeaPool.Take();
  saea.Saea.SetBuffer(_receiveBuffer, </span><span style="color: #800080;">0</span><span style="color: #000000;">, _receiveBuffer.Length);
  
  </span><span style="color: #0000ff;">while</span> (State ==<span style="color: #000000;"> TcpSocketConnectionState.Connected)
  {
      saea.Saea.SetBuffer(</span><span style="color: #800080;">0</span><span style="color: #000000;">, _receiveBuffer.Length);
  
      </span><span style="color: #0000ff;">var</span> socketError = <span style="color: #0000ff;">await</span><span style="color: #000000;"> _socket.ReceiveAsync(saea);
      </span><span style="color: #0000ff;">if</span> (socketError !=<span style="color: #000000;"> SocketError.Success)
          </span><span style="color: #0000ff;">break</span><span style="color: #000000;">;
  
      </span><span style="color: #0000ff;">var</span> receiveCount =<span style="color: #000000;"> saea.Saea.BytesTransferred;
      </span><span style="color: #0000ff;">if</span> (receiveCount == <span style="color: #800080;">0</span><span style="color: #000000;">)
          </span><span style="color: #0000ff;">break</span><span style="color: #000000;">;
  }</span>

同样,TcpSocketSaeaServer 对外所暴露的接口也同样是 Awaitable 的。

  <span style="color: #0000ff;">public</span> <span style="color: #0000ff;">interface</span><span style="color: #000000;"> ITcpSocketSaeaServerMessageDispatcher
  {
      Task OnSessionStarted(TcpSocketSaeaSession session);
      Task OnSessionDataReceived(TcpSocketSaeaSession session, </span><span style="color: #0000ff;">byte</span>[] data, <span style="color: #0000ff;">int</span> offset, <span style="color: #0000ff;">int</span><span style="color: #000000;"> count);
      Task OnSessionClosed(TcpSocketSaeaSession session);
  }</span>

使用起来也是简单直接:

  <span style="color: #0000ff;">public</span> <span style="color: #0000ff;">class</span><span style="color: #000000;"> SimpleMessageDispatcher : ITcpSocketSaeaServerMessageDispatcher
  {
      </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">async</span><span style="color: #000000;"> Task OnSessionStarted(TcpSocketSaeaSession session)
      {
          Console.WriteLine(</span><span style="color: #0000ff;">string</span>.Format(<span style="color: #800000;">"</span><span style="color: #800000;">TCP session {0} has connected {1}.</span><span style="color: #800000;">"</span><span style="color: #000000;">, session.RemoteEndPoint, session));
          </span><span style="color: #0000ff;">await</span><span style="color: #000000;"> Task.CompletedTask;
      }
  
      </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">async</span> Task OnSessionDataReceived(TcpSocketSaeaSession session, <span style="color: #0000ff;">byte</span>[] data, <span style="color: #0000ff;">int</span> offset, <span style="color: #0000ff;">int</span><span style="color: #000000;"> count)
      {
          </span><span style="color: #0000ff;">var</span> text =<span style="color: #000000;"> Encoding.UTF8.GetString(data, offset, count);
          Console.Write(</span><span style="color: #0000ff;">string</span>.Format(<span style="color: #800000;">"</span><span style="color: #800000;">Client : {0} --> </span><span style="color: #800000;">"</span><span style="color: #000000;">, session.RemoteEndPoint));
          Console.WriteLine(</span><span style="color: #0000ff;">string</span>.Format(<span style="color: #800000;">"</span><span style="color: #800000;">{0}</span><span style="color: #800000;">"</span><span style="color: #000000;">, text));
  
          </span><span style="color: #0000ff;">await</span><span style="color: #000000;"> session.SendAsync(Encoding.UTF8.GetBytes(text));
      }
  
      </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">async</span><span style="color: #000000;"> Task OnSessionClosed(TcpSocketSaeaSession session)
      {
          Console.WriteLine(</span><span style="color: #0000ff;">string</span>.Format(<span style="color: #800000;">"</span><span style="color: #800000;">TCP session {0} has disconnected.</span><span style="color: #800000;">"</span><span style="color: #000000;">, session));
          </span><span style="color: #0000ff;">await</span><span style="color: #000000;"> Task.CompletedTask;
      }
  }</span>

RIO 方式:TcpSocketRioServer

从 Windows 8.1 / Windows Server 2012 R2 开始,微软推出了 Registered I/O Networking Extensions 来支持高性能 Socket 服务的实现,简称 RIO。

The following functions are supported for Windows Store apps on Windows 8.1, Windows Server 2012 R2, and later. Microsoft Visual Studio 2013 Update 3 or later is required for Windows Store apps.

  • RIOCloseCompletionQueue
  • RIOCreateCompletionQueue
  • RIOCreateRequestQueue
  • RIODequeueCompletion
  • RIODeregisterBuffer
  • RIONotify
  • RIOReceive
  • RIOReceiveEx
  • RIORegisterBuffer
  • RIOResizeCompletionQueue
  • RIOResizeRequestQueue
  • RIOSend
  • RIOSendEx

到目前为止,.NET Framework 还没有推出对 RIO 的支持,所以若想在 C# 中实现 RIO 则只能通过 P/Invoke 方式,RioSharp 是开源项目中的一个比较完整的实现。

Cowboy.Sockets 直接引用了 RioSharp 的源代码,放置在 Cowboy.Sockets.Experimental 名空间下,以供实验和测试使用。

同样,通过 TcpSocketRioServer 来实现 Accept Loop,

_listener.OnAccepted = (acceptedSocket) =><span style="color: #000000;">
{
    Task.Run(</span><span style="color: #0000ff;">async</span> () =><span style="color: #000000;">
    {
        </span><span style="color: #0000ff;">await</span><span style="color: #000000;"> Process(acceptedSocket);
    })
    .Forget();
};</span>

通过 TcpSocketRioSession 来处理 Read Loop,

  <span style="color: #0000ff;">while</span> (State ==<span style="color: #000000;"> TcpSocketConnectionState.Connected)
  {
      </span><span style="color: #0000ff;">int</span> receiveCount = <span style="color: #0000ff;">await</span> _stream.ReadAsync(_receiveBuffer, <span style="color: #800080;">0</span><span style="color: #000000;">, _receiveBuffer.Length);
      </span><span style="color: #0000ff;">if</span> (receiveCount == <span style="color: #800080;">0</span><span style="color: #000000;">)
          </span><span style="color: #0000ff;">break</span><span style="color: #000000;">;
  }</span>

测试代码一如既往的类似:

  <span style="color: #0000ff;">public</span> <span style="color: #0000ff;">class</span><span style="color: #000000;"> SimpleMessageDispatcher : ITcpSocketRioServerMessageDispatcher
  {
      </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">async</span><span style="color: #000000;"> Task OnSessionStarted(TcpSocketRioSession session)
      {
          </span><span style="color: #008000;">//</span><span style="color: #008000;">Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));</span>
          Console.WriteLine(<span style="color: #0000ff;">string</span>.Format(<span style="color: #800000;">"</span><span style="color: #800000;">TCP session has connected {0}.</span><span style="color: #800000;">"</span><span style="color: #000000;">, session));
          </span><span style="color: #0000ff;">await</span><span style="color: #000000;"> Task.CompletedTask;
      }
  
      </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">async</span> Task OnSessionDataReceived(TcpSocketRioSession session, <span style="color: #0000ff;">byte</span>[] data, <span style="color: #0000ff;">int</span> offset, <span style="color: #0000ff;">int</span><span style="color: #000000;"> count)
      {
          </span><span style="color: #0000ff;">var</span> text =<span style="color: #000000;"> Encoding.UTF8.GetString(data, offset, count);
          </span><span style="color: #008000;">//</span><span style="color: #008000;">Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));</span>
          Console.Write(<span style="color: #0000ff;">string</span>.Format(<span style="color: #800000;">"</span><span style="color: #800000;">Client : --> </span><span style="color: #800000;">"</span><span style="color: #000000;">));
          Console.WriteLine(</span><span style="color: #0000ff;">string</span>.Format(<span style="color: #800000;">"</span><span style="color: #800000;">{0}</span><span style="color: #800000;">"</span><span style="color: #000000;">, text));
  
          </span><span style="color: #0000ff;">await</span><span style="color: #000000;"> session.SendAsync(Encoding.UTF8.GetBytes(text));
      }
  
      </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">async</span><span style="color: #000000;"> Task OnSessionClosed(TcpSocketRioSession session)
      {
          Console.WriteLine(</span><span style="color: #0000ff;">string</span>.Format(<span style="color: #800000;">"</span><span style="color: #800000;">TCP session {0} has disconnected.</span><span style="color: #800000;">"</span><span style="color: #000000;">, session));
          </span><span style="color: #0000ff;">await</span><span style="color: #000000;"> Task.CompletedTask;
      }
  }</span>

参考资料

  • Asynchronous Programming Model (APM)
  • Task-based Asynchronous Pattern (TAP)
  • Event-based Asynchronous Pattern (EAP)
  • SocketAsyncEventArgs
  • Registered I/O
  • Netty: Reference counted objects
  • Socket Performance Enhancements in Version 3.5
  • What's New for Windows Sockets for Windows 8.1 and Windows Server 2012 R2
  • RIO_EXTENSION_FUNCTION_TABLE structure
  • Windows 8 Registered I/O Networking Extensions

本篇文章《C#高性能TCP服务的多种实现方式》由 Dennis Gao 发表自博客园个人博客,未经作者本人同意禁止以任何的形式转载,任何自动的或人为的爬虫转载行为均为耍流氓。


声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。