您当前的位置: 首页 > 

phymat.nico

暂无认证

  • 1浏览

    0关注

    1967博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

ASIO协程彻底转变你的思维

phymat.nico 发布时间:2017-12-19 23:16:48 ,浏览量:1

avbot 发布了许久了, 最近突然有个用户跑来说,希望能增加个调用 “外部脚本” 的功能,方便扩展。 我一向对设计一个 plugin 机制极力的避免,不喜欢动态载入的模块扩展程序本身的功能。何况 avbot 是 c++开发的,调用脚本并不是容易的事情。(好吧,真实的原因是我被 mingw (VC 不支持 utf8源码,我已经抛弃了) 折腾怕了,不想再搞个 python 。windows实在是恐怖的平台,写点程序麻烦的要死,编译麻烦的要死。可是 avbot 又必须跨平台,结果是我一天写好的东西要在 windows (虚拟机) 里折腾好几天,累死人 ) 于是我决定提供一个  JSON 接口,内置一个简单的 HTTP Server, 用脚本(python应该 HTTP JSON 模块有的是,对吧)连接到 avbot ,然后 avbot 将发生的每条消息以 json 的形式返回给 外部脚本。 另外,默认使用 HTTP 的connection: keep-alive 模式,所以保持一个长连接即可。 那么,avbot 需要支持 不确定数目的消息接收方了。 对于链接到 avbot 的客户端而言, avbot 并不保留之前的所有消息,而是从连接上的那一刻开始,后续的消息才能通知到。 一个很明显的思路就是,将链接上的客户端做成一个链表/列队, avbot 收到消息后,遍历这个列队执行消息发送。 这个思路很简单,可是如果要求 : 必须单线程异步呢? avbot 是一个纯粹的单线程程序,绝对不允许多线程化。所有的逻辑必须使用异步处理。 那么,这个问题就复杂化了, “avbot 收到消息后,遍历这个列队执行消息发送” 这个做法,不可避免的带来了阻塞。好吧,异步遍历吧。 要是异步遍历还没遍历完,又来一个消息呢? 考虑这个问题,你会发疯的。因为异步,太多的细节需要考虑了。真的。 好吧,又有个好主意了,为每个客户端建立一个列队,每次遍历就是把要发送的消息挂入列队即可。这样也不需要异步遍历了,同步就可以。解决了异步遍历的时候又来一个消息导致的痛苦的调度。 然后细分,考虑每个客户端,就是等待 “发送列队” 不为空!等等,一直这么等待也不行,如果客户断开了链接呢? 所以要 “同时等待发送列队不为空&&客户正常在线,并且已经发送了 HTTP 请求头部” 好绕口,不过也只能如此了。 avbot 因为默认使用了 keep-alive , 所以发送是一个死循环,知道客户端主动断开链接或者网络发生错误。如果 客户端死了,那么,发送列队兴许会出现 爆队 的情况。所以要限制发送列队的大小。不是满了就不发送,而是满了后就把早的消息踢掉,也就是让 客户端发生“暂时性卡死”后,还能继续处理最后的几条信息。 诶,复杂的逻辑终于理清了,代码呢?! 啊累? 靠,这么复杂的 逻辑,得写一长段代码,调试几百年了吧? 错,我只花了几个小时, 不到 100 行的代码就轻松实现了全部要求。 !!!!!!!!!!!!!!!!!!! WHAT !!!!!!!!!!!!!!!!!!! 这种功能不可能不用个千把行代码的吧?! 如果使用以前的老办法,确实如此。 可是,自从发现了 ASIO 后,我被 ASIO 爸爸发明的协程深深的 震惊了! 利用 ASIO 爸爸提出的协程思想,我只用了不到 100行代码就全部完成了以上复杂的逻辑,而且,全部都是异步的哦~ 。 好,废话不多,先贴代码。然后解释。
  1. // avbot_rpc_server 由 acceptor_server 这个辅助类调用
  2. // 为其构造函数传入一个 m_socket, 是 shared_ptr 的.
  3. class avbot_rpc_server
  4. {
  5. public:
  6.         typedef boost::signals2::signal on_message_signal_type;
  7.         static on_message_signal_type on_message;
  8.         typedef boost::asio::ip::tcp Protocol;
  9.         typedef boost::asio::basic_stream_socket socket_type;
  10.         typedef void result_type;
  11.         avbot_rpc_server(boost::shared_ptr _socket)
  12.           : m_socket(_socket)
  13.           , m_request(new boost::asio::streambuf)
  14.           , m_responses(new boost::circular_buffer_space_optimized(20) )
  15.         {
  16.                 m_socket->get_io_service().post(
  17.                         boost::asio::detail::bind_handler(*this, boost::coro::coroutine(), boost::system::error_code(), 0)
  18.                 );
  19.         }
  20.         // 数据操作跑这里,嘻嘻.
  21.         void operator()(boost::coro::coroutine coro, boost::system::error_code ec, std::size_t bytestransfered)
  22.         {
  23.                 boost::shared_ptr        sendbuf;
  24.                 if (ec){
  25.                         m_socket->close(ec);
  26.                         // 看来不是 HTTP 客户端,诶,滚蛋啊!
  27.                         // 沉默,直接关闭链接. 取消信号注册.
  28.                          if (m_connect && m_connect->connected())
  29.                                 m_connect->disconnect();
  30.                         return;
  31.                 }
  32.                 CORO_REENTER(&coro)
  33.                 {
  34.                 do{
  35.                         // 发起 HTTP 处理操作.
  36.                         _yield boost::asio::async_read_until(*m_socket, *m_request, "\r\n\r\n", boost::bind(*this, coro, _1, _2));
  37.                         m_request->consume(bytestransfered);
  38.                         // 解析 HTTP
  39.                         // 等待消息.
  40.                         if (m_responses->empty())
  41.                         {
  42.                                 if (!m_connect){
  43.                                         // 将自己注册到 avbot 的 signal 去
  44.                                         // 等 有消息的时候,on_message 被调用,也就是下面的 operator() 被调用.
  45.                                         _yield m_connect = boost::make_shared
  46.                                                 (on_message.connect(boost::bind(*this, coro, _1, _2, _3, _4, _5)));
  47.                                         // 就这么退出了,但是消息来的时候,om_message 被调用,然后下面的那个
  48.                                         // operator() 就被调用了,那个 operator() 接着就会重新回调本 operator()
  49.                                         // 结果就是随着 coroutine 的作用,代码进入这一行,然后退出  if 判定
  50.                                         // 然后进入发送过程.
  51.                                 }else{
  52.                                         // 如果已经注册,直接返回。时候如果消息来了,on_message 被调用,也就
  53.                                         // 是下面的 operator() 被调用. 结果就是随着 coroutine 的作用,代码
  54.                                         // 进入上面那行,然后退出  if 判定。然后进入发送过程.
  55.                                         return;
  56.                                 }
  57.                                 // signals2 回调的时候会进入到这一行.
  58.                         }
  59.                         // 进入发送过程
  60.                         sendbuf = m_responses->front();
  61.                         _yield boost::asio::async_write(*m_socket, *sendbuf, boost::bind(*this, coro, _1, _2) );
  62.                         m_responses->pop_front();
  63.                         // 写好了,重新开始我们的处理吧!
  64.                 }while(1);
  65.                 }
  66.         }
  67.         // signal 的回调到了这里, 这里我们要区分对方是不是用了 keep-alive 呢.
  68.         void operator()(boost::coro::coroutine coro, std::string protocol, std::string room, std::string who, std::string message, sender_flags)
  69.         {
  70.                 pt::ptree jsonmessage;
  71.                 boost::shared_ptr buf(new boost::asio::streambuf);
  72.                 std::ostream        stream(buf.get());
  73.                 std::stringstream        teststream;
  74.                 jsonmessage.put("protocol", protocol);
  75.                 jsonmessage.put("root", room);
  76.                 jsonmessage.put("who", who);
  77.                 jsonmessage.put("msg", message);
  78.                 js::write_json(teststream,  jsonmessage);
  79.                 // 直接写入 json 格式的消息吧!
  80.                 stream
关注
打赏
1659628745
查看更多评论
立即登录/注册

微信扫码登录

0.0460s