写点什么

消息队列优化 (1) -- 鶸的介绍篇

用户头像
1412
关注
发布于: 2021 年 01 月 04 日

[我的 blog 原地址:消息队列优化 -- 鶸的介绍篇,写于 2020-02-16,是关于单进程内消息队列优化的一些简单想法。也在知乎发过,最近会持续努力同步更新到 InfoQ~]




由我最近在做的一些事情开始介绍吧,这里的消息队列是指框架底层使用的数据结构+算法,而不是消息队列系统。先前把我们组的 workflow 异步调度框架https://github.com/sogou/workflow扩展到了 rpchttps://github.com/sogou/srpc,于是就做起了压测,发现在简单测试场景下,性能卡在了底层框架的消息队列上,这两天也翻了一下别的资料看了,希望能找到思路,对消息队列进行优化。 


一、srpc 简介


协议为:


固定协议头 + 用户自定义的protobuf + meta信息的protobuf
复制代码


和 brpc 是类似的,即序列化反序列化只做了简单的 pb 操作。


Client:设置并发数,一批发起 request 任务,在请求结束的回调函数中发起下一个任务。


Server:接收、找到对应的处理函数、处理完请求后返回。


二、workflow 框架简介


我组做的异步调度框架,上述 rpc 用到了其中的网络通信部分,所以 client 和 server 都可以不用考虑线程池连接池而进行全异步操作。workflow 框架的网络通信简单来说就是有一组 poller 线程和 epoll 打交道,拿到的结果会放到一个结果队列里,由 handler 线程去队列里拿结果并进行业务逻辑处理,所以这里是我们的消息队列。 


由于我们通信器当前的设计,一个通信器只有一个消息队列。 


三、一些思考


我们当前的消息队列模型是最简单的那种:


一把 mutex 锁,两个 cond 条件变量;


属于 n 生产者 m 消费者抢同一把锁并唤醒对方的模型。


这个在 leetcode 的多线程题库里有一道:


https://leetcode-cn.com/problems/design-bounded-blocking-queue/


在 32 核、万兆网卡、限制 2k 个长连接的情况下,rpc 简单的 echo example 可以跑到 64w 的 QPS,gstack 和 perf top 结合看,基本都卡在消息队列的那一把大锁里,于是在想有没有什么通用一点的优化方案。


首先明确下衡量消息队列最重要的两个指标是:吞吐长尾


到具体的实现,差异的维度应该有这些:


  • 是否拆队列

  • 锁和条件变量的个数

  • 是否保序

  • 处理的及时性等


四、几个现有的优化方案


1. BlockingQueue


google 的一组消息队列的实现,其中看了 linked-blocking-queue,大致思路是:


  1. 用了两把锁、两个条件变量去分别管理 n 个生产者和 m 个消费者;

  2. 内部存放数据是用的链表(所以叫做 linked-blocking-queue),生产者消费者各操作一头的时候不用锁对方的锁;

  3. 如果队列满了/空了,需要唤醒对方的条件变量的时候,才会同时锁两把锁;


借助以上三点,基本上可以避免生产者消费者同时抢一把锁的情况。


google 的代码和文档值得看一下:


https://developer.android.com/reference/java/util/concurrent/BlockingQueue


2. CAS


CAS 的实现很多,linux 内核中有 kfifo,这个还没看,有空可以试试。


3. 拆队列 + work stealing


顾名思义,work stealing 有一篇专门的 paper 介绍: http://supertech.csail.mit.edu/papers/steal.pdf


前几天看 GO 1.14 的优化点,其中就包括了 goroutine 使用了 workstealing,有前辈说 tcmalloc 也用了。


这个机制应该是不能保证有序性的,还需要深入思考一下。


4. 我们当前的一些尝试


基于原先最基本的消息队列实现,把唤醒改到了锁外,性能大概提升了 30%:


void BasicQueue::put(Node *node){    pthread_mutex_lock(&this->mutex);
while (!this->stopping && this->res_cnt > this->res_max) { this->put_wait_cnt++; pthread_cond_wait(&this->put_cond, &this->mutex); this->put_wait_cnd--; } list_add_tail(&node->list, &this->res_list); this->res_cnt++;
pthread_mutex_unlock(&this->mutex);
pthread_cond_signal(&this->get_cond); // signal outside the mutex}
复制代码


此外也对消费者进行了优化,大概方案是:


  1. 消费者有消费者的锁和队列,默认先从消费者的队列里拿;

  2. 拿不到的情况下,去生产者的队列里进行链表交换,把整个生产者队列挪到消费者队列;

  3. 只有在 2 的情况下才需要消费者和生产者抢大锁,其他情况消费者不和生产者抢锁;


这里就不贴代码了,具体可以参考 workflow 内部的队列实现:https://github.com/sogou/workflow


五、小结


任何额外的操作都可能对性能有损耗,但是只要能够提升吞吐,并且不加重长尾,那就是值得的,如何能够做一个通用的性能最好的队列是我们框架开发者所要做的事情。目前我写了几个 demo(用的是同样的 list node 存储数据): 


  • 简单的单锁队列

  • 单锁队列锁外唤醒版本

  • 单独针对消费者拆分一个锁和队列的版本

  • linked-blocking-queue

  • 目前准备写一个 work stealing


下一篇再把测试代码上传 github 和给出对比数据吧,我会自己写测试代码(也可以扔到 leetcod 上,但是以切过的几道多线程题来看,它的 case 都太过简单,测不出什么问题)。目前是我能看到的一些优化方案,也希望各位大神指教一下解决办法~


发布于: 2021 年 01 月 04 日阅读数: 40
用户头像

1412

关注

鶸鶸的架构师 2018.08.07 加入

专注于异步调度框架开发和分布式存储技术,开源框架Workflow和srpc的作者之一。

评论

发布
暂无评论
消息队列优化(1) -- 鶸的介绍篇