>  기사  >  백엔드 개발  >  안정적인 전송을 달성하기 위한 C#의 UDP 정보(데이터 패킷의 그룹 전송)

안정적인 전송을 달성하기 위한 C#의 UDP 정보(데이터 패킷의 그룹 전송)

黄舟
黄舟원래의
2017-02-27 11:04:104128검색

UDP는 C#에서 비연결 전송에 사용되지만 TCP만큼 안정적이지는 않습니다. 하지만 효율성이 높을수록 장점과 단점이 있습니다

즉, 패킷이 손실되는 경우도 있고 UDP를 사용해야 하는 경우도 있는데 어떻게 하면 보다 안정적으로 신뢰할 수 있는 전송을 달성할 수 있는지가 문제입니다.

TCP는 데이터 전송 시 크기 제한이 없지만, UDP 전송 시에는 크기 제한이 있습니다. 어떻게 하면 빅데이터를 안정적으로 전송할 수 있을까요? 우리는 데이터 패킷을 하청 계약하는 것을 고려했습니다.

대용량 데이터를 일련의 작은 데이터 패킷으로 분할하여 별도로 전송하면 서버는 이를 수신한 후 완전한 데이터를 하나로 묶습니다.

중간에 패킷 손실이 발생하면 재전송합니다.

데이터 패킷화 및 재전송을 구현하는 UDP 스레드 클래스입니다. 구체적인 수신 작업은

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net.Sockets;
using Model;
using System.Net;
using Tool;
using System.Threading;

namespace ZZUdp.Core
{
    //udp的类  
    public class UDPThread
    {
        #region 私有变量

        UdpClient client;//UDP客户端

        List<UdpPacket> sendlist;// 用于轮询是否发送成功的记录

        Dictionary<long, RecDataList> RecListDic = new Dictionary<long, RecDataList>();//数据接收列表,每一个sequence对应一个

        IPEndPoint remotIpEnd = null;//用来在接收数据的时候对远程主机的信息存放

        int port=6666;//定义服务器的端口号
        #endregion

        #region 属性
        public int CheckQueueTimeInterval { get; set; }//检查发送队列间隔

        public int MaxResendTimes { get; set; }//没有收到确认包时,最大重新发送的数目,超过此数目会丢弃并触发PackageSendFailture事件
        #endregion

        #region 事件
        
        /// <summary>
        /// 当数据包收到时触发
        /// </summary>
        public event EventHandler<PackageEventArgs> PackageReceived;
        /// <summary>
        /// 当数据包收到事件触发时,被调用
        /// </summary>
        /// <param name="e">包含事件的参数</param>
        protected virtual void OnPackageReceived(PackageEventArgs e)
        {
            if (PackageReceived != null) 
                PackageReceived(this, e);
        }

        /// <summary>
        /// 数据包发送失败
        /// </summary>
        public event EventHandler<PackageEventArgs> PackageSendFailure;
        /// <summary>
        /// 当数据发送失败时调用
        /// </summary>
        /// <param name="e">包含事件的参数</param>
        protected virtual void OnPackageSendFailure(PackageEventArgs e)
        {
            if (PackageSendFailure != null) 
                PackageSendFailure(this, e);
        }

        /// <summary>
        /// 数据包未接收到确认,重新发送
        /// </summary>
        public event EventHandler<PackageEventArgs> PackageResend;
        /// <summary>
        /// 触发重新发送事件
        /// </summary>
        /// <param name="e">包含事件的参数</param>
        protected virtual void OnPackageResend(PackageEventArgs e)
        {
            if (PackageResend != null) 
                PackageResend(this, e);
        }
        #endregion


        //无参构造函数
        public UDPThread()
        { 
        }
        //构造函数
        public UDPThread(string ipaddress, int port)
        {
            IPAddress ipA = IPAddress.Parse(ipaddress);//构造远程连接的参数
            IPEndPoint ipEnd = new IPEndPoint(ipA, port);
            client = new UdpClient();// client = new UdpClient(ipEnd)这样的话就没有创建远程连接
            client.Connect(ipEnd);//使用指定的远程主机信息建立默认远程主机连接
            sendlist = new List<UdpPacket>();

            CheckQueueTimeInterval = 2000;//轮询间隔时间
            MaxResendTimes = 5;//最大发送次数
            
            new Thread(new ThreadStart(CheckUnConfirmedQueue)) { IsBackground = true }.Start();//启动轮询线程
            //开始监听数据
            AsyncReceiveData();
        }
        /// <summary>
        /// 同步数据接收方法
        /// </summary>
        public void ReceiveData()
        {
            while (true)
            {
                IPEndPoint retip = null;
                UdpPacket udpp = null;
                try
                {
                    byte[] data = client.Receive(ref retip);//接收数据,当Client端连接主机的时候,retip就变成Cilent端的IP了
                    udpp = (UdpPacket)SerializationUnit.DeserializeObject(data);
                }
                catch (Exception ex)
                {
                    //异常处理操作
                }
                if (udpp != null)
                {
                    PackageEventArgs arg = new PackageEventArgs(udpp, retip);
                    OnPackageReceived(arg);//数据包收到触发事件
                }
            }
        }

        //异步接受数据
        public void AsyncReceiveData()
        {
            try
            {
                client.BeginReceive(new AsyncCallback(ReceiveCallback), null);
            }
            catch (SocketException ex)
            {
                throw ex;
            }
        }
        //接收数据的回调函数
        public void ReceiveCallback(IAsyncResult param)
        {
            if (param.IsCompleted)
            {
                UdpPacket udpp = null;
                try
                {
                    byte[] data = client.EndReceive(param, ref remotIpEnd);//接收数据,当Client端连接主机的时候,test就变成Cilent端的IP了
                    udpp = (UdpPacket)SerializationUnit.DeserializeObject(data);
                }
                catch (Exception ex)
                {
                    //异常处理操作
                }
                finally
                {
                    AsyncReceiveData();
                }
                if (udpp != null)//触发数据包收到事件
                {
                    PackageEventArgs arg = new PackageEventArgs(udpp, null);
                    OnPackageReceived(arg);
                }
            }
        }


        /// <summary>
        /// 同步发送分包数据
        /// </summary>
        /// <param name="message"></param>
        public void SendData(Msg message)
        {
           
            ICollection<UdpPacket> udpPackets = UdpPacketSplitter.Split(message);
            foreach (UdpPacket udpPacket in udpPackets)
            {
                byte[] udpPacketDatagram = SerializationUnit.SerializeObject(udpPacket);
                //使用同步发送
               client.Send(udpPacketDatagram, udpPacketDatagram.Length,udpPacket.remoteip);
               if (udpPacket.IsRequireReceiveCheck)
                   PushSendItemToList(udpPacket);//将该消息压入列表

            }
        }
        
        /// <summary>
        /// 异步分包发送数组的方法
        /// </summary>
        /// <param name="message"></param>
        public void AsyncSendData(Msg message)
        {
            
            ICollection<UdpPacket> udpPackets = UdpPacketSplitter.Split(message);
            foreach (UdpPacket udpPacket in udpPackets)
            {
                byte[] udpPacketDatagram = SerializationUnit.SerializeObject(udpPacket);
                //使用同步发送
                //client.Send(udpPacketDatagram, udpPacketDatagram.Length);

                //使用异步的方法发送数据
                this.client.BeginSend(udpPacketDatagram, udpPacketDatagram.Length, new AsyncCallback(SendCallback), null);

            }
        }
        //发送完成后的回调方法
        public void SendCallback(IAsyncResult param)
        {
            if (param.IsCompleted)
            {
                try
                {
                    client.EndSend(param);//这句话必须得写,BeginSend()和EndSend()是成对出现的 
                }
                catch (Exception e)
                {
                    //其他处理异常的操作
                }
            }

        }
        static object lockObj = new object();
        /// <summary>
        /// 自由线程,检测未发送的数据并发出,存在其中的就是没有收到确认包的数据包
        /// </summary>
        void CheckUnConfirmedQueue()
        {
            do
            {
                if (sendlist.Count > 0)
                {
                    UdpPacket[] array = null;

                    lock (sendlist)
                    {
                        array = sendlist.ToArray();
                    }
                    //挨个重新发送并计数
                    Array.ForEach(array, s =>
                    {
                        s.sendtimes++;
                        if (s.sendtimes >= MaxResendTimes)
                        {
                            //sOnPackageSendFailure//出发发送失败事件
                            sendlist.Remove(s);//移除该包
                        }
                        else
                        {
                            //重新发送
                            byte[] udpPacketDatagram = SerializationUnit.SerializeObject(s);
                            client.Send(udpPacketDatagram, udpPacketDatagram.Length, s.remoteip);
                        }
                    });
                }

               Thread.Sleep(CheckQueueTimeInterval);//间隔一定时间重发数据
            } while (true);
        }
        /// <summary>
        /// 将数据信息压入列表
        /// </summary>
        /// <param name="item"></param>
        void PushSendItemToList(UdpPacket item)
        {
            sendlist.Add(item);
        }
        /// <summary>
        /// 将数据包从列表中移除
        /// </summary>
        /// <param name="packageNo">数据包编号</param>
        /// <param name="packageIndex">数据包分包索引</param>
        public void PopSendItemFromList(long packageNo, int packageIndex)
        {
            lock (lockObj)
            {
                Array.ForEach(sendlist.Where(s => s.sequence == packageNo && s.index == packageIndex).ToArray(), s => sendlist.Remove(s));
            }
        }
        /// <summary>
        /// 关闭客户端并释放资源
        /// </summary>
        public void Dispose()
        {
            if (client != null)
            {
                client.Close();
                client = null;
            }
        }



    }
}

첫 번째는 데이터 정보 개체 클래스

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net;

namespace Model
{
    //封装消息类
    [Serializable]
   public class Msg
    {
        //所属用户的用户名
        public string name { get; set; }

        //所属用户的ip
        public string host { get; set; }

        //命令的名称
        public string command { get; set; }

        //收信人的姓名
        public string desname { get; set; }

        //你所发送的消息的目的地ip,应该是对应在服务器的列表里的主键值
        public string destinationIP { get; set; }

        //端口号
        public int port { get; set; }

        //文本消息
        public string msg { get; set; }

        //二进制消息
        public byte[] byte_msg { get; set; }

        //附加数据
        public string extend_msg { get; set; }

        //时间戳
        public DateTime time { get; set; }

        //构造函数
        public Msg(string command,string desip,string msg,string host)
        {
            this.command = command;
            this.destinationIP = desip;
            this.msg = msg;
            this.time = DateTime.Now;
            this.host = host;
        }
        override
        public string ToString()
        {
            return name + "说:" + msg;
        }
    }
}

MSG 데이터를 분할하여 하도급 데이터

하도급 개체 클래스

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Tool;
using System.Net;

namespace Model
{
    [Serializable]
    public class UdpPacket
    {
        public long sequence{get;set;}//所属组的唯一序列号 包编号
        public int total { get; set; }//分包总数
        public int index { get; set; }//消息包的索引
        public byte[] data { get; set; }//包的内容数组
        public int dataLength { get; set; }//分割的数组包大小
        public int remainder { get; set; }//最后剩余的数组的数据长度
        public int sendtimes { get; set; }//发送次数
        public IPEndPoint remoteip { get; set; }//接受该包的远程地址
        public bool IsRequireReceiveCheck { get; set; }//获得或设置包收到时是否需要返回确认包
        public static int HeaderSize = 30000;
        public UdpPacket(long sequence, int total, int index, byte[] data, int dataLength, int remainder,string desip,int port)
        {
            this.sequence = sequence;
            this.total = total;
            this.index = index;
            this.data = data;
            this.dataLength = dataLength;
            this.remainder = remainder;
            this.IsRequireReceiveCheck = true;//默认都需要确认包
            //构造远程地址
            IPAddress ipA = IPAddress.Parse(desip);
            this.remoteip = new IPEndPoint(ipA, port);
        }
        //把这个对象生成byte[]
        public byte[] ToArray()
        {
            return SerializationUnit.SerializeObject(this);
        }
    }
}

데이터 패킷 분할 도구 클래스

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Tool;

namespace Model
{
        /// <summary>
        /// UDP数据包分割器
        /// </summary>
        public static class UdpPacketSplitter
        {


            public static ICollection<UdpPacket> Split(Msg message)
            {
                byte[] datagram = null;
                try
                {
                    datagram = SerializationUnit.SerializeObject(message);
                }
                catch (Exception e)
                {
                    //AddTalkMessage("数据转型异常");
                }
                //产生一个序列号,用来标识包数据属于哪一组
                Random Rd = new Random();
                long SequenceNumber = Rd.Next(88888, 999999);
                ICollection<UdpPacket> udpPackets = UdpPacketSplitter.Split(SequenceNumber, datagram, 10240, message.destinationIP, message.port);

                return udpPackets;
            }
            /// <summary>
            /// 分割UDP数据包
            /// </summary>
            /// <param name="sequence">UDP数据包所持有的序号</param>
            /// <param name="datagram">被分割的UDP数据包</param>
            /// <param name="chunkLength">分割块的长度</param>
            /// <returns>
            /// 分割后的UDP数据包列表
            /// </returns>
            public static ICollection<UdpPacket> Split(long sequence, byte[] datagram, int chunkLength,string desip,int port)
            {
                if (datagram == null)
                    throw new ArgumentNullException("datagram");

                List<UdpPacket> packets = new List<UdpPacket>();

                int chunks = datagram.Length / chunkLength;
                int remainder = datagram.Length % chunkLength;
                int total = chunks;
                if (remainder > 0) total++;

                for (int i = 1; i <= chunks; i++)
                {
                    byte[] chunk = new byte[chunkLength];
                    Buffer.BlockCopy(datagram, (i - 1) * chunkLength, chunk, 0, chunkLength);
                    packets.Add(new UdpPacket(sequence, total, i, chunk, chunkLength, remainder, desip, port));
                }
                if (remainder > 0)
                {
                    int length = datagram.Length - (chunkLength * chunks);
                    byte[] chunk = new byte[length];
                    Buffer.BlockCopy(datagram, chunkLength * chunks, chunk, 0, length);
                    packets.Add(new UdpPacket(sequence, total, total, chunk, chunkLength, remainder, desip, port));
                }

                return packets;
            }
        }
}

서버측 데이터 저장을 위한 데이터 구조

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Tool;
using Model;

namespace Model
{
    //一个sequence对应一组的数据包的数据结构
    public class RecDataList
    {
        public long sequence { get; set; }//序列号
        //对应的存储包的List
        List<UdpPacket> RecudpPackets = new List<UdpPacket>();
        public int total { get; set; }
        public int dataLength { get; set; }
        public int remainder { get; set; }
        public byte[] DataBuffer = null;
        public RecDataList(UdpPacket udp)
        {

            this.sequence = udp.sequence;
            this.total = udp.total;
            this.dataLength = udp.dataLength;
            this.remainder = udp.remainder;
            if (DataBuffer == null)
            {
                DataBuffer = new byte[dataLength * (total - 1) + remainder];
            }
        }
        public RecDataList(long sequence, int total, int chunkLength, int remainder)
        {
            
            this.sequence = sequence;
            this.total = total;
            this.dataLength = chunkLength;
            this.remainder = remainder;
            if (DataBuffer == null)
            {
                DataBuffer = new byte[this.dataLength * (this.total - 1) + this.remainder];
            }
        }
        public void addPacket(UdpPacket p)
        {
            RecudpPackets.Add(p);
        }
        public Msg show() 
        {
            if (RecudpPackets.Count == total)//表示已经收集满了
            {
                //重组数据
                foreach (UdpPacket udpPacket in RecudpPackets)
                {
                    //偏移量
                    int offset = (udpPacket.index - 1) * udpPacket.dataLength;
                    Buffer.BlockCopy(udpPacket.data, 0, DataBuffer, offset, udpPacket.data.Length);
                }
                Msg rmsg = (Msg)SerializationUnit.DeserializeObject(DataBuffer);
                DataBuffer = null;
                RecudpPackets.Clear();
                return rmsg;
            }
            else
            {
                return null;
            }
        }
        public bool containskey(UdpPacket udp)
        {
            foreach (UdpPacket udpPacket in RecudpPackets)
            {
                if (udpPacket.index == udp.index)
                    return true;
            }
            return false;
        }
    }
}

인코딩 도구 클래스

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO;

namespace Tool
{
    public class EncodingTool
    {
        //编码
        public static byte[] EncodingASCII(string buf)
        {
            byte[] data = Encoding.Unicode.GetBytes(buf);
            return data;
        }
        //解码
        public static string DecodingASCII(byte[] bt)
        {
            string st = Encoding.Unicode.GetString(bt);
            return st;
        }



        //编码
        public static byte[] EncodingUTF_8(string buf)
        {
            byte[] data = Encoding.UTF8.GetBytes(buf);
            return data;
        }
        //编码
        public static string DecodingUTF_8(byte[] bt)
        {
            string st = Encoding.UTF8.GetString(bt);
            return st;
        }
       
    }
}

직렬화 및 역직렬화 도구 class

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO;
namespace Tool
{

    public class SerializationUnit
    {
        /// <summary>  
        /// 把对象序列化为字节数组  
        /// </summary>  
        public static byte[] SerializeObject(object obj)
        {
            if (obj == null)
                return null;
            //内存实例
            MemoryStream ms = new MemoryStream();
            //创建序列化的实例
            BinaryFormatter formatter = new BinaryFormatter();
            formatter.Serialize(ms, obj);//序列化对象,写入ms流中  
            ms.Position = 0;
            //byte[] bytes = new byte[ms.Length];//这个有错误
            byte[] bytes = ms.GetBuffer();
            ms.Read(bytes, 0, bytes.Length);
            ms.Close();
            return bytes;
        }

        /// <summary>  
        /// 把字节数组反序列化成对象  
        /// </summary>  
        public static object DeserializeObject(byte[] bytes)
        {
            object obj = null;
            if (bytes == null)
                return obj;
            //利用传来的byte[]创建一个内存流
            MemoryStream ms = new MemoryStream(bytes);
            ms.Position = 0;
            BinaryFormatter formatter = new BinaryFormatter();
            obj = formatter.Deserialize(ms);//把内存流反序列成对象  
            ms.Close();
            return obj;
        }
        /// <summary>
        /// 把字典序列化
        /// </summary>
        /// <param name="dic"></param>
        /// <returns></returns>
        public static byte[] SerializeDic(Dictionary<string, object> dic)
        {
            if (dic.Count == 0)
                return null;
            MemoryStream ms = new MemoryStream();
            BinaryFormatter formatter = new BinaryFormatter();
            formatter.Serialize(ms, dic);//把字典序列化成流

            byte[] bytes = new byte[ms.Length];//从流中读出byte[]
            ms.Read(bytes, 0, bytes.Length);

            return bytes;
        }
        /// <summary>
        /// 反序列化返回字典
        /// </summary>
        /// <param name="bytes"></param>
        /// <returns></returns>
        public static Dictionary<string, object> DeserializeDic(byte[] bytes)
        {
            Dictionary<string, object> dic = null;
            if (bytes == null)
                return dic;
            //利用传来的byte[]创建一个内存流
            MemoryStream ms = new MemoryStream(bytes);
            ms.Position = 0;
            BinaryFormatter formatter = new BinaryFormatter();
            //把流中转换为Dictionary
            dic = (Dictionary<string, object>)formatter.Deserialize(ms);
            return dic;
        }
    }

}

일반 패킷 이벤트 클래스

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using Model;

namespace ZZUdp.Core
{
	/// <summary>
	/// 数据包事件数据
	/// </summary>
	public class PackageEventArgs : EventArgs
	{
		/// <summary>
		/// 网络消息包
		/// </summary>
		public UdpPacket udpPackage { get; set; }

		/// <summary>
		/// 网络消息包组
		/// </summary>
        public UdpPacket[] udpPackages { get; set; }

        /// <summary>
        /// 远程IP
        /// </summary>
        public IPEndPoint RemoteIP { get; set; }

		/// <summary>
		/// 是否已经处理
		/// </summary>
		public bool IsHandled { get; set; }

		/// <summary>
		/// 创建一个新的 PackageEventArgs 对象.
		/// </summary>
        public PackageEventArgs(UdpPacket package, IPEndPoint RemoteIP)
		{
            this.udpPackage = package;
            this.RemoteIP = RemoteIP;
			this.IsHandled = false;
		}
	}
}

위 내용은 안정적인 전송(데이터 패킷의 그룹 전송)을 구현하기 위한 C#의 UDP에 대한 내용입니다. 중국넷(www.php.cn)!



성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.