搜索
首页后端开发C#.Net教程c#基于事件模型的UDP通讯框架(适用于网络包编解码)

之前写过一篇关于c#udp分包发送的文章

这篇文章里面介绍的方法是一种实现,但是存在一个缺点就是一个对象序列化后会增大很多,不利于在网络中的传输。

我们在网络中的传输是需要尽可能的减小传送的数据包的大小,于是我参考了网上一些资料和一些开源的项目(http://www.php.cn/)这个上面的那个开源的飞鸽传输的框架,

其实也就是把要传送的数据按照某种规定放在一个byte数组中,然后接收到后按照相应的格式把数据解析出来,为了减小数据还使用了GZipStream的压缩,之前出的问题就是在解压缩时,不过现在已经解决了。

首先我们要定义一种能表示我们传送数据的包的格式

public class PacketNetWorkMsg : IComparable<PacketNetWorkMsg>
    {
        /// <summary>
		/// 封包版本
		/// </summary>
		public int Version { get; set; }

		/// <summary>
		/// 要发送的数据包
		/// </summary>
		public byte[] Data { get; set; }

        /// <summary>
        /// 数据包所含数据长度
        /// </summary>
        public int DataLength { get; set; }

        /// <summary>
        /// 分包后最后一个剩余长度
        /// </summary>
        public int Remainder { get; set; }
		/// <summary>
		/// 远程地址
		/// </summary>
		public IPEndPoint RemoteIP { get; set; }

		/// <summary>
		/// 发送次数
		/// </summary>
		public int SendTimes { get; set; }

		/// <summary>
		/// 包编号
		/// </summary>
		public long PackageNo { get; set; }

		/// <summary>
		/// 分包索引
		/// </summary>
		public int PackageIndex { get; set; }

		/// <summary>
		/// 分包总数
		/// </summary>
		public int PackageCount { get; set; }

		/// <summary>
		/// 获得或设置是否需要返回已收到标志
		/// </summary>
        public bool IsRequireReceiveCheck { get; set; }

        public PacketNetWorkMsg()
		{
			Version = 1;
            CreationTime = DateTime.Now;
		}
        public PacketNetWorkMsg(long packageNo, int Count, int index, byte[] data, int dataLength, int remainder, IPEndPoint desip, bool IsRequireReceive)
        {
            this.PackageNo = packageNo;
            this.PackageCount = Count;
            this.PackageIndex = index;
            this.Data = data;
            this.DataLength = dataLength;
            this.Remainder = remainder;
            this.IsRequireReceiveCheck = IsRequireReceive;//默认都需要确认包
            this.RemoteIP = desip;
        }
		#region IComparable<PackedNetworkMessage> 成员

        public int CompareTo(PacketNetWorkMsg other)
		{
			return PackageIndex < other.PackageIndex ? -1 : 1;
		}

		#endregion

        /// <summary>
        /// 获得生成数据包的时间
        /// </summary>
        public DateTime CreationTime { get; private set; }
    }

这个类是我们在网络中需要传输的具体最小的包

然后还有一个就是我们用来表示我们传输的数据的格式类Msg类,一个Msg类可能被分为多个PacketNetWorkMsg 来传送,分包的方法是为了突破udp方式传输数据的限制(64k),,能够使用udp传输大数据,很多人就在问,为什么不用TCP,虽然TCP是非常好的方式,但是我也不想说出个所以然来,我就喜欢这么干。

public class Msg
    {
        
        /// <summary>
        /// 是否已经被处理.在挂钩过程中,如果为true,则底层代码不会再对信息进行处理
        /// </summary>
        public bool Handled { get; set; }

        /// <summary>
        /// 获得或设置当前的消息编号
        /// </summary>
        /// <value></value>
        /// <remarks></remarks>
        public long PackageNo { get; set; }

        /// <summary>
        /// 获得或设置当前的消息所属的主机名
        /// </summary>
        public string HostName { get; set; }

        /// <summary>
        /// 获得或设置当前的消息所属的用户名
        /// </summary>
        public string UserName { get; set; }

        /// <summary>
        /// 获得或设置当前的命令代码
        /// </summary>
        //命令的名称
        public Commands Command { get; set; }

        /// <summary>
        /// 获得或设置当前的消息的类型 文本消息,或者二进制消息
        /// </summary>
        public Consts Type { get; set; }

        /// <summary>
        /// 获得或设置当前的命令消息文本
        /// </summary>
        public string NormalMsg { get; set; }

        /// <summary>
        /// 消息文本字节
        /// </summary>
        public byte[] NormalMsgBytes { get; set; }

        /// <summary>
        /// 扩展消息文本字节
        /// </summary>
        public byte[] ExtendMessageBytes { get; set; }

        /// <summary>
        /// 获得或设置当前命令的扩展文本
        /// </summary>
        public string ExtendMessage { get; set; }

        /// <summary>
        /// 远程地址
        /// </summary>
        public IPEndPoint RemoteAddr { get; set; }

        /// <summary>
        /// 主机地址
        /// </summary>
        public IPEndPoint HostAddr { get; set; }
        /// <summary>
        /// 获得或设置是否需要返回已收到标志
        /// </summary>
        public bool IsRequireReceive { get; set; }

      
        public Msg(IPEndPoint Addr)
		{
			RemoteAddr = Addr;
			Handled = false;
            Type = Consts.MESSAGE_TEXT;
		}
        public Msg(IPEndPoint hostIP,IPEndPoint remoteIP,Commands cmd)
        {
            HostAddr = hostIP;
            RemoteAddr = remoteIP;
            Command = cmd;
            Handled = false;
            Type = Consts.MESSAGE_TEXT;
        }
		public Msg(IPEndPoint addr, string hostName, string userName,Commands command, string message, string extendMessage)
		{
			RemoteAddr = addr;
			Handled = false;
			HostName = hostName;
			UserName = userName;
			Command = command;
			NormalMsg = message;
			ExtendMessage = extendMessage;
            Type = Consts.MESSAGE_TEXT;
		}


		/// <summary>
		/// 直接创建一个新的Message对象
		/// </summary>
		/// <param name="host">主机对象</param>
		/// <param name="addr">远程地址</param>
		/// <param name="hostName">主机名</param>
		/// <param name="userName">用户名</param>
		/// <param name="command">命令</param>
		/// <param name="options">选项</param>
		/// <param name="message">信息</param>
		/// <param name="extendMessage">扩展信息</param>
		/// <returns></returns>
		public static Msg Create(Host host, IPEndPoint addr, string hostName, string userName, Commands command, string message, string extendMessage)
		{
			return new Msg(addr,hostName, userName, command,message, extendMessage);
		}
    }

目前这两个类就是我们主要的数据结构了。

然后接下来是我们主要的类,分包和组包的一个类了

/// <summary>
    /// 消息封包类
    /// </summary>
    public class MessagePacker
    {
        Timer _timer;
        public MessagePacker()
		{
			_timer = new Timer(_ => CheckForOutdateMessage(), null, new TimeSpan(0, 5, 0), new TimeSpan(0, 0, 5, 0));
		}
        /*
		 * 消息包注意:
		 * 1.第一位始终是2(ASCII码50)
		 * 2.第二位到第九位是一个long类型的整数,代表消息编号
		 * 3.第十位到第十三位是一个int类型的整数,代表消息内容总长度
		 * 4.第十四位到第十七位是一个int类型的整数,代表分包的总数
		 * 5.第十八位到第二十一位是一个int类型的整数,代表当前的分包编号
		 * 6.第二十二位表示是否需要返回一个确认标识(1/0)
		 * 7.第二十三到第三十一位是保留的(Reserved)
		 * 8.第三十二字节以后是数据包
		 * */

        /// <summary>
        /// 消息版本号
        /// </summary>
        public static byte VersionHeader { get { return 50; } }
        /// <summary>
        /// 返回当前消息封包的头字节数
        /// </summary>
        public static int PackageHeaderLength { get { return 32; } }

        /// <summary>
        /// 获得消息包的字节流
        /// </summary>
        /// <param name="message">要打包的消息对象</param>
        /// <returns></returns>
        public static PacketNetWorkMsg[] BuildNetworkMessage(Msg message)
        {
            if (message.ExtendMessageBytes != null)
            {
                return BuildNetworkMessage(
                message.RemoteAddr,
                message.PackageNo,
                message.Command,
                message.UserName,
                message.HostName,
                message.Type,
                message.NormalMsgBytes,
                message.ExtendMessageBytes,
                message.IsRequireReceive
                );
            }
            else
            {
                return BuildNetworkMessage(
                message.RemoteAddr,
                message.PackageNo,
                message.Command,
                message.UserName,
                message.HostName,
                message.Type,
                System.Text.Encoding.Unicode.GetBytes(message.NormalMsg),
                System.Text.Encoding.Unicode.GetBytes(message.ExtendMessage),
                message.IsRequireReceive
                );
            }
        }

        /// <summary>
        /// 获得消息包的字节流
        /// </summary>
        /// <param name="remoteIp">远程主机地址</param>
        /// <param name="packageNo">包编号</param>
        /// <param name="command">命令</param>
        /// <param name="options">参数</param>
        /// <param name="userName">用户名</param>
        /// <param name="hostName">主机名</param>
        /// <param name="content">正文消息</param>
        /// <param name="extendContents">扩展消息</param>
        /// <returns></returns>
        public static PacketNetWorkMsg[] BuildNetworkMessage(IPEndPoint remoteIp, long packageNo, Commands command, string userName, string hostName,Consts type ,byte[] content, byte[] extendContents, bool RequireReceiveCheck)
        {

            //每次发送所能容下的数据量
            int maxBytesPerPackage = (int)Consts.MAX_UDP_PACKAGE_LENGTH - PackageHeaderLength;
            //压缩数据流
            var ms = new MemoryStream();
            //var dest = new MemoryStream();
            //var zip = new GZipStream(dest, CompressionMode.Compress);
            var bw = new BinaryWriter(ms, System.Text.Encoding.Unicode);
            //写入头部数据
            bw.Write(packageNo);			//包编号
            bw.Write(userName);				//用户名
            bw.Write(hostName);				//主机名
            bw.Write((long)command);        //命令
            bw.Write((long)type);           //数据类型
            bw.Write(content == null ? 0 : content.Length);//数据长度

            //写入消息数据
            if (content != null) 
                bw.Write(content);
            bw.Write(extendContents == null ? 0 : extendContents.Length);//补充数据长度
            if (extendContents != null) 
                bw.Write(extendContents);
           
            
            ms.Flush();
            ms.Seek(0, System.IO.SeekOrigin.Begin);
            byte[] ibuf = ms.ToArray();

            var dest = new System.IO.MemoryStream();
            GZipStream zipStream = new GZipStream(dest, CompressionMode.Compress, true);
            byte[] buff = new byte[1024];
            int offset;
            ms.Seek(0, SeekOrigin.Begin);
            while ((offset = ms.Read(buff, 0, buff.Length)) > 0)
            {
                zipStream.Write(buff, 0, offset);//先把数据用二进制写入内存,然后在把它用zip压缩,获取压缩过后的二进制流dest
            }
            zipStream.Close();
            bw.Close();
            ms.Close();
            dest.Seek(0, SeekOrigin.Begin);
            //打包数据总量
            int dataLength = (int)dest.Length;
            
            int packageCount = (int)Math.Ceiling(dataLength * 1.0 / maxBytesPerPackage);
            PacketNetWorkMsg[] pnma = new PacketNetWorkMsg[packageCount];
            for (int i = 0; i < packageCount; i++)
            {
                int count = i == packageCount - 1 ? dataLength - maxBytesPerPackage * (packageCount - 1) : maxBytesPerPackage;

                byte[] buf = new byte[count + PackageHeaderLength];
                buf[0] = VersionHeader;//版本号 第1位 
                BitConverter.GetBytes(packageNo).CopyTo(buf, 1);//消息编号 第2到9位 long类型的整数
                BitConverter.GetBytes(dataLength).CopyTo(buf, 9);//消息内容长度 第10到13位 int类型的整数
                BitConverter.GetBytes(packageCount).CopyTo(buf, 13);//分包总数 第14位到第17位 int类型的整数
                BitConverter.GetBytes(i).CopyTo(buf, 17);//分包编号 第18位到第21位 int类型的整数
                buf[21] = RequireReceiveCheck ? (byte)1 : (byte)0;//是否回确认包 第22位 
                //第23到第31位是保留的(Reserved)
                dest.Read(buf, 32, buf.Length - 32);//第32字节以后是,具体的数据包

                pnma[i] = new PacketNetWorkMsg()
                {
                    Data = buf,
                    PackageCount = packageCount,
                    PackageIndex = i,
                    PackageNo = packageNo,
                    RemoteIP = remoteIp,
                    SendTimes = 0,
                    Version = 2,
                    IsRequireReceiveCheck = buf[21] == 1
                };
            }

            return pnma;
        }


        /// <summary>
        /// 检测确认是否是这个类型的消息包
        /// </summary>
        /// <param name="buffer"></param>
        /// <returns></returns>
        public static bool Test(byte[] buffer)
        {
            return buffer != null && buffer.Length > PackageHeaderLength && buffer[0] == VersionHeader;
        }

        /// <summary>
        /// 缓存接收到的片段
        /// </summary>
        static Dictionary<long, PacketNetWorkMsg[]> packageCache = new Dictionary<long, PacketNetWorkMsg[]>();

        /// <summary>
        /// 分析网络数据包并进行转换为信息对象
        /// </summary>
        /// <param name="packs">接收到的封包对象</param>
        /// <returns></returns>
        /// <remarks>
        /// 对于分包消息,如果收到的只是片段并且尚未接收完全,则不会进行解析
        /// </remarks>
        public static Msg ParseToMessage(params PacketNetWorkMsg[] packs)
        {
            if (packs.Length == 0 || (packs[0].PackageCount > 1 && packs.Length != packs[0].PackageCount))
                return null;


            var ms = DecompressMessagePacks(packs);
            if (ms == null)
            {
                //事件
                return null;
            }
           //构造读取流
            System.IO.BinaryReader br = new System.IO.BinaryReader(ms, System.Text.Encoding.Unicode);
            //开始读出数据
            Msg m = new Msg(packs[0].RemoteIP);
            m.PackageNo = br.ReadInt64();//包编号

            m.UserName = br.ReadString();//用户名
            m.HostName = br.ReadString();//主机名
            m.Command = (Commands)br.ReadInt64(); //命令
            m.Type = (Consts)br.ReadInt64();//数据类型
            int length = br.ReadInt32(); //数据长度
            m.NormalMsgBytes = new byte[length];
            br.Read(m.NormalMsgBytes, 0, length);//读取内容

            length = br.ReadInt32();    //附加数据长度
            m.ExtendMessageBytes = new byte[length];
            br.Read(m.ExtendMessageBytes, 0, length);//读取附加数据

            if (m.Type == Consts.MESSAGE_TEXT)
            {
                m.NormalMsg = System.Text.Encoding.Unicode.GetString(m.NormalMsgBytes, 0, length);	//正文
                m.ExtendMessage = System.Text.Encoding.Unicode.GetString(m.ExtendMessageBytes, 0, length);	//扩展消息
                m.ExtendMessageBytes = null;
                m.NormalMsgBytes = null;

            }
            return m;
        }
        /// <summary>
        /// 组合所有的网络数据包并执行解压缩
        /// </summary>
        /// <param name="packs"></param>
        /// <returns></returns>
        static MemoryStream DecompressMessagePacks(params PacketNetWorkMsg[] packs)
        {
            try
            {
                //尝试解压缩,先排序
                Array.Sort(packs);
                var msout = new MemoryStream();
                using (var ms = new System.IO.MemoryStream())
                {
                    //合并写入
                    //Array.ForEach(packs, s => ms.Write(s.Data, 32, s.Data.Length-32));
                    Array.ForEach(packs, s => ms.Write(s.Data, 0, s.Data.Length));
                    ms.Seek(0, SeekOrigin.Begin);

                    //解压缩
                    using (var gz = new GZipStream(ms, CompressionMode.Decompress))
                    {
                        var buffer = new byte[0x400];
                        var count = 0;
                        while ((count = gz.Read(buffer, 0, buffer.Length)) > 0)
                        {
                            msout.Write(buffer, 0, count);
                        }
                    }
                }
                msout.Seek(0, SeekOrigin.Begin);

                return msout;
            }
            catch (Exception)
            {

                return null;
            }
        }
        /// <summary>
        /// 尝试将收到的网络包解析为实体
        /// </summary>
        /// <param name="pack">收到的网络包</param>
        /// <returns></returns>
        /// <remarks>如果收到的包是分片包,且其所有子包尚未接受完全,则会返回空值</remarks>
        public static Msg TryToTranslateMessage(PacketNetWorkMsg pack)
        {
            if (pack == null || pack.PackageIndex > pack.PackageCount - 1) return null;
            else if (pack.PackageCount == 1) return ParseToMessage(pack);
            else
            {
                lock (packageCache)
                {
                    if (packageCache.ContainsKey(pack.PackageNo))
                    {
                        PacketNetWorkMsg[] array = packageCache[pack.PackageNo];
                        array[pack.PackageIndex] = pack;

                        //检测是否完整
                        if (Array.FindIndex(array, s => s == null) == -1)
                        {
                            packageCache.Remove(pack.PackageNo);
                            return ParseToMessage(array);
                        }
                        else
                        {
                            return null;
                        }
                    }
                    else
                    {
                        PacketNetWorkMsg[] array = new PacketNetWorkMsg[pack.PackageCount];
                        array[pack.PackageIndex] = pack;
                        packageCache.Add(pack.PackageNo, array);
                        return null;
                    }
                }
            }

        }

        /// <summary>
        /// 将网络信息解析为封包
        /// </summary>
        /// <param name="buffer"></param>
        /// <returns></returns>
        public static PacketNetWorkMsg Parse(byte[] buffer, IPEndPoint clientAddress)
        {
            if (!Test(buffer)) return null;

            PacketNetWorkMsg p = new PacketNetWorkMsg()
            {
                RemoteIP = clientAddress,
                SendTimes = 0
            };
            p.PackageNo = BitConverter.ToInt64(buffer, 1);//包编号
            p.DataLength = (int)BitConverter.ToInt64(buffer, 9); //内容长度
            p.PackageCount = BitConverter.ToInt32(buffer, 13);//分包总数
            p.PackageIndex = BitConverter.ToInt32(buffer, 17);//索引
            p.IsRequireReceiveCheck = buffer[21] == 1;//是否需要回包
            p.Data = new byte[buffer.Length - PackageHeaderLength];
            Array.Copy(buffer, PackageHeaderLength, p.Data, 0, p.Data.Length);

            return p;
        }
        void CheckForOutdateMessage()
        {
            
            lock (packageCache)
            {
                //TODO 这里设置最短的过期时间为5分钟,也就是说五分钟之前的消息会被干掉
                var minTime = DateTime.Now.AddMinutes(5.0);
                var targetList = new List<long>();
                foreach (var pkgid in packageCache.Keys)
                {
                    if (Array.TrueForAll(packageCache[pkgid], s => s == null || s.CreationTime < minTime))
                    {
                        targetList.Add(pkgid);
                    }
                }

                foreach (var pkgid in targetList)
                {
                    packageCache.Remove(pkgid);
                }
            }

        }
        #region 事件
        /// <summary>
        /// 网络层数据包解压缩失败
        /// </summary>
        public static event EventHandler<PackageEventArgs> DecompressFailed;

        /// <summary>
        /// 触发解压缩失败事件
        /// </summary>
        /// <param name="e">事件包含的参数</param>
        protected static void OnDecompressFailed(PackageEventArgs e)
        {
            if (DecompressFailed != null) DecompressFailed(typeof(MessagePacker),e);
        }
        #endregion
    }


通过BuildNetworkMessage方法可以把一个Msg对象分为1个或多个packet,然后的话就可以通过以前所用的方法把分好的包挨个发送了。收到数据包后用TryToTranslateMessage方法把数据包组装成为一个Msg

接下来是一个 UDP底层通信的类

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net.Sockets;
using Netframe.Model;
using System.Net;
using System.Threading;
using Netframe.Tool;
using Netframe.Event;

namespace Netframe.Core
{
    /// <summary>
    /// 基本通信类  UDP,能够进行基本数据发送,UdpPacketMsg的发送,数据收到时触发事件
    /// </summary>
    public class UDPThread
    {
        #region 私有变量

        /// <summary>
        /// 配置信息
        /// </summary>
        Config _config;

        /// <summary>
        /// UDP客户端
        /// </summary>
        UdpClient client;

        /// <summary>
        /// 用于轮询是否发送成功的记录
        /// </summary>
        List<PacketNetWorkMsg> SendList;

        #endregion

        #region 属性

        /// <summary>
        /// 是否已经初始化了
        /// </summary>
        public bool IsInitialized { get; private set; }

        /// <summary>
        /// 是否建立连接
        /// </summary>
        public bool IsConnect { get; private set; }
        /// <summary>
        /// 检查发送队列间隔
        /// </summary>
        public int CheckQueueTimeInterval { get; set; }

        /// <summary>
        /// 没有收到确认包时,最大重新发送的数目,超过此数目会丢弃并触发PackageSendFailture 事件。
        /// </summary>
        public int MaxResendTimes { get; set; }

        #endregion

        #region 构造函数

        /// <summary>
        /// 构造一个新的消息对象,并绑定到指定的端口和IP上。
        /// </summary>
        /// <param name="ip">绑定的IP</param>
        /// <param name="port">绑定的端口</param>
        public UDPThread(int port)
        {
            IsInitialized = false;
            IPAddress LocalIPAddress = null;
            //获得本机当前的ip
            try
            {
                IPAddress[] address = Dns.GetHostAddresses(Dns.GetHostName());
                foreach (IPAddress addr in address)
                {
                    if (addr.AddressFamily.ToString().Equals("InterNetwork"))
                    {
                        LocalIPAddress = addr;
                        break;
                    }
                }
            }
            catch (Exception)
            {
                OnLocalIpError(new EventArgs());
                //获取本机ip异常
                return;
            }
            try
            {
                client = new UdpClient(new IPEndPoint(LocalIPAddress, port));
                IsConnect = false;
            }
            catch (Exception)
            {
                OnNetworkError(new EventArgs());
                return;
            }
            SendList = new List<PacketNetWorkMsg>();
            client.EnableBroadcast = true;

            CheckQueueTimeInterval = 2000;
            MaxResendTimes = 5;
            new Thread(new ThreadStart(CheckUnConfirmedQueue)) { IsBackground = true }.Start();


            IsInitialized = true;

            //开始监听
            client.BeginReceive(ReceiveDataAsync, null);
            //ReceiveData();
        }

        public UDPThread(Config config)
        {
            IsInitialized = false;
            try
			{
                client = new UdpClient(new IPEndPoint(config.BindedIP, config.Port));
			}
            catch (Exception)
            {
                OnNetworkError(new EventArgs());
                return;
            }
            SendList = new List<PacketNetWorkMsg>();
            client.EnableBroadcast = true;
            this._config = config;
            CheckQueueTimeInterval = 2000;
            MaxResendTimes = 5;
            new Thread(new ThreadStart(CheckUnConfirmedQueue)) { IsBackground = true }.Start();


            IsInitialized = true;

            //开始监听
            client.BeginReceive(ReceiveDataAsync, null);
        }
        /// <summary>
        /// 构造函数与远程主机连接
        /// </summary>
        /// <param name="ipaddress">绑定ip</param>
        /// <param name="port">端口</param>
        public UDPThread(string ip, int port)
        {
            IsInitialized = false;
            IPAddress ipaddress = IPAddress.Parse(ip);//构造远程连接的参数
            try
            {
                client = new UdpClient();
                client.Connect(new IPEndPoint(ipaddress, port));//与远程服务器建立连接ps:只是形式上,udp本身无连接的
                IsConnect = true;
            }
            catch (Exception)
            {
                OnNetworkError(new EventArgs());
                return;
            }
            SendList = new List<PacketNetWorkMsg>();
            client.EnableBroadcast = true;

            CheckQueueTimeInterval = 2000;
            MaxResendTimes = 5;
            new Thread(new ThreadStart(CheckUnConfirmedQueue)) { IsBackground = true }.Start();


            IsInitialized = true;

            //开始监听
            client.BeginReceive(ReceiveDataAsync, null);
            //ReceiveData();
        }
        #endregion

        #region 私有方法
        /// <summary>
        /// 接收数据的方法
        /// </summary>
        /// <param name="ar"></param>
        void ReceiveDataAsync(IAsyncResult ar)
        {
            IPEndPoint ipend = null;
            byte[] buffer = null;
            try
            {
                buffer = client.EndReceive(ar, ref ipend);
            }
            catch (Exception)
            {
                return;
            }
            finally
            {
                if (IsInitialized && client != null) 
                    client.BeginReceive(ReceiveDataAsync, null);
            }

            if (buffer == null || buffer.Length == 0) return;
            //触发已收到事件
            OnPackageReceived(new PackageEventArgs() { RemoteIP = ipend, Data = buffer });

        }
        /// <summary>
        /// 同步数据接收方法
        /// </summary>
        private void ReceiveData()
        {
            while (true)
            {
                IPEndPoint retip = null;
                byte[] buffer = null;
                try
                {
                    buffer = client.Receive(ref retip);//接收数据,当Client端连接主机的时候,retip就变成Cilent端的IP了
                }
                catch (Exception)
                {
                    //异常处理操作
                    return;
                }
                if (buffer == null || buffer.Length == 0) return;
                PackageEventArgs arg = new PackageEventArgs(buffer, retip);
                OnPackageReceived(arg);//数据包收到触发事件
            }
        }

        /// <summary>
        /// 异步接受数据
        /// </summary>
        private void AsyncReceiveData()
        {
            try
            {
                client.BeginReceive(new AsyncCallback(ReceiveCallback), null);
            }
            catch (SocketException ex)
            {
                throw ex;
            }
        }
        /// <summary>
        /// 接收数据的回调函数
        /// </summary>
        /// <param name="param"></param>
        private void ReceiveCallback(IAsyncResult param)
        {
            if (param.IsCompleted)
            {
                IPEndPoint retip = null;
                byte[] buffer = null;
                try
                {
                    buffer = client.EndReceive(param, ref retip);//接收数据,当Client端连接主机的时候,test就变成Cilent端的IP了
                }
                catch (Exception ex)
                {
                    //异常处理操作
                }
                finally
                {
                    AsyncReceiveData();
                }
                if (buffer == null || buffer.Length == 0) return;
                OnPackageReceived(new PackageEventArgs() { RemoteIP = retip, Data = buffer });
            }
        }

        #endregion

        #region 公共函数

        /// <summary>
        /// 关闭客户端
        /// </summary>
        public void Close()
        {
            if (IsInitialized)
            {
                IsInitialized = false;
                if (IsInitialized) 
                    client.Close();
                IsConnect = false;
                client = null;
            }
        }

        
        /// <summary>
        /// 发送数据,不进行检查
        /// </summary>
        /// <param name="address">远程主机地址</param>
        /// <param name="port">远程主机端口</param>
        /// <param name="data">数据流</param>
        /// <param name="packageNo">数据包编号</param>
        /// <param name="packageIndex">分包索引</param>
        private void Send(IPAddress address, int port, byte[] data, long packageNo, int packageIndex)
        {
            Send(false, new IPEndPoint(address, port), data, packageNo, packageIndex);
        }
        /// <summary>
        /// 发送数据,并判断是否对数据作回应检查。将会在每隔 <see cref="CheckQueueTimeInterval"/> 的间隔后重新发送,直到收到对方的回应。
        /// 注意:网络层不会解析回应,请调用 <see cref="PopSendItemFromList"/> 方法来告知已收到数据包。
        /// </summary>
        /// <param name="receiveConfirm">消息是否会回发确认包</param>
        /// <param name="address">远程主机地址</param>
        /// <param name="port">远程主机端口</param>
        /// <param name="data">数据流</param>
        /// <param name="packageNo">数据包编号</param>
        /// <param name="packageIndex">分包索引</param>
        private void Send(bool receiveConfirm, IPAddress address, int port, byte[] data, long packageNo, int packageIndex)
        {
            Send(receiveConfirm, new IPEndPoint(address, port), data, packageNo, packageIndex);
        }

        /// <summary>
        /// 发送数据,并对数据作回应检查。当 <see cref="receiveConfirm"/> 为 true 时,将会在每隔 <see cref="CheckQueueTimeInterval"></see> 的间隔后重新发送,直到收到对方的回应。
        /// 注意:网络层不会解析回应,请调用 <see cref="PopSendItemFromList"></see> 方法来告知已收到数据包。
        /// </summary>
        /// <param name="receiveConfirm">消息是否会回发确认包</param>
        /// <param name="address">远程主机地址</param>
        /// <param name="data">数据流</param>
        /// <param name="packageNo">数据包编号</param>
        /// <param name="packageIndex">分包索引</param>
        private void Send(bool receiveConfirm, IPEndPoint address, byte[] data, long packageNo, int packageIndex)
        {
            if (IsInitialized)
            {
                client.Send(data, data.Length, address);
                if (receiveConfirm)
                    PushSendItemToList(new PacketNetWorkMsg() { Data = data, RemoteIP = address, SendTimes = 0, PackageIndex = packageIndex, PackageNo = packageNo });
            }
        }
         

        /// <summary>
        /// 同步发送分包数据
        /// </summary>
        /// <param name="message"></param>
        public void SendMsg(Msg message)
        {
            if (IsInitialized)
            {
                ICollection<PacketNetWorkMsg> udpPackets = MessagePacker.BuildNetworkMessage(message);
                foreach (PacketNetWorkMsg packedMessage in udpPackets)
                {
                    //使用同步发送
                    SendPacket(packedMessage);
                }
            }
        }
        /// <summary>
        /// 将已经打包的消息发送出去
        /// </summary>
        /// <param name="packet"></param>
        public void SendPacket(PacketNetWorkMsg packet)
        {
            if (IsInitialized)
            {
                //使用同步的方法发送数据
                if (!IsConnect)
                    client.Send(packet.Data, packet.Data.Length, packet.RemoteIP);
                else
                    client.Send(packet.Data, packet.Data.Length);
                if (packet.IsRequireReceiveCheck)
                    PushSendItemToList(packet);
            }
        }
        /// <summary>
        /// 异步分包发送数组的方法
        /// </summary>
        /// <param name="message"></param>
        public void AsyncSendMsg(Msg message)
        {
            if (IsInitialized)
            {
                ICollection<PacketNetWorkMsg> udpPackets = MessagePacker.BuildNetworkMessage(message);
                foreach (PacketNetWorkMsg packedMessage in udpPackets)
                {
                    //使用异步的方法发送数据
                    AsyncSendPacket(packedMessage);
                }
            }
        
        }
        /// <summary>
        /// 发送完成后的回调方法
        /// </summary>
        /// <param name="param"></param>
        private void SendCallback(IAsyncResult param)
        {
            if (param.IsCompleted)
            {
                try
                {
                    client.EndSend(param);//这句话必须得写,BeginSend()和EndSend()是成对出现的 
                }
                catch (Exception)
                {
                    PackageEventArgs e = new PackageEventArgs();
                    OnPackageSendFailure(e);//触发发送失败事件
                }
            }

        }
        /// <summary>
        /// 异步将将已经打包的消息发送出去,不进行发送检查
        /// </summary>
        /// <param name="packet"></param>
        public void AsyncSendPacket(PacketNetWorkMsg packet)
        {
            //使用异步的方法发送数据
            if (IsInitialized)
            {
                if (!IsConnect)
                    this.client.BeginSend(packet.Data, packet.Data.Length, packet.RemoteIP, new AsyncCallback(SendCallback), null);
                else
                    this.client.BeginSend(packet.Data, packet.Data.Length, new AsyncCallback(SendCallback), null);
                if (packet.IsRequireReceiveCheck)
                    PushSendItemToList(packet);//将该消息压入列表
            }
        }

        #endregion
        System.Threading.SendOrPostCallback cucqCallpack;
        System.Threading.SendOrPostCallback resendCallback;
        /// <summary>
        /// 自由线程,检测未发送的数据并发出
        /// </summary>
        void CheckUnConfirmedQueue()
        {
            //异步调用委托
            if (cucqCallpack == null) cucqCallpack = (s) => OnPackageSendFailure(s as PackageEventArgs);
            if (resendCallback == null) resendCallback = (s) => OnPackageResend(s as PackageEventArgs);
            do
            {
                if (SendList.Count > 0)
                {
                    PacketNetWorkMsg[] array = null;
                    lock (SendList)
                    {
                        array = SendList.ToArray();
                    }
                    //挨个重新发送并计数
                    Array.ForEach(array, s =>
                    {
                        s.SendTimes++;
                        if (s.SendTimes >= MaxResendTimes)
                        {
                            //发送失败啊
                            PackageEventArgs e = new PackageEventArgs();
                            if (SeiClient.NeedPostMessage)
                            {
                                SeiClient.SendSynchronizeMessage(cucqCallpack, e);
                            }
                            else
                            {
                                OnPackageSendFailure(e);//触发发送失败事件
                            }
                            SendList.Remove(s);
                        }
                        else
                        {
                            //重新发送
                            AsyncSendPacket(s);
                            PackageEventArgs e = new PackageEventArgs() { PacketMsg = s };
                            if (SeiClient.NeedPostMessage)
                            {
                                SeiClient.SendASynchronizeMessage(resendCallback, e);
                            }
                            else
                            {
                                OnPackageResend(e);//触发重新发送事件
                            }
                        }
                    });
                }
                Thread.Sleep(CheckQueueTimeInterval);
            } while (IsInitialized);
        }


        static object lockObj = new object();
        /// <summary>
        /// 将数据信息压入列表
        /// </summary>
        /// <param name="item"></param>
        public void PushSendItemToList(PacketNetWorkMsg item)
        {
            SendList.Add(item);
        }

        /// <summary>
        /// 将数据包从列表中移除
        /// 网络层不会解析
        /// </summary>
        /// <param name="packageNo">数据包编号</param>
        /// <param name="packageIndex">数据包分包索引</param>
        public void PopSendItemFromList(long packageNo, int packageIndex)
        {
            lock (lockObj)
            {
                Array.ForEach(SendList.Where(s => s.PackageNo == packageNo && s.PackageIndex == packageIndex).ToArray(), s => SendList.Remove(s));
            }
        }

        #region 事件

        /// <summary>
        /// 网络出现异常,无法获取本地ip地址
        /// </summary>
        public event EventHandler IPError;

        protected void OnLocalIpError(EventArgs e)
        {
            if (IPError != null) IPError(this, e);
        }

        /// <summary>
        /// 网络出现异常(如端口无法绑定等,此时无法继续工作)
        /// </summary>
        public event EventHandler NetworkError;

        protected void OnNetworkError(EventArgs e)
        {
            if (NetworkError != null) NetworkError(this, e);
        }

        /// <summary>
        /// 当数据包收到时触发
        /// </summary>
        public event EventHandler<PackageEventArgs> PackageReceived;

        /// <summary>
        /// 当数据包收到事件触发时,被调用
        /// </summary>
        /// <param name="e">包含事件的参数</param>
        protected virtual void OnPackageReceived(PackageEventArgs e)
        {
            if (PackageReceived != null) PackageReceived(this, e);
        }
        /// <summary>
        /// 数据包发送失败
        /// </summary>
        public event EventHandler<PackageEventArgs> PackageSendFailure;

        /// <summary>
        /// 当数据发送失败时调用
        /// </summary>
        /// <param name="e">包含事件的参数</param>
        protected virtual void OnPackageSendFailure(PackageEventArgs e)
        {
            if (PackageSendFailure != null) PackageSendFailure(this, e);
        }

        /// <summary>
        /// 数据包未接收到确认,重新发送
        /// </summary>
        public event EventHandler<PackageEventArgs> PackageResend;


        /// <summary>
        /// 触发重新发送事件
        /// </summary>
        /// <param name="e">包含事件的参数</param>
        protected virtual void OnPackageResend(PackageEventArgs e)
        {
            if (PackageResend != null) PackageResend(this, e);
        }


        #endregion

        #region IDisposable 成员

        /// <summary>
        /// 关闭客户端并释放资源
        /// </summary>
        public void Dispose()
        {
            Close();
        }

        #endregion


    }
}

再然后来一个 UDP上层的类,用来把收到的包组装合并成为一个msg

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net;
using Netframe.Event;
using Netframe.Model;
using Netframe.Tool;
using System.Threading;

namespace Netframe.Core
{
    /// <summary>
    /// 对底层收到数据的解析
    /// </summary>
    public class MsgTranslator
    {
        #region 属性
        /// <summary>
        /// 用来发送和接收消息的对象
        /// </summary>
        public UDPThread Client { get; set; }

        Config _config;

        //用来检测重复收到的消息包
        Queue<long> ReceivedQueue;

        #endregion

        public MsgTranslator(UDPThread udpClient,Config config)
        {
            this.Client = udpClient;
            this._config = config;
            ReceivedQueue = new Queue<long>();
            Client.PackageReceived += PackageReceived;
        }

        /// <summary>
        /// 发送信息实体
        /// </summary>
        /// <param name="msg"></param>
        public void Send(Msg msg)
        {   
            //消息正在发送事件
            OnMessageSending(new MessageEventArgs(msg));
            Client.AsyncSendMsg(msg);
            //消息已发送事件
            OnMessageSended(new MessageEventArgs(msg));
        }


        static object lockObj = new object();
        /// <summary>
        /// 消息包接收到时的事件
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void PackageReceived(object sender, PackageEventArgs e)
        {
            if (!e.IsHandled)
            {
                e.IsHandled = true;
                Msg m = ResolveToMessage(e.Data, e.RemoteIP);
                if (m == null) return;
                if (m.Command == Commands.RecvConfirm)
                {
                    long pno = m.NormalMsg.TryParseToInt(0);
                    int pindex = m.ExtendMessage.TryParseToInt(0);
                    if (pno != 0)
                        this.Client.PopSendItemFromList(pno, pindex);
                    return;
                }
                //检查最近收到的消息队列里面是否已经包含了这个消息包,如果是,则丢弃
                if (!ReceivedQueue.Contains(m.PackageNo))
                {
                    ReceivedQueue.Enqueue(m.PackageNo);
                    if (ReceivedQueue.Count > 100) ReceivedQueue.Dequeue();

                    OnMessageReceived(new MessageEventArgs(m));
                }
                else
                    OnMessageDroped(new MessageEventArgs(m));
            }
        }

        public Msg ResolveToMessage(byte[] buffer, IPEndPoint remoteEndPoint)
        {
            if (buffer == null || buffer.Length < 0) return null;
            Msg m = null;
            if (MessagePacker.Test(buffer))
            {
                PacketNetWorkMsg pack = MessagePacker.Parse(buffer, remoteEndPoint);
                if (pack == null) return null;
                if (DetermineConfirm(pack))
                {
                    //发送确认标志
                    Msg cm = Helper.CreateRecivedCheck(remoteEndPoint, pack.PackageNo, pack.PackageIndex, _config);
                    Client.SendMsg(cm);
                }
                m = MessagePacker.TryToTranslateMessage(pack);
            }
            return m;
        }
        /// <summary>
        /// 检测是否需要发送回复包来确认收到
        /// </summary>
        /// <param name="message"></param>
        /// <returns></returns>
        static bool DetermineConfirm(PacketNetWorkMsg packet)
        {
            return packet.IsRequireReceiveCheck;
        }
        static bool DetermineConfirm(Msg message)
        {
            return message.IsRequireReceive;
        }
        #region 事件

        /// <summary>
        /// 接收到消息包(UDP)
        /// </summary>
        public event EventHandler<MessageEventArgs> MessageReceived;
        SendOrPostCallback messageReceivedCallBack;
        /// <summary>
        /// 引发接收到消息包事件
        /// </summary>
        /// <param name="e"></param>
        protected virtual void OnMessageReceived(MessageEventArgs e)
        {
            if (MessageReceived == null) return;
            if (!SeiClient.NeedPostMessage)
            {
                MessageReceived(this, e);
            }
            else
            {
                if (messageReceivedCallBack == null) 
                    messageReceivedCallBack = s => MessageReceived(this, s as MessageEventArgs);

                SeiClient.SendSynchronizeMessage(messageReceivedCallBack, e);
            }
        }

        /// <summary>
        /// 消息将要发送事件
        /// </summary>
        public event EventHandler<MessageEventArgs> MessageSending;
        SendOrPostCallback messageSendingCallBack;
        /// <summary>
        /// 引发消息将要发送事件
        /// </summary>
        /// <param name="e"></param>
        protected virtual void OnMessageSending(MessageEventArgs e)
        {
            if (MessageSending == null) return;

            if (!SeiClient.NeedPostMessage)
            {
                MessageSending(this, e);
            }
            else
            {
                if (messageSendingCallBack == null) 
                    messageSendingCallBack = s => MessageSending(this, s as MessageEventArgs);
                SeiClient.SendSynchronizeMessage(messageSendingCallBack, e);
            }
        }


        /// <summary>
        /// 消息已经发送事件
        /// </summary>
        public event EventHandler<MessageEventArgs> MessageSended;
        SendOrPostCallback messageSendedCall;
        /// <summary>
        /// 引发消息已经发送事件
        /// </summary>
        /// <param name="e"></param>
        protected virtual void OnMessageSended(MessageEventArgs e)
        {
            if (MessageSended == null) return;

            if (!SeiClient.NeedPostMessage)
            {
                MessageSended(this, e);
            }
            else
            {
                if (messageSendedCall == null) 
                    messageSendedCall = s => MessageSended(this, s as MessageEventArgs);
                SeiClient.SendSynchronizeMessage(messageSendedCall, e);
            }
        }

        /// <summary>
        /// 重复收包然后丢包事件
        /// </summary>
        public event EventHandler<MessageEventArgs> MessageDroped;
        /// <summary>
        /// 引发丢弃Msg事件
        /// </summary>
        /// <param name="e"></param>
        protected virtual void OnMessageDroped(MessageEventArgs e)
        {
            if (MessageDroped == null) return;

            MessageDroped(this, e);
        }

        #endregion

    }
}



 以上就是c#基于事件模型的UDP通讯框架(适用于网络包编解码)的内容,更多相关内容请关注PHP中文网(www.php.cn)!




声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
.NET中的C#代码:探索编程过程.NET中的C#代码:探索编程过程Apr 12, 2025 am 12:02 AM

C#在.NET中的编程过程包括以下步骤:1)编写C#代码,2)编译为中间语言(IL),3)由.NET运行时(CLR)执行。C#在.NET中的优势在于其现代化语法、强大的类型系统和与.NET框架的紧密集成,适用于从桌面应用到Web服务的各种开发场景。

C#.NET:探索核心概念和编程基础知识C#.NET:探索核心概念和编程基础知识Apr 10, 2025 am 09:32 AM

C#是一种现代、面向对象的编程语言,由微软开发并作为.NET框架的一部分。1.C#支持面向对象编程(OOP),包括封装、继承和多态。2.C#中的异步编程通过async和await关键字实现,提高应用的响应性。3.使用LINQ可以简洁地处理数据集合。4.常见错误包括空引用异常和索引超出范围异常,调试技巧包括使用调试器和异常处理。5.性能优化包括使用StringBuilder和避免不必要的装箱和拆箱。

测试C#.NET应用程序:单元,集成和端到端测试测试C#.NET应用程序:单元,集成和端到端测试Apr 09, 2025 am 12:04 AM

C#.NET应用的测试策略包括单元测试、集成测试和端到端测试。1.单元测试确保代码的最小单元独立工作,使用MSTest、NUnit或xUnit框架。2.集成测试验证多个单元组合的功能,常用模拟数据和外部服务。3.端到端测试模拟用户完整操作流程,通常使用Selenium进行自动化测试。

高级C#.NET教程:ACE您的下一次高级开发人员面试高级C#.NET教程:ACE您的下一次高级开发人员面试Apr 08, 2025 am 12:06 AM

C#高级开发者面试需要掌握异步编程、LINQ、.NET框架内部工作原理等核心知识。1.异步编程通过async和await简化操作,提升应用响应性。2.LINQ以SQL风格操作数据,需注意性能。3..NET框架的CLR管理内存,垃圾回收需谨慎使用。

C#.NET面试问题和答案:提高您的专业知识C#.NET面试问题和答案:提高您的专业知识Apr 07, 2025 am 12:01 AM

C#.NET面试问题和答案包括基础知识、核心概念和高级用法。1)基础知识:C#是微软开发的面向对象语言,主要用于.NET框架。2)核心概念:委托和事件允许动态绑定方法,LINQ提供强大查询功能。3)高级用法:异步编程提高响应性,表达式树用于动态代码构建。

使用C#.NET建筑微服务:建筑师实用指南使用C#.NET建筑微服务:建筑师实用指南Apr 06, 2025 am 12:08 AM

C#.NET是构建微服务的热门选择,因为其生态系统强大且支持丰富。1)使用ASP.NETCore创建RESTfulAPI,处理订单创建和查询。2)利用gRPC实现微服务间的高效通信,定义和实现订单服务。3)通过Docker容器化微服务,简化部署和管理。

C#.NET安全性最佳实践:防止常见漏洞C#.NET安全性最佳实践:防止常见漏洞Apr 05, 2025 am 12:01 AM

C#和.NET的安全最佳实践包括输入验证、输出编码、异常处理、以及身份验证和授权。1)使用正则表达式或内置方法验证输入,防止恶意数据进入系统。2)输出编码防止XSS攻击,使用HttpUtility.HtmlEncode方法。3)异常处理避免信息泄露,记录错误但不返回详细信息给用户。4)使用ASP.NETIdentity和Claims-based授权保护应用免受未授权访问。

c语言中:是什么意思c语言中:是什么意思Apr 03, 2025 pm 07:24 PM

C 语言中冒号 (':') 的含义:条件语句:分隔条件表达式和语句块循环语句:分隔初始化、条件和增量表达式宏定义:分隔宏名和宏值单行注释:表示从冒号到行尾的内容为注释数组维数:指定数组的维数

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热门文章

R.E.P.O.能量晶体解释及其做什么(黄色晶体)
3 周前By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳图形设置
3 周前By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您听不到任何人,如何修复音频
3 周前By尊渡假赌尊渡假赌尊渡假赌
WWE 2K25:如何解锁Myrise中的所有内容
3 周前By尊渡假赌尊渡假赌尊渡假赌

热工具

EditPlus 中文破解版

EditPlus 中文破解版

体积小,语法高亮,不支持代码提示功能

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SecLists

SecLists

SecLists是最终安全测试人员的伙伴。它是一个包含各种类型列表的集合,这些列表在安全评估过程中经常使用,都在一个地方。SecLists通过方便地提供安全测试人员可能需要的所有列表,帮助提高安全测试的效率和生产力。列表类型包括用户名、密码、URL、模糊测试有效载荷、敏感数据模式、Web shell等等。测试人员只需将此存储库拉到新的测试机上,他就可以访问到所需的每种类型的列表。

MinGW - 适用于 Windows 的极简 GNU

MinGW - 适用于 Windows 的极简 GNU

这个项目正在迁移到osdn.net/projects/mingw的过程中,你可以继续在那里关注我们。MinGW:GNU编译器集合(GCC)的本地Windows移植版本,可自由分发的导入库和用于构建本地Windows应用程序的头文件;包括对MSVC运行时的扩展,以支持C99功能。MinGW的所有软件都可以在64位Windows平台上运行。

ZendStudio 13.5.1 Mac

ZendStudio 13.5.1 Mac

功能强大的PHP集成开发环境