写点什么

Server Queue 提高 QPS

用户头像
风含叶
关注
发布于: 2020 年 05 月 29 日
Server Queue 提高 QPS

任务场景

提供API接口,同时将API 接收到的数据,推送到Kafka/RabbitMQ。

机器数据:1CPU 2G 内存 1T 机械硬盘

QPS峰值:8000/10000

body大小:2B

编程语言:Python3

web框架:Flask



0.1 版本

使用 gevent.monkey + gevent.pywgi,单次请求异步 写入 Kafka

#!/usr/bin/env python
# encoding: utf-8
# 猴子补丁
from gevent import monkey
monkey.patch_all()
import json
import logging
from flask import Flask
from flask import request
from gevent import pywsgi
from kafka import KafkaProducer
app = Flask(__name__)
# kafka 生产者 配置
topic = "test"
host = ["127.0.0.1:8009"]
produc = KafkaProducer(hosts=host)
logging.basicConfig(filename=__name__, filemode="w", format="%(asctime)s-%(name)s-%(levelname)s-%(message)s",
level=logging.INFO)
logger = logging.getLogger(__name__)
@app.route("/api/v1/show/", methods=["GET"])
def api_v1_show():
try:
data = json.dumps(request.args.to_dict())
logger.info("data")
# 因为 Kafka 使用的是,异步批量写入 Kafka 中,所以需要
produc.send(topic=topic, value=data)
except Exception as e:
logger.error("/api/v1/show/ err")
pass
return "succ"
if __name__ == '__main__':
# 使用 pywsgi.WSGIServer 异步 方式提高 性能
server = pywsgi.WSGIServer(('', 8015), app)
server.serve_forever()



0.2 版本

在代码中 增加队列 进一步提高 其并发性能

使用 gevent.queue.JoinableQueue 增加一层队列,因为在内存中,同时能够快速返回,整体会有一定的性能提升

#!/usr/bin/env python
# encoding: utf-8
# 猴子补丁
from gevent import monkey
monkey.patch_all()
import json
import gevent
import logging
from flask import Flask
from flask import request
from gevent import pywsgi
from kafka import KafkaProducer
from gevent.queue import JoinableQueue
app = Flask(__name__)
REQ_Q = JoinableQueue(100000) # 中间数据缓存
# kafka 生产者 配置
topic = "test"
host = ["127.0.0.1:8009"]
produc = KafkaProducer(hosts=host)
logging.basicConfig(filename=__name__, filemode="w", format="%(asctime)s-%(name)s-%(levelname)s-%(message)s",
level=logging.INFO)
logger = logging.getLogger(__name__)
def main_worker():
while True:
try:
item = REQ_Q.get()
produc.send(item)
except Exception as e:
logger.error(f"main_work err {e}")
print(e)
finally:
REQ_Q.task_done()
@app.route("/api/v1/show/", methods=["GET"])
def api_v1_show():
try:
# 因为 Kafka 使用的是,异步批量写入 Kafka 中,所以需要
data = json.dumps(request.args.to_dict())
logger.info("data")
REQ_Q.put(data)
except Exception as e:
logger.error("/api/v1/show/ err")
pass
return "succ"
def init_workers(size):
for i in range(size):
gevent.spawn(main_worker)
if __name__ == '__main__':
# 异步轮训消费
init_workers(50)
# 使用 pywsgi.WSGIServer 异步 方式提高 性能
server = pywsgi.WSGIServer(('', 8015), app)
server.serve_forever()

总结

以为Kafka,已经是异步批量推送,所以,整体的性能还是非常不错的,但是因为有存在突升QPS,所以还是需要增加一层 Queue 进行进行缓存,但是会有 OOM 的情况发生,所以这种方式,可以简单的作为一个预备临时方案,正确方案:应该是能够自动扩容



用户头像

风含叶

关注

能独立搭建 1.5亿订单零失误 ERP核心后端 2018.06.24 加入

凡是过往,即为序章

评论

发布
暂无评论
Server Queue 提高 QPS