您当前的位置: 首页 > 

phymat.nico

暂无认证

  • 0浏览

    0关注

    1967博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

高并发的epoll+线程池,epoll在线程池内

phymat.nico 发布时间:2017-12-26 01:18:06 ,浏览量:0

epoll是linux下高并发服务器的完美方案,因为是基于事件触发的,所以比select快的不只是一个数量级。
单线程epoll,触发量可达到15000,但是加上业务后,因为大多数业务都与数据库打交道,所以就会存在阻塞的情况,这个时候就必须用多线程来提速。
 
epoll在线程池内,测试结果2000个/s
增加了网络断线后的无效socket检测。
 
测试工具:stressmark
因为加了适用与ab的代码,所以也可以适用ab进行压力测试。
char buf[1000] = {0}; sprintf(buf,"HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n%s","Hello world!\n"); send(socketfd,buf, strlen(buf),0);
 
 
 
#include #include #include #include #include #include #include #include #include #include #include #include #include //stl head #include //包含hash_map 的头文件 //#include //stl的map using namespace std ; //std 命名空间 using namespace __gnu_cxx ; //而hash_map是在__gnu_cxx的命名空间里的 int init_thread_pool ( int threadNum ) ; void *epoll_loop ( void * para ) ; void *check_connect_timeout ( void * para ) ; struct sockStruct {      time_t time ;      unsigned int * recvBuf ; } ; //hash-map //hash_map        sock_map; hash_map         sock_map ;   # define MAXRECVBUF 4096 # define MAXBUF MAXRECVBUF +10 int fd_Setnonblocking ( int fd ) {      int op ;       op =fcntl (fd ,F_GETFL ,0 ) ;     fcntl (fd ,F_SETFL ,op |O_NONBLOCK ) ;        return op ; }   void on_sigint ( int signal ) {      exit (0 ) ; }   /* handle_message - 处理每个 socket 上的消息收发 */ int handle_message ( int new_fd ) {      char buf [MAXBUF + 1 ] ;      char sendbuf [MAXBUF +1 ] ;      int len ;      /* 开始处理每个新连接上的数据收发 */     bzero (buf , MAXBUF + 1 ) ;      /* 接收客户端的消息 */      //len = recv(new_fd, buf, MAXBUF, 0);      int nRecvBuf = MAXRECVBUF ; //设置为32K      setsockopt (new_fd , SOL_SOCKET , SO_RCVBUF , ( const char * ) &nRecvBuf , sizeof ( int ) ) ;     len = recv (new_fd , &buf , MAXBUF ,0 ) ;      //--------------------------------------------------------------------------------------------      //这块为了使用ab测试      char bufSend [1000 ] = {0 } ;      sprintf (bufSend , "HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n%s" , "Hello world!\n" ) ;      send (new_fd ,bufSend , strlen (buf ) ,0 ) ;      //--------------------------------------------------------------------------------------------      if (len > 0 ) {          //printf ("%d接收消息成功:'%s',共%d个字节的数据\n", new_fd, buf, len);          //hash-map                  hash_map : : iterator it_find ;         it_find = sock_map . find (new_fd ) ;          if (it_find = = sock_map .end ( ) ) {              //新的网络连接,申请新的接收缓冲区,并放入map中              //printf("new socket %d\n", new_fd);             sockStruct newSockStruct ;             newSockStruct . time = time ( ( time_t * )0 ) ;             newSockStruct .recvBuf = ( unsigned int * ) malloc (1000 ) ;              memset (newSockStruct .recvBuf , 0 , 1000 ) ;              strcat ( ( char * )newSockStruct .recvBuf , buf ) ;             sock_map .insert ( pair (new_fd , newSockStruct ) ) ;          } else {              //网络连接已经存在,找到对应的数据缓冲区,将接收到的数据拼接到数据缓冲区中              //printf("socket %d exist!\n", it_find->first);              (it_find - >second ) . time = time ( ( time_t * )0 ) ;                 //时间更改              char * bufSockMap = ( char * ) (it_find - >second ) .recvBuf ;     //数据存储              strcat (bufSockMap , buf ) ;              //printf("bufSockMap:%s\n", bufSockMap);          }      }      else {          if (len second ) .recvBuf ) ;     sock_map .erase (it_find ) ;      close (new_fd ) ;      return len ; }      int listenfd ;      int sock_op =1 ;      struct sockaddr_in address ;      struct epoll_event event ;      struct epoll_event events [1024 ] ;      int epfd ;      int n ;      int i ;      char buf [512 ] ;      int off ;      int result ;      char *p ; int main ( int argc , char * argv [ ] ) {     init_thread_pool (1 ) ;      signal (SIGPIPE , SIG_IGN ) ;      signal (SIGCHLD , SIG_IGN ) ;      signal ( SIGINT , &on_sigint ) ;     listenfd = socket ( AF_INET , SOCK_STREAM ,0 ) ;      setsockopt (listenfd ,SOL_SOCKET ,SO_REUSEADDR , &sock_op , sizeof (sock_op ) ) ;        memset ( &address ,0 , sizeof (address ) ) ;     address .sin_addr .s_addr = htonl ( INADDR_ANY ) ;     address .sin_port = htons (8006 ) ;      bind (listenfd , ( struct sockaddr * ) &address , sizeof (address ) ) ;      listen (listenfd ,1024 ) ;     fd_Setnonblocking (listenfd ) ;       epfd =epoll_create (65535 ) ;      memset ( &event ,0 , sizeof (event ) ) ;     event .data .fd =listenfd ;     event .events =EPOLLIN |EPOLLET ;     epoll_ctl (epfd ,EPOLL_CTL_ADD ,listenfd , &event ) ;      while (1 ) {          sleep (1000 ) ;      }      return 0 ; } /************************************************* * Function: * init_thread_pool * Description: * 初始化线程 * Input: * threadNum:用于处理epoll的线程数 * Output: * * Others: * 此函数为静态static函数, *************************************************/ int init_thread_pool ( int threadNum ) {      int i ,ret ;      pthread_t threadId ;      //初始化epoll线程池      for ( i = 0 ; i 0 )          {              for (i =0 ;i 0 )                          {                             fd_Setnonblocking (event .data .fd ) ;                             event .events =EPOLLIN |EPOLLET ;                             epoll_ctl (epfd ,EPOLL_CTL_ADD ,event .data .fd , &event ) ;                          }                          else                          {                              if ( errno = =EAGAIN )                              break ;                          }                      }                  }                  else                  {                      if (events [i ] .events &EPOLLIN )                      {                          //handle_message(events[i].data.fd);                          char recvBuf [1024 ] = {0 } ;                          int ret = 999 ;                          int rs = 1 ;                          while (rs )                          {                             ret = recv (events [n ] .data .fd ,recvBuf ,1024 ,0 ) ; // 接受客户端消息                              if (ret 0 ) {                             count111 + + ;                              struct tm *today ;                              time_t ltime ;                              time ( &nowtime ) ;                              if (nowtime ! = oldtime ) {                                  printf ( "%d\n" , count111 ) ;                                 oldtime = nowtime ;                                 count111 = 0 ;                              }                              char buf [1000 ] = {0 } ;                              sprintf (buf , "HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n%s" , "Hello world!\n" ) ;                              send (events [i ] .data .fd ,buf , strlen (buf ) ,0 ) ;                              //    CGelsServer Gelsserver;                              //    Gelsserver.handle_message(events[i].data.fd);                          }                         epoll_ctl (epfd , EPOLL_CTL_DEL , events [i ] .data .fd , &event ) ;                          close (events [i ] .data .fd ) ;                      }                      else if (events [i ] .events &EPOLLOUT )                      {                          sprintf (buf , "HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n%s" , "Hello world!\n" ) ;                          send (events [i ] .data .fd ,buf , strlen (buf ) ,0 ) ;                          /*                         if(p!=NULL)                         {                             free(p);                             p=NULL;                         }                         */                          close (events [i ] .data .fd ) ;                      }                      else                      {                          close (events [i ] .data .fd ) ;                      }                  }              }          }      } } /************************************************* * Function: * check_connect_timeout * Description: * 检测长时间没反应的网络连接,并关闭删除 * Input: * * Output: * * Others: * *************************************************/ void *check_connect_timeout ( void * para ) {     hash_map : : iterator it_find ;      for (it_find = sock_map .begin ( ) ; it_find!=sock_map .end ( ) ; + +it_find ) {          if ( time ( ( time_t * )0 ) - (it_find - >second ) . time > 120 ) {                 //时间更改              free ( (it_find - >second ) .recvBuf ) ;             sock_map .erase (it_find ) ;              close (it_find - >first ) ;          }      } }
关注
打赏
1659628745
查看更多评论
立即登录/注册

微信扫码登录

0.0429s