本文分享自华为云社区《秒杀系统架构设计都有哪些关键点?》,作者:JavaEdge 。
0、目标
重复排队:一个人抢购商品,若未支付,不准重复排队抢购
1 个商品卖给多个人:1 商品多订单
秒杀支付:支付流程调整
1.RabbitMQ 延时队列
2.利用延时队列实现支付订单的监听,根据订单支付状况进行订单数据库回滚
1、防止重复排队
用户每次抢单时,一旦排队,设置个自增值,让该值的初始值为 1。
每次进入抢单时,对其递增,若值>1,则表明已排队,为禁止重复排队,直接对外抛异常信息 xxx 表示已在排队。
1.1 后台排队记录
修改 SeckillOrderServiceImpl#add 方法,新增递增值判断是否已排队:
//递增,判断是否排队Long userQueueCount = redisTemplate.boundHashOps("UserQueueCount").increment(username, 1);if(userQueueCount>1){ // 有重复抢单 throw new RuntimeException(String.valueOf(StatusCode.REPERROR));}
复制代码
2、超卖问题
多人抢购同一商品时,多人同时判断是否有库存,若只剩一个,则都会判断有库存,此时会导致超卖,即一个商品被下了多个订单。
2.1 思路分析
利用 Redis list 队列,给每件商品创建一个独立的商品个数队列,如:A 商品有 2 个,A 商品的 ID 为 1001,则创建一个 list,key=SeckillGoodsCountList_1001,往该队列中塞 2 次该商品 ID。
每次给用户下单时,先从队列取数据:
有库存
无库存
这就防止了超卖。
操作 Redis 大部分都是先查出数据查,在内存中修改,然后存入 Redis。高并发下就有数据错乱问题,为控制数量准确,单独将商品数量整个自增键,自增键是线程安全的,无需担心并发问题。
2.2 代码
每次将商品压入 Redis 时,创建一个商品队列。
修改 SeckillGoodsPushTask,添加一个 pushIds 方法,用于将指定商品 ID 放入到指定数字:
/*** * 将商品ID存入到数组中 * @param len:长度 * @param id :值 * @return */public Long[] pushIds(int len,Long id){ Long[] ids = new Long[len]; for (int i = 0; i <ids.length ; i++) { ids[i]=id; } return ids;}
复制代码
SeckillGoodsPushTask#loadGoodsPushRedis,添加队列操作:
2.3 防止超卖
修改多线程下单方法,分别修改数量控制,以及售罄后用户抢单排队信息的清理:
3、订单支付
完成秒杀下订单后,进入支付页面,此时前端会每 3s 向后台发送一次请求,判断当前用户订单是否完成支付:
若完成支付,则清理排队信息,并修改订单状态。
3.1 创建支付二维码
下单成功后,会跳转到支付选择页面,在支付选择页面要显示订单编号和订单金额,所以我们需要在下单的时候,将订单金额以及订单编号信息存储到用户查询对象。
选择微信支付后,会跳转到微信支付页面,微信支付页面会根据用户名查看用户秒杀订单,并根据用户秒杀订单的 ID 创建预支付信息并获取二维码信息,展示给用户看,此时页面每 3 秒查询一次支付状态,如果支付成功,需要修改订单状态信息。
3.1.1 回显订单号、金额
下单后,进入支付选择页面,需显示订单号和订单金额,所以需要在用户下单后将该数据传入到 pay.html 页面,所以查询订单状态时,需要将订单号和金额封装到查询的信息中,修改查询订单装的方法加入他们。
修改 SeckillOrderController#queryStatus:
测试:
3.1.2 创建二维码
用户创建二维码,可以先查询用户的秒杀订单抢单信息,然后再发送请求到支付微服务中创建二维码,将订单编号以及订单对应的金额传递到支付微服务:/weixin/pay/create/native。
使用 Postman 测试效果如下:
3.2 支付流程分析
用户抢单,经过秒杀系统实现抢单,下单后会将向 MQ 发送一个延时消息,包含抢单信息
秒杀系统同时启用延时消息监听,一旦监听到订单抢单信息,判断 Redis 缓存中是否存在订单信息,若存在,则回滚
秒杀系统启动监听支付回调信息。若支付完成,则将订单持久化到 MySQL,如果没完成,清理排队信息,回滚库存
每次秒杀下单后调用支付系统,创建二维码。若用户支付成功,微信系统会将支付信息发给支付系统指定的回调地址,支付系统收到信息后,将信息发给 MQ,第 3 个步骤就能监听到消息。
3.3 支付回调更新
支付回调这一块代码已经实现了,但之前实现的是订单信息的回调数据发送给 MQ,指定了对应队列,不过现在需要实现的是秒杀信息发送给指定队列,所以之前的代码那块需要动态指定队列。
3.3.1 支付回调队列指定
创建支付二维码需要指定队列
回调地址回调时,获取支付二维码指定的队列,将支付信息发到指定队列
在微信支付统一下单 API 中,有个附加参数:
attach:附加数据,String(127),在查询API和支付通知中原样返回,可作为自定义参数使用。
复制代码
可在创建二维码时,设置该参数以指定回调支付信息的对应队列,每次回调时,会获取该参数,然后将回调信息发到该参数对应的队列。
3.3.1.1 改造支付方法
修改支付微服务的 WeixinPayController#createNative:
修改支付微服务的 WeixinPayService#createNative:
修改支付微服务的 WeixinPayServiceImpl#createNative:
创建二维码时,传递如下参数
可根据用户名,查询用户排队信息
下单必需
支付必需
回调时,可知将支付信息发送到哪个队列
修改 WeixinPayApplication,添加对应队列以及对应交换机绑定,代码如下:
@SpringBootApplication@EnableEurekaClient@EnableFeignClientspublic class WeixinPayApplication {
public static void main(String[] args) { SpringApplication.run(WeixinPayApplication.class,args); }
@Autowired private Environment env; // 创建DirectExchange交换机 @Bean public DirectExchange basicExchange(){ return new DirectExchange(env.getProperty("mq.pay.exchange.order"), true,false); }
// 创建队列 @Bean(name = "queueOrder") public Queue queueOrder(){ return new Queue(env.getProperty("mq.pay.queue.order"), true); }
// 创建秒杀队列 @Bean(name = "queueSeckillOrder") public Queue queueSeckillOrder(){ return new Queue(env.getProperty("mq.pay.queue.seckillorder"), true); }
// 队列绑定到交换机上 @Bean public Binding basicBindingOrder(){ return BindingBuilder .bind(queueOrder()) .to(basicExchange()) .with(env.getProperty("mq.pay.routing.orderkey")); }
// 队列绑定到交换机 @Bean public Binding basicBindingSeckillOrder(){ return BindingBuilder .bind(queueSeckillOrder()) .to(basicExchange()) .with(env.getProperty("mq.pay.routing.seckillorderkey")); }}
复制代码
修改 application.yml,添加如下配置
#位置支付交换机和队列mq: pay: exchange: order: exchange.order seckillorder: exchange.seckillorder queue: order: queue.order seckillorder: queue.seckillorder routing: key: queue.order seckillkey: queue.seckillorder
复制代码
3.3.1.2 测试
创建二维码测试
以后每次支付,都需要带上对应的参数,包括前面的订单支付。
3.3.1.3 改造支付回调方法
修改 WeixinPayController#notifyUrl,获取自定义参数,并转成 Map,获取 queue 地址,并将支付信息发送到绑定的 queue:
3.3.2 支付状态监听
支付状态通过回调地址发给 MQ 后,需要在秒杀系统中监听支付信息:
支付成功,修改用户订单状态
支付失败,删除订单,回滚库存。
SeckillOrderPayMessageListener 监听消息:
@Component@RabbitListener(queues = "${mq.pay.queue.seckillorder}")public class SeckillOrderPayMessageListener {
// 监听消费消息 @RabbitHandler public void consumeMessage(@Payload String message) { System.out.println(message); // 将消息转换成Map对象 Map<String,String> resultMap = JSON.parseObject(message,Map.class); System.out.println("监听到的消息:"+resultMap); }}
复制代码
修改 SeckillApplication 创建对应的队列以及绑定对应交换机。
@SpringBootApplication@EnableEurekaClient@EnableFeignClients@MapperScan(basePackages = {"com.changgou.seckill.dao"})@EnableScheduling@EnableAsyncpublic class SeckillApplication { public static void main(String[] args) { SpringApplication.run(SeckillApplication.class,args); }
@Bean public IdWorker idWorker(){ return new IdWorker(1,1); }
@Autowired private Environment env; /*** * 创建DirectExchange交换机 * @return */ @Bean public DirectExchange basicExchange(){ return new DirectExchange(env.getProperty("mq.pay.exchange.order"), true,false); }
/*** * 创建队列 * @return */ @Bean(name = "queueOrder") public Queue queueOrder(){ return new Queue(env.getProperty("mq.pay.queue.order"), true); }
/*** * 创建秒杀队列 * @return */ @Bean(name = "queueSeckillOrder") public Queue queueSeckillOrder(){ return new Queue(env.getProperty("mq.pay.queue.seckillorder"), true); }
/**** * 队列绑定到交换机上 * @return */ @Bean public Binding basicBindingOrder(){ return BindingBuilder .bind(queueOrder()) .to(basicExchange()) .with(env.getProperty("mq.pay.routing.orderkey")); } /**** * 队列绑定到交换机上 * @return */ @Bean public Binding basicBindingSeckillOrder(){ return BindingBuilder .bind(queueSeckillOrder()) .to(basicExchange()) .with(env.getProperty("mq.pay.routing.seckillorderkey")); }}
复制代码
添加配置:
#位置支付交换机和队列mq: pay: exchange: order: exchange.order seckillorder: exchange.seckillorder queue: order: queue.order seckillorder: queue.seckillorder routing: key: queue.order seckillkey: queue.seckillorder
复制代码
3.3.3 修改订单状态
监听到支付信息后,根据支付信息判断,如果用户支付成功,则修改订单信息,并将订单入库,删除用户排队信息,如果用户支付失败,则删除订单信息,回滚库存,删除用户排队信息。
3.3.3.1 业务层
修改 SeckillOrderService,添加修改订单方法:
/*** * 更新订单状态 */@Overridepublic void updatePayStatus(String out_trade_no, String transaction_id,String username) { //订单数据从Redis数据库查询出来 SeckillOrder seckillOrder = (SeckillOrder) redisTemplate.boundHashOps("SeckillOrder").get(username); //修改状态 seckillOrder.setStatus("1");
//支付时间 seckillOrder.setPayTime(new Date()); //同步到MySQL中 seckillOrderMapper.insertSelective(seckillOrder);
//清空Redis缓存 redisTemplate.boundHashOps("SeckillOrder").delete(username);
//清空用户排队数据 redisTemplate.boundHashOps("UserQueueCount").delete(username);
//删除抢购状态信息 redisTemplate.boundHashOps("UserQueueStatus").delete(username);}
复制代码
3.3.3.2 修改订单对接
修改微信支付状态监听的代码,当用户支付成功后,修改订单状态:
3.3.4 删除订单回滚库存
如果用户支付失败,我们需要删除用户订单数据,并回滚库存。关闭订单:
/*** * 关闭订单,回滚库存 */@Overridepublic void closeOrder(String username) { //将消息转换成SeckillStatus SeckillStatus seckillStatus = (SeckillStatus) redisTemplate.boundHashOps("UserQueueStatus").get(username); //获取Redis中订单信息 SeckillOrder seckillOrder = (SeckillOrder) redisTemplate.boundHashOps("SeckillOrder").get(username);
//如果Redis中有订单信息,说明用户未支付 if(seckillStatus!=null && seckillOrder!=null){ //删除订单 redisTemplate.boundHashOps("SeckillOrder").delete(username); //回滚库存 //1)从Redis中获取该商品 SeckillGoods seckillGoods = (SeckillGoods) redisTemplate.boundHashOps("SeckillGoods_"+seckillStatus.getTime()).get(seckillStatus.getGoodsId());
//2)如果Redis中没有,则从数据库中加载 if(seckillGoods==null){ seckillGoods = seckillGoodsMapper.selectByPrimaryKey(seckillStatus.getGoodsId()); }
//3)数量+1 (递增数量+1,队列数量+1) Long surplusCount = redisTemplate.boundHashOps("SeckillGoodsCount").increment(seckillStatus.getGoodsId(), 1); seckillGoods.setStockCount(surplusCount.intValue()); redisTemplate.boundListOps("SeckillGoodsCountList_" + seckillStatus.getGoodsId()).leftPush(seckillStatus.getGoodsId());
//4)数据同步到Redis中 redisTemplate.boundHashOps("SeckillGoods_"+seckillStatus.getTime()).put(seckillStatus.getGoodsId(),seckillGoods);
//清理排队标示 redisTemplate.boundHashOps("UserQueueCount").delete(seckillStatus.getUsername());
//清理抢单标示 redisTemplate.boundHashOps("UserQueueStatus").delete(seckillStatus.getUsername()); }}
复制代码
3.3.4.1 调用删除订单
SeckillOrderPayMessageListener,在用户支付失败后调用关闭订单:
//支付失败,删除订单seckillOrderService.closeOrder(attachMap.get("username"));
复制代码
支付微服务
server: port: 9022spring: application: name: pay main: allow-bean-definition-overriding: true rabbitmq: host: 127.0.0.1 #mq的服务器地址 username: guest #账号 password: guest #密码eureka: client: service-url: defaultZone: http://127.0.0.1:6868/eureka instance: prefer-ip-address: truefeign: hystrix: enabled: true#hystrix 配置hystrix: command: default: execution: timeout: #如果enabled设置为false,则请求超时交给ribbon控制 enabled: true isolation: strategy: SEMAPHORE
#微信支付信息配置weixin: appid: wx8397f8696b538317 partner: 1473426802 partnerkey: T6m9iK73b0kn9g5v426MKfHQH7X8rKwb notifyurl: http://2cw4969042.wicp.vip:36446/weixin/pay/notify/url
#位置支付交换机和队列mq: pay: exchange: order: exchange.order queue: order: queue.order seckillorder: queue.seckillorder routing: orderkey: queue.order seckillorderkey: queue.seckillorder
复制代码
秒杀微服务配置
server: port: 18084spring: application: name: seckill datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/changgou_seckill?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC username: root password: itcast rabbitmq: host: 127.0.0.1 #mq的服务器地址 username: guest #账号 password: guest #密码 main: allow-bean-definition-overriding: trueeureka: client: service-url: defaultZone: http://127.0.0.1:6868/eureka instance: prefer-ip-address: truefeign: hystrix: enabled: truemybatis: configuration: map-underscore-to-camel-case: true mapper-locations: classpath:mapper/*Mapper.xml type-aliases-package: com.changgou.seckill.pojo
#hystrix 配置hystrix: command: default: execution: timeout: #如果enabled设置为false,则请求超时交给ribbon控制 enabled: true isolation: thread: timeoutInMilliseconds: 10000 strategy: SEMAPHORE#位置支付交换机和队列mq: pay: exchange: order: exchange.order queue: order: queue.order seckillorder: queue.seckillorder routing: orderkey: queue.order seckillorderkey: queue.seckillorder
复制代码
4、库存回滚
4.1 秒杀流程回顾
1.用户抢单,经过秒杀系统实现抢单,下单后会将向 MQ 发送一个延时队列消息,包含抢单信息,延时半小时后才能监听到 2.秒杀系统同时启用延时消息监听,一旦监听到订单抢单信息,判断 Redis 缓存中是否存在订单信息,如果存在,则回滚 3.秒杀系统还启动支付回调信息监听,如果支付完成,则将订单吃句话到 MySQL,如果没完成,清理排队信息回滚库存 4.每次秒杀下单后调用支付系统,创建二维码,如果用户支付成功了,微信系统会将支付信息发送给支付系统指定的回调地址,支付系统收到信息后,将信息发送给 MQ,第 3 个步骤就可以监听到消息了。
延时队列实现订单关闭回滚库存:
1.创建一个过期队列 Queue12.接收消息的队列 Queue23.中转交换机4.监听Queue2 1)SeckillStatus->检查Redis中是否有订单信息 2)如果有订单信息,调用删除订单回滚库存->[需要先关闭微信支付] 3)如果关闭订单时,用于已支付,修改订单状态即可 4)如果关闭订单时,发生了别的错误,记录日志,人工处理
复制代码
4.2 关闭支付
用户如果半个小时没有支付,我们会关闭支付订单,但在关闭之前,需要先关闭微信支付,防止中途用户支付。
修改支付微服务的 WeixinPayService,添加关闭支付方法,代码如下:
/*** * 关闭支付 * @param orderId * @return */Map<String,String> closePay(Long orderId) throws Exception;
复制代码
修改 WeixinPayServiceImpl,实现关闭微信支付方法,代码如下:
/*** * 关闭微信支付 * @param orderId * @return * @throws Exception */@Overridepublic Map<String, String> closePay(Long orderId) throws Exception { //参数设置 Map<String,String> paramMap = new HashMap<String,String>(); paramMap.put("appid",appid); //应用ID paramMap.put("mch_id",partner); //商户编号 paramMap.put("nonce_str",WXPayUtil.generateNonceStr());//随机字符 paramMap.put("out_trade_no",String.valueOf(orderId)); //商家的唯一编号
//将Map数据转成XML字符 String xmlParam = WXPayUtil.generateSignedXml(paramMap,partnerkey);
//确定url String url = "https://api.mch.weixin.qq.com/pay/closeorder";
//发送请求 HttpClient httpClient = new HttpClient(url); //https httpClient.setHttps(true); //提交参数 httpClient.setXmlParam(xmlParam);
//提交 httpClient.post();
//获取返回数据 String content = httpClient.getContent();
//将返回数据解析成Map return WXPayUtil.xmlToMap(content);}
复制代码
4.3 关闭订单回滚库存
4.3.1 配置延时队列
在 application.yml 文件中引入队列信息配置,如下:
#位置支付交换机和队列mq: pay: exchange: order: exchange.order queue: order: queue.order seckillorder: queue.seckillorder seckillordertimer: queue.seckillordertimer seckillordertimerdelay: queue.seckillordertimerdelay routing: orderkey: queue.order seckillorderkey: queue.seckillorder
复制代码
配置队列与交换机,在 SeckillApplication 中添加如下方法
/** * 到期数据队列 * @return */@Beanpublic Queue seckillOrderTimerQueue() { return new Queue(env.getProperty("mq.pay.queue.seckillordertimer"), true);}
/** * 超时数据队列 * @return */@Beanpublic Queue delaySeckillOrderTimerQueue() { return QueueBuilder.durable(env.getProperty("mq.pay.queue.seckillordertimerdelay")) .withArgument("x-dead-letter-exchange", env.getProperty("mq.pay.exchange.order")) // 消息超时进入死信队列,绑定死信队列交换机 .withArgument("x-dead-letter-routing-key", env.getProperty("mq.pay.queue.seckillordertimer")) // 绑定指定的routing-key .build();}
/*** * 交换机与队列绑定 * @return */@Beanpublic Binding basicBinding() { return BindingBuilder.bind(seckillOrderTimerQueue()) .to(basicExchange()) .with(env.getProperty("mq.pay.queue.seckillordertimer"));}
复制代码
4.3.2 发送延时消息
修改 MultiThreadingCreateOrder,添加如下方法:
/*** * 发送延时消息到RabbitMQ中 * @param seckillStatus */public void sendTimerMessage(SeckillStatus seckillStatus){ rabbitTemplate.convertAndSend(env.getProperty("mq.pay.queue.seckillordertimerdelay"), (Object) JSON.toJSONString(seckillStatus), new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("10000"); return message; } });}
复制代码
在 createOrder 方法中调用上面方法,如下代码:
//发送延时消息到MQ中sendTimerMessage(seckillStatus);
复制代码
4.3.3 库存回滚
创建 SeckillOrderDelayMessageListener 实现监听消息,并回滚库存,代码如下:
@Component@RabbitListener(queues = "${mq.pay.queue.seckillordertimer}")public class SeckillOrderDelayMessageListener {
@Autowired private RedisTemplate redisTemplate;
@Autowired private SeckillOrderService seckillOrderService;
@Autowired private WeixinPayFeign weixinPayFeign;
/*** * 读取消息 * 判断Redis中是否存在对应的订单 * 如果存在,则关闭支付,再关闭订单 * @param message */ @RabbitHandler public void consumeMessage(@Payload String message){ //读取消息 SeckillStatus seckillStatus = JSON.parseObject(message,SeckillStatus.class);
//获取Redis中订单信息 String username = seckillStatus.getUsername(); SeckillOrder seckillOrder = (SeckillOrder) redisTemplate.boundHashOps("SeckillOrder").get(username);
//如果Redis中有订单信息,说明用户未支付 if(seckillOrder!=null){ System.out.println("准备回滚---"+seckillStatus); //关闭支付 Result closeResult = weixinPayFeign.closePay(seckillStatus.getOrderId()); Map<String,String> closeMap = (Map<String, String>) closeResult.getData();
if(closeMap!=null && closeMap.get("return_code").equalsIgnoreCase("success") && closeMap.get("result_code").equalsIgnoreCase("success") ){ //关闭订单 seckillOrderService.closeOrder(username); } } }}
复制代码
点击关注,第一时间了解华为云新鲜技术~
评论