使用ACE中的Proactor的话,会要比我们使用我们直接写的要来得简单。 在说Proactor之前我们需要了解Windows里的完成端口的工作原理。 完成端口是WinNT内核里的一个框架。我们可以为我们的一些异步的操作 新建一个完成端口,然后这个完成端口会有几个工作线程来处理。我们 可以将socket,或是一个文件读写,或是一个串口的收发数据的句柄, 梆定到这个完成端口之上,当一个读或是写的事件完成之后,完成端口 机制将会自动将一个完成消息放到完成队列中,完成端口的工作线程池 将会被触发调用。回调的时候我们梆定时将一些基础的信息也梆在其中, 当工作线程也会通过一种叫做完成项的指针返回给你。就是说,你可能 梆定了多个socket或是文件都是没有问题的。按微软人写的文档里说的 可以面对百万个这样的异步对象。 这里我就不再使用WinAPI写完成端口了。 现在是使用ACE框架来写一个。 使用他来做一个完成端口步骤也是一样的。 开始的时候需要一个完成端口,还有完成端口的工作线程池。在ACE框架 里提供了一种叫ACE_Task的线程池模块类 和一般的线程类一样,它的工作时调用的函数是 virtual int svc (void); 只是如何使用呢。无非是开启线程与关闭线程两个操作。 在此类中定义一个ACE_Thread_Semaphore sem_;变量 然后开户n个线程的过程就是这样的: int Proactor_Task::star(int nMax) { ... this->activate (THR_NEW_LWP, nMax); for (;nMax>0;nMax--) { sem_.acquire(); } return 0; } 一个是创建,二个是一个一个的触发。让这一些线程都工作. 当然工作线程都要释放自己:
int Proactor_Task::svc() { ACE_DEBUG((LM_INFO,ACE_TEXT("svc函数调用!\\n"))); sem_.release(1); ... return 0; }
好了。这个线程池开始工作了。接下来,我们要做将完成端口对象给创建出来: 在这个线程池里定义一个完成端口对象指针: ACE_Proactor * proactor_; 创建的过程是这样的。 //是在Win32下,就使用这个Proactor的实现 ACE_WIN32_Proactor *proactor_impl = new ACE_WIN32_Proactor(); //新建proactor的实现 proactor_=new ACE_Proactor(proactor_impl,1); //与proactor关联 ACE_Proactor::instance (this->proactor_, 1); //将新建出来的proactor保存在静态框架里
如何删除呢。 ACE_Proactor::end_event_loop(); this->wait();
之后来写线程池里的函数 ACE_Proactor::run_event_loop(); 只要写一句就OK了。
这就完成了一个完成端口对象的创建过程。我们只要做一下封装就OK了。 给它一个工作线程的大小。之后它就会自动的新建一个完成端口在ACE_Proactor::instance里。
接下来我们要做Acceptor与recv。 实计上ACE里已经为我们写好了。它们就是: ACE_Asynch_Acceptor类ACE_Service_Handler类 class Accepte : public ACE_Asynch_Acceptor<Receive> class Receive : public ACE_Service_Handler 这样一继承,工作就已经完成了。 如果我们想得到这一些网络事件的话,可以做一些继承就OK了。 他们内部调用的过程是这样的: 当有一个新的用户连接上来之后。 Accepte会有一个函数回调。 virtual HANDLER *make_handler (void); 这个函数里,我们必需写一个new Receive对象。 new完成之后Receive的open函数将会回调 open函数调用的时候,此接收对象的socket句柄就得到了。我们就在这个时候需要将一个 读、写的流梆定在其中。还有就是做一步异步的接收数据。这个如果你写过重叠方式的话就会 比较的了解,这也叫做异步的I/O的投递。等这个recv完成之后就会回调。 好了。这就算完成了。现在把代码贴出来。 我不知道如何做一个下载点。不好意思只有大家自己复制下来 我使用的VC6.0,ACE5.4.1的编程环境
// Accepte.h: interface for the Accepte class. // //////////////////////////////////////////////////////////////////////
#if !defined(AFX_ACCEPTE_H__DCEC809D_E5D2_48D1_A8A7_C9FD3C4D7C15__INCLUDED_) #define AFX_ACCEPTE_H__DCEC809D_E5D2_48D1_A8A7_C9FD3C4D7C15__INCLUDED_
#if _MSC_VER > 1000 #pragma once #endif // _MSC_VER > 1000 #include <ace/Asynch_Acceptor.h> #include "Receive.h" class Accepte : public ACE_Asynch_Acceptor<Receive> { public: Receive* make_handler (void); Accepte(); virtual ~Accepte();
};
#endif // !defined(AFX_ACCEPTE_H__DCEC809D_E5D2_48D1_A8A7_C9FD3C4D7C15__INCLUDED_)
// Accepte.cpp: implementation of the Accepte class. // //////////////////////////////////////////////////////////////////////
#include "Accepte.h"
////////////////////////////////////////////////////////////////////// // Construction/Destruction //////////////////////////////////////////////////////////////////////
Accepte::Accepte() {
}
Accepte::~Accepte() {
}
Receive* Accepte::make_handler(void) { return new Receive(); }
// Proactor_Task.h: interface for the Proactor_Task class. // //////////////////////////////////////////////////////////////////////
#if !defined(AFX_PROACTOR_TASK_H__12F37C95_9872_4923_89A2_5A59AE7AC1FD__INCLUDED_) #define AFX_PROACTOR_TASK_H__12F37C95_9872_4923_89A2_5A59AE7AC1FD__INCLUDED_
#if _MSC_VER > 1000 #pragma once #endif // _MSC_VER > 1000 #include "ace\\Task_T.h" #include "ace\\Thread_Semaphore.h"
#include "ace\\Proactor.h" #include "ace\\WIN32_Proactor.h" class Proactor_Task : public ACE_Task<ACE_MT_SYNCH> { public: Proactor_Task(); virtual ~Proactor_Task(); int star(int nMax); int stop(); virtual int svc (void); int create_proactor(); int release_proactor(); ACE_Thread_Semaphore sem_; ACE_Proactor * proactor_; };
#endif // !defined(AFX_PROACTOR_TASK_H__12F37C95_9872_4923_89A2_5A59AE7AC1FD__INCLUDED_)
// Proactor_Task.cpp: implementation of the Proactor_Task class. // //////////////////////////////////////////////////////////////////////
#include "Proactor_Task.h"
////////////////////////////////////////////////////////////////////// // Construction/Destruction //////////////////////////////////////////////////////////////////////
Proactor_Task::Proactor_Task() {
}
Proactor_Task::~Proactor_Task() {
}
int Proactor_Task::star(int nMax) { create_proactor(); this->activate (THR_NEW_LWP, nMax); for (;nMax>0;nMax--) { sem_.acquire(); } return 0; }
int Proactor_Task::stop() { ACE_Proactor::end_event_loop(); this->wait(); return 0; }
int Proactor_Task::release_proactor() { ACE_Proactor::close_singleton (); proactor_ = 0; return 0; } int Proactor_Task::create_proactor() { ACE_WIN32_Proactor *proactor_impl = 0;
ACE_NEW_RETURN (proactor_impl, ACE_WIN32_Proactor, -1); // always delete implementation 1 , not !(proactor_impl == 0) ACE_NEW_RETURN (this->proactor_, ACE_Proactor (proactor_impl, 1 ), -1); // Set new singleton and delete it in close_singleton() ACE_Proactor::instance (this->proactor_, 1);
return 0; }
int Proactor_Task::svc() { ACE_DEBUG((LM_INFO,ACE_TEXT("svc函数调用!\\n"))); sem_.release(1); ACE_Proactor::run_event_loop(); return 0; }
// Receiv[FS:PAGE]e.h: interface for the Receive class. // //////////////////////////////////////////////////////////////////////
#if !defined(AFX_RECEIVE_H__0E7EF8C0_465F_4D9C_8A29_0C2A0F1EAFFE__INCLUDED_) #define AFX_RECEIVE_H__0E7EF8C0_465F_4D9C_8A29_0C2A0F1EAFFE__INCLUDED_
#if _MSC_VER > 1000 #pragma once #endif // _MSC_VER > 1000 #include <ace/Asynch_io.h> #include <ace/Message_Block.h> #include <ace/Log_Msg.h> #include <ace/OS_Memory.h> class Receive : public ACE_Service_Handler { public: Receive(); virtual ~Receive() { if (this->handle() != ACE_INVALID_HANDLE ) { closesocket(SOCKET(this->handle())); } } virtual void open(ACE_HANDLE h,ACE_Message_Block& ); virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result); virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result); private: ACE_Asynch_Write_Stream write_; ACE_Asynch_Read_Stream reader_;
};
#endif // !defined(AFX_RECEIVE_H__0E7EF8C0_465F_4D9C_8A29_0C2A0F1EAFFE__INCLUDED_)
// Receive.cpp: implementation of the Receive class. // //////////////////////////////////////////////////////////////////////
#include "Receive.h"
////////////////////////////////////////////////////////////////////// // Construction/Destruction //////////////////////////////////////////////////////////////////////
Receive::Receive() {
} void Receive::open(ACE_HANDLE h,ACE_Message_Block& ) { this->handle(h); if (this->write_.open(*this)!=0 || this->reader_.open(*this) != 0 ) { delete this; return ; } ACE_Message_Block *mb; ACE_NEW_NORETURN(mb,ACE_Message_Block(1024)); if ( this->reader_.read(*mb,mb->space()) != 0) { ACE_ERROR((LM_ERROR,ACE_TEXT(" (%t) error information %p."))); mb->release(); delete this; return; } } void Receive::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) { ACE_Message_Block &mb = result.message_block(); if ( !result.success() || result.bytes_transferred() == 0) { mb.release(); delete this; } else { ACE_Message_Block* new_mb; ACE_NEW_NORETURN(new_mb,ACE_Message_Block(1024)); this->reader_.read(*new_mb,new_mb->space()); } return ; }
void Receive::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) { result.message_block().release(); return ; }
//////////////////////////////////////////////////////////////////////////////////////////////////// //main.cpp
#ifdef _DEBUG #pragma comment(lib,"aced") #else #pragma comment(lib,"ace") #endif #include <ace\\ace.h> #include "Accepte.h" #include "Proactor_Task.h"
int ACE_TMAIN(int ,char*[]) { Proactor_Task task; task.star(3);
Accepte accepte; accepte.open(ACE_INET_Addr (2222), 0, 1,ACE_DEFAULT_BACKLOG,1,ACE_Proactor::instance()); int nExit=0; while (nExit==0) scanf("%d",&nExit); return 0; }
(iwgh) |