面向大规模队列,百万并发的多优先级消费系统设计
大规模队列的核心诉求,不仅需要「快」,还需要兼顾「公平」。
01 引言
HTTP 是一种常用的通信协议,除了常见网站访问、上传下载,HTTP 协议还经常被用在消息推送场景上。
设想你搭建了一个电商平台,有很多大型商家入驻了该电商平台并售卖各类商品,在消费者购买某个商品后,平台会通过 HTTP 协议将消费者购买商品的信息通知商家,商家则会在后台接收平台推送过来的消息。
一般情况下,所有的系统都正常工作。但突然有一天,A 商家出现了爆款产品,使得销售量迅速上升,由于 A 商家的后台服务处理能力是有限的,便会出现平台一直在给 A 商家推送商品售卖信息,而将其他商家的消息都排在后面,这便导致大量其他商家不能及时知道商品的售卖情况。
这种情况也会发生在某个大客户系统异常、响应急剧变慢,导致平台推送能力下降影响其他客户。因此,实现不同客户消息推送的隔离与控制就显得十分重要。
除了消息推送场景,类似的需求也发生在平台型的任务处理场景和资源调度场景。
在任务处理场景,很多客户会使用平台来处理任务,比如:通过通信平台发送语音通知,每个客户都有大量的语音通知业务请求需要处理。
由于资源是有限的,所以需要给每个客户配额一定的处理能力,当客户提交的请求大于配额的时候,平台会按最高配额的处理速度来处理客户的请求,并将超过配额的部分积压延后处理,这样会避免因为某些头部客户的请求量过大导致其他客户的请求长时间无法处理的情况。
在资源调度场景,假设平台有很多资源用于处理客户的请求,虽然每个资源都能处理某些类型的任务,但是资源的实时处理能力是受限的。
比如:资源只能实时处理 100QPS 的请求,这时需要建立一套机制,将对应资源能处理的任务选择出来,并按资源的实际处理能力提交给对应的资源进行处理,保证所有资源都能满负荷运行。
02 问题分析
上面三个场景看似不同,但背后其实隐藏的是同样的问题模型:
| 系统处理能力受限,或者系统能承诺处理能力受限。
| 实际的请求量可能在某个时间点远大于系统的处理能力。
| 个体与个体之间是独立且存在差异的,平台上有不同的客户,不同客户对时效的要求是不一样的,不同客户的任务规模也是不一样的。
| 超高的并发,比如十万甚至百万 QPS 的 HTTP 消息推送请求。
对于这种类型的问题,我们如何解决呢?
其实,不管是资源还是客户,都可以抽象为一个实时处理能力受限的队列。
对于消息推送场景,可以为每个客户建立一个队列,把需要推送的消息放到对应的客户队列里,并按客户最大配置流量轮流进行推送。
对于任务接收场景,每个客户都可以被当作是一个队列,只要能控制每个队列的最大消费速度,就能保证不会因为头部客户的突发流量导致其他客户被影响。
对于资源调度场景,可以为每个资源建立一个队列,然后将对应资源能处理的任务放在队列里面,再按照队列的实时处理能力,消费里面的数据。
此外,即使同个客户或者同个资源里,业务内部也是有优先级的,所以队列内部也需要支持业务的优先级。
因此,这类队列模型于普通的生产消费者模型存在显著的区别:
队列数量非常多,队列的消费速度需要满足上限,支持优先级。
如何构建这类面向百万并发、支持优先级的大规模队列的生产消费系统?
03 技术选型
提到生产消费模型,很自然会想到一些成熟的消息中间件,如 METAQ,KAFKA 等。但是经过调研发现:当队列数量的量级非常大,达到千级甚至万级的时候,这些中间件还是存在较大瓶颈的。
以 METAQ 为例,由于 METAQ 是一个线程池模式,一个 TOPIC 就有一个线程池,所以当 TOPIC 非常多的时候,机器上需要开非常多的线程,这显然是不可能的。通过分析发现,METAQ 的问题主要是实现机制的问题,所以另一个思路是:基于开源的 METAQ 源代码,对其消费端进行二次开发。
但这也会存在一系列的问题……
首先,METAQ 的代码本身非常庞大,熟悉里面的细节就需要投入非常大的成本。此外,METAQ 的设计思路与面向大规模队列的场景有着本质区别,METAQ 核心设计思路是“快”。
然而,大规模队列的核心诉求不仅需要“快”,还需要兼顾"公平",即保证所有的队列都能达到自己的性能目标。
这就导致 METAQ 里面有大量的逻辑其实并不匹配大规模队列的生产消费模型。同时,考虑到后续 METAQ 的版本迭代等的稳定性风险也是非常难以控制的。
不管是 METAQ 还是 KAFFA,在队列优先级的支持上比较弱,这些中间件在设计的时候,并非主要面向多优先级的消息。因此,支持这个特性也非常难,改造的成本也非常高。
通过综合评估,基于分布式基础队列进行自建会更稳定、可靠、可落地。通过系统调研发现阿里云的 LINDORM 和 REDIS 都提供基础的队列操作,LINDORM 提供的 STRONG CONSISTENCY(SC)级别的数据一致性能力,可以支持数据写入后 100%被立即读出。而 REDIS 主要采用的是一种异步备份的机制,所以从数据的高可靠考虑,选择 LINDORM 是更可靠的方案。
LINDORM 是一个支持多模型的 NOSQL 系统,兼容 HBASE/CASSANDRA、OPENTSDB、SOLR、SQL、HDFS 等多种开源标准接口,支持的模型包括:KV,WIDECOLUMN,TABULAR 和队列模型等,这里使用的就是它的队列模型。
虽然 LINDORM 也叫队列模型,但是它跟 METAQ 消息队列不一样,他核心的主要只有两个操作接口: 一个是 PUT,把数据放入到某个队列的队尾,成功后会返回消息对应的偏移,另一个是 GET(I),从某个偏移地址获取对应的数据,且单队列最大只支持 150QPS。
到这里便可以发现理想与现实的巨大鸿沟,我们生产消费系统的目标是要支持十万、百万并发,并且希望能自动解决消费进度管理、异常的恢复等问题,以 LINDORM 目前的状况来看都是没有的。
04 大规模队列生产消费系统总体设计
通过前文分析发现 LINDORM 只提供了插入数据及获取数据两个基础操作,单队列只支持 150QPS,而且没有消费进度管理和异常灰度机制,这些问题该如何解决?
这里将构建百万并发、支持多优先级的大规模队列生产消费系统称为 EMQ(ENORMOUSE SCALE MESSAGE QUEUE )。EMQ 系统主要分为 6 个模块:队列拆分、队列分配、队列消费、容量控制、消费进度管理、容错机制。
队列拆分
为了便于理解,将之前提到的客户对应的队列及资源对应的队列统一称之为逻辑队列,将 LINDORM 的队列称之为物理队列。
LINDORM 单队列只支持 150QPS,且任何物理队列都存在容量限制。但是,我们系统设计的目标是一百万 QPS(尽管这个一百万 QPS 是所有逻辑队列的总和)。
单个逻辑队列超过 1000QPS 在实际情况中非常常见,且从设计角度来讲,逻辑队列的 QPS 也十分难控制。因此,必须把逻辑队列拆分成一个个 150QPS 以内的物理队列。
队列分配
在队列拆分完后,系统需要消费这些物理队列,因此需要把队列合理的分配到应用集群的机器上。
每台机器上需要启动对应的客户端去消费各队列里面的数据,因为把一个支持 1000QPS 的队列拆分成了 20 个小的物理队列,所以每个队列支持 50QPS。
这里通过单队列容量 50QPS 乘以总的物理队列数等于 1000QPS 来实现逻辑队列支持 1000QPS 的目标,但是从逻辑上如果存在数据倾斜的时候,会存在总容量不满 1000PQS 的情况。
考虑到该系统主要面向的是一个海量数据场景,因此从概率上来讲,这是可以接受的。
队列消费
队列分配完后,还需要构建一个支持高性能的消费客户端。该客户端的高性能主要表现在:实现避免网络 IO 访问导致的性能下降;能快速处理本台机器上的所有队列,保证既不会轮不到,又能满负荷处理;同时,在消费完消息后能快速的执行业务系统的任务。
容量控制
当完成队列消费后,仍需要构建一个消费进度的管理模块,即管理当前队列生产的点位和已经消费的数据的点位,这样子才能清楚地知道下一个需要消费的数据以及队列的积压量。
容错机制
系统的容错机制主要包括三个方面:首先,如果某个偏移量没有数据的时候,需要能发现并跳过对应的偏移;其次,因为消费完的数据需要提交各业务层进行处理,如果业务层处理失败后我们应该有一定的异常恢复机制,如果业务层持续失败的时候我们需要有一定的兜底机制;此外,当机器因为异常宕机的时候,在原来机器上消费的队列需要平滑迁移到其他机器,从而实现无损恢复。
05 EMQ 集群模型
队列模型
如下图为 EMQ 的队列模型:
ROOT 节点下分两个节点:一是 ONLINE 节点,主要是面向生产环境,二是 SANBOX,主要面向生产前的测试,这能保证系统在更新某个功能的时候可以先进行充分的测试然后再同步到生产环境。
在 ONLINE 节点下面是一个个 TOPIC,这里的 TOPIC 就是我们之前说的逻辑队列,也就是分配给客户的队列或者为每个资源分配的队列(后文使用 TOPIC 代指逻辑队列)。每个 TOPIC 有一定的容量,也就是我们说的 QPS。
每个 TOPIC 下有若干个 GROUP,每个 GROUP 有独立的容量,其值为 TOPIC 的容量除以总的 GROUP 数,并且要求这个值需要小于 LINDORM 物理队列支持的最大 QPS。
每个 TOPIC 下面有分优先级的 QUEUE,该设计主要是为了支持优先级能力设计的。本文为了描述方便,会以高中低三个优先级为例介绍。这三个优先级的 QUEUE 是共享 GROUP 的容量,也就是说如果 GROUP 支持 50QPS,那么三个 QUEUE 的总 QPS 是 50。这里 QUEUE 才是真正对应 LINDORM 的物理队列,这也是为什么要求 GROUP 的容量需要小于 LINDROM 物理队列支持的最大 QPS。
对于资源调度场景,假设有一个资源的 QPS 是 500QPS。那么,他会对应一个 TOPIC,这个 TOPIC 下面有 10 个 GROUP,每个 GROUP 有 3 个优先级,也就是它会生产 3*10 = 30 个 LINDORM 队列。
队列分配模型
假设每个 GROUP 的 QPS 为 50,那么对于 100 万并发的系统将有约 6 万个物理队列,如何将这么大数量级的队列分配到机器上去?队列分配应该满足哪些原则?
首先,尽可能将队列平均分配到每台机器上,避免出现某个机器消费队列数据量太多产生性能问题;其次,当机器下线、宕机或置换的时候,机器上消费的队列尽可能不要发生大面积的迁移;最后,当队列新增或者删除的时候,机器上消费的队列也尽可能得不要发生大面积的迁移。
基于这些原则,设计了如下图所示的队列分配模型。
首先,引入一个 ZOOKEEPER 集群,在主节点下面建立两个节点,一个是 RUNNING 节点,用于保存机器的心跳信息,在机器上线的时候会创建一个以机器 IP 为名字的临时节点,在机器下线的时候会销毁对应节点。二是 SERVERLIST 节点,该节点保存的是所有消费的机器 IP 为名字的子节点,而在子节点里保存的是机器消费的所有队列。
现在有一个队列结集合和有一个机器列表集合,需要把队列尽可能平均的分配到机器上。一个简单的方法就是取所有的队列除以机器总数,平均分配到所有机器。这看似简单又完美的方法其实存在一些问题,当机器下线的时候,这个计算的过程就要重新来一把,可能导致大量的机器消费的队列迁移。
如果不重新计算而是在第一次取平均,即在机器下线的时候把这个机器上的队列平均分配到其他机器,机器上线的时候把其他机器上队列抽取一部分过来,这种方案在逻辑上是可行的。
但是,如果有队列新增的时候要执行队列的配置,在队列删除的时候要重新平衡机器的消费队列,这个无疑是非常复杂的。最为重要的是,这种增量变更的方式如果在其中某次分配存在问题,那么后面可能一直无法挽回。
综合考虑下,采用了一致性 HASH 的方案,考虑到一致性 HASH 的平衡性,能保证所有机器分配的队列数较为接近,同时,由于一致性 HASH 的单调性,不管是机器变更或者队列变更,不会导致大量的队列机器关系发生变化。
在引入一个中心计算任务后,当机器发生变化或消费的队列发生变化时,都会全量的重新计算每台机器消费的队列。如果机器消费的队列有新增,那么它会新增消费对应的队列,如果有减少,就会取消对应队列的消费。
06 EMQ 单机模型
经过前面的一系列设计,已经完成了队列的拆分,并且将队列分配到集群机器上。那还有最重要的一件事情,就是构建一个高效可靠消费客户端。
保证能准确无误高性能地消费队列的里面的数据,保证在队列有数据的情况下按队列配额的最大容量进行消费,以及当队列里的数据比较少的时候所有数据都快速被消费。
在原型机验证环节,设计目标按照在 8 核 16G 的机器上,单机 3000 队列的时候支持 1000QPS 并发的处理。如下图,是 EMQ 的单机模型图,主要包括分布式物理队列、远程数据拉取模块、本地缓冲处理模块、缓冲队列分发 &速度控制模块、消息任务处理模块以及消费进度管理模块。
分布式物理队列
分布式物理队使用的是 LINDORM 的队列模型,考虑到后续的扩展,通过对 LINDORM 的操作做了一层抽象,只要实现适配层的方法,便可以快速支持其他基础队列模型,比如:SWIFT,REDIS 等。
远程数据拉取模块
远程数据拉取模块主要包括 IO 任务孵化器,核心是一个线程池会周期性地孵化一些任务,将远端队列里面的数据拉取到本地,保障本地队列缓冲区里面的数据达到一定阈值。它的结束条件是本地缓冲区里的数据满足未来一段时间内的处理要求,或者所有远端的数据都已经拉取到本地缓冲区。
本地缓冲区
本地缓冲区是一系列的本地队列,与这台机器上消费的 LINDORM 上物理队列是一一对应的。也就是说:在远端这台机器有多少要消费的 LINDORM 队列,本地就有多少个对应的队列。
缓冲队列分发 &速度控制
缓冲队列分发与速度控制主要包括一个缓冲任务孵化器,它的核心职责是孵化一些队列任务以及消费本地缓冲区里面的数据,直到到达当前队列的 QPS 上限设置,或者缓冲区的数据空了。
消费进度管理
当消费完成一个新的数据之后,会更新对应通道的消费进度的点位,下次再消费的时候从新的点位开始消费,这样保证消费进度不断向前推移。同时,还会将消费进度的信息周期性的实例化到数据库,保证如果机器发生异常或者迁移的时候,能重新恢复之前的消费点位开始消费,因为这个备份是异步且有延时的,这便于所有的消息中间件一样,一个消息是可能重推的,需要业务处理的时候支持幂等操作。
这里再重点介绍一下,消费速度控制,要单机消费几千个队列,但是每台机器的线程是有限的,所以一定采用的是线程池的方案,如下图:
每个队列都有一个独立的消费计数器,每秒钟会执行若干个 LOOP,每个 LOOP 会为每个队列生成一个消费的任务,这个任务包含目标队列和消费的最大的任务数。
每次执行拉取的时候会先对当前队列的消费计数器加一,提前预占,然后去消费队列里面的数据,如果成功了,那么流程结束,如果失败了会将计数器减一,实现回滚的操作。当越到后面的时候,有些队列的当前秒需要拉取的数据已经足够了,就无需再继续拉取了。
07 优先级控制
在完成 EMQ 集群模型和单机模型的设计之后,已经能够实现面向大规模队列百万并发的生产消费系统能力,但是在很多业务场景下我们的任务都是存在一定优先级的。
比如以短信发送场景为例,短信分为通知业务、营销业务、验证码业务,一个资源如果既能处理通知业务,也能处理营销和验证码业务,在正常情况我们肯定是希望验证码的任务能优先被处理,然后再处理通知业务,最后才去处理去处理营销业务。
也就是在资源调度场景,我们为每个资源建立了一个逻辑队列,在 EMQ 里面也就是一个 TOPIC,这个队列是需要能支持优先级调度的,如果验证码的任务最后进入到队列,它里面已经堆积了大量的营销业务请求,我们也希望这个验证码的请求能优先于其他营销类型的请求被处理。
如果对应通用的队列机制是不现实的,通用的队列核心的逻辑就是先进先出。
那我们现在要实现优先级抢占,必须要在队列设计上做文章,如下图:
我们需要将一个队列拆分成 N 个队列,N 是需要支持的优先级个数。以三个优先级为例,我们会构建高,中,低三个优先级的队列,这个三个优先级队列组成一个 GROUP,共享这个 GROUP 的容量。也就是说如果这个 GROUP 的 QPS 是 50,那么在一秒钟消费高中低三个优先级队列的总 QPS 不能超过 50。
在消费队列消息的时候,会先消费高优先级的队列里面的数据,然后再消费中优先级队列里面的数据,最后才消费低优先级队列里面的数据。这样子就保证,高优先级里面的数据一定会先于中优先级里面的数据被处理。中优先级里面队列的数据也会先于低优先级里面的数据被处理。
本文重点介绍了如何快速、低成本地构建面向百万并发多优先级的大规模队列生产消费系统。在拥有基础能力以后,在上面做各种复杂的业务能力设计便十分容易。比如:文章最开始提到的 HTTP 推送场景,那么假设某个客户突然有超 10 万 QPS 的消息需要推送,系统只支持 10 万 QPS 推送能力,如果按先进先出,那么可能其他客户的消息都无法推送了。但是,如果基于 EMQ 构建生产者消费者模型,为每个客户(或客户组)建立一个队列,并且配置这个队列支持的上限推送的 QPS,消息在发送前先推送到 EMQ 队列,按配置的限额消费。那么,即使客户瞬间有很大的信息推送请求,也不会影响到其他客户的正常业务处理
版权声明: 本文为 InfoQ 作者【阿里云视频云】的原创文章。
原文链接:【http://xie.infoq.cn/article/947e35ac41cd03f9c1b1387cc】。文章转载请联系作者。
评论