写点什么

使用 GenStage 构建一套稳定的持续数据处理系统

  • 2021 年 12 月 12 日
  • 本文字数:2425 字

    阅读完需:约 8 分钟

简单介绍一下 Archiver

在 RingCentral,我们有一个名叫 Archiver 的系统,它能够自动将客户的通话录音,短讯和传真讯息同步到客户连接的云盘上。

例如,一家公司将 Archiver 和他们自己的 Dropbox 账户相关联之后,当这家公司的员工拨打了录音电话,Archiver 就会自动将这些通话录音同步到客户的 Dropbox 里。

初看起来,Archiver 似乎是一个很简单的系统,但是要把它正确地实现,还是很富有挑战性的。我们的客户里有很多像航空公司这样的大型公司客户,一家公司一天能够产生数万条通话录音,这些录音文件都需要被及时同步到客户连接的云盘上,因此 Archiver 最核心的部分是就是那些后台运行着的工作进程。

我们的后台工作进程使用了调度的方式,每五分钟就会启动一次存档的工作。每次工作执行的时候,会先遍历所有的客户,找到需要同步的文件信息,然后对每个文件开启一个线程来同步文件。使用这种机制,整个系统运行良好,客户的信息都能够及时同步完成;但是 RingCentral 的客户正在高速增长,有一些问题正在慢慢暴露出来。


问题出在哪儿

就在近期,我们发现了整体的数据处理速率变慢了,通过查看日志文件,发现日志里有很多 HTTP 429 (Too many requests) 类型的错误。通过调查后,发现是最近有许多大体量的公司开始使用 Archiver 关联了它们的 Dropbox 账户,它们可能有上千名员工,每个员工每天产生几百通的电话录音。文件量变大之后,我们每五分钟执行一次同步的操作,轻易就触碰到了 Dropbox 的同步速率上限。

使用现有的机制,为了确保客户的文件可以及时被同步成功,我们必须对 Archiver 可以处理的数据总量做一次评估。影响处理速率的因素有:触发同步操作的频率、每次同步的总量、Dropbox 的速率上限等等。我们最终估算了一个比较合理的数值,确保了客户的文件可以最终被同步成功。但是因为网络,数据库 IO 的瓶颈,以及其他各种不确定性,我们急需一个更健壮的解决方案。


GenStage 初探

在我们团队寻找解决方案的时候,我想起了 José Valim 2017 年的演说 GenStage and Flow,发现这是一个符合 Archiver 的解决方案。

所以,什么是 GenStage 呢?项目的简介里面描述自己为:

GenStage is a specification for exchanging events between producers and consumers.

字面翻译过来是:「生产者和消费者之间交换事件的一套规范」,这是什么意思呢?GenStage 提供给我们的是一套用来实现「事件流」处理的工具箱,它由「阶段」和各个「阶段」之间的「事件流」组成。

让我们来模拟这样一个场景:

[A] -> [B] -> [C]
复制代码

上图里,A是一个「生产者」;B是一个「消费者」,同时也是「生产者」;C则是「消费者」。A会生成「事件」,然后传递给B消费,B将这些「事件」进行一系列的计算、变换之后,生成新的「事件」,并传递给C消费。

咋一看上面的场景,只是又一个无聊的定时任务或者一个消息队列系统,但是,GenStage 的实现和它们有本质的不同:GenStage 里的「生产者」会等待「消费者」的请求。

那么,等待请求是什么意思呢?相较于使用一个调度器来每五分钟启动一次「生产者」来产生事件,GenStage 独特的实现方式是:「生产者」不会直接去获取数据,而是会等待「消费者」的请求,当「生产者」收到「消费者」的请求之后,「生产者」才会去生成「事件」,然后把这些「事件」传递给「消费者」。

这种方式就实现了「背压」的机制,这种机制保证了当「消费者」繁忙的时候,「生产者」不会持续再往「消费者」端发送数据,造成「消费者」处理不了而崩溃,这样就保证了整个数据处理系统可以稳健地以最大的处理速率运行。


举个例子

假设构建这样一个应用:数据库中持续有客户的文件记录写入,需求是构建一个数据处理系统,从这个数据库中将这些文件同步至客户的 Dropbox。

所以,我们可以这样描绘这个系统:

Database --> [Fetcher] --> [Uploader] --> Dropbox                 |             |                 |             |             (producer)    (consumer)
复制代码

上图里,Fetcher 先从数据库里取出记录,然后 Uploader 将这些记录又同步到 Dropbox。


眼见为实

我们来看一看如何使用 GenStage 实现这样一个系统:

Fetcher

defmodule Archiver.Fetcher do  use GenStage  def start_link(args) do    GenStage.start_link(__MODULE__, args, name: __MODULE__)  end  def init(state), do: {:producer, state}  def handle_demand(demand, state) do    items = Database.get_items()    {:noreply, items, state}  endend
复制代码

Uploader:

defmodule Archiver.Uploader do  use GenStage  def start_link(args) do    GenStage.start_link(__MODULE__, args, name: __MODULE__)  end  def init(_state) do    {:consumer, :the_state_does_not_matter, subscribe_to: [Archiver.Fetcher]}  end  def handle_events(items, _from, _state) do    items    |> Enum.map(&Task.start_link(Dropbox, :upload, [&1]))    {:noreply, [], :the_state_does_not_matter}  endend
复制代码

当 Uploader 开始运行的时候,它将会订阅到 Fetcher,并且发送消息请求,这时候,Fetcher 的handle_demand/2将会被调用,然后 Fetcher 就从数据库里取出数据,传递给 Uploader 的handle_events/3,接着 Uploader 将会使用 Task 来处理这些数据。

当我们的系统处理速率超过了 Dropbox 的限速,Uploader 就会告诉 Fetcher:停一停,我无法处理更多的内容了,这时,Fetcher 就会暂停从数据库读取更多内容;等待 Uploader 消化完已有的内容,整个系统又继续执行,这样就不会导致系统过载而崩溃。

以上,只用了 34 行代码,我们就使用 GenStage 实现了一个简洁的,稳健的数据处理系统:通过背压的机制,使系统得以以最大的处理速率运行。我们无需估算整个系统的处理能力,来决定要多久执行一次同步工作,只需定义好整个数据处理流程和边界即可。


更进一步

由于 Archiver 现在依然是使用 Java 作为主要语言的,所以之后希望我们可以引入 Akka Stream 或者 RxJava 来实现更佳的数据处理系统。

发布于: 2 小时前阅读数: 8
用户头像

还未添加个人签名 2021.11.24 加入

全球云商务通信与协作解决方案领导者,连续七年荣膺Gartner UCaaS(统一通信即服务)魔力象限全球领导者。与你分享各种技术专家的文章、公开课,各种好玩有趣的活动与福利,以及最新的招聘机会。

评论

发布
暂无评论
使用 GenStage 构建一套稳定的持续数据处理系统