最近在写一个Linux下的多线程的网络聊天软件,以前一直都是来一个连接开一个线程去recv,最近刚看了select函数的用法,就把select封成了一个类,外界只要创建一个该类的对象,然后把socket注册进来就可以。
本人新手,如果有什么问题或者bug或者可以改进的地方请各位多指教!!
初始化后开一个线程select,同时监听一个udp端口,用于注册socket或注销socket时主线程通知select的线程。
加锁和端口管理未实现…………
socketNotifier.hpp
- #ifndef SOCKETNOTIFIER_HPP_
- #define SOCKETNOTIFIER_HPP_
-
- #include <map>
- #include <set>
- #include <pthread.h>
-
- #ifdef WIN32
- #include <WinSock2.h>
- #include <WS2tcpip.h>
-
- #else
- #include <sys/socket.h>
- #include <netinet/in.h>
- #include <arpa/inet.h>
- #endif
-
- #include <sys/types.h>
-
-
-
-
-
- class CSocketNotifier
- {
- public:
- CSocketNotifier();
- ~CSocketNotifier();
-
- public:
-
- typedef bool ( *sockCallBack )( void* );
- typedef struct tagSockParam
- {
- int sock_fd;
- void* pUsr;
- }SOCKPARAM;
-
-
-
-
-
-
- unsigned short getFreePort();
-
-
-
-
-
-
-
-
-
- bool registerSocket( int sock_fd, sockCallBack pFun, void* pParam );
-
-
-
-
-
-
-
- bool unregisterSocket( int sock_fd );
-
-
-
-
-
-
-
- static void* waitForEvent( void * );
-
-
-
-
-
-
-
- static bool renewFD( int event_fd, std::set<int> &fdSet
- , fd_set *pre_fdsr, int &iMaxFD );
-
-
-
-
-
-
- bool notify( fd_set* pfdsr, int iEventNum );
- private:
-
-
-
-
-
- bool sendNoticeToThread( int sock_fd );
-
- private:
-
- std::map< int, std::pair < sockCallBack, void* > > m_aFun;
- pthread_t m_threadID;
-
- bool m_bRun;
-
-
- bool m_bInit;
-
-
- int m_socketFD;
- int m_socketForNotify;
- unsigned short m_uPort;
-
-
- static const int MAX_SOCKET_NUM = 32;
-
-
- static const int MAX_COM_LEN = 20;
- const unsigned short SOCKET_NOTIFY_PORT;
- };
-
- #endif
socketNotifier.cpp
#include "StdAfx.h"
#include "SocketNotifier.h"
#include <iostream>
CSocketNotifier::CSocketNotifier()
: m_bRun( true )
, SOCKET_NOTIFY_PORT( 1924 )
{
/** do-while 为了初始化失败时break */
do
{
/** 线程中用于接受主线程发出的控制信令的socket */
if ( ( m_socketFD = socket( AF_INET, SOCK_DGRAM, 0 ) ) == -1 )
{
m_bInit = false;
perror( "m_socketFD init failed!" );
break;
}
/** 用于向线程发送信令的socket */
if ( ( m_socketForNotify = socket( AF_INET, SOCK_DGRAM, 0 ) ) == -1 )
{
m_bInit = false;
perror( "m_socketForNotify init failed!" );
break;
}
sockaddr_in localAddr;
m_uPort = getFreePort();
localAddr.sin_addr.s_addr = inet_addr( "127.0.0.1" );
localAddr.sin_family = AF_INET;
localAddr.sin_port = htons( m_uPort );
if ( bind( m_socketFD, ( sockaddr* )&localAddr, sizeof( sockaddr ) ) == -1 )
{
m_bInit = false;
perror( "Socket bind failed!" );
break;
}
SOCKPARAM* pParam = new SOCKPARAM;
pParam->pUsr = ( void* )this;
pParam->sock_fd = m_socketFD;
/** 开线程select */
int ret = pthread_create( &m_threadID, NULL, waitForEvent, pParam );
if ( 0 != ret )
{
perror( "CSocketNotifier:CSocketNotifier" );
m_bInit = false;
}
else
{
m_bInit = true;
}
} while ( 0 );
}
CSocketNotifier::~CSocketNotifier()
{
m_bRun = false;
pthread_join( m_threadID, NULL );
}
bool CSocketNotifier::registerSocket( int sock_fd, sockCallBack pFun, void* pParam )
{
if ( sock_fd < 0 )
{
std::cout<<"Illegal param!"<<std::endl;
return false;
}
/** 如果找到就说明已经有注册过,不用再注册 */
if ( m_aFun.find( sock_fd ) != m_aFun.end() )
{
return true;
}
/** 如果没找到则要添加 */
m_aFun[sock_fd] = std::make_pair<sockCallBack, void*>( pFun, pParam);
/** 通知select线程添加新的socket */
if ( !sendNoticeToThread( sock_fd ) )
{
return false;
}
return true;
}
bool CSocketNotifier::unregisterSocket( int sock_fd )
{
if ( sock_fd < 0 )
{
std::cout<<"Illegal param!"<<std::endl;
return false;
}
/** 找到就删 */
if ( m_aFun.find( sock_fd ) != m_aFun.end() )
{
m_aFun.erase( sock_fd );
}
/** 通知select线程删除socket */
if ( !sendNoticeToThread( sock_fd ) )
{
return false;
}
return true;
}
bool CSocketNotifier::renewFD( int event_fd, std::set<int> &fdSet, fd_set *pre_fdsr, int &iMaxFD )
{
/** 是已经有了的socket,说明是要移除 */
if ( FD_ISSET( event_fd, pre_fdsr ) )
{
/** 移除后找到最大的socket */
iMaxFD = 0;
std::set<int>::const_iterator it = fdSet.begin();
for ( ; it != fdSet.end(); ++it )
{
if ( *it > iMaxFD )
{
iMaxFD = *it;
}
}
FD_CLR( event_fd, pre_fdsr );
fdSet.erase( event_fd );
std::cout<<"Remove a fd"<<std::endl;
}
/** 是已新来的socket,说明是要添加 */
else
{
if ( event_fd == iMaxFD )
{
if ( event_fd > iMaxFD )
{
iMaxFD = event_fd;
}
}
FD_SET( event_fd, pre_fdsr );
fdSet.insert( event_fd );
std::cout<<"Add a fd"<<std::endl;
}
return true;
}
void* CSocketNotifier::waitForEvent( void* param )
{
SOCKPARAM* pParam = ( SOCKPARAM* )param;
CSocketNotifier *pSocketNotifier = ( CSocketNotifier * )( pParam->pUsr );
std::set<int> fdSet;
fd_set fdsr, pre_fdsr;
FD_ZERO( &pre_fdsr );
FD_ZERO( &pre_fdsr );
FD_SET( pParam->sock_fd, &pre_fdsr );
fdSet.insert( pParam->sock_fd );
struct timeval tv;
tv.tv_sec = 30;
tv.tv_usec = 0;
/** 最大的sock */
int iMaxFD = pParam->sock_fd;
int iEvenNum;
while ( 1 )
{
memcpy( &fdsr, &pre_fdsr, sizeof( fd_set ) );
iEvenNum = select( iMaxFD + 1, &fdsr, NULL, NULL, &tv );
/** 时间到 */
if ( 0 == iEvenNum )
{
continue;
}
/** 出错 */
else if ( 0 > iEvenNum )
{
perror( "select" );
break;
}
/** 正常处理 */
else
{
/** 主线程发来的控制信令 */
if ( FD_ISSET( pParam->sock_fd, &fdsr ) )
{
char szRecvBuf[MAX_COM_LEN + 1];
unsigned int uiRecvLen;
socklen_t iSockAddrLen = sizeof( struct sockaddr );
struct sockaddr_in stClientAddr;
int event_fd;
int iRet = recvfrom( pParam->sock_fd,szRecvBuf,MAX_COM_LEN,0,( struct sockaddr * ) ( &stClientAddr ),&iSockAddrLen );
/** 网络错误或关闭时返回 */
if ( 0 == uiRecvLen || -1 == uiRecvLen )
{
perror( "CSocketNotifier: recvfrom" );
break;
}
memcpy( &event_fd, szRecvBuf, sizeof( int ) );
/** 更新socket集合 */
renewFD( event_fd, fdSet, &pre_fdsr, iMaxFD );
--iEvenNum;
}
/** 通知回调函数 */
pSocketNotifier->notify( &fdsr, iEvenNum );
}
}
delete pParam;
return NULL;
}
bool CSocketNotifier::notify( fd_set* pfdsr, int iEventNum )
{
/** RemainToDo 加锁 */
std::map< int, std::pair< sockCallBack, void* > >::const_iterator it = m_aFun.begin();
for ( ; ( it != m_aFun.end() ) && ( iEventNum != 0 ); ++it )
{
if ( FD_ISSET( it->first, pfdsr ) )
{
( *( it->second.first ) )( it->second.second );
--iEventNum;
}
}
return true;
}
unsigned short CSocketNotifier::getFreePort()
{
/** 暂时未实现 */
return SOCKET_NOTIFY_PORT;
}
bool CSocketNotifier::sendNoticeToThread( int sock_fd )
{
int iRet;
char buf[ MAX_COM_LEN ] = { 0 };
sockaddr_in localAddr;
localAddr.sin_addr.s_addr = inet_addr( "127.0.0.1" );
localAddr.sin_port = htons( m_uPort );
localAddr.sin_family = AF_INET;
memset( localAddr.sin_zero, 0, sizeof( localAddr.sin_zero ) );
memcpy( buf, &sock_fd, sizeof( int ) );
iRet = sendto( m_socketForNotify,
buf,
MAX_COM_LEN,
0,
( sockaddr* )&localAddr,
sizeof( sockaddr ) );
if ( -1 == iRet)
{
perror( "CSocketNotifier:sendNoticeToThread" );
return false;
}
return true;
}
(hustxyj) |