Startseite > Backend-Entwicklung > C#.Net-Tutorial > C#: UDP-Kommunikationsframework auf Basis eines Ereignismodells (geeignet für das Kodieren und Dekodieren von Netzwerkpaketen)
Ich habe bereits einen Artikel über das Aufteilen von Daten in Teilpakete (Fragmentierung) und deren Versand per UDP in C# geschrieben.
Die in jenem Artikel vorgestellte Methode ist eine mögliche Implementierung, hat aber einen Nachteil: Ein Objekt wird durch die Serialisierung deutlich größer. Das ist für die Übertragung im Netzwerk nicht förderlich.
Bei der Übertragung im Netzwerk müssen wir die Größe der übertragenen Datenpakete so weit wie möglich reduzieren. Daher habe ich einige Informationen aus dem Internet und einige Open-Source-Projekte herangezogen (http://www.php.cn/), insbesondere das Open-Source-Übertragungsframework »Feige«.
Die Idee besteht darin, die zu übertragenden Daten nach festen Regeln in ein Byte-Array zu schreiben und sie nach dem Empfang gemäß demselben Format wieder auszulesen. Um die Datenmenge zu verringern, wird zusätzlich die Komprimierung mit GZipStream verwendet. Bei der Dekomprimierung trat früher ein Problem auf, das inzwischen behoben ist.
Zuerst müssen wir ein Format definieren, das die von uns übertragenen Datenpakete darstellen kann
/// <summary>
/// One packet as it travels over the network: a single fragment of a (possibly split) message
/// together with the bookkeeping needed to reassemble and re-send it.
/// </summary>
public class PacketNetWorkMsg : IComparable<PacketNetWorkMsg>
{
    /// <summary>
    /// Packet format version.
    /// </summary>
    public int Version { get; set; }

    /// <summary>
    /// The bytes of the packet to send.
    /// </summary>
    public byte[] Data { get; set; }

    /// <summary>
    /// Length of the data contained in the packet.
    /// </summary>
    public int DataLength { get; set; }

    /// <summary>
    /// Remaining length of the last fragment after splitting.
    /// </summary>
    public int Remainder { get; set; }

    /// <summary>
    /// Remote address.
    /// </summary>
    public IPEndPoint RemoteIP { get; set; }

    /// <summary>
    /// Number of times this packet has been sent.
    /// </summary>
    public int SendTimes { get; set; }

    /// <summary>
    /// Package (message) number.
    /// </summary>
    public long PackageNo { get; set; }

    /// <summary>
    /// Index of this fragment within the message.
    /// </summary>
    public int PackageIndex { get; set; }

    /// <summary>
    /// Total number of fragments of the message.
    /// </summary>
    public int PackageCount { get; set; }

    /// <summary>
    /// Gets or sets whether the receiver has to send back a "received" confirmation.
    /// </summary>
    public bool IsRequireReceiveCheck { get; set; }

    /// <summary>
    /// Gets the time at which this packet object was created.
    /// </summary>
    public DateTime CreationTime { get; private set; }

    /// <summary>
    /// Creates an empty packet with <see cref="Version"/> 1 and the current time as
    /// <see cref="CreationTime"/>.
    /// </summary>
    public PacketNetWorkMsg()
    {
        Version = 1;
        CreationTime = DateTime.Now;
    }

    /// <summary>
    /// Creates a fully described packet.
    /// </summary>
    /// <param name="packageNo">Package (message) number.</param>
    /// <param name="Count">Total number of fragments.</param>
    /// <param name="index">Index of this fragment.</param>
    /// <param name="data">Packet bytes.</param>
    /// <param name="dataLength">Length of the contained data.</param>
    /// <param name="remainder">Remaining length of the last fragment.</param>
    /// <param name="desip">Remote end point.</param>
    /// <param name="IsRequireReceive">Whether a confirmation is required.</param>
    public PacketNetWorkMsg(long packageNo, int Count, int index, byte[] data, int dataLength, int remainder, IPEndPoint desip, bool IsRequireReceive)
        : this() // also initialise Version and CreationTime, which this overload used to leave at their defaults
    {
        this.PackageNo = packageNo;
        this.PackageCount = Count;
        this.PackageIndex = index;
        this.Data = data;
        this.DataLength = dataLength;
        this.Remainder = remainder;
        this.IsRequireReceiveCheck = IsRequireReceive;
        this.RemoteIP = desip;
    }

    #region IComparable<PacketNetWorkMsg> members

    /// <summary>
    /// Orders packets by <see cref="PackageIndex"/>.
    /// </summary>
    /// <param name="other">Packet to compare with; <c>null</c> sorts before any instance.</param>
    /// <returns>Negative, zero or positive as required by <see cref="IComparable{T}"/>.</returns>
    public int CompareTo(PacketNetWorkMsg other)
    {
        if (other == null)
        {
            return 1;
        }

        // Must return 0 for equal indices; a comparer that never returns 0 gives sort
        // routines inconsistent answers (x > y and y > x at the same time).
        return PackageIndex.CompareTo(other.PackageIndex);
    }

    #endregion
}
Diese Klasse stellt das kleinste Paket dar, das wir tatsächlich über das Netzwerk übertragen.
Dann gibt es noch die Klasse Msg, mit der wir das Format der zu übertragenden Daten beschreiben. Ein Msg-Objekt kann zur Übertragung in mehrere PacketNetWorkMsg-Pakete aufgeteilt werden (ein UDP-Datagramm ist auf etwa 64 KB begrenzt); auf diese Weise lassen sich auch große Datenmengen per UDP übertragen. TCP wäre dafür zwar ebenfalls eine sehr gute Möglichkeit; warum ich es hier nicht verwende, führe ich an dieser Stelle nicht aus.
/// <summary>
/// A logical message exchanged between hosts. For transmission it may be split into
/// several <see cref="PacketNetWorkMsg"/> packets.
/// </summary>
public class Msg
{
    /// <summary>
    /// Whether the message has already been handled. During hooking, when this is true the
    /// lower-level code does not process the message any further.
    /// </summary>
    public bool Handled { get; set; }

    /// <summary>
    /// Gets or sets the number of this message.
    /// </summary>
    public long PackageNo { get; set; }

    /// <summary>
    /// Gets or sets the host name this message belongs to.
    /// </summary>
    public string HostName { get; set; }

    /// <summary>
    /// Gets or sets the user name this message belongs to.
    /// </summary>
    public string UserName { get; set; }

    /// <summary>
    /// Gets or sets the command code of this message.
    /// </summary>
    public Commands Command { get; set; }

    /// <summary>
    /// Gets or sets the kind of message: text message or binary message.
    /// </summary>
    public Consts Type { get; set; }

    /// <summary>
    /// Gets or sets the message text.
    /// </summary>
    public string NormalMsg { get; set; }

    /// <summary>
    /// Message body as raw bytes.
    /// </summary>
    public byte[] NormalMsgBytes { get; set; }

    /// <summary>
    /// Extension body as raw bytes.
    /// </summary>
    public byte[] ExtendMessageBytes { get; set; }

    /// <summary>
    /// Gets or sets the extension text of the command.
    /// </summary>
    public string ExtendMessage { get; set; }

    /// <summary>
    /// Remote address.
    /// </summary>
    public IPEndPoint RemoteAddr { get; set; }

    /// <summary>
    /// Host address.
    /// </summary>
    public IPEndPoint HostAddr { get; set; }

    /// <summary>
    /// Gets or sets whether a "received" confirmation has to be sent back.
    /// </summary>
    public bool IsRequireReceive { get; set; }

    /// <summary>
    /// Creates an unhandled text message for the given remote address.
    /// </summary>
    /// <param name="Addr">Remote address.</param>
    public Msg(IPEndPoint Addr)
    {
        Type = Consts.MESSAGE_TEXT;
        Handled = false;
        RemoteAddr = Addr;
    }

    /// <summary>
    /// Creates an unhandled text message carrying only a command.
    /// </summary>
    /// <param name="hostIP">Host address.</param>
    /// <param name="remoteIP">Remote address.</param>
    /// <param name="cmd">Command code.</param>
    public Msg(IPEndPoint hostIP, IPEndPoint remoteIP, Commands cmd)
        : this(remoteIP)
    {
        HostAddr = hostIP;
        Command = cmd;
    }

    /// <summary>
    /// Creates an unhandled text message with sender information, command and texts.
    /// </summary>
    /// <param name="addr">Remote address.</param>
    /// <param name="hostName">Host name.</param>
    /// <param name="userName">User name.</param>
    /// <param name="command">Command code.</param>
    /// <param name="message">Message text.</param>
    /// <param name="extendMessage">Extension text.</param>
    public Msg(IPEndPoint addr, string hostName, string userName, Commands command, string message, string extendMessage)
        : this(addr)
    {
        HostName = hostName;
        UserName = userName;
        Command = command;
        NormalMsg = message;
        ExtendMessage = extendMessage;
    }

    /// <summary>
    /// Factory method that builds a new message object directly.
    /// </summary>
    /// <param name="host">Host object; accepted for the callers' convenience but not used here.</param>
    /// <param name="addr">Remote address.</param>
    /// <param name="hostName">Host name.</param>
    /// <param name="userName">User name.</param>
    /// <param name="command">Command code.</param>
    /// <param name="message">Message text.</param>
    /// <param name="extendMessage">Extension text.</param>
    /// <returns>The newly created message.</returns>
    public static Msg Create(Host host, IPEndPoint addr, string hostName, string userName, Commands command, string message, string extendMessage)
    {
        Msg created = new Msg(addr, hostName, userName, command, message, extendMessage);
        return created;
    }
}
Derzeit sind diese beiden Klassen unsere Hauptdatenstrukturen.
Als Nächstes folgt unsere Hauptklasse: eine Klasse zum Aufteilen in Teilpakete und zum Wiederzusammensetzen.
/// <summary>
/// Packs <see cref="Msg"/> objects into network packets and reassembles received packets
/// back into <see cref="Msg"/> objects.
/// </summary>
public class MessagePacker
{
    // Periodically runs CheckForOutdateMessage for as long as this instance exists.
    Timer _timer;

    /// <summary>
    /// Creates a packer whose timer purges stale fragments from the shared fragment cache;
    /// the first run happens after five minutes, then every five minutes.
    /// </summary>
    public MessagePacker()
    {
        _timer = new Timer(_ => CheckForOutdateMessage(), null, new TimeSpan(0, 5, 0), new TimeSpan(0, 0, 5, 0));
    }

    /*
     * Packet layout (zero-based byte offsets, as written by BuildNetworkMessage):
     *   0       version marker, always 50 (ASCII '2')
     *   1..8    long  - message (package) number
     *   9..12   int   - total length of the compressed message content
     *   13..16  int   - total number of fragments
     *   17..20  int   - index of this fragment
     *   21      1/0   - whether a confirmation has to be sent back
     *   22..31  reserved
     *   32..    fragment payload
     */

    /// <summary>
    /// Version marker stored in the first byte of every packet.
    /// </summary>
    public static byte VersionHeader { get { return 50; } }

    /// <summary>
    /// Number of header bytes at the start of every packet.
    /// </summary>
    public static int PackageHeaderLength { get { return 32; } }

    /// <summary>
    /// Builds the network packets for a message.
    /// </summary>
    /// <param name="message">Message to pack.</param>
    /// <returns>One packet per fragment, in fragment order.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    /// <remarks>
    /// When <see cref="Msg.ExtendMessageBytes"/> is set, the byte arrays of the message are sent;
    /// otherwise the text properties are sent as UTF-16. A null text is sent as an empty body
    /// instead of throwing.
    /// </remarks>
    public static PacketNetWorkMsg[] BuildNetworkMessage(Msg message)
    {
        if (message == null)
        {
            throw new ArgumentNullException("message");
        }

        byte[] content;
        byte[] extendContents;
        if (message.ExtendMessageBytes != null)
        {
            content = message.NormalMsgBytes;
            extendContents = message.ExtendMessageBytes;
        }
        else
        {
            content = message.NormalMsg == null ? null : System.Text.Encoding.Unicode.GetBytes(message.NormalMsg);
            extendContents = message.ExtendMessage == null ? null : System.Text.Encoding.Unicode.GetBytes(message.ExtendMessage);
        }

        return BuildNetworkMessage(
            message.RemoteAddr,
            message.PackageNo,
            message.Command,
            message.UserName,
            message.HostName,
            message.Type,
            content,
            extendContents,
            message.IsRequireReceive);
    }

    /// <summary>
    /// Serialises the message fields, compresses them with GZip and splits the result into packets.
    /// </summary>
    /// <param name="remoteIp">Remote end point stored on every packet.</param>
    /// <param name="packageNo">Package (message) number.</param>
    /// <param name="command">Command code.</param>
    /// <param name="userName">User name; null is written as an empty string.</param>
    /// <param name="hostName">Host name; null is written as an empty string.</param>
    /// <param name="type">Message type.</param>
    /// <param name="content">Message body; null is written as length 0.</param>
    /// <param name="extendContents">Extension body; null is written as length 0.</param>
    /// <param name="RequireReceiveCheck">Whether the receiver has to confirm each packet.</param>
    /// <returns>One packet per fragment, in fragment order.</returns>
    public static PacketNetWorkMsg[] BuildNetworkMessage(IPEndPoint remoteIp, long packageNo, Commands command, string userName, string hostName, Consts type, byte[] content, byte[] extendContents, bool RequireReceiveCheck)
    {
        // Payload capacity of a single packet.
        int maxBytesPerPackage = (int)Consts.MAX_UDP_PACKAGE_LENGTH - PackageHeaderLength;

        byte[] compressed;
        using (var raw = new MemoryStream())
        using (var dest = new MemoryStream())
        {
            var bw = new BinaryWriter(raw, System.Text.Encoding.Unicode);
            bw.Write(packageNo);
            bw.Write(userName ?? string.Empty); // BinaryWriter.Write(string) rejects null
            bw.Write(hostName ?? string.Empty);
            bw.Write((long)command);
            bw.Write((long)type);
            bw.Write(content == null ? 0 : content.Length);
            if (content != null)
            {
                bw.Write(content);
            }
            bw.Write(extendContents == null ? 0 : extendContents.Length);
            if (extendContents != null)
            {
                bw.Write(extendContents);
            }
            bw.Flush();

            // The GZip stream has to be closed before its output is complete; 'true' keeps dest open.
            using (var zipStream = new GZipStream(dest, CompressionMode.Compress, true))
            {
                raw.WriteTo(zipStream);
            }
            compressed = dest.ToArray();
        }

        int dataLength = compressed.Length;
        int packageCount = (int)Math.Ceiling(dataLength * 1.0 / maxBytesPerPackage);
        PacketNetWorkMsg[] pnma = new PacketNetWorkMsg[packageCount];
        for (int i = 0; i < packageCount; i++)
        {
            int offset = i * maxBytesPerPackage;
            int count = Math.Min(maxBytesPerPackage, dataLength - offset);

            byte[] buf = new byte[count + PackageHeaderLength];
            buf[0] = VersionHeader;
            BitConverter.GetBytes(packageNo).CopyTo(buf, 1);
            BitConverter.GetBytes(dataLength).CopyTo(buf, 9);
            BitConverter.GetBytes(packageCount).CopyTo(buf, 13);
            BitConverter.GetBytes(i).CopyTo(buf, 17);
            buf[21] = RequireReceiveCheck ? (byte)1 : (byte)0;
            // Bytes 22..31 stay zero (reserved).
            Buffer.BlockCopy(compressed, offset, buf, PackageHeaderLength, count);

            pnma[i] = new PacketNetWorkMsg()
            {
                Data = buf,
                PackageCount = packageCount,
                PackageIndex = i,
                PackageNo = packageNo,
                RemoteIP = remoteIp,
                SendTimes = 0,
                Version = 2,
                IsRequireReceiveCheck = RequireReceiveCheck
            };
        }

        return pnma;
    }

    /// <summary>
    /// Checks whether a buffer looks like a packet of this format.
    /// </summary>
    /// <param name="buffer">Received bytes.</param>
    /// <returns>True when the buffer is longer than the header and starts with <see cref="VersionHeader"/>.</returns>
    public static bool Test(byte[] buffer)
    {
        return buffer != null && buffer.Length > PackageHeaderLength && buffer[0] == VersionHeader;
    }

    /// <summary>
    /// Fragments received so far, keyed by package number.
    /// NOTE(review): the key does not include the sender, so two senders using the same
    /// package number at the same time would share a slot - confirm whether that can happen.
    /// </summary>
    static Dictionary<long, PacketNetWorkMsg[]> packageCache = new Dictionary<long, PacketNetWorkMsg[]>();

    /// <summary>
    /// Decompresses the given packets and converts them into a message object.
    /// </summary>
    /// <param name="packs">Received packets (header already stripped, as produced by <see cref="Parse"/>).</param>
    /// <returns>
    /// The message, or null when the set of fragments is incomplete, decompression fails or the
    /// decompressed content is malformed. <see cref="DecompressFailed"/> is raised on a
    /// decompression failure.
    /// </returns>
    public static Msg ParseToMessage(params PacketNetWorkMsg[] packs)
    {
        if (packs == null || packs.Length == 0 || packs[0] == null
            || (packs[0].PackageCount > 1 && packs.Length != packs[0].PackageCount))
        {
            return null;
        }

        MemoryStream ms = DecompressMessagePacks(packs);
        if (ms == null)
        {
            OnDecompressFailed(new PackageEventArgs() { PacketMsg = packs[0] });
            return null;
        }

        try
        {
            System.IO.BinaryReader br = new System.IO.BinaryReader(ms, System.Text.Encoding.Unicode);

            Msg m = new Msg(packs[0].RemoteIP);
            m.PackageNo = br.ReadInt64();
            m.UserName = br.ReadString();
            m.HostName = br.ReadString();
            m.Command = (Commands)br.ReadInt64();
            m.Type = (Consts)br.ReadInt64();

            byte[] content = ReadBlock(br);
            if (content == null)
            {
                return null;
            }
            byte[] extend = ReadBlock(br);
            if (extend == null)
            {
                return null;
            }

            if (m.Type == Consts.MESSAGE_TEXT)
            {
                // Each text is decoded with its OWN length; using the extension length for the
                // body truncated it or threw whenever the two lengths differed.
                m.NormalMsg = System.Text.Encoding.Unicode.GetString(content, 0, content.Length);
                m.ExtendMessage = System.Text.Encoding.Unicode.GetString(extend, 0, extend.Length);
                m.NormalMsgBytes = null;
                m.ExtendMessageBytes = null;
            }
            else
            {
                m.NormalMsgBytes = content;
                m.ExtendMessageBytes = extend;
            }

            return m;
        }
        catch (System.IO.IOException)
        {
            // Content ended before all fields were read (EndOfStreamException is an IOException).
            return null;
        }
        catch (FormatException)
        {
            // Invalid string length prefix.
            return null;
        }
        finally
        {
            ms.Dispose();
        }
    }

    /// <summary>
    /// Reads an int length followed by that many bytes.
    /// </summary>
    /// <returns>The bytes, or null when the length is negative or exceeds the remaining data.</returns>
    static byte[] ReadBlock(System.IO.BinaryReader reader)
    {
        int length = reader.ReadInt32();
        long remaining = reader.BaseStream.Length - reader.BaseStream.Position;
        if (length < 0 || length > remaining)
        {
            return null;
        }
        return reader.ReadBytes(length);
    }

    /// <summary>
    /// Concatenates the payloads of all packets in fragment order and decompresses the result.
    /// </summary>
    /// <param name="packs">Packets to combine; the array is sorted in place by fragment index.</param>
    /// <returns>A stream positioned at the start of the decompressed content, or null on any failure.</returns>
    static MemoryStream DecompressMessagePacks(params PacketNetWorkMsg[] packs)
    {
        try
        {
            // Explicit comparison so the ordering does not depend on PacketNetWorkMsg.CompareTo.
            Array.Sort(packs, (x, y) => x.PackageIndex.CompareTo(y.PackageIndex));

            var msout = new MemoryStream();
            using (var ms = new System.IO.MemoryStream())
            {
                foreach (PacketNetWorkMsg part in packs)
                {
                    ms.Write(part.Data, 0, part.Data.Length);
                }
                ms.Seek(0, SeekOrigin.Begin);

                using (var gz = new GZipStream(ms, CompressionMode.Decompress))
                {
                    var buffer = new byte[0x400];
                    int count;
                    while ((count = gz.Read(buffer, 0, buffer.Length)) > 0)
                    {
                        msout.Write(buffer, 0, count);
                    }
                }
            }

            msout.Seek(0, SeekOrigin.Begin);
            return msout;
        }
        catch (Exception)
        {
            // Data comes from the network; any failure here means "not a usable message".
            return null;
        }
    }

    /// <summary>
    /// Tries to turn a received packet into a message.
    /// </summary>
    /// <param name="pack">Received packet.</param>
    /// <returns>
    /// The message, or null when the packet is invalid or belongs to a fragmented message whose
    /// fragments have not all arrived yet.
    /// </returns>
    public static Msg TryToTranslateMessage(PacketNetWorkMsg pack)
    {
        if (pack == null || pack.PackageIndex < 0 || pack.PackageIndex > pack.PackageCount - 1)
        {
            return null;
        }

        if (pack.PackageCount == 1)
        {
            return ParseToMessage(pack);
        }

        PacketNetWorkMsg[] complete;
        lock (packageCache)
        {
            PacketNetWorkMsg[] array;
            if (!packageCache.TryGetValue(pack.PackageNo, out array))
            {
                array = new PacketNetWorkMsg[pack.PackageCount];
                array[pack.PackageIndex] = pack;
                packageCache.Add(pack.PackageNo, array);
                return null;
            }

            if (pack.PackageIndex >= array.Length)
            {
                // Fragment disagrees with the fragment count the cache entry was created for.
                return null;
            }

            array[pack.PackageIndex] = pack;
            if (Array.IndexOf(array, null) != -1)
            {
                return null; // still waiting for fragments
            }

            packageCache.Remove(pack.PackageNo);
            complete = array;
        }

        // Parse outside the lock so event handlers raised by ParseToMessage never run under it.
        return ParseToMessage(complete);
    }

    /// <summary>
    /// Parses raw network bytes into a packet object.
    /// </summary>
    /// <param name="buffer">Received bytes including the header.</param>
    /// <param name="clientAddress">Sender of the bytes.</param>
    /// <returns>The packet with the header stripped from <see cref="PacketNetWorkMsg.Data"/>, or null when <see cref="Test"/> rejects the buffer.</returns>
    public static PacketNetWorkMsg Parse(byte[] buffer, IPEndPoint clientAddress)
    {
        if (!Test(buffer))
        {
            return null;
        }

        PacketNetWorkMsg p = new PacketNetWorkMsg()
        {
            RemoteIP = clientAddress,
            SendTimes = 0
        };
        p.PackageNo = BitConverter.ToInt64(buffer, 1);
        p.DataLength = BitConverter.ToInt32(buffer, 9); // written as a 4-byte int, so read it as one
        p.PackageCount = BitConverter.ToInt32(buffer, 13);
        p.PackageIndex = BitConverter.ToInt32(buffer, 17);
        p.IsRequireReceiveCheck = buffer[21] == 1;

        p.Data = new byte[buffer.Length - PackageHeaderLength];
        Array.Copy(buffer, PackageHeaderLength, p.Data, 0, p.Data.Length);

        return p;
    }

    /// <summary>
    /// Removes cache entries whose received fragments are all older than five minutes.
    /// </summary>
    void CheckForOutdateMessage()
    {
        lock (packageCache)
        {
            // Cut-off lies five minutes in the PAST; a cut-off in the future would purge
            // every entry, including fragments that have only just arrived.
            var minTime = DateTime.Now.AddMinutes(-5.0);
            var targetList = new List<long>();
            foreach (var entry in packageCache)
            {
                if (Array.TrueForAll(entry.Value, s => s == null || s.CreationTime < minTime))
                {
                    targetList.Add(entry.Key);
                }
            }

            foreach (var pkgid in targetList)
            {
                packageCache.Remove(pkgid);
            }
        }
    }

    #region Events

    /// <summary>
    /// Raised when the packets of a message could not be decompressed.
    /// </summary>
    public static event EventHandler<PackageEventArgs> DecompressFailed;

    /// <summary>
    /// Raises <see cref="DecompressFailed"/>.
    /// </summary>
    /// <param name="e">Event data.</param>
    protected static void OnDecompressFailed(PackageEventArgs e)
    {
        // Local copy: the handler list may change between the null check and the call.
        EventHandler<PackageEventArgs> handler = DecompressFailed;
        if (handler != null)
        {
            handler(typeof(MessagePacker), e);
        }
    }

    #endregion
}
Mit der Methode BuildNetworkMessage kann ein Msg-Objekt in ein oder mehrere Pakete aufgeteilt werden; die einzelnen Pakete werden anschließend nacheinander mit der zuvor beschriebenen Methode gesendet. Nach dem Empfang der Datenpakete setzt die Methode TryToTranslateMessage sie wieder zu einer Msg zusammen.
Es folgt die zugrunde liegende UDP-Kommunikationsklasse:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net.Sockets;
using Netframe.Model;
using System.Net;
using System.Threading;
using Netframe.Tool;
using Netframe.Event;

namespace Netframe.Core
{
    /// <summary>
    /// Basic UDP communication class: sends raw data and <see cref="PacketNetWorkMsg"/> packets,
    /// re-sends unconfirmed packets and raises an event for every datagram received.
    /// </summary>
    public class UDPThread : IDisposable
    {
        #region Private fields

        /// <summary>
        /// Configuration (only set by the <see cref="UDPThread(Config)"/> constructor).
        /// </summary>
        Config _config;

        /// <summary>
        /// Underlying UDP client; null after <see cref="Close"/>.
        /// </summary>
        UdpClient client;

        /// <summary>
        /// Packets that still wait for a confirmation and are re-sent periodically.
        /// Always accessed under <see cref="sendListLock"/>.
        /// </summary>
        List<PacketNetWorkMsg> SendList = new List<PacketNetWorkMsg>();

        /// <summary>
        /// Single lock guarding <see cref="SendList"/>; it is touched by callers and by the
        /// background re-send thread.
        /// </summary>
        readonly object sendListLock = new object();

        System.Threading.SendOrPostCallback cucqCallpack;
        System.Threading.SendOrPostCallback resendCallback;

        #endregion

        #region Properties

        /// <summary>
        /// Whether the instance was initialised successfully and has not been closed.
        /// </summary>
        public bool IsInitialized { get; private set; }

        /// <summary>
        /// Whether the client was connected to a fixed remote end point.
        /// </summary>
        public bool IsConnect { get; private set; }

        /// <summary>
        /// Interval, in milliseconds, between two checks of the unconfirmed-packet list.
        /// </summary>
        public int CheckQueueTimeInterval { get; set; }

        /// <summary>
        /// Maximum number of send attempts without confirmation; once reached the packet is
        /// dropped and <see cref="PackageSendFailure"/> is raised.
        /// </summary>
        public int MaxResendTimes { get; set; }

        #endregion

        #region Constructors

        /// <summary>
        /// Binds to the first IPv4 address of the local host on the given port.
        /// </summary>
        /// <param name="port">Port to bind to.</param>
        /// <remarks>On failure the instance stays uninitialised (<see cref="IsInitialized"/> is false).</remarks>
        public UDPThread(int port)
        {
            IsInitialized = false;
            IPAddress LocalIPAddress = null;
            try
            {
                foreach (IPAddress addr in Dns.GetHostAddresses(Dns.GetHostName()))
                {
                    if (addr.AddressFamily == AddressFamily.InterNetwork)
                    {
                        LocalIPAddress = addr;
                        break;
                    }
                }
            }
            catch (Exception)
            {
                OnLocalIpError(new EventArgs());
                return;
            }

            if (LocalIPAddress == null)
            {
                // No IPv4 address available to bind to.
                OnLocalIpError(new EventArgs());
                return;
            }

            try
            {
                client = new UdpClient(new IPEndPoint(LocalIPAddress, port));
                IsConnect = false;
            }
            catch (Exception)
            {
                OnNetworkError(new EventArgs());
                return;
            }

            StartWorkers();
        }

        /// <summary>
        /// Binds to the address and port given by the configuration.
        /// </summary>
        /// <param name="config">Configuration providing <c>BindedIP</c> and <c>Port</c>.</param>
        public UDPThread(Config config)
        {
            IsInitialized = false;
            try
            {
                client = new UdpClient(new IPEndPoint(config.BindedIP, config.Port));
            }
            catch (Exception)
            {
                OnNetworkError(new EventArgs());
                return;
            }

            this._config = config;
            StartWorkers();
        }

        /// <summary>
        /// Creates a client "connected" to a remote host (UDP itself is connectionless; this only
        /// fixes the default destination).
        /// </summary>
        /// <param name="ip">Remote IP address.</param>
        /// <param name="port">Remote port.</param>
        public UDPThread(string ip, int port)
        {
            IsInitialized = false;
            IPAddress ipaddress = IPAddress.Parse(ip);
            try
            {
                client = new UdpClient();
                client.Connect(new IPEndPoint(ipaddress, port));
                IsConnect = true;
            }
            catch (Exception)
            {
                OnNetworkError(new EventArgs());
                return;
            }

            StartWorkers();
        }

        /// <summary>
        /// Shared tail of all constructors: applies defaults, starts the re-send thread and
        /// begins listening.
        /// </summary>
        void StartWorkers()
        {
            client.EnableBroadcast = true;
            CheckQueueTimeInterval = 2000;
            MaxResendTimes = 5;
            IsInitialized = true;
            new Thread(new ThreadStart(CheckUnConfirmedQueue)) { IsBackground = true }.Start();
            client.BeginReceive(ReceiveDataAsync, null);
        }

        #endregion

        #region Private methods

        /// <summary>
        /// Completion callback of the asynchronous receive; re-arms the receive and raises
        /// <see cref="PackageReceived"/>.
        /// </summary>
        /// <param name="ar">Asynchronous result passed by the framework.</param>
        void ReceiveDataAsync(IAsyncResult ar)
        {
            IPEndPoint ipend = null;
            byte[] buffer = null;
            UdpClient current = client;
            try
            {
                if (current != null)
                {
                    buffer = current.EndReceive(ar, ref ipend);
                }
            }
            catch (Exception)
            {
                buffer = null;
            }
            finally
            {
                RearmReceive(current);
            }

            if (buffer == null || buffer.Length == 0)
            {
                return;
            }

            OnPackageReceived(new PackageEventArgs() { RemoteIP = ipend, Data = buffer });
        }

        /// <summary>
        /// Starts the next asynchronous receive without letting a failure escape onto the
        /// callback thread.
        /// </summary>
        void RearmReceive(UdpClient current)
        {
            if (!IsInitialized || current == null)
            {
                return;
            }

            try
            {
                current.BeginReceive(ReceiveDataAsync, null);
            }
            catch (ObjectDisposedException)
            {
                // Closed concurrently; nothing left to listen on.
            }
            catch (SocketException)
            {
                OnNetworkError(new EventArgs());
            }
        }

        /// <summary>
        /// Synchronous receive loop; ends on the first error or empty datagram.
        /// </summary>
        private void ReceiveData()
        {
            while (true)
            {
                IPEndPoint retip = null;
                byte[] buffer = null;
                try
                {
                    buffer = client.Receive(ref retip);
                }
                catch (Exception)
                {
                    return;
                }

                if (buffer == null || buffer.Length == 0)
                {
                    return;
                }

                PackageEventArgs arg = new PackageEventArgs(buffer, retip);
                OnPackageReceived(arg);
            }
        }

        /// <summary>
        /// Starts an asynchronous receive completed by <see cref="ReceiveCallback"/>.
        /// </summary>
        private void AsyncReceiveData()
        {
            try
            {
                client.BeginReceive(new AsyncCallback(ReceiveCallback), null);
            }
            catch (SocketException)
            {
                throw; // rethrow without resetting the stack trace
            }
        }

        /// <summary>
        /// Completion callback used by <see cref="AsyncReceiveData"/>.
        /// </summary>
        /// <param name="param">Asynchronous result passed by the framework.</param>
        private void ReceiveCallback(IAsyncResult param)
        {
            if (!param.IsCompleted)
            {
                return;
            }

            IPEndPoint retip = null;
            byte[] buffer = null;
            try
            {
                buffer = client.EndReceive(param, ref retip);
            }
            catch (Exception)
            {
                // A failed receive is treated like an empty datagram below.
                buffer = null;
            }
            finally
            {
                AsyncReceiveData();
            }

            if (buffer == null || buffer.Length == 0)
            {
                return;
            }

            OnPackageReceived(new PackageEventArgs() { RemoteIP = retip, Data = buffer });
        }

        /// <summary>
        /// Sends data without requesting a confirmation.
        /// </summary>
        private void Send(IPAddress address, int port, byte[] data, long packageNo, int packageIndex)
        {
            Send(false, new IPEndPoint(address, port), data, packageNo, packageIndex);
        }

        /// <summary>
        /// Sends data and optionally tracks it for re-sending; see
        /// <see cref="Send(bool, IPEndPoint, byte[], long, int)"/>.
        /// </summary>
        private void Send(bool receiveConfirm, IPAddress address, int port, byte[] data, long packageNo, int packageIndex)
        {
            Send(receiveConfirm, new IPEndPoint(address, port), data, packageNo, packageIndex);
        }

        /// <summary>
        /// Sends data. When <paramref name="receiveConfirm"/> is true the data is re-sent every
        /// <see cref="CheckQueueTimeInterval"/> until <see cref="PopSendItemFromList"/> is called
        /// for it; this layer does not interpret confirmations itself.
        /// </summary>
        /// <param name="receiveConfirm">Whether a confirmation is expected.</param>
        /// <param name="address">Remote end point.</param>
        /// <param name="data">Bytes to send.</param>
        /// <param name="packageNo">Package number.</param>
        /// <param name="packageIndex">Fragment index.</param>
        private void Send(bool receiveConfirm, IPEndPoint address, byte[] data, long packageNo, int packageIndex)
        {
            if (!IsInitialized)
            {
                return;
            }

            client.Send(data, data.Length, address);
            if (receiveConfirm)
            {
                PushSendItemToList(new PacketNetWorkMsg() { Data = data, RemoteIP = address, SendTimes = 0, PackageIndex = packageIndex, PackageNo = packageNo });
            }
        }

        /// <summary>
        /// Starts an asynchronous send of the packet bytes WITHOUT registering the packet for
        /// re-sending. The packet is passed as state so a failure can report it.
        /// </summary>
        void BeginSendCore(PacketNetWorkMsg packet)
        {
            UdpClient current = client;
            if (current == null)
            {
                return;
            }

            if (!IsConnect)
            {
                current.BeginSend(packet.Data, packet.Data.Length, packet.RemoteIP, new AsyncCallback(SendCallback), packet);
            }
            else
            {
                current.BeginSend(packet.Data, packet.Data.Length, new AsyncCallback(SendCallback), packet);
            }
        }

        /// <summary>
        /// Completion callback of an asynchronous send; raises <see cref="PackageSendFailure"/>
        /// when the send failed.
        /// </summary>
        /// <param name="param">Asynchronous result passed by the framework.</param>
        private void SendCallback(IAsyncResult param)
        {
            if (!param.IsCompleted)
            {
                return;
            }

            try
            {
                // BeginSend and EndSend must always be paired.
                client.EndSend(param);
            }
            catch (Exception)
            {
                PackageEventArgs e = new PackageEventArgs() { PacketMsg = param.AsyncState as PacketNetWorkMsg };
                OnPackageSendFailure(e);
            }
        }

        /// <summary>
        /// Body of the background thread: periodically re-sends unconfirmed packets and gives up
        /// on packets that reached <see cref="MaxResendTimes"/>.
        /// </summary>
        void CheckUnConfirmedQueue()
        {
            if (cucqCallpack == null)
            {
                cucqCallpack = (s) => OnPackageSendFailure(s as PackageEventArgs);
            }
            if (resendCallback == null)
            {
                resendCallback = (s) => OnPackageResend(s as PackageEventArgs);
            }

            do
            {
                PacketNetWorkMsg[] pending;
                lock (sendListLock)
                {
                    pending = SendList.ToArray();
                }

                foreach (PacketNetWorkMsg s in pending)
                {
                    s.SendTimes++;
                    if (s.SendTimes >= MaxResendTimes)
                    {
                        lock (sendListLock)
                        {
                            SendList.Remove(s);
                        }

                        PackageEventArgs e = new PackageEventArgs() { PacketMsg = s };
                        if (SeiClient.NeedPostMessage)
                        {
                            SeiClient.SendSynchronizeMessage(cucqCallpack, e);
                        }
                        else
                        {
                            OnPackageSendFailure(e);
                        }
                    }
                    else
                    {
                        if (!IsInitialized)
                        {
                            break;
                        }

                        // Re-send only. Going through AsyncSendPacket would add the packet to
                        // SendList again on every attempt, so the list kept growing and
                        // SendTimes was incremented several times per cycle.
                        try
                        {
                            BeginSendCore(s);
                        }
                        catch (ObjectDisposedException)
                        {
                            break; // closed concurrently
                        }

                        PackageEventArgs e = new PackageEventArgs() { PacketMsg = s };
                        if (SeiClient.NeedPostMessage)
                        {
                            SeiClient.SendASynchronizeMessage(resendCallback, e);
                        }
                        else
                        {
                            OnPackageResend(e);
                        }
                    }
                }

                Thread.Sleep(CheckQueueTimeInterval);
            } while (IsInitialized);
        }

        #endregion

        #region Public methods

        /// <summary>
        /// Stops receiving and re-sending and closes the underlying socket.
        /// </summary>
        public void Close()
        {
            if (!IsInitialized)
            {
                return;
            }

            IsInitialized = false;
            IsConnect = false;

            UdpClient current = client;
            client = null;
            if (current != null)
            {
                // The socket must actually be closed here; testing IsInitialized again after
                // clearing it skipped the call and leaked the socket.
                current.Close();
            }
        }

        /// <summary>
        /// Splits a message into packets and sends them synchronously.
        /// </summary>
        /// <param name="message">Message to send.</param>
        public void SendMsg(Msg message)
        {
            if (!IsInitialized)
            {
                return;
            }

            ICollection<PacketNetWorkMsg> udpPackets = MessagePacker.BuildNetworkMessage(message);
            foreach (PacketNetWorkMsg packedMessage in udpPackets)
            {
                SendPacket(packedMessage);
            }
        }

        /// <summary>
        /// Sends an already packed packet synchronously and, when it requires a confirmation,
        /// registers it for re-sending.
        /// </summary>
        /// <param name="packet">Packet to send.</param>
        public void SendPacket(PacketNetWorkMsg packet)
        {
            if (!IsInitialized)
            {
                return;
            }

            if (!IsConnect)
            {
                client.Send(packet.Data, packet.Data.Length, packet.RemoteIP);
            }
            else
            {
                client.Send(packet.Data, packet.Data.Length);
            }

            if (packet.IsRequireReceiveCheck)
            {
                PushSendItemToList(packet);
            }
        }

        /// <summary>
        /// Splits a message into packets and sends them asynchronously.
        /// </summary>
        /// <param name="message">Message to send.</param>
        public void AsyncSendMsg(Msg message)
        {
            if (!IsInitialized)
            {
                return;
            }

            ICollection<PacketNetWorkMsg> udpPackets = MessagePacker.BuildNetworkMessage(message);
            foreach (PacketNetWorkMsg packedMessage in udpPackets)
            {
                AsyncSendPacket(packedMessage);
            }
        }

        /// <summary>
        /// Sends an already packed packet asynchronously and, when it requires a confirmation,
        /// registers it for re-sending.
        /// </summary>
        /// <param name="packet">Packet to send.</param>
        public void AsyncSendPacket(PacketNetWorkMsg packet)
        {
            if (!IsInitialized)
            {
                return;
            }

            BeginSendCore(packet);
            if (packet.IsRequireReceiveCheck)
            {
                PushSendItemToList(packet);
            }
        }

        /// <summary>
        /// Registers a packet for periodic re-sending until it is confirmed.
        /// </summary>
        /// <param name="item">Packet to track.</param>
        public void PushSendItemToList(PacketNetWorkMsg item)
        {
            lock (sendListLock)
            {
                SendList.Add(item);
            }
        }

        /// <summary>
        /// Stops re-sending the given fragment. This layer does not interpret confirmations;
        /// the caller invokes this once a confirmation has been received.
        /// </summary>
        /// <param name="packageNo">Package number.</param>
        /// <param name="packageIndex">Fragment index.</param>
        public void PopSendItemFromList(long packageNo, int packageIndex)
        {
            lock (sendListLock)
            {
                SendList.RemoveAll(s => s.PackageNo == packageNo && s.PackageIndex == packageIndex);
            }
        }

        #endregion

        #region Events

        /// <summary>
        /// Raised when the local IP address cannot be determined.
        /// </summary>
        public event EventHandler IPError;

        /// <summary>
        /// Raises <see cref="IPError"/>.
        /// </summary>
        protected void OnLocalIpError(EventArgs e)
        {
            EventHandler handler = IPError;
            if (handler != null)
            {
                handler(this, e);
            }
        }

        /// <summary>
        /// Raised on a network error such as a port that cannot be bound.
        /// </summary>
        public event EventHandler NetworkError;

        /// <summary>
        /// Raises <see cref="NetworkError"/>.
        /// </summary>
        protected void OnNetworkError(EventArgs e)
        {
            EventHandler handler = NetworkError;
            if (handler != null)
            {
                handler(this, e);
            }
        }

        /// <summary>
        /// Raised for every datagram received.
        /// </summary>
        public event EventHandler<PackageEventArgs> PackageReceived;

        /// <summary>
        /// Raises <see cref="PackageReceived"/>.
        /// </summary>
        /// <param name="e">Event data.</param>
        protected virtual void OnPackageReceived(PackageEventArgs e)
        {
            EventHandler<PackageEventArgs> handler = PackageReceived;
            if (handler != null)
            {
                handler(this, e);
            }
        }

        /// <summary>
        /// Raised when a packet could not be sent or was never confirmed.
        /// </summary>
        public event EventHandler<PackageEventArgs> PackageSendFailure;

        /// <summary>
        /// Raises <see cref="PackageSendFailure"/>.
        /// </summary>
        /// <param name="e">Event data.</param>
        protected virtual void OnPackageSendFailure(PackageEventArgs e)
        {
            EventHandler<PackageEventArgs> handler = PackageSendFailure;
            if (handler != null)
            {
                handler(this, e);
            }
        }

        /// <summary>
        /// Raised when an unconfirmed packet is sent again.
        /// </summary>
        public event EventHandler<PackageEventArgs> PackageResend;

        /// <summary>
        /// Raises <see cref="PackageResend"/>.
        /// </summary>
        /// <param name="e">Event data.</param>
        protected virtual void OnPackageResend(PackageEventArgs e)
        {
            EventHandler<PackageEventArgs> handler = PackageResend;
            if (handler != null)
            {
                handler(this, e);
            }
        }

        #endregion

        #region IDisposable members

        /// <summary>
        /// Closes the client and releases its resources.
        /// </summary>
        public void Dispose()
        {
            Close();
        }

        #endregion
    }
}
Darauf folgt eine Klasse der oberen Schicht, die die empfangenen UDP-Pakete zu einer Nachricht zusammensetzt:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net;
using Netframe.Event;
using Netframe.Model;
using Netframe.Tool;
using System.Threading;

namespace Netframe.Core
{
    /// <summary>
    /// Translates the raw datagrams received by the lower layer into <see cref="Msg"/> objects
    /// and sends <see cref="Msg"/> objects through it.
    /// </summary>
    public class MsgTranslator
    {
        #region Properties and fields

        /// <summary>
        /// Object used to send and receive packets.
        /// </summary>
        public UDPThread Client { get; set; }

        Config _config;

        // Package numbers of recently received messages, used to detect duplicates.
        Queue<long> ReceivedQueue;

        // Guards ReceivedQueue; PackageReceived is invoked from the client's receive callbacks.
        static object lockObj = new object();

        #endregion

        /// <summary>
        /// Creates a translator and subscribes to the client's <c>PackageReceived</c> event.
        /// </summary>
        /// <param name="udpClient">Client used for sending and receiving.</param>
        /// <param name="config">Configuration passed on when confirmation messages are created.</param>
        /// <exception cref="ArgumentNullException"><paramref name="udpClient"/> is null.</exception>
        public MsgTranslator(UDPThread udpClient, Config config)
        {
            if (udpClient == null)
            {
                throw new ArgumentNullException("udpClient");
            }

            this.Client = udpClient;
            this._config = config;
            ReceivedQueue = new Queue<long>();
            Client.PackageReceived += PackageReceived;
        }

        /// <summary>
        /// Sends a message, raising <see cref="MessageSending"/> before and
        /// <see cref="MessageSended"/> after handing it to the client.
        /// </summary>
        /// <param name="msg">Message to send.</param>
        public void Send(Msg msg)
        {
            OnMessageSending(new MessageEventArgs(msg));
            Client.AsyncSendMsg(msg);
            OnMessageSended(new MessageEventArgs(msg));
        }

        /// <summary>
        /// Handles a datagram received by the client.
        /// </summary>
        /// <param name="sender">Event source.</param>
        /// <param name="e">Received data.</param>
        private void PackageReceived(object sender, PackageEventArgs e)
        {
            if (e.IsHandled)
            {
                return;
            }

            e.IsHandled = true;
            Msg m = ResolveToMessage(e.Data, e.RemoteIP);
            if (m == null)
            {
                return;
            }

            if (m.Command == Commands.RecvConfirm)
            {
                // Package numbers are long; parsing them as int made confirmations for larger
                // numbers unparseable, so those packets were re-sent until they failed.
                // NOTE(review): assumes the confirmation carries the numbers as plain decimal
                // text - confirm against Helper.CreateRecivedCheck.
                long pno;
                if (!long.TryParse(m.NormalMsg, System.Globalization.NumberStyles.Integer, System.Globalization.CultureInfo.InvariantCulture, out pno))
                {
                    pno = 0;
                }

                int pindex;
                if (!int.TryParse(m.ExtendMessage, System.Globalization.NumberStyles.Integer, System.Globalization.CultureInfo.InvariantCulture, out pindex))
                {
                    pindex = 0;
                }

                if (pno != 0)
                {
                    this.Client.PopSendItemFromList(pno, pindex);
                }
                return;
            }

            // Drop the message when its number is among the last 100 received.
            bool duplicate;
            lock (lockObj)
            {
                duplicate = ReceivedQueue.Contains(m.PackageNo);
                if (!duplicate)
                {
                    ReceivedQueue.Enqueue(m.PackageNo);
                    if (ReceivedQueue.Count > 100)
                    {
                        ReceivedQueue.Dequeue();
                    }
                }
            }

            // Events are raised outside the lock so handlers cannot block other receivers.
            if (duplicate)
            {
                OnMessageDroped(new MessageEventArgs(m));
            }
            else
            {
                OnMessageReceived(new MessageEventArgs(m));
            }
        }

        /// <summary>
        /// Parses a datagram, sends a confirmation when the packet asks for one and tries to
        /// assemble a message.
        /// </summary>
        /// <param name="buffer">Received bytes.</param>
        /// <param name="remoteEndPoint">Sender of the bytes.</param>
        /// <returns>The message, or null when the buffer is not a valid packet or the message is not complete yet.</returns>
        public Msg ResolveToMessage(byte[] buffer, IPEndPoint remoteEndPoint)
        {
            if (buffer == null || buffer.Length == 0)
            {
                return null;
            }

            if (!MessagePacker.Test(buffer))
            {
                return null;
            }

            PacketNetWorkMsg pack = MessagePacker.Parse(buffer, remoteEndPoint);
            if (pack == null)
            {
                return null;
            }

            if (DetermineConfirm(pack))
            {
                Msg cm = Helper.CreateRecivedCheck(remoteEndPoint, pack.PackageNo, pack.PackageIndex, _config);
                Client.SendMsg(cm);
            }

            return MessagePacker.TryToTranslateMessage(pack);
        }

        /// <summary>
        /// Tells whether a confirmation has to be sent back for the packet.
        /// </summary>
        static bool DetermineConfirm(PacketNetWorkMsg packet)
        {
            return packet.IsRequireReceiveCheck;
        }

        /// <summary>
        /// Tells whether a confirmation has to be sent back for the message.
        /// </summary>
        static bool DetermineConfirm(Msg message)
        {
            return message.IsRequireReceive;
        }

        #region Events

        /// <summary>
        /// Raised when a complete message has been received.
        /// </summary>
        public event EventHandler<MessageEventArgs> MessageReceived;
        SendOrPostCallback messageReceivedCallBack;

        /// <summary>
        /// Raises <see cref="MessageReceived"/>, through <c>SeiClient</c> when it asks for posting.
        /// </summary>
        /// <param name="e">Event data.</param>
        protected virtual void OnMessageReceived(MessageEventArgs e)
        {
            EventHandler<MessageEventArgs> handler = MessageReceived;
            if (handler == null)
            {
                return;
            }

            if (!SeiClient.NeedPostMessage)
            {
                handler(this, e);
                return;
            }

            if (messageReceivedCallBack == null)
            {
                // Re-read the event inside the callback and guard against it having become null.
                messageReceivedCallBack = s =>
                {
                    EventHandler<MessageEventArgs> posted = MessageReceived;
                    if (posted != null)
                    {
                        posted(this, s as MessageEventArgs);
                    }
                };
            }
            SeiClient.SendSynchronizeMessage(messageReceivedCallBack, e);
        }

        /// <summary>
        /// Raised just before a message is sent.
        /// </summary>
        public event EventHandler<MessageEventArgs> MessageSending;
        SendOrPostCallback messageSendingCallBack;

        /// <summary>
        /// Raises <see cref="MessageSending"/>, through <c>SeiClient</c> when it asks for posting.
        /// </summary>
        /// <param name="e">Event data.</param>
        protected virtual void OnMessageSending(MessageEventArgs e)
        {
            EventHandler<MessageEventArgs> handler = MessageSending;
            if (handler == null)
            {
                return;
            }

            if (!SeiClient.NeedPostMessage)
            {
                handler(this, e);
                return;
            }

            if (messageSendingCallBack == null)
            {
                messageSendingCallBack = s =>
                {
                    EventHandler<MessageEventArgs> posted = MessageSending;
                    if (posted != null)
                    {
                        posted(this, s as MessageEventArgs);
                    }
                };
            }
            SeiClient.SendSynchronizeMessage(messageSendingCallBack, e);
        }

        /// <summary>
        /// Raised after a message has been handed to the client for sending.
        /// </summary>
        public event EventHandler<MessageEventArgs> MessageSended;
        SendOrPostCallback messageSendedCall;

        /// <summary>
        /// Raises <see cref="MessageSended"/>, through <c>SeiClient</c> when it asks for posting.
        /// </summary>
        /// <param name="e">Event data.</param>
        protected virtual void OnMessageSended(MessageEventArgs e)
        {
            EventHandler<MessageEventArgs> handler = MessageSended;
            if (handler == null)
            {
                return;
            }

            if (!SeiClient.NeedPostMessage)
            {
                handler(this, e);
                return;
            }

            if (messageSendedCall == null)
            {
                messageSendedCall = s =>
                {
                    EventHandler<MessageEventArgs> posted = MessageSended;
                    if (posted != null)
                    {
                        posted(this, s as MessageEventArgs);
                    }
                };
            }
            SeiClient.SendSynchronizeMessage(messageSendedCall, e);
        }

        /// <summary>
        /// Raised when a message is dropped because it was received before.
        /// </summary>
        public event EventHandler<MessageEventArgs> MessageDroped;

        /// <summary>
        /// Raises <see cref="MessageDroped"/>.
        /// </summary>
        /// <param name="e">Event data.</param>
        protected virtual void OnMessageDroped(MessageEventArgs e)
        {
            EventHandler<MessageEventArgs> handler = MessageDroped;
            if (handler != null)
            {
                handler(this, e);
            }
        }

        #endregion
    }
}
Das war der Inhalt zum auf dem C#-Ereignismodell basierenden UDP-Kommunikationsframework (geeignet für das Kodieren und Dekodieren von Netzwerkpaketen). Weitere verwandte Inhalte finden Sie auf der chinesischen PHP-Website (www.php.cn).