这几天的心情非常好,主要原因是我们把服务器端的架构升级到了 2.0,这样最大的一个好处就是:
Server重启完全不会影响外网服务
所以,也是想趁此机会,服务器端整个发展的历程,跟大家分享一下,干货比较多,框架代码也会全部开源:)
一. 农业时代
创业最重要的就是一个“快”字,所以最开始的时候,所有的架构都以快速出模型为前提。
而常看我博客的朋友应该知道我对python情有独钟,所以自然的,python成为了我开发服务端框架的语言。
python自带的多线程tcp服务器框架非常简单:ThreadingTCPServer,即每个链接一个线程的模式:
import SocketServer class RequestHandler(SocketServer.BaseRequestHandler): def handle(self): f = self.request.makefile('r') while True: message = f.readline() if not message: print 'client closed' break print "message, len: %s, content: %r" % (len(message), message) self.request.send('ok\n') class MyServer(SocketServer.ThreadingTCPServer): request_queue_size = 256 daemon_threads = True allow_reuse_address = True server = MyServer(('127.0.0.1', 7777), RequestHandler) server.serve_forever()
但是我又觉得多线程加锁使用实在太过麻烦,所以就引入了gevent:
import gevent from gevent.server import StreamServer class RequestHandler(object): closed = False def __init__(self, sock, address): self.sock = sock self.address = address self.f = self.sock.makefile('r') self.handle() def handle(self): while not self.closed: t = gevent.spawn(self.read_message) t.join() def read_message(self): message = self.f.readline() if not message: self.closed = True print 'client closed' return print "message, len: %s, content: %r" % (len(message), message) self.sock.send('ok\n') server = StreamServer(('127.0.0.1', 7777), RequestHandler) server.serve_forever()
而又因为之前在做个人开发者的时候,对flask的装饰器设计甚为喜欢,所以就参考flask设计了我自己的tcp server:
使用方法也是非常简单(服务器端):
import logging from haven import GHaven, THaven, logger from netkit.box import Box app = GHaven(Box) @app.before_request def before_request(request): logger.error('before_request') @app.route(1) def index(request): request.write(dict(ret=100)) app.run('127.0.0.1', 7777, workers=2)
客户端:
from netkit.contrib.tcp_client import TcpClient from netkit.box import Box import time client = TcpClient(Box, '127.0.0.1', 7777, timeout=5) client.connect() box = Box() box.cmd = 101 box.body = '我爱你' client.write(box) t1 = time.time() while True: # 阻塞 box = client.read() print 'time past: ', time.time() - t1 print box if not box: print 'server closed' break
这套架构在开发服务器端原型的时候非常有效,因为开发效率极高。
而我们开发的是棋牌游戏,当时为了图方便,几乎所有的数据都放在了进程内存里,所以操作起来也非常方便。
所以在整个研发过程中,服务器端的开发速度一直是客户端开发速度的数倍。
二. 工业时代
然而很快,我发现了现有框架的一些问题,而这些问题都极其致命。
1. 所有逻辑揉在一个进程中,性能太低,2000人同时打牌就会导致卡顿
2. 多线程的模型在大量用户在线时,性能极差
3. 逻辑server和存储server揉在一起,导致维护十分困难。重启逻辑服务器会影响业务,无法接受
因为如上的原因,我一直在考虑一套新的业务模型,除了要解决上面的问题之外,还要有如下的特性:
1. 尽量保留python开发业务逻辑,因为与c++相比,开发效率极高。
2. 可伸缩,分布式
3. 尽量少改动现有逻辑代码
4. 尽量让业务开发理解简单
最终,我实现了这套server框架,并将其开源在这里:
maple的实现,受到了很多想有框架的启发,其中包括zmq,half-async half-sync,以及当时淘宝的一个业务模型分享。
我记得印象比较清楚的是,当时主讲是这么说的:
我们怎么判断该不该给某个server发送消息呢?
根据成功率?根据响应时间?
no,我们把push换成pull,让worker完成处理后,自己过来要数据。
要,我才给,不要我就不给
maple的整个模型即是如此:
- gateway
- worker
- trigger
其中,gateway是用c++、epoll实现的一个高性能转发服务器,收到的客户端消息都会转发给对应的worker。
worker,即工作进程,他可以随时attach到某个gateway上来处理数据,也可以随时detach。并且worker使用python来实现的,兼顾了开发效率和运行效率。
trigger,触发器,即他可以触发事件来发给gateway,gateway会根据事件的不同,发给客户端或者worer。
详细的设计思路,可以参看maple的readme,里面有详细的设计思路。
一个简单的worker代码如下:
import logging LOG_FORMAT = '\n'.join(( '/' + '-' * 80, '[%(levelname)s][%(asctime)s][%(process)d:%(thread)d][%(filename)s:%(lineno)d %(funcName)s]:', '%(message)s', '-' * 80 + '/', )) logger = logging.getLogger('maple') handler = logging.StreamHandler() handler.setFormatter(logging.Formatter(LOG_FORMAT)) logger.addHandler(handler) logger.setLevel(logging.DEBUG) from maple import Worker from netkit.box import Box app = Worker(Box) @app.close_client def close_client(request): logger.error('close_client: %r', request) @app.route(1) def test(request): request.write_to_client(dict( ret=0, body="test" )) app.run("192.168.1.67", 28000, workers=2, debug=True)
一个简单trigger代码如下:
from maple import Trigger import time from netkit.box import Box import logging logger = logging.getLogger('maple') logger.addHandler(logging.StreamHandler()) logger.setLevel(logging.DEBUG) def main(): trigger = Trigger(Box, '192.168.1.67', 28000) # trigger = Trigger(Box, '115.28.224.64', 28000) for it in xrange(0, 99999): time.sleep(1) print trigger.write_to_worker(dict( cmd=3, ret=100, body='from trigger: %s' % it )) # print trigger.close_users([-1,3]) # print trigger.write_to_users([ # ((1,2,3), dict(cmd=1, body='direct event from trigger: %s' % it)) # ]) main()
很简单,对吧?
值得一提的是,为了方便worker的随时重启而不会影响外网服务,worker内部实现了对TERM和HUP信号的特殊处理。分别代表安全停止所有进程和安全重新拉起workers。
最后,上一张gateway运行时的统计图,命令如下:
./tool_stat -f stat_file
github链接:
billow on #
这个tool跟spp好像 <img src="http://img.t.sinajs.cn/t35/style/images/common/face/ext/normal/0b/tootha_org.gif" />
Reply
Dante on #
哈哈,有点像,也不太一样,包括push 和pull、以及分布式都是结合了别的框架的思想。
Reply
Eric on #
你的maple怎么不在github开源了啊,我想重新学习一下
Reply
王子健特洛夫斯基 on #
学习了!我最近是对gevent感兴趣,学习的时候搜到这篇文章,非常好的技术思考!
Reply