1.ACE反应器框架简介
反应器(Reactor):用于事件多路分离和分派的体系结构模式
通常的,对一个文件描述符指定的文件或设备, 有两种工作方式: 阻塞与非阻塞。所谓阻塞方式的意思是指, 当试图对该文件描述符进行读写时,如果当时没有东西可读,或者暂时不可写, 程序就进入等待状态, 直到有东西可读或者可写为止。而对于非阻塞状态,如果没有东西可读, 或者不可写, 读写函数马上返回,而不会等待。
在前面的章节中提到的Tcp通信的例子中,就是采用的阻塞式的工作方式:当接收tcp数据时,如果远端没有数据可以读,则会一直阻塞到读到需要的数据为止。这种方式的传输和传统的被动方法的调用类似,非常直观,并且简单有效,但是同样也存在一个效率问题,如果你是开发一个面对着数千个连接的服务器程序,对每一个客户端都采用阻塞的方式通信,如果存在某个非常耗时的读写操作时,其它的客户端通信将无法响应,效率非常低下。
一种常用做法是:每建立一个Socket连接时,同时创建一个新线程对该Socket进行单独通信(采用阻塞的方式通信)。这种方式具有很高的响应速度,并且控制起来也很简单,在连接数较少的时候非常有效,但是如果对每一个连接都产生一个线程的无疑是对系统资源的一种浪费,如果连接数较多将会出现资源不足的情况。
另一种较高效的做法是:服务器端保存一个Socket连接列表,然后对这个列表进行轮询,如果发现某个Socket端口上有数据可读时(读就绪),则调用该socket连接的相应读操作;如果发现某个Socket端口上有数据可写时(写就绪),则调用该socket连接的相应写操作;如果某个端口的Socket连接已经中断,则调用相应的析构方法关闭该端口。这样能充分利用服务器资源,效率得到了很大提高。
在Socket编程中就可以通过select等相关API实现这一方式。但直接用这些API控制起来比较麻烦,并且也难以控制和移植,在ACE中可以通过Reactor模式简化这一开发过程。
反应器本质上提供一组更高级的编程抽象,简化了事件驱动的分布式应用的设计和实现。除此而外,反应器还将若干不同种类的事件的多路分离集成到易于使用的API中。特别地,反应器对基于定时器的事件、信号事件、基于I/O端口监控的事件和用户定义的通知进行统一地处理。
ACE中的反应器与若干内部和外部组件协同工作。其基本概念是反应器框架检测事件的发生(通过在OS事件多路分离接口上进行侦听),并发出对预登记事件处理器(eventhandler)对象中的方法的"回调"(callback)。该方法由应用开发者实现,其中含有应用处理此事件的特定代码。
使用ACE的反应器,只需如下几步:
-
创建事件处理器,以处理他所感兴趣的某事件。
-
在反应器上 登记,通知说他有兴趣处理某事件,同时传递他想要用以处理此事件的事件处理器的指针给反应器。
随后反应器框架将自动地:
-
在内部维护一些表,将不同的事件类型与事件处理器对象关联起来。
-
在用户已登记的某个事件发生时,反应器发出对处理器中相应方法的回调。
反应器模式在ACE中被实现为ACE_Reactor类,它提供反应器框架的功能接口。
如上面所提到的,反应器将事件处理器对象作为服务提供者使用。反应器内部记录某个事件处理器的特定事件的相关回调方法。当这些事件发生时,反应器会创建这种事件和相应的事件处理器的关联。
- 事件处理器 事件处理器就是需要通过轮询发生事件改变的对象列表中的对象,如在上面的例子中就是连接的客户端,每个客户端都可以看成一个事件处理器。
- 回调事件 就是反应器支持的事件,如Socket读就绪,写就绪。拿上面的例子来说,如果某个客户端(事件处理器)在反应器中注册了读就绪事件,当客户端给服务器发送一条消息的时候,就会触发这个客户端的数据可读的回调函数。
在反应器框架中,所有应用特有的事件处理器都必须由ACE_Event_Handler的抽象接口类派生。可以通过重载相应的"handle_"方法实现相关的回调方法。
使用ACE_Reactor基本上有三个步骤:
- 创建ACE_Event_Handler的子类,并在其中实现适当的"handle_"方法,以处理你想要此事件处理器为之服务的事件类型。
- 通过调用反应器对象的register_handler(),将你的事件处理器登记到反应器。
- 在事件发生时,反应器将自动回调相应的事件处理器对象的适当的handle_"方法。
下面我就以一个Socket客户端的例子为例简单的说明反应器的基本用法。

- #include
- #include
- #include
- #include
- #include
- using namespace std;
- class MyClient:public ACE_Event_Handler
- {
- public:
- bool open()
- {
- ACE_SOCK_Connector connector;
- ACE_INET_Addr addr(3000,"127.0.0.1");
- ACE_Time_Value timeout(5,0);
- if(connector.connect(peer,addr,&timeout) != 0)
- {
- coutreactor ());
- if (client->open () == -1)
- client->handle_close (ACE_INVALID_HANDLE, 0);
- return 0;
- }
- virtual int handle_close (ACE_HANDLE handle,
- ACE_Reactor_Mask close_mask)
- {
- if (this->acceptor_.get_handle () != ACE_INVALID_HANDLE)
- {
- ACE_Reactor_Mask m = ACE_Event_Handler::ACCEPT_MASK |
- ACE_Event_Handler::DONT_CALL;
- this->reactor ()->remove_handler (this, m);
- this->acceptor_.close ();
- }
- return 0;
- }
- protected:
- ACE_SOCK_Acceptor acceptor_;
- };
- int main(int argc, char *argv[])
- {
- ACE_INET_Addr addr(3000,"192.168.1.142");
- ClientAcceptor server;
- server.reactor(ACE_Reactor::instance());
- server.open(addr);
- while(true)
- {
- ACE_Reactor::instance()->handle_events();
- }
- return 0;
- }
代码功能比较简单,需要注意以下几点:
-
这里注册事件的方式和前面的文章中方式不一样,是通过ACE_Event_Handler类的 reactor()方法设置和获取reactor的指针,比较直观和方便。前面的文章是通过ACE_Reactor::instance()来获取的一个单体reactor的指针。
- 当客户端Socket连接关闭时,需要释放相应资源,需要注意一下ClientService对象的handle_close方法中释放资源的相应代码。
定时器的实现
通过Reactor机制,还可以很容易的实现定时器的功能,使用方式如下。
-
编写一个事件反应器,重载 handle_timeout()方法,该方法是定时器的触发时间到时,会自动触发该方法。
-
通过Reactor的 schedule_timer()方法注册定时器。
-
启动reacotr的 handle_events()事件分发循环。
-
当不想使用定时器时,可以通过Reactor的 cancel_timer()方法注销定时器。
下面的代码简单的实现了一个定时器,并具有基本的开启,关闭功能。

- #include
- #include
- class MyTimerHandler : public ACE_Event_Handler
- {
- private:
- int inteval; //执行时间间隔
- int delay; //延迟执行时间
- int timerid;
- public:
- MyTimerHandler(int delay,int inteval)
- {
- this->delay=delay;
- this->inteval=inteval;
- }
- int open() //注册定时器
- {
- ACE_Time_Value delaytime(inteval);
- ACE_Time_Value intevaltime(inteval);
- timerid = reactor()->schedule_timer(this,
- 0, //传递handle_timeout给的参数
- delaytime,
- intevaltime);
- return timerid;
- }
- int close() //取消定时器
- {
- return reactor()->cancel_timer(timerid);
- }
- //定时器回调函数
- int handle_timeout (const ACE_Time_Value ¤t_time,
- const void * = 0)
- {
- time_t epoch = ((timespec_t)current_time).tv_sec;
- ACE_DEBUG ((LM_INFO,
- ACE_TEXT ("handle_timeout: %s\n"),
- ACE_OS::ctime (&epoch)));
- return 0;
- }
- };
- int main(int argc, char *argv[])
- {
- MyTimerHandler * timer = new MyTimerHandler (3,5);
- timer->reactor(ACE_Reactor::instance());
- timer->open();
- for(int i=0;ihandle_events();
- }
- timer->close();
- ACE_OS::printf("cancel timer");
- while(true)
- ACE_Reactor::instance()->handle_events();
- return 0;
- }
代码功能比较简单,这里就不多做介绍了。