织梦CMS - 轻松建站从此开始!

罗索

用IOCP实现线程池. 

罗索客 发布于 2007-08-27 17:40 点击:次 
MSDN 里对 IOCP 的这种用法也做了介绍,可以在 MSDN 里搜索 thread pooling 来查询。有个 CThreadPool 的例子可以参考。
TAG:

MSDN 里对 IOCP 的这种用法也做了介绍,可以在 MSDN 里搜索 thread pooling 来查询。
有个 CThreadPool 的例子可以参考。
VS2003 里的 ATL 还专门提供了 CThreadPool 来支持。这里我把那段代码抠了过来,并做了一些注释。
CThreadPool 必须和自己写的 worker 类配合使用。
//
// CThreadPool
// This class is a simple IO completion port based thread pool
// Worker:
// is a class that is responsible for handling requests
// queued on the thread pool.
// It must have a typedef for RequestType, where request type
// is the datatype to be queued on the pool
// RequestType must be castable to (DWORD)
// The value -1 is reserved for shutdown
// of the pool
// Worker must also have a void Execute(RequestType request, void *pvParam, OVERLAPPED *pOverlapped) function
// ThreadTraits:
// is a class that implements a static CreateThread function
// This allows for overriding how the threads are created
#define ATLS_POOL_SHUTDOWN ((OVERLAPPED*) ((__int64) -1))
template <class Worker, class ThreadTraits=DefaultThreadTraits>
class CThreadPool : public IThreadPoolConfig
{
protected:
CSimpleMap<DWORD, HANDLE> m_threadMap;
DWORD m_dwThreadEventId;
CComCriticalSection m_critSec;
DWORD m_dwStackSize;
DWORD m_dwMaxWait;
void *m_pvWorkerParam;
LONG m_bShutdown;
HANDLE m_hThreadEvent;
HANDLE m_hRequestQueue;
public:
CThreadPool() throw() :
m_hRequestQueue(NULL),
m_pvWorkerParam(NULL),
m_dwMaxWait(ATLS_DEFAULT_THREADPOOLSHUTDOWNTIMEOUT),
m_bShutdown(FALSE),
m_dwThreadEventId(0),
m_dwStackSize(0)
{
}
~CThreadPool() throw()
{
Shutdown();
}
// Initialize the thread pool
// if nNumThreads > 0, then it specifies the number of threads
// if nNumThreads < 0, then it specifies the number of threads per proc (-)
// if nNumThreads == 0, then it defaults to two threads per proc
// hCompletion is a handle of a file to associate with the completion port
// pvWorkerParam is a parameter that will be passed to Worker::Execute
// dwStackSize:
// The stack size to use when creating the threads
HRESULT Initialize(void *pvWorkerParam=NULL, int nNumThreads=0, DWORD dwStackSize=0, HANDLE hCompletion=INVALID_HANDLE_VALUE) throw()
{
ATLASSERT( m_hRequestQueue == NULL );
if (m_hRequestQueue) // Already initialized
return AtlHresultFromWin32(ERROR_ALREADY_INITIALIZED);
if (S_OK != m_critSec.Init())
return E_FAIL;
m_hThreadEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
if (!m_hThreadEvent)
{
m_critSec.Term();
return AtlHresultFromLastError();
}
// Create IO completion port to queue the requests
m_hRequestQueue = CreateIoCompletionPort(hCompletion, NULL, 0, nNumThreads);
if (m_hRequestQueue == NULL)
{
// failed creating the Io completion port
m_critSec.Term();
CloseHandle(m_hThreadEvent);
return AtlHresultFromLastError();
}
m_pvWorkerParam = pvWorkerParam; // 这个pvWorkerParam是对所有thread都共有的数据,.
//QueueRequest 中所传递的request, 才是实现线程池中可以执行不同任务的关键.
m_dwStackSize = dwStackSize;
HRESULT hr = SetSize(nNumThreads);
if (hr != S_OK)
{
// Close the request queue handle
CloseHandle(m_hRequestQueue);
// Clear the queue handle
m_hRequestQueue = NULL;
// Uninitialize the critical sections
m_critSec.Term();
CloseHandle(m_hThreadEvent);

return hr;
}
return S_OK;
}
// Shutdown the thread pool
// This function posts the shutdown request to all the threads in the pool
// It will wait for the threads to shutdown a maximum of dwMaxWait MS.
// If the timeout expires it just returns without terminating the threads.
void Shutdown(DWORD dwMaxWait=0) throw()
{
if (!m_hRequestQueue) // Not initialized
return;
CComCritSecLock<CComCriticalSection> lock(m_critSec, false);
if (FAILED(lock.Lock()))
{
// out of memory
ATLASSERT( FALSE );
return;
}

if (dwMaxWait == 0)
dwMaxWait = m_dwMaxWait;
HRESULT hr = InternalResizePool(0, dwMaxWait);
if (hr != S_OK)
ATLTRACE(atlTraceUtil, 0, _T("Thread pool not shutting down cleanly : x"), hr);
// If the threads have not returned, then something is wrong
for (int i = m_threadMap.GetSize() - 1; i >= 0; i--)
{
HANDLE hThread = m_threadMap.GetValueAt(i);
DWORD dwExitCode;
GetExitCodeThread(hThread, &dwExitCode);
if (dwExitCode == STILL_ACTIVE)
{
ATLTRACE(atlTraceUtil, 0, _T("Terminating thread"));
TerminateThread(hThread, 0);
}
CloseHandle(hThread);
}
// Close the request queue handle
CloseHandle(m_hRequestQueue);
// Clear the queue handle
m_hRequestQueue = NULL;
ATLASSERT(m_threadMap.GetSize() == 0);
// Uninitialize the critical sections
lock.Unlock();
m_critSec.Term();
CloseHandle(m_hThreadEvent);
}
// IThreadPoolConfig methods
HRESULT STDMETHODCALLTYPE SetSize(int nNumThreads) throw()
{
if (nNumThreads == 0)
nNumThreads = -ATLS_DEFAULT_THREADSPERPROC;
if (nNumThreads < 0)
{
SYSTEM_INFO si;
GetSystemInfo(&si);
nNumThreads = (int) (-nNumThreads) * si.dwNumberOfProcessors;
}
return InternalResizePool(nNumThreads, m_dwMaxWait);
}
HRESULT STDMETHODCALLTYPE GetSize(int *pnNumThreads) throw()
{
if (!pnNumThreads)
return E_POINTER;
*pnNumThreads = GetNumThreads();
return S_OK;
}
HRESULT STDMETHODCALLTYPE SetTimeout(DWORD dwMaxWait) throw()
{
m_dwMaxWait = dwMaxWait;
return S_OK;
}
HRESULT STDMETHODCALLTYPE GetTimeout(DWORD *pdwMaxWait) throw()
{
if (!pdwMaxWait)
return E_POINTER;
*pdwMaxWait = m_dwMaxWait;
return S_OK;
}
// IUnknown methods
HRESULT STDMETHODCALLTYPE QueryInterface(REFIID riid, void **ppv) throw()
{
if (!ppv)
return E_POINTER;
*ppv = NULL;
if (InlineIsEqualGUID(riid, __uuidof(IUnknown)) ||
InlineIsEqualGUID(riid, __uuidof(IThreadPoolConfig)))
{
*ppv = static_cast<IThreadPoolConfig*>(this);
AddRef();
return S_OK;
}
return E_NOINTERFACE;
}
ULONG STDMETHODCALLTYPE AddRef() throw()
{
return 1;
}
ULONG STDMETHODCALLTYPE Release() throw()
{
return 1;
}

HANDLE GetQueueHandle() throw()
{
return m_hRequestQueue;
}
int GetNumThreads() throw()
{
return m_threadMap.GetSize();
}
// QueueRequest adds a request to the thread pool
// it will be picked up by one of the threads and dispatched to the worker
// in WorkerThreadProc
// 注意这个request, 这个是用来给线程传递关键信息得,譬如传递task对象得指针.
BOOL QueueRequest(typename Worker::RequestType request) throw()
{
if (!PostQueuedCompletionStatus(m_hRequestQueue, 0, (ULONG_PTR) request, NULL))
return FALSE;
return TRUE;
}

protected:
DWORD ThreadProc() throw()
{
DWORD dwBytesTransfered;
ULONG_PTR dwCompletionKey;
OVERLAPPED* pOverlapped;
// this block is to ensure theWorker gets destructed before the
// thread handle is closed
{
// We instantiate an instance of the worker class on the stack
// for the life time of the thread.
Worker theWorker;
if (theWorker.Initialize(m_pvWorkerParam) == FALSE)
{
return 1;
}
// Event需要被激发以通知创建函数自己被创建成功了,这样该线程得信息才能被加到map中做管理.
SetEvent(m_hThreadEvent);
// Get the request from the IO completion port
while (GetQueuedCompletionStatus(m_hRequestQueue, &dwBytesTransfered, &dwCompletionKey, &pOverlapped, INFINITE))
{
if (pOverlapped == ATLS_POOL_SHUTDOWN) // Shut down
{
LONG bResult = InterlockedExchange(&m_bShutdown, FALSE);
if (bResult) // Shutdown has not been cancelled
break;
// else, shutdown has been cancelled -- continue as before
}
else // Do work
{
Worker::RequestType request = (Worker::RequestType) dwCompletionKey;
// Process the request. Notice the following:
// (1) It is the worker''s responsibility to free any memory associated
// with the request if the request is complete
// (2) If the request still requires some more processing
// the worker should queue the request again for dispatching
theWorker.Execute(request, m_pvWorkerParam, pOverlapped);
}
}
theWorker.Terminate(m_pvWorkerParam);
}
m_dwThreadEventId = GetCurrentThreadId();
SetEvent(m_hThreadEvent);
return 0;
}
static DWORD WINAPI WorkerThreadProc(LPVOID pv) throw()
{
CThreadPool* pThis =
reinterpret_cast< CThreadPool* >(pv);
return pThis->ThreadProc();
}
// 这个func可以用来调节线程池中线程数得多少,是这个class中比较关键得一个函数
HRESULT InternalResizePool(int nNumThreads, int dwMaxWait) throw()
{
if (!m_hRequestQueue) // Not initialized
return E_FAIL;
CComCritSecLock<CComCriticalSection> lock(m_critSec, false);
if (FAILED(lock.Lock()))
{
// out of memory
ATLASSERT( FALSE );
return E_FAIL;
}
int nCurThreads = m_threadMap.GetSize();
if (nNumThreads == nCurThreads)
{
return S_OK;
}
else if (nNumThread[FS:PAGE]s < nCurThreads)
{
int nNumShutdownThreads = nCurThreads - nNumThreads;
for (int nThreadIndex = 0; nThreadIndex < nNumShutdownThreads; nThreadIndex++)
{
ResetEvent(m_hThreadEvent);
InterlockedExchange(&m_bShutdown, TRUE);
// 因为IOCP得特性,将随机发送给一个线程关闭消息
PostQueuedCompletionStatus(m_hRequestQueue, 0, 0, ATLS_POOL_SHUTDOWN);
DWORD dwRet = WaitForSingleObject(m_hThreadEvent, dwMaxWait);
if (dwRet == WAIT_TIMEOUT) // 超时
{
LONG bResult = InterlockedExchange(&m_bShutdown, FALSE);
if (bResult) // Nobody picked up the shutdown message
{
return AtlHresultFromWin32(WAIT_TIMEOUT);
}
}
else if (dwRet != WAIT_OBJECT_0) // 其他错误
{
return AtlHresultFromLastError();
}
// 在map中删除对应项
int nIndex = m_threadMap.FindKey(m_dwThreadEventId);
if (nIndex != -1)
{
HANDLE hThread = m_threadMap.GetValueAt(nIndex);
// Wait for the thread to shutdown
if (WaitForSingleObject(hThread, 60000) == WAIT_OBJECT_0)
{
CloseHandle(hThread);
m_threadMap.RemoveAt(nIndex);
}
else
{
// Thread failed to terminate
return E_FAIL;
}
}
}
}
else
{
int nNumNewThreads = nNumThreads - nCurThreads;
// Create and initialize worker threads
for (int nThreadIndex = 0; nThreadIndex < nNumNewThreads; nThreadIndex++)
{
DWORD dwThreadID;
ResetEvent(m_hThreadEvent);
CHandle hdlThread( ThreadTraits::CreateThread(NULL, m_dwStackSize, WorkerThreadProc, (LPVOID)this, 0, &dwThreadID) );
if (!hdlThread)
{
HRESULT hr = AtlHresultFromLastError();
ATLASSERT(hr != S_OK);
return hr;
}
DWORD dwRet = WaitForSingleObject(m_hThreadEvent, dwMaxWait);
// 线程创建成功,但初始化没有成功,这个地方没有CloseHandle,因为CHandle类得析购中带有Closehandle得功能.
if (dwRet != WAIT_OBJECT_0)
{
if (dwRet == WAIT_TIMEOUT)
{
return HRESULT_FROM_WIN32(WAIT_TIMEOUT);
}
else
{
return AtlHresultFromLastError();
}
}
if (m_threadMap.Add(dwThreadID, hdlThread) != FALSE)
{
// detach 不会改变thread 的reference count,很奇怪的行为
hdlThread.Detach();
}
}
}
return S_OK;
}
}; // class CThreadPool

ATL 还提供了一个无状态的 worker 类的实现。worker 是一个 template 参数,只要实现了 worker 规定的接口,就是一个 worker。比起 C++ 中的继承,灵活性和性能都有不少的提高。下面是它的代码。
// Adapter worker that keeps no per-thread Worker instance: every request is
// served by a Worker constructed just for that request.
template <class Worker>
class CNonStatelessWorker
{
public:
    typedef typename Worker::RequestType RequestType;

    // Nothing to set up for the thread; always reports success.
    BOOL Initialize(void * /*pvParam*/) throw()
    {
        return TRUE;
    }

    // Builds a fresh Worker for this request, runs it and tears it down again.
    // The request is skipped when the temporary Worker fails to initialize.
    // State that must survive between requests therefore has to travel with
    // the request itself rather than live in the worker object.
    void Execute(RequestType request, void *pvWorkerParam, OVERLAPPED *pOverlapped)
    {
        Worker perRequestWorker;
        if (!perRequestWorker.Initialize(pvWorkerParam))
        {
            return;
        }

        perRequestWorker.Execute(request, pvWorkerParam, pOverlapped);
        perRequestWorker.Terminate(pvWorkerParam);
    }

    // Nothing to release for the thread.
    void Terminate(void* /*pvParam*/) throw()
    {
    }
}; // class CNonStatelessWorker

以上就是 ATL 实现 ThreadPool 的关键代码。因为采用了 IOCP,简化了许多逻辑。通过这一段代码,再次让我感受到了 ATL 的强大。如果抛开学习的复杂性来说,ATL 是个比 MFC 要轻便和高效得多的框架。尤其是 ATL 7.0,比 ATL 3.0 做了许多增强,在实际工作中,即使不用它做 Framework,也可以从它的 Lib 中受益很大。

(iwgh)
本站文章除注明转载外,均为本站原创或编译欢迎任何形式的转载,但请务必注明出处,尊重他人劳动,同学习共成长。转载请注明:文章转载自:罗索实验室 [http://www.rosoo.net/a/200708/6780.html]
本文出处:网络博客 作者:iwgh
顶一下
(1)
100%
踩一下
(0)
0%
------分隔线----------------------------
发表评论
请自觉遵守互联网相关的政策法规,严禁发布色情、暴力、反动的言论。
评价:
表情:
用户名: 验证码:点击我更换图片
栏目列表
将本文分享到微信
织梦二维码生成器
推荐内容