写点什么

消息队列优化 (2) -- 几种基本实现

用户头像
1412
关注
发布于: 2021 年 01 月 04 日

[我的 blog 原地址:消息队列优化 -- 几种基本实现,写于 2020-02-29,是关消息队列优化的集中基本实现对比,是本人在从事架构工作的一些简单总结。也在知乎发过,最近会持续努力同步更新到 InfoQ~]




听说今天不发 blog 就要等四年了,于是占个坑。


1. 基本实现与对比


这两周写了一部分很简单的实现,陆续上传到:


https://github.com/holmes1412/queue


随着进一步的了解,现在可以分别列一下我们这次关注的几种实现的对比:



另外说明:kfifo 和 work_stealing_queue 是无阻塞的, 并且因为 kfifo 和 work_stealing_queue 的基本操作都是满足单生产者单消费者,所以这里对比的时候还是按照用他们去实现多生产者多消费者来跟其他的实现相比。


其中前面三种实现(外加最笨的单锁队列锁内唤醒版本)也用来切了这道题:


https://leetcode-cn.com/problems/design-bounded-blocking-queue/


从最早到最近分别是这几版队列,不过 leetcode 的多线程题的耗时是非常说明不了问题的,所以自己写了个 demo,但是这个测试场景怎么才最能说明问题是比较需要想想的,而且由于 workstealing 要扩展到多生产者多消费者还需要封装消费者线程池,所以剩下的工作都打算放到下一篇说了。 


2. CAS


这里选取 linux 内核的 kfifo.h 来了解下,内核代码写得非常漂亮。kfifo 是一个使用循环数组来实现的 first-in-first-out 队列的,在单生产者单消费者的情况下,可以做到彻底无锁。如果多生产者多消费者,可以使用带 spinlock 的接口。先看下单生产者单消费者无锁的原理:


struct __kfifo {    unsigned int    in;    unsigned int    out;    unsigned int    mask;    unsigned int    esize;    void           *data;};
// 基本的enqueue接口extern unsigned int __kfifo_in(struct __kfifo *fifo, const void *buf, unsigned int len);
// 基本的dequeue接口extern unsigned int __kfifo_out(struct __kfifo *fifo, void *buf, unsigned int len);
复制代码


几个说明:


  1. 队列在初始化时会记录下 mask capacity - 1,这是用于循环数组寻址把 pos % capcity 转成 pos & (capacity - 1)

  2. 为了循环队列能够快速寻址,pos 使用了上述转换,因此 capacity 必须是 2 的幂

  3. in 是入队列的位置,out 是出队列的位置,两个值会一直加到 unsigned int 溢出回到 0 都是正确的;

  4. out()接口可以看到带了个 len 参数,这是一批我要放的元素的个数,内部实现是先算一下当前是不是够放、应该往哪放,然后先算出往数组后半部分可以放的个数,然后要是没放完就从数组[0]位置继续放;get()也是类似的道理;

  5. 必须先操作完再更新 in 或 out 的值;

  6. 使用了 smp_wmb()做内存屏障,保证对方正确观察到我的执行顺序。


关于带 spinlock 的接口,因为自旋锁是 busy-waiting,比较适合锁上之后很快释放的场景,这样避免互斥锁切线程的性能损耗,适合争抢的人数小于等于 cpu 数的场景,否则会导致 cpu cache 频繁失效性能会急剧下降。所以在实际使用带 spinlock 的模块时,线程数怎么根据使用情况和模块本身的特点来配就很重要了。


而我们作为一个通用的通信队列,生产者/消费者线程数量肯定远远大于 cpu 数(因为通信场景除了对 epoll 进行操作以外,还会做一些 read/write,甚至是序列化反序列化的事情,而我们作为框架提供者,不能保证任何一个服务哪部分资源使用会更多),如果大家都在比较闲,就都忙等 spinlock,这显然非常不合理。


因此我个人认为,CAS 这样看来是不太适合一个通用的队列实现的,但是这两天又看到许多关于 lock-free 的资料,还得深入了解下。如果大家有不一样的场景也欢迎交流~


3. work stealing


前面几种具体实现可以说都属于 work sharing(所有线程共享一个队列),而这里是线程不够消费了再去 steal。所以这里上升到一个更大的概念的话,里边具体的优化点就有很多,比如:


  • 怎样把任务分配给每个线程

  • 怎么 steal

  • 另外,work stealing 更多的是用于线程调度,所以业内有些优化是针对于减少提交时的线程切换做的,这种优化我们做队列时用不上。


想了想发现不太好写,于是看了下 brpc 内部的源码,它就满足了谢爷经常说的机制要和策略分离的架构设计理念。它实现的机制是:先从一个单生产者单消费者队列开始封装,如果使用者想用到多生产者多消费者场景,可以对每个消费者做一个队列,然后调用别人的队列的 steal( )接口,使用者去实现 steal 具体策略。最基本的实现竟然和 kfifo.h 有非常相似的地方:


https://github.com/apache/incubator-brpc/blob/master/src/bthread/work_stealing_queue.h


  1. 非阻塞

  2. 队列 capacity 都会初始化成 2^n,都是为了队列内循环数组寻址方便;

  3. 都是用了内存屏障去保证自己修改的 pos 能被看到;


steal( )接口值得看看,我按自己的理解加了点注释:


// Steal one item from the queue.    // Returns true on stolen.    // May run in parallel with push() pop() or another steal().    bool steal(T* val) {        // 1. 与kfifo一样用内存屏障保存环形数组的数据起始位置        size_t t = _top.load(butil::memory_order_acquire);        size_t b = _bottom.load(butil::memory_order_acquire);        if (t >= b) {            // Permit false negative for performance considerations.            // 2. 如果这步判断期间有了数据,其实是非空也返回了false,所以是false negative            return false;        }        do {            // 3. 保证大家的内存里拿到的bottom位置都是一致的            butil::atomic_thread_fence(butil::memory_order_seq_cst);            b = _bottom.load(butil::memory_order_acquire);            if (t >= b) {                return false;            }            // 4. 上述说过的kfifo也是同样的寻址做法            *val = _buffer[t & (_capacity - 1)];            // 5. 其他的dequeue或者steal也会竞争这个元素,如果竞争失败,继续while尝试下一次        } while (!_top.compare_exchange_strong(t, t + 1,                                               butil::memory_order_seq_cst,                                               butil::memory_order_relaxed));        return true;    }
复制代码


扩展到多生产者多消费者的场景,workstealing 的代价应该就比较明显了:


  • 长尾肯定比 work sharing 的任何做法都明显;

  • 需要的额外操作更多了,如果 steal 的策略是查看各队列个数,那么每个队列都要额外维护一个原子变量;如果想要针对各个线程的当前吞吐进行任务分配,更要持续检查每个队列的负载情况等。

  • 全局的顺序性没法保证(所以不能用来切 leetcode 1188 了,它要求 dequeue 的顺序必须严格按照 enqueue 来)。很多使用本身不需要有序,但是如果整体顺序做得比较好,那么外部的并行提交就能比较快的被同时处理完成,是个很友善的功能。


4. 其他


size


leetcode 这道题要求实现 size( )接口,然而很多队列实现 size 是不太确定的,work_stealing_queue 的 size 接口叫做 volitale_size(),也是因为这类队列的 size 本身就是不准确的。


平均等待个数


队列中平均等待个数应该是一个很重要的提示,不同的使用场景导致内部等着的节点数量不同,所能用的优化方式就不一样。


超时


队列的接口是否支持超时也是很重要的设计,内部实现越简单,超时就越好做。


内存屏障的使用


各种内存屏障的用法不一,消耗也是不一样的,完全值得以后单独写一篇来学习下。


最后


为了平衡吞吐和长尾,不可能对所有场景有完美的解决方案,但是不同场景的几种优化(比如 linux 内核的 kfifo, google 的 Linked_Blocking_Queue,还有我们当前的队列交换)其实都已经做得非常漂亮了,经典的代码还是值得学习和动手写一下的。


发布于: 2021 年 01 月 04 日阅读数: 30
用户头像

1412

关注

鶸鶸的架构师 2018.08.07 加入

专注于异步调度框架开发和分布式存储技术,开源框架Workflow和srpc的作者之一。

评论

发布
暂无评论
消息队列优化(2) -- 几种基本实现