本文介绍了如何扩展、优化发件箱模式的处理性能,从而为分布式系统打造高性能、高吞吐量的消息传递机制。原文:Scaling the Outbox Pattern (2B+ messages per day)
在上一篇文章中,我们谈到了发件箱模式的实现。发件箱模式是分布式消息可靠传递的重要工具,但实现只是第一步。
真正的挑战是什么?对其扩容从而处理大量信息。
今天,我们将更上一层楼,从一个基本发件箱处理进程开始,将它改造为每天能处理超过 20 亿条信息的高性能引擎。
让我们深入了解一下!
开始
我们从这里开始,现在有一个发件箱处理进程(OutboxProcessor),用于轮询未处理的邮件并将其发布到队列中。我们首先可以调整的是频率和批量大小。
 internal sealed class OutboxProcessor(NpgsqlDataSource dataSource, IPublishEndpoint publishEndpoint){    private const int BatchSize = 1000;
    public async Task<int> Execute(CancellationToken cancellationToken = default)    {        await using var connection = await dataSource.OpenConnectionAsync(cancellationToken);        await using var transaction = await connection.BeginTransactionAsync(cancellationToken);
        var messages = await connection.QueryAsync<OutboxMessage>(            @"""            SELECT *            FROM outbox_messages            WHERE processed_on_utc IS NULL            ORDER BY occurred_on_utc LIMIT @BatchSize            """,            new { BatchSize },            transaction: transaction);
        foreach (var message in messages)        {            try            {                var messageType = Messaging.Contracts.AssemblyReference.Assembly.GetType(message.Type);                var deserializedMessage = JsonSerializer.Deserialize(message.Content, messageType);
                await publishEndpoint.Publish(deserializedMessage, messageType, cancellationToken);
                await connection.ExecuteAsync(                    @"""                    UPDATE outbox_messages                    SET processed_on_utc = @ProcessedOnUtc                    WHERE id = @Id                    """,                    new { ProcessedOnUtc = DateTime.UtcNow, message.Id },                    transaction: transaction);            }            catch (Exception ex)            {                await connection.ExecuteAsync(                    @"""                    UPDATE outbox_messages                    SET processed_on_utc = @ProcessedOnUtc, error = @Error                    WHERE id = @Id                    """,                    new { ProcessedOnUtc = DateTime.UtcNow, Error = ex.ToString(), message.Id },                    transaction: transaction);            }        }
        await transaction.CommitAsync(cancellationToken);
        return messages.Count;    }}
       复制代码
 
假设连续运行 OutboxProcessor,将批量大小增加到 1000。
能处理多少消息?
运行发件箱处理程序 1 分钟,我们来算一下处理了多少消息。
基础实现在一分钟内处理了 81,000 条信息,即 1,350 MPS(每秒信息)。
还不错,不过我们看看还能改进多少。
测量每个步骤
你无法改进无法测量的东西。对吧?所以,我会用 Stopwatch 来测量总执行时间和每个步骤所需时间。
请注意,我还拆分了发布和更新步骤,这样就可以分别测量发布和更新的时间。这一点很重要,因为我想分别优化每个步骤。
下面是基础实现中每个步骤的执行时间:
查询时间: ~70ms
发布时间: ~320ms
更新时间: ~300ms
 internal sealed class OutboxProcessor(    NpgsqlDataSource dataSource,    IPublishEndpoint publishEndpoint,    ILogger<OutboxProcessor> logger){    private const int BatchSize = 1000;
    public async Task<int> Execute(CancellationToken cancellationToken = default)    {        var totalStopwatch = Stopwatch.StartNew();        var stepStopwatch = new Stopwatch();
        await using var connection = await dataSource.OpenConnectionAsync(cancellationToken);        await using var transaction = await connection.BeginTransactionAsync(cancellationToken);
        stepStopwatch.Restart();        var messages = (await connection.QueryAsync<OutboxMessage>(            @"""            SELECT *            FROM outbox_messages            WHERE processed_on_utc IS NULL            ORDER BY occurred_on_utc LIMIT @BatchSize            """,            new { BatchSize },            transaction: transaction)).AsList();        var queryTime = stepStopwatch.ElapsedMilliseconds;
        var updateQueue = new ConcurrentQueue<OutboxUpdate>();
        stepStopwatch.Restart();        foreach (var message in messages)        {            try            {                var messageType = Messaging.Contracts.AssemblyReference.Assembly.GetType(message.Type);                var deserializedMessage = JsonSerializer.Deserialize(message.Content, messageType);
                await publishEndpoint.Publish(deserializedMessage, messageType, cancellationToken);
                updateQueue.Enqueue(new OutboxUpdate                {                    Id = message.Id,                    ProcessedOnUtc = DateTime.UtcNow                });            }            catch (Exception ex)            {                updateQueue.Enqueue(new OutboxUpdate                {                    Id = message.Id,                    ProcessedOnUtc = DateTime.UtcNow,                    Error = ex.ToString()                });            }        }        var publishTime = stepStopwatch.ElapsedMilliseconds;
        stepStopwatch.Restart();        foreach (var outboxUpdate in updateQueue)        {            await connection.ExecuteAsync(                @"""                UPDATE outbox_messages                SET processed_on_utc = @ProcessedOnUtc, error = @Error                WHERE id = @Id                """,                outboxUpdate,                transaction: transaction);        }        var updateTime = stepStopwatch.ElapsedMilliseconds;
        await transaction.CommitAsync(cancellationToken);
        totalStopwatch.Stop();        var totalTime = totalStopwatch.ElapsedMilliseconds;
        OutboxLoggers.Processing(logger, totalTime, queryTime, publishTime, updateTime, messages.Count);
        return messages.Count;    }
    private struct OutboxUpdate    {        public Guid Id { get; init; }        public DateTime ProcessedOnUtc { get; init; }        public string? Error { get; init; }    }}
       复制代码
 
下面是有趣的部分!
优化读取查询
首先要优化的是获取未处理邮件的查询。如果我们不需要所有列(提示:确实不需要),执行 SELECT * 查询会有影响。
下面是当前的 SQL 查询:
 SELECT *FROM outbox_messagesWHERE processed_on_utc IS NULLORDER BY occurred_on_utc LIMIT @BatchSize
       复制代码
 
可以修改查询,只返回需要的列。这将节省一些带宽,但不会显著提高性能。
 SELECT id AS Id, type AS Type, content as ContentFROM outbox_messagesWHERE processed_on_utc IS NULLORDER BY occurred_on_utc LIMIT @BatchSize
       复制代码
 
检查一下该查询的执行计划,可以看到它正在执行表扫描。我在 PostgreSQL 上运行这个查询,下面是 EXPLAIN ANALYZE 的结果:
 Limit  (cost=86169.40..86286.08 rows=1000 width=129) (actual time=122.744..124.234 rows=1000 loops=1)  ->  Gather Merge  (cost=86169.40..245080.50 rows=1362000 width=129) (actual time=122.743..124.198 rows=1000 loops=1)        Workers Planned: 2        Workers Launched: 2        ->  Sort  (cost=85169.38..86871.88 rows=681000 width=129) (actual time=121.478..121.492 rows=607 loops=3)              Sort Key: occurred_on_utc              Sort Method: top-N heapsort  Memory: 306kB              Worker 0:  Sort Method: top-N heapsort  Memory: 306kB              Worker 1:  Sort Method: top-N heapsort  Memory: 306kB              ->  Parallel Seq Scan on outbox_messages  (cost=0.00..47830.88 rows=681000 width=129) (actual time=0.016..67.481 rows=666667 loops=3)                    Filter: (processed_on_utc IS NULL)Planning Time: 0.051 msExecution Time: 124.298 ms
       复制代码
 
现在创建一个针对该查询的覆盖索引,用于获取未处理的报文。覆盖索引包含满足查询所需的所有列,而无需访问表本身。
索引将针对 occurred_on_utc 和 processed_on_utc 列,包括 id、type 和 content 列。最后,我们将应用过滤器,只对未处理的邮件建立索引。
 CREATE INDEX IF NOT EXISTS idx_outbox_messages_unprocessedON public.outbox_messages (occurred_on_utc, processed_on_utc)INCLUDE (id, type, content)WHERE processed_on_utc IS NULL
       复制代码
 
下面来解释一下每个决定背后的原因:
为 occurred_on_utc 建立索引将以升序存储索引中的条目。这与查询中的 ORDER BY occurred_on_utc 语句相匹配。意味着查询可以扫描索引,而无需对结果进行排序。结果已经按照正确的排序顺序排列。
在索引中包含选择的列,可以让我们从索引条目中返回这些列,从而避免从表行中读取值。
在索引中过滤未处理的报文符合 WHERE processed_on_utc IS NULL 语句。
注意:PostgreSQL 的最大索引行大小为 2712B(不要问我是怎么知道的)。INCLUDE 列表中的列也是索引行(B-tree 元组)的一部分。content 列包含序列化的 JSON 信息,因此最有可能导致超出此限制。这也是没有办法的办法,所以建议尽可能减少消息的长度。也可以将这一列从 INCLUDE 列表中排除,对性能影响不大。
下面是创建该索引后更新的执行计划:
 Limit  (cost=0.43..102.82 rows=1000 width=129) (actual time=0.016..0.160 rows=1000 loops=1)  ->  Index Only Scan using idx_outbox_messages_unprocessed on outbox_messages  (cost=0.43..204777.36 rows=2000000 width=129) (actual time=0.015..0.125 rows=1000 loops=1)        Heap Fetches: 0Planning Time: 0.059 msExecution Time: 0.189 ms
       复制代码
 
因为有了覆盖索引,所以执行计划只包含 Index Only Scan (仅扫描索引)和 Limit(限制)操作,不需要进行过滤或排序,这就是性能大幅提升的原因。
查询时间对性能有什么影响?
优化消息发布
下一个可以优化的是如何向队列发布消息,我正在用来自 MassTransit 的 IPublishEndpoint 将消息发布到 RabbitMQ。
更准确的说,我们是在向交换机发布消息。然后,交换机会将消息路由到相应队列。
那么怎么才能优化呢?
可以做的一个微优化是为序列化中使用的消息类型引入缓存。对每种消息类型不断执行反射是很昂贵的,因此只需执行一次反射,并将结果保存起来。
 var messageType = Messaging.Contracts.AssemblyReference.Assembly.GetType(message.Type);
       复制代码
 
缓存可以是 ConcurrentDictionary,通过 GetOrAdd 来检索缓存类型。
把这段代码提取到 GetOrAddMessageType 辅助方法中:
 private static readonly ConcurrentDictionary<string, Type> TypeCache = new();
private static Type GetOrAddMessageType(string typeName){    return TypeCache.GetOrAdd(        typeName,        name => Messaging.Contracts.AssemblyReference.Assembly.GetType(name));}
       复制代码
 
这就是信息发布步骤,其中最大的问题是需要等待发布完成。发布需要一些时间,要等待消息代理的确认,我们在循环中进行确认,因此效率更低。
 var updateQueue = new ConcurrentQueue<OutboxUpdate>();
foreach (var message in messages){    try    {        var messageType = Messaging.Contracts.AssemblyReference.Assembly.GetType(message.Type);        var deserializedMessage = JsonSerializer.Deserialize(message.Content, messageType);
        // 等待消息代理的确认        await publishEndpoint.Publish(deserializedMessage, messageType, cancellationToken);
        updateQueue.Enqueue(new OutboxUpdate        {            Id = message.Id,            ProcessedOnUtc = DateTime.UtcNow        });    }    catch (Exception ex)    {        updateQueue.Enqueue(new OutboxUpdate        {            Id = message.Id,            ProcessedOnUtc = DateTime.UtcNow,            Error = ex.ToString()        });    }}
       复制代码
 
可以通过批量发布来改善这种情况。事实上,IPublishEndpoint 有一个 PublishBatch 扩展方法。仔细看一下,就会发现:
 // MassTransit implementationpublic static Task PublishBatch(    this IPublishEndpoint endpoint,    IEnumerable<object> messages,    CancellationToken cancellationToken = default){    return Task.WhenAll(messages.Select(x => endpoint.Publish(x, cancellationToken)));}
       复制代码
 
因此,可以将消息集合转换为发布任务列表,然后使用 Task.WhenAll 等待发布任务。
 var updateQueue = new ConcurrentQueue<OutboxUpdate>();
var publishTasks = messages    .Select(message => PublishMessage(message, updateQueue, publishEndpoint, cancellationToken))    .ToList();
await Task.WhenAll(publishTasks);
// I extracted the message publishing into a separate method for readability.private static async Task PublishMessage(    OutboxMessage message,    ConcurrentQueue<OutboxUpdate> updateQueue,    IPublishEndpoint publishEndpoint,    CancellationToken cancellationToken){    try    {        var messageType = GetOrAddMessageType(message.Type);        var deserializedMessage = JsonSerializer.Deserialize(message.Content, messageType);
        await publishEndpoint.Publish(deserializedMessage, messageType, cancellationToken);
        updateQueue.Enqueue(new OutboxUpdate        {            Id = message.Id,            ProcessedOnUtc = DateTime.UtcNow        });    }    catch (Exception ex)    {        updateQueue.Enqueue(new OutboxUpdate        {            Id = message.Id,            ProcessedOnUtc = DateTime.UtcNow,            Error = ex.ToString()        });    }}
       复制代码
 
如何改进消息发布?
如你所见,速度并没有明显加快。但这是从其他优化中获益的必要条件。
优化更新查询
优化之旅的下一步是处理更新已处理消息的查询。
目前的实现效率很低,因为要为每条消息向数据库进行一次查询。
 foreach (var outboxUpdate in updateQueue){    await connection.ExecuteAsync(        @"""        UPDATE outbox_messages        SET processed_on_utc = @ProcessedOnUtc, error = @Error        WHERE id = @Id        """,        outboxUpdate,        transaction: transaction);}
       复制代码
 
如果你现在还不明白,那就记住核心就是批处理。我们需要一种向数据库发送大型 UPDATE 查询的方法。
我们必须为这个批量查询手动构造 SQL,将使用 Dapper 的 DynamicParameters 类型来提供所有参数。
 var updateSql =    @"""    UPDATE outbox_messages    SET processed_on_utc = v.processed_on_utc,        error = v.error    FROM (VALUES        {0}    ) AS v(id, processed_on_utc, error)    WHERE outbox_messages.id = v.id::uuid    """;
var updates = updateQueue.ToList();var paramNames = string.Join(",", updates.Select((_, i) => $"(@Id{i}, @ProcessedOn{i}, @Error{i})"));
var formattedSql = string.Format(updateSql, paramNames);
var parameters = new DynamicParameters();
for (int i = 0; i < updates.Count; i++){    parameters.Add($"Id{i}", updates[i].Id.ToString());    parameters.Add($"ProcessedOn{i}", updates[i].ProcessedOnUtc);    parameters.Add($"Error{i}", updates[i].Error);}
await connection.ExecuteAsync(formattedSql, parameters, transaction: transaction);
       复制代码
 
这将构造类似下面这样的 SQL 查询:
 UPDATE outbox_messagesSET processed_on_utc = v.processed_on_utc,    error = v.errorFROM (VALUES    (@Id0, @ProcessedOn0, @Error0),    (@Id1, @ProcessedOn1, @Error1),    (@Id2, @ProcessedOn2, @Error2),    -- A few hundred rows in beteween    (@Id999, @ProcessedOn999, @Error999)) AS v(id, processed_on_utc, error)WHERE outbox_messages.id = v.id::uuid
       复制代码
 
我们可以发送一次查询来更新所有信息,而不是为每条信息发送一次更新查询。
显然这会带来明显的性能优势:
我们走了多远?
我们测试一下优化后的性能改进。到目前为止,所做的更改主要集中在提高发件箱处理进程的速度上。
以下是各个步骤优化后的粗略数字:
查询时间: ~1ms
发布时间: ~289ms
更新时间: ~52ms
运行发件箱处理程序 1 分钟,并计算已处理邮件的数量。
优化后的实现方案在 1 分钟内处理了 162,000 条信息,即 2,700 MPS。
这意味着现在每天可处理超过 2.3 亿条消息。
但我们才刚刚开始。
发件箱并行处理
如果想更进一步,就必须扩展 OutboxProcessor。我们可能面临的问题是对同一消息进行多次处理。因此需要对当前批次的消息实现某种形式的锁定。
PostgreSQL 有一个方便的 FOR UPDATE 语句,可以在这里使用。它会在当前事务持续期间锁定所选记录。不过,我们必须添加 SKIP LOCKED 语句,以允许其他查询跳过锁定的记录。否则,任何其他查询都将被阻止,直到当前事务完成。
这是更新后的查询:
 SELECT id AS Id, type AS Type, content as ContentFROM outbox_messagesWHERE processed_on_utc IS NULLORDER BY occurred_on_utc LIMIT @BatchSizeFOR UPDATE SKIP LOCKED
       复制代码
 
要扩展发件箱处理进程,只需运行多个后台作业实例即可。
我用 Parallel.ForEachAsync 进行模拟,可以控制 MaxDegreeOfParallelism。
 var parallelOptions = new ParallelOptions{    MaxDegreeOfParallelism = _maxParallelism,    CancellationToken = cancellationToken};
await Parallel.ForEachAsync(    Enumerable.Range(0, _maxParallelism),    parallelOptions,    async (_, token) =>    {        await ProcessOutboxMessages(token);    });
       复制代码
 
我们可以在 1 分钟内处理 179,000 条信息,或者说通过 5 个处理进程处理 2,983 MPS 信息。
我觉得应该可以更快。
在不进行并行处理的情况下,我们能够获得大约 2,700 MPS。
一个新的瓶颈出现了:分批发布信息。
发布时间从 ~289ms 增加到 ~1,540ms。
有趣的是,如果将基本发布时间(一个工作进程的发布时间)乘以工作进程数量,就能大致得出新的发布时间。
我们正在浪费大量时间等待消息代理的确认。
如何解决这个问题?
批量发布消息
RabbitMQ 支持批量发布消息,可以在配置 MassTransit 时调用 ConfigureBatchPublish 方法来启用此功能。MassTransit 会在将消息发送到 RabbitMQ 之前对其进行缓冲,以提高吞吐量。
 builder.Services.AddMassTransit(x =>{    x.UsingRabbitMq((context, cfg) =>    {        cfg.Host(builder.Configuration.GetConnectionString("Queue"), hostCfg =>        {            hostCfg.ConfigureBatchPublish(batch =>            {                batch.Enabled = true;            });        });
        cfg.ConfigureEndpoints(context);    });});
       复制代码
 
只需稍作改动,用 5 个工作进程重新进行测试。
这次我们能够在 1 分钟内处理 195.6 万条信息。
这样,处理速度就达到了 32 500 MPS。
相当于每天处理超过 28 亿条消息。
就到这里吧,不过还有样东西需要讨论。
关闭发布确认(危险)
还可以做的一件事(不建议)是关闭发布确认。这意味着调用 Publish 时不会等待消息被代理确认。这可能会导致可靠性问题,并有可能丢失消息。
尽管如此,在关闭发布确认的情况下,还是获得了 ~37,000 MPS。
 cfg.Host(builder.Configuration.GetConnectionString("Queue"), hostCfg =>{    hostCfg.PublisherConfirmation = false; // 危险!不建议!    hostCfg.ConfigureBatchPublish(batch =>    {        batch.Enabled = true;    });});
       复制代码
 扩容的主要考虑因素
虽然现在的吞吐量令人印象深刻,但在现实中实施这些技术时,还要考虑这些因素:
消费者能力:消费者跟得上吗?提高生产者吞吐量而不匹配消费者容量会造成积压。扩展时要考虑整个流水线。
交付保证:我们的优化措施能保证至少一次交付。将消费者设计为可幂等的,以处理偶尔出现的重复消息。
报文排序:使用 FOR UPDATE SKIP LOCKED 进行并行处理可能会导致报文乱序。要严格排序,可考虑在用户端使用收件箱模式来缓冲消息。即使报文到达时顺序不对,收件箱也可以帮助我们按正确的顺序处理报文。
可靠性与性能的权衡:关闭发布确认可提高速度,但有可能丢失信息。请根据具体需求权衡性能与可靠性。
通过解决这些问题,我们就能创建一个高性能的发件箱处理程序,与系统架构顺利整合。
总结
与最初的发件箱处理程序相比,我们已经取得了长足进步。以下是我们的成果:
利用智能索引优化数据库查询
通过批处理改进消息发布
利用批处理功能简化数据库更新
使用并行工作进程扩大发件箱处理规模
利用 RabbitMQ 的批量发布功能
结果如何?我们将消息处理速度从每秒 1,350 条提升到了令人印象深刻的 32,500 MPS,相当于每天处理超过 28 亿条信息!
扩展不仅仅是原始速度的问题,还涉及到识别和解决每个步骤的瓶颈问题。通过测量、优化和重新思考采用的方法,可以实现了巨大的性能提升。
你好,我是俞凡,在 Motorola 做过研发,现在在 Mavenir 做技术工作,对通信、网络、后端架构、云原生、DevOps、CICD、区块链、AI 等技术始终保持着浓厚的兴趣,平时喜欢阅读、思考,相信持续学习、终身成长,欢迎一起交流学习。为了方便大家以后能第一时间看到文章,请朋友们关注公众号"DeepNoMind",并设个星标吧,如果能一键三连(转发、点赞、在看),则能给我带来更多的支持和动力,激励我持续写下去,和大家共同成长进步!
评论