Home >Backend Development >C#.Net Tutorial >About UDP in C# to achieve reliable transmission (group transmission of data packets)
UDP is the usual choice for connectionless transmission in C#. It is not as stable or reliable as TCP, but it is more efficient, so each protocol has its advantages and disadvantages.
With UDP, packets are sometimes lost, yet there are situations where UDP has to be used. The question is how to make transmission over UDP more reliable.
TCP places no size limit on the data being transmitted, but a single UDP datagram does have a size limit. So how can large data be transmitted reliably over UDP? The approach taken here is to split the data into multiple packets.
A large piece of data is split into a series of small packets that are sent separately; after receiving them, the server reassembles the complete data.
If a packet is lost along the way, it is resent.
The UDP thread class below implements packetized sending and resending of data. The actual handling of received data must be supplied by subscribing to its events.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net.Sockets;
using Model;
using System.Net;
using Tool;
using System.Threading;

namespace ZZUdp.Core
{
    /// <summary>
    /// Wraps a connected <see cref="UdpClient"/> and adds packetized sending of
    /// <see cref="Msg"/> objects, tracking of packets that still await confirmation,
    /// and periodic resending of those packets from a background thread.
    /// Incoming datagrams are deserialized into <see cref="UdpPacket"/> instances and
    /// reported through <see cref="PackageReceived"/>; what to do with them (for example
    /// sending a confirmation or calling <see cref="PopSendItemFromList"/>) is left to
    /// the event subscribers.
    /// </summary>
    public class UDPThread : IDisposable
    {
        #region Private fields

        // Underlying UDP client; only assigned by the (ipaddress, port) constructor.
        UdpClient client;

        // Packets that were sent and are still waiting to be confirmed.
        readonly List<UdpPacket> sendlist = new List<UdpPacket>();

        // Per-sequence receive buffers. Not used by this class itself.
        Dictionary<long, RecDataList> RecListDic = new Dictionary<long, RecDataList>();

        // Endpoint of the sender of the most recently received datagram.
        IPEndPoint remotIpEnd = null;

        // Server port number. Not used by this class itself.
        int port = 6666;

        // Single lock guarding every access to sendlist.
        readonly object lockObj = new object();

        // Set by Dispose so the resend thread and the receive loops stop.
        volatile bool disposed;

        #endregion

        #region Properties

        /// <summary>
        /// Interval, in milliseconds, between two passes over the unconfirmed-packet list.
        /// </summary>
        public int CheckQueueTimeInterval { get; set; }

        /// <summary>
        /// Once the check count of an unconfirmed packet reaches this value the packet is
        /// dropped from the list and <see cref="PackageSendFailure"/> is raised.
        /// </summary>
        public int MaxResendTimes { get; set; }

        #endregion

        #region Events

        /// <summary>
        /// Raised when a datagram was received and deserialized into a packet.
        /// </summary>
        public event EventHandler<PackageEventArgs> PackageReceived;

        /// <summary>
        /// Raises <see cref="PackageReceived"/>.
        /// </summary>
        /// <param name="e">Event data describing the received packet.</param>
        protected virtual void OnPackageReceived(PackageEventArgs e)
        {
            // Copy to a local so a concurrent unsubscribe cannot null the delegate
            // between the check and the call.
            EventHandler<PackageEventArgs> handler = PackageReceived;
            if (handler != null)
            {
                handler(this, e);
            }
        }

        /// <summary>
        /// Raised when a packet was dropped because it was never confirmed.
        /// </summary>
        public event EventHandler<PackageEventArgs> PackageSendFailure;

        /// <summary>
        /// Raises <see cref="PackageSendFailure"/>.
        /// </summary>
        /// <param name="e">Event data describing the dropped packet.</param>
        protected virtual void OnPackageSendFailure(PackageEventArgs e)
        {
            EventHandler<PackageEventArgs> handler = PackageSendFailure;
            if (handler != null)
            {
                handler(this, e);
            }
        }

        /// <summary>
        /// Raised after an unconfirmed packet has been sent again.
        /// </summary>
        public event EventHandler<PackageEventArgs> PackageResend;

        /// <summary>
        /// Raises <see cref="PackageResend"/>.
        /// </summary>
        /// <param name="e">Event data describing the resent packet.</param>
        protected virtual void OnPackageResend(PackageEventArgs e)
        {
            EventHandler<PackageEventArgs> handler = PackageResend;
            if (handler != null)
            {
                handler(this, e);
            }
        }

        #endregion

        /// <summary>
        /// Creates an instance without a UDP client. No socket is opened and no background
        /// thread is started, so the send and receive methods cannot be used on it.
        /// </summary>
        public UDPThread()
        {
        }

        /// <summary>
        /// Creates a UDP client connected to the given remote host, starts the background
        /// thread that resends unconfirmed packets and begins receiving asynchronously.
        /// </summary>
        /// <param name="ipaddress">Remote host IP address in textual form.</param>
        /// <param name="port">Remote host port.</param>
        public UDPThread(string ipaddress, int port)
        {
            IPAddress ipA = IPAddress.Parse(ipaddress);
            IPEndPoint ipEnd = new IPEndPoint(ipA, port);

            // new UdpClient(ipEnd) would bind locally instead of setting a default remote
            // host, so create the client first and connect it explicitly.
            client = new UdpClient();
            client.Connect(ipEnd);

            CheckQueueTimeInterval = 2000; // polling interval in milliseconds
            MaxResendTimes = 5;

            new Thread(new ThreadStart(CheckUnConfirmedQueue)) { IsBackground = true }.Start();

            AsyncReceiveData();
        }

        /// <summary>
        /// Blocking receive loop: receives datagrams, deserializes them and raises
        /// <see cref="PackageReceived"/> for each one until the instance is disposed.
        /// Datagrams that cannot be received or deserialized are dropped.
        /// </summary>
        /// <exception cref="InvalidOperationException">The instance has no UDP client.</exception>
        public void ReceiveData()
        {
            if (client == null)
            {
                // Without this guard the loop below would spin forever on a null client.
                throw new InvalidOperationException("The UDP client has not been created.");
            }

            while (!disposed)
            {
                IPEndPoint retip = null;
                UdpPacket udpp = null;
                try
                {
                    // retip is filled with the endpoint of the sender.
                    byte[] data = client.Receive(ref retip);
                    udpp = (UdpPacket)SerializationUnit.DeserializeObject(data);
                }
                catch (ObjectDisposedException)
                {
                    return; // socket was closed by Dispose
                }
                catch (Exception)
                {
                    // Best effort: a failed receive or a malformed datagram is dropped and
                    // the loop keeps listening.
                }

                if (udpp != null)
                {
                    OnPackageReceived(new PackageEventArgs(udpp, retip));
                }
            }
        }

        /// <summary>
        /// Starts one asynchronous receive; <see cref="ReceiveCallback"/> runs on completion.
        /// Socket errors propagate to the caller with their original stack trace.
        /// </summary>
        public void AsyncReceiveData()
        {
            client.BeginReceive(new AsyncCallback(ReceiveCallback), null);
        }

        /// <summary>
        /// Completion callback of <see cref="AsyncReceiveData"/>: finishes the receive,
        /// starts the next one, then raises <see cref="PackageReceived"/> with the packet
        /// and the sender endpoint.
        /// </summary>
        /// <param name="param">Asynchronous result handed over by the UDP client.</param>
        public void ReceiveCallback(IAsyncResult param)
        {
            if (!param.IsCompleted || disposed)
            {
                return;
            }

            UdpPacket udpp = null;
            IPEndPoint sender = null;
            try
            {
                byte[] data = client.EndReceive(param, ref sender);
                remotIpEnd = sender;
                udpp = (UdpPacket)SerializationUnit.DeserializeObject(data);
            }
            catch (ObjectDisposedException)
            {
                return; // socket was closed by Dispose; stop listening
            }
            catch (Exception)
            {
                // Best effort: drop the datagram and keep listening.
            }

            try
            {
                // Re-arm before raising the event so a slow handler does not delay reception.
                AsyncReceiveData();
            }
            catch (ObjectDisposedException)
            {
                // Closed concurrently; the packet already received is still reported below.
            }

            if (udpp != null)
            {
                OnPackageReceived(new PackageEventArgs(udpp, sender));
            }
        }

        /// <summary>
        /// Splits the message into packets and sends them synchronously. Packets that
        /// require confirmation are added to the unconfirmed list so the background
        /// thread resends them until <see cref="PopSendItemFromList"/> removes them.
        /// </summary>
        /// <param name="message">Message to send.</param>
        public void SendData(Msg message)
        {
            ICollection<UdpPacket> udpPackets = UdpPacketSplitter.Split(message);
            foreach (UdpPacket udpPacket in udpPackets)
            {
                byte[] udpPacketDatagram = SerializationUnit.SerializeObject(udpPacket);

                // The client is connected, so the datagram goes to the default remote host.
                // Passing an explicit endpoint to a connected UdpClient makes Send throw
                // InvalidOperationException.
                client.Send(udpPacketDatagram, udpPacketDatagram.Length);

                if (udpPacket.IsRequireReceiveCheck)
                {
                    PushSendItemToList(udpPacket);
                }
            }
        }

        /// <summary>
        /// Splits the message into packets and sends them asynchronously.
        /// </summary>
        /// <param name="message">Message to send.</param>
        public void AsyncSendData(Msg message)
        {
            // NOTE(review): unlike SendData, packets sent here are not added to the
            // unconfirmed list and are therefore never resent - confirm this is intended.
            ICollection<UdpPacket> udpPackets = UdpPacketSplitter.Split(message);
            foreach (UdpPacket udpPacket in udpPackets)
            {
                byte[] udpPacketDatagram = SerializationUnit.SerializeObject(udpPacket);
                this.client.BeginSend(udpPacketDatagram, udpPacketDatagram.Length, new AsyncCallback(SendCallback), null);
            }
        }

        /// <summary>
        /// Completion callback of the asynchronous send; errors are ignored.
        /// </summary>
        /// <param name="param">Asynchronous result handed over by the UDP client.</param>
        public void SendCallback(IAsyncResult param)
        {
            if (param.IsCompleted)
            {
                try
                {
                    // Every BeginSend must be paired with an EndSend.
                    client.EndSend(param);
                }
                catch (Exception)
                {
                    // Best effort: a failed asynchronous send is not reported.
                }
            }
        }

        /// <summary>
        /// Background loop. On every pass each packet still in the unconfirmed list has
        /// its check count incremented; it is then either resent, or dropped with a
        /// <see cref="PackageSendFailure"/> event once the count reaches
        /// <see cref="MaxResendTimes"/>. The loop ends when the instance is disposed.
        /// </summary>
        void CheckUnConfirmedQueue()
        {
            while (!disposed)
            {
                UdpPacket[] pending;
                lock (lockObj)
                {
                    pending = sendlist.ToArray();
                }

                foreach (UdpPacket packet in pending)
                {
                    if (disposed)
                    {
                        return;
                    }

                    bool exhausted;
                    lock (lockObj)
                    {
                        if (!sendlist.Contains(packet))
                        {
                            continue; // removed (confirmed) since the snapshot was taken
                        }

                        packet.sendtimes++;
                        exhausted = packet.sendtimes >= MaxResendTimes;
                        if (exhausted)
                        {
                            sendlist.Remove(packet);
                        }
                    }

                    // Events are raised outside the lock so handlers may call back into
                    // this instance without deadlocking.
                    if (exhausted)
                    {
                        OnPackageSendFailure(new PackageEventArgs(packet, packet.remoteip));
                        continue;
                    }

                    try
                    {
                        byte[] udpPacketDatagram = SerializationUnit.SerializeObject(packet);
                        client.Send(udpPacketDatagram, udpPacketDatagram.Length);
                    }
                    catch (ObjectDisposedException)
                    {
                        return; // socket was closed by Dispose
                    }
                    catch (SocketException)
                    {
                        continue; // failed to send; the packet stays listed for the next pass
                    }

                    OnPackageResend(new PackageEventArgs(packet, packet.remoteip));
                }

                Thread.Sleep(CheckQueueTimeInterval);
            }
        }

        /// <summary>
        /// Adds a packet to the unconfirmed list.
        /// </summary>
        /// <param name="item">Packet awaiting confirmation.</param>
        void PushSendItemToList(UdpPacket item)
        {
            lock (lockObj)
            {
                sendlist.Add(item);
            }
        }

        /// <summary>
        /// Removes every packet with the given sequence number and index from the
        /// unconfirmed list, which stops it from being resent.
        /// </summary>
        /// <param name="packageNo">Sequence number of the packet group.</param>
        /// <param name="packageIndex">Index of the packet within its group.</param>
        public void PopSendItemFromList(long packageNo, int packageIndex)
        {
            lock (lockObj)
            {
                sendlist.RemoveAll(s => s.sequence == packageNo && s.index == packageIndex);
            }
        }

        /// <summary>
        /// Closes the UDP client and stops the background thread. Safe to call repeatedly.
        /// </summary>
        public void Dispose()
        {
            if (disposed)
            {
                return;
            }

            disposed = true;

            // The reference is kept (not nulled) so that threads still holding it observe
            // ObjectDisposedException instead of NullReferenceException.
            if (client != null)
            {
                client.Close();
            }
        }
    }
}
First, the entity class that holds the message data
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net;

namespace Model
{
    /// <summary>
    /// Serializable message envelope: who sent it, where it is going, which command it
    /// carries and its text and binary payloads.
    /// </summary>
    [Serializable]
    public class Msg
    {
        /// <summary>User name of the owning user.</summary>
        public string name { get; set; }

        /// <summary>IP of the owning user.</summary>
        public string host { get; set; }

        /// <summary>Name of the command.</summary>
        public string command { get; set; }

        /// <summary>Name of the recipient.</summary>
        public string desname { get; set; }

        /// <summary>
        /// Destination IP of the message; should correspond to the key value in the
        /// server's list.
        /// </summary>
        public string destinationIP { get; set; }

        /// <summary>Port number. Not set by the constructor.</summary>
        public int port { get; set; }

        /// <summary>Text payload.</summary>
        public string msg { get; set; }

        /// <summary>Binary payload.</summary>
        public byte[] byte_msg { get; set; }

        /// <summary>Additional data.</summary>
        public string extend_msg { get; set; }

        /// <summary>Timestamp; set to the local time at construction.</summary>
        public DateTime time { get; set; }

        /// <summary>
        /// Creates a message and stamps it with the current local time.
        /// </summary>
        /// <param name="command">Name of the command.</param>
        /// <param name="desip">Destination IP.</param>
        /// <param name="msg">Text payload.</param>
        /// <param name="host">IP of the sending user.</param>
        public Msg(string command, string desip, string msg, string host)
        {
            this.host = host;
            this.msg = msg;
            this.destinationIP = desip;
            this.command = command;
            this.time = DateTime.Now;
        }

        /// <summary>
        /// Returns the user name, the separator text and the text payload joined together.
        /// </summary>
        public override string ToString()
        {
            return string.Concat(name, "说:", msg);
        }
    }
}
The Msg data is split into segments to produce the individual packets
Packet entity class
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Tool;
using System.Net;

namespace Model
{
    /// <summary>
    /// One fragment of a larger datagram together with the bookkeeping needed to
    /// reassemble the group it belongs to and to track its delivery.
    /// </summary>
    [Serializable]
    public class UdpPacket
    {
        /// <summary>Unique sequence number of the group this packet belongs to.</summary>
        public long sequence { get; set; }

        /// <summary>Total number of packets in the group.</summary>
        public int total { get; set; }

        /// <summary>Index of this packet within the group.</summary>
        public int index { get; set; }

        /// <summary>Content bytes carried by this packet.</summary>
        public byte[] data { get; set; }

        /// <summary>Size of the chunks the original data was split into.</summary>
        public int dataLength { get; set; }

        /// <summary>Length of the data left over for the last packet.</summary>
        public int remainder { get; set; }

        /// <summary>Number of times the packet has been sent.</summary>
        public int sendtimes { get; set; }

        /// <summary>Remote address that is to receive this packet.</summary>
        public IPEndPoint remoteip { get; set; }

        /// <summary>Whether a confirmation packet must be returned on receipt.</summary>
        public bool IsRequireReceiveCheck { get; set; }

        // NOTE(review): not referenced by this class; meaning of the value is not
        // visible here - confirm against callers.
        public static int HeaderSize = 30000;

        /// <summary>
        /// Creates a packet addressed to the given IP and port. Confirmation is
        /// required by default.
        /// </summary>
        /// <param name="sequence">Sequence number of the group.</param>
        /// <param name="total">Total number of packets in the group.</param>
        /// <param name="index">Index of this packet within the group.</param>
        /// <param name="data">Content bytes of this packet.</param>
        /// <param name="dataLength">Chunk size used for the split.</param>
        /// <param name="remainder">Length of the data left over for the last packet.</param>
        /// <param name="desip">Destination IP address in textual form.</param>
        /// <param name="port">Destination port.</param>
        public UdpPacket(long sequence, int total, int index, byte[] data, int dataLength, int remainder, string desip, int port)
        {
            this.sequence = sequence;
            this.total = total;
            this.index = index;
            this.data = data;
            this.dataLength = dataLength;
            this.remainder = remainder;

            // Every packet asks for a confirmation unless the caller changes it later.
            IsRequireReceiveCheck = true;

            // Build the remote address from its textual IP and the port.
            remoteip = new IPEndPoint(IPAddress.Parse(desip), port);
        }

        /// <summary>
        /// Serializes this packet into a byte array.
        /// </summary>
        public byte[] ToArray()
        {
            byte[] serialized = SerializationUnit.SerializeObject(this);
            return serialized;
        }
    }
}
Data packet segmentation tool class
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Tool;

namespace Model
{
    /// <summary>
    /// Splits a serialized message into fixed-size UDP packets.
    /// </summary>
    public static class UdpPacketSplitter
    {
        // Chunk size, in bytes, used when splitting a whole message.
        private const int DefaultChunkLength = 10240;

        // One shared generator: creating a new Random per call can hand out the same
        // sequence number to messages split in quick succession, because instances
        // created close together may share a seed.
        private static readonly Random SequenceGenerator = new Random();

        // Random is not thread-safe, so access to the shared generator is serialized.
        private static readonly object SequenceLock = new object();

        /// <summary>
        /// Serializes the message, picks a sequence number for the group and splits the
        /// bytes into packets addressed to the message's destination IP and port.
        /// </summary>
        /// <param name="message">Message to split.</param>
        /// <returns>The packets that together carry the serialized message.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="message"/> is null, or it could not be serialized (the cause
        /// is attached as the inner exception).
        /// </exception>
        public static ICollection<UdpPacket> Split(Msg message)
        {
            if (message == null)
            {
                throw new ArgumentNullException("message");
            }

            byte[] datagram;
            try
            {
                datagram = SerializationUnit.SerializeObject(message);
            }
            catch (Exception e)
            {
                // Previously the failure was swallowed and surfaced as a bare
                // ArgumentNullException for "datagram". The exception type is kept for
                // existing callers, but the real cause is no longer lost.
                throw new ArgumentNullException("The message could not be serialized into a datagram.", e);
            }

            // Sequence number that identifies which group the packets belong to.
            long sequenceNumber;
            lock (SequenceLock)
            {
                sequenceNumber = SequenceGenerator.Next(88888, 999999);
            }

            return Split(sequenceNumber, datagram, DefaultChunkLength, message.destinationIP, message.port);
        }

        /// <summary>
        /// Splits a byte array into packets of at most <paramref name="chunkLength"/> bytes.
        /// Packet indexes start at 1; only the last packet may be shorter than the others.
        /// </summary>
        /// <param name="sequence">Sequence number shared by all packets of the group.</param>
        /// <param name="datagram">Bytes to split.</param>
        /// <param name="chunkLength">Maximum number of content bytes per packet.</param>
        /// <param name="desip">Destination IP address in textual form.</param>
        /// <param name="port">Destination port.</param>
        /// <returns>The packets in index order; empty when the datagram is empty.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="datagram"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="chunkLength"/> is not positive.</exception>
        public static ICollection<UdpPacket> Split(long sequence, byte[] datagram, int chunkLength, string desip, int port)
        {
            if (datagram == null)
            {
                throw new ArgumentNullException("datagram");
            }

            if (chunkLength <= 0)
            {
                // Zero would divide by zero below; a negative value makes no sense.
                throw new ArgumentOutOfRangeException("chunkLength", "The chunk length must be positive.");
            }

            int fullChunks = datagram.Length / chunkLength;
            int remainder = datagram.Length % chunkLength;
            int total = remainder > 0 ? fullChunks + 1 : fullChunks;

            List<UdpPacket> packets = new List<UdpPacket>(total);
            for (int index = 1; index <= total; index++)
            {
                int offset = (index - 1) * chunkLength;

                // Full chunks carry chunkLength bytes; the trailing one carries the rest.
                int length = Math.Min(chunkLength, datagram.Length - offset);

                byte[] chunk = new byte[length];
                Buffer.BlockCopy(datagram, offset, chunk, 0, length);

                // dataLength is always the nominal chunk size, even for the short last
                // packet, and every packet carries the same remainder value.
                packets.Add(new UdpPacket(sequence, total, index, chunk, chunkLength, remainder, desip, port));
            }

            return packets;
        }
    }
}
Data structure of server-side data storage
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Tool;
using Model;

namespace Model
{
    /// <summary>
    /// Collects the packets that share one sequence number and, once all of them have
    /// arrived, reassembles them into the original <see cref="Msg"/>.
    /// </summary>
    public class RecDataList
    {
        /// <summary>Sequence number of the group.</summary>
        public long sequence { get; set; }

        // Packets received so far for this sequence.
        List<UdpPacket> RecudpPackets = new List<UdpPacket>();

        /// <summary>Total number of packets in the group.</summary>
        public int total { get; set; }

        /// <summary>Chunk size the data was split with.</summary>
        public int dataLength { get; set; }

        /// <summary>Length of the trailing partial chunk; 0 when the data is a whole number of chunks.</summary>
        public int remainder { get; set; }

        /// <summary>Reassembly buffer; released (set to null) after a successful <see cref="show"/>.</summary>
        public byte[] DataBuffer = null;

        /// <summary>
        /// Creates the list from the header values of any packet of the group.
        /// The packet itself is not added.
        /// </summary>
        /// <param name="udp">Packet whose sequence, total, chunk size and remainder are used.</param>
        /// <exception cref="ArgumentNullException"><paramref name="udp"/> is null.</exception>
        public RecDataList(UdpPacket udp)
        {
            if (udp == null)
            {
                throw new ArgumentNullException("udp");
            }

            this.sequence = udp.sequence;
            this.total = udp.total;
            this.dataLength = udp.dataLength;
            this.remainder = udp.remainder;
            DataBuffer = new byte[ComputeBufferLength(this.total, this.dataLength, this.remainder)];
        }

        /// <summary>
        /// Creates the list from explicit header values.
        /// </summary>
        /// <param name="sequence">Sequence number of the group.</param>
        /// <param name="total">Total number of packets in the group.</param>
        /// <param name="chunkLength">Chunk size the data was split with.</param>
        /// <param name="remainder">Length of the trailing partial chunk, or 0.</param>
        public RecDataList(long sequence, int total, int chunkLength, int remainder)
        {
            this.sequence = sequence;
            this.total = total;
            this.dataLength = chunkLength;
            this.remainder = remainder;
            DataBuffer = new byte[ComputeBufferLength(this.total, this.dataLength, this.remainder)];
        }

        // Size of the reassembled data. With a remainder the last packet is partial;
        // without one all packets are full chunks. The former formula
        // chunkLength * (total - 1) + remainder was one chunk short in that second case.
        private static int ComputeBufferLength(int total, int chunkLength, int remainder)
        {
            if (remainder > 0)
            {
                return chunkLength * (total - 1) + remainder;
            }

            return chunkLength * total;
        }

        /// <summary>
        /// Stores a received packet. A packet whose index is already stored is ignored,
        /// so resent duplicates cannot make the group look complete too early.
        /// </summary>
        /// <param name="p">Packet to store.</param>
        /// <exception cref="ArgumentNullException"><paramref name="p"/> is null.</exception>
        public void addPacket(UdpPacket p)
        {
            if (p == null)
            {
                throw new ArgumentNullException("p");
            }

            if (containskey(p))
            {
                return;
            }

            RecudpPackets.Add(p);
        }

        /// <summary>
        /// Reassembles and deserializes the message once every packet has been stored.
        /// On success the buffer is released and the stored packets are cleared.
        /// </summary>
        /// <returns>The reassembled message, or null while packets are still missing.</returns>
        public Msg show()
        {
            if (RecudpPackets.Count != total || DataBuffer == null)
            {
                return null;
            }

            foreach (UdpPacket udpPacket in RecudpPackets)
            {
                // Indexes start at 1 and every packet but the last is dataLength bytes long.
                int offset = (udpPacket.index - 1) * udpPacket.dataLength;
                Buffer.BlockCopy(udpPacket.data, 0, DataBuffer, offset, udpPacket.data.Length);
            }

            Msg rmsg = (Msg)SerializationUnit.DeserializeObject(DataBuffer);
            DataBuffer = null;
            RecudpPackets.Clear();
            return rmsg;
        }

        /// <summary>
        /// Tells whether a packet with the same index has already been stored.
        /// </summary>
        /// <param name="udp">Packet whose index is looked up.</param>
        /// <returns>True when a stored packet has the same index.</returns>
        public bool containskey(UdpPacket udp)
        {
            foreach (UdpPacket udpPacket in RecudpPackets)
            {
                if (udpPacket.index == udp.index)
                {
                    return true;
                }
            }

            return false;
        }
    }
}
Encoding tool class
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO;

namespace Tool
{
    /// <summary>
    /// String-to-bytes and bytes-to-string conversion helpers.
    /// </summary>
    public class EncodingTool
    {
        /// <summary>
        /// Encodes a string to bytes. Despite the method name, the encoding used is
        /// <see cref="Encoding.Unicode"/>, not ASCII.
        /// </summary>
        /// <param name="buf">Text to encode.</param>
        /// <returns>The encoded bytes.</returns>
        public static byte[] EncodingASCII(string buf)
        {
            return Encoding.Unicode.GetBytes(buf);
        }

        /// <summary>
        /// Decodes bytes to a string. Despite the method name, the encoding used is
        /// <see cref="Encoding.Unicode"/>, not ASCII.
        /// </summary>
        /// <param name="bt">Bytes to decode.</param>
        /// <returns>The decoded text.</returns>
        public static string DecodingASCII(byte[] bt)
        {
            return Encoding.Unicode.GetString(bt);
        }

        /// <summary>
        /// Encodes a string to UTF-8 bytes.
        /// </summary>
        /// <param name="buf">Text to encode.</param>
        /// <returns>The UTF-8 bytes.</returns>
        public static byte[] EncodingUTF_8(string buf)
        {
            return Encoding.UTF8.GetBytes(buf);
        }

        /// <summary>
        /// Decodes UTF-8 bytes to a string.
        /// </summary>
        /// <param name="bt">UTF-8 bytes to decode.</param>
        /// <returns>The decoded text.</returns>
        public static string DecodingUTF_8(byte[] bt)
        {
            return Encoding.UTF8.GetString(bt);
        }
    }
}
Serialization and deserialization Tool Class
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO;

namespace Tool
{
    /// <summary>
    /// Binary serialization helpers built on <see cref="BinaryFormatter"/>.
    /// </summary>
    /// <remarks>
    /// SECURITY: BinaryFormatter is unsafe when used on data from an untrusted source,
    /// and the deserialize methods here may be fed bytes received from the network.
    /// The formatter is kept because it defines the wire format; replacing it needs a
    /// coordinated change on both ends.
    /// </remarks>
    public class SerializationUnit
    {
        /// <summary>
        /// Serializes an object into a byte array holding exactly the serialized bytes.
        /// </summary>
        /// <param name="obj">Object to serialize.</param>
        /// <returns>The serialized bytes, or null when <paramref name="obj"/> is null.</returns>
        public static byte[] SerializeObject(object obj)
        {
            if (obj == null)
            {
                return null;
            }

            using (MemoryStream ms = new MemoryStream())
            {
                BinaryFormatter formatter = new BinaryFormatter();
                formatter.Serialize(ms, obj);

                // ToArray copies only the bytes written. GetBuffer, used before, hands
                // out the whole internal buffer including unused capacity, which padded
                // every result with trailing zero bytes.
                return ms.ToArray();
            }
        }

        /// <summary>
        /// Deserializes a byte array back into an object.
        /// </summary>
        /// <param name="bytes">Serialized bytes.</param>
        /// <returns>The deserialized object, or null when <paramref name="bytes"/> is null.</returns>
        public static object DeserializeObject(byte[] bytes)
        {
            if (bytes == null)
            {
                return null;
            }

            using (MemoryStream ms = new MemoryStream(bytes))
            {
                BinaryFormatter formatter = new BinaryFormatter();
                return formatter.Deserialize(ms);
            }
        }

        /// <summary>
        /// Serializes a dictionary into a byte array.
        /// </summary>
        /// <param name="dic">Dictionary to serialize.</param>
        /// <returns>The serialized bytes, or null when the dictionary is null or empty.</returns>
        public static byte[] SerializeDic(Dictionary<string, object> dic)
        {
            if (dic == null || dic.Count == 0)
            {
                return null;
            }

            using (MemoryStream ms = new MemoryStream())
            {
                BinaryFormatter formatter = new BinaryFormatter();
                formatter.Serialize(ms, dic);

                // The former ms.Read ran with the position still at the end of the
                // stream, so it copied nothing and the result was all zeros.
                return ms.ToArray();
            }
        }

        /// <summary>
        /// Deserializes a byte array back into a dictionary.
        /// </summary>
        /// <param name="bytes">Serialized bytes.</param>
        /// <returns>The deserialized dictionary, or null when <paramref name="bytes"/> is null.</returns>
        public static Dictionary<string, object> DeserializeDic(byte[] bytes)
        {
            if (bytes == null)
            {
                return null;
            }

            using (MemoryStream ms = new MemoryStream(bytes))
            {
                BinaryFormatter formatter = new BinaryFormatter();
                return (Dictionary<string, object>)formatter.Deserialize(ms);
            }
        }
    }
}
General Data Packet Event Class
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using Model;

namespace ZZUdp.Core
{
    /// <summary>
    /// Event data passed along with the packet events.
    /// </summary>
    public class PackageEventArgs : EventArgs
    {
        /// <summary>
        /// The network packet the event is about.
        /// </summary>
        public UdpPacket udpPackage { get; set; }

        /// <summary>
        /// A group of network packets. Not set by the constructor.
        /// </summary>
        public UdpPacket[] udpPackages { get; set; }

        /// <summary>
        /// The remote endpoint associated with the packet.
        /// </summary>
        public IPEndPoint RemoteIP { get; set; }

        /// <summary>
        /// Whether the event has already been handled; starts out false.
        /// </summary>
        public bool IsHandled { get; set; }

        /// <summary>
        /// Creates event data for a single packet, marked as not yet handled.
        /// </summary>
        /// <param name="package">The packet the event is about.</param>
        /// <param name="RemoteIP">The remote endpoint associated with the packet.</param>
        public PackageEventArgs(UdpPacket package, IPEndPoint RemoteIP)
        {
            IsHandled = false;
            this.RemoteIP = RemoteIP;
            udpPackage = package;
        }
    }
}
The above is the content about UDP in C# to achieve reliable transmission (group transmission of data packets). For more related content, please pay attention to PHP Chinese Net (www.php.cn)!