#pragma once
#include "CLibEventData.h"
#include "LibEventFunction.h"
#include "LibUserFunction.h"
class CLibEvent
{
public:
CLibEvent(void);
~CLibEvent(void);
private:
//当前服务器对象
Server m_Server;
public:
bool StartServer(int port, short workernum, unsigned int connnum, int read_timeout, int write_timeout);
void StopServer();
private:
void LoadFuns();
static void DoAccept(struct evconnlistener *listener, evutil_socket_t fd,struct sockaddr *sa, int socklen, void *user_data);
static void DoError(struct bufferevent *bev, short error, void *ctx);
static void CloseConn(Conn *pConn,int nFunID);
static void CloseConn(Conn *pConn);
static void DoRead(struct bufferevent *bev, void *ctx);
static DWORD WINAPI ThreadServer(LPVOID lPVOID);
static DWORD WINAPI ThreadWorkers(LPVOID lPVOID);
};
#include "StdAfx.h"
#include "LibEvent.h"
#include
#include
using namespace std;
#include
#include
#include
#pragma comment (lib,"ws2_32.lib")
#pragma comment (lib,"wsock32.lib")
#include "LibEventFunction.h"
#ifdef _DEBUG
#pragma comment (lib,"libevent.lib")
#else
#pragma comment (lib,"libevent.lib")
#endif
CLibEvent::CLibEvent(void)
{
LoadFuns();
ZeroMemory(&m_Server,sizeof(m_Server));
WSADATA WSAData;
WSAStartup(0x0201, &WSAData);
}
CLibEvent::~CLibEvent(void)
{
WSACleanup();
}
bool CLibEvent::StartServer(int port, short workernum, unsigned int connnum, int read_timeout, int write_timeout)
{
m_Server.bStart=false;
m_Server.nCurrentWorker=0;
m_Server.nPort=port;
m_Server.workernum=workernum;
m_Server.connnum=connnum;
m_Server.read_timeout=read_timeout;
m_Server.write_timeout=write_timeout;
evthread_use_windows_threads();
m_Server.pBase=event_base_new();
if (m_Server.pBase==NULL)
{
return false;
}
struct sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(m_Server.nPort);
m_Server.pListener=evconnlistener_new_bind(m_Server.pBase,DoAccept,(void*)&m_Server,LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE,-1,(struct sockaddr*)&sin,sizeof(sin));
if (m_Server.pListener==NULL)
{
return false;
}
CLibEventFunction::RegistConnectedFunc(CLibUserFunction::Connect);
CLibEventFunction::RegistDisconnectedFunc(CLibUserFunction::DisConnect);
CLibEventFunction::RegistStx(emStx);
m_Server.pWorker=new Worker[workernum];
for (int i=0;iplistConn=new Conn[m_Server.connnum+1];
m_Server.pWorker[i].pListConn->head=&m_Server.pWorker[i].pListConn->plistConn[0];
m_Server.pWorker[i].pListConn->tail=&m_Server.pWorker[i].pListConn->plistConn[m_Server.connnum];
for (int j=0; jplistConn[j].index=j;
m_Server.pWorker[i].pListConn->plistConn[j].next=&m_Server.pWorker[i].pListConn->plistConn[j+1];
}
m_Server.pWorker[i].pListConn->plistConn[m_Server.connnum].index=m_Server.connnum;
m_Server.pWorker[i].pListConn->plistConn[m_Server.connnum].next=NULL;
//设置当前事件
Conn *p=m_Server.pWorker[i].pListConn->head;
while (p!=NULL)
{
p->bufev=bufferevent_socket_new(m_Server.pWorker[i].pWokerbase,-1, BEV_OPT_CLOSE_ON_FREE);
if (p->bufev==NULL)
{
return false;
}
bufferevent_setcb(p->bufev, DoRead, NULL, DoError, p);
bufferevent_setwatermark(p->bufev, EV_READ, 0, emMaxBuffLen);
bufferevent_enable(p->bufev, EV_READ|EV_WRITE);
struct timeval delayWriteTimeout;
delayWriteTimeout.tv_sec=m_Server.write_timeout;
delayWriteTimeout.tv_usec=0;
struct timeval delayReadTimeout;
delayReadTimeout.tv_sec=m_Server.read_timeout;
delayReadTimeout.tv_usec=0;
bufferevent_set_timeouts(p->bufev,&delayReadTimeout,&delayWriteTimeout);
p->owner=&m_Server.pWorker[i];
p=p->next;
}
}
m_Server.pWorker[i].hThread=CreateThread(NULL,0,ThreadWorkers,&m_Server.pWorker[i],0,NULL);
}
m_Server.hThread=CreateThread(NULL,0,ThreadServer,&m_Server,0,NULL);
if (m_Server.hThread==NULL)
{
return false;
}
m_Server.bStart=true;
return true;
}
void CLibEvent::StopServer()
{
if (m_Server.bStart)
{
struct timeval delay = { 2, 0 };
event_base_loopexit(m_Server.pBase, &delay);
WaitForSingleObject(m_Server.hThread,INFINITE);
if (m_Server.pWorker)
{
for (int i=0;iin_buf_len+=evbuffer_remove(input, c->in_buf,emMaxBuffLen-c->in_buf_len);
}
while(true)
{
//一个协议包的请求头还没读完,则继续循环读或者等待下一个libevent时间进行循环读
if (c->in_buf_lenin_buf;
if (h->pkglen >emMaxBuffLen-sizeof(Head)||h->pkglenstx!=CLibEventFunction::LoadStx())
{
//请求包不合法
CloseConn(c);
return;
}
//读取的数据不够
if (c->in_buf_lenpkglen)
{
return;
}
//执行协议指令
if (h->nFunID>emFunBase&&CLibEventFunction::DispatchFunction(h->nFunID,NULL))
{
switch(CLibEventFunction::DispatchFunction(h->nFunID,c))
{
case emFunReturnSend:
{
struct evbuffer * output=bufferevent_get_output(bev);
evbuffer_add(output,c->out_buf,c->out_buf_len);
}
break;
case emFunReturClose:
{
CloseConn(c);
}
return;
case emFunReturnRecv:
break;
}
} else
{
CloseConn(c);
return;
}
//处理下一个协议包,或者继续读
c->in_buf_len-=h->pkglen;
if (c->in_buf_len==0)
{
break;
}
else
{
assert(c->in_buf_len>0);
memmove(c->in_buf,c->in_buf+h->pkglen,c->in_buf_len);
}
}
}
}
void CLibEvent::CloseConn(Conn *pConn,int nFunID)
{
pConn->in_buf_len = 0;
CLibEventFunction::DispatchFunction(nFunID,pConn);
bufferevent_disable(pConn->bufev, EV_READ | EV_WRITE);
evutil_closesocket(pConn->fd);
pConn->owner->PutFreeConn(pConn);
}
void CLibEvent::CloseConn(Conn *pConn)
{
CloseConn(pConn,emFunClosed);
}
void CLibEvent::DoError(struct bufferevent *bev, short error, void *ctx)
{
Conn *c=(Conn*)ctx;
emFunID id=emFunNull;
if (error&EVBUFFER_TIMEOUT)
{
id=emFunTimeout;
} else if (error&EVBUFFER_EOF)
{
id=emFunClosed;
} else if (error&EVBUFFER_ERROR)
{
id=emFunError;
}
CloseConn(c, id);
}
void CLibEvent::DoAccept(struct evconnlistener *listener, evutil_socket_t fd,struct sockaddr *sa, int socklen, void *user_data)
{
//此处为监听线程的event.不做处理.
Server *pServer = (Server *)user_data;
//主线程处做任务分发.
int nCurrent=pServer->nCurrentWorker++%pServer->workernum;
//当前线程所在ID号
Worker &pWorker=pServer->pWorker[nCurrent];
//通知线程开始读取数据,用于分配哪一个线程来处理此处的event事件
Conn *pConn=pWorker.GetFreeConn();
if (pConn==NULL)
{
return;
}
pConn->fd=fd;
evutil_make_socket_nonblocking(pConn->fd);
bufferevent_setfd(pConn->bufev, pConn->fd);
//转发发送事件
CLibEventFunction::DispatchFunction(emFunConnected,pConn);
bufferevent_enable(pConn->bufev, EV_READ | EV_WRITE);
}
DWORD WINAPI CLibEvent::ThreadServer(LPVOID lPVOID)
{
Server * pServer=reinterpret_cast(lPVOID);
if (pServer==NULL)
{
return -1;
}
event_base_dispatch(pServer->pBase);
return GetCurrentThreadId();
}
DWORD WINAPI CLibEvent::ThreadWorkers(LPVOID lPVOID)
{
Worker *pWorker=reinterpret_cast(lPVOID);
if (pWorker==NULL)
{
return -1;
}
event_base_dispatch(pWorker->pWokerbase);
return GetCurrentThreadId();
}
void CLibEvent::LoadFuns()
{
CLibEventFunction::RegistFunc(emFunLogin,CLibUserFunction::Login);
CLibEventFunction::RegistFunc(emFunLogout,CLibUserFunction::Logout);
}