Server Queue 提高 QPS
发布于: 2020 年 05 月 29 日
任务场景
提供API接口,同时将API 接收到的数据,推送到Kafka/RabbitMQ。
机器数据:1CPU 2G 内存 1T 机械硬盘
QPS峰值:8000/10000
body大小:2B
编程语言:Python3
web框架:Flask
0.1 版本
使用 gevent.monkey + gevent.pywgi,单次请求异步 写入 Kafka
#!/usr/bin/env python# encoding: utf-8# 猴子补丁from gevent import monkeymonkey.patch_all()import jsonimport loggingfrom flask import Flaskfrom flask import requestfrom gevent import pywsgifrom kafka import KafkaProducerapp = Flask(__name__)# kafka 生产者 配置topic = "test"host = ["127.0.0.1:8009"]produc = KafkaProducer(hosts=host)logging.basicConfig(filename=__name__, filemode="w", format="%(asctime)s-%(name)s-%(levelname)s-%(message)s", level=logging.INFO)logger = logging.getLogger(__name__)@app.route("/api/v1/show/", methods=["GET"])def api_v1_show(): try: data = json.dumps(request.args.to_dict()) logger.info("data") # 因为 Kafka 使用的是,异步批量写入 Kafka 中,所以需要 produc.send(topic=topic, value=data) except Exception as e: logger.error("/api/v1/show/ err") pass return "succ"if __name__ == '__main__': # 使用 pywsgi.WSGIServer 异步 方式提高 性能 server = pywsgi.WSGIServer(('', 8015), app) server.serve_forever()
0.2 版本
在代码中 增加队列 进一步提高 其并发性能
使用 gevent.queue.JoinableQueue 增加一层队列,因为在内存中,同时能够快速返回,整体会有一定的性能提升
#!/usr/bin/env python# encoding: utf-8# 猴子补丁from gevent import monkeymonkey.patch_all()import jsonimport geventimport loggingfrom flask import Flaskfrom flask import requestfrom gevent import pywsgifrom kafka import KafkaProducerfrom gevent.queue import JoinableQueueapp = Flask(__name__)REQ_Q = JoinableQueue(100000) # 中间数据缓存# kafka 生产者 配置topic = "test"host = ["127.0.0.1:8009"]produc = KafkaProducer(hosts=host)logging.basicConfig(filename=__name__, filemode="w", format="%(asctime)s-%(name)s-%(levelname)s-%(message)s", level=logging.INFO)logger = logging.getLogger(__name__)def main_worker(): while True: try: item = REQ_Q.get() produc.send(item) except Exception as e: logger.error(f"main_work err {e}") print(e) finally: REQ_Q.task_done()@app.route("/api/v1/show/", methods=["GET"])def api_v1_show(): try: # 因为 Kafka 使用的是,异步批量写入 Kafka 中,所以需要 data = json.dumps(request.args.to_dict()) logger.info("data") REQ_Q.put(data) except Exception as e: logger.error("/api/v1/show/ err") pass return "succ"def init_workers(size): for i in range(size): gevent.spawn(main_worker)if __name__ == '__main__': # 异步轮训消费 init_workers(50) # 使用 pywsgi.WSGIServer 异步 方式提高 性能 server = pywsgi.WSGIServer(('', 8015), app) server.serve_forever()
总结
以为Kafka,已经是异步批量推送,所以,整体的性能还是非常不错的,但是因为有存在突升QPS,所以还是需要增加一层 Queue 进行进行缓存,但是会有 OOM 的情况发生,所以这种方式,可以简单的作为一个预备临时方案,正确方案:应该是能够自动扩容。
划线
评论
复制
发布于: 2020 年 05 月 29 日阅读数: 61
风含叶
关注
能独立搭建 1.5亿订单零失误 ERP核心后端 2018.06.24 加入
凡是过往,即为序章
评论