写点什么

RocketMQ 实战—营销系统代码初版

作者:EquatorCoco
  • 2025-02-11
    福建
  • 本文字数:35128 字

    阅读完需:约 115 分钟

接下来实现营销系统四大促销场景的代码初版:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送

 

1.基于条件和画像筛选用户的业务分析和实现


一般进行 Push 时,都是对那些活跃度比较低的用户进行推送活动消息,以激发他们登录 APP 来查看商品进行购物。会员系统需要提供给推送系统、运营系统如下需要支持分页的 RPC 接口。


@DubboService(version = "1.0.0", interfaceClass = AccountApi.class, retries = 0)public class AccountApiImpl implements AccountApi{    @Autowired    private MembershipAccountService membershipAccountService;        @Override    public JsonResult<List<MembershipAccountDTO>> listAccount() {        try {            // RPC接口返回的数据就是DTO            List<MembershipAccountDTO> accountDTOS = membershipAccountService.listAll();            return JsonResult.buildSuccess(accountDTOS);        } catch (BaseBizException e) {            return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());        } catch (Exception e) {            return JsonResult.buildError(e.getMessage());        }    }
@Override public JsonResult<List<MembershipAccountDTO>> listAccountByConditions(MembershipFilterDTO membershipFilterDTO) { try { List<MembershipAccountDTO> dtos = membershipAccountService.listAccountByConditions(membershipFilterDTO); return JsonResult.buildSuccess(dtos); } catch (BaseBizException e) { return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg()); } catch (Exception e) { return JsonResult.buildError(e.getMessage()); } }}
@DubboService(version = "1.0.0", interfaceClass = MembershipPointApi.class, retries = 0)public class MembershipPointApiImpl implements MembershipPointApi { @Autowired private MembershipPointService membershipPointService; @Override public JsonResult<List<MembershipPointDTO>> listMembershipPointByConditions(MembershipFilterConditionDTO conditionDTO) { try { List<MembershipPointDTO> dtos = membershipPointService.listMembershipPointByConditions(conditionDTO); return JsonResult.buildSuccess(dtos); } catch (BaseBizException e) { return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg()); } catch (Exception e) { return JsonResult.buildError(e.getMessage()); } }}
复制代码


(1)获取全部⽤户信息接⼝


com.demo.eshop.membership.api.impl.AccountApiImpl#listAccount()
复制代码


用于获取所有⽤户数据信息,如果⽤户量超⼤,⼀般采取分⻚ + 多次调⽤的⽅式,以便当推送系统、运营系统发起全员活动和推送任务时可以查询全部账号。如果活动推送选择全部用户,此时这个接口就会被调用,需要提供分页支持实现一批一批的查询来推送。

 

从数据库查出来的是 DO 数据对象,DO 对象和数据库表是一一对应的,在 DAO 层使用的。在 Service 层,会将 DO 对象转换为 DTO 对象,也就是数据传输对象,比如 RPC 调用返回的数据就是 DTO。


@Servicepublic class MembershipAccountServiceImpl implements MembershipAccountService {    @Autowired    private MembershipAccountDAO membershipAccountDAO;        @Resource    private MembershipAccountConverter membershipAccountConverter;        @Autowired    private MembershipFilterDAO membershipFilterDAO;
//查询所有用户信息 @Override public List<MembershipAccountDTO> listAll() { //通过mybatis-plus框架自动生成的sql语句,select * from xx //从数据库查出来的是DO数据对象,DO对象和数据库表是一一对应的,在DAO层使用的 //在Service层,会将DO对象转换为DTO对象,也就是数据传输对象,比如RPC调用返回的数据就是DTO return membershipAccountConverter.listEntityToDTO(membershipAccountDAO.list()); } ...}
复制代码


(2)根据条件获取⽤户信息接⼝


com.demo.eshop.membership.api.impl.AccountApiImpl#listAccountByConditions()
复制代码


用于按照查询条件查询出用户数据信息,如果⽤户量超⼤⼀般采取的是分⻚⽅式、多次调⽤获取的⽅式。主要参数如下:


有些活动是针对不活跃的用户来推送的,以激发他们的活跃度。有些活动是针对活跃忠实用户来推送的,保持住他们对 APP 的忠诚度和粘度。


@Servicepublic class MembershipAccountServiceImpl implements MembershipAccountService {    @Autowired    private MembershipAccountDAO membershipAccountDAO;        @Resource    private MembershipAccountConverter membershipAccountConverter;        @Autowired    private MembershipFilterDAO membershipFilterDAO;        ...    //根据条件查询用户    @Override    public List<MembershipAccountDTO> listAccountByConditions(MembershipFilterDTO membershipFilterDTO) {        //构造出查询条件        LambdaQueryWrapper<MembershipFilterDO> queryWrapper = buildQueryWrapper(membershipFilterDTO);        //查询符合条件的用户,自动生成sql,select xx from xx where xx=xx and xx=xx        List<MembershipFilterDO> membershipFilterDOs = membershipFilterDAO.list(queryWrapper);
//从筛选出的记录中依次拿到accountId,去查询用户数据 return membershipFilterDOs.stream().map(membershipFilterDO -> { Long accountId = membershipFilterDO.getAccountId(); return membershipAccountConverter.entityToDTO(membershipAccountDAO.getById(accountId)); }).collect(Collectors.toList()); }
//构造查询条件 private LambdaQueryWrapper<MembershipFilterDO> buildQueryWrapper(MembershipFilterDTO membershipFilterDTO) { //mybatis-plus这种框架做的一些封装 LambdaQueryWrapper<MembershipFilterDO> queryWrapper = Wrappers.lambdaQuery(); return queryWrapper .and(Objects.nonNull(membershipFilterDTO.getAccountType()), wrapper -> wrapper.eq(MembershipFilterDO::getAccountType, membershipFilterDTO.getAccountType())) .and(Objects.nonNull(membershipFilterDTO.getActiveCount()), wrapper -> wrapper.ge(MembershipFilterDO::getActiveCount, membershipFilterDTO.getActiveCount())) .and(Objects.nonNull(membershipFilterDTO.getMembershipLevel()), wrapper -> wrapper.ge(MembershipFilterDO::getMembershipLevel, membershipFilterDTO.getMembershipLevel())) .and(Objects.nonNull(membershipFilterDTO.getTotalActiveCount()), wrapper -> wrapper.ge(MembershipFilterDO::getTotalActiveCount, membershipFilterDTO.getTotalActiveCount())) .and(Objects.nonNull(membershipFilterDTO.getTotalActiveCount()), wrapper -> wrapper.ge(MembershipFilterDO::getTotalAmount, membershipFilterDTO.getTotalAmount())); }}
复制代码


(3)查询画像匹配的用户接口


用户画像,就是每个用户都可以给他一些标签,比如王牌会员、吃货、男性、收入中等、喜欢看历史故事。根据用户在这个 APP 里的各种行为,浏览、购物、评论、活跃、登录、金额,通过一定的算法给用户去打上一些标签。这样该用户就有了一个在 APP 里的用户画像,user profile 就是对一个用户的描述。当每个用户都有一个自己的用户画像后,在进行活动推送时,可以直接根据用户画像的标签来筛选用户。

 

针对某一类用户来进行活动推送,比如全场运动服饰要搞一个 5 折的清凉夏日活动。此时需要查找在用户画像标签里,对运动服饰浏览过/购买过/感兴趣的用户,进行专门推送专项活动。或者推测什么样的用户可能会喜欢运动类服饰,对用户画像里的多个标签进行组合,比如 18~35 之间、男性/女性这种群体。


@Servicepublic class MembershipPointServiceImpl implements MembershipPointService {    @Autowired    private MembershipPointDAO membershipPointDAO;        @Resource    private MembershipPointConverter membershipPointConverter;        @Override    public List<MembershipPointDTO> listMembershipPointByConditions(MembershipFilterConditionDTO conditionDTO) {        MembershipPointDO membershipPointDO = membershipPointConverter.dtoToEntity(conditionDTO);        QueryWrapper<MembershipPointDO> wrapper = new QueryWrapper<>();        wrapper.setEntity(membershipPointDO);        return membershipPointConverter.listEntityToDTO(membershipPointDAO.list(wrapper));    }}
@Data@Builderpublic class MembershipFilterConditionDTO implements Serializable { //这里的用户画像标签就两个维度:会员等级和会员积分 private Integer memberLevel;//会员等级 -> 用户画像标签 private Long memberPoint;//会员积分 -> 用户画像标签}
复制代码


2.全量用户促销活动数据模型分析以及创建操作


假设运营人员需要创建一个促销活动:针对所有用户全场打 8 折,那么该促销活动会针对所有用户进行 Push 推送。

 

(1)首先需要提供一个创建促销活动的 HTTP 接口


运营人员对促销活动的创建通常是在 Web 界面里进行的,通过发送 HTTP 请求到 Controller 来处理,这个请求不是服务之间调用的 RPC 请求。

 

运营⼈员在维护⼀个促销活动时,需要通知所有⽤户这个活动的活动时间和活动规则,所以该 HTTP 接口的接收参数如下:



该 HTTP 接口的返回值如下:



用于处理 HTTP 请求的 Controller 接口如下:


com.demo.eshop.promotion.controlller.PromotionController#saveOrUpdatePromotion
复制代码


@RestController@RequestMapping("/demo/promotion")public class PromotionController {    @Autowired    private PromotionService promotionService;        //新增一个促销活动    @PostMapping    public JsonResult<SaveOrUpdatePromotionDTO> saveOrUpdatePromotion(@RequestBody SaveOrUpdatePromotionRequest request){        try {            SaveOrUpdatePromotionDTO saveOrUpdatePromotionDTO = promotionService.saveOrUpdatePromotion(request);            return JsonResult.buildSuccess(saveOrUpdatePromotionDTO);        } catch (BaseBizException e) {            return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());        } catch (Exception e) {            return JsonResult.buildError(e.getMessage());        }    }}
复制代码


通过 HTTP 请求发送过来的请求对象如下:


//创建或更新促销活动//这个对象里的数据,就是运营人员使用营销系统(运营管理后台)在Web界面创建促销活动时,录入的信息@Data@Builderpublic class SaveOrUpdatePromotionRequest implements Serializable {    //促销活动名称    private String name;    //活动开始时间    private Date startTime;    //活动结束时间    private Date endTime;    //促销活动说明备注    private String remark;    //活动状态:1启用,2停用    private Integer status;    //活动类型:1满减,2折扣,3优惠券,4会员积分    private Integer type;    //活动规则    private PromotionRulesValue rule;        //活动规则    @Data    @Builder    public static class PromotionRulesValue implements Serializable {        //规则名,1:满减,2:折扣,3:优惠券,4:会员积分        private String key;        //规则值,其中key为条件,value为活动规则值        //例如:        //满减规则,key=200,value=30,则代表满200减30        //折扣规则,key=200,value=0.95,则代表满200打0.95折        //优惠券规则,key=200,value=10,则代表满200送一张10元优惠券        //会员积分规则,key=200,value=100,则代表满200,额外送100积分        private Map<String,String> value;    }        //活动创建/修改人    private Integer createUser;}
复制代码


(2)然后提供一个创建促销活动的服务接口


该接口不仅需要将促销活动实体对象入库,还需要发送 MQ 信息为所有用户推送促销信息。当营销系统有了一个促销活动后,而且该促销活动的状态还是启用的,那么就要立即触发对用户的推送。也就是说,创建一个促销活动就推送给所有的用户。


@Servicepublic class PromotionServiceImpl implements PromotionService {    //开启促销活动DAO    @Autowired    private SalesPromotionDAO salesPromotionDAO;        //RocketMQ生产者    @Resource    private DefaultProducer defaultProducer;        @Resource    private PromotionConverter promotionConverter;        //会员服务    @DubboReference(version = "1.0.0")    private AccountApi accountApi;       //新增或修改一个运营活动    @Transactional(rollbackFor = Exception.class)    @Override    public SaveOrUpdatePromotionDTO saveOrUpdatePromotion(SaveOrUpdatePromotionRequest request) {        //活动规则        String rule = JsonUtil.object2Json(request.getRule());
//构造促销活动实体 SalesPromotionDO salesPromotionDO = promotionConverter.convertPromotionDO(request); salesPromotionDO.setRule(rule);
//促销活动实体落库 salesPromotionDAO.saveOrUpdatePromotion(salesPromotionDO);
//当营销系统有了一个促销活动后,而且该促销活动的状态还是启用的,那么就要立即触发对用户的Push推送 //也就是创建一个促销活动就Push推送给所有的用户 //为所有用户推送促销活动,发MQ sendPlatformPromotionMessage(salesPromotionDO);
//构造响应数据 SaveOrUpdatePromotionDTO dto = new SaveOrUpdatePromotionDTO(); dto.setName(request.getName()); dto.setType(request.getType()); dto.setRule(rule); dto.setCreateUser(request.getCreateUser()); dto.setSuccess(true);
return dto; }
//为所有用户发推送促销活动 private void sendPlatformPromotionMessage(SalesPromotionDO promotionDO) { //PlatformPromotionMessage表示的是发到MQ的电商平台促销活动消息 PlatformPromotionMessage message = PlatformPromotionMessage.builder() .promotionId(promotionDO.getId()) .promotionType(promotionDO.getType()) .mainMessage(promotionDO.getName()) .message("您已获得活动资格,打开APP进入活动页面") .informType(promotionDO.getType()) .build();
//获取所有账户信息,默认就是针对平台所有的用户进行促销活动的推送push JsonResult<List<MembershipAccountDTO>> jsonResult = accountApi.listAccount();
if (!jsonResult.getSuccess()) { throw new BaseBizException(jsonResult.getErrorCode(), jsonResult.getErrorMessage()); }
// 循环发送消息,这个地方后面可以优化成线程池,因为消息并不需要顺序性,全部发出去即可 List<MembershipAccountDTO> accounts = jsonResult.getData(); for (MembershipAccountDTO membershipAccountDTO : accounts) { //修改同一个message数据对象的userId,对同一个数据对象设置不同的userId,避免每个用户都有一个自己的message数据对象 message.setUserAccountId(membershipAccountDTO.getId()); String msgJson = JsonUtil.object2Json(message); //消息推送时,使用的就是RocketMQ的Producer defaultProducer.sendMessage(RocketMqConstant.PLATFORM_PROMOTION_SEND_TOPIC, msgJson, "平台发放促销活动消息"); } }}
复制代码


写入到数据库的促销活动实体对象如下:


//优惠活动实体类DO@Data@TableName("sales_promotion")public class SalesPromotionDO {    //主键ID    @TableId(value = "id", type = IdType.AUTO)    private Integer id;    //促销活动名称    private String name;    //促销活动开始时间    private Date startTime;    //促销活动结束时间    private Date endTime;    //促销活动说明备注    private String remark;    //促销活动状态,1:启用;2:停用    private Integer status;    //促销活动类型,1满减,2折扣,3优惠券,4会员积分    private Integer type;    //促销活动规则,已经由Map转换为JSON字符串    private String rule;    private Integer createUser;    private Date createTime;    private Integer updateUser;    private Date updateTime;}
复制代码


发送到 MQ 的促销活动消息如下:


//平台促销活动消息,一个用户一条消息@Data@Builderpublic class PlatformPromotionMessage implements Serializable{    //活动id    private Integer promotionId;    //主题,消息标题    private String mainMessage;    //消息内容    private String message;    //活动类型    private Integer promotionType;    //消息类型,通知类型:立即通知,定时通知    private Integer informType;    //用户id,每条促销活动消息,都是针对一个用户去进行推送的    private Long userAccountId;        @Tolerate    public PlatformPromotionMessage() {
}}
复制代码


3.Producer 和 Consumer 的工程代码实现


(1)Producer 工程代码实现


一.RocketMQ 配置信息处理


RocketMQProperties 使用了 Spring 的注解 @ConfigurationProperties,该注解会把标记的这个类实例化成一个 Bean。RocketMQProperties 这个 Bean 主要会设置在 application.yml 自定义的一些配置数据。

 

那么会放哪些配置数据呢?其实会根据 @ConfigurationProperties 注解里指定的前缀 prefix,把前缀对应的配置数据从配置文件里加载出来,注入到 RocketMQProperties 这个 Bean 里。也就是会把 application.yml 里前缀为 rocketmq 下的 name-server 值,设置到 RocketMQProperties 的 nameServer 属性里。


@ConfigurationProperties(prefix = "rocketmq")public class RocketMQProperties {    private String nameServer;        public String getNameServer() {        return nameServer;    }        public void setNameServer(String nameServer) {        this.nameServer = nameServer;    }}
复制代码


二.自定义 MQ 的 Producer


DefaultProducer 是自定义的 MQ Producer,它是 Spring 的一个 Bean。对 Spring Bean 组件的一个方法添加 @Autowired 进行注解后,在 Spring 容器进行初始化时,Spring 容器会根据方法入参类型,把需要的 Bean 给注入进来。所以这里的 DefaultProducer 在初始化时需要的入参 RocketMQProperties,会在 Spring 容器初始化时给注入进来,而且 RocketMQProperties 也要是一个 Spring Bean 才能被注入到 DefaultProducer 中。

 

RocketMQ 配置数据 Bean——RocketMQProperties,会经历:加载 application.yml 里的配置 -> 配置值注入到 RocketMQProperties 这个 Bean 实例 -> RocketMQProperties 这个 Bean 实例纳入 Spring 容器管理。

 

而 Spring 容器在初始化 DefaultProducer 这个 Bean 实例时,看到需要注入 RocketMQ 配置数据 Bean—RocketMQProperties,便会进行注入。


@Componentpublic class DefaultProducer {    private final TransactionMQProducer producer;        @Autowired    public DefaultProducer(RocketMQProperties rocketMQProperties) {        //通过RocketMQ API构建一个producer对象实例,设置配置的nameServer地址        producer = new TransactionMQProducer(RocketMqConstant.PUSH_DEFAULT_PRODUCER_GROUP);        producer.setNamesrvAddr(rocketMQProperties.getNameServer());        start();    }        ...    //发送消息    public void sendMessage(String topic, String message, Integer delayTimeLevel, String type) {        //在进行RocketMQ消息发送时,会把topic、message内容(bytes数组)封装成Message对象        Message msg = new Message(topic, message.getBytes(StandardCharsets.UTF_8));        try {            //是否进行延迟消息的发送            if (delayTimeLevel > 0) {                msg.setDelayTimeLevel(delayTimeLevel);            }            //基于RocketMQ Producer API发送消息            SendResult send = producer.send(msg);            if (SendStatus.SEND_OK == send.getSendStatus()) {                log.info("发送MQ消息成功, type:{}, message:{}", type, message);            } else {                throw new BaseBizException(send.getSendStatus().toString());            }        } catch (Exception e) {            log.error("发送MQ消息失败:", e);            throw new BaseBizException("消息发送失败");        }    }    ...}
复制代码


三.使用自定义的 MQ Producer 进行消息发送


@Servicepublic class PromotionServiceImpl implements PromotionService {    ...    //RocketMQ生产者    @Resource    private DefaultProducer defaultProducer;    ...        //为所有用户发推送促销活动    private void sendPlatformPromotionMessage(SalesPromotionDO promotionDO) {        ...        //消息推送时,使用的就是RocketMQ的Producer        defaultProducer.sendMessage(RocketMqConstant.PLATFORM_PROMOTION_SEND_TOPIC, msgJson, "平台发放促销活动消息");        ...    }    ...}
复制代码


(2)Consumer 工程代码实现


当前的版本中,当营销系统创建一个促销活动后,会查询和遍历所有用户,给每个用户创建创建一条推送消息发送到 MQ 中。然后推送系统需要监听和消费这些推送到 MQ 的消息,发起真正的推送。

 

RocketMQ 消费者的工程代码实现步骤如下:


步骤一:首先准备一个 RocketMQ 配置数据的 Bean,即 RocketMQProperties

步骤二:然后准备一个 @Configuration 标记的 Bean,这个 Bean 被 Spring 接管时会将 RocketMQ 配置数据的 Bean 注入进去

步骤三:在 @Configuration 标记的 Bean 里,通过不同的被 @Bean 注解标记方法定义一个 RocketMQ 消费者 Bean

 

RocketMQ 的消费者工程实现,通常是在工程代码里加入一个 @Configuration 注解。ConsumerBeanConfig 类一旦加了这个 Spring 提供的注解,那么系统启动时,这个类就会被 Spring 容器接管(实例化)。ConsumerBeanConfig 类在被 Spring 容器接管的过程中,Spring 也会对被 @Autowired 标记的属性进行注入。比如 RocketMQProperties 作为 RocketMQ 配置数据的 Bean,加了 @Autowired 标记,就会先被 Spring 容器接管(实例化)。

 

ConsumerBeanConfig 里面有很多方法,每个方法都加了一个 @Bean 注解。这表示着 Spring 容器在执行这些方法时能拿到自定义的 Bean 实例,然后把实例纳入到 Spring 容器管理中去。所以,ConsumerBeanConfig 类里的每个方法和一个 @Bean 注解就定义了一个系统里的 RocketMQ 消费者 Bean。

 

Spring 在调用被 @Bean 标记的 platformPromotionSendConsumer()方法实例化一个 Bean 时,会把监听的 Bean 注入进去。


@Configurationpublic class ConsumerBeanConfig {    //配置内容对象    @Autowired    private RocketMQProperties rocketMQProperties;        //平台活动推送消息消费者,Spring在调用这个方法实例化一个Bean时,会把监听的Bean注入进去    @Bean("platformPromotionSendTopic")    public DefaultMQPushConsumer platformPromotionSendConsumer(PlatFormPromotionListener platFormPromotionListener) throws MQClientException {        //基于RocketMQ API创建Consumer消费者实例对象,设置nameServer地址,定义要监听的Topic,设置对消息进行消费监听的Listener        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_PROMOTION_SEND_CONSUMER_GROUP);        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());        consumer.subscribe(PLATFORM_PROMOTION_SEND_TOPIC, "*");        consumer.registerMessageListener(platFormPromotionListener);        return consumer;    }}
复制代码


其中 PlatFormPromotionListener 是自定义的 Spring Bean,它实现了 RocketMQ 消费监听接口。在 ConsumerBeanConfig 里自定义的一个名为 platformPromotionSendTopic 的 Bean,会针对指定的 Topic 消息通过 PlatFormPromotionListener Bean 实例来进行监听和处理,PlatFormPromotionListener 和名为 platformPromotionSendTopic 的 Bean 都会纳入到 Spring 容器管理中。消费者 platformPromotionSendTopic 获取到每一条消息都会交给 platFormPromotionListener 来处理。

 

4.基于抽象工厂模式的消息推送实现


消费者 platformPromotionSendTopic 获取到每一条消息都会交给 platFormPromotionListener 处理。

 

根据不同通知类型的消息执行不同的推送,这里运用了抽象工厂设计模式,对于每一种渠道的消息推送,都设计了特定渠道消息推送的组件,并由特定渠道消息推送组件的工厂来进行创建。FactoryProducer 作为工厂生产者,会根据传入的一些参数来选择返回对应的工厂。获取到具体的工厂后,使用具体的工厂创建出工厂对应的消息发送组件,然后再使用这个组件去进行特定渠道的消息推送。

 

其中在处理消息时,首先会把 MQ 的促销推送消息模型转换为通用平台推送消息模型,然后把通用平台推送消息模型,通过具体消息推送工厂转换为具体的消息推送模型(短信、邮件、APP)。


@Componentpublic class PlatFormPromotionListener implements MessageListenerConcurrently {    //消息推送工厂提供者,负责对接第三方Push平台,把消息真正地推送到短信、邮箱、APP里去    @Autowired    private FactoryProducer factoryProducer;
//并发消费消息 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) { try { List<PlatformPromotionMessage> list = new ArrayList<>(); for (MessageExt messageExt : msgList) { log.debug("执行平台发送通知消息逻辑,消息内容:{}", messageExt.getBody()); String msg = new String(messageExt.getBody()); PlatformPromotionMessage platformPromotionMessage = JSON.parseObject(msg , PlatformPromotionMessage.class); //TODO 增加幂等逻辑防止重复消费 list.add(platformPromotionMessage); } //推送通知 inform(list); } catch (Exception e){ log.error("consume error,平台优惠券消费失败", e); //本次消费失败,下次重新消费 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }
//根据不同通知类型的消息执行不同的推送 private void inform(List<PlatformPromotionMessage> list) { for (PlatformPromotionMessage message : list) { //获取消息服务工厂,根据消息推送渠道,抽象工厂选择了具体的工厂 MessageSendServiceFactory messageSendServiceFactory = factoryProducer.getMessageSendServiceFactory(message.getInformType());
//消息发送服务组件 MessageSendService messageSendService = messageSendServiceFactory.createMessageSendService();
//构造消息:把具体的促销推送消息模型 转换为 通用平台推送消息模型 PlatformMessagePushMessage messagePushMessage = PlatformMessagePushMessage.builder() .informType(message.getInformType()) .mainMessage(message.getMainMessage()) .userAccountId(message.getUserAccountId()) .message(message.getMessage()) .build();
//把通用平台推送消息模型,通过具体消息推送工厂 转换为 具体的消息推送模型(短信、邮件、APP) MessageSendDTO messageSendDTO = messageSendServiceFactory.createMessageSendDTO(messagePushMessage);
//具体的消息推送组件,推送你的具体消息模型 messageSendService.send(messageSendDTO);
log.info("消息推送完成,messageSendDTO:{}", messageSendDTO); } } ...}
@Componentpublic class FactoryProducer { //短信发送服务工厂 @Autowired private SmsSendServiceFactory smsSendServiceFactory; //app消息通知发送服务工厂 @Autowired private AppSendServiceFactory appSendServiceFactory;
//email消息通知发送服务工厂 @Autowired private EmailSendServiceFactory emailSendServiceFactory;
//根据消息类型,获取消息发送服务工厂 public MessageSendServiceFactory getMessageSendServiceFactory(Integer informType) { if (Objects.isNull(informType)) { throw new BaseBizException("参数异常"); }
if (InformTypeEnum.SMS.getCode().equals(informType)) { return smsSendServiceFactory; } else if (InformTypeEnum.APP.getCode().equals(informType)) { return appSendServiceFactory; } else if (InformTypeEnum.EMAIL.getCode().equals(informType)) { return emailSendServiceFactory; } throw new BaseBizException("参数异常"); }}
复制代码


抽象工厂方法相关类如下:


//消息推送服务工厂public interface MessageSendServiceFactory {    //创建消息推送服务组件    MessageSendService createMessageSendService();        //创建消息推送DTO,不同消息类型,可以构建不同的消息推送DTO    MessageSendDTO createMessageSendDTO(PlatformMessagePushMessage platformMessagePushMessage);}
@Componentpublic class SmsSendServiceFactory implements MessageSendServiceFactory { @Autowired private SmsSendServiceImpl smsSendServiceImpl;
@Override public MessageSendService createMessageSendService() { return smsSendServiceImpl; }
@Override public SmsMessageSendDTO createMessageSendDTO(PlatformMessagePushMessage platformMessagePushMessage) { SmsMessageSendDTO messageSendDTO = new SmsMessageSendDTO(); messageSendDTO.setMainMessage(platformMessagePushMessage.getMainMessage()); messageSendDTO.setMessage(platformMessagePushMessage.getMessage()); messageSendDTO.setInformType(platformMessagePushMessage.getInformType()); messageSendDTO.setUserAccountId(platformMessagePushMessage.getUserAccountId()); return messageSendDTO; }}
@Componentpublic class AppSendServiceFactory implements MessageSendServiceFactory { @Autowired private AppSendServiceImpl appSendServiceImpl;
@Override public MessageSendService createMessageSendService() { return appSendServiceImpl; }
@Override public MessageSendDTO createMessageSendDTO(PlatformMessagePushMessage platformMessagePushMessage) { AppMessageSendDTO messageSendDTO = new AppMessageSendDTO(); messageSendDTO.setMainMessage(platformMessagePushMessage.getMainMessage()); messageSendDTO.setMessage(platformMessagePushMessage.getMessage()); messageSendDTO.setInformType(platformMessagePushMessage.getInformType()); messageSendDTO.setUserAccountId(platformMessagePushMessage.getUserAccountId()); return messageSendDTO; }}
@Componentpublic class EmailSendServiceFactory implements MessageSendServiceFactory { @Autowired private EmailSendServiceImpl emailSendServiceImpl;
@Override public MessageSendService createMessageSendService() { return emailSendServiceImpl; }
@Override public EmailMessageSendDTO createMessageSendDTO(PlatformMessagePushMessage platformMessagePushMessage) { EmailMessageSendDTO messageSendDTO = new EmailMessageSendDTO(); messageSendDTO.setMainMessage(platformMessagePushMessage.getMainMessage()); messageSendDTO.setMessage(platformMessagePushMessage.getMessage()); messageSendDTO.setInformType(platformMessagePushMessage.getInformType()); messageSendDTO.setUserAccountId(platformMessagePushMessage.getUserAccountId()); return messageSendDTO; }}
//平台消息发送消息,PlatformMessagePushMessage是泛化、通用的消息模型,把促销活动推送消息转换为通用消息模型,保留通用的字段@Data@Builderpublic class PlatformMessagePushMessage implements Serializable { //主题 private String mainMessage; //消息内容 private String message; //消息类型 private Integer informType; //用户id private Long userAccountId; @Tolerate public PlatformMessagePushMessage() {
}}

@Datapublic class MessageSendDTO { //主题 private String mainMessage; //消息内容 private String message; //消息类型 private Integer informType; //用户id private Long userAccountId;}
复制代码


5.全量用户促销活动消息推送的流程和缺陷分析


(1)完整流程分析



(2)当前版本的缺陷分析


缺陷一:运营人员在后台界面创建促销活动时,点击提交按钮后,就直接生成所有用户推送消息发到 MQ。如果从数据库查出来的用户数量很大,不仅获取大量的用户数据耗时,而且这些数据也会大量消耗 JVM 内存。

 

缺陷二:由于对每个用户都创建出一个 JSON 字符串消息发送到 MQ,用户量一大也会对内存消耗巨大。

 

缺陷三:由于生成的每条消息都需要发送到 MQ,每条消息都要进行一次网络传输,又会导致大量的网络通信开销。这种写法,只能用于前期项目开发时,进行快速测试来跑通流程。

 

6.全量用户促销活动推送引入 MQ 进行削峰


引入 MQ 进行削峰的情形:

 

一.消息生产者和消息消费者的并发处理量不一样


营销系统可能可以做到每秒生产 1 万条消息并调用推送系统的接口。但推送系统拿到消息后,要通过 SDK 交给第三方推送平台处理,此时就不一定能每秒推送 1 万条消息给用户了。

 

二.存在明显的高峰和低谷


当运营人员在不创建促销活动时,营销系统根本不会推送消息给推送系统。当运营人员突然创建促销活动时,短时间内就要推送大量消息给推送系统。

 

7.全量用户发优惠券业务流程实现


如果运营⼈员维护⼀个发放优惠券活动,需要为全部⽤户发放优惠券。那么这种对全量用户发放优惠券,一般采用全量用户静默的方式进行发放。也就是发放优惠券过程是后台操作,不需要发送通知类消息去通知⽤户,而是直接发送优惠券到 MQ。然后消费者消费 MQ 后,⾃动保存到数据库的⽤户优惠券表中。

 

(1)创建发放优惠券活动的 HTTP 接口


该接口接收的参数如下:



该接口返回值如下:



该 HTTP 接口代码如下:


com.demo.eshop.promotion.controlller.PromotionCouponController#saveOrUpdateCoupon
复制代码


//发起活动controller@RestController@RequestMapping("/demo/promotion/coupon")public class PromotionCouponController {    //优惠活动service    @Autowired    private CouponService couponService;
//新增一个优惠券活动 @PostMapping public JsonResult<SaveOrUpdateCouponDTO> saveOrUpdateCoupon(@RequestBody SaveOrUpdateCouponRequest request){ try { log.info("新增一条优惠券:{}", JSON.toJSONString(request)); SaveOrUpdateCouponDTO dto = couponService.saveOrUpdateCoupon(request); return JsonResult.buildSuccess(dto); } catch (BaseBizException e) { log.error("biz error: request={}", JSON.toJSONString(request), e); return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg()); } catch (Exception e) { log.error("system error: request={}", JSON.toJSONString(request), e); return JsonResult.buildError(e.getMessage()); } } ...}
//创建/更新优惠券活动时提交的请求对象@Data@Builderpublic class SaveOrUpdateCouponRequest implements Serializable { //优惠券名称 private String couponName; //优惠规则 private String couponRule; //活动开始时间 private Date activityStartTime; //活动结束时间 private Date activityEndTime; //优惠券发放数量,要发放多少张,最多只能有这么多用户领取到这个券 private Integer couponCount; //优惠券领取数量,已经有多少人领取了这个券 private Integer couponReceivedCount; //优惠券发放方式,1:仅自领取,2:仅系统发放 private Integer couponReceiveType; //优惠券类型:满减、折扣、立减 private Integer couponType; //优惠券状态:1:发放中,2:已发完,3:已过期 private Integer couponStatus; //活动创建/修改人 private Integer createUser; @Tolerate public SaveOrUpdateCouponRequest() {
}}
复制代码


(2)创建发放优惠券活动的服务接口


com.demo.eshop.promotion.service.impl.CouponServiceImpl#saveOrUpdateCoupon
复制代码


//优惠券接口实现@Servicepublic class CouponServiceImpl implements CouponService {    //开启优惠券活动DAO    @Autowired    private SalesPromotionCouponDAO salesPromotionCouponDAO;        ...    //保存/修改优惠券活动方法    @Transactional(rollbackFor = Exception.class)    @Override    public SaveOrUpdateCouponDTO saveOrUpdateCoupon(SaveOrUpdateCouponRequest request) {        SalesPromotionCouponDO couponDO = couponConverter.convertCouponDO(request);        couponDO.setCouponReceivedCount(0);        salesPromotionCouponDAO.saveOrUpdateCoupon(couponDO);
//判断优惠券类型,系统发放类型,则针对所有用户,发送优惠券到MQ if (CouponSendTypeEnum.PLATFORM_SEND.getCode().equals(request.getCouponReceiveType())) { sendPlatformCouponMessage(couponDO); }
SaveOrUpdateCouponDTO dto = new SaveOrUpdateCouponDTO(); dto.setCouponName(request.getCouponName()); dto.setRule(request.getCouponRule()); dto.setSuccess(true); return dto; } //为所有用户发放优惠券 private void sendPlatformCouponMessage(SalesPromotionCouponDO promotionCouponDO) { PlatformCouponMessage message = PlatformCouponMessage.builder() .couponId(promotionCouponDO.getId()) .activityStartTime(promotionCouponDO.getActivityStartTime()) .activityEndTime(promotionCouponDO.getActivityEndTime()) .couponType(promotionCouponDO.getCouponType()) .build();
//获取所有账号信息 //TODO 优化获取用户数据逻辑 JsonResult<List<MembershipAccountDTO>> jsonResult = accountApi.listAccount();
if (!jsonResult.getSuccess()) { throw new BaseBizException(jsonResult.getErrorCode(), jsonResult.getErrorMessage()); }
//循环发送消息 //可以优化成线程池,因为消息并不需要顺序性,全部发出去即可 List<MembershipAccountDTO> accounts = jsonResult.getData(); for (MembershipAccountDTO membershipAccountDTO : accounts) { message.setUserAccountId(membershipAccountDTO.getId()); String msgJson = JsonUtil.object2Json(message); defaultProducer.sendMessage(RocketMqConstant.PLATFORM_COUPON_SEND_TOPIC, msgJson, "平台发放优惠券消息"); } } ...}
复制代码


8.全量用户发优惠券引入 MQ 削峰

 

营销系统向 RocketMQ 发送全量用户发优惠券的消息后,又会自己消费这些消息。

 

(1)营销系统配置的 RocketMQ 消费者


@Configurationpublic class ConsumerBeanConfig {    //配置内容对象    @Autowired    private RocketMQProperties rocketMQProperties;
//平台发放优惠券领取消费者 @Bean("platformCouponReceiveTopic") public DefaultMQPushConsumer receiveCouponConsumer(PlatFormCouponListener platFormCouponListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_COUPON_SEND_CONSUMER_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(PLATFORM_COUPON_SEND_TOPIC, "*"); consumer.registerMessageListener(platFormCouponListener); return consumer; }}
@Componentpublic class PlatFormCouponListener implements MessageListenerConcurrently { //优惠券服务service @Autowired private CouponItemService couponItemService;
//并发消费消息 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) { try { List<SalesPromotionCouponItemDTO> list = new ArrayList<>(); for (MessageExt messageExt : msgList) { log.debug("执行平台发放优惠券消费消息逻辑,消息内容:{}", messageExt.getBody()); String msg = new String(messageExt.getBody()); PlatformCouponMessage platformCouponMessage = JSON.parseObject(msg , PlatformCouponMessage.class); log.info("开始发放平台优惠券,couponId:{}", platformCouponMessage.getCouponId());
//幂等逻辑防止重复消费 JsonResult<Long> result = couponItemService.selectByAccountIdAndCouponId(platformCouponMessage.getUserAccountId(), platformCouponMessage.getCouponId()); //如果已经存在,直接跳过循环,不再执行优惠券保存操作 if (result.getSuccess()) { continue; } SalesPromotionCouponItemDTO itemDTO = new SalesPromotionCouponItemDTO(); itemDTO.setCouponId(platformCouponMessage.getCouponId()); itemDTO.setCouponType(platformCouponMessage.getCouponType()); itemDTO.setUserAccountId(platformCouponMessage.getUserAccountId()); itemDTO.setIsUsed(0); itemDTO.setActivityStartTime(platformCouponMessage.getActivityStartTime()); itemDTO.setActivityEndTime(platformCouponMessage.getActivityEndTime()); list.add(itemDTO); } //优惠券保存到数据库,把一批用户对这个券的持有记录,批量插入到数据库里去 couponItemService.saveCouponBatch(list); } catch (Exception e){ log.error("consume error,平台优惠券消费失败", e); //本次消费失败,下次重新消费 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }}
//用户和优惠券领取关系的记录,这里的一条数据记录就代表了一个用户对一个优惠券的持有记录@Data@Builderpublic class SalesPromotionCouponItemDTO { //主键ID private Long id; //优惠券ID:一个优惠券自己就一条记录,多个用户对同一个优惠券可以有自己的券持有记录 private Long couponId; //优惠券类型:1:现金券,2:满减券 private Integer couponType; //用户ID private Long userAccountId; //是否已经使用 private Integer isUsed; //使用时间 private Date usedTime; //有效期开始时间 private Date activityStartTime; //有效期开始时间 private Date activityEndTime; //创建人 private Long createUser; //创建时间 private Date createTime; //更新人 private Long updateUser; //更新时间 private Date updateTime; @Tolerate public SalesPromotionCouponItemDTO() {
}}
复制代码


(2)全量用户发券满足引入 MQ 进行削峰的情形


情形一:生产者和消费者的并发处理量不一样

营销系统可能可以做到每秒生产 1 万条消息,但将每个用户的优惠券信息写入数据库时,能支持每秒两千就很高了。一般 8 核 16G 的标配数据库机器,每秒 TPS 可以支持写入两三千。

 

情形二:存在明显的高峰和低谷

当运营人员在不创建全量用户发券活动时,营销系统不会写全量用户的优惠券信息到数据库。当运营人员突然创建全量用户发券活动时,短时间内就要写全量用户的优惠券信息到数据库。

 

(3)目前这种方案实现的缺陷


缺陷一:查询全量用户比较耗时,而且这些数据也会大量消耗 JVM 内存。

 

缺陷二:对每个用户都创建一个 JSON 字符串消息发送到 MQ,用户量很大时会很消耗内存。

 

缺陷三:生成的每条消息都需要发送到 MQ,每条消息都要进行一次网络传输,这会导致大量的网络通信开销。

 

9.激活不活跃用户发券流程分析


如果运营⼈员需要维护这么⼀个发放优惠券的活动:首先根据条件筛选出⽤户数据,然后给筛选出来的⽤户创建推送通知消息。接着调⽤pushService 的接⼝,把推送通知消息推送给用户,通知⽤户去领取优惠券。最后用户领取优惠券后,将该用户领取到的优惠券保存到数据库的⽤户优惠券表中。

 

这其实就是一个既要进行领取优惠券的推送,也要发放优惠券的场景,对指定用户群体进行领取优惠券推送并发放优惠券。

 

(1)给特定用户发送领取优惠券推送的 HTTP 接口


该 HTTP 接口接收的参数如下:



该 HTTP 接口返回的响应如下:



具体实现如下:


com.demo.eshop.promotion.controlller.PromotionCouponController#saveOrUpdateCoupon
复制代码


//发起活动controller@RestController@RequestMapping("/demo/promotion/coupon")public class PromotionCouponController {    //优惠活动service    @Autowired    private CouponService couponService;    ...        @RequestMapping("/send")    public JsonResult<SendCouponDTO> sendCouponByConditions(@RequestBody SendCouponRequest request){        try {            log.info("发送优惠券给指定用户群体:{}", JSON.toJSONString(request));            SendCouponDTO dto = couponService.sendCouponByConditions(request);            return JsonResult.buildSuccess(dto);        } catch (BaseBizException e) {            log.error("biz error: request={}", JSON.toJSONString(request), e);            return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());        } catch (Exception e) {            log.error("system error: request={}", JSON.toJSONString(request), e);            return JsonResult.buildError(e.getMessage());        }    }    ...}
//指定用户群体发送领取优惠券推送的请求@Data@Builderpublic class SendCouponRequest implements Serializable { //优惠券名称 private String couponName; //优惠券类型 private Integer couponType; //优惠规则 private String couponRule; //活动开始时间 private Date activityStartTime; //活动结束时间 private Date activityEndTime; //优惠券发放数量 private Integer couponCount; //优惠券领取数量 private Integer couponReceivedCount; //优惠券领取地址链接,领券url地址,领券是通过访问哪个url地址来领券的 private String activeUrl; //推送类型 1-定时发送,2-实时发送 private Integer pushType; //1:短信,2:app消息,3:邮箱 private Integer informType; //选人条件 private MembershipFilterDTO membershipFilterDTO; //定时发送消息任务开始时间 private Date pushStartTime; //定时发送任务结束时间 private Date pushEndTime; //每个发送周期内的发送次数,以此为依据发送消息 private Integer sendPeriodCount; //活动创建/修改人 private Integer createUser; @Tolerate public SendCouponRequest() {
}}
@Data@Builder@NoArgsConstructor@AllArgsConstructorpublic class MembershipFilterDTO implements Serializable { //账号类型 private Integer accountType; //会员等级 private Integer membershipLevel; //连续活跃天数 private Integer activeCount; //一个月内活跃天数 private Integer totalActiveCount; //订单总金额,单位:分 private Integer totalAmount;}
复制代码


(2)给特定用户发送领取优惠券推送的服务接口


//优惠券接口实现@Servicepublic class CouponServiceImpl implements CouponService {    //开启优惠券活动DAO    @Autowired    private SalesPromotionCouponDAO salesPromotionCouponDAO;        //推送服务    @DubboReference(version = "1.0.0")    private MessagePushApi messagePushApi;    ...        @Transactional(rollbackFor = Exception.class)    @Override    public SendCouponDTO sendCouponByConditions(SendCouponRequest sendCouponRequest) {        //保存优惠券信息        SalesPromotionCouponDO couponDO = couponConverter.convertCouponDO(sendCouponRequest);        couponDO.setCouponReceivedCount(0);        couponDO.setCouponStatus(CouponStatusEnum.NORMAL.getCode());        couponDO.setCouponReceiveType(CouponSendTypeEnum.SELF_RECEIVE.getCode());        salesPromotionCouponDAO.saveOrUpdateCoupon(couponDO);
//构建messageRequest SaveOrUpdateMessageRequest messageRequest = buildSaveOrUpdateMessageRequest(sendCouponRequest); //创建消息推送 JsonResult<SaveOrUpdateMessageDTO> messageResult = messagePushApi.saveOrUpdateMessage(messageRequest);
SendCouponDTO sendCouponDTO = new SendCouponDTO(); sendCouponDTO.setSuccess(messageResult.getData().getSuccess()); sendCouponDTO.setCouponName(sendCouponRequest.getCouponName()); sendCouponDTO.setRule(sendCouponRequest.getCouponRule());
//TODO 发放数量 sendCouponDTO.setSendCount(0);
return sendCouponDTO; } ...}
//消息推送请求@Data@Builderpublic class SaveOrUpdateMessageRequest { //推送类型:1定时推送,2直接推送 private Integer pushType; //消息主题 private String mainMessage; //1:短信,2:app消息,3:邮箱 private Integer informType; //推送消息内容 private String message; //选人条件 private MembershipFilterDTO membershipFilterDTO; //定时发送消息任务开始时间 private Date pushStartTime; //定时发送任务结束时间 private Date pushEndTime; //每个发送周期内的发送次数,以此为依据发送消息 private Integer sendPeriodCount; //推送消息创建/修改人 private Integer createUser; @Tolerate public SaveOrUpdateMessageRequest() {
}}
复制代码


10.推送系统对营销系统发起的推送任务的处理


营销系统会通过 messagePushApi 的 saveOrUpdateMessage()方法调用推送系统对特定用户发送领取优惠券推送。

 

推送系统的 saveOrUpdateMessage()方法收到请求后,首先会将这次推送任务持久化到数据库中。如果发现这次推送任务属于定时调度类型,则先根据开始时间结束时间和推送次数创建定时任务推送记录,然后返回。如果发现这次推送任务属于即时调度类型,则直接发起推送。

 

对于定时调度类型的推送任务,只要将定时任务推送记录保存到数据库中,后续 XXLJob 便会进行专门的推送处理。


@DubboService(version = "1.0.0", interfaceClass = MessagePushApi.class, retries = 0)public class MessagePushApiImpl implements MessagePushApi {    @Autowired    private MessagePushService messagePushService;
@Override public JsonResult<SaveOrUpdateMessageDTO> saveOrUpdateMessage(SaveOrUpdateMessageRequest saveOrUpdateMessageRequest) { try { SaveOrUpdateMessageDTO dto = messagePushService.saveOrUpdateMessage(saveOrUpdateMessageRequest); return JsonResult.buildSuccess(dto); } catch (BaseBizException e) { log.error("biz error: request={}", JSON.toJSONString(saveOrUpdateMessageRequest), e); return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg()); } catch (Exception e) { log.error("system error: request={}", JSON.toJSONString(saveOrUpdateMessageRequest), e); return JsonResult.buildError(e.getMessage()); } } ...}
@Servicepublic class MessagePushServiceImpl implements MessagePushService { @Autowired private MessagePushDAO messagePushDAO;
@Autowired private MessagePushCrontabDAO messagePushCrontabDAO; @Transactional(rollbackFor = Exception.class) @Override public SaveOrUpdateMessageDTO saveOrUpdateMessage(SaveOrUpdateMessageRequest saveOrUpdateMessageRequest) { SaveOrUpdateMessageDTO saveOrUpdateMessageDTO = new SaveOrUpdateMessageDTO();
//构建消息实体 MessagePushDO messagePushDO = buildMessagePushDO(saveOrUpdateMessageRequest); //无论是定时推送还是立即推送,都会把一次推送任务,持久化到数据库表里去 boolean saveFlag = messagePushDAO.save(messagePushDO);
//这次推送任务属于定时调度类型,定时发送推送消息 if (Objects.equals(saveOrUpdateMessageRequest.getPushType(), PushTypeEnum.DELAY.getCode())) { //如果是定时调度的任务,那么需要根据开始时间和结束时间得出在这段时间里,定时调度推送多少次,每次推送都要有一个定时调度任务记录 //比如对不活跃用户推送优惠券,在7天内一共要推送7次,那么就要这7天内每天推送一次领取优惠券消息给这些不活跃用户 List<MessagePushCrontabDO> messagePushCrontabDOS = generateMessagePushCrontab(saveOrUpdateMessageRequest);
if (!Collections.isEmpty(messagePushCrontabDOS)) { messagePushCrontabDAO.saveBatch(messagePushCrontabDOS); }
saveOrUpdateMessageDTO.setSuccess(saveFlag); return saveOrUpdateMessageDTO; }
//这次推送任务属于即时调度类型,马上发送推送消息 //首先构建推送消息DTO PushMessageDTO pushMessageDTO = PushMessageDTO.builder() .mainMessage(saveOrUpdateMessageRequest.getMainMessage()) .message(saveOrUpdateMessageRequest.getMessage()) .informType(saveOrUpdateMessageRequest.getInformType()) .build();
//然后立即推送消息 pushMessages(pushMessageDTO, saveOrUpdateMessageRequest.getMembershipFilterDTO());
saveOrUpdateMessageDTO.setSuccess(true); return saveOrUpdateMessageDTO; }
//生成消息发送任务实体数据 private List<MessagePushCrontabDO> generateMessagePushCrontab(SaveOrUpdateMessageRequest request) { Integer sendPeriodCount = request.getSendPeriodCount(); List<MessagePushCrontabDO> messagePushCrontabDOS = new ArrayList<>(); if (sendPeriodCount == 1) { //周期内只发送一次,直接使用startTime作为定时任务的执行时间 MessagePushCrontabDO messagePushCrontab = buildMessagePushCrontabDO(request, 1, request.getPushStartTime()); messagePushCrontabDOS.add(messagePushCrontab); } else { LocalDateTime startTime = DateUtil.convertLocalDateTime(request.getPushStartTime()); LocalDateTime endTime = DateUtil.convertLocalDateTime(request.getPushEndTime());
//开始时间和结束时间相隔分钟数 Long minutes = DateUtil.betweenMinutes(startTime, endTime);
//相邻定时任务执行周期间隔分钟数 long periodMinutes = minutes / (sendPeriodCount - 1);
for (int i = 1; i <= sendPeriodCount; i++) { //任务执行时间计算逻辑:起始时间开始作为第一次,后面每次间隔periodMinutes执行一次定时任务,最后一次使用结束时间 MessagePushCrontabDO messagePushCrontab; if (i == sendPeriodCount) { messagePushCrontab = buildMessagePushCrontabDO(request, i, request.getPushEndTime()); } else { //第一次调度,startTime;第二次调度,startTime + 每次调度耗费的分钟数;第三次调度,startTime + 2 * 每次调度耗费的分钟数;最后一次,endTime LocalDateTime crontabTime = startTime.plusMinutes(periodMinutes * (i - 1)); messagePushCrontab = buildMessagePushCrontabDO(request, i, DateUtil.convertDate(crontabTime)); } messagePushCrontabDOS.add(messagePushCrontab); } } return messagePushCrontabDOS; } ...}
复制代码


11.立即推送模式的流程、缺陷和削峰


对特定用户推送领取优惠券,会分为定时推送和即时推送,前面的实现中,定时推送其实已经没有什么问题了。但即时推送还是会存在一定问题,如果特定用户人群数量较大,如下的实现还是会有性能问题。

 

问题一:查询用户耗费时间,大量用户数据占内存

问题二:发送大量消息到 MQ 耗费网络资源和时间

 

使用下面的立即推送有一个前置条件,就是特定用户数量比较少,千级别。


@Servicepublic class MessagePushServiceImpl implements MessagePushService {    @DubboReference(version = "1.0.0")    private AccountApi accountApi;
@Autowired private DefaultProducer defaultProducer; ... @Override public void pushMessages(PushMessageDTO pushMessageDTO, MembershipFilterDTO membershipFilterDTO) { JsonResult<List<MembershipAccountDTO>> accountResult = accountApi.listAccountByConditions(membershipFilterDTO); for (MembershipAccountDTO accountDTO : accountResult.getData()) { PlatformMessagePushMessage platformMessagePushMessage = buildPlatformMessagePushMessage(pushMessageDTO, accountDTO); String msgJson = JsonUtil.object2Json(platformMessagePushMessage); defaultProducer.sendMessage(RocketMqConstant.PLATFORM_MESSAGE_SEND_TOPIC, msgJson, "平台消息推送消息"); } } ...}
复制代码


12.XXLJob 驱动定时推送模式的运行原理


(1)XXLJob 运行原理


说明一:XXLJob 首先要配置一组 Excutors,该组 Excutors 会有名字。推送系统在启动时就需要启动一个 Excutor,并且会注册到 XXLJob 里指定名字的一个 Excutors 中。XXLJob 收到推送系统 Excutor 的注册请求后,会根据注册的名字把它们划分到对应的一组 Excutors 里面。

 

说明二:然后开发人员便可以在 XXLJob 配置定时调度任务,绑定某组 Excutors 以及指定执行任务的 SpringBean。当配置好的定时任务的执行时间到达时,就会找到绑定的 Excutors,发送执行任务请求给推送系统的 Excutor;

 

说明三:推送系统的 Excutor 收到 XXLJob 发送的执行任务请求后,便会找到指定的 SpringBean 去执行任务,每个推送系统的 SpringBean 接着会从 MySQL 数据库里查询出相关的推送任务。

 

说明四:为了决定每个推送系统的 SpringBean 该执行从 MySQL 数据库查询出来的哪些推送任务。XXLJob 在发送执行任务请求给推送系统的 Excutor 时,会带上 shardIndex 和 shardNums。其中 shardNums 指的是当前执行定时任务的推送系统 Excutor 一共有多少个节点,每一个节点可以认为是任务执行的分片。shardIndex 就是对各个定时任务节点进行标号,比如发给推送系统 1 的 shardIndex=1,发送给推送系统 2 的 shardIndex=2。

 

说明五:这时推送系统的 SpringBean 从数据库查出来一批推送任务时:就会根据任务 ID 的 Hash 值对 shardNums 进行取模。通过取模结果和推送系统所属的 shardIndex 是否一样,来决定这个任务是属于哪个分片,从而实现多个节点对同一批任务的分布式调度。



(2)推送系统首先进行 XXLJob 的配置


application.yml 的 XXLJob 配置如下:


xxl:  job:    admin:      addresses: http://127.0.0.1:8080/xxl-job-admin    executor:      appname: message-push      port: 9999
复制代码


读取配置的 Bean 如下:


@Configurationpublic class XxlJobConfig {    @Value("${xxl.job.admin.addresses}")    private String adminAddresses;
@Value("${xxl.job.executor.appname}") private String appname;
@Value("${xxl.job.executor.port}") private int port;
@Bean public XxlJobSpringExecutor xxlJobExecutor() { log.info(">>>>>>>>>>> xxl-job config init."); //在推送系统上创建一个Executor XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setPort(port); return xxlJobSpringExecutor; }}
复制代码


(3)在推送系统编写执行任务的 Spring Bean


@Componentpublic class ScheduleSendMessageJobHandler {    ...        @Resource    private MessagePushCrontabDAO messagePushCrontabDAO;        @Autowired    private MessagePushService messagePushService;        //执行定时任务,筛选并执行"需要定时发送领取优惠券推送消息"的任务    @XxlJob("messagePushHandler")    public void messagePushHandler() {        log.info("messagePushHandler 开始执行");
int shardIndex = Optional.ofNullable(XxlJobHelper.getShardIndex()).orElse(0); int totalShardNum = Optional.ofNullable(XxlJobHelper.getShardTotal()).orElse(0);
//查询出当前时间需要执行的任务,小于等于当前时间且没有执行过的任务 LambdaQueryWrapper<MessagePushCrontabDO> queryWrapper = Wrappers.lambdaQuery(); queryWrapper.le(MessagePushCrontabDO::getCrontabTime, new Date()).eq(MessagePushCrontabDO::getExecuteFlag, YesOrNoEnum.NO.getCode()); List<MessagePushCrontabDO> messagePushCrontabDOS = messagePushCrontabDAO.list(queryWrapper);
log.info("获取需要定时发送消息的优惠券等活动消息任务, messagePushCrontabDOS:{}", JsonUtil.object2Json(messagePushCrontabDOS));
for (MessagePushCrontabDO messagePushCrontabDO : messagePushCrontabDOS) { int shardNo = messagePushCrontabDO.getId().hashCode() % totalShardNum; if (shardNo == shardIndex) { sendMessage(messagePushCrontabDO); //消息发送成功,将数据变更为已执行 messagePushCrontabDO.setExecuteFlag(YesOrNoEnum.YES.getCode()); } } //已经执行过的任务,修改数据库 messagePushCrontabDAO.updateBatchById(messagePushCrontabDOS); log.info("hotGoodsPushHandler 执行结束"); }
private void sendMessage(MessagePushCrontabDO messagePushCrontabDO) { MembershipFilterDTO membershipFilterDTO = JsonUtil.json2Object(messagePushCrontabDO.getFilterCondition(), MembershipFilterDTO.class); PushMessageDTO pushMessageDTO = PushMessageDTO.builder() .mainMessage(messagePushCrontabDO.getMainMessage()) .message(messagePushCrontabDO.getMessageInfo()) .informType(messagePushCrontabDO.getInformType()) .build();
messagePushService.pushMessages(pushMessageDTO, membershipFilterDTO); } ...}
复制代码


13.不活跃用户领取优惠券流程


(1)领取优惠券接口介绍


提供给⽤户的优惠券领取链接,在⽤户点击之后,会调⽤此接⼝领取⼀张优惠券,并扣减发放的优惠券总数。在领取前会先判断优惠券的状态,避免错领、重复领取、或者领取到活动已经结束的优惠券。

 

接口参数如下:



接口响应如下:



(2)领取优惠券 HTTP 接口


@RestController@RequestMapping("/demo/promotion/coupon")public class PromotionCouponController {    //优惠活动service    @Autowired    private CouponService couponService;    ...        @RequestMapping("/receive")    public JsonResult<ReceiveCouponDTO> receiveCoupon(@RequestBody ReceiveCouponRequest request){        try {            log.info("领取优惠券:{}", JSON.toJSONString(request));            ReceiveCouponDTO dto = couponService.receiveCoupon(request);            return JsonResult.buildSuccess(dto);        } catch (BaseBizException e) {            log.error("biz error: request={}", JSON.toJSONString(request), e);            return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());        } catch (Exception e) {            log.error("system error: request={}", JSON.toJSONString(request), e);            return JsonResult.buildError(e.getMessage());        }    }}
//领取优惠券请求@Data@Builderpublic class ReceiveCouponRequest implements Serializable { //优惠券ID private Long couponId; //优惠券类型:1:现金券,2:满减券 private Integer couponType; //用户账号 private Long userAccountId; //优惠规则 private String couponRule; //优惠券生效开始时间 private Date activityStartTime; //优惠券生效结束时间 private Date activityEndTime; @Tolerate public ReceiveCouponRequest() {
}}
复制代码


(3)领取优惠券服务接口


@Servicepublic class CouponServiceImpl implements CouponService {    //开启优惠券活动DAO    @Autowired    private SalesPromotionCouponDAO salesPromotionCouponDAO;
//开启优惠券活动DAO @Autowired private SalesPromotionCouponItemDAO salesPromotionCouponItemDAO; ... @Transactional(rollbackFor = Exception.class) @Override public ReceiveCouponDTO receiveCoupon(ReceiveCouponRequest receiveCouponRequest) { //获取优惠券信息 SalesPromotionCouponDO couponDO = salesPromotionCouponDAO.getById(receiveCouponRequest.getCouponId());
//检查优惠券状态 ReceiveCouponDTO dto = checkCouponStatus(couponDO); if (!dto.getSuccess()) { return dto; }
//查询用户是否已经领取过该优惠券了,如果领取过,直接返回 LambdaQueryWrapper<SalesPromotionCouponItemDO> queryWrapper = Wrappers.lambdaQuery(); queryWrapper.eq(SalesPromotionCouponItemDO::getCouponId, receiveCouponRequest.getCouponId()); queryWrapper.eq(SalesPromotionCouponItemDO::getUserAccountId, receiveCouponRequest.getUserAccountId()); int count = salesPromotionCouponItemDAO.count(queryWrapper);
//用户已经领取过该优惠券 if (count > 0) { dto.setSuccess(false); dto.setMessage("已经领取过该优惠券,不要重复领取哦"); return dto; }
//修改优惠券领取数量 couponDO.setCouponReceivedCount(couponDO.getCouponReceivedCount() + 1); //如果领取数量与发放数量相同,将优惠券状态设置为发放完 if (Objects.equals(couponDO.getCouponCount(), couponDO.getCouponReceivedCount())) { couponDO.setCouponStatus(CouponStatusEnum.USED.getCode()); } salesPromotionCouponDAO.updateById(couponDO);
//领取一张优惠券 SalesPromotionCouponItemDO couponItemDO = buildSalesPromotionCouponItemDO(couponDO, receiveCouponRequest); //添加一条领取记录 salesPromotionCouponItemDAO.save(couponItemDO);
return dto; } //检查优惠券的状态,并返回 领取优惠券结果 对象 private ReceiveCouponDTO checkCouponStatus(SalesPromotionCouponDO couponDO) { if (Objects.isNull(couponDO)) { throw new BaseBizException("优惠券不存在"); }
ReceiveCouponDTO dto = new ReceiveCouponDTO(); Integer couponStatus = couponDO.getCouponStatus();
//领取完或已过期 if (!Objects.equals(couponStatus, CouponStatusEnum.NORMAL.getCode())) { dto.setSuccess(false); CouponStatusEnum statusEnum = CouponStatusEnum.getByCode(couponStatus); if (Objects.isNull(statusEnum)) { throw new BaseBizException("优惠券领取失败"); } dto.setMessage("优惠券"+ statusEnum.getMsg() + ",下次早点来哦"); return dto; } //发行数量小于或者等于领取数量,优惠券已经领取完 if (couponDO.getCouponCount() <= couponDO.getCouponReceivedCount()) { //修改coupon couponDO.setCouponStatus(CouponStatusEnum.USED.getCode()); salesPromotionCouponDAO.updateById(couponDO);
dto.setSuccess(false); dto.setMessage("优惠券已发放完,下次早点来哦"); return dto; }
//优惠券过期 if (couponDO.getActivityEndTime().before(new Date())) { //修改coupon couponDO.setCouponStatus(CouponStatusEnum.EXPIRED.getCode()); salesPromotionCouponDAO.updateById(couponDO);
dto.setSuccess(false); dto.setMessage("优惠券已过期,下次早点来哦"); return dto; }
dto.setSuccess(true); return dto; }}
复制代码


14.热门商品根据用户画像定时推送以及 MQ 削峰


首先推送系统通过 XXLJob 获取到热门商品后会遍历每一个热门商品,然后根据热门商品对应的用户画像标签去获取对应的用户数据,接着遍历这些用户将具体推送消息发送到 MQ 进行削峰填谷,最后推送系统从 MQ 中消费这些消息调用第三方 Push 平台的 SDK 进行推送。

 

(1)推送系统的 XXLJob 实现


@Componentpublic class ScheduleSendMessageJobHandler {    @Autowired    private ScheduleSendMessageJob scheduleSendMessageJob;        @Resource    private HotGoodsCrontabDAO hotGoodsCrontabDAO;    ...        //执行定时任务,筛选热门商品和用户发送给MQ    @XxlJob("hotGoodsPushHandler")    public void hotGoodsPushHandler() {        log.info("hotGoodsPushHandler 开始执行");
//获取shard分片数据进行分布式调度 int shardIndex = Optional.ofNullable(XxlJobHelper.getShardIndex()).orElse(0); int totalShardNum = Optional.ofNullable(XxlJobHelper.getShardTotal()).orElse(0);
//获取热门商品和用户画像,业务先简化为一对一关系 List<HotGoodsCrontabDO> crontabDOs = hotGoodsCrontabDAO.queryHotGoodsCrontabByCrontabDate(new Date()); log.info("获取热门商品和用户画像数据, crontabDOs:{}", JsonUtil.object2Json(crontabDOs));
//找出每个热门商品对应画像所匹配的用户 for (HotGoodsCrontabDO crontabDO : crontabDOs) { if (StringUtils.isEmpty(crontabDO.getPortrayal())) { continue; }
//通过判断shard分片数据来实现分布式调度 int shardNo = crontabDO.getId().hashCode() % totalShardNum; if (shardNo != shardIndex) { continue; }
//热门商品对应的画像实体 MembershipPointDO membershipPointDO = JsonUtil.json2Object(crontabDO.getPortrayal(), MembershipPointDO.class); if (Objects.isNull(membershipPointDO)) { continue; }
//获取匹配画像的用户积分实体 //从会员系统中,根据用户画像标签获取对应的用户数据 MembershipFilterConditionDTO conditionDTO = buildCondition(membershipPointDO); JsonResult<List<MembershipPointDTO>> pointDetailResult = membershipPointApi.listMembershipPointByConditions(conditionDTO); log.info("根据用户画像匹配用户实体, pointDetailResult:{}", JsonUtil.object2Json(pointDetailResult));
if (!pointDetailResult.getSuccess()) { throw new BaseBizException(pointDetailResult.getErrorCode(), pointDetailResult.getErrorMessage()); } List<MembershipPointDTO> membershipPointDOs = pointDetailResult.getData();
//发送消息到MQ,进行削峰填谷 sendMessage(crontabDO, membershipPointDOs); } } //将热门商品和对应的用户消息发送给MQ private void sendMessage(HotGoodsCrontabDO crontabDO, List<MembershipPointDTO> membershipPointDTOs) { Set<Long> accountIds = new HashSet<>(); for (MembershipPointDTO membershipPointDTO : membershipPointDTOs) { accountIds.add(membershipPointDTO.getAccountId()); } HotGoodsVO hotGoodsVO = buildHotGoodsVO(crontabDO); scheduleSendMessageJob.matchingAndSendMessage(hotGoodsVO ,accountIds); log.info("传入参数,准备发送消息到MQ, hotGoodsVO: {}, accountIds: {}", JsonUtil.object2Json(hotGoodsVO), JsonUtil.object2Json(accountIds)); } ...}
//推荐系统/大数据系统,会去计算出不同人群的爆款商品//如果某个用户是一个运动达人,那么其用户画像标签就是"运动",然后根据购买量最高/浏览量最高/好评率最高,来计算出这个用户画像群体的爆品@Data@TableName("hot_goods_crontab")public class HotGoodsCrontabDO { //主键 private Long id; //热门商品id private Long goodsId; //热门商品名 private String goodsName; //热门商品描述 private String goodsDesc; //关键字 private String keywords; //定时任务日期 private Date crontabDate; //用户画像,json格式,简化业务处理,和热门商品处于一对一关系 private String portrayal; private Integer createUser; private Date createTime; private Integer updateUser; private Date updateTime;}
@Servicepublic class ScheduleSendMessageJobImpl implements ScheduleSendMessageJob { @Resource private DefaultProducer defaultProducer;
@Override public void matchingAndSendMessage(HotGoodsVO hotGood, Set<Long> accountIds) { for (Long accountId : accountIds) { //热门商品消息 String hotMessage = buildMessage(hotGood, accountId); log.info("构造热门商品MQ消息, hotMessage: {}", hotMessage);
//发送给mq defaultProducer.sendMessage(RocketMqConstant.PLATFORM_HOT_PRODUCT_SEND_TOPIC, hotMessage, "平台发送热门商品消息"); log.info("发送热门商品消息到MQ, topic: {}, hotMessage: {}", RocketMqConstant.PLATFORM_HOT_PRODUCT_SEND_TOPIC, hotMessage); } } ...}
复制代码


(2)推送系统的消费者实现


//RocketMQ的消费者工程实现,通常是在工程代码里加入一个@Configuration注解//ConsumerBeanConfig类一旦加了这个Spring提供的注解,那么系统启动时,这个类就会被Spring容器接管(实例化),//ConsumerBeanConfig类在被Spring容器接管的过程中,Spring也会对被@Autowired标记的属性进行注入//比如RocketMQProperties作为RocketMQ配置数据的Bean,加了@Autowired标记,就会先被Spring容器接管(实例化)//ConsumerBeanConfig里面有很多方法,每个方法都加了一个@Bean注解//这表示着Spring容器在执行这些方法时能拿到自定义的Bean实例,然后把实例纳入到Spring容器管理中去//所以,ConsumerBeanConfig类里的每个方法和一个@Bean注解就定义了一个系统里的RocketMQ消费者Bean@Configurationpublic class ConsumerBeanConfig {    //配置内容对象    @Autowired    private RocketMQProperties rocketMQProperties;    ...        //热门商品推送    @Bean("platformHotProductSendTopic")    public DefaultMQPushConsumer platformHotProductConsumer(PlatFormHotProductListener platFormHotProductListener) throws MQClientException {        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_HOT_PRODUCT_SEND_CONSUMER_GROUP);        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());        consumer.subscribe(PLATFORM_HOT_PRODUCT_SEND_TOPIC, "*");        consumer.registerMessageListener(platFormHotProductListener);        return consumer;    }    ...}
@Componentpublic class PlatFormHotProductListener implements MessageListenerConcurrently { //并发消费消息 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) { try { for (MessageExt messageExt : msgList) { log.debug("执行平台发送通知消息逻辑,消息内容:{}", messageExt.getBody()); String msg = new String(messageExt.getBody()); HashMap hotProductMessage = JSON.parseObject(msg , HashMap.class); //调用第三方平台SDK发送推送通知 informByPush(hotProductMessage); } } catch (Exception e) { log.error("consume error,平台优惠券消费失败", e); //本次消费失败,下次重新消费 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }}
复制代码


15.营销的四大业务场景 MQ 削峰方案总结


(1)简单总结


一.大用户量推送和大用户量发券的业务挑战


营销系统的两个业务场景"大用户量推送和大用户量发券"中,都会面临瞬时大量推送和发券的处理高峰。例如大用户量推送场景需要瞬时高并发调用第三方平台的 SDK,大用户量发券需要瞬时高并发写入数据库。这时就会引入 RocketMQ 进行削峰填谷,这就是"大用户量推送和大用户量发券"的业务挑战。

 

二.多种推送方式引入抽象工厂设计模式处理


通过抽象工厂获取组件工厂,然后再由组件工厂获取具体组件。

 

三.热门商品定时推送的业务挑战


XXLJob 实现的分布式定时调度。

 

(2)详细总结


一.优惠活动场景(全量用户推送促销活动)


运营开启的优惠活动,可能包括满减活动、积分活动、双⼗⼀活动、会员⽇活动等。运营活动开启时,会针对⽤户群体发送⼤量的通知消息。运营开启⼀次运营活动后,运营活动的通知会通过多种⽅式推送到⽤户端,包括 APP 通知、短信、Email。

 

解决方案:

利用 MQ 对瞬时高并发调用通知接口进行削峰填谷 + 抽象工厂设计模式完成多种类型消息的发送。

 

二.优惠券场景(全量用户发放静默优惠券)


优惠券场景,主要有两种。

 

第⼀种:系统需要给⽤户发放优惠券,其中平台发放类型的优惠券,正常都是后台保存进数据库中。当⽤户打开系统,或者打开 APP 后,就会显示这张优惠券,并且在下单购物时可以选择优惠券抵扣。

 

第⼆种:系统发放优惠券后,设置领取数量,只有点击了领取链接的⽤户才能领取到这个优惠券。

 

针对第⼀种情况,在发放优惠券的活动开始后,系统需要⼤量的后台数据库操作,保存优惠券到数据库。这个过程如果活动次数较多,会对数据库产⽣巨⼤的压⼒。为了让这个功能快速完成,所以需要借助 MQ 来削峰,即把每个⽤户的优惠券都发到 MQ。由消费者慢慢消费后,再保存到数据库,以避免⼤量活动开启时,对数据库造成过⼤的压⼒。

 

针对第⼆种情况,由于⽤户需要点击链接才能完成优惠券的领取,所以对数据库的压⼒不⼤。但是活动的内容需要让⽤户知道,因此需要发送⼤量的通知消息通知⽤户。此时也有⼤量的发通知操作需要调⽤通知服务,或者调⽤第三⽅平台推送这个通知消息。优惠券通过系统后台的⽅式,及⽤户点击活动⻚⾯的⽅式保存到数据库中。

 

解决方案:

利用 MQ 对瞬时高并发写库进行削峰填谷。

 

三.消息通知场景及其解决方案:(特定用户发送领取优惠券推送)


单独的通知场景也是公司种⽐较常⻅的场景,⽐如会员⽣⽇、⼴告推送、热⻔商品推送,以及借助⼤数据系统计算出⼀些结果发送个性化通知和推荐消息。此时也会⼤量调⽤通知服务或者第三⽅通知平台来推送相应的通知消息给⽤户。

 

四.定时任务自动推送场景:(热门商品定时推送)


在通知场景中,还有⼀种周期性质的消息通知。⽐如每⽇的热⻔商品推送、双⼗⼀活动通知、在双⼗⼀开始前每隔 3⽇推送⼀份⼉双⼗⼀活动通知等。

 

解决方案:即时推送使用 MQ 对瞬时高并发调用第三方接口进行削峰填谷,定时推送使用 XXLJob 实现分布式调度。


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18708978

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

EquatorCoco

关注

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
RocketMQ实战—营销系统代码初版_数据库_EquatorCoco_InfoQ写作社区