ACE Reactor 框架实现了Reactor模式,允许事件驱动的应用对源自许多不同事件源的事件做出反映,比如IO句柄,定时器,以及信号,应用重新定义框架所定义的挂钩方法。框架随机对其进行分派来处理事件,Reactor负责:(1)检查多路分离器来自各种事件源的、不同类型的连接和数据事件,(2)将这些事件分派给应用所定义的处理器,由它进行处理。
反应式服务器响应来自一个或多个事件源的时间,在理想情况下,对时间的响应会足够快,以使所有请求看起来像是被同时处理的,尽管事件处理通常是由单个线程处理的。同步事件多路分离器位于各个反应式服务器的心脏处。这种机制检测源自许多事件源的事件并对其作出响应,从而同步地使事件作为服务器正常执行路径的一部分提供给服务器。
select()函数是最为常见的同步事件多路分离器。这个系统函数在同一组IO句柄上等待指定的事件发生,当一个或者是多个IO句柄开始活动时,或是在指定的时间过去后,select函数就会返回。
ACE_Select_Reactor是ACE_Reactor接口的一种实现,它使用select同步时间多路分离器函数来检测IO和定时器事件,除了支持ACE_Reactor接口的所有特性外,ACE_Select_Reactor类还提供了以下能力:
1、它支持重入的反应器调用,应用可以从正在由统一反应器分派的事件处理器中调用handle_event方法;
2、它可以被配置为同步化的或异步化的,在线程安全性和降低开销之间进行折中;
3、它在再次调用select函数之前,分派其句柄集中的所有活动句柄,从而保证了公正性。
- #include "ace/streams.h"
- #include "ace/Reactor.h"
- #include "ace/Select_Reactor.h"
- #include "ace/Thread_Manager.h"
- #include
- #include "Reactor_Logging_Server_T.h"
- #include "Logging_Acceptor_Ex.h"
- typedef Reactor_Logging_Server
- Server_Logging_Daemon;
- static ACE_THR_FUNC_RETURN event_loop (void *arg)
- {
- ACE_Reactor *reactor = static_cast (arg);
- reactor->owner (ACE_OS::thr_self ());
- reactor->run_reactor_event_loop ();
- return 0;
- }
- static ACE_THR_FUNC_RETURN controller (void *arg)
- {
- for (;;)
- {
- std::string user_input;
- std::getline (cin, user_input, '\n');
- if (user_input == "quit")
- {
- break;
- }
- }
- return 0;
- }
- int main(int argc,char **argv)
- {
- ACE_Select_Reactor select_reactor;
- ACE_Reactor reactor (&select_reactor);
- Server_Logging_Daemon *server;
- // Ignore argv[0]...
- --argc; ++argv;
- ACE_NEW_RETURN (server,
- Server_Logging_Daemon (argc, argv, &reactor),
- 1);
- ACE_Thread_Manager::instance ()->spawn (event_loop, &reactor);
- ACE_Thread_Manager::instance ()->spawn (controller, &reactor);
- return ACE_Thread_Manager::instance ()->wait ();
- }
在上面的代码中,在51行,由ACE_Thread_Manager单体派生一个线程,并让其运行event_loop()函数,在52行,由ACE_Thread_Manager单体派生一个线程,让其运行controller()函数。
在从main函数返回之前,等待其他两个线程推出,ACE_Thread_Manager:wait()还会收取两个线程的退出状态,以免内存泄漏。
ACE_Select_Reactor 2 —— 服务器网关
ACE中的流包装提供面向连接的通信。流数据传输包装类包括ACE_SOCK_Stream和ACE_LSOCK_Stream,他们分别包装TCP/IP和UNIX域Socket协议数据传输功能。连接建立类包括针对TCP/IP的ACE_SOCK_Connector和ACE_SOCK_Acceptor,以及针对UNIX域Socket的ACE_LSOCK_Connector和ACE_LSOCK_Acceptor。
- class Server
- {
- public:
- Server (int port): server_addr_(port),peer_acceptor_(server_addr_)
- {
- data_buf_= new char[SIZE_BUF];
- }
- int handle_connection()
- {
- // Read data from client
- int byte_count=0;
- if( (byte_count=new_stream_.recv (data_buf_, SIZE_DATA, 0))==-1)
- ACE_ERROR ((LM_ERROR, "%p\n", "Error in recv"));
- else
- {
- data_buf_[byte_count]=0;
- ACE_DEBUG((LM_DEBUG,"Server received %s \n",data_buf_));
- }
- // Close new endpoint
- if (new_stream_.close () == -1)
- ACE_ERROR ((LM_ERROR, "%p\n", "close"));
- return 0;
- }
- int accept_connections ()
- {
- if (peer_acceptor_.get_local_addr (server_addr_) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,"%p\n","Error in get_local_addr"),1);
- ACE_DEBUG ((LM_DEBUG,"Starting server at port %d\n",
- server_addr_.get_port_number ()));
- while(1)
- {
- ACE_Time_Value timeout (ACE_DEFAULT_TIMEOUT);
- if (peer_acceptor_.accept (new_stream_, &client_addr_, &timeout)== -1)
- {
- ACE_ERROR ((LM_ERROR, "%p\n", "accept"));
- continue;
- }
- else
- {
- ACE_DEBUG((LM_DEBUG,
- "Connection established with remote %s:%d\n",
- client_addr_.get_host_name(),client_addr_.get_port_number()));
- //Handle the connection
- handle_connection();
- }
- }
- }
- private:
- char *data_buf_;
- ACE_INET_Addr server_addr_;
- ACE_INET_Addr client_addr_;
- ACE_SOCK_Acceptor peer_acceptor_;
- ACE_SOCK_Stream new_stream_;
- };
在上面的Server类中,创建了一个被动服务器,侦听到来的客户端连接,在连接建立之后,服务器接收来自客户端的数据,然后关闭链接。
Server类包含的accept_connection()方法使用接收器来将连接接受“进”ACE_SOCK_Stream new_stream_。该操作完成的基本流程是:调用接收器上的accept(),并将流作为参数传入其中。一旦连接已经建立进流中,流的包装方法send()和recv()就可以用来在新建立的链路上发送和接收数据,还有一个空的ACE_INET_Addr也被传入接收器的accept()方法,并在其中被设定为发起连接的远程机器地址。
在连接建立后,服务器调用handle_connection()方法,它开始从客户端那里收取一个预先知道的单词,然后将流关闭。连接关闭通过调用流上的close()方法来完成,该方法会释放所有的Socket资源并终止连接。
http://acme-ltt.iteye.com/blog/1455556中提到的ACE_Select_Reactor,在static ACE_THR_FUNC_RETURN controller (void *arg)函数中,调用上述的Server类,搭建基于ACE_Select_Reactor的Socket服务器网关。
客户端程序:
- #include "ace/SOCK_Connector.h"
- #include "ace/INET_Addr.h"
- #include "ace/OS.h"
- #include "ace/Log_Msg.h"
- #include
- #include "text.h"
- #include "ace/Thread_Manager.h"
- #include
- #define SIZE_BUF 128
- static const ACE_Time_Value TIME_INTERVAL(0, 1000000);
- class Client
- {
- public:
- Client(char *hostname, int port):remote_addr_(port,hostname)
- {
- }
- int connect_to_server()
- {
- ACE_DEBUG ((LM_DEBUG, "(%P|%t) Starting connect to %s:%d\n",
- remote_addr_.get_host_name(),remote_addr_.get_port_number()));
- if (connector_.connect (client_stream_, remote_addr_) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t) %p\n","connection failed"),-1);
- else
- ACE_DEBUG ((LM_DEBUG,"(%P|%t) connected to %s\n",
- remote_addr_.get_host_name ()));
- return 0;
- }
- int send_to_server()
- {
- iovec iov[3];
- iov[0].iov_base = (void *)"get";
- iov[0].iov_len = 3;
- iov[1].iov_base = getdata()/* some data */;
- iov[1].iov_len = strlen(getdata());
- iov[2].iov_base = (void *)"end.";
- iov[2].iov_len = 4;
- if (client_stream_.sendv_n (iov,3) == -1)
- {
- ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t) %p\n","send_n"),0);
- //break;
- exit(0);
- }
- close();
- }
- int close()
- {
- if (client_stream_.close () == -1)
- ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t) %p\n","close"),-1);
- else
- return 0;
- }
- private:
- ACE_SOCK_Stream client_stream_;
- ACE_INET_Addr remote_addr_;
- ACE_SOCK_Connector connector_;
- char *data_buf_;
- };
- int main (int argc, char *argv[])
- {
- if(argc acceptor_type;