Heim  >  Artikel  >  Backend-Entwicklung  >  Beispiel für die Einführung eines Socket-Übertragungs-Protobuf-Byte-Streams

Beispiel für die Einführung eines Socket-Übertragungs-Protobuf-Byte-Streams

零下一度
零下一度Original
2017-06-29 14:31:191949Durchsuche

Im vorherigen Artikel ging es hauptsächlich um die Verwendung von Multithreads zum Senden und Empfangen von Nachrichten.

//创建消息数据模型  2
//正式项目中,消息的结构一般是消息长度+消息id+消息主体内容  3 
public class Message  4
 {     
 public IExtensible protobuf;
 
  public int messageId;  7 }  8   9 public class SocketClientTemp : MonoBehaviour 10 { 11     
  const int packageMaxLength = 1024; 12  13     Socket mSocket; 14     Thread threadSend; 15     
  Thread threadRecive; 16     Queue<Message> allMessages = new Queue<Message>(); 17     
  Queue<byte[]> sendQueue = new Queue<byte[]>();
  public bool Init() 20     
  { 21         //创建一个socket对象 22        
   mSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 23         
   return SocketConnection("此处是ip", 1111); 24     } 25  26     void Update() 27     { 28        
   AnalysisMessage(); 29     } 30  31     /// <summary> 32     /// 建立服务器连接 33     /// </summary> 34     
   /// <param name="ip">服务器的ip地址</param> 35     /// <param name="port">端口</param> 36     
   bool SocketConnection(string ip, int port) 37     { 38         try 39         { 40             
   IPEndPoint ipep = new IPEndPoint(IPAddress.Parse(ip), port); 41            
   //同步连接服务器,实际使用时推荐使用异步连接,处理方式会在下一篇讲断线重连时讲到 42            
   mSocket.Connect(ipep); 43             //连接成功后,创建两个线程,分别用于发送和接收消息 44            
   threadSend = new Thread(new ThreadStart(SendMessage)); 45             threadSend.Start(); 46            
   threadRecive = new Thread(new ThreadStart(ReceiveMessage)); 47             threadRecive.Start(); 48            
   return true; 49         } 50         catch (Exception e) 51         { 52            
   Debug.Log(e.ToString()); 53             Close(); 54             return false; 55       
    }      }
         #region ...发送消息 59     
 /// <summary> 60     /// 添加数据到发送队列 61     
 /// </summary> 62     /// <param name="protobufModel"></param> 63     
 /// <param name="messageId"></param> 64    
 public void AddSendMessageQueue(IExtensible protobufModel, int messageId) 65     
{ 66         sendQueue.Enqueue(BuildPackage(protobufModel, messageId)); 67     } 
void SendMessage() 70     { 71         //循环获取发送队列中第一个数据,然后发送到服务器 72         
 while (true) 73         { 74             if (sendQueue.Count == 0) 75             { 76                 
 Thread.Sleep(100); 77                 continue; 78             } 79             
if (!mSocket.Connected) 80             { 81                 Close(); 82                 
break; 83             } 84             
else 85                 Send(sendQueue.Peek());
//发送队列中第一条数据 86         } 87     } 88  89    
 void Send(byte[] bytes) 90     { 91         try 92         
 { 93             mSocket.Send(bytes, SocketFlags.None); 94             
 //发送成功后,从发送队列中移除已发送的消息 95             sendQueue.Dequeue(); 96         
 } 97         catch (SocketException e) 98         { 99             
 //如果错误码为10035,说明服务器缓存区满了,所以等100毫秒再次发送100             
 if (e.NativeErrorCode == 10035)101             {102                
 Thread.Sleep(100);103                 Send(bytes);104             
 }105             else106                 
 Debug.Log(e.ToString());107         }108     }109     #endregion110 111     
 #region ...接收消息112     /// <summary>113     /// 解析收到的消息114     
 /// </summary>115     void AnalysisMessage()116     {117         
 while (allMessages.Count > 0)118         {119             
 int id = allMessages.Dequeue().messageId;120             
 switch (id)121             {122                
 //根据消息id做不同的处理123             }124         }125     }126 127     
 /// <summary>128     /// 接收数据129     /// </summary>130     
 void ReceiveMessage()131     {132         while (true)133         
 {134             if (!mSocket.Connected)135                 
 break;136             byte[] recvBytesHead = GetBytesReceive(4);137             
 int bodyLength = IPAddress.NetworkToHostOrder(BitConverter.ToInt32(recvBytesHead, 0));138             
 byte[] recvBytesBody = GetBytesReceive(bodyLength);139 140             
 byte[] messageId = new byte[4];141             
 Array.Copy(recvBytesBody, 0, messageId, 0, 4);142             
 byte[] messageBody = new byte[bodyLength - 4];143             
 Array.Copy(recvBytesBody, 4, messageBody, 0, bodyLength - 4);144 145             
 if (BitConverter.IsLittleEndian)146                 
 Array.Reverse(messageId);147             
 FillAllPackages(BitConverter.ToInt32(messageId, 0), messageBody);148         
 }    
 }
  /// <summary>152    
   /// 填充接收消息队列153     /// </summary>154     
   /// <param name="messageId"></param>155     /// <param name="messageBody"></param>156     void FillAllPackages(int messageId, byte[] messageBody)157     {158         switch (messageId)159         {160             //根据消息id处理消息,并添加到接收消息队列161             case 1:162                 allMessages.Enqueue(new Message() 
163                 
{164                     
protobuf = ProtobufSerilizer.DeSerialize<TestTemp>(messageBody), 
165                     
messageId = messageId 
166                 });167                
 break;168         }169     }170 171     
/// <summary>172     /// 接收数据并处理173     
/// </summary>174     /// <param name="length"></param>175     
/// <returns></returns>176     byte[] GetBytesReceive(int length)177     
{178         byte[] recvBytes = new byte[length];179         while (length > 0)180         
{181             
byte[] receiveBytes = new byte[length < packageMaxLength ? length : packageMaxLength];182             
int iBytesBody = 0;183             if (length >= receiveBytes.Length)184                 
iBytesBody = mSocket.Receive(receiveBytes, receiveBytes.Length, 0);185             
else186                 iBytesBody = mSocket.Receive(receiveBytes, length, 0);187             
receiveBytes.CopyTo(recvBytes, recvBytes.Length - length);188             length -= iBytesBody;189         
}190         return recvBytes;191     }192     #endregion193 194     /// <summary>195     /// 构建消息数据包196     
/// </summary>197     /// <param name="protobufModel"></param>198     /// <param name="messageId"></param>199     
byte[] BuildPackage(IExtensible protobufModel, int messageId)200     {201         byte[] b;202         
if (protobufModel != null)203             b = ProtobufSerilizer.Serialize(protobufModel);204         
else205             b = new byte[0];206         //消息长度(int数据,长度4) + 消息id(int数据,长度4) + 消息主体内容207         
ByteBuffer buf = ByteBuffer.Allocate(b.Length + 4 + 4);208         //消息长度 = 消息主体内容长度 + 消息id长度209         
buf.WriteInt(b.Length + 4);210         buf.WriteInt(messageId);211 212         if (protobufModel != null)213            
 buf.WriteBytes(b);214         return buf.GetBytes();215     }216 217     void OnDestroy()218     {219         
 //停止运行后,如果不关闭socket多线程,再次运行时,unity会卡死220         Close();221     }222 223     
 /// <summary>224     /// 关闭socket,终止线程225     /// </summary>226     public void Close()227     
 {228         if (mSocket != null)229         {230             
 //微软官方推荐在关闭socket前先shutdown,但是经过测试,发现网络断开后,shutdown会无法执行231             
 if (mSocket.Connected)232                 mSocket.Shutdown(SocketShutdown.Both);233             
 mSocket.Close();234             mSocket = null;235         }236         //关闭线程237         
 if (threadSend != null)238             threadSend.Abort();239         if (threadRecive != null)240            
 threadRecive.Abort();241         threadSend = null;242         threadRecive = null;243     }244 }

An dieser Stelle geht es um die Verwendung von Sockets zum Verarbeiten von Nachrichten und Empfangen ist im Grunde genommen vorbei. Um das Erlebnis zu verbessern, können einige Projekte auch die Funktion zum Trennen und erneuten Verbinden hinzufügen. Diese Funktion wird im nächsten Artikel besprochen

Das obige ist der detaillierte Inhalt vonBeispiel für die Einführung eines Socket-Übertragungs-Protobuf-Byte-Streams. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn