网上看的两个好的例子-学习ACE时经常看的。
这个文章应该是介绍ACE编程的一个很好的原创文章,个人非常推荐了!
1、WIN32下面用proactor可以达到几乎RAW IOCP的效率,由于封装关系,应该是差那么一点。
客户端处理类的常规写法:
- //处理客户端连接消息
- class ClientHandler : public ACE_Service_Handler
- {
- public:
- /**构造函数
- *
- *
- */
- ClientHandler(unsigned int client_recv_buf_size=SERVER_CLIENT_RECEIVE_BUF_SIZE)
- :_read_msg_block(client_recv_buf_size),_io_count(0)
- {
- }
- ~ClientHandler(){}
- /**
- *初始化,因为可能要用到ClientHandler内存池,而这个池又不一定要用NEW
- */
- void init();
- /**清理函数,因为可能要用到内存池
- *
- */
- void fini();
- //检查是否超时的函数
- void check_time_out(time_t cur_time);
- public:
- /**客户端连接服务器成功后调用
- *
- * /param handle 套接字句柄
- * /param &message_block 第一次读到的数据(未用)
- */
- //由Acceptor来调用!!!
- virtual void open (ACE_HANDLE handle,ACE_Message_Block &message_block);
- /**处理网络读操作结束消息
- *
- * /param &result 读操作结果
- */
- virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
- /**处理网络写操作结束消息
- *
- * /param &result 写操作结果
- */
- virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
- private:
- //**生成一个网络读请求
- *
- * /param void
- * /return 0-成功,-1失败
- */
- int initiate_read_stream (void);
- /**生成一个写请求
- *
- * /param mb 待发送的数据
- * /param nBytes 待发送数据大小
- * /return 0-成功,-1失败
- */
- int initiate_write_stream (ACE_Message_Block & mb, size_t nBytes );
- /**
- *
- * /return 检查是否可以删除,用的是一个引用计数。每一个外出IO的时候+1,每一个IO成功后-1
- */
- int check_destroy();
- //异步读
- ACE_Asynch_Read_Stream _rs;
- //异步写
- ACE_Asynch_Write_Stream _ws;
- //接收缓冲区只要一个就够了,因为压根就没想过要多读,我直到现在也不是很清楚为什么要多读,多读的话要考虑很多问题
- ACE_Message_Block _read_msg_block;
- //套接字句柄,这个可以不要了,因为基类就有个HANDLER在里面的。
- //ACE_HANDLE _handle;
- //一个锁,客户端反正有东东要锁的,注意,要用ACE_Recursive_Thread_Mutex而不是用ACE_Thread_Mutex,这里面是可以重入的,而且在WIN32下是直接的EnterCriticalSection,可以达到很高的效率
- ACE_Recursive_Thread_Mutex _lock;
- //在外IO数量,其实就是引用计数啦,没啥的。为0的时候就把这个东东关掉啦。
- long _io_count;
- //检查超时用的,过段时间没东东就CLOSE他了。
- time_t _last_net_io;
- private:
- //本来想用另外一种模型的,只用1个或者2个外出读,后来想想,反正一般内存都是足够的,就不管了。
- //ACE_Message_Block _send_msg_blocks[2];
- //ACE_Message_Block &_sending_msg_block;
- //ACE_Message_Block &_idle_msg_block;
- private:
- public:
- //TODO:move to prriva and use friend class!!!
- //只是为了效率更高,不用STL的LIST是因为到现在我没有可用的Node_Allocator,所以效率上会有问题。
- ClientHandler *_next;
- ClientHandler *next(){return _next;}
- void next(ClientHandler *obj){_next=obj;}
- };
- //这是具体实现,有些地方比较乱,懒得管了,锁的有些地方不对。懒得改了,反正在出错或者有瓶颈的时候再做也不迟。
- void ClientHandler::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
- {
- _last_net_io=ACE_OS::time(NULL);
- int byterecved=result.bytes_transferred ();
- if ( (result.success ()) && (byterecved != 0))
- {
- //ACE_DEBUG ((LM_DEBUG, "Receiver completed:%d/n",byterecved));
- //处理完数据
- if(handle_received_data()==true)
- {
- //ACE_DEBUG ((LM_DEBUG, "go on reading.../n"));
- //把东东推到头部,处理粘包
- _read_msg_block.crunch();
- initiate_read_stream();
- }
- }
- //这个地方不想用ACE_Atom_op,因为反正要有一个锁,而且一般都会用锁,不管了。假如不在意的话,应该直接用ACE_Atom_Op以达到最好的效率
- {
- ACE_Guard locker (_lock);
- _io_count--;
- }
- check_destroy ();
- }
- void ClientHandler::init()
- {
- //初始化数据,并不在构造函数里做。
- _last_net_io=ACE_OS::time(NULL);
- _read_msg_block.rd_ptr(_read_msg_block.base());
- _read_msg_block.wr_ptr(_read_msg_block.base());
- this->handle(ACE_INVALID_HANDLE);
- }
- bool ClientHandler::handle_received_data()
- {
- ...........自己处理
- return true;
- }
- //==================================================================
- void ClientHandler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
- {
- //发送成功,RELEASE掉
- //这个不可能有多个RELEASE,直接XX掉
- //result.message_block ().release ();
- MsgBlockManager::get_instance().release_msg_block(&result.message_block());
- {
- ACE_Guard locker (_lock);
- _io_count--;
- }
- check_destroy ();
- }
- //bool ClientHandler::destroy ()
- //{
- // FUNC_ENTER;
- // ClientManager::get_instance().release_client_handle(this);
- // FUNC_LEAVE;
- // return false ;
- //}
- int ClientHandler::initiate_read_stream (void)
- {
- ACE_Guard locker (_lock);
- //考虑到粘包的呀
- if (_rs.read (_read_msg_block, _read_msg_block.space()) == -1)
- {
- ACE_ERROR_RETURN ((LM_ERROR,"%p/n","ACE_Asynch_Read_Stream::read"),-1);
- }
- _io_count++;
- return 0;
- }
- /**生成一个写请求
- *
- * /param mb 待发送的数据
- * /param nBytes 待发送数据大小
- * /return 0-成功,-1失败
- */
- int ClientHandler::initiate_write_stream (ACE_Message_Block & mb, size_t nBytes )
- {
- ACE_Guard locker (_lock);
- if (_ws.write (mb , nBytes ) == -1)
- {
- mb.release ();
- ACE_ERROR_RETURN((LM_ERROR,"%p/n","ACE_Asynch_Write_File::write"),-1);
- }
- _io_count++;
- return 0;
- }
- void ClientHandler::open (ACE_HANDLE handle,ACE_Message_Block &message_block)
- {
- //FUNC_ENTER;
- _last_net_io=ACE_OS::time(NULL);
- _io_count=0;
- if(_ws.open(*this,this->handle())==-1)
- {
- ACE_ERROR ((LM_ERROR,"%p/n","ACE_Asynch_Write_Stream::open"));
- }
- else if (_rs.open (*this, this->handle()) == -1)
- {
- ACE_ERROR ((LM_ERROR,"%p/n","ACE_Asynch_Read_Stream::open"));
- }
- else
- {
- initiate_read_stream ();
- }
- check_destroy();
- //FUNC_LEAVE;
- }
- void ClientHandler::fini()
- {
- }
- void ClientHandler::check_time_out(time_t cur_time)
- {
- //ACE_Guard locker (_lock);
- //ACE_DEBUG((LM_DEBUG,"cur_time is %u,last io is %u/n",cur_time,_last_net_io));
- //检测是否已经为0了
- if(this->handle()==ACE_INVALID_HANDLE)
- return;
- if(cur_time-_last_net_io>CLIENT_TIME_OUT_SECONDS)
- {
- ACE_OS::shutdown(this->handle(),SD_BOTH);
- ACE_OS::closesocket(this->handle());
- this->handle(ACE_INVALID_HANDLE);
- }
- }
- int ClientHandler::check_destroy()
- {
- {
- ACE_Guard locker (_lock);
- if (_io_count> 0)
- return 1;
- }
- ACE_OS::shutdown(this->handle(),SD_BOTH);
- ACE_OS::closesocket(this->handle());
- this->handle(ACE_INVALID_HANDLE);
- //这个地方给内存池吧。
- ClientManager::get_instance().release_client_handle(this);
- //delete this;
- return 0;
- }
- 这个也很好!ACE的好文!真是不转我觉得后悔啊!
- 没啥好说的,管理所有的客户端和内存池的功能。
- class ClientManager : public SingleTon
- {
- public:
- ClientManager():_header(NULL){}
- ~ClientManager(){}
- public:
- void init(unsigned int default_pool_size,unsigned int default_read_buf_size);
- void fini();
- public:
- ClientHandler *get_clienthandler();
- void release_client_handle(ClientHandler *client);
- void check_time_out();
- size_t get_client_count();
- private:
- ClientHandler *_header;
- std::set _active_clients;
- ACE_Recursive_Thread_Mutex _lock;
- };
- #include "clientmanager.h"
- #include
- ClientHandler *ClientManager::get_clienthandler()
- {
- FUNC_ENTER;
- ACE_Guard locker(_lock);
- ClientHandler *ret=NULL;
- if(_header==NULL)
- {
- ACE_DEBUG((LM_DEBUG,"client > max clients!!!/n"));
- }
- else
- {
- ret=_header;
- _header=_header->next();
- ret->init();
- _active_clients.insert(ret);
- }
- FUNC_LEAVE;
- return ret;
- }
- void ClientManager::release_client_handle(ClientHandler *client)
- {
- //FUNC_ENTER;
- ACE_Guard locker(_lock);
- client->fini();
- client->next(_header);
- _header=client;
- _active_clients.erase(client);
- //FUNC_LEAVE;
- }
- void ClientManager::init(unsigned int default_pool_size,unsigned int default_read_buf_size)
- {
- //FUNC_ENTER;
- for(unsigned int i=0;inext(_header);
- _header=client;
- }
- //FUNC_LEAVE;
- }
- void ClientManager::fini()
- {
- //FUNC_ENTER;
- while(_header)
- {
- ClientHandler *temp=_header->next();
- delete _header;
- _header=temp;
- }
- //FUNC_LEAVE;
- }
- void ClientManager::check_time_out()
- {
- time_t cur_time=ACE_OS::time(NULL);
- ACE_Guard locker(_lock);
- for(std::set::iterator it=_active_clients.begin();it!=_active_clients.end();it++)
- {
- (*it)->check_time_out(cur_time);
- }
- }
- size_t ClientManager::get_client_count()
- {
- ACE_Guard locker(_lock);
- return _active_clients.size();
- }
- //服务器的设计
- 按照七猫的说话,这个框架可以达到IOCP的效率,真的利害!但是我也不知道真伪!所以大家不要认为是我说的阿!我没有测试过,所以也不太清楚是否真的有那么高的效率!
- 没有什么可说的! 好文章!
- class MyServer : public SingleTon
- {
- public:
- /**主服务器初始化工作
- *
- * /param *listenaddr 监听地址:"192.168.0.188:80"-在192.168.0.188的80端口进行监听
- "80"-在所有IP地址的80端口进行监听
- * /param numOfThreads 服务器网络消息处理的线程个数
- * /return 1:成功,0或者其他值:失败
- */
- int init(const char *listenaddr,unsigned int numOfThreads);
- /**最后清理工作,资源释放工作
- *
- */
- void fini();
- /**主服务器开始运行
- *
- * /return 1-成功,-1失败
- */
- int start();
- /**主服务器停止运行
- *
- */
- void stop();
- private:
- //任务管理器(线程池)
- ServerTask _task;
- //监听地址
- ACE_INET_Addr _listen_addr;
- //网络接收器
- ClientAcceptor _acceptor;
- //网络消息处理线程数量
- unsigned int _num_of_threads;
- private:
- Observer _observer;
- //检查是否有客户端超时
- CheckClientTimeoutHandler _check_time_out_handler;
- };
- int MyServer::init(const char *listenaddr,unsigned int numOfThreads)
- {
- //ACE_WIN32_Proactor *pImpl = new ACE_WIN32_Proactor;
- //static ACE_Proactor proactor(pImpl,1);
- //ACE_Proactor::instance( & proactor);
- _listen_addr=ACE_INET_Addr(listenaddr);
- //_num_of_threads=numOfThreads;
- _num_of_threads=1;
- _observer.init();
- _syn_cms_handler.init();
- _check_time_out_handler.init();
- return 1;
- }
- void MyServer::fini()
- {
- ItemManager::get_instance().purge_all_items();
- _observer.fini();
- _syn_cms_handler.fini();
- _check_time_out_handler.fini();
- }
- /**主服务器开始运行
- *
- * /return 1-成功,-1失败
- */
- int MyServer::start()
- {
- int Rc = _acceptor.open (_listen_addr,0,1);
- if(Rc==-1)
- {
- ACE_ERROR_RETURN ((LM_ERROR, "acceptor error./n"), -1);
- }
- //每20秒检查一次,检查是否有客户端超时
- ACE_Time_Value check_client_timeout_interval(120);
- Rc=ACE_Proactor::instance ()->schedule_timer (
- _check_time_out_handler,
- (void *) "timeout",
- ACE_Time_Value::zero,
- check_client_timeout_interval);
- if(Rc==-1)
- {
- ACE_ERROR_RETURN ((LM_ERROR, "%p/n", "check_client_timeout schedule_timer"), -1);
- }
- ACE_Time_Value observertime(20);
- Rc=ACE_Proactor::instance ()->schedule_timer (
- _observer,
- (void *) "observer",
- ACE_Time_Value::zero,
- observertime);
- if(Rc==-1)
- {
- ACE_ERROR_RETURN ((LM_ERROR, "%p/n", "observer schedule_timer"), -1);
- }
- if (_task.activate (THR_NEW_LWP, _num_of_threads ) == -1)
- {
- ACE_ERROR_RETURN ((LM_ERROR, "task start error./n", "main"), -1);
- }
- return 1;
- }
- /**主服务器停止运行
- *
- */
- void MyServer::stop()
- {
- ACE_Proactor::end_event_loop () ;
- ACE_Thread_Manager * pTM = ACE_Thread_Manager::instance();
- pTM->wait_task ( & _task) ;
- //ACE_Proactor::instance( ( ACE_Proactor* )NULL );
- }
- int ACE_TMAIN(int argc,char *argv[])
- {
- FUNC_ENTER;
- std::cout
关注打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?
立即登录/注册


微信扫码登录