您当前的位置: 首页 >  udp

phymat.nico

暂无认证

  • 3浏览

    0关注

    1967博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

ACE_Proactor UDP V2.0

phymat.nico 发布时间:2017-12-20 23:41:31 ,浏览量:3

单次发送单次接收

下面的程序使用Proactor模式用UDP通信:

(1)发送端发送一个复合消息,并打印发送的内容

(2)接收端接收一个复合消息并打印接收到的内容

 由于UDP是无连接的,所以这里没有Connector和Acceptor

本例是对ACE自带的example的稍微修改了一下(打印发送和接收的内容,这样更加直观)

发送端:client_main.cpp

[cpp] view plain copy
  1. #include   
  2. #include   
  3. #include   
  4. #include   
  5. #include   
  6. using namespace std;  
  7. #include "ace/Reactor.h"  
  8. #include "ace/Message_Queue.h"  
  9. #include "ace/Asynch_IO.h"  
  10. #include "ace/OS.h"  
  11. #include "ace/Proactor.h"  
  12. #include "ace/Asynch_Connector.h"  
  13. #include    
  14.   
  15.   
  16.   
  17.   
  18. //=============================================================================  
  19. /** 
  20.  *  @file    test_udp_proactor.cpp 
  21.  * 
  22.  *  $Id: test_udp_proactor.cpp 93639 2011-03-24 13:32:13Z johnnyw $ 
  23.  * 
  24.  *  This program illustrates how the  can be used to 
  25.  *  implement an application that does asynchronous operations using 
  26.  *  datagrams. 
  27.  * 
  28.  * 
  29.  *  @author Irfan Pyarali  and Roger Tragin  
  30.  */  
  31. //=============================================================================  
  32.   
  33.   
  34. #include "ace/OS_NS_string.h"  
  35. #include "ace/OS_main.h"  
  36. #include "ace/Proactor.h"  
  37. #include "ace/Asynch_IO.h"  
  38. #include "ace/INET_Addr.h"  
  39. #include "ace/SOCK_Dgram.h"  
  40. #include "ace/Message_Block.h"  
  41. #include "ace/Get_Opt.h"  
  42. #include "ace/Log_Msg.h"  
  43.   
  44.   
  45.   
  46.   
  47.   
  48.   
  49.   
  50. // Keep track of when we're done.  
  51. static int done = 0;  
  52.   
  53. /** 
  54.  * @class Sender 
  55.  * 
  56.  * @brief The class will be created by . 
  57.  */  
  58. class Sender : public ACE_Handler  
  59. {  
  60. public:  
  61.   Sender (void);  
  62.   ~Sender (void);  
  63.   
  64.   //FUZZ: disable check_for_lack_ACE_OS  
  65.   ///FUZZ: enable check_for_lack_ACE_OS  
  66.   int open (const ACE_TCHAR *host, u_short port);  
  67.   
  68. protected:  
  69.   // These methods are called by the freamwork  
  70.   
  71.   /// This is called when asynchronous writes from the dgram socket  
  72.   /// complete  
  73.   virtual void handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result);  
  74.   
  75. private:  
  76.   
  77.   /// Network I/O handle  
  78.   ACE_SOCK_Dgram sock_dgram_;  
  79.   
  80.   /// wd (write dgram): for writing to the socket  
  81.   ACE_Asynch_Write_Dgram wd_;  
  82.   
  83.   const char* completion_key_;  
  84.   const char* act_;  
  85. };  
  86.   
  87. Sender::Sender (void)  
  88.   : completion_key_ ("Sender completion key"),  
  89.     act_ ("Sender ACT")  
  90. {  
  91. }  
  92.   
  93. Sender::~Sender (void)  
  94. {  
  95.   this->sock_dgram_.close ();  
  96. }  
  97.   
  98. int  
  99. Sender::open (const ACE_TCHAR *host,  
  100.               u_short port)  
  101. {  
  102.   // Initialize stuff  
  103.   
  104.   if (this->sock_dgram_.open (ACE_INET_Addr::sap_any) == -1)  
  105.     ACE_ERROR_RETURN ((LM_ERROR,  
  106.                        "[%D][line:%l]%p\n",  
  107.                        "ACE_SOCK_Dgram::open"), -1);  
  108.   
  109.   // Initialize the asynchronous read.  
  110.   if (this->wd_.open (*this,  
  111.                       this->sock_dgram_.get_handle (),  
  112.                       this->completion_key_,  
  113.                       ACE_Proactor::instance ()) == -1)  
  114.     ACE_ERROR_RETURN ((LM_ERROR,  
  115.                        "[%D][line:%l]%p\n",  
  116.                        "ACE_Asynch_Write_Dgram::open"), -1);  
  117.   
  118.   // We are using scatter/gather to send the message header and  
  119.   // message body using 2 buffers  
  120.   
  121.   // create a message block for the message header  
  122.   ACE_Message_Block* msg = 0;  
  123.   ACE_NEW_RETURN (msg, ACE_Message_Block (100), -1);  
  124.   const char raw_msg [] = "To be or not to be.";  
  125.   // Copy buf into the Message_Block and update the wr_ptr ().  
  126.   msg->copy (raw_msg, ACE_OS::strlen (raw_msg) + 1);  
  127.   
  128.   // create a message block for the message body  
  129.   ACE_Message_Block* body = 0;  
  130.   ACE_NEW_RETURN (body, ACE_Message_Block (100), -1);  
  131.   ACE_OS::memset (body->wr_ptr (), 'X', 100);  
  132.   body->wr_ptr (100); // always remember to update the wr_ptr ()  
  133.   
  134.   // set body as the cont of msg.  This associates the 2 message blocks so  
  135.   // that a send will send the first block (which is the header) up to  
  136.   // length (), and use the cont () to get the next block to send.  You can  
  137.   // chain up to IOV_MAX message block using this method.  
  138.   msg->cont (body);  
  139.   
  140.   // do the asynch send  
  141.   size_t number_of_bytes_sent = 0;  
  142.   ACE_INET_Addr serverAddr (port, host);  
  143.   int res = this->wd_.send (msg, number_of_bytes_sent, 0, serverAddr, this->act_);  
  144.     
  145.   
  146.   ACE_Message_Block* p = 0;  
  147.   p= msg;  
  148.   
  149.   switch (res)  
  150.     {  
  151.     case 0:  
  152.       // this is a good error.  The proactor will call our handler when the  
  153.       // send has completed.  
  154.       break;  
  155.     case 1:  
  156.       // actually sent something, we will handle it in the handler callback  
  157.       ACE_DEBUG ((LM_DEBUG, "********************\n"));  
  158.       ACE_DEBUG ((LM_DEBUG,  
  159.                   "%s = %d\n",  
  160.                   "bytes sent immediately",  
  161.                   number_of_bytes_sent));  
  162.   
  163.       while (p != NULL)  
  164.       {  
  165.           ACE_DEBUG ((LM_DEBUG,"YOU SEND[%s]\n",p->rd_ptr()));  
  166.           p = p->cont();  
  167.       }  
  168.         
  169.       ACE_DEBUG ((LM_DEBUG, "********************\n"));  
  170.       res = 0;  
  171.       break;  
  172.     case -1:  
  173.       // Something else went wrong.  
  174.       ACE_ERROR ((LM_ERROR,  
  175.                   "[%D][line:%l]%p\n",  
  176.                   "ACE_Asynch_Write_Dgram::recv"));  
  177.       // the handler will not get called in this case so lets clean up our msg  
  178.       msg->release ();  
  179.       break;  
  180.     default:  
  181.       // Something undocumented really went wrong.  
  182.       ACE_ERROR ((LM_ERROR,  
  183.                   "[%D][line:%l]%p\n",  
  184.                   "ACE_Asynch_Write_Dgram::recv"));  
  185.       msg->release ();  
  186.       break;  
  187.     }  
  188.   return res;  
  189. }  
  190.   
  191. void  
  192. Sender::handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result)  
  193. {  
  194.   ACE_DEBUG ((LM_DEBUG,  
  195.               "handle_write_dgram called\n"));  
  196.   
  197.   ACE_DEBUG ((LM_DEBUG, "********************\n"));  
  198.   ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ()));  
  199.   ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));  
  200.   ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));  
  201.   ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "flags", result.flags ()));  
  202.   ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "act", result.act ()));  
  203.   ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));  
  204.   ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "completion_key", result.completion_key ()));  
  205.   ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));  
  206.   ACE_DEBUG ((LM_DEBUG, "********************\n"));  
  207.   
  208.   ACE_DEBUG ((LM_DEBUG,  
  209.               "Sender completed\n"));  
  210.   
  211.   // No need for this message block anymore.  
  212.   result.message_block ()->release ();  
  213.   
  214.   // Note that we are done with the test.  
  215.   done++;  
  216. }  
  217.   
  218.   
  219.   
  220. int  
  221. ACE_TMAIN (int argc, ACE_TCHAR *argv[])  
  222. {  
  223.   
  224.     //ACE_LOG_MSG->clr_flags(0);  
  225.     //ACE_LOG_MSG->set_flags(ACE_Log_Msg::STDERR | ACE_Log_Msg::VERBOSE);  
  226.   
  227.     Sender sender;  
  228.     // Port that we're receiving connections on.  
  229.     u_short port = ACE_DEFAULT_SERVER_PORT;  
  230.     // Host that we're connecting to.  
  231.     string host("localhost");  
  232.     if (sender.open (host.c_str(), port) == -1)  
  233.     return -1;  
  234.   
  235.     while (true)  
  236.     {  
  237.         ACE_Proactor::instance ()->handle_events ();  
  238.     }  
  239.       
  240.     return 0;  
  241. }  

接收端server_main.cpp

[cpp] view plain copy
  1. #include "ace/OS_NS_string.h"  
  2. #include "ace/OS_main.h"  
  3. #include "ace/Proactor.h"  
  4. #include "ace/Asynch_IO.h"  
  5. #include "ace/INET_Addr.h"  
  6. #include "ace/SOCK_Dgram.h"  
  7. #include "ace/Message_Block.h"  
  8. #include "ace/Get_Opt.h"  
  9. #include "ace/Log_Msg.h"  
  10.   
  11.   
  12.   
  13. // Host that we're connecting to.  
  14. static ACE_TCHAR *host = 0;  
  15.   
  16. // Port that we're receiving connections on.  
  17. static u_short port = ACE_DEFAULT_SERVER_PORT;  
  18.   
  19. // Keep track of when we're done.  
  20. static int done = 0;  
  21.   
  22. /** 
  23.  * @class Receiver 
  24.  * 
  25.  * @brief This class will receive data from 
  26.  * the network connection and dump it to a file. 
  27.  */  
  28. class Receiver : public ACE_Service_Handler  
  29. {  
  30. public:  
  31.   // = Initialization and termination.  
  32.   Receiver (void);  
  33.   ~Receiver (void);  
  34.   
  35.   int open_addr (const ACE_INET_Addr &localAddr);  
  36.   
  37. protected:  
  38.   // These methods are called by the framework  
  39.   
  40.   /// This method will be called when an asynchronous read completes on  
  41.   /// a UDP socket.  
  42.   virtual void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result);  
  43.   
  44. private:  
  45.   ACE_SOCK_Dgram sock_dgram_;  
  46.   
  47.   /// rd (read dgram): for reading from a UDP socket.  
  48.   ACE_Asynch_Read_Dgram rd_;  
  49.   const char* completion_key_;  
  50.   const char* act_;  
  51. };  
  52.   
  53. Receiver::Receiver (void)  
  54.   : completion_key_ ("Receiver Completion Key"),  
  55.     act_ ("Receiver ACT")  
  56. {  
  57. }  
  58.   
  59. Receiver::~Receiver (void)  
  60. {  
  61.   sock_dgram_.close ();  
  62. }  
  63.   
  64. int  
  65. Receiver::open_addr (const ACE_INET_Addr &localAddr)  
  66. {  
  67.   ACE_DEBUG ((LM_DEBUG,  
  68.               "[%D][line:%l]Receiver::open_addr called\n"));  
  69.   
  70.   // Create a local UDP socket to receive datagrams.  
  71.   if (this->sock_dgram_.open (localAddr) == -1)  
  72.     ACE_ERROR_RETURN ((LM_ERROR,  
  73.                        "[%D][line:%l]%p\n",  
  74.                        "ACE_SOCK_Dgram::open"), -1);  
  75.   
  76.   // Initialize the asynchronous read.  
  77.   if (this->rd_.open (*this,  
  78.                       this->sock_dgram_.get_handle (),  
  79.                       this->completion_key_,  
  80.                       ACE_Proactor::instance ()) == -1)  
  81.     ACE_ERROR_RETURN ((LM_ERROR,  
  82.                        "[%D][line:%l]%p\n",  
  83.                        "ACE_Asynch_Read_Dgram::open"), -1);  
  84.   
  85.   // Create a buffer to read into.  We are using scatter/gather to  
  86.   // read the message header and message body into 2 buffers  
  87.   
  88.   // create a message block to read the message header  
  89.   ACE_Message_Block* msg = 0;  
  90.   ACE_NEW_RETURN (msg, ACE_Message_Block (1024), -1);  
  91.   
  92.   // the next line sets the size of the header, even though we  
  93.   // allocated a the message block of 1k, by setting the size to 20  
  94.   // bytes then the first 20 bytes of the reveived datagram will be  
  95.   // put into this message block.  
  96.   msg->size (20); // size of header to read is 20 bytes  
  97.   
  98.   // create a message block to read the message body  
  99.   ACE_Message_Block* body = 0;  
  100.   ACE_NEW_RETURN (body, ACE_Message_Block (1024), -1);  
  101.   // The message body will not exceed 1024 bytes, at least not in this test.  
  102.   
  103.   // set body as the cont of msg.  This associates the 2 message  
  104.   // blocks so that a read will fill the first block (which is the  
  105.   // header) up to size (), and use the cont () block for the rest of  
  106.   // the data.  You can chain up to IOV_MAX message block using this  
  107.   // method.  
  108.   msg->cont (body);  
  109.   
  110.   // ok lets do the asynch read  
  111.   size_t number_of_bytes_recvd = 0;  
  112.   
  113.   int res = rd_.recv (msg,  
  114.                       number_of_bytes_recvd,  
  115.                       0,  
  116.                       PF_INET,  
  117.                       this->act_);  
  118.   switch (res)  
  119.     {  
  120.     case 0:  
  121.       // this is a good error.  The proactor will call our handler when the  
  122.       // read has completed.  
  123.       break;  
  124.     case 1:  
  125.       // actually read something, we will handle it in the handler callback  
  126.       ACE_DEBUG ((LM_DEBUG, "********************\n"));  
  127.       ACE_DEBUG ((LM_DEBUG,  
  128.                   "%s = %d\n",  
  129.                   "bytes recieved immediately",  
  130.                   number_of_bytes_recvd));  
  131.   
  132.       ACE_DEBUG ((LM_DEBUG, "********************\n"));  
  133.       res = 0;  
  134.       break;  
  135.     case -1:  
  136.       // Something else went wrong.  
  137.       ACE_ERROR ((LM_ERROR,  
  138.                   "[%D][line:%l]%p\n",  
  139.                   "ACE_Asynch_Read_Dgram::recv"));  
  140.       // the handler will not get called in this case so lets clean up our msg  
  141.       msg->release ();  
  142.       break;  
  143.     default:  
  144.       // Something undocumented really went wrong.  
  145.       ACE_ERROR ((LM_ERROR,  
  146.                   "[%D][line:%l]%p\n",  
  147.                   "ACE_Asynch_Read_Dgram::recv"));  
  148.       msg->release ();  
  149.       break;  
  150.     }  
  151.   
  152.   return res;  
  153. }  
  154.   
  155. void  
  156. Receiver::handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result)  
  157. {  
  158.   ACE_DEBUG ((LM_DEBUG,  
  159.               "handle_read_dgram called\n"));  
  160.   
  161.   ACE_DEBUG ((LM_DEBUG, "********************\n"));  
  162.   ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));  
  163.   ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));  
  164.   ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));  
  165.   ACE_INET_Addr peerAddr;  
  166.   result.remote_address (peerAddr);  
  167.   ACE_DEBUG ((LM_DEBUG, "%s = %s:%d\n", "peer_address", peerAddr.get_host_addr (), peerAddr.get_port_number ()));  
  168.   ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "flags", result.flags ()));  
  169.   ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "act", result.act ()));  
  170.   ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));  
  171.   ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "completion_key", result.completion_key ()));  
  172.   ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));  
  173.   ACE_DEBUG ((LM_DEBUG, "********************\n"));  
  174.   
  175.   if (result.success () && result.bytes_transferred () != 0)  
  176.     {  
  177.       // loop through our message block and print out the contents  
  178.       for (const ACE_Message_Block* msg = result.message_block (); msg != 0; msg = msg->cont ())  
  179.         { // use msg->length () to get the number of bytes written to the message  
  180.           // block.  
  181.           ACE_DEBUG ((LM_DEBUG, "Buf=[size=", msg->length ()));  
  182.           for (u_long i = 0; i length (); ++i)  
  183.             ACE_DEBUG ((LM_DEBUG,  
  184.                         "%c", (msg->rd_ptr ())[i]));  
  185.           ACE_DEBUG ((LM_DEBUG, "]\n"));  
  186.         }  
  187.     }  
  188.   
  189.   ACE_DEBUG ((LM_DEBUG,  
  190.               "Receiver completed\n"));  
  191.   
  192.   // No need for this message block anymore.  
  193.   result.message_block ()->release ();  
  194.   
  195.   // Note that we are done with the test.  
  196.   done++;  
  197. }  
  198.   
  199.   
  200. int  
  201.     ACE_TMAIN (int argc, ACE_TCHAR *argv[])  
  202. {  
  203.   
  204.     //ACE_LOG_MSG->set_flags(ACE_Log_Msg::STDERR | ACE_Log_Msg::VERBOSE);  
  205.   
  206.     Receiver receiver;  
  207.   
  208.     if (receiver.open_addr (ACE_INET_Addr (port)) == -1)  
  209.         return -1;  
  210.   
  211.     while (true)  
  212.     {  
  213.         ACE_Proactor::instance ()->handle_events ();  
  214.     }  
  215.   
  216.     return 0;  
  217. }  

先运行接收端,再运行发送端,你懂的。

发送端程序运行结果:

接收端运行结果:

定时多目标发送

程序的功能:

(1)UDP发送内容到P1,IP2,...,IPn(地址列表从文件读取) (1)发送内容从文件中读取; (1)发送时间间隔从文件中读取;

[cpp] view plain copy
  1. //=============================================================================  
  2. /** 
  3.  *  @file    test_udp_proactor.cpp 
  4.  * 
  5.  *  $Id: test_udp_proactor.cpp 93639 2011-03-24 13:32:13Z johnnyw $ 
  6.  * 
  7.  *  This program illustrates how the  can be used to 
  8.  *  implement an application that does asynchronous operations using 
  9.  *  datagrams. 
  10.  * 
  11.  * 
  12.  *  @author Irfan Pyarali  and Roger Tragin  
  13.  */  
  14. //=============================================================================  
  15.   
  16. #include   
  17. #include   
  18. #include   
  19. #include   
  20. #include   
  21. using namespace std;  
  22. //#include "ace/Reactor.h"  
  23. #include "ace/Message_Queue.h"  
  24. #include "ace/Asynch_IO.h"  
  25. #include "ace/OS.h"  
  26. #include "ace/Proactor.h"  
  27. #include "ace/Asynch_Connector.h"  
  28. #include    
  29.   
  30. #include "ace/OS_NS_string.h"  
  31. #include "ace/OS_main.h"  
  32. #include "ace/INET_Addr.h"  
  33. #include "ace/SOCK_Dgram.h"  
  34. #include "ace/Message_Block.h"  
  35. #include "ace/Get_Opt.h"  
  36. #include "ace/Log_Msg.h"  
  37. #include "ace/Event_Handler.h"  
  38. #include "ace/Date_Time.h"  
  39. #include "ace/WIN32_Proactor.h"  
  40.   
  41. namespace global  
  42. {  
  43.     int delay = 2;  
  44.     //int interval = 60*10;//每interval 秒计时一次  
  45.     int interval = 2;//每interval 秒计时一次  
  46.     void print_current_time(void)  
  47.     {  
  48.         ACE_Date_Time date(ACE_OS::gettimeofday());  
  49.         cout
关注
打赏
1659628745
查看更多评论
立即登录/注册

微信扫码登录

0.2501s