您当前的位置: 首页 >  网络

phymat.nico

暂无认证

  • 1浏览

    0关注

    1967博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

ACE_Proactor网络通信示例

phymat.nico 发布时间:2017-12-20 23:37:04 ,浏览量:1

注:本文仅对使用ACE进行网络通信进行演示说明。本文中的代码皆使用doxgen的注释风格。本文中使用的事件机制,其原理与实现请参考[ 基于C++的事件机制设计[2.0]]一文。

 

ACE的Proactor对Epoll和IOCP进行了良好包装,因此,使用ACE来进行网络开发是相当的便利,性能也不差。闲言少叙,看代码。

这里以TCP协议进行流式通信。我们需要解析流,得出每次接收到的数据包大小和包含的数据域,假定我们的包结构如下:

 

包序列号(32Bit) | 长度(16Bit) | 数据域(大小为长度所表示的字节)... | (下一包)

 

通过分析由包序列号和长度组成的包头来解决半包,粘包等问题,许多其它文章也有描述,这里就省略了。

这样可以确定我们的包头结构如下:

 

[cpp] view plain copy
  1. #pragma pack(push)  
  2. #pragma pack(1)  
  3.     /** 
  4.     * @brief Tcp包头结构 
  5.     */  
  6.     typedef struct tag_TTcpPackHeader  
  7.     {  
  8.         unsigned int seq; //length() m_Reader.read(*m_CurDataMB, m_CurDataMB->space());  
  9.                 return ;  
  10.             }  
  11.             TTcpPackHeader* header = reinterpret_cast(this->m_CurDataMB->rd_ptr());  
  12.             ACE_Message_Block* dataMB = this->m_CurDataMB->cont();  
  13.             if (!dataMB)  
  14.             {  
  15.                 ACE_NEW_NORETURN(dataMB, ACE_Message_Block(header->len));  
  16.                 if (dataMB)  
  17.                     this->m_CurDataMB->cont(dataMB);  
  18.                 else  
  19.                 {  
  20.                     this->m_CurDataMB->release();  
  21.                     ACE_DEBUG((LM_ERROR, ACE_TEXT("Failed to allocated: %i/n"), errno));  
  22.                     delete this;  
  23.                     return ;  
  24.                 }  
  25.             }  
  26.               
  27.             if (dataMB->length() == header->len)  
  28.             {  
  29.                 // 成功读取了数据?  
  30.                 m_OnDataReceive(m_ClientAddr.get_ip_address(), m_ClientAddr.get_port_number(), header->seq, dataMB->rd_ptr(), header->len);  
  31.                   
  32.                 m_CurDataMB->release();  
  33.                 initCurDataMB(); // 下一包数据  
  34.                   
  35.                 this->m_Reader.read(*m_CurDataMB, m_CurDataMB->space()); // next, try to get header  
  36.                 return ;  
  37.             }  
  38.               
  39.             this->m_Reader.read(*dataMB, dataMB->space()); // try to get data left  
  40.         }  
  41.     }  
  42.     void TTcpHandler::handle_write_stream(const ACE_Asynch_Write_Stream::Result& result)  
  43.     {  
  44.         if (result.success() && result.bytes_transferred() > 0) // 发送成功  
  45.         {  
  46.             ACE_Message_Block& mb = result.message_block();  
  47. #ifdef _DEBUG  
  48.             ACE_TCHAR addrStr[128];  
  49.               
  50.             m_ClientAddr.addr_to_string(addrStr, sizeof(addrStr) / sizeof(ACE_TCHAR));  
  51.               
  52.             ACE_DEBUG((LM_INFO, ACE_TEXT("Send to client: %s len:%i/n"), addrStr, result.bytes_transferred()));  
  53.               
  54.             char* ptr = mb.rd_ptr();  
  55.               
  56. #endif  
  57.               
  58.             mb.release();  
  59.         }  
  60.     }  
  61.     void TTcpHandler::initCurDataMB()  
  62.     {  
  63.         ACE_NEW_NORETURN(m_CurDataMB, ACE_Message_Block(TCP_PACK_HEADER_SIZE, TCP_DATA_RECEIVE));  
  64.     }  
  65. } // namespace igame  

 

然后是TTcpNetThread,该类的实现也相当简单:

[cpp] view plain copy
  1. #include   
  2. #include "TCPNetThread.h"  
  3. namespace igame  
  4. {  
  5.     int TTcpNetThread::open() { return this->activate(); }  
  6.     int TTcpNetThread::close()  
  7.     {  
  8.         ACE_Proactor::instance()->proactor_end_event_loop(); // 终止ACE_Proactor循环  
  9.         this->wait(); // 等待清理现场  
  10.         return 0;  
  11.     }  
  12.       
  13.     int TTcpNetThread::svc()  
  14.     {  
  15.         ACE_INET_Addr listenAddr(DEF_LISTENING_PORT); // 默认监听地址  
  16.         TTcpAcceptor tcpAcceptor; // 接收器  
  17.         // 设置事件  
  18.         tcpAcceptor.setOnClientConnect(m_OnClientConnect);  
  19.         tcpAcceptor.setOnClientDisconnect(m_OnClientDisconnect);  
  20.         tcpAcceptor.setOnClientValidate(m_OnClientValidate);  
  21.         tcpAcceptor.setOnDataReceive(m_OnDataReceive);  
  22.         tcpAcceptor.setOnDataSendFailed(m_OnDataSendFailed);  
  23.         tcpAcceptor.setOnDataSendSucceeded(m_OnDataSendSucceeded);  
  24.           
  25.         // 演出开始  
  26.         if (tcpAcceptor.open(listenAddr, 0, 1, 5, 1, 0, 0) != 0)  
  27.             ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%p/n"), ACE_TEXT("failed to open TcpAcceptor errno=%i/n"), errno), -1);  
  28.         // Proactor的事件循环开始  
  29.         ACE_Proactor::instance()->proactor_run_event_loop();  
  30.         ACE_DEBUG((LM_DEBUG, ACE_TEXT("Network fin/n")));  
  31.         return 0;  
  32.     }  
  33. } // namespace igame  

 

最后,对以上三个类进行聚合,封装,就成了TTcp类,在此之前,先定义消息类型:

[cpp] view plain copy
  1. /** 
  2. * @name TCP的ACE_Message_Block类型定义 @see ACE_Message_Block 
  3. * @{ 
  4. */  
  5. /// @brief TCP数据接收  
  6. #define TCP_DATA_RECEIVE            0x5505  
  7. /// @brief TCP客户端连接  
  8. #define TCP_CLIENT_CONNECT          0x5506  
  9. /// @brief TCP客户端断线  
  10. #define TCP_CLIENT_DISCONNECT       0x5507  
  11. /// @brief TCP数据发送  
  12. #define TCP_DATA_SEND               0x5508  
  13. /// @brief TCP数据发送成功  
  14. #define TCP_DATA_SEND_SUCCEEDED     0x5509  
  15. /// @brief TCP数据发送失败  
  16. #define TCP_DATA_SEND_FAILED        0x550A  
  17. /** 
  18. * @} 
  19. */  
  20. /// 默认监听地址:偶的车牌号  
  21. #define DEF_LISTENING_PORT  777  

现在看看TTcp的实现:

唔,太长了,下一篇吧。

 

 

 

此乃末技,

不知何用。

堆砌字数,

凑成更新。

走过路过,

不要错过。

 请原谅偶拖篇幅,这里奉上拖欠的数字。

TTcp的实现如下:

 

[cpp] view plain copy
  1. #include "Tcp.h"  
  2. namespace igame  
  3. {  
  4.     TTcp::TTcp()  
  5.         :m_TcpNetThd(0)  
  6.     {  
  7.         ACE_NEW_NORETURN(m_TcpNetThd, TTcpNetThread()); // 创建TTcpNetThread对象实例  
  8.     }  
  9.     TTcp::~TTcp()  
  10.     {  
  11.         if (m_TcpNetThd) // 释放  
  12.             delete m_TcpNetThd;  
  13.     }  
  14.     void TTcp::open()  
  15.     {  
  16.         ACE_TRACE("TTcp::open");  
  17.           
  18.         // 所有TTcpNetThread的事件,交由TTcp来处理  
  19.         // TOnClientValidate除外,该事件需要特定的逻辑,且无法异步  
  20.         if (m_TcpNetThd)  
  21.         {  
  22.             m_TcpNetThd->setOnClientConnect(EVENT(TTcpHandler::TOnClientConnect, TTcp, this, tcpNetThread_OnClientConnect));  
  23.             m_TcpNetThd->setOnClientDisconnect(EVENT(TTcpHandler::TOnClientDisconnect, TTcp, this, tcpNetThread_OnClientDisconnect));  
  24.             m_TcpNetThd->setOnClientValidate(m_OnClientValidate);  
  25.             m_TcpNetThd->setOnDataReceive(EVENT(TTcpHandler::TOnDataReceive, TTcp, this, tcpNetThread_OnDataReceive));  
  26.             m_TcpNetThd->setOnDataSendFailed(EVENT(TTcpHandler::TOnDataSendFailed, TTcp, this, tcpNetThread_OnDataSendFailed));  
  27.             m_TcpNetThd->setOnDataSendSucceeded(EVENT(TTcpHandler::TOnDataSendSucceeded, TTcp, this, tcpNetThread_OnDataSendSucceeded));  
  28.         }  
  29.         if (activate() == -1)  
  30.             ACE_DEBUG((LM_ERROR, ACE_TEXT("Resume thread failed")));  
  31.     }  
  32.     void TTcp::close()  
  33.     {  
  34.         if (m_TcpNetThd)  
  35.             m_TcpNetThd->close();  
  36.         ACE_TRACE("TTcp::close");  
  37.         ACE_Message_Block* termBlock; // 结束信号  
  38.           
  39.         ACE_NEW_NORETURN(termBlock, ACE_Message_Block(0, ACE_Message_Block::MB_HANGUP));  
  40.         if (!termBlock)  
  41.             ACE_DEBUG((LM_ERROR, ACE_TEXT("Allocate failed %i"), errno));  
  42.         else  
  43.         {  
  44.             putq(termBlock);  
  45.             wait();  
  46.         }  
  47.     }  
  48.     int TTcp::send(ACE_UINT32 ip, ACE_UINT16 port, unsigned int seq, const char* buf, unsigned short len)  
  49.     {  
  50.         ACE_Message_Block* mb = 0; // 数据包  
  51.         ACE_NEW_RETURN(mb, ACE_Message_Block(sizeof(ACE_UINT32) + sizeof(ACE_UINT16) + sizeof(unsigned int) + sizeof(unsigned short) + len, TCP_DATA_SEND), -1);  
  52.           
  53.         // 格式:ip | port | seq | len | 数据...  
  54.         mb->copy((const char *)&ip, sizeof(ACE_UINT32));  
  55.         mb->copy((const char *)&port, sizeof(ACE_UINT16));  
  56.         mb->copy((const char *)&seq, sizeof(unsigned int));  
  57.         mb->copy((const char *)&len, sizeof(unsigned short));  
  58.         mb->copy(buf, len);  
  59.         return putq(mb);  
  60.     }  
  61.     int TTcp::svc()  
  62.     {  
  63.         ACE_TRACE("TTcp::svc");  
  64.         if (m_TcpNetThd->open() == -1)  
  65.             ACE_DEBUG((LM_ERROR, ACE_TEXT("Failed to pen TTcpNetThread: %i"), errno));  
  66.         ACE_Message_Block* msg = 0;  
  67.         while(true)  
  68.         {  
  69.             if (getq(msg) == -1)  
  70.             {  
  71.                 ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("Failed to getq %i"), errno), -1);  
  72.             }  
  73.             switch(msg->msg_type())  
  74.             {  
  75.             case ACE_Message_Block::MB_HANGUP: // 偶要退出  
  76.                 {  
  77.                     ACE_DEBUG((LM_DEBUG, ACE_TEXT("Quit")));  
  78.                     msg->release();  
  79.                     return 0;  
  80.                 }  
  81.                 break;  
  82.             case TCP_CLIENT_CONNECT: // 客户端连接  
  83.                 {  
  84.                     int len = msg->length();  
  85.                     int hLen = sizeof(TTcpHandler *);  
  86.                     if (msg->length() != TCP_PACK_HEADER_SIZE + sizeof(TTcpHandler *))  
  87.                         ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("Tcp connection message block invalid!")), -1);  
  88.                     char* ptr = msg->rd_ptr();  
  89.                     ACE_UINT32 ip = *(ACE_UINT32 *)ptr; ptr += sizeof(ACE_UINT32);  
  90.                     ACE_UINT16 port = *(ACE_UINT16 *)ptr; ptr += sizeof(ACE_UINT16);  
  91.                     TTcpHandler* handler = (TTcpHandler *)(*(int *)ptr);  
  92.                     {  
  93.                         ACE_Guard lock(m_Lock);  
  94.                         m_AddrMap.insert(make_pair((unsigned __int64)ip rd_ptr();  
  95.                     ACE_UINT32 ip = *(ACE_UINT32 *)ptr; ptr += sizeof(ACE_UINT32);  
  96.                     ACE_UINT16 port = *(ACE_UINT16 *)ptr;  
  97.                     {  
  98.                         ACE_Guard lock(m_Lock);  
  99.                         m_AddrMap.erase((unsigned __int64)ip seq, data, header->len);  
  100.                 }  
  101.                 break;  
  102.             case TCP_DATA_SEND:  
  103.                 {  
  104.                     if (msg->length() > sizeof(TTcpPackHeader))  
  105.                     {  
  106.                         char* ptr = msg->rd_ptr();  
  107.                         ACE_UINT32 ip = *(ACE_UINT32 *)ptr; ptr += sizeof(ACE_UINT32);  
  108.                         ACE_UINT16 port = *(ACE_UINT16 *)ptr; ptr += sizeof(ACE_UINT16);  
  109.                         unsigned int seq = *(unsigned int *)ptr; ptr += sizeof(unsigned int);  
  110.                         unsigned short len = *(unsigned short *)ptr; ptr += sizeof(unsigned short);  
  111.                         const char* data = ptr;  
  112.                           
  113.                         {  
  114.                             ACE_Guard _lock(m_Lock);  
  115.                             hash_map::iterator it = m_AddrMap.find((unsigned __int64)ip rd_ptr();  
  116.                     ACE_UINT32 ip = *(ACE_UINT32 *)ptr; ptr += sizeof(ACE_UINT32);  
  117.                     ACE_UINT16 port = *(ACE_UINT16 *)ptr; ptr += sizeof(ACE_UINT16);  
  118.                     TTcpPackHeader* header = (TTcpPackHeader *)ptr; ptr += TCP_PACK_HEADER_SIZE;  
  119.                     const char* data = ptr;  
  120.                     m_OnDataSendSucceeded(ip, port, header->seq, data, header->len);  
  121.                 }  
  122.                 break;  
  123.             case TCP_DATA_SEND_FAILED:  
  124.                 {  
  125.                     char* ptr = msg->rd_ptr();  
  126.                     ACE_UINT32 ip = *(ACE_UINT32 *)ptr; ptr += sizeof(ACE_UINT32);  
  127.                     ACE_UINT16 port = *(ACE_UINT16 *)ptr; ptr += sizeof(ACE_UINT16);  
  128.                     TTcpPackHeader* header = (TTcpPackHeader *)ptr; ptr += TCP_PACK_HEADER_SIZE;  
  129.                     const char* data = ptr;  
  130.                     m_OnDataSendFailed(ip, port, header->seq, data, header->len);  
  131.                 }  
  132.                 break;  
  133.             default:  
  134.                 {  
  135.                     ACE_DEBUG((LM_ERROR, ACE_TEXT("Unknown ACE_Message_Block type %i/n"), msg->msg_type()));  
  136.                 }  
  137.                 break;  
  138.             } // switch  
  139.             msg->release();  
  140.         } // while true  
  141.         return 0;  
  142.     }  
  143.     void TTcp::tcpNetThread_OnClientConnect(ACE_UINT32 ip, ACE_UINT16 port, TTcpHandler* handler)  
  144.     {  
  145.         ACE_Message_Block* mb = 0;  
  146.         ACE_NEW_NORETURN(mb, ACE_Message_Block(sizeof(ACE_UINT32) + sizeof(ACE_UINT16) + sizeof(TTcpHandler *), TCP_CLIENT_CONNECT));  
  147.         if (mb)  
  148.         {  
  149.             mb->copy((const char *)&ip, sizeof(ACE_UINT32));  
  150.             mb->copy((const char *)&port, sizeof(ACE_UINT16));  
  151.             mb->copy((const char *)&handler, sizeof(TTcpHandler *));  
  152.             this->putq(mb);  
  153.         }  
  154.     }  
  155.     void TTcp::tcpNetThread_OnClientDisconnect(ACE_UINT32 ip, ACE_UINT16 port)  
  156.     {  
  157.         ACE_Message_Block* mb = 0;  
  158.         ACE_NEW_NORETURN(mb, ACE_Message_Block(sizeof(ACE_UINT32) + sizeof(ACE_UINT16), TCP_CLIENT_DISCONNECT));  
  159.         if (mb)  
  160.         {  
  161.             mb->copy((const char *)&ip, sizeof(ACE_UINT32));  
  162.             mb->copy((const char *)&port, sizeof(ACE_UINT16));  
  163.               
  164.             this->putq(mb);  
  165.         }  
  166.     }  
  167.       
  168.     void TTcp::tcpNetThread_OnDataReceive(ACE_UINT32 ip, ACE_UINT16 port, unsigned int seq, const char* data, unsigned short size)  
  169.     {  
  170.         ACE_Message_Block* mb = 0;  
  171.         ACE_NEW_NORETURN(mb, ACE_Message_Block(sizeof(ACE_UINT32) + sizeof(ACE_UINT16) + TCP_PACK_HEADER_SIZE + size, TCP_DATA_RECEIVE));  
  172.         if (mb)  
  173.         {  
  174.             mb->copy((const char *)&ip, sizeof(ACE_UINT32));  
  175.             mb->copy((const char *)&port, sizeof(ACE_UINT16));  
  176.             mb->copy((const char *)&seq, sizeof(unsigned int));  
  177.             mb->copy((const char *)&size, sizeof(unsigned short));  
  178.             mb->copy(data, size);  
  179.               
  180.             this->putq(mb);  
  181.         }  
  182.     }  
  183.     void TTcp::tcpNetThread_OnDataSendSucceeded(ACE_UINT32 ip, ACE_UINT16 port, unsigned int seq, const char* data, unsigned short size)  
  184.     {  
  185.         ACE_Message_Block* mb = 0;  
  186.         ACE_NEW_NORETURN(mb, ACE_Message_Block(sizeof(ACE_UINT32) + sizeof(ACE_UINT16) + TCP_PACK_HEADER_SIZE + size, TCP_DATA_SEND_SUCCEEDED));  
  187.         if (mb)  
  188.         {  
  189.             mb->copy((const char *)&ip, sizeof(ACE_UINT32));  
  190.             mb->copy((const char *)&port, sizeof(ACE_UINT16));  
  191.             mb->copy((const char *)&seq, sizeof(unsigned int));  
  192.             mb->copy((const char *)&size, sizeof(unsigned short));  
  193.             mb->copy(data, size);  
  194.               
  195.             this->putq(mb);  
  196.         }  
  197.     }  
  198.     void TTcp::tcpNetThread_OnDataSendFailed(ACE_UINT32 ip, ACE_UINT16 port, unsigned int seq, const char* data, unsigned short size)  
  199.     {  
  200.         ACE_Message_Block* mb = 0;  
  201.         ACE_NEW_NORETURN(mb, ACE_Message_Block(sizeof(ACE_UINT32) + sizeof(ACE_UINT16) + TCP_PACK_HEADER_SIZE + size, TCP_DATA_SEND_FAILED));  
  202.         if (mb)  
  203.         {  
  204.             mb->copy((const char *)&ip, sizeof(ACE_UINT32));  
  205.             mb->copy((const char *)&port, sizeof(ACE_UINT16));  
  206.             mb->copy((const char *)&seq, sizeof(unsigned int));  
  207.             mb->copy((const char *)&size, sizeof(unsigned short));  
  208.             mb->copy(data, size);  
  209.               
  210.             this->putq(mb);  
  211.         }  
  212.     }  
  213. } // namespace igame  

 

在完整的工程中,还有测试代码,这里就不列出了。本来已经在下载频道中上传了,并设置下载点数为0,结果传完后楞是自私都找不到?!NNDCSDN!!

这是下载资源。

 

来信到igame2000@hotmail.com

 

需要完整代码的请来信索取吧,必复。

 

 

此乃末技。。。。

 

关注
打赏
1659628745
查看更多评论
立即登录/注册

微信扫码登录

0.0545s