注:本文仅对使用ACE进行网络通信进行演示说明。本文中的代码皆使用doxgen的注释风格。本文中使用的事件机制,其原理与实现请参考[ 基于C++的事件机制设计[2.0]]一文。
ACE的Proactor对Epoll和IOCP进行了良好包装,因此,使用ACE来进行网络开发是相当的便利,性能也不差。闲言少叙,看代码。
这里以TCP协议进行流式通信。我们需要解析流,得出每次接收到的数据包大小和包含的数据域,假定我们的包结构如下:
包序列号(32Bit) | 长度(16Bit) | 数据域(大小为长度所表示的字节)... | (下一包)
通过分析由包序列号和长度组成的包头来解决半包,粘包等问题,许多其它文章也有描述,这里就省略了。
这样可以确定我们的包头结构如下:
- #pragma pack(push)
- #pragma pack(1)
- /**
- * @brief Tcp包头结构
- */
- typedef struct tag_TTcpPackHeader
- {
- unsigned int seq; //length() m_Reader.read(*m_CurDataMB, m_CurDataMB->space());
- return ;
- }
- TTcpPackHeader* header = reinterpret_cast(this->m_CurDataMB->rd_ptr());
- ACE_Message_Block* dataMB = this->m_CurDataMB->cont();
- if (!dataMB)
- {
- ACE_NEW_NORETURN(dataMB, ACE_Message_Block(header->len));
- if (dataMB)
- this->m_CurDataMB->cont(dataMB);
- else
- {
- this->m_CurDataMB->release();
- ACE_DEBUG((LM_ERROR, ACE_TEXT("Failed to allocated: %i/n"), errno));
- delete this;
- return ;
- }
- }
- if (dataMB->length() == header->len)
- {
- // 成功读取了数据?
- m_OnDataReceive(m_ClientAddr.get_ip_address(), m_ClientAddr.get_port_number(), header->seq, dataMB->rd_ptr(), header->len);
- m_CurDataMB->release();
- initCurDataMB(); // 下一包数据
- this->m_Reader.read(*m_CurDataMB, m_CurDataMB->space()); // next, try to get header
- return ;
- }
- this->m_Reader.read(*dataMB, dataMB->space()); // try to get data left
- }
- }
- void TTcpHandler::handle_write_stream(const ACE_Asynch_Write_Stream::Result& result)
- {
- if (result.success() && result.bytes_transferred() > 0) // 发送成功
- {
- ACE_Message_Block& mb = result.message_block();
- #ifdef _DEBUG
- ACE_TCHAR addrStr[128];
- m_ClientAddr.addr_to_string(addrStr, sizeof(addrStr) / sizeof(ACE_TCHAR));
- ACE_DEBUG((LM_INFO, ACE_TEXT("Send to client: %s len:%i/n"), addrStr, result.bytes_transferred()));
- char* ptr = mb.rd_ptr();
- #endif
- mb.release();
- }
- }
- void TTcpHandler::initCurDataMB()
- {
- ACE_NEW_NORETURN(m_CurDataMB, ACE_Message_Block(TCP_PACK_HEADER_SIZE, TCP_DATA_RECEIVE));
- }
- } // namespace igame
然后是TTcpNetThread,该类的实现也相当简单:
- #include
- #include "TCPNetThread.h"
- namespace igame
- {
- int TTcpNetThread::open() { return this->activate(); }
- int TTcpNetThread::close()
- {
- ACE_Proactor::instance()->proactor_end_event_loop(); // 终止ACE_Proactor循环
- this->wait(); // 等待清理现场
- return 0;
- }
- int TTcpNetThread::svc()
- {
- ACE_INET_Addr listenAddr(DEF_LISTENING_PORT); // 默认监听地址
- TTcpAcceptor tcpAcceptor; // 接收器
- // 设置事件
- tcpAcceptor.setOnClientConnect(m_OnClientConnect);
- tcpAcceptor.setOnClientDisconnect(m_OnClientDisconnect);
- tcpAcceptor.setOnClientValidate(m_OnClientValidate);
- tcpAcceptor.setOnDataReceive(m_OnDataReceive);
- tcpAcceptor.setOnDataSendFailed(m_OnDataSendFailed);
- tcpAcceptor.setOnDataSendSucceeded(m_OnDataSendSucceeded);
- // 演出开始
- if (tcpAcceptor.open(listenAddr, 0, 1, 5, 1, 0, 0) != 0)
- ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%p/n"), ACE_TEXT("failed to open TcpAcceptor errno=%i/n"), errno), -1);
- // Proactor的事件循环开始
- ACE_Proactor::instance()->proactor_run_event_loop();
- ACE_DEBUG((LM_DEBUG, ACE_TEXT("Network fin/n")));
- return 0;
- }
- } // namespace igame
最后,对以上三个类进行聚合,封装,就成了TTcp类,在此之前,先定义消息类型:
- /**
- * @name TCP的ACE_Message_Block类型定义 @see ACE_Message_Block
- * @{
- */
- /// @brief TCP数据接收
- #define TCP_DATA_RECEIVE 0x5505
- /// @brief TCP客户端连接
- #define TCP_CLIENT_CONNECT 0x5506
- /// @brief TCP客户端断线
- #define TCP_CLIENT_DISCONNECT 0x5507
- /// @brief TCP数据发送
- #define TCP_DATA_SEND 0x5508
- /// @brief TCP数据发送成功
- #define TCP_DATA_SEND_SUCCEEDED 0x5509
- /// @brief TCP数据发送失败
- #define TCP_DATA_SEND_FAILED 0x550A
- /**
- * @}
- */
- /// 默认监听地址:偶的车牌号
- #define DEF_LISTENING_PORT 777
现在看看TTcp的实现:
唔,太长了,下一篇吧。
此乃末技,
不知何用。
堆砌字数,
凑成更新。
走过路过,
不要错过。
请原谅偶拖篇幅,这里奉上拖欠的数字。
TTcp的实现如下:
- #include "Tcp.h"
- namespace igame
- {
- TTcp::TTcp()
- :m_TcpNetThd(0)
- {
- ACE_NEW_NORETURN(m_TcpNetThd, TTcpNetThread()); // 创建TTcpNetThread对象实例
- }
- TTcp::~TTcp()
- {
- if (m_TcpNetThd) // 释放
- delete m_TcpNetThd;
- }
- void TTcp::open()
- {
- ACE_TRACE("TTcp::open");
- // 所有TTcpNetThread的事件,交由TTcp来处理
- // TOnClientValidate除外,该事件需要特定的逻辑,且无法异步
- if (m_TcpNetThd)
- {
- m_TcpNetThd->setOnClientConnect(EVENT(TTcpHandler::TOnClientConnect, TTcp, this, tcpNetThread_OnClientConnect));
- m_TcpNetThd->setOnClientDisconnect(EVENT(TTcpHandler::TOnClientDisconnect, TTcp, this, tcpNetThread_OnClientDisconnect));
- m_TcpNetThd->setOnClientValidate(m_OnClientValidate);
- m_TcpNetThd->setOnDataReceive(EVENT(TTcpHandler::TOnDataReceive, TTcp, this, tcpNetThread_OnDataReceive));
- m_TcpNetThd->setOnDataSendFailed(EVENT(TTcpHandler::TOnDataSendFailed, TTcp, this, tcpNetThread_OnDataSendFailed));
- m_TcpNetThd->setOnDataSendSucceeded(EVENT(TTcpHandler::TOnDataSendSucceeded, TTcp, this, tcpNetThread_OnDataSendSucceeded));
- }
- if (activate() == -1)
- ACE_DEBUG((LM_ERROR, ACE_TEXT("Resume thread failed")));
- }
- void TTcp::close()
- {
- if (m_TcpNetThd)
- m_TcpNetThd->close();
- ACE_TRACE("TTcp::close");
- ACE_Message_Block* termBlock; // 结束信号
- ACE_NEW_NORETURN(termBlock, ACE_Message_Block(0, ACE_Message_Block::MB_HANGUP));
- if (!termBlock)
- ACE_DEBUG((LM_ERROR, ACE_TEXT("Allocate failed %i"), errno));
- else
- {
- putq(termBlock);
- wait();
- }
- }
- int TTcp::send(ACE_UINT32 ip, ACE_UINT16 port, unsigned int seq, const char* buf, unsigned short len)
- {
- ACE_Message_Block* mb = 0; // 数据包
- ACE_NEW_RETURN(mb, ACE_Message_Block(sizeof(ACE_UINT32) + sizeof(ACE_UINT16) + sizeof(unsigned int) + sizeof(unsigned short) + len, TCP_DATA_SEND), -1);
- // 格式:ip | port | seq | len | 数据...
- mb->copy((const char *)&ip, sizeof(ACE_UINT32));
- mb->copy((const char *)&port, sizeof(ACE_UINT16));
- mb->copy((const char *)&seq, sizeof(unsigned int));
- mb->copy((const char *)&len, sizeof(unsigned short));
- mb->copy(buf, len);
- return putq(mb);
- }
- int TTcp::svc()
- {
- ACE_TRACE("TTcp::svc");
- if (m_TcpNetThd->open() == -1)
- ACE_DEBUG((LM_ERROR, ACE_TEXT("Failed to pen TTcpNetThread: %i"), errno));
- ACE_Message_Block* msg = 0;
- while(true)
- {
- if (getq(msg) == -1)
- {
- ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("Failed to getq %i"), errno), -1);
- }
- switch(msg->msg_type())
- {
- case ACE_Message_Block::MB_HANGUP: // 偶要退出
- {
- ACE_DEBUG((LM_DEBUG, ACE_TEXT("Quit")));
- msg->release();
- return 0;
- }
- break;
- case TCP_CLIENT_CONNECT: // 客户端连接
- {
- int len = msg->length();
- int hLen = sizeof(TTcpHandler *);
- if (msg->length() != TCP_PACK_HEADER_SIZE + sizeof(TTcpHandler *))
- ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("Tcp connection message block invalid!")), -1);
- char* ptr = msg->rd_ptr();
- ACE_UINT32 ip = *(ACE_UINT32 *)ptr; ptr += sizeof(ACE_UINT32);
- ACE_UINT16 port = *(ACE_UINT16 *)ptr; ptr += sizeof(ACE_UINT16);
- TTcpHandler* handler = (TTcpHandler *)(*(int *)ptr);
- {
- ACE_Guard lock(m_Lock);
- m_AddrMap.insert(make_pair((unsigned __int64)ip rd_ptr();
- ACE_UINT32 ip = *(ACE_UINT32 *)ptr; ptr += sizeof(ACE_UINT32);
- ACE_UINT16 port = *(ACE_UINT16 *)ptr;
- {
- ACE_Guard lock(m_Lock);
- m_AddrMap.erase((unsigned __int64)ip seq, data, header->len);
- }
- break;
- case TCP_DATA_SEND:
- {
- if (msg->length() > sizeof(TTcpPackHeader))
- {
- char* ptr = msg->rd_ptr();
- ACE_UINT32 ip = *(ACE_UINT32 *)ptr; ptr += sizeof(ACE_UINT32);
- ACE_UINT16 port = *(ACE_UINT16 *)ptr; ptr += sizeof(ACE_UINT16);
- unsigned int seq = *(unsigned int *)ptr; ptr += sizeof(unsigned int);
- unsigned short len = *(unsigned short *)ptr; ptr += sizeof(unsigned short);
- const char* data = ptr;
- {
- ACE_Guard _lock(m_Lock);
- hash_map::iterator it = m_AddrMap.find((unsigned __int64)ip rd_ptr();
- ACE_UINT32 ip = *(ACE_UINT32 *)ptr; ptr += sizeof(ACE_UINT32);
- ACE_UINT16 port = *(ACE_UINT16 *)ptr; ptr += sizeof(ACE_UINT16);
- TTcpPackHeader* header = (TTcpPackHeader *)ptr; ptr += TCP_PACK_HEADER_SIZE;
- const char* data = ptr;
- m_OnDataSendSucceeded(ip, port, header->seq, data, header->len);
- }
- break;
- case TCP_DATA_SEND_FAILED:
- {
- char* ptr = msg->rd_ptr();
- ACE_UINT32 ip = *(ACE_UINT32 *)ptr; ptr += sizeof(ACE_UINT32);
- ACE_UINT16 port = *(ACE_UINT16 *)ptr; ptr += sizeof(ACE_UINT16);
- TTcpPackHeader* header = (TTcpPackHeader *)ptr; ptr += TCP_PACK_HEADER_SIZE;
- const char* data = ptr;
- m_OnDataSendFailed(ip, port, header->seq, data, header->len);
- }
- break;
- default:
- {
- ACE_DEBUG((LM_ERROR, ACE_TEXT("Unknown ACE_Message_Block type %i/n"), msg->msg_type()));
- }
- break;
- } // switch
- msg->release();
- } // while true
- return 0;
- }
- void TTcp::tcpNetThread_OnClientConnect(ACE_UINT32 ip, ACE_UINT16 port, TTcpHandler* handler)
- {
- ACE_Message_Block* mb = 0;
- ACE_NEW_NORETURN(mb, ACE_Message_Block(sizeof(ACE_UINT32) + sizeof(ACE_UINT16) + sizeof(TTcpHandler *), TCP_CLIENT_CONNECT));
- if (mb)
- {
- mb->copy((const char *)&ip, sizeof(ACE_UINT32));
- mb->copy((const char *)&port, sizeof(ACE_UINT16));
- mb->copy((const char *)&handler, sizeof(TTcpHandler *));
- this->putq(mb);
- }
- }
- void TTcp::tcpNetThread_OnClientDisconnect(ACE_UINT32 ip, ACE_UINT16 port)
- {
- ACE_Message_Block* mb = 0;
- ACE_NEW_NORETURN(mb, ACE_Message_Block(sizeof(ACE_UINT32) + sizeof(ACE_UINT16), TCP_CLIENT_DISCONNECT));
- if (mb)
- {
- mb->copy((const char *)&ip, sizeof(ACE_UINT32));
- mb->copy((const char *)&port, sizeof(ACE_UINT16));
- this->putq(mb);
- }
- }
- void TTcp::tcpNetThread_OnDataReceive(ACE_UINT32 ip, ACE_UINT16 port, unsigned int seq, const char* data, unsigned short size)
- {
- ACE_Message_Block* mb = 0;
- ACE_NEW_NORETURN(mb, ACE_Message_Block(sizeof(ACE_UINT32) + sizeof(ACE_UINT16) + TCP_PACK_HEADER_SIZE + size, TCP_DATA_RECEIVE));
- if (mb)
- {
- mb->copy((const char *)&ip, sizeof(ACE_UINT32));
- mb->copy((const char *)&port, sizeof(ACE_UINT16));
- mb->copy((const char *)&seq, sizeof(unsigned int));
- mb->copy((const char *)&size, sizeof(unsigned short));
- mb->copy(data, size);
- this->putq(mb);
- }
- }
- void TTcp::tcpNetThread_OnDataSendSucceeded(ACE_UINT32 ip, ACE_UINT16 port, unsigned int seq, const char* data, unsigned short size)
- {
- ACE_Message_Block* mb = 0;
- ACE_NEW_NORETURN(mb, ACE_Message_Block(sizeof(ACE_UINT32) + sizeof(ACE_UINT16) + TCP_PACK_HEADER_SIZE + size, TCP_DATA_SEND_SUCCEEDED));
- if (mb)
- {
- mb->copy((const char *)&ip, sizeof(ACE_UINT32));
- mb->copy((const char *)&port, sizeof(ACE_UINT16));
- mb->copy((const char *)&seq, sizeof(unsigned int));
- mb->copy((const char *)&size, sizeof(unsigned short));
- mb->copy(data, size);
- this->putq(mb);
- }
- }
- void TTcp::tcpNetThread_OnDataSendFailed(ACE_UINT32 ip, ACE_UINT16 port, unsigned int seq, const char* data, unsigned short size)
- {
- ACE_Message_Block* mb = 0;
- ACE_NEW_NORETURN(mb, ACE_Message_Block(sizeof(ACE_UINT32) + sizeof(ACE_UINT16) + TCP_PACK_HEADER_SIZE + size, TCP_DATA_SEND_FAILED));
- if (mb)
- {
- mb->copy((const char *)&ip, sizeof(ACE_UINT32));
- mb->copy((const char *)&port, sizeof(ACE_UINT16));
- mb->copy((const char *)&seq, sizeof(unsigned int));
- mb->copy((const char *)&size, sizeof(unsigned short));
- mb->copy(data, size);
- this->putq(mb);
- }
- }
- } // namespace igame
在完整的工程中,还有测试代码,这里就不列出了。本来已经在下载频道中上传了,并设置下载点数为0,结果传完后楞是自私都找不到?!NNDCSDN!!
这是下载资源。
来信到igame2000@hotmail.com
需要完整代码的请来信索取吧,必复。
此乃末技。。。。