写点什么

高性能发件箱模式(每天处理 20 亿条消息)

作者:俞凡
  • 2025-05-07
    上海
  • 本文字数:9450 字

    阅读完需:约 31 分钟

本文介绍了如何扩展、优化发件箱模式的处理性能,从而为分布式系统打造高性能、高吞吐量的消息传递机制。原文:Scaling the Outbox Pattern (2B+ messages per day)


在上一篇文章中,我们谈到了发件箱模式的实现。发件箱模式是分布式消息可靠传递的重要工具,但实现只是第一步。


真正的挑战是什么?对其扩容从而处理大量信息。


今天,我们将更上一层楼,从一个基本发件箱处理进程开始,将它改造为每天能处理超过 20 亿条信息的高性能引擎。


让我们深入了解一下!

开始

我们从这里开始,现在有一个发件箱处理进程(OutboxProcessor),用于轮询未处理的邮件并将其发布到队列中。我们首先可以调整的是频率批量大小


internal sealed class OutboxProcessor(NpgsqlDataSource dataSource, IPublishEndpoint publishEndpoint){    private const int BatchSize = 1000;
public async Task<int> Execute(CancellationToken cancellationToken = default) { await using var connection = await dataSource.OpenConnectionAsync(cancellationToken); await using var transaction = await connection.BeginTransactionAsync(cancellationToken);
var messages = await connection.QueryAsync<OutboxMessage>( @""" SELECT * FROM outbox_messages WHERE processed_on_utc IS NULL ORDER BY occurred_on_utc LIMIT @BatchSize """, new { BatchSize }, transaction: transaction);
foreach (var message in messages) { try { var messageType = Messaging.Contracts.AssemblyReference.Assembly.GetType(message.Type); var deserializedMessage = JsonSerializer.Deserialize(message.Content, messageType);
await publishEndpoint.Publish(deserializedMessage, messageType, cancellationToken);
await connection.ExecuteAsync( @""" UPDATE outbox_messages SET processed_on_utc = @ProcessedOnUtc WHERE id = @Id """, new { ProcessedOnUtc = DateTime.UtcNow, message.Id }, transaction: transaction); } catch (Exception ex) { await connection.ExecuteAsync( @""" UPDATE outbox_messages SET processed_on_utc = @ProcessedOnUtc, error = @Error WHERE id = @Id """, new { ProcessedOnUtc = DateTime.UtcNow, Error = ex.ToString(), message.Id }, transaction: transaction); } }
await transaction.CommitAsync(cancellationToken);
return messages.Count; }}
复制代码


假设连续运行 OutboxProcessor,将批量大小增加到 1000


能处理多少消息?


运行发件箱处理程序 1 分钟,我们来算一下处理了多少消息。


基础实现在一分钟内处理了 81,000 条信息,即 1,350 MPS(每秒信息)。


还不错,不过我们看看还能改进多少。

测量每个步骤

你无法改进无法测量的东西。对吧?所以,我会用 Stopwatch 来测量总执行时间和每个步骤所需时间。


请注意,我还拆分了发布和更新步骤,这样就可以分别测量发布和更新的时间。这一点很重要,因为我想分别优化每个步骤。


下面是基础实现中每个步骤的执行时间:


  • 查询时间: ~70ms

  • 发布时间: ~320ms

  • 更新时间: ~300ms


internal sealed class OutboxProcessor(    NpgsqlDataSource dataSource,    IPublishEndpoint publishEndpoint,    ILogger<OutboxProcessor> logger){    private const int BatchSize = 1000;
public async Task<int> Execute(CancellationToken cancellationToken = default) { var totalStopwatch = Stopwatch.StartNew(); var stepStopwatch = new Stopwatch();
await using var connection = await dataSource.OpenConnectionAsync(cancellationToken); await using var transaction = await connection.BeginTransactionAsync(cancellationToken);
stepStopwatch.Restart(); var messages = (await connection.QueryAsync<OutboxMessage>( @""" SELECT * FROM outbox_messages WHERE processed_on_utc IS NULL ORDER BY occurred_on_utc LIMIT @BatchSize """, new { BatchSize }, transaction: transaction)).AsList(); var queryTime = stepStopwatch.ElapsedMilliseconds;
var updateQueue = new ConcurrentQueue<OutboxUpdate>();
stepStopwatch.Restart(); foreach (var message in messages) { try { var messageType = Messaging.Contracts.AssemblyReference.Assembly.GetType(message.Type); var deserializedMessage = JsonSerializer.Deserialize(message.Content, messageType);
await publishEndpoint.Publish(deserializedMessage, messageType, cancellationToken);
updateQueue.Enqueue(new OutboxUpdate { Id = message.Id, ProcessedOnUtc = DateTime.UtcNow }); } catch (Exception ex) { updateQueue.Enqueue(new OutboxUpdate { Id = message.Id, ProcessedOnUtc = DateTime.UtcNow, Error = ex.ToString() }); } } var publishTime = stepStopwatch.ElapsedMilliseconds;
stepStopwatch.Restart(); foreach (var outboxUpdate in updateQueue) { await connection.ExecuteAsync( @""" UPDATE outbox_messages SET processed_on_utc = @ProcessedOnUtc, error = @Error WHERE id = @Id """, outboxUpdate, transaction: transaction); } var updateTime = stepStopwatch.ElapsedMilliseconds;
await transaction.CommitAsync(cancellationToken);
totalStopwatch.Stop(); var totalTime = totalStopwatch.ElapsedMilliseconds;
OutboxLoggers.Processing(logger, totalTime, queryTime, publishTime, updateTime, messages.Count);
return messages.Count; }
private struct OutboxUpdate { public Guid Id { get; init; } public DateTime ProcessedOnUtc { get; init; } public string? Error { get; init; } }}
复制代码


下面是有趣的部分!

优化读取查询

首先要优化的是获取未处理邮件的查询。如果我们不需要所有列(提示:确实不需要),执行 SELECT * 查询会有影响。


下面是当前的 SQL 查询:


SELECT *FROM outbox_messagesWHERE processed_on_utc IS NULLORDER BY occurred_on_utc LIMIT @BatchSize
复制代码


可以修改查询,只返回需要的列。这将节省一些带宽,但不会显著提高性能。


SELECT id AS Id, type AS Type, content as ContentFROM outbox_messagesWHERE processed_on_utc IS NULLORDER BY occurred_on_utc LIMIT @BatchSize
复制代码


检查一下该查询的执行计划,可以看到它正在执行表扫描。我在 PostgreSQL 上运行这个查询,下面是 EXPLAIN ANALYZE 的结果:


Limit  (cost=86169.40..86286.08 rows=1000 width=129) (actual time=122.744..124.234 rows=1000 loops=1)  ->  Gather Merge  (cost=86169.40..245080.50 rows=1362000 width=129) (actual time=122.743..124.198 rows=1000 loops=1)        Workers Planned: 2        Workers Launched: 2        ->  Sort  (cost=85169.38..86871.88 rows=681000 width=129) (actual time=121.478..121.492 rows=607 loops=3)              Sort Key: occurred_on_utc              Sort Method: top-N heapsort  Memory: 306kB              Worker 0:  Sort Method: top-N heapsort  Memory: 306kB              Worker 1:  Sort Method: top-N heapsort  Memory: 306kB              ->  Parallel Seq Scan on outbox_messages  (cost=0.00..47830.88 rows=681000 width=129) (actual time=0.016..67.481 rows=666667 loops=3)                    Filter: (processed_on_utc IS NULL)Planning Time: 0.051 msExecution Time: 124.298 ms
复制代码


现在创建一个针对该查询的覆盖索引,用于获取未处理的报文。覆盖索引包含满足查询所需的所有列,而无需访问表本身。


索引将针对 occurred_on_utcprocessed_on_utc 列,包括 idtypecontent 列。最后,我们将应用过滤器,只对未处理的邮件建立索引。


CREATE INDEX IF NOT EXISTS idx_outbox_messages_unprocessedON public.outbox_messages (occurred_on_utc, processed_on_utc)INCLUDE (id, type, content)WHERE processed_on_utc IS NULL
复制代码


下面来解释一下每个决定背后的原因:


  • occurred_on_utc 建立索引将以升序存储索引中的条目。这与查询中的 ORDER BY occurred_on_utc 语句相匹配。意味着查询可以扫描索引,而无需对结果进行排序。结果已经按照正确的排序顺序排列。

  • 在索引中包含选择的列,可以让我们从索引条目中返回这些列,从而避免从表行中读取值。

  • 在索引中过滤未处理的报文符合 WHERE processed_on_utc IS NULL 语句。


注意:PostgreSQL 的最大索引行大小为 2712B(不要问我是怎么知道的)。INCLUDE 列表中的列也是索引行(B-tree 元组)的一部分。content 列包含序列化的 JSON 信息,因此最有可能导致超出此限制。这也是没有办法的办法,所以建议尽可能减少消息的长度。也可以将这一列从 INCLUDE 列表中排除,对性能影响不大。


下面是创建该索引后更新的执行计划:


Limit  (cost=0.43..102.82 rows=1000 width=129) (actual time=0.016..0.160 rows=1000 loops=1)  ->  Index Only Scan using idx_outbox_messages_unprocessed on outbox_messages  (cost=0.43..204777.36 rows=2000000 width=129) (actual time=0.015..0.125 rows=1000 loops=1)        Heap Fetches: 0Planning Time: 0.059 msExecution Time: 0.189 ms
复制代码


因为有了覆盖索引,所以执行计划只包含 Index Only Scan (仅扫描索引)和 Limit(限制)操作,不需要进行过滤或排序,这就是性能大幅提升的原因。


查询时间对性能有什么影响?


  • 查询时间:70 毫秒 → 1 毫秒 (-98.5%)

优化消息发布

下一个可以优化的是如何向队列发布消息,我正在用来自 MassTransit 的 IPublishEndpoint 将消息发布到 RabbitMQ。


更准确的说,我们是在向交换机发布消息。然后,交换机会将消息路由到相应队列。


那么怎么才能优化呢?


可以做的一个微优化是为序列化中使用的消息类型引入缓存。对每种消息类型不断执行反射是很昂贵的,因此只需执行一次反射,并将结果保存起来。


var messageType = Messaging.Contracts.AssemblyReference.Assembly.GetType(message.Type);
复制代码


缓存可以是 ConcurrentDictionary,通过 GetOrAdd 来检索缓存类型。


把这段代码提取到 GetOrAddMessageType 辅助方法中:


private static readonly ConcurrentDictionary<string, Type> TypeCache = new();
private static Type GetOrAddMessageType(string typeName){ return TypeCache.GetOrAdd( typeName, name => Messaging.Contracts.AssemblyReference.Assembly.GetType(name));}
复制代码


这就是信息发布步骤,其中最大的问题是需要等待发布完成。发布需要一些时间,要等待消息代理的确认,我们在循环中进行确认,因此效率更低。


var updateQueue = new ConcurrentQueue<OutboxUpdate>();
foreach (var message in messages){ try { var messageType = Messaging.Contracts.AssemblyReference.Assembly.GetType(message.Type); var deserializedMessage = JsonSerializer.Deserialize(message.Content, messageType);
// 等待消息代理的确认 await publishEndpoint.Publish(deserializedMessage, messageType, cancellationToken);
updateQueue.Enqueue(new OutboxUpdate { Id = message.Id, ProcessedOnUtc = DateTime.UtcNow }); } catch (Exception ex) { updateQueue.Enqueue(new OutboxUpdate { Id = message.Id, ProcessedOnUtc = DateTime.UtcNow, Error = ex.ToString() }); }}
复制代码


可以通过批量发布来改善这种情况。事实上,IPublishEndpoint 有一个 PublishBatch 扩展方法。仔细看一下,就会发现:


// MassTransit implementationpublic static Task PublishBatch(    this IPublishEndpoint endpoint,    IEnumerable<object> messages,    CancellationToken cancellationToken = default){    return Task.WhenAll(messages.Select(x => endpoint.Publish(x, cancellationToken)));}
复制代码


因此,可以将消息集合转换为发布任务列表,然后使用 Task.WhenAll 等待发布任务。


var updateQueue = new ConcurrentQueue<OutboxUpdate>();
var publishTasks = messages .Select(message => PublishMessage(message, updateQueue, publishEndpoint, cancellationToken)) .ToList();
await Task.WhenAll(publishTasks);
// I extracted the message publishing into a separate method for readability.private static async Task PublishMessage( OutboxMessage message, ConcurrentQueue<OutboxUpdate> updateQueue, IPublishEndpoint publishEndpoint, CancellationToken cancellationToken){ try { var messageType = GetOrAddMessageType(message.Type); var deserializedMessage = JsonSerializer.Deserialize(message.Content, messageType);
await publishEndpoint.Publish(deserializedMessage, messageType, cancellationToken);
updateQueue.Enqueue(new OutboxUpdate { Id = message.Id, ProcessedOnUtc = DateTime.UtcNow }); } catch (Exception ex) { updateQueue.Enqueue(new OutboxUpdate { Id = message.Id, ProcessedOnUtc = DateTime.UtcNow, Error = ex.ToString() }); }}
复制代码


如何改进消息发布?


  • 发布时间:320ms → 289ms (-9.8%)


如你所见,速度并没有明显加快。但这是从其他优化中获益的必要条件。

优化更新查询

优化之旅的下一步是处理更新已处理消息的查询。


目前的实现效率很低,因为要为每条消息向数据库进行一次查询。


foreach (var outboxUpdate in updateQueue){    await connection.ExecuteAsync(        @"""        UPDATE outbox_messages        SET processed_on_utc = @ProcessedOnUtc, error = @Error        WHERE id = @Id        """,        outboxUpdate,        transaction: transaction);}
复制代码


如果你现在还不明白,那就记住核心就是批处理。我们需要一种向数据库发送大型 UPDATE 查询的方法。


我们必须为这个批量查询手动构造 SQL,将使用 Dapper 的 DynamicParameters 类型来提供所有参数。


var updateSql =    @"""    UPDATE outbox_messages    SET processed_on_utc = v.processed_on_utc,        error = v.error    FROM (VALUES        {0}    ) AS v(id, processed_on_utc, error)    WHERE outbox_messages.id = v.id::uuid    """;
var updates = updateQueue.ToList();var paramNames = string.Join(",", updates.Select((_, i) => $"(@Id{i}, @ProcessedOn{i}, @Error{i})"));
var formattedSql = string.Format(updateSql, paramNames);
var parameters = new DynamicParameters();
for (int i = 0; i < updates.Count; i++){ parameters.Add($"Id{i}", updates[i].Id.ToString()); parameters.Add($"ProcessedOn{i}", updates[i].ProcessedOnUtc); parameters.Add($"Error{i}", updates[i].Error);}
await connection.ExecuteAsync(formattedSql, parameters, transaction: transaction);
复制代码


这将构造类似下面这样的 SQL 查询:


UPDATE outbox_messagesSET processed_on_utc = v.processed_on_utc,    error = v.errorFROM (VALUES    (@Id0, @ProcessedOn0, @Error0),    (@Id1, @ProcessedOn1, @Error1),    (@Id2, @ProcessedOn2, @Error2),    -- A few hundred rows in beteween    (@Id999, @ProcessedOn999, @Error999)) AS v(id, processed_on_utc, error)WHERE outbox_messages.id = v.id::uuid
复制代码


我们可以发送一次查询来更新所有信息,而不是为每条信息发送一次更新查询。


显然这会带来明显的性能优势:


  • 更新时间:300ms → 52ms (-82.6%)

我们走了多远?

我们测试一下优化后的性能改进。到目前为止,所做的更改主要集中在提高发件箱处理进程的速度上。


以下是各个步骤优化后的粗略数字:


  • 查询时间: ~1ms

  • 发布时间: ~289ms

  • 更新时间: ~52ms


运行发件箱处理程序 1 分钟,并计算已处理邮件的数量。


优化后的实现方案在 1 分钟内处理了 162,000 条信息,即 2,700 MPS。


这意味着现在每天可处理超过 2.3 亿条消息。


但我们才刚刚开始。

发件箱并行处理

如果想更进一步,就必须扩展 OutboxProcessor。我们可能面临的问题是对同一消息进行多次处理。因此需要对当前批次的消息实现某种形式的锁定。


PostgreSQL 有一个方便的 FOR UPDATE 语句,可以在这里使用。它会在当前事务持续期间锁定所选记录。不过,我们必须添加 SKIP LOCKED 语句,以允许其他查询跳过锁定的记录。否则,任何其他查询都将被阻止,直到当前事务完成。


这是更新后的查询:


SELECT id AS Id, type AS Type, content as ContentFROM outbox_messagesWHERE processed_on_utc IS NULLORDER BY occurred_on_utc LIMIT @BatchSizeFOR UPDATE SKIP LOCKED
复制代码


要扩展发件箱处理进程,只需运行多个后台作业实例即可。


我用 Parallel.ForEachAsync 进行模拟,可以控制 MaxDegreeOfParallelism


var parallelOptions = new ParallelOptions{    MaxDegreeOfParallelism = _maxParallelism,    CancellationToken = cancellationToken};
await Parallel.ForEachAsync( Enumerable.Range(0, _maxParallelism), parallelOptions, async (_, token) => { await ProcessOutboxMessages(token); });
复制代码


我们可以在 1 分钟内处理 179,000 条信息,或者说通过 5 个处理进程处理 2,983 MPS 信息。


我觉得应该可以更快。


在不进行并行处理的情况下,我们能够获得大约 2,700 MPS。


一个新的瓶颈出现了:分批发布信息。


发布时间从 ~289ms 增加到 ~1,540ms。


有趣的是,如果将基本发布时间(一个工作进程的发布时间)乘以工作进程数量,就能大致得出新的发布时间。


我们正在浪费大量时间等待消息代理的确认。


如何解决这个问题?

批量发布消息

RabbitMQ 支持批量发布消息,可以在配置 MassTransit 时调用 ConfigureBatchPublish 方法来启用此功能。MassTransit 会在将消息发送到 RabbitMQ 之前对其进行缓冲,以提高吞吐量。


builder.Services.AddMassTransit(x =>{    x.UsingRabbitMq((context, cfg) =>    {        cfg.Host(builder.Configuration.GetConnectionString("Queue"), hostCfg =>        {            hostCfg.ConfigureBatchPublish(batch =>            {                batch.Enabled = true;            });        });
cfg.ConfigureEndpoints(context); });});
复制代码


只需稍作改动,用 5 个工作进程重新进行测试。


这次我们能够在 1 分钟内处理 195.6 万条信息。


这样,处理速度就达到了 32 500 MPS。


相当于每天处理超过 28 亿条消息。


就到这里吧,不过还有样东西需要讨论。

关闭发布确认(危险)

还可以做的一件事(不建议)是关闭发布确认。这意味着调用 Publish 时不会等待消息被代理确认。这可能会导致可靠性问题,并有可能丢失消息。


尽管如此,在关闭发布确认的情况下,还是获得了 ~37,000 MPS。


cfg.Host(builder.Configuration.GetConnectionString("Queue"), hostCfg =>{    hostCfg.PublisherConfirmation = false; // 危险!不建议!    hostCfg.ConfigureBatchPublish(batch =>    {        batch.Enabled = true;    });});
复制代码

扩容的主要考虑因素

虽然现在的吞吐量令人印象深刻,但在现实中实施这些技术时,还要考虑这些因素:


  • 消费者能力:消费者跟得上吗?提高生产者吞吐量而不匹配消费者容量会造成积压。扩展时要考虑整个流水线。

  • 交付保证:我们的优化措施能保证至少一次交付。将消费者设计为可幂等的,以处理偶尔出现的重复消息。

  • 报文排序:使用 FOR UPDATE SKIP LOCKED 进行并行处理可能会导致报文乱序。要严格排序,可考虑在用户端使用收件箱模式来缓冲消息。即使报文到达时顺序不对,收件箱也可以帮助我们按正确的顺序处理报文。

  • 可靠性与性能的权衡:关闭发布确认可提高速度,但有可能丢失信息。请根据具体需求权衡性能与可靠性。


通过解决这些问题,我们就能创建一个高性能的发件箱处理程序,与系统架构顺利整合。

总结

与最初的发件箱处理程序相比,我们已经取得了长足进步。以下是我们的成果:


  • 利用智能索引优化数据库查询

  • 通过批处理改进消息发布

  • 利用批处理功能简化数据库更新

  • 使用并行工作进程扩大发件箱处理规模

  • 利用 RabbitMQ 的批量发布功能


结果如何?我们将消息处理速度从每秒 1,350 条提升到了令人印象深刻的 32,500 MPS,相当于每天处理超过 28 亿条信息!


扩展不仅仅是原始速度的问题,还涉及到识别和解决每个步骤的瓶颈问题。通过测量、优化和重新思考采用的方法,可以实现了巨大的性能提升。




你好,我是俞凡,在 Motorola 做过研发,现在在 Mavenir 做技术工作,对通信、网络、后端架构、云原生、DevOps、CICD、区块链、AI 等技术始终保持着浓厚的兴趣,平时喜欢阅读、思考,相信持续学习、终身成长,欢迎一起交流学习。为了方便大家以后能第一时间看到文章,请朋友们关注公众号"DeepNoMind",并设个星标吧,如果能一键三连(转发、点赞、在看),则能给我带来更多的支持和动力,激励我持续写下去,和大家共同成长进步!

发布于: 2025-05-07阅读数: 2
用户头像

俞凡

关注

公众号:DeepNoMind 2017-10-18 加入

俞凡,Mavenir Systems研发总监,关注高可用架构、高性能服务、5G、人工智能、区块链、DevOps、Agile等。公众号:DeepNoMind

评论

发布
暂无评论
高性能发件箱模式(每天处理20亿条消息)_最佳实践_俞凡_InfoQ写作社区