UDP分包重组算法(BTW:Windows下用IOCP来发也可能会有同样的问题,所以本文同样适用于TCP - IOCP下的分包及重组)
Packet.h
- #include "InetAddr.h" //对socket地址操作封装的类,比如char*IP转成ULONG
- #include <vector>
- using namespace std;
-
- typedef struct _HeadExt
- {
-
- char flag[5];
- UINT m_nSeqNumber;
- USHORT m_nTotalFragment;
- USHORT m_nTotalSize;
-
- USHORT m_nFragmentIndex;
- USHORT m_usPayloadSize;
- USHORT m_usFragOffset;
- USHORT m_bLastFragment;
-
- }HeadExt;
-
-
-
-
- class PacketIn
- {
- public :
- PacketIn(char* lpszBuffer=NULL, UINT usBufferSize=0, UINT nDataSize=0);
- virtual ~PacketIn();
- HeadExt head;
- BYTE* m_lpszBuffer;
- ULONG ip;
- UINT port;
- CVLInetAddr addr;
- BOOL Normalize();
- UINT m_nDataLen;
- UINT m_usBufferSize;
-
- };
-
-
-
- class Packet
- {
- public:
- enum { TIMEOUT_LOCKFRAGMENTS = 1000 };
- Packet( );
- ~Packet();
- void reset();
- void Set(const CVLInetAddr& iaFrom, UINT nSeqNumber );
- BOOL InsertFragment(PacketIn* const pFragment);
- bool m_bUsed;
- int recvedpacks;
- int recvedbytes;
- int seqnum;
- BYTE* m_pBuffer;
-
-
- private:
- BOOL IsFragComplete() const;
- ULONG m_ip;
- USHORT m_port;
- UINT m_nSeqNumber;
- vector<int> SeqNumberList;
- };
Packet.cpp
- #include "Packet.h"
-
- PacketIn::PacketIn(char* lpszBuffer, UINT usBufferSize,
- UINT nDataSize)
- {
-
- m_lpszBuffer = (BYTE*)lpszBuffer;
- m_usBufferSize = usBufferSize;
- m_nDataLen = nDataSize;
- }
- PacketIn::~PacketIn()
- {
- }
- BOOL PacketIn::Normalize()
- {
- const USHORT usHeaderSize = sizeof(HeadExt);
- if ( !m_lpszBuffer || m_usBufferSize < usHeaderSize )
- {
- return FALSE;
- }
-
- HeadExt* pHeader = (HeadExt*)( m_lpszBuffer );
- if ( pHeader->m_usPayloadSize != m_nDataLen - usHeaderSize )
- {
- return FALSE;
- }
-
- head = *pHeader;
- if ( pHeader->m_usPayloadSize > 0 )
- {
- memmove( m_lpszBuffer, m_lpszBuffer + usHeaderSize, pHeader->m_usPayloadSize );
- }
-
- return TRUE;
- }
-
-
-
- void Packet::Set( const CVLInetAddr& iaFrom, UINT nSeqNumber )
- {
- m_iaFrom = iaFrom;
- m_nSeqNumber = nSeqNumber;
- if(m_pBuffer)
- delete []m_pBuffer;
- }
-
- void Packet::reset()
- {
- m_bUsed = false;
- recvedpacks = 0;
- seqnum = 0;
- recvedbytes = 0;
- m_pBuffer = NULL;
- m_pBuffer = new BYTE[64000];
- SeqNumberList.clear();
- }
- Packet::Packet()
- {
-
-
- reset();
- }
-
- Packet::~Packet()
- {
- if(m_pBuffer)
- delete []m_pBuffer;
- }
-
- BOOL Packet::InsertFragment(PacketIn* const pFragment)
- {
- int nSize = SeqNumberList.size();
- for(int i = 0; i< nSize ;i ++)
- {
- if(nSize ==SeqNumberList[i] )
- {
- return FALSE;
- }
- }
- SeqNumberList.push_back(pFragment->head.m_nFragmentIndex);
-
- memcpy( m_pBuffer + pFragment->head.m_usFragOffset,
- pFragment->m_lpszBuffer, pFragment->head.m_usPayloadSize);
- recvedbytes += pFragment->head.m_usPayloadSize;
- recvedpacks++;
- m_bUsed = true;
-
- CString str;
- str.Format("收到数据:m_nSeqNumber%d ,m_nTotalFragment %d,m_nFragmentIndex %d,
- m_usFragOffset %d,m_nTotalSize %d,recvedbytes %d\r\n",
- pFragment->head.m_nSeqNumber,pFragment->head.m_nTotalFragment,
- pFragment->head.m_nFragmentIndex,pFragment->head.m_usFragOffset,
- pFragment->head.m_nTotalSize,pFragment->head.m_usPayloadSize);
- OutputDebugString(str);
-
- if(recvedbytes == pFragment->head.m_nTotalSize )
- return TRUE;
-
- return FALSE;
- }
发送时的操作:
- #iclude "packet.h"
-
- const int RTP_SPLIT_PACKSIZE = 1300;
-
- LONGLONG index = rand();
-
-
- void Send(BYTE* pBuff,UINT nLen,ULONG ip,USHORT port)
- {
- HeadExt head;
- strcpy(head.flag ,"aaaa");
-
- head.m_nSeqNumber = index++;
-
- head.m_nTotalFragment = nLen/RTP_SPLIT_PACKSIZE;
- head.m_nTotalFragment += nLen%RTP_SPLIT_PACKSIZE?1:0;
-
- head.m_nFragmentIndex = 0;
- head.m_nTotalSize = nLen;
- head.m_bLastFragment = 0;
- head.m_usFragOffset = 0;
-
- char tem[RTP_SPLIT_PACKSIZE+sizeof(HeadExt)];
-
- if(head.m_nTotalFragment == 1)
- {
- memcpy(tem,&head,sizeof(HeadExt));
- memcpy(tem+sizeof(HeadExt),pBuff,nLen);
- head.m_usPayloadSize = nLen;
-
-
- sendto(tem,nLen+sizeof(HeadExt),ip,port);
-
- CString str;
- str.Format("发送数据:m_nSeqNumber%d ,m_nTotalFragment %d,m_nFragmentIndex %d,
- m_usFragOffset %d,m_nTotalSize %d,m_nDataLen %d\r\n",
- head.m_nSeqNumber,head.m_nTotalFragment,head.m_nFragmentIndex,
- head.m_usFragOffset,head.m_nTotalSize,head.m_usPayloadSize);
- OutputDebugString(str);
-
- return;
- }
-
- int i = 0;
- for( i = 0; i < head.m_nTotalFragment-1; i++)
- {
- head.m_bLastFragment = 0;
- head.m_nFragmentIndex = i;
- head.m_usFragOffset = i*RTP_SPLIT_PACKSIZE;
- head.m_usPayloadSize = RTP_SPLIT_PACKSIZE;
- memcpy(tem,&head,sizeof(HeadExt));
- memcpy(tem+sizeof(HeadExt),pBuff+i*RTP_SPLIT_PACKSIZE,RTP_SPLIT_PACKSIZE);
- sendto(tem,nLen+sizeof(HeadExt),ip,port);
-
- CString str;
- str.Format("发送数据:m_nSeqNumber%d ,m_nTotalFragment %d,m_nFragmentIndex %d,
- m_usFragOffset %d,m_nTotalSize %d,m_nDataLen %d\r\n",
- head.m_nSeqNumber,head.m_nTotalFragment,head.m_nFragmentIndex,
- head.m_usFragOffset,head.m_nTotalSize,head.m_usPayloadSize);
- OutputDebugString(str);
-
- }
-
- head.m_bLastFragment = 1;
- head.m_nFragmentIndex = i;
- head.m_usFragOffset = i*RTP_SPLIT_PACKSIZE;
- head.m_usPayloadSize = nLen - (i*RTP_SPLIT_PACKSIZE);
-
- memcpy(tem,&head,sizeof(HeadExt));
- memcpy(tem+sizeof(HeadExt),pBuff+i*RTP_SPLIT_PACKSIZE,nLen - (i*RTP_SPLIT_PACKSIZE));
-
- sendto(tem,nLen+sizeof(HeadExt),ip,port);
-
- CString str;
- str.Format("发送数据:m_nSeqNumber%d ,m_nTotalFragment %d,m_nFragmentIndex %d,
- m_usFragOffset %d,m_nTotalSize %d,m_nDataLen %d\r\n",head.m_nSeqNumber,
- head.m_nTotalFragment,head.m_nFragmentIndex,head.m_usFragOffset,
- head.m_nTotalSize,head.m_usPayloadSize);
-
- OutputDebugString(str);
- }
接收数据的回调
//代码很简单,收到数据后,形成PacketIn,然后找一个链表,插进去
//因为收到的数据的顺序是不定的,所以先创建一个链表,每帧首数据到来时,分配一个节点,其他分片到来的时候,都加到这个节点里,等数据都到齐了,这个节点就是一个完整的包,回调出去。
- void RecvCallback(BYTE* pBuff,UINT nLen,ULONG ip,USHORT port)
- {
- if(nLen < sizeof(HeadExt))
- return;
- HeadExt* pHead = (HeadExt*)pBuff;
- CString str = pHead->flag;
- if(str != "aaaa")
- {
- return;
- }
- if(pHead->m_nTotalFragment == 1)
- {
-
- if(m_pfDispatch)
- m_pfDispatch(pBuff+sizeof(HeadExt),nLen-sizeof(HeadExt),ip,port,m_lpParam);
- return;
- }
-
- PacketIn data( (char*)pBuff, 64000, nLen );
- if ( !data.Normalize() )
- return;
- if ( data.head.m_nTotalFragment>1 )
- {
- Packet* pTemp = GetPartialPacket( iaRemote, data.head.m_nSeqNumber );
- if ( pTemp )
- {
- if ( pTemp ->InsertFragment( &data ) )
- {
- m_pfDispatch(pTemp ->m_pBuffer,
- pTemp ->recvedbytes,ip,port,m_lpParam);
- }
- }
- }
- }
//上面用到的一些变量可以在一个.h里面定义,比如:
// Fixed pool of reassembly slots; each entry buffers one message while its
// fragments are still arriving.
Packet TempPacket[16];

// Returns the slot that is collecting message nSeqNumber, taking a free slot
// or recycling an old one when none matches. The signature matches the
// definition of GetPartialPacket given further down.
Packet* GetPartialPacket(const ULONG ip, USHORT port, UINT nSeqNumber);
-
-
- Packet* GetPartialPacket(const ULONG ip,USHORT port, UINT nSeqNumber)
- {
- Packet* tmp = NULL;
- int i=0;
- while(tmp==NULL && i<16)
- {
-
- if(TempPacket[i].seqnum==nSeqNumber && TempPacket[i].recvedpacks>0)
- {
- tmp = &TempPacket[i];
- break;
- }
- i++;
- }
- if(tmp == NULL)
- {
-
- for(i=0;i<16;i++)
- {
- if(!TempPacket[i].m_bUsed)
- break;
- }
-
- if(i>=16)
- {
-
- tmp = &TempPacket[0];
- int j = 0;
- for(i=1;i<16;i++)
- {
- if(TempPacket[i].seqnum < tmp->seqnum)
- {
- tmp = &TempPacket[i];
- j = i;
- }
- }
-
- if(tmp->m_pBuffer)
- {
- delete []tmp->m_pBuffer;
- tmp->reset();
- }
- }
- else
- tmp = &TempPacket[i];
- }
-
- InsertFragment
- tmp->m_bUsed = true;
- tmp->seqnum = nSeqNumber;
-
- return tmp;
- }
整个示例最重要的是两个函数:
GetPartialPacket:取到一个临时缓冲节点
InsertFragment:把收到的每个数据段插入临时缓冲组成一个完整帧
当然发送方也要按一定规则分割发送。
(佚名)