消息队列存储消息数据的 MySQL 表格设计
msg 表如下:
基本流程为:
select * from msg order by id asc, retry_times asc limit 1,读取消息
处理消息
处理成功,则删除消息
处理失败,更新 retry_times
一、检测消息的丢失
这里使用消息队列的有序性来验证是否有消息丢失。原理是使用 id 字段,每个消息都是连续递增的序号,消费的时候 Consumer 端检查这个序号的连续性。如果没有消息丢失,Consumer 收到的序号是联系的。假如说收到的序号不是上一条消息的序号加 1,那就是丢消息了。同时通过缺失的序号可以定位是哪条消息丢失了。
二、消息过程中的重复消息
通过行锁来解决重复消息。批量读取数据,随机选择一行,然后讲该行数据的 lockid 更新为 hostname-pid,之后再根据 lockid 去查询一行数据。流程如下:
select * from msg where lockid=' ' order by id asc, retrytimes asc limit 100
随机选择一行数据,得到 id
为该行数据加锁 update msg set lockid = hostname-pid, uptdatedat = now()
读取对应数据进行消费
消费成功,删除改行数据
消费失败,释放锁,更新 retry_times
三、序列化和反序列化
使用 mysql blob 类型来进行消息的序列化和反序列化
四、失败处理
消费消息有几种异常情况:
消息消费超时
释放锁失败
从队列中移除消息失败
对于消息消费超时和释放锁失败,直接重新释放锁,将 lock_id 设置为空字符串,等待下一次去处理。第三种情况是消费成功了,但是没有从队列中移除。把删除消息和消费消息做成一个事务,保证消息一定是消费成功才被删除。
评论