您当前的位置: 首页 > 

phymat.nico

暂无认证

  • 0浏览

    0关注

    1967博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

ACE proactor example

phymat.nico 发布时间:2017-12-20 23:39:36 ,浏览量:0

网上看的两个好的例子-学习ACE时经常看的。

这个文章应该是介绍ACE编程的一个很好的原创文章,个人非常推荐了!

1、WIN32下面用proactor可以达到几乎RAW IOCP的效率,由于封装关系,应该是差那么一点。

 

客户端处理类的常规写法:

[cpp] view plain copy
  1. //处理客户端连接消息  
  2. class ClientHandler : public ACE_Service_Handler  
  3. {  
  4. public:  
  5. /**构造函数 
  6.  * 
  7.  * 
  8.  */  
  9. ClientHandler(unsigned int client_recv_buf_size=SERVER_CLIENT_RECEIVE_BUF_SIZE)  
  10.  :_read_msg_block(client_recv_buf_size),_io_count(0)  
  11. {  
  12. }  
  13.   
  14.   
  15. ~ClientHandler(){}  
  16.   
  17. /** 
  18.  *初始化,因为可能要用到ClientHandler内存池,而这个池又不一定要用NEW 
  19.  */  
  20. void init();  
  21.   
  22. /**清理函数,因为可能要用到内存池 
  23.  * 
  24.  */  
  25. void fini();  
  26.   
  27.   
  28. //检查是否超时的函数  
  29.   
  30. void check_time_out(time_t cur_time);  
  31.   
  32. public:  
  33.   
  34. /**客户端连接服务器成功后调用 
  35.  * 
  36.  * /param handle 套接字句柄 
  37.  * /param &message_block 第一次读到的数据(未用) 
  38.  */  
  39.   
  40.   
  41. //由Acceptor来调用!!!  
  42. virtual void open (ACE_HANDLE handle,ACE_Message_Block &message_block);  
  43.   
  44. /**处理网络读操作结束消息 
  45.  * 
  46.  * /param &result 读操作结果 
  47.  */  
  48. virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);  
  49.   
  50.   
  51. /**处理网络写操作结束消息 
  52.  * 
  53.  * /param &result 写操作结果 
  54.  */  
  55. virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);  
  56.   
  57. private:  
  58.   
  59. //**生成一个网络读请求  
  60.  *  
  61.  * /param void   
  62.  * /return 0-成功,-1失败  
  63.  */  
  64. int  initiate_read_stream  (void);  
  65.   
  66. /**生成一个写请求 
  67.  * 
  68.  * /param mb 待发送的数据 
  69.  * /param nBytes 待发送数据大小 
  70.  * /return 0-成功,-1失败 
  71.  */  
  72. int  initiate_write_stream (ACE_Message_Block & mb, size_t nBytes );  
  73.   
  74. /** 
  75.  * 
  76.  * /return 检查是否可以删除,用的是一个引用计数。每一个外出IO的时候+1,每一个IO成功后-1 
  77.  */  
  78. int check_destroy();  
  79.   
  80. //异步读  
  81. ACE_Asynch_Read_Stream _rs;  
  82.   
  83. //异步写  
  84. ACE_Asynch_Write_Stream _ws;  
  85.   
  86. //接收缓冲区只要一个就够了,因为压根就没想过要多读,我直到现在也不是很清楚为什么要多读,多读的话要考虑很多问题  
  87. ACE_Message_Block _read_msg_block;  
  88.   
  89. //套接字句柄,这个可以不要了,因为基类就有个HANDLER在里面的。  
  90. //ACE_HANDLE _handle;  
  91.   
  92. //一个锁,客户端反正有东东要锁的,注意,要用ACE_Recursive_Thread_Mutex而不是用ACE_Thread_Mutex,这里面是可以重入的,而且在WIN32下是直接的EnterCriticalSection,可以达到很高的效率  
  93. ACE_Recursive_Thread_Mutex _lock;  
  94.   
  95. //在外IO数量,其实就是引用计数啦,没啥的。为0的时候就把这个东东关掉啦。  
  96. long _io_count;  
  97.   
  98.   
  99. //检查超时用的,过段时间没东东就CLOSE他了。  
  100.   
  101. time_t _last_net_io;  
  102.   
  103. private:  
  104.   
  105.   
  106. //本来想用另外一种模型的,只用1个或者2个外出读,后来想想,反正一般内存都是足够的,就不管了。  
  107.   
  108. //ACE_Message_Block _send_msg_blocks[2];  
  109.   
  110. //ACE_Message_Block &_sending_msg_block;  
  111.   
  112. //ACE_Message_Block &_idle_msg_block;  
  113.   
  114. private:  
  115.   
  116. public:  
  117. //TODO:move to prriva and use friend class!!!  
  118.   
  119.   
  120. //只是为了效率更高,不用STL的LIST是因为到现在我没有可用的Node_Allocator,所以效率上会有问题。  
  121. ClientHandler *_next;  
  122.   
  123. ClientHandler *next(){return _next;}  
  124.   
  125. void next(ClientHandler *obj){_next=obj;}  
  126.   
  127. };  
  128.   
  129.    
  130.   
  131. //这是具体实现,有些地方比较乱,懒得管了,锁的有些地方不对。懒得改了,反正在出错或者有瓶颈的时候再做也不迟。  
  132.   
  133. void ClientHandler::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)  
  134. {  
  135. _last_net_io=ACE_OS::time(NULL);  
  136. int byterecved=result.bytes_transferred ();  
  137. if ( (result.success ()) && (byterecved != 0))  
  138. {  
  139.  //ACE_DEBUG ((LM_DEBUG,  "Receiver completed:%d/n",byterecved));  
  140.   
  141.   
  142. //处理完数据  
  143.  if(handle_received_data()==true)  
  144.  {  
  145.   //ACE_DEBUG ((LM_DEBUG,  "go on reading.../n"));  
  146.   
  147.   
  148. //把东东推到头部,处理粘包  
  149.   _read_msg_block.crunch();  
  150.   initiate_read_stream();  
  151.  }  
  152. }  
  153.   
  154.   
  155. //这个地方不想用ACE_Atom_op,因为反正要有一个锁,而且一般都会用锁,不管了。假如不在意的话,应该直接用ACE_Atom_Op以达到最好的效率  
  156.   
  157. {  
  158.  ACE_Guard locker (_lock);  
  159.  _io_count--;  
  160. }  
  161. check_destroy ();  
  162. }  
  163.   
  164. void ClientHandler::init()  
  165. {  
  166.   
  167.   
  168. //初始化数据,并不在构造函数里做。  
  169. _last_net_io=ACE_OS::time(NULL);  
  170. _read_msg_block.rd_ptr(_read_msg_block.base());  
  171. _read_msg_block.wr_ptr(_read_msg_block.base());  
  172. this->handle(ACE_INVALID_HANDLE);  
  173. }  
  174.   
  175. bool ClientHandler::handle_received_data()  
  176. {  
  177.   
  178.   
  179. ...........自己处理  
  180. return true;  
  181. }  
  182.   
  183.   
  184. //==================================================================  
  185. void ClientHandler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)  
  186. {  
  187. //发送成功,RELEASE掉  
  188. //这个不可能有多个RELEASE,直接XX掉  
  189. //result.message_block ().release ();  
  190. MsgBlockManager::get_instance().release_msg_block(&result.message_block());  
  191.   
  192. {  
  193.  ACE_Guard locker (_lock);  
  194.  _io_count--;  
  195. }  
  196. check_destroy ();  
  197. }  
  198.   
  199. //bool ClientHandler::destroy ()   
  200. //{  
  201. // FUNC_ENTER;  
  202. // ClientManager::get_instance().release_client_handle(this);  
  203. // FUNC_LEAVE;  
  204. // return false ;  
  205. //}  
  206.   
  207.   
  208. int  ClientHandler::initiate_read_stream  (void)  
  209. {  
  210. ACE_Guard locker (_lock);  
  211.   
  212.   
  213. //考虑到粘包的呀  
  214. if (_rs.read (_read_msg_block, _read_msg_block.space()) == -1)  
  215. {  
  216.  ACE_ERROR_RETURN ((LM_ERROR,"%p/n","ACE_Asynch_Read_Stream::read"),-1);  
  217. }  
  218. _io_count++;  
  219. return 0;  
  220. }  
  221.   
  222. /**生成一个写请求 
  223. * /param mb 待发送的数据 
  224. * /param nBytes 待发送数据大小 
  225. * /return 0-成功,-1失败 
  226. */  
  227. int  ClientHandler::initiate_write_stream (ACE_Message_Block & mb, size_t nBytes )  
  228. {  
  229. ACE_Guard locker (_lock);  
  230. if (_ws.write (mb , nBytes ) == -1)  
  231. {  
  232.  mb.release ();  
  233.  ACE_ERROR_RETURN((LM_ERROR,"%p/n","ACE_Asynch_Write_File::write"),-1);  
  234. }  
  235. _io_count++;  
  236. return 0;  
  237. }  
  238.   
  239. void ClientHandler::open (ACE_HANDLE handle,ACE_Message_Block &message_block)  
  240. {  
  241. //FUNC_ENTER;  
  242. _last_net_io=ACE_OS::time(NULL);  
  243. _io_count=0;  
  244. if(_ws.open(*this,this->handle())==-1)  
  245. {  
  246.  ACE_ERROR ((LM_ERROR,"%p/n","ACE_Asynch_Write_Stream::open"));  
  247. }  
  248. else if (_rs.open (*this, this->handle()) == -1)  
  249. {  
  250.  ACE_ERROR ((LM_ERROR,"%p/n","ACE_Asynch_Read_Stream::open"));  
  251. }  
  252. else  
  253. {  
  254.  initiate_read_stream ();  
  255. }  
  256.   
  257. check_destroy();  
  258. //FUNC_LEAVE;  
  259. }  
  260.   
  261. void ClientHandler::fini()  
  262. {  
  263. }  
  264.   
  265. void ClientHandler::check_time_out(time_t cur_time)  
  266. {  
  267. //ACE_Guard locker (_lock);  
  268. //ACE_DEBUG((LM_DEBUG,"cur_time is %u,last io is %u/n",cur_time,_last_net_io));  
  269.   
  270. //检测是否已经为0了  
  271. if(this->handle()==ACE_INVALID_HANDLE)  
  272.  return;  
  273. if(cur_time-_last_net_io>CLIENT_TIME_OUT_SECONDS)  
  274. {  
  275.  ACE_OS::shutdown(this->handle(),SD_BOTH);  
  276.  ACE_OS::closesocket(this->handle());  
  277.  this->handle(ACE_INVALID_HANDLE);  
  278. }  
  279. }  
  280.   
  281. int ClientHandler::check_destroy()  
  282. {  
  283. {  
  284.  ACE_Guard locker (_lock);  
  285.  if (_io_count> 0)  
  286.   return 1;  
  287. }  
  288. ACE_OS::shutdown(this->handle(),SD_BOTH);  
  289. ACE_OS::closesocket(this->handle());  
  290. this->handle(ACE_INVALID_HANDLE);  
  291.   
  292.   
  293. //这个地方给内存池吧。  
  294. ClientManager::get_instance().release_client_handle(this);  
  295. //delete this;  
  296. return 0;  
  297. }  
  298.   
  299.    
  300.   
  301.   
  302. 这个也很好!ACE的好文!真是不转我觉得后悔啊!  
  303. 没啥好说的,管理所有的客户端和内存池的功能。  
  304.   
  305. class ClientManager : public SingleTon  
  306. {  
  307. public:  
  308.   
  309. ClientManager():_header(NULL){}  
  310.   
  311. ~ClientManager(){}  
  312.   
  313. public:  
  314.   
  315. void init(unsigned int default_pool_size,unsigned int default_read_buf_size);  
  316.   
  317. void fini();  
  318.   
  319. public:  
  320.   
  321. ClientHandler *get_clienthandler();  
  322.   
  323. void release_client_handle(ClientHandler *client);  
  324.   
  325. void check_time_out();  
  326.   
  327. size_t get_client_count();  
  328.   
  329. private:  
  330.   
  331. ClientHandler *_header;  
  332.   
  333. std::set _active_clients;  
  334.   
  335. ACE_Recursive_Thread_Mutex _lock;  
  336. };  
  337.   
  338.    
  339.   
  340.    
  341.   
  342. #include "clientmanager.h"  
  343. #include   
  344.   
  345. ClientHandler *ClientManager::get_clienthandler()  
  346. {  
  347. FUNC_ENTER;  
  348. ACE_Guard locker(_lock);  
  349. ClientHandler *ret=NULL;  
  350. if(_header==NULL)  
  351. {  
  352.  ACE_DEBUG((LM_DEBUG,"client > max clients!!!/n"));  
  353. }  
  354. else  
  355. {  
  356.  ret=_header;  
  357.  _header=_header->next();  
  358.  ret->init();  
  359.  _active_clients.insert(ret);  
  360. }  
  361. FUNC_LEAVE;  
  362. return ret;  
  363. }  
  364.   
  365. void ClientManager::release_client_handle(ClientHandler *client)  
  366. {  
  367. //FUNC_ENTER;  
  368. ACE_Guard locker(_lock);  
  369. client->fini();  
  370. client->next(_header);  
  371. _header=client;  
  372. _active_clients.erase(client);  
  373. //FUNC_LEAVE;  
  374. }  
  375.   
  376. void ClientManager::init(unsigned int default_pool_size,unsigned int default_read_buf_size)  
  377. {  
  378. //FUNC_ENTER;  
  379. for(unsigned int i=0;inext(_header);  
  380.  _header=client;  
  381. }  
  382. //FUNC_LEAVE;  
  383. }  
  384.   
  385. void ClientManager::fini()  
  386. {  
  387. //FUNC_ENTER;  
  388. while(_header)  
  389. {  
  390.  ClientHandler *temp=_header->next();  
  391.  delete _header;  
  392.  _header=temp;  
  393. }  
  394. //FUNC_LEAVE;  
  395. }  
  396.   
  397. void ClientManager::check_time_out()  
  398. {  
  399. time_t cur_time=ACE_OS::time(NULL);  
  400. ACE_Guard locker(_lock);  
  401. for(std::set::iterator it=_active_clients.begin();it!=_active_clients.end();it++)  
  402. {  
  403.  (*it)->check_time_out(cur_time);  
  404. }  
  405. }  
  406.   
  407. size_t ClientManager::get_client_count()  
  408. {  
  409. ACE_Guard locker(_lock);  
  410. return _active_clients.size();  
  411. }  
  412.   
  413.   
  414. //服务器的设计  
  415. 按照七猫的说话,这个框架可以达到IOCP的效率,真的利害!但是我也不知道真伪!所以大家不要认为是我说的阿!我没有测试过,所以也不太清楚是否真的有那么高的效率!  
  416. 没有什么可说的! 好文章!  
  417.   
  418. class MyServer : public SingleTon  
  419. {  
  420. public:  
  421. /**主服务器初始化工作 
  422. * /param *listenaddr 监听地址:"192.168.0.188:80"-在192.168.0.188的80端口进行监听 
  423. "80"-在所有IP地址的80端口进行监听 
  424. * /param numOfThreads 服务器网络消息处理的线程个数 
  425. * /return 1:成功,0或者其他值:失败 
  426. */  
  427. int init(const char *listenaddr,unsigned int numOfThreads);  
  428.   
  429. /**最后清理工作,资源释放工作 
  430.  * 
  431.  */  
  432. void fini();  
  433.   
  434. /**主服务器开始运行 
  435. * /return 1-成功,-1失败 
  436. */  
  437. int start();  
  438.   
  439. /**主服务器停止运行 
  440. */  
  441. void stop();  
  442. private:  
  443. //任务管理器(线程池)  
  444. ServerTask _task;  
  445. //监听地址  
  446. ACE_INET_Addr _listen_addr;  
  447. //网络接收器  
  448. ClientAcceptor _acceptor;  
  449. //网络消息处理线程数量  
  450. unsigned int _num_of_threads;  
  451. private:  
  452.   
  453.    
  454.   
  455. Observer _observer;  
  456.   
  457. //检查是否有客户端超时  
  458. CheckClientTimeoutHandler _check_time_out_handler;  
  459. };  
  460.   
  461.   
  462. int MyServer::init(const char *listenaddr,unsigned int numOfThreads)  
  463. {  
  464. //ACE_WIN32_Proactor *pImpl = new ACE_WIN32_Proactor;  
  465. //static ACE_Proactor proactor(pImpl,1);  
  466. //ACE_Proactor::instance( & proactor);  
  467. _listen_addr=ACE_INET_Addr(listenaddr);  
  468. //_num_of_threads=numOfThreads;  
  469. _num_of_threads=1;  
  470. _observer.init();  
  471. _syn_cms_handler.init();  
  472. _check_time_out_handler.init();  
  473. return 1;  
  474. }  
  475.   
  476. void MyServer::fini()  
  477. {  
  478. ItemManager::get_instance().purge_all_items();  
  479. _observer.fini();  
  480. _syn_cms_handler.fini();  
  481. _check_time_out_handler.fini();  
  482. }  
  483.   
  484. /**主服务器开始运行 
  485. * /return 1-成功,-1失败 
  486. */  
  487. int MyServer::start()  
  488. {  
  489. int Rc = _acceptor.open (_listen_addr,0,1);  
  490. if(Rc==-1)  
  491. {  
  492.  ACE_ERROR_RETURN ((LM_ERROR, "acceptor error./n"), -1);  
  493. }  
  494.   
  495.    
  496.   
  497.   
  498. //每20秒检查一次,检查是否有客户端超时  
  499. ACE_Time_Value check_client_timeout_interval(120);  
  500. Rc=ACE_Proactor::instance ()->schedule_timer (  
  501.  _check_time_out_handler,  
  502.  (void *) "timeout",  
  503.  ACE_Time_Value::zero,  
  504.  check_client_timeout_interval);  
  505. if(Rc==-1)  
  506. {  
  507.  ACE_ERROR_RETURN ((LM_ERROR, "%p/n", "check_client_timeout schedule_timer"), -1);  
  508. }  
  509.   
  510. ACE_Time_Value observertime(20);  
  511. Rc=ACE_Proactor::instance ()->schedule_timer (  
  512.  _observer,  
  513.  (void *) "observer",  
  514.  ACE_Time_Value::zero,  
  515.  observertime);  
  516. if(Rc==-1)  
  517. {  
  518.  ACE_ERROR_RETURN ((LM_ERROR, "%p/n", "observer schedule_timer"), -1);  
  519. }  
  520.   
  521. if (_task.activate (THR_NEW_LWP, _num_of_threads ) == -1)  
  522. {  
  523.  ACE_ERROR_RETURN ((LM_ERROR, "task start error./n", "main"), -1);  
  524. }  
  525. return 1;  
  526. }  
  527. /**主服务器停止运行 
  528. */  
  529. void MyServer::stop()  
  530. {  
  531. ACE_Proactor::end_event_loop () ;  
  532. ACE_Thread_Manager * pTM = ACE_Thread_Manager::instance();  
  533.   
  534. pTM->wait_task ( & _task) ;  
  535. //ACE_Proactor::instance( ( ACE_Proactor* )NULL );  
  536. }  
  537.   
  538.   
  539. int ACE_TMAIN(int argc,char *argv[])  
  540. {  
  541. FUNC_ENTER;   
  542. std::cout
关注
打赏
1659628745
查看更多评论
立即登录/注册

微信扫码登录

0.1077s