epoll是linux下高并发服务器的完美方案,因为是基于事件触发的,所以比select快的不只是一个数量级。
单线程epoll,触发量可达到15000,但是加上业务后,因为大多数业务都与数据库打交道,所以就会存在阻塞的情况,这个时候就必须用多线程来提速。
epoll在线程池内,测试结果2000个/s
增加了网络断线后的无效socket检测。
测试工具:stressmark
因为加了适用与ab的代码,所以也可以适用ab进行压力测试。
char buf[1000] = {0};
sprintf(buf,"HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n%s","Hello world!\n");
send(socketfd,buf, strlen(buf),0);
#include #include #include #include #include #include #include #include #include #include #include #include #include //stl head #include //包含hash_map 的头文件 //#include
//stl的map
using
namespace
std
;
//std 命名空间
using
namespace __gnu_cxx
;
//而hash_map是在__gnu_cxx的命名空间里的
int init_thread_pool
(
int threadNum
)
;
void
*epoll_loop
(
void
* para
)
;
void
*check_connect_timeout
(
void
* para
)
;
struct sockStruct
{
time_t
time
;
unsigned
int
* recvBuf
;
}
;
//hash-map
//hash_map sock_map;
hash_map
sock_map
;
#
define MAXRECVBUF 4096
#
define MAXBUF MAXRECVBUF
+10
int fd_Setnonblocking
(
int fd
)
{
int op
;
op
=fcntl
(fd
,F_GETFL
,0
)
;
fcntl
(fd
,F_SETFL
,op
|O_NONBLOCK
)
;
return op
;
}
void on_sigint
(
int
signal
)
{
exit
(0
)
;
}
/* handle_message - 处理每个 socket 上的消息收发 */
int handle_message
(
int new_fd
)
{
char buf
[MAXBUF
+ 1
]
;
char sendbuf
[MAXBUF
+1
]
;
int len
;
/* 开始处理每个新连接上的数据收发 */
bzero
(buf
, MAXBUF
+ 1
)
;
/* 接收客户端的消息 */
//len = recv(new_fd, buf, MAXBUF, 0);
int nRecvBuf
= MAXRECVBUF
;
//设置为32K
setsockopt
(new_fd
, SOL_SOCKET
, SO_RCVBUF
,
(
const
char
*
)
&nRecvBuf
,
sizeof
(
int
)
)
;
len
=
recv
(new_fd
,
&buf
, MAXBUF
,0
)
;
//--------------------------------------------------------------------------------------------
//这块为了使用ab测试
char bufSend
[1000
]
=
{0
}
;
sprintf
(bufSend
,
"HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n%s"
,
"Hello world!\n"
)
;
send
(new_fd
,bufSend
,
strlen
(buf
)
,0
)
;
//--------------------------------------------------------------------------------------------
if
(len
> 0
)
{
//printf ("%d接收消息成功:'%s',共%d个字节的数据\n", new_fd, buf, len);
//hash-map
hash_map
:
:
iterator it_find
;
it_find
= sock_map
.
find
(new_fd
)
;
if
(it_find
=
= sock_map
.end
(
)
)
{
//新的网络连接,申请新的接收缓冲区,并放入map中
//printf("new socket %d\n", new_fd);
sockStruct newSockStruct
;
newSockStruct
.
time
=
time
(
(
time_t
*
)0
)
;
newSockStruct
.recvBuf
=
(
unsigned
int
*
)
malloc
(1000
)
;
memset
(newSockStruct
.recvBuf
, 0
, 1000
)
;
strcat
(
(
char
*
)newSockStruct
.recvBuf
, buf
)
;
sock_map
.insert
(
pair
(new_fd
, newSockStruct
)
)
;
}
else
{
//网络连接已经存在,找到对应的数据缓冲区,将接收到的数据拼接到数据缓冲区中
//printf("socket %d exist!\n", it_find->first);
(it_find
-
>second
)
.
time
=
time
(
(
time_t
*
)0
)
;
//时间更改
char
* bufSockMap
=
(
char
*
)
(it_find
-
>second
)
.recvBuf
;
//数据存储
strcat
(bufSockMap
, buf
)
;
//printf("bufSockMap:%s\n", bufSockMap);
}
}
else
{
if
(len
second
)
.recvBuf
)
;
sock_map
.erase
(it_find
)
;
close
(new_fd
)
;
return len
;
}
int listenfd
;
int sock_op
=1
;
struct
sockaddr_in address
;
struct epoll_event event
;
struct epoll_event events
[1024
]
;
int epfd
;
int n
;
int i
;
char buf
[512
]
;
int off
;
int result
;
char
*p
;
int main
(
int argc
,
char
* argv
[
]
)
{
init_thread_pool
(1
)
;
signal
(SIGPIPE
,
SIG_IGN
)
;
signal
(SIGCHLD
,
SIG_IGN
)
;
signal
(
SIGINT
,
&on_sigint
)
;
listenfd
=
socket
(
AF_INET
,
SOCK_STREAM
,0
)
;
setsockopt
(listenfd
,SOL_SOCKET
,SO_REUSEADDR
,
&sock_op
,
sizeof
(sock_op
)
)
;
memset
(
&address
,0
,
sizeof
(address
)
)
;
address
.sin_addr
.s_addr
=
htonl
(
INADDR_ANY
)
;
address
.sin_port
=
htons
(8006
)
;
bind
(listenfd
,
(
struct
sockaddr
*
)
&address
,
sizeof
(address
)
)
;
listen
(listenfd
,1024
)
;
fd_Setnonblocking
(listenfd
)
;
epfd
=epoll_create
(65535
)
;
memset
(
&event
,0
,
sizeof
(event
)
)
;
event
.data
.fd
=listenfd
;
event
.events
=EPOLLIN
|EPOLLET
;
epoll_ctl
(epfd
,EPOLL_CTL_ADD
,listenfd
,
&event
)
;
while
(1
)
{
sleep
(1000
)
;
}
return 0
;
}
/************************************************* * Function: * init_thread_pool * Description: * 初始化线程 * Input: * threadNum:用于处理epoll的线程数 * Output: * * Others: * 此函数为静态static函数, *************************************************/
int init_thread_pool
(
int threadNum
)
{
int i
,ret
;
pthread_t threadId
;
//初始化epoll线程池
for
( i
= 0
; i
0
)
{
for
(i
=0
;i
0
)
{
fd_Setnonblocking
(event
.data
.fd
)
;
event
.events
=EPOLLIN
|EPOLLET
;
epoll_ctl
(epfd
,EPOLL_CTL_ADD
,event
.data
.fd
,
&event
)
;
}
else
{
if
(
errno
=
=EAGAIN
)
break
;
}
}
}
else
{
if
(events
[i
]
.events
&EPOLLIN
)
{
//handle_message(events[i].data.fd);
char recvBuf
[1024
]
=
{0
}
;
int ret
= 999
;
int rs
= 1
;
while
(rs
)
{
ret
=
recv
(events
[n
]
.data
.fd
,recvBuf
,1024
,0
)
;
// 接受客户端消息
if
(ret
0
)
{
count111
+
+
;
struct
tm
*today
;
time_t ltime
;
time
(
&nowtime
)
;
if
(nowtime
!
= oldtime
)
{
printf
(
"%d\n"
, count111
)
;
oldtime
= nowtime
;
count111
= 0
;
}
char buf
[1000
]
=
{0
}
;
sprintf
(buf
,
"HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n%s"
,
"Hello world!\n"
)
;
send
(events
[i
]
.data
.fd
,buf
,
strlen
(buf
)
,0
)
;
// CGelsServer Gelsserver;
// Gelsserver.handle_message(events[i].data.fd);
}
epoll_ctl
(epfd
, EPOLL_CTL_DEL
, events
[i
]
.data
.fd
,
&event
)
;
close
(events
[i
]
.data
.fd
)
;
}
else
if
(events
[i
]
.events
&EPOLLOUT
)
{
sprintf
(buf
,
"HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n%s"
,
"Hello world!\n"
)
;
send
(events
[i
]
.data
.fd
,buf
,
strlen
(buf
)
,0
)
;
/* if(p!=NULL) { free(p); p=NULL; } */
close
(events
[i
]
.data
.fd
)
;
}
else
{
close
(events
[i
]
.data
.fd
)
;
}
}
}
}
}
}
/************************************************* * Function: * check_connect_timeout * Description: * 检测长时间没反应的网络连接,并关闭删除 * Input: * * Output: * * Others: * *************************************************/
void
*check_connect_timeout
(
void
* para
)
{
hash_map
:
:
iterator it_find
;
for
(it_find
= sock_map
.begin
(
)
; it_find!=sock_map
.end
(
)
;
+
+it_find
)
{
if
(
time
(
(
time_t
*
)0
)
-
(it_find
-
>second
)
.
time
> 120
)
{
//时间更改
free
(
(it_find
-
>second
)
.recvBuf
)
;
sock_map
.erase
(it_find
)
;
close
(it_find
-
>first
)
;
}
}
}