写点什么

商品中心—商品消息处理系统的技术文档

  • 2025-07-04
    福建
  • 本文字数:21069 字

    阅读完需:约 69 分钟

1.商品消息处理系统


(1)商品消息处理系统架构设计



(2)商品系统内部需要消费的消息


一.item_info 表变更

topic:data_change_topic

MQ 消息体:

{    "action": "UPDATE",//执⾏发送消息的触发动作:INSERT、UPDATE、DELETE    "table": "item_info",//更新表的表名    "updateColumn": [        "item_status"//item_info表item_status变更    ],    "data": [{        "column": "id",//item_info表主键id        "value": 1    }, {        "column": "itemId",//item_info表item_id        "value": "100000476748"    }]}
复制代码


二.sku_info 表变更

topic:data_change_topic

MQ 消息体:

{    "action": "UPDATE",//执⾏发送消息的触发动作:INSERT、UPDATE、DELETE    "table": "sku_info",//更新表的表名    "updateColumn": [        "sku_name"//sku_info表sku_name变更    ],    "data": [{        "column": "id",//sku_info表主键id        "value": 1    }, {        "column": "itemId",//sku_info表item_id        "value": "100000476748"    }, {        "column": "skuId",//sku_info表sku_id        "value": "8000476872"    }]}
复制代码


三.attribute_extend 表变更

topic:data_change_topic

MQ 消息体:

{    "action": "INSERT",//执⾏发送消息的触发动作:INSERT、UPDATE、DELETE    "table": "attribute_extend",//更新表的表名    "data": [{        "column": "id",//attribute_extend表主键id        "value": 1    }]}
复制代码


四.front_category_relation 表变更

topic:data_change_topic

MQ 消息体:

{    "action": "INSERT",//执⾏发送消息的触发动作:INSERT、UPDATE、DELETE    "table": "front_category_relation",//更新表的表名    "data": [{        "column": "id",//front_category_relation表主键id        "value": 1    }]}
复制代码


五.sku_seller_relation 表变更

topic:data_change_topic

MQ 消息体:

{    "action": "INSERT",//执⾏发送消息的触发动作:INSERT、UPDATE、DELETE    "table": "sku_seller_relation",//更新表的表名    "data": [{        "column": "id",//sku_seller_relation表主键id        "value": 1    }]}
复制代码


六.item_period_stage 表变更

topic:interior_item_expri_result_topic

MQ 消息体:

{    "action": "update",//执⾏发送消息的触发动作:INSERT、UPDATE、DELETE    "table": "item_period_stage",//更新表的表名    "data": [{        "column": "period_stage",//sku_seller_relation表主键id        "value": 2    }]}
复制代码


(3)商品系统外部需要消费的消息


新建或编辑商品发送 MQ 消息

topic:open_product_topic

MQ 消息体:

{    "action": "INSERT", //执⾏发送消息的触发动作:INSERT、UPDATE、DELETE    "data": {        "itemId": "100000476748",        "itemStatus": 3,        "skuId": "8000476872"    }}
复制代码


(4)消息处理相关表


一.数据变更监听表

数据变更监听表中配置哪些表需要监听。

create table data_change_listen_config (    id int unsigned auto_increment comment '主键' primary key,    table_name varchar(40) null comment '数据表名称',    key_column varchar(40) default 'id' null comment '数据表对应的主键或业务id列名',    filter_flag tinyint(1) null comment '是否过滤',    del_flag tinyint(1) null comment '是否删除',    create_user int null comment '创建⼈',    create_time datetime null comment '创建时间',    update_user int null comment '更新⼈',    update_time datetime null comment '更新时间') comment '数据变更监听表';//一个表只有一条记录
复制代码


二.监听表变化字段配置表

监听表变化字段配置表中配置哪些字段变更需要监听,只要满⾜其中配置的有⼀个字段值变更,就需要发送内部消息通知订阅⽅。

create table data_change_column_config (    id int unsigned auto_increment primary key,    listen_id int null comment '监听表id',    listen_column varchar(40) null comment '监听字段',    del_flag tinyint(1) null comment '删除标记',    create_user int null comment '创建⼈',    create_time datetime null comment '创建时间',    update_user int null comment '更新⼈',    update_time datetime null comment '更新时间') comment '监听表变化字段配置表';
复制代码


三.监听表消息模型表

监听表消息模型表中配置的是数据变更后,消息是内部消息还是外部消息,消息发送的 topic,消息延迟等级,消息需要通知的字段。

create table data_change_message_config (    id int unsigned null comment '主键',    listen_id int null comment '监听表id',    notify_column varchar(2000) null comment '变更通知字段,逗号分隔',    message_topic varchar(256) null comment '变更通知消息主题',    delay_level int null comment '延迟等级',    message_type tinyint(3) null comment '消息类型',    del_flag tinyint(1) null comment '删除标记',    create_user int null comment '创建⼈',    create_time datetime null comment '创建时间',    update_user int null comment '更新⼈',    update_time datetime null comment '更新时间') comment '监听表消息模型表';
复制代码


四.外部消息记录表

外部消息记录表中记录的是内部消息发送后⽣成的消息编号,通过回调消息查找外部消息的消息内容。

create table data_message_detail (    id int unsigned auto_increment comment '主键' primary key,    message_no varchar(64) null comment '消息编号',    table_data_json text null comment '变化的表信息内容',    diff_data_arr varchar(2000) null comment '消息变化字段数组,多个,分割',    table_name varchar(64) null comment '更新表的表名 ',    action varchar(64) null comment '执⾏发送消息的触发动作:INSERT、UPDATE、DELETE ',    del_flag tinyint(1) null comment '删除标记',    create_user int null comment '创建⼈',    create_time datetime null comment '创建时间',    update_user int null comment '更新⼈',    update_time datetime null comment '更新时间') comment '外部消息记录表';
复制代码


(5)消息处理流程设计


 

2.消费和处理 binlog 消息


(1)消费 Canal 发送到 MQ 的 binlog 消息


@Configurationpublic class ConsumerBeanConfig {    //配置内容对象    @Autowired    private RocketMQProperties rocketMQProperties;        //消费系统内部消息——处理binlog消息    @Bean("dataChangeTopic")    public DefaultMQPushConsumer createItemStageConsumer(DataChangeListener dataChangeListener) throws MQClientException {        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.DATA_CHANGE_CONSUMER_GROUP);        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());        consumer.subscribe(RocketMqConstant.DATA_CHANGE_TOPIC, "*");        consumer.registerMessageListener(dataChangeListener);        consumer.start();        return consumer;    }    ...}
@Componentpublic class DataChangeListener implements MessageListenerConcurrently { @Autowired private MessageService messageService; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { String msg = new String(messageExt.getBody()); log.info("数据变更消息通知,消息内容:{}", msg); //获取binlog对象 BinlogData binlogData = BinlogUtils.getBinlogData(msg); if (Objects.isNull(binlogData) || Objects.isNull(binlogData.getDataMap())) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } //操作类型不是insert,delete,update的,不作处理 String operateType = binlogData.getOperateType(); if (!BinlogType.INSERT.getValue().equals(operateType) && !BinlogType.DELETE.getValue().equals(operateType) && !BinlogType.UPDATE.getValue().equals(operateType)) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } //处理binlog消息 messageService.processBinlogMessage(binlogData); } } catch (Exception e) { log.error("consume error, 消费数据变更消息失败", e); //本次消费失败,下次重新消费 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }}
复制代码


(2)解析 MQ 消息字符串成 binlog 对象


//MySQL的binlog对象@Datapublic class BinlogData implements Serializable {    //binlog对应的表名    private String tableName;    //操作时间    private Long operateTime;    //操作类型    private String operateType;    //data节点转换成的Map,key对应的是bean里的属性名,value一律为字符串    private List<Map<String, Object>> dataMap;    //data节点转换成的Map,key对应的是bean里的属性名,value一律为字符串    private List<Map<String, Object>> oldMap;}
//MySQL binlog解析工具类public abstract class BinlogUtils { //解析binlog json字符串 public static BinlogData getBinlogData(String binlogStr) { //isJson方法里面会判断字符串是不是为空,所以这里不需要重复判断 if (JSONUtil.isJson(binlogStr)) { JSONObject binlogJson = JSONUtil.parseObj(binlogStr); //不处理DDL的binlog,只处理数据变更 if (binlogJson.getBool("isDdl")) { return null; } BinlogData binlogData = new BinlogData(); //表名 String tableName = binlogJson.getStr("table"); binlogData.setTableName(tableName); //操作类型 String operateType = binlogJson.getStr("type"); binlogData.setOperateType(operateType); //操作时间 Long operateTime = binlogJson.getLong("ts"); binlogData.setOperateTime(operateTime); //data数据 JSONArray dataArray = binlogJson.getJSONArray("data"); List<Map<String, Object>> dataMap = jsonArrayToMapList(dataArray); binlogData.setDataMap(dataMap); if (!binlogJson.isNull("old")) { //old数据 JSONArray oldArray = binlogJson.getJSONArray("old"); List<Map<String, Object>> oldMap = jsonArrayToMapList(oldArray); binlogData.setOldMap(oldMap); } return binlogData; } return null; } private static List<Map<String, Object>> jsonArrayToMapList(JSONArray jsonArray) { if (null != jsonArray) { Iterable<JSONObject> arrayIterator = jsonArray.jsonIter(); //遍历data节点或old节点并返回Map if (null != arrayIterator) { //binlog的data数组或old数组里数据的类型为Map List<Map<String, Object>> dataMap = new ArrayList<>(); while (arrayIterator.iterator().hasNext()) { JSONObject jsonObject = arrayIterator.iterator().next(); Map<String, Object> data = new HashMap<>(jsonObject.size()); jsonObject.keySet().forEach(key -> { data.put(key, jsonObject.get(key)); }); dataMap.add(data); } return dataMap; } } return null; }}
复制代码


(3)处理 binlog 消息


步骤一:通过缓存组件获取当前表的配置监听信息

步骤二:将 Binlog 对象封装为数据变更对象

步骤三:通过缓存组件获取配置的消息模型对象

步骤四:组装需要发送的数据变更消息对象

步骤五:发送内部消息


@Servicepublic class MessageServiceImpl implements MessageService {    ...    //处理binlog消息    @Override    public void processBinlogMessage(BinlogData binlogData) {        //1.通过缓存组件获取当前表的配置监听信息        DataChangeListenConfigDO listenConfigDO = dataChangeRepository.getListenConfigByTable(binlogData.getTableName());        //未配置监听信息的表,不作处理        if (Objects.isNull(listenConfigDO)) {            return;        }                //2.将binlog对象封装为数据变更对象        List<DataChangeMessage> dataChangeMessages = getDataChangeMessage(binlogData, listenConfigDO);        //不需要监听,或者要监听的字段值未变动        if (CollectionUtils.isEmpty(dataChangeMessages)) {            return;        }                //3.通过缓存组件获取配置的消息模型对象        List<DataChangeMessageConfigBO> messageConfigBOS = dataChangeRepository.getMessageConfigBOByListenId(listenConfigDO.getId());        //不需要发送消息        if (CollectionUtils.isEmpty(messageConfigBOS)) {            return;        }                //4.组装需要发送的数据变更消息对象        List<DataSendMessageBO> sendDataMessageList = getInternalSendDataMessage(dataChangeMessages, binlogData.getDataMap(), messageConfigBOS);        //待发送的消息为空,无需处理        if (CollectionUtils.isEmpty(sendDataMessageList)) {            return;        }                //5.发送内部消息        sendDataMessage(sendDataMessageList);                //6.如果存在外部消息配置则保存数据变更对象详情        if (messageConfigBOS.stream().anyMatch(messageConfigBO -> MessageTypeEnum.EXTERNAL_MESSAGE.getCode().equals(messageConfigBO.getMessageType()))) {            //保存外部消息详细信息            saveDataMessageDetail(dataChangeMessages, binlogData);        }    }    ...}
复制代码


(4)商品系统自研缓存组件 Redis + DB 读写实现


一.通过缓存组件获取监听的配置信息

@Repositorypublic class DataChangeRepository {    @Resource    private RedisReadWriteManager redisReadWriteManager;    ...        //根据表名获取监听配置信息    public DataChangeListenConfigDO getListenConfigByTable(String tableName) {        Optional<DataChangeListenConfigDO> optional = redisReadWriteManager.getRedisStringDataByCache(            tableName,            DataChangeListenConfigDO.class,            AbstractRedisKeyConstants::getListenConfigStringKey,            this::getListenConfigByTableFromDB        );        return optional.orElse(null);    }        //获取监听变更字段配置表信息    public List<DataChangeColumnConfigDO> getColumnConfigByListenId(Long id) {        Optional<List<DataChangeColumnConfigDO>> optional = redisReadWriteManager.listRedisStringDataByCache(            id,            DataChangeColumnConfigDO.class,            AbstractRedisKeyConstants::getColumnConfigStringKey,            this::getColumnConfigByListenIdFromDB        );        return optional.orElse(null);    }        //获取监听表消息模型配置    public List<DataChangeMessageConfigDO> getMessageConfigByListenId(Long id, MessageTypeEnum messageTypeEnum) {        //获取监听表消息模型配置        Optional<List<DataChangeMessageConfigDO>> optional = redisReadWriteManager.listRedisStringDataByCache(            id,            DataChangeMessageConfigDO.class,            AbstractRedisKeyConstants::getMessageConfigStringKey,            this::getMessageConfigByListenIdFromDB        );                //如果未指定是内部消息还是外部消息,则不需要过滤        if (Objects.isNull(messageTypeEnum)) {            return optional.orElse(null);        }        return optional.map(dataChangeMessageConfigDOS -> dataChangeMessageConfigDOS.stream()            .filter(messageConfigBO -> messageTypeEnum.getCode().equals(messageConfigBO.getMessageType()))            .collect(Collectors.toList())).orElse(null);    }        //根据表名获取监听配置信息    public Optional<DataChangeListenConfigDO> getListenConfigByTableFromDB(String tableName) {        LambdaQueryWrapper<DataChangeListenConfigDO> queryWrapper = Wrappers.lambdaQuery();        queryWrapper.eq(DataChangeListenConfigDO::getTableName, tableName).eq(BaseEntity::getDelFlag, DelFlagEnum.EFFECTIVE.getCode());        DataChangeListenConfigDO listenConfigDO = dataChangeListenConfigMapper.selectOne(queryWrapper);        return Objects.isNull(listenConfigDO) ? Optional.empty() : Optional.of(listenConfigDO);    }        //获取监听变更字段配置表信息    public Optional<List<DataChangeColumnConfigDO>> getColumnConfigByListenIdFromDB(Long id) {        LambdaQueryWrapper<DataChangeColumnConfigDO> queryWrapper = Wrappers.lambdaQuery();        queryWrapper.eq(DataChangeColumnConfigDO::getListenId, id).eq(BaseEntity::getDelFlag, DelFlagEnum.EFFECTIVE.getCode());        List<DataChangeColumnConfigDO> columnConfigDOS = dataChangeColumnConfigMapper.selectList(queryWrapper);        return CollectionUtils.isEmpty(columnConfigDOS) ? Optional.empty() : Optional.of(columnConfigDOS);    }        //查询数据变更对象列表    public Optional<List<DataChangeMessageConfigDO>> getMessageConfigByListenIdFromDB(Long id) {        LambdaQueryWrapper<DataChangeMessageConfigDO> queryWrapper = Wrappers.lambdaQuery();        queryWrapper.eq(DataChangeMessageConfigDO::getListenId, id).eq(BaseEntity::getDelFlag, DelFlagEnum.EFFECTIVE.getCode());        List<DataChangeMessageConfigDO> messageConfigDOS = dataChangeMessageConfigMapper.selectList(queryWrapper);        return CollectionUtils.isEmpty(messageConfigDOS) ? Optional.empty() : Optional.of(messageConfigDOS);    }    ...}
复制代码


二.缓存组件 RedisReadWriteManager 的实现


//缓存读写管理@Servicepublic class RedisReadWriteManager {    @Resource    private RedisCache redisCache;        @Resource    private RedisLock redisLock;        //批量获取缓存数据    //@param key                 关键字列表    //@param clazz               需要将缓存JSON转换的对象    //@param getRedisKeyFunction 获取Redis key的方法    //@param getDbFunction       获取数据源对象的方法    //@return java.util.Optional<java.util.List<T>>    public <T> Optional<List<T>> listRedisStringDataByCache(Long key, Class<T> clazz, Function<Long, String> getRedisKeyFunction, Function<Long, Optional<List<T>>> getDbFunction) {        try {            String redisKey = getRedisKeyFunction.apply(key);            //过滤无效缓存            String cache = redisCache.get(redisKey);            if (EMPTY_OBJECT_STRING.equals(cache)) {                return Optional.empty();            }            if (StringUtils.isNotBlank(cache)) {                List<T> list = JSON.parseArray(cache, clazz);                return Optional.of(list);            }            //缓存没有则读库            return listRedisStringDataByDb(key, getRedisKeyFunction, getDbFunction);        } catch (Exception e) {            log.error("获取缓存数据异常 key={},clazz={}", key, clazz, e);            throw e;        }    }        //读取数据库表数据赋值到Redis    public <T> Optional<List<T>> listRedisStringDataByDb(Long key, Function<Long, String> getRedisKeyFunction, Function<Long, Optional<List<T>>> getDbFunction) {        if (Objects.isNull(key) || Objects.isNull(getDbFunction)) {            return Optional.empty();        }        try {            if (!redisLock.lock(String.valueOf(key))) {                return Optional.empty();            }            String redisKey = getRedisKeyFunction.apply(key);            Optional<List<T>> optional = getDbFunction.apply(key);            putCacheString(redisKey, optional);            return optional;        } finally {            redisLock.unlock(String.valueOf(key));        }    }        private void putCacheString(String redisKey, Optional optional) {        if (!optional.isPresent()) {            //把空对象暂存到Redis            redisCache.setex(redisKey, EMPTY_OBJECT_STRING, RedisKeyUtils.redisKeyRandomTime(INT_EXPIRED_ONE_DAY, TimeUnit.HOURS, NUMBER_24));            log.warn("发生缓存穿透 redisKey={}", redisKey);            return;        }        //把表数据对象存到Redis        redisCache.setex(redisKey, JSON.toJSONString(optional.get()), RedisKeyUtils.redisKeyRandomTime(INT_EXPIRED_SEVEN_DAYS));        log.info("表数据对象存到Redis redisKey={}, data={}", redisKey, JSON.toJSONString(optional.get()));    }        //批量获取缓存数据    //@param key                 关键字列表    //@param clazz               需要将缓存JSON转换的对象    //@param getRedisKeyFunction 获取redis key的方法    //@param getDbFunction       获取数据源对象的方法    //@return java.util.Optional<java.util.List < T>>    public <T> Optional<T> getRedisStringDataByCache(String key, Class<T> clazz, Function<String, String> getRedisKeyFunction, Function<String, Optional<T>> getDbFunction) {        try {            String redisKey = getRedisKeyFunction.apply(key);            String cache = redisCache.get(redisKey);            //过滤无效缓存            if (EMPTY_OBJECT_STRING.equals(cache)) {                return Optional.empty();            }            if (StringUtils.isNotBlank(cache)) {                T t = JSON.parseObject(cache, clazz);                return Optional.of(t);            }            //缓存没有则读库            return getRedisStringDataByDb(key, getRedisKeyFunction, getDbFunction);        } catch (Exception e) {            log.error("获取缓存数据异常 key={},clazz={}", key, clazz, e);            throw e;        }    }        //读取数据库表数据赋值到Redis    public <T> Optional<T> getRedisStringDataByDb(String key, Function<String, String> getRedisKeyFunction, Function<String, Optional<T>> getDbFunction) {        if (StringUtils.isBlank(key) || Objects.isNull(getDbFunction)) {            return Optional.empty();        }        try {            if (!redisLock.lock(key)) {                return Optional.empty();            }            String redisKey = getRedisKeyFunction.apply(key);            Optional<T> optional = getDbFunction.apply(key);            putCacheString(redisKey, optional);            return optional;        } finally {            redisLock.unlock(key);        }    }}
@Componentpublic class RedisCache { private RedisTemplate redisTemplate; public RedisCache(RedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; } ... //缓存获取 public String get(String key) { ValueOperations<String, String> vo = redisTemplate.opsForValue(); return vo.get(key); } //缓存存储并设置过期时间 public void setex(String key, String value, long time) { redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS); } ...}
复制代码


(5)将 binlog 对象封装为数据变更对象


@Servicepublic class MessageServiceImpl implements MessageService {    ...    //处理binlog消息    @Override    public void processBinlogMessage(BinlogData binlogData) {        //1.通过缓存组件获取当前表的配置监听信息        DataChangeListenConfigDO listenConfigDO = dataChangeRepository.getListenConfigByTable(binlogData.getTableName());        //未配置监听信息的表,不作处理        if (Objects.isNull(listenConfigDO)) {            return;        }                //2.将binlog对象封装为数据变更对象        List<DataChangeMessage> dataChangeMessages = getDataChangeMessage(binlogData, listenConfigDO);        //不需要监听,或者要监听的字段值未变动        if (CollectionUtils.isEmpty(dataChangeMessages)) {            return;        }                //3.通过缓存组件获取配置的消息模型对象        List<DataChangeMessageConfigBO> messageConfigBOS = dataChangeRepository.getMessageConfigBOByListenId(listenConfigDO.getId());        //不需要发送消息        if (CollectionUtils.isEmpty(messageConfigBOS)) {            return;        }                //4.组装需要发送的数据变更消息对象        List<DataSendMessageBO> sendDataMessageList = getInternalSendDataMessage(dataChangeMessages, binlogData.getDataMap(), messageConfigBOS);        //待发送的消息为空,无需处理        if (CollectionUtils.isEmpty(sendDataMessageList)) {            return;        }                //5.发送内部消息        sendDataMessage(sendDataMessageList);                //6.如果存在外部消息配置则保存数据变更消息对象详情        if (messageConfigBOS.stream().anyMatch(messageConfigBO -> MessageTypeEnum.EXTERNAL_MESSAGE.getCode().equals(messageConfigBO.getMessageType()))) {            //保存外部消息详细信息            saveDataMessageDetail(dataChangeMessages, binlogData);        }    }        //将binlog对象封装为数据变更对象    public List<DataChangeMessage> getDataChangeMessage(BinlogData binlogData, DataChangeListenConfigDO listenConfigDO) {        //获取监听变更字段配置表信息        List<DataChangeColumnConfigDO> columnConfigDOS = dataChangeRepository.getColumnConfigByListenId(listenConfigDO.getId());        //要监听的字段为空,不作处理        if (CollectionUtils.isEmpty(columnConfigDOS)) {            return null;        }        //封装数据变更对象        return buildChangeColumn(binlogData, columnConfigDOS, listenConfigDO);    }        //封装数据变更对象    private List<DataChangeMessage> buildChangeColumn(BinlogData binlogData, List<DataChangeColumnConfigDO> columnConfigDOS, DataChangeListenConfigDO listenConfigDO) {        List<DataChangeMessage> dataChangeMessages = new ArrayList<>();        //操作类型        String operateType = binlogData.getOperateType();        for (int i = 0; i < binlogData.getDataMap().size(); i++) {            Map<String, Object> data = binlogData.getDataMap().get(i);//新值            if (BinlogType.INSERT.getValue().equals(operateType) || BinlogType.DELETE.getValue().equals(operateType)) {                //如果是新增或者删除,则所有监听字段都变更                List<String> updateColumns = columnConfigDOS.stream().map(DataChangeColumnConfigDO::getListenColumn).collect(Collectors.toList());                DataChangeMessage dataChangeMessage = buildDataChangeMessage(binlogData, updateColumns, data.get(listenConfigDO.getKeyColumn()));                dataChangeMessages.add(dataChangeMessage);            } else {                Map<String, Object> old = binlogData.getOldMap().get(i);//旧值                List<String> updateColumns = new ArrayList<>();                for (DataChangeColumnConfigDO columnConfigDO : columnConfigDOS) {                    String column = columnConfigDO.getListenColumn();                    Object columnOldValue = old.get(column);                    //旧的字段值有数据,就表示该字段变更了,添加至修改的字段集合                    if (!Objects.isNull(columnOldValue)) {                        updateColumns.add(column);                    }                }                //监听的字段有数据变更                if (!CollectionUtils.isEmpty(updateColumns)) {                    DataChangeMessage dataChangeMessage = buildDataChangeMessage(binlogData, updateColumns, data.get(listenConfigDO.getKeyColumn()));                    dataChangeMessages.add(dataChangeMessage);                }            }        }        return dataChangeMessages;    }        //构建数据变更对象    private DataChangeMessage buildDataChangeMessage(BinlogData binlogData, List<String> updateColumns, Object keyId) {        DataChangeMessage dataChangeMessage = new DataChangeMessage(binlogData.getOperateType(), binlogData.getTableName(), updateColumns);        dataChangeMessage.setMessageNo(SnowflakeIdWorker.getCode());//雪花算法设置消息编号        dataChangeMessage.setKeyId(keyId);        return dataChangeMessage;    }    ...}
//数据变更对象@Datapublic class DataChangeMessage implements Serializable { //内部消息编号 private String messageNo; //操作行为,INSERT、UPDATE、DELETE private String action; //表名 private String tableName; //主键或业务id private Object keyId; //变更的列 private List<String> updateColumns; //唯一确定当前数据的字段以及字段值 private List<ColumnValue> columnValues; //消息处理成功之后的回调topic private String callbackTopic = RocketMqConstant.DATA_EXTERNAL_CHANGE_TOPIC; @Data @NoArgsConstructor @AllArgsConstructor public static class ColumnValue { private String column;//列 private Object value;//值 } public DataChangeMessage(String operateType, String tableName, List<String> updateColumns) { this.action = operateType; this.tableName = tableName; this.updateColumns = updateColumns; }}
复制代码


(6)根据数据变更对象组装成内部消息对象并发送


@Servicepublic class MessageServiceImpl implements MessageService {    ...    //组装需要发送的数据变更消息对象    public List<DataSendMessageBO> getInternalSendDataMessage(List<DataChangeMessage> dataChangeMessages, List<Map<String, Object>> dataMap, List<DataChangeMessageConfigBO> dataChangeMessageConfigBOS) {        List<DataSendMessageBO> dataSendMessageBOS = new ArrayList<>();        for (DataChangeMessageConfigBO messageConfigBO : dataChangeMessageConfigBOS) {            //不是内部消息的不处理            if (!MessageTypeEnum.INTERNAL_MESSAGE.getCode().equals(messageConfigBO.getMessageType())) {                continue;            }            String notifyColumn = messageConfigBO.getNotifyColumn();            String[] columns = notifyColumn.split(CoreConstant.COMMA);            for (int i = 0; i < dataChangeMessages.size(); i++) {                DataChangeMessage dataChangeMessage = dataChangeMessages.get(i);                List<DataChangeMessage.ColumnValue> columnValues = new ArrayList<>();                dataChangeMessage.setColumnValues(columnValues);                Map<String, Object> data = dataMap.get(i);                for (String column : columns) {                    columnValues.add(new DataChangeMessage.ColumnValue(column, data.get(column)));                }                dataSendMessageBOS.add(new DataSendMessageBO(messageConfigBO, dataChangeMessage));            }        }        return dataSendMessageBOS;    }        //发送内部消息    private void sendDataMessage(List<DataSendMessageBO> sendDataMessageList) {        for (DataSendMessageBO dataChangeMessage : sendDataMessageList) {            DataChangeMessage dataMessage = dataChangeMessage.getDataChangeMessage();            DataChangeMessageConfigBO dataChangeMessageConfigBO = dataChangeMessage.getDataChangeMessageConfigBO();            //发送一个延迟队列的消息出去            dataMessageProducer.send(dataMessage, dataChangeMessageConfigBO.getMessageTopic(), dataChangeMessageConfigBO.getDelayLevel());        }    }    ...}
复制代码


(7)如果存在外部消息配置则保存数据变更对象详情


//消息业务实现类@Servicepublic class MessageServiceImpl implements MessageService {    @Resource    private DataChangeRepository dataChangeRepository;    ...        //处理binlog消息    @Override    public void processBinlogMessage(BinlogData binlogData) {        //获取当前表的监听信息        DataChangeListenConfigDO listenConfigDO = dataChangeRepository.getListenConfigByTable(binlogData.getTableName());        //未配置监听信息的表,不作处理        if (Objects.isNull(listenConfigDO)) {            return;        }          //获取数据变更对象列表,也就是将一条binlog数据转换成可能多个的数据变更对象        List<DataChangeMessage> dataChangeMessages = getDataChangeMessage(binlogData, listenConfigDO);        //不需要监听,或者要监听的字段值未变动        if (CollectionUtils.isEmpty(dataChangeMessages)) {            return;        }          //获取配置的消息对象        //对这个表的多条数据变更对象会封装成配置的消息对象,然后发送到RocketMQ的topic里        List<DataChangeMessageConfigBO> messageConfigBOS = dataChangeRepository.getMessageConfigBOByListenId(listenConfigDO.getId());        //不需要发送消息        if (CollectionUtils.isEmpty(messageConfigBOS)) {            return;        }          //封装成需要发送的消息对象        //对这个表的多条数据变更对象会封装成配置的消息对象,然后发送到RocketMQ的topic里        List<DataSendMessageBO> sendDataMessageList = getInternalSendDataMessage(dataChangeMessages, binlogData.getDataMap(), messageConfigBOS);        //待发送的消息为空,无需处理        if (CollectionUtils.isEmpty(sendDataMessageList)) {            return;        }          //发送消息        sendDataMessage(sendDataMessageList);          //配置的消息对象列表中,如果包含外部消息类型的消息对象,就需要保存        if (messageConfigBOS.stream().anyMatch(messageConfigBO -> MessageTypeEnum.EXTERNAL_MESSAGE.getCode().equals(messageConfigBO.getMessageType()))) {            //保存外部消息详细信息            saveDataMessageDetail(dataChangeMessages, binlogData);        }    }        //保存消息详细信息    public void saveDataMessageDetail(List<DataChangeMessage> dataChangeMessages, BinlogData binlogData) {        List<DataMessageBO> dataMessageBOS = converterDataMessageBOList(dataChangeMessages, binlogData);        dataChangeRepository.saveDataMessageDetail(dataMessageBOS);    }        //转换消息详细信息    private List<DataMessageBO> converterDataMessageBOList(List<DataChangeMessage> dataChangeMessages, BinlogData binlogData) {        List<DataMessageBO> dataMessageBOS = new ArrayList<>(dataChangeMessages.size());        for (int i = 0; i < dataChangeMessages.size(); i++) {            DataChangeMessage dataChangeMessage = dataChangeMessages.get(i);            DataMessageBO dataMessageBO = dataMessageConverter.converterBO(dataChangeMessage);            dataMessageBO.setDiffDataArr(String.join(CoreConstant.COMMA, dataChangeMessage.getUpdateColumns()));            dataMessageBO.setTableDataJson(JSON.toJSONString(binlogData.getDataMap().get(i)));            dataMessageBOS.add(dataMessageBO);        }        return dataMessageBOS;    }    ...}
@Repositorypublic class DataChangeRepository { ... //存储外部消息的数据信息 public void saveDataMessageDetail(List<DataMessageBO> dataMessageBOS) { List<DataMessageDetailDO> dataMessageDetailDOS = dataMessageConverter.converterDOList(dataMessageBOS); int count = dataMessageDetailMapper.insertBatch(dataMessageDetailDOS); if (count <= 0) { throw new BaseBizException(CommonErrorCodeEnum.SQL_ERROR); } }}
//外部消息处理对象@Datapublic class DataMessageBO implements Serializable { //内部消息编号 private String messageNo; //变化的表信息内容 private String tableDataJson; //消息变化字段数组 private String diffDataArr; //表名 private String tableName; //操作类型 private String action;}
复制代码


(8)消息编号的监听处理 + 发送外部消息


整个内部消息和外部消息的流程如下:

 

说明一:商品中心系统的开发人员会对商品中心的表配置内部消息对象。

 

说明二:非商品中心系统的开发人员会对商品中心的表配置外部消息对象。

 

说明三:商品中心的表发生变更后,binlog 消息应该先被商品中心系统自己消费。

 

说明四:binlog 消息被商品中心系统自己消费后,再继续被非商品中心系统消费。

 

说明五:当商品消息处理系统消费 binlog 消息时,会先获取配置的内部消息对象。然后根据配置的信息,将 binlog 消息对应的变更数据发送到指定 topic。接着获取配置的外部消息对象,把这次发送的变更数据保存到数据库中。此过程中,发送到指定 topic 和保存到 DB 的变更数据会由消息编号来关联。

 

说明六:商品中心的系统消费完 binlog 消息对应的变更数据消息后,会将消息编号发送到 MQ 中以外部消息的形式由商品消息系统去消费处理。

 

说明七:当商品消息系统在消费外部消息时,首先会将消息编号提取出来,然后根据消息编号去数据库查询关联的 binlog 消息对应的变更数据。由于这些保存的变更数据已经配置好完整的外部消息,包括发到那些 topic。所以接着可以把变更数据发送到非商品中心系统的开发人员配置的 topic,从而实现非商品中心系统对商品中心的表的 binlog 变化的监听,而且严格保证了 binlog 消息的消费顺序:先商品中心->再非商品中心。发送消息后,最后便会把消息编号相关的变更消息数据从数据库中删除。


@Configurationpublic class ConsumerBeanConfig {    //配置内容对象    @Autowired    private RocketMQProperties rocketMQProperties;        //消费系统外部消息——处理业务消息    @Bean("dataExternalChangeTopic")    public DefaultMQPushConsumer dataExternalChangeTopic(DataExternalChangeListener dataExternalChangeListener) throws MQClientException {        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.DATA_EXTERNAL_CHANGE_CONSUMER_GROUP);        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());        consumer.subscribe(RocketMqConstant.DATA_EXTERNAL_CHANGE_TOPIC, "*");        consumer.registerMessageListener(dataExternalChangeListener);        consumer.start();        return consumer;    }}
@Componentpublic class DataExternalChangeListener implements MessageListenerConcurrently { @Autowired private MessageService messageService; @Autowired private DataMessageProducer dataMessageProducer; @Autowired private RedisLock redisLock; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt messageExt : list) { ... try { DataMessageBO dataMessageDetail = messageService.getDataMessageDetail(messageNo); //未命中到外部消息的数据,默认不处理 if (Objects.isNull(dataMessageDetail)) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } //获取外部消息的报文对象 List<DataSendMessageBO> sendDataMessageList = messageService.getSendDataMessage(dataMessageDetail); if (CollectionUtils.isEmpty(sendDataMessageList)) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } //发送外部消息 sendDataMessage(sendDataMessageList); //删除这条记录 messageService.deleteMessage(dataMessageDetail); } catch (Exception e) { log.error("consume error, 消费外部数据变更消息失败", e); //本次消费失败,下次重新消费 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } finally { redisLock.unlock(messageNo); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } //发送外部的消息 private void sendDataMessage(List<DataSendMessageBO> sendDataMessageList) { for (DataSendMessageBO dataChangeMessage : sendDataMessageList) { DataChangeMessage dataMessage = dataChangeMessage.getDataChangeMessage(); DataChangeMessageConfigBO dataChangeMessageConfigBO = dataChangeMessage.getDataChangeMessageConfigBO(); //发送一个延迟队列的消息出去 dataMessageProducer.send(dataMessage, dataChangeMessageConfigBO.getMessageTopic(), dataChangeMessageConfigBO.getDelayLevel()); } }}
@Servicepublic class MessageServiceImpl implements MessageService { ... //获取对应的外部消息对象信息 @Override public DataMessageBO getDataMessageDetail(String messageNo) { DataMessageDetailDO dataMessageDetail = dataChangeRepository.getDataMessageDetail(messageNo); return dataMessageConverter.converterBO(dataMessageDetail); } //获取需要发送的消息对象 @Override public List<DataSendMessageBO> getSendDataMessage(DataMessageBO messageBO) { List<DataSendMessageBO> dataChangeMessageConfig = dataChangeRepository.getDataChangeMessageConfig(messageBO); return dataChangeMessageConfig; } ...}
@Repositorypublic class DataChangeRepository { ... //获取某个消息对应的外部消息对象 public DataMessageDetailDO getDataMessageDetail(String messageNo) { LambdaQueryWrapper<DataMessageDetailDO> queryWrapper = Wrappers.lambdaQuery(); queryWrapper.eq(DataMessageDetailDO::getMessageNo, messageNo); return dataMessageDetailMapper.selectOne(queryWrapper); } //获取需要发送的外部消息对象 public List<DataSendMessageBO> getDataChangeMessageConfig(DataMessageBO dataMessageBO) { List<DataSendMessageBO> dataChangeMessageList = new ArrayList<>(); DataChangeListenConfigDO dataChangeListenConfig = getDataChangeListenConfig(dataMessageBO.getTableName()); if (!Objects.isNull(dataChangeListenConfig)) { //获取配置的需要发送的外部消息信息 List<DataChangeMessageConfigDO> dataChangeMessageConfigList = getMessageConfigByListenId(dataChangeListenConfig.getId(), MessageTypeEnum.EXTERNAL_MESSAGE); if (!CollectionUtils.isEmpty(dataChangeMessageConfigList)) { DataSendMessageBO dataSendMessageBO = new DataSendMessageBO(); JSONObject tableDataJson = JSONObject.parseObject(dataMessageBO.getTableDataJson()); List<String> updateColumns = converterList(dataMessageBO.getDiffDataArr()); for (DataChangeMessageConfigDO messageConfigDO : dataChangeMessageConfigList) { DataChangeMessage dataChangeMessage = dataMessageConverter.converter(dataMessageBO); dataChangeMessage.setUpdateColumns(updateColumns); //获取得到需要发送的字段信息 String[] notifyColumnArr = messageConfigDO.getNotifyColumn().split(CoreConstant.COMMA); List<DataChangeMessage.ColumnValue> columnValueList = new ArrayList<>(); for (String notifyColumn : notifyColumnArr) { columnValueList.add(new DataChangeMessage.ColumnValue(notifyColumn, tableDataJson.getString(notifyColumn))); } dataChangeMessage.setColumnValues(columnValueList); dataSendMessageBO.setDataChangeMessage(dataChangeMessage); dataSendMessageBO.setDataChangeMessageConfigBO(dataMessageConverter.converterBO(messageConfigDO)); dataChangeMessageList.add(dataSendMessageBO); } } } return dataChangeMessageList; } //获取对应的表配置信息 public DataChangeListenConfigDO getDataChangeListenConfig(String tableName) { LambdaQueryWrapper<DataChangeListenConfigDO> queryWrapper = Wrappers.lambdaQuery(); queryWrapper.eq(DataChangeListenConfigDO::getTableName, tableName); queryWrapper.eq(DataChangeListenConfigDO::getFilterFlag, DelFlagEnum.EFFECTIVE.getCode()); return dataChangeListenConfigMapper.selectOne(queryWrapper); } //获取监听表消息模型配置 public List<DataChangeMessageConfigDO> getMessageConfigByListenId(Long id, MessageTypeEnum messageTypeEnum) { //获取监听表消息模型配置 Optional<List<DataChangeMessageConfigDO>> optional = redisReadWriteManager.listRedisStringDataByCache( id, DataChangeMessageConfigDO.class, AbstractRedisKeyConstants::getMessageConfigStringKey, this::getMessageConfigByListenIdFromDB ); //如果未指定是内部消息还是外部消息,则不需要过滤 if (Objects.isNull(messageTypeEnum)) { return optional.orElse(null); } return optional.map(dataChangeMessageConfigDOS -> dataChangeMessageConfigDOS.stream() .filter(messageConfigBO -> messageTypeEnum.getCode().equals(messageConfigBO.getMessageType())) .collect(Collectors.toList())).orElse(null); } ...}
复制代码


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18926438

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2025-04-01 加入

还未添加个人简介

评论

发布
暂无评论
商品中心—商品消息处理系统的技术文档_架构_量贩潮汐·WholesaleTide_InfoQ写作社区