博客这几天由于服务器的问题打不开,在这里跟大家抱歉啦 老读者应该都知道,笔者有两个开源项目,分别是: fuload: 性能测试工具,可以用来给服务器做压力测试 bayonet: 基于两层状态机的epoll服务器框架 对于fuload的介绍,请看这里: fuload开源压力测试框架完成! 对于bayonet的介绍,请看这里: 有限状态机的C++实现(1)-epoll状态机 有限状态机的C++实现(2)-bayonet开源网络服务器框架 之前由于工作等原因,bayonet一直被搁置,最近有时间,所以就抓紧把bayonet完成了,目前功能上基本已经OK了,我简单列一下功能点:
- 接管了网络,调用方只需要关心业务逻辑
- 配置的方式,快速切换TCP-UDP
- 快速的增加加状态机的状态,业务可以无限拓展
代码编写中,也用到了很多技术,如引用计数来保证野指针不被访问(通过引用计数解决野指针的问题(C&C++)),延迟析构对象,等等。 由于详细介绍是个很庞大的工作,所以这里就直接放出一个基于bayonet写的http代理,我们来看一下,只需要多少代码:
#include <iostream>
#include <memory>
#include <string>
#include <vector>
#include <set>
#include <map>
#include "bayonet_frame.h"
using namespace std;
#define APP_FSM_PROXY 2000
#define APP_FSM_LOGIC2 2001
/**
* @brief 获取ContentLen的数字的起始和长度
*
* @param strHttpBuf
* @param len
*
* @return
*/
size_t GetContentLenPos(const string& strHttpBuf, int& len)
{
string strContentLenKey = "Content-Length";
size_t pos;
size_t end_pos;
pos = strHttpBuf.find(strContentLenKey);
if (pos == string::npos)
{
return pos;
}
pos += strContentLenKey.size();
pos = strHttpBuf.find(":", pos);
if (pos == string::npos)
{
return pos;
}
pos += 1;
end_pos = strHttpBuf.find("\r\n", pos);
if (end_pos == string::npos)
{
return end_pos;
}
pos = strHttpBuf.find_first_not_of(" ", pos);//只能查找字符
if (pos == string::npos || pos >= end_pos)
{
return pos;
}
len = end_pos - pos;
return pos;
}
int HttpHandleInput(const char* buf, int len)
{
string strHttpBuf(buf,len);
int contentLenLen = 0;
size_t contentLenPos = GetContentLenPos(strHttpBuf,contentLenLen);
if (contentLenPos == string::npos)
{
//说明直接接收完了
return len;
}
//获取原来的content-len的值
string lenNum = strHttpBuf.substr(contentLenPos, contentLenLen);
int iContentLen = atoi(lenNum.c_str());
//接下来我们要看看当前接受的buf大小是否等于 head + content len
string spStr = "\r\n\r\n";
size_t spPos = strHttpBuf.find(spStr);
if (spPos == string::npos)
{
//没接收完,继续接收
return 0;
}
int headLen = spPos+spStr.size();
int realLen = headLen + iContentLen;
if (realLen > len)
{
return 0;
}
return realLen;
}
class CMyActor : public CAppActorBase
{
public:
CMyActor() {}
virtual ~CMyActor() {}
string m_req;
string m_rsp;
};
class CActionFirst : public IAction
{
public:
int HandleEncodeSendBuf(
CSocketActorData* pSocketActor,
CAppActorBase* pAppActor,
string & strSendBuf,
int &len)
{
CMyActor* app_actor = (CMyActor*)pAppActor;
if (app_actor == NULL)
{
return -1;
}
strSendBuf = app_actor->m_rsp;
len = strSendBuf.size();
return 0;
}
int HandleInput(
CSocketActorData* pSocketActor,
CAppActorBase* pAppActor,
const char *buf,
int len)
{
return HttpHandleInput(buf,len);
}
int HandleDecodeRecvBuf(
CSocketActorData* pSocketActor,
CAppActorBase* pAppActor,
const char *buf,
int len)
{
CMyActor * app_actor = new CMyActor();
app_actor->AttachFrame(pSocketActor->GetFrame());
app_actor->AttachCommu(pSocketActor);
app_actor->m_req = string(buf,len);
//转化状态操作一定要放在最后一步
app_actor->ChangeState(APP_FSM_PROXY);
return 0;
}
};
class CActionGetData: public IAction
{
public:
// 为发送打包
int HandleEncodeSendBuf(
CSocketActorData* pSocketActor,
CAppActorBase* pAppActor,
string & strSendBuf,
int &len)
{
CMyActor* app_actor = (CMyActor*)pAppActor;
if (app_actor == NULL)
{
return -1;
}
strSendBuf=app_actor->m_req;
len = strSendBuf.size();
return 0;
}
// 回应包完整性检查
int HandleInput(
CSocketActorData* pSocketActor,
CAppActorBase* pAppActor,
const char *buf,
int len)
{
return HttpHandleInput(buf,len);
}
// 回应包解析
int HandleDecodeRecvBuf(
CSocketActorData* pSocketActor,
CAppActorBase* pAppActor,
const char *buf,
int len)
{
CMyActor* app_actor = (CMyActor*)pAppActor;
//因为很有可能,appactor已经由于commu超时的原因被析构掉了
if (app_actor == NULL)
{
return -1;
}
app_actor->m_rsp = string(buf,len);
return 0;
}
};
class CAppFsmProxy : public CAppFsmBase
{
public:
virtual ~CAppFsmProxy () {}
virtual int HandleEntry(CActionInfoSet *pActionInfoSet, CAppActorBase* pAppActor)
{
static CActionGetData actionGetData;
StActionInfoParam param;
param.id = 1;
param.ip = "10.6.207.189";
param.port = 80;
param.protoType = PROTO_TYPE_TCP;
param.pAction = &actionGetData;
param.actionType = ACTIONTYPE_SENDRECV;
param.timeout_ms = 1000;
CActionInfo * pActionInfo = new CActionInfo();
pActionInfo->Init(param);
pActionInfoSet->Add(pActionInfo);
return 0;
}
virtual int HandleProcess(CActionInfoSet *pActionInfoSet, CAppActorBase* pAppActor)
{
set<CActionInfo*> &setAction = pActionInfoSet->GetActionSet();
for(set<CActionInfo*>::iterator it = setAction.begin(); it != setAction.end(); ++it)
{
trace_log("id:%d,error no:%d,timecost:%u ms",(*it)->GetID(),(*it)->GetErrno(),(*it)->GetTimeCost());
}
return APP_FSM_RSP;//代表要回复客户端啦
}
virtual int HandleExit(CActionInfoSet *pActionInfoSet, CAppActorBase* pAppActor)
{
return 0;
}
};
int main(int argc, const char *argv[])
{
CBayonetFrame srv;
StFrameParam param;
param.infoDir="bayonet";
param.ip="0.0.0.0";
param.port = 20001;
param.protoType = PROTO_TYPE_TCP;
param.pAction = new CActionFirst();
param.gcMaxCount = 10;
int ret = srv.Init(param);
if (ret != 0)
{
return -1;
}
srv.RegFsm(APP_FSM_PROXY,new CAppFsmProxy());
srv.Process();
return 0;
}
所有发往服务器的http请求,都会被转发到 10.6.207.189 的 80 端口,然后返回给调用端。 这就是所有需要调用端编写的代码了,如果去掉HTTP协议解析的那两个函数,并且去掉空行,一共才不到160行代码。而且你可以随意的再网上加逻辑状态,比如想要在拿到客户端请求之后,再去login服务器验证一下登录态,那只要在注册一个新的状态即可。
如果想看更多的例子,可以去svn上看如下路径的代码: http://code.google.com/p/bayonet/source/browse/#svn%2Ftrunk%2Fsrc%2Fsvr http://code.google.com/p/bayonet/source/browse/#svn%2Ftrunk%2Fsrc%2Fsvr2 http://code.google.com/p/bayonet/source/browse/#svn%2Ftrunk%2Fsrc%2Fhttp
当然,现在也还有很多问题,比如性能还没有做优化,各位如果有兴趣的话,欢迎和我一起完成剩下的性能调优工作,我相信这个过程一定比编码阶段有聊的多~
wf168 on #
学习了
Reply
moper on #
说实话,感觉有点高深,呵呵。很厉害的呢,加油~
Reply
liu1061 on #
不得啊,你在这方面走的好远啊! 我看你在开源这方面了得,看到你常用nginx, 我现在在看这个源码,看的也是一头晕, 可否给指点一下啊! 谢谢,我可是你的Blog的常客啊!呵呵!
Reply
Dante on #
过奖哈,只是想多练练手,也沉淀一下。
文中的链接有详细的介绍了bayonet的设计思路,可以看一下哈
其实代码本身不是很复杂,比起ngx的复杂度差得远。。。
Reply
vivi on #
支持一下 , 感觉不错。
Reply
bopy on #
楼主,直接编译不过,查了下是#include在高版本的linux里面已经没有了,可以修改为gcc提供的__sync_*系列的built-in函数。 最简单的办法是改为#include 但是要安装libasound2-dev. 支持楼主!
Reply
bopy on #
晕....尖括号不能显示 第一处为 asm/atomic.h 第二处是alsa/iatomic.h
Reply