织梦CMS - 轻松建站从此开始!

罗索

UDP分包重组算法

落鹤生 发布于 2010-09-02 15:23 点击:次 
落鹤生:由于UDP传输的特性:乱序,后发先至等,而在进行UDP编程的时候因为一个包太大,不适合用来传输,所以经常会用到UDP服务器端发送分包,以及客户端分包重组的问题。这里就是一个不错的例子源码。(BTW:Windows下用IOCP来发也可能会有同样的问题,所以本文同样适
TAG:

UDP分包重组算法(BTW:Windows下用IOCP来发也可能会有同样的问题,所以本文同样适用于TCP - IOCP下的分包及重组)

Packet.h

  1. #include "InetAddr.h" //对socket地址操作封装的类,比如char*IP转成ULONG 
  2. #include <vector> 
  3. using namespace std; 
  4.  
  5. //先定义包头
  6. typedef struct _HeadExt 
  7.     //每帧所有分片公用信息 
  8.     char   flag[5]; //可以任意字符或数字或其他方法,用来标志我们发送的数据 
  9.     UINT   m_nSeqNumber; //本帧序数 
  10.     USHORT m_nTotalFragment;//本帧数据可分成的总片数 
  11.     USHORT m_nTotalSize;  //本帧数据总长度 
  12.     //每片各自信息 
  13.     USHORT m_nFragmentIndex; //每帧分片序数,0 1 2 ... ,\ 
  14.     USHORT m_usPayloadSize; //本片数据长度 
  15.     USHORT m_usFragOffset;//本片数据相对总数据的偏移量 
  16.     USHORT m_bLastFragment;//是否最后一帧 
  17. //上 述数据有些重复,可以精简,有时间再修改 
  18.  }HeadExt; 
  19.  
  20.  
  21.  
  22. //从socket接收到的数据 
  23. class  PacketIn 
  24. public : 
  25.     PacketIn(char* lpszBuffer=NULL, UINT usBufferSize=0, UINT nDataSize=0); 
  26.     virtual ~PacketIn(); 
  27.     HeadExt head; 
  28.     BYTE*    m_lpszBuffer; 
  29.     ULONG   ip; 
  30.     UINT    port; 
  31.     CVLInetAddr addr; 
  32.     BOOL Normalize(); 
  33.     UINT   m_nDataLen; 
  34.     UINT   m_usBufferSize; 
  35.  
  36. }; 
  37.  
  38. //接收到数据后开始重组使用的一个中间缓冲区,多个PacketIn合成一个Packet 
  39. //发送时从一个Packet分割出多片,不过本程序里直接发送,没有封成Packet 
  40. class Packet  
  41. {
  42. public
  43.     enum { TIMEOUT_LOCKFRAGMENTS = 1000 };
  44.     Packet( );
  45.     ~Packet();
  46.     void reset();
  47.     void Set(const CVLInetAddr& iaFrom, UINT nSeqNumber );
  48.     BOOL InsertFragment(PacketIn* const pFragment);
  49.     bool m_bUsed; 
  50.     int recvedpacks; 
  51.     int recvedbytes; 
  52.     int seqnum; 
  53.     BYTE* m_pBuffer;
  54.  
  55. //测试用程序,直接访问了类的变量,没有封装成函数。上述变量正常应该写成 SetXXX ,GetXXX    
  56. private
  57.     BOOL IsFragComplete() const;
  58.     ULONG     m_ip;
  59.     USHORT   m_port; 
  60.     UINT        m_nSeqNumber; 
  61.     vector<int> SeqNumberList; 
  62. }; 

Packet.cpp

  1. #include "Packet.h" 
  2.  
  3. PacketIn::PacketIn(char* lpszBuffer/*=NULL*/UINT usBufferSize/*=0*/,
  4.  UINT nDataSize/*=0*/
  5.  
  6.     m_lpszBuffer   = (BYTE*)lpszBuffer; 
  7.     m_usBufferSize = usBufferSize; //数据最大长度,其实没什么用,已经设置了这个值最大为64000 
  8.     m_nDataLen = nDataSize; 
  9. PacketIn::~PacketIn() 
  10. BOOL PacketIn::Normalize()  //判断收到的数据是否有效 
  11.     const USHORT usHeaderSize = sizeof(HeadExt); 
  12.     if ( !m_lpszBuffer || m_usBufferSize < usHeaderSize )//没什么用 
  13.     { 
  14.         return FALSE; 
  15.     } 
  16.  
  17.     HeadExt* pHeader = (HeadExt*)( m_lpszBuffer  ); 
  18.     if ( pHeader->m_usPayloadSize != m_nDataLen - usHeaderSize ) 
  19.     { 
  20.         return FALSE; 
  21.     } 
  22.  
  23.     head = *pHeader; 
  24.     if ( pHeader->m_usPayloadSize > 0 ) 
  25.     { 
  26. memmove( m_lpszBuffer, m_lpszBuffer + usHeaderSize, pHeader->m_usPayloadSize ); 
  27.     } 
  28.     
  29.     return TRUE; 
  30.  
  31. ///////////// 
  32. //这里是重组数据的临时缓冲,只看这里必然迷惑,先看socket收数据那里,再回来查看 
  33. void Packet::Set( const CVLInetAddr& iaFrom, UINT nSeqNumber ) 
  34.     m_iaFrom =  iaFrom; 
  35.     m_nSeqNumber = nSeqNumber; 
  36.     if(m_pBuffer) 
  37.         delete []m_pBuffer; 
  38.  
  39. void Packet::reset() 
  40.     m_bUsed = false
  41.     recvedpacks = 0; 
  42.     seqnum = 0; 
  43.     recvedbytes = 0; 
  44.     m_pBuffer = NULL; 
  45.     m_pBuffer = new BYTE[64000]; 
  46.     SeqNumberList.clear(); 
  47. Packet::Packet() 
  48.     //calculate the ttl 
  49.     //m_nTTL = RUDP_REASSEMBLE_TIMEOUT*CRudpPeer::PR_SLOWHZ; 
  50.     reset(); 
  51.  
  52. Packet::~Packet() 
  53.     if(m_pBuffer) 
  54.         delete []m_pBuffer; 
  55.  
  56. BOOL Packet::InsertFragment(PacketIn* const pFragment) 
  57.     int nSize = SeqNumberList.size(); 
  58.     for(int i = 0; i< nSize ;i ++) 
  59.     { 
  60.         if(nSize ==SeqNumberList[i] )//收到重复数据包 
  61.         { 
  62.             return FALSE; 
  63.         } 
  64.     } 
  65.     SeqNumberList.push_back(pFragment->head.m_nFragmentIndex); 
  66.  
  67.     memcpy( m_pBuffer + pFragment->head.m_usFragOffset, 
  68. pFragment->m_lpszBuffer, pFragment->head.m_usPayloadSize); 
  69.     recvedbytes += pFragment->head.m_usPayloadSize; 
  70.     recvedpacks++; 
  71.     m_bUsed = true
  72.  
  73.     CString str; 
  74. str.Format("收到数据:m_nSeqNumber%d ,m_nTotalFragment %d,m_nFragmentIndex %d, 
  75. m_usFragOffset %d,m_nTotalSize %d,recvedbytes %d\r\n", 
  76. pFragment->head.m_nSeqNumber,pFragment->head.m_nTotalFragment, 
  77. pFragment->head.m_nFragmentIndex,pFragment->head.m_usFragOffset, 
  78. pFragment->head.m_nTotalSize,pFragment->head.m_usPayloadSize); 
  79.         OutputDebugString(str); 
  80.  
  81.     if(recvedbytes == pFragment->head.m_nTotalSize ) 
  82.         return TRUE; 
  83.     
  84.     return FALSE; 

发送时的操作:

  1. #iclude "packet.h" 
  2.  
  3. const int RTP_SPLIT_PACKSIZE = 1300;
  4. //udp一般最大包为1500,一般这里都设置为1400,当然1300也没错 
  5. LONGLONG index = rand();
  6. //帧序数,从一个随机数开始,我机器上生成的是41 
  7. //分包发送,逻辑相当简单,按最常规的方法分割发送 
  8. void Send(BYTE* pBuff,UINT nLen,ULONG ip,USHORT port) 
  9.     HeadExt head; 
  10.     strcpy(head.flag ,"aaaa"); 
  11.       
  12.     head.m_nSeqNumber = index++; //帧序数 
  13.  
  14.     head.m_nTotalFragment = nLen/RTP_SPLIT_PACKSIZE; 
  15.     head.m_nTotalFragment  += nLen%RTP_SPLIT_PACKSIZE?1:0;
  16. //整除的话就是0,否则多出一个尾数据 
  17.     head.m_nFragmentIndex = 0;//第一个分段从0开始 
  18.     head.m_nTotalSize = nLen; 
  19.     head.m_bLastFragment = 0; 
  20.     head.m_usFragOffset = 0; 
  21.  
  22.     char tem[RTP_SPLIT_PACKSIZE+sizeof(HeadExt)]; 
  23.  
  24.     if(head.m_nTotalFragment == 1) 
  25.     { 
  26.         memcpy(tem,&head,sizeof(HeadExt)); 
  27.         memcpy(tem+sizeof(HeadExt),pBuff,nLen); 
  28.         head.m_usPayloadSize = nLen; 
  29.  
  30. //用UDP发送,常规做法,先对UDP做一个封装的类,这里调用类对象。本代码只说明原理,类似伪代友 
  31.        sendto(tem,nLen+sizeof(HeadExt),ip,port); 
  32.  
  33.         CString str; 
  34.         str.Format("发送数据:m_nSeqNumber%d ,m_nTotalFragment %d,m_nFragmentIndex %d,
  35. m_usFragOffset %d,m_nTotalSize %d,m_nDataLen %d\r\n"
  36. head.m_nSeqNumber,head.m_nTotalFragment,head.m_nFragmentIndex,
  37. head.m_usFragOffset,head.m_nTotalSize,head.m_usPayloadSize); 
  38.         OutputDebugString(str); 
  39.  
  40.         return
  41.     } 
  42.  
  43.     int i = 0; 
  44.     for( i = 0; i < head.m_nTotalFragment-1; i++)//开始分割,最后一个单独处理 
  45.     { 
  46.         head.m_bLastFragment = 0; 
  47.         head.m_nFragmentIndex = i; 
  48.         head.m_usFragOffset = i*RTP_SPLIT_PACKSIZE; 
  49.         head.m_usPayloadSize = RTP_SPLIT_PACKSIZE; 
  50.         memcpy(tem,&head,sizeof(HeadExt)); 
  51.         memcpy(tem+sizeof(HeadExt),pBuff+i*RTP_SPLIT_PACKSIZE,RTP_SPLIT_PACKSIZE); 
  52.        sendto(tem,nLen+sizeof(HeadExt),ip,port); 
  53.  
  54.         CString str; 
  55.         str.Format("发送数据:m_nSeqNumber%d ,m_nTotalFragment %d,m_nFragmentIndex %d,
  56. m_usFragOffset %d,m_nTotalSize %d,m_nDataLen %d\r\n"
  57. head.m_nSeqNumber,head.m_nTotalFragment,head.m_nFragmentIndex,
  58. head.m_usFragOffset,head.m_nTotalSize,head.m_usPayloadSize); 
  59.         OutputDebugString(str); 
  60.  
  61.     } 
  62.  
  63.     head.m_bLastFragment = 1; 
  64.     head.m_nFragmentIndex = i; 
  65.     head.m_usFragOffset = i*RTP_SPLIT_PACKSIZE; 
  66.     head.m_usPayloadSize = nLen - (i*RTP_SPLIT_PACKSIZE); 
  67.  
  68.     memcpy(tem,&head,sizeof(HeadExt)); 
  69.     memcpy(tem+sizeof(HeadExt),pBuff+i*RTP_SPLIT_PACKSIZE,nLen - (i*RTP_SPLIT_PACKSIZE)); 
  70.  
  71.     sendto(tem,nLen+sizeof(HeadExt),ip,port); 
  72.  
  73.     CString str; 
  74. str.Format("发送数据:m_nSeqNumber%d ,m_nTotalFragment %d,m_nFragmentIndex %d,
  75. m_usFragOffset %d,m_nTotalSize %d,m_nDataLen %d\r\n",head.m_nSeqNumber,
  76. head.m_nTotalFragment,head.m_nFragmentIndex,head.m_usFragOffset,
  77. head.m_nTotalSize,head.m_usPayloadSize); 
  78.  
  79. OutputDebugString(str); 

接收数据的回调

//代码很简单,收到数据后,形成PacketIn,然后找一个链表,插进去
//因为收到的数据的顺序是不定的,所以先创建一个链表,每帧首数据到来时,分配一个节点,其他分片到来的时候,都加到//这个节点里,等数据都到齐了,这个节点就是一个完整的包,回调出去。

  1. void RecvCallback(BYTE* pBuff,UINT nLen,ULONG ip,USHORT port) 
  2.     if(nLen < sizeof(HeadExt)) 
  3.         return;
  4.     HeadExt* pHead = (HeadExt*)pBuff; 
  5.     CString str = pHead->flag; 
  6.     if(str != "aaaa"
  7.     {
  8.         return
  9.     }
  10.     if(pHead->m_nTotalFragment == 1)//只有一帧,不分片 
  11.     { 
  12.         //回调,上层处理 
  13.         if(m_pfDispatch)
  14. m_pfDispatch(pBuff+sizeof(HeadExt),nLen-sizeof(HeadExt),ip,port,m_lpParam);
  15.         return
  16.     } 
  17.  
  18.     PacketIn data( (char*)pBuff, 64000, nLen ); 
  19.     if ( !data.Normalize() ) 
  20.         return;
  21.     if ( data.head.m_nTotalFragment>1 ) 
  22.     {
  23.         Packet* pTemp = GetPartialPacket( iaRemote, data.head.m_nSeqNumber ); 
  24.         if ( pTemp ) 
  25.         { 
  26.             if ( pTemp ->InsertFragment( &data ) ) 
  27.             { 
  28.                 m_pfDispatch(pTemp ->m_pBuffer, 
  29.                     pTemp ->recvedbytes,ip,port,m_lpParam); 
  30.             } 
  31.         }//end of if 
  32.     } 

//上面用到的一些变量可以在一个.h里面定义,比如:
Packet TempPacket[16];//分配一个数组,每个节点是一个重组后临时缓冲
Packet* GetPartialPacket(const CVLInetAddr& iaFrom, UINT nSeqNumber);//从数组里取出一个缓冲节点

  1. //根据这几个关键字查找,不过只用 
  2. //到了nSeqNumber,要是3人以上的视频聊天,ip和port是必要的 
  3. Packet* GetPartialPacket(const ULONG ip,USHORT port, UINT nSeqNumber) 
  4.     Packet* tmp = NULL; 
  5.     int i=0; 
  6.     while(tmp==NULL && i<16) 
  7.     { 
  8.         //该包所属的帧已有其它数据包到达 
  9.         if(TempPacket[i].seqnum==nSeqNumber && TempPacket[i].recvedpacks>0) 
  10.         { 
  11.             tmp = &TempPacket[i]; 
  12.             break
  13.         } 
  14.         i++; 
  15.     } 
  16.     if(tmp == NULL)        //新的帧 
  17.     { 
  18.         //查找空闲元素 
  19.         for(i=0;i<16;i++) 
  20.         { 
  21.             if(!TempPacket[i].m_bUsed) 
  22.                 break
  23.         } 
  24.  
  25.         if(i>=16) 
  26.         { 
  27.             //没有空闲的元素,丢掉一个最早 
  28.             tmp = &TempPacket[0]; 
  29.             int j = 0; 
  30.             for(i=1;i<16;i++) 
  31.             { 
  32.                 if(TempPacket[i].seqnum < tmp->seqnum) 
  33.                 { 
  34.                     tmp = &TempPacket[i]; 
  35.                     j = i; 
  36.                 } 
  37.             } 
  38.             //找到最早的一帧 
  39.             if(tmp->m_pBuffer) 
  40.             { 
  41.                 delete []tmp->m_pBuffer; 
  42.                 tmp->reset(); 
  43.             } 
  44.         } 
  45.         else 
  46.             tmp = &TempPacket[i]; 
  47.     } 
  48.  
  49.     InsertFragment 
  50.     tmp->m_bUsed = true
  51.     tmp->seqnum = nSeqNumber; 
  52.  
  53.     return tmp; 


整个示例最重要的是两个函数:

GetPartialPacket:取到一个临时缓冲节点

InsertFragment:把收到的每个数据段插入临时缓冲组成一个完整帧

当然发送方也要按一定规则分割发送。


 

(秩名)
本站文章除注明转载外,均为本站原创或编译欢迎任何形式的转载,但请务必注明出处,尊重他人劳动,同学习共成长。转载请注明:文章转载自:罗索实验室 [http://www.rosoo.net/a/201009/10078.html]
本文出处:网络 作者:秩名
顶一下
(13)
92.9%
踩一下
(1)
7.1%
------分隔线----------------------------
发表评论
请自觉遵守互联网相关的政策法规,严禁发布色情、暴力、反动的言论。
评价:
表情:
用户名: 验证码:点击我更换图片
栏目列表
将本文分享到微信
织梦二维码生成器
推荐内容