>백엔드 개발 >C#.Net 튜토리얼 >C#에서 고성능 TCP 서비스의 다중 구현

C#에서 고성능 TCP 서비스의 다중 구현

PHPz
PHPz원래의
2017-03-12 16:47:332623검색

이 문서의 주요 목적은 .NET/C#을 사용하여 다음을 포함하되 이에 국한되지 않는 다양한 방식으로 TCP 고성능 서비스를 구현하는 것입니다.

  • APM 방식, 즉 Asynchronous 프로그래밍 Model

  • TAP 방식, 즉, 작업 기반 비동기 패턴

  • SAE메서드, 즉 SocketAsyncEventArgs

  • RIO 메서드, 즉 Registered I/O

.NET/C#의 소켓 지원은 Windows I/O 완료 포트를 기반으로 포트 기술 캡슐화를 완성하고 다양한 Non-Blocking 캡슐화 구조를 사용하여 다양한 🎜>프로그래밍 이 필요합니다. 위 메소드들은 Cowboy.Sockets에서 완벽하게 구현되었으며, 실제 프로젝트에는 APM과 TAP 메소드가 적용

되었습니다. Cowboy.Sockets는 여전히 발전하고 개선되고 있습니다. 문제가 있으면 제때에 수정해 주세요.

구현 방법은 너무 많지만 추상적인 관점에서는 동일합니다. Accept LoopRead Loop 두 개의 루프로 설명할 수 있습니다. 아래 그림에 나와 있습니다. (여기서 언급된 "루프"는 루프 메서드를 의미하며, 구체적으로 while/for

를 의미하는 것은 아니며, 기타 키워드)

  • 모든 TCP 서버 구현에는 TCP를 설정하기 위해 클라이언트로부터 연결 요청을 수신하기 위한 Accept Socket 루프가 있어야 합니다. 연결.
  • 모든 TCP 서버 구현에는 클라이언트가 작성한 데이터를 수신하기 위한 읽기 소켓 루프가 있어야 합니다.

Accept 루프가 차단되면 연결이 빠르게 설정되지 않고 서버 P종료ing 백로그가 가득 차서 클라이언트가 Connect Time

예외를 받습니다. 읽기 루프가 차단되면 클라이언트로부터 데이터를 제때에 수신할 수 없게 되어 클라이언트의 전송 버퍼가 가득 차서 더 이상 데이터를 보낼 수 없게 됩니다.

구현 세부 사항의 관점에서 서비스 차단을 일으킬 수 있는 위치는 다음과 같습니다.
  1. 새 소켓을 수락하고 새 연결을 구축하려면 다양한 리소스를 할당해야 합니다. . 리소스 할당이 느립니다.
  2. 새 소켓을 수락하고 다음 수락이 제때에 트리거되지 않습니다.
  3. 새 버퍼를 읽습니다. 페이로드 메시지 길이를 결정하는 데 시간이 오래 걸립니다.
  4. 새 버퍼를 읽고 계속 읽으면 페이로드가 아직 수집되지 않았음을 알 수 있습니다. may"는 버퍼 복사

    ;
  5. 페이로드가 수신된 후 De-Serialization이 수행되어 인식 가능한 프로토콜 메시지로 변환됩니다. deserialization은 느립니다.
  6. 비즈니스 모듈

    에 따라 해당 프로토콜 메시지를 처리하는 데 처리 속도가 느립니다.

1-2개 Accept 프로세스와 Connection 설정 프로세스, 3-4는 ReceiverBuffer의 처리 프로세스를 포함하고, 5-6은 애플리케이션 로직 측면의 구현을 포함합니다.

Java의 유명한 Netty 네트워크 라이브러리는 버전 4.0부터 Buffer 부분에 새로운 시도를 했으며, 높은 동시성 조건에서 Buffer Copy로 인한 성능을 줄이기 위해 ByteBuf라는 설계를 채택하여 Buffer Zero Copy를 구현했습니다. 손실 및 GC 압력. DotNetty, Orleans 및 Helios와 같은 프로젝트는 C#에서 유사한 ByteBuf를 구현하려고 시도하고 있습니다.

APM 방법: TcpSocketServer

TcpSocketServer의 구현은 APM 기반 BeginXXX 및 EndXXX 인터페이스를 사용하여 .NET Framework와 함께 제공되는 TcpListener 및 TcpClient의 추가 캡슐화를 기반으로 합니다.

성취하세요.

TcpSocketServer의 Accept 루프는
  • BeginAccept -> EndAccept -> 🎜>

성공적으로 설정된 각 연결은 TcpSocketSession에 의해 처리되므로 TcpSocketSession에는 Read Loop,

이 포함됩니다.
  • BeginRead -> EndRead -> BeginRead -> ...

TcpSocketServer는 이벤트를 노출하여 구현됩니다. 연결 설정 및 연결 끊김 및 데이터 수신 알림.


  event EventHandler<TcpClientConnectedEventArgs> ClientConnected;  event EventHandler<TcpClientDisconnectedEventArgs> ClientDisconnected;  event EventHandler<TcpClientDataReceivedEventArgs> ClientDataReceived;

도 간단하고 사용하기 쉽습니다. 이벤트 알림을 직접 구독하기만 하면 됩니다.


  private static void StartServer()
  {
      _server = new TcpSocketServer(22222);
      _server.ClientConnected += server_ClientConnected;
      _server.ClientDisconnected += server_ClientDisconnected;
      _server.ClientDataReceived += server_ClientDataReceived;
      _server.Listen();
  }  
  static void server_ClientConnected(object sender, TcpClientConnectedEventArgs e)
  {
      Console.WriteLine(string.Format("TCP client {0} has connected {1}.", e.Session.RemoteEndPoint, e.Session));
  }  
  static void server_ClientDisconnected(object sender, TcpClientDisconnectedEventArgs e)
  {
      Console.WriteLine(string.Format("TCP client {0} has disconnected.", e.Session));
  }  
  static void server_ClientDataReceived(object sender, TcpClientDataReceivedEventArgs e)
  {      var text = Encoding.UTF8.GetString(e.Data, e.DataOffset, e.DataLength);
      Console.Write(string.Format("Client : {0} {1} --> ", e.Session.RemoteEndPoint, e.Session));
      Console.WriteLine(string.Format("{0}", text));
      _server.Broadcast(Encoding.UTF8.GetBytes(text));
  }

TAP 방법: AsyncTcpSocketServer

AsyncTcpSocketServer의 구현은 .NET Framework와 함께 제공되는 TcpListener 및 TcpClient의 추가 캡슐화를 기반으로 합니다. , TAP의 async/await를 기반으로 하는 XXXAsync 인터페이스 구현을 사용합니다.

그러나 실제로 XXXAsync는 내부 구현이 APM 메서드를 TAP 호출 메서드로 변환하는 마법 같은 효과를 생성하지 않습니다.


  //************* Task-based async public methods *************************
  [HostProtection(ExternalThreading = true)]  public Task<Socket> AcceptSocketAsync()
  {      return Task<Socket>.Factory.FromAsync(BeginAcceptSocket, EndAcceptSocket, null);
  }
  
  [HostProtection(ExternalThreading = true)]  public Task<TcpClient> AcceptTcpClientAsync()
  {      return Task<TcpClient>.Factory.FromAsync(BeginAcceptTcpClient, EndAcceptTcpClient, null);
  }

AsyncTcpSocketServer의 Accept 루프는


  while (IsListening)
  {      var tcpClient = await _listener.AcceptTcpClientAsync();
  }

성공적으로 설정된 모든 연결이 처리됩니다. by AsyncTcpSocketSession이므로 AsyncTcpSocketSession에는 읽기 루프가 포함됩니다.


  while (State == TcpSocketConnectionState.Connected)
  {      int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length);
  }

async/await를 끝까지 비동기화하기 위해 AsyncTcpSocketServer에서 노출하는 인터페이스도 Awaitable입니다.


  public interface IAsyncTcpSocketServerMessageDispatcher
  {
      Task OnSessionStarted(AsyncTcpSocketSession session);
      Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count);
      Task OnSessionClosed(AsyncTcpSocketSession session);
  }

사용 시 이 인터페이스를 구현하는 객체를 AsyncTcpSocketServer의 생성자에 삽입하기만 하면 됩니다.


  public class SimpleMessageDispatcher : IAsyncTcpSocketServerMessageDispatcher
  {      public async Task OnSessionStarted(AsyncTcpSocketSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));          await Task.CompletedTask;
      }  
      public async Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count)
      {          var text = Encoding.UTF8.GetString(data, offset, count);
          Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.WriteLine(string.Format("{0}", text));  
          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }  
      public async Task OnSessionClosed(AsyncTcpSocketSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));          await Task.CompletedTask;
      }
  }

물론 인터페이스 구현이 필수는 아니며, 메소드 구현을 생성자에 직접 주입할 수도 있습니다.


  public AsyncTcpSocketServer(
      IPEndPoint listenedEndPoint,
      Func<AsyncTcpSocketSession, byte[], int, int, Task> onSessionDataReceived = null,
      Func<AsyncTcpSocketSession, Task> onSessionStarted = null,
      Func<AsyncTcpSocketSession, Task> onSessionClosed = null,
      AsyncTcpSocketServerConfiguration configuration = null)
  {}

SAEA 메서드: TcpSocketSaeaServer

SAEA는 SocketAsyncEventArgs의 약어입니다. SocketAsyncEventArgs는 .NET Framework 3.5부터 고성능 소켓 통신을 지원하는 구현입니다. APM 방법과 비교하여 SocketAsyncEventArgs의 주요 장점은 다음과 같이 설명할 수 있습니다.

이러한 개선 사항의 주요 기능은 반복되는 모든 것을 방지하는 것입니다. 대용량 비동기 소켓 I/O 중 객체 위치 및 동기화. 비동기 소켓 I/O용 소켓 클래스에서 현재구현된 시작/끝 디자인 패턴이 필요합니다. 비동기 소켓 작업에는 System.IAsyncResult 개체가 할당됩니다.

즉, 비동기 소켓 작업을 위해 IAsyncResult 및 기타 개체를 생성할 필요가 없다는 장점이 있습니다. 각 호출은 기본 Socket Some에 더 가깝습니다.

SocketAsyncEventArgs 사용에 권장되는 단계는 다음과 같습니다.

  1. 새 SocketAsyncEventArgs 컨텍스트 개체를 할당하거나 애플리케이션 풀에서 무료 개체를 가져옵니다.

  2. 컨텍스트 개체의 속성을 수행할 작업으로 설정합니다(예: 콜백 대리자 메서드 및 데이터 버퍼).

  3. 적절한 소켓 메서드를 호출합니다. (xxxAsync)를 사용하여 비동기 작업을 시작합니다.

  4. 비동기 소켓 메서드(xxxAsync)가 콜백에서 true를 반환하는 경우 컨텍스트 속성에서 완료 상태를 쿼리합니다.

  5. 콜백에서 비동기 소켓 메서드(xxxAsync)가 false를 반환하는 경우 작업이 동기적으로 완료되어 작업 결과를 쿼리할 수 있습니다.

  6. 다른 작업을 위해 컨텍스트를 재사용하거나 풀에 다시 넣거나 폐기합니다.

풀링의 목적은 재사용하고 런타임 할당을 줄이는 것입니다. 그리고 쓰레기 수거 압력.

TcpSocketSaeaServer는 SocketAsyncEventArgs의 애플리케이션이자 캡슐화이며 풀링 기술을 구현합니다. TcpSocketSaeaServer의 초점은 SaeaAwaitable 클래스입니다. SaeaAwaitable에는 내장 SocketAsyncEventArgs가 있으며 GetAwaiter를 통해 SaeaAwaiter를 반환하여 비동기/대기 작업을 지원합니다. 동시에 SocketAsyncEventArgs의 Awaitable 구현은 SaeaExtensions 확장 메서드 쌍을 통해 확장됩니다.


  public static SaeaAwaitable AcceptAsync(this Socket socket, SaeaAwaitable awaitable)  public static SaeaAwaitable ConnectAsync(this Socket socket, SaeaAwaitable awaitable)  public static SaeaAwaitable DisonnectAsync(this Socket socket, SaeaAwaitable awaitable)  public static SaeaAwaitable ReceiveAsync(this Socket socket, SaeaAwaitable awaitable)  public static SaeaAwaitable SendAsync(this Socket socket, SaeaAwaitable awaitable)

SaeaPool은 SaeaAwaitable 인스턴스를 풀링하는 데 사용되는 QueuedObjectPool의 파생 구현입니다. 동시에 TcpSocketSaeaSession의 구성 과정을 줄이기 위해 SessionPool, 즉 QueuedObjectPool도 구현된다.

TcpSocketSaeaServer 中的 Accept Loop 指的就是,


  while (IsListening)
  {      var saea = _acceptSaeaPool.Take();  
      var socketError = await _listener.AcceptAsync(saea);      if (socketError == SocketError.Success)
      {          var acceptedSocket = saea.Saea.AcceptSocket;
      }
  
      _acceptSaeaPool.Return(saea);
  }

每一个建立成功的 Connection 由 TcpSocketSaeaSession 来处理,所以 TcpSocketSaeaSession 中会包含 Read Loop,


  var saea = _saeaPool.Take();
  saea.Saea.SetBuffer(_receiveBuffer, 0, _receiveBuffer.Length);  
  while (State == TcpSocketConnectionState.Connected)
  {
      saea.Saea.SetBuffer(0, _receiveBuffer.Length);  
      var socketError = await _socket.ReceiveAsync(saea);      if (socketError != SocketError.Success)          break;  
      var receiveCount = saea.Saea.BytesTransferred;      if (receiveCount == 0)          break;
  }

同样,TcpSocketSaeaServer 对外所暴露的接口也同样是 Awaitable 的。


  public interface ITcpSocketSaeaServerMessageDispatcher
  {
      Task OnSessionStarted(TcpSocketSaeaSession session);
      Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count);
      Task OnSessionClosed(TcpSocketSaeaSession session);
  }

使用起来也是简单直接:


  public class SimpleMessageDispatcher : ITcpSocketSaeaServerMessageDispatcher
  {      public async Task OnSessionStarted(TcpSocketSaeaSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));          await Task.CompletedTask;
      }  
      public async Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count)
      {          var text = Encoding.UTF8.GetString(data, offset, count);
          Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.WriteLine(string.Format("{0}", text));  
          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }  
      public async Task OnSessionClosed(TcpSocketSaeaSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));          await Task.CompletedTask;
      }
  }

RIO 方式:TcpSocketRioServer

从 Windows 8.1 / Windows Server 2012 R2 开始,微软推出了 Registered I/O Networking Extensions 来支持高性能 Socket 服务的实现,简称 RIO。

The following functions are supported for Windows Store apps on Windows 8.1, Windows Server 2012 R2, and later. Microsoft Visual Studio 2013 Update 3 or later is required for Windows Store apps.

  • RIOCloseCompletionQueue

  • RIOCreateCompletionQueue

  • RIOCreateRequestQueue

  • RIODequeueCompletion

  • RIODeregisterBuffer

  • RIONotify

  • RIOReceive

  • RIOReceiveEx

  • RIORegisterBuffer

  • RIOResizeCompletionQueue

  • RIOResizeRequestQueue

  • RIOSend

  • RIOSendEx

到目前为止,.NET Framework 还没有推出对 RIO 的支持,所以若想在 C# 中实现 RIO 则只能通过 P/Invoke 方式,RioSharp 是开源项目中的一个比较完整的实现。

Cowboy.Sockets 直接引用了 RioSharp 的源代码,放置在 Cowboy.Sockets.Experimental 名空间下,以供实验和测试使用。

同样,通过 TcpSocketRioServer 来实现 Accept Loop,


_listener.OnAccepted = (acceptedSocket) =>{
    Task.Run(async () =>
    {        await Process(acceptedSocket);
    })
    .Forget();
};

通过 TcpSocketRioSession 来处理 Read Loop,


  while (State == TcpSocketConnectionState.Connected)
  {      int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length);      if (receiveCount == 0)          break;
  }

测试代码一如既往的类似:


  public class SimpleMessageDispatcher : ITcpSocketRioServerMessageDispatcher
  {      public async Task OnSessionStarted(TcpSocketRioSession session)
      {          //Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
          Console.WriteLine(string.Format("TCP session has connected {0}.", session));          await Task.CompletedTask;
      }  
      public async Task OnSessionDataReceived(TcpSocketRioSession session, byte[] data, int offset, int count)
      {          var text = Encoding.UTF8.GetString(data, offset, count);          //Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.Write(string.Format("Client : --> "));
          Console.WriteLine(string.Format("{0}", text));  
          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }  
      public async Task OnSessionClosed(TcpSocketRioSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));          await Task.CompletedTask;
      }
  }

参考资料

  • Asynchronous Programming Model (APM)

  • Task-based Asynchronous Pattern (TAP)

  • Event-based Asynchronous Pattern (EAP)

  • SocketAsyncEventArgs

  • Registered I/O

  • Netty: Reference counted objects

  • Socket Performance Enhancements in Version 3.5

  • What's New for Windows Sockets for Windows 8.1 and Windows Server 2012 R2

  • RIO_EXTENSION_FUNCTION_TABLE structure

  • Windows 8 Registered I/O Networking Extensions

本篇文章《C#高性能TCP服务的多种实现方式》由 Dennis Gao 发表自博客园个人博客,未经作者本人同意禁止以任何的形式转载,任何自动的或人为的爬虫转载行为均为耍流氓。

위 내용은 C#에서 고성능 TCP 서비스의 다중 구현의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.