写点什么

java 培训 Redis 的库存扣减操作

作者:@零度
  • 2022 年 4 月 24 日
  • 本文字数:3571 字

    阅读完需:约 12 分钟

以下文章来源于 Java 必修指南

在日常开发中有很多地方都有类似扣减库存的操作,比如电商系统中的商品库存,抽奖系统中的奖品库存等。

解决方案

  1. 使用 mysql 数据库,使用一个字段来存储库存,每次扣减库存去更新这个字段。

  2. 还是使用数据库,但是将库存分层多份存到多条记录里面,扣减库存的时候路由一下,这样子增大了并发量,但是还是避免不了大量的去访问数据库来更新库存。

  3. 将库存放到 redis 使用 redis 的 incrby 特性来扣减库存。

分析

在上面的第一种和第二种方式都是基于数据来扣减库存。


基于数据库单库存

第一种方式在所有请求都会在这里等待锁,获取锁有去扣减库存。在并发量不高的情况下可以使用,但是一旦并发量大了就会有大量请求阻塞在这里,导致请求超时,进而整个系统雪崩;而且会频繁的去访问数据库,大量占用数据库资源,所以在并发高的情况下这种方式不适用。

基于数据库多库存

第二种方式其实是第一种方式的优化版本,在一定程度上提高了并发量,但是在还是会大量的对数据库做更新操作大量占用数据库资源_java培训

基于数据库来实现扣减库存还存在的一些问题:

  • 用数据库扣减库存的方式,扣减库存的操作必须在一条语句中执行,不能先 selec 在 update,这样在并发下会出现超扣的情况。如:

update number set x=x-1 where x > 0

  • MySQL 自身对于高并发的处理性能就会出现问题,一般来说,MySQL 的处理性能会随着并发 thread 上升而上升,但是到了一定的并发度之后会出现明显的拐点,之后一路下降,最终甚至会比单 thread 的性能还要差。

  • 当减库存和高并发碰到一起的时候,由于操作的库存数目在同一行,就会出现争抢 InnoDB 行锁的问题,导致出现互相等待甚至死锁,从而大大降低 MySQL 的处理性能,最终导致前端页面出现超时异常。

基于 redis

针对上述问题的问题我们就有了第三种方案,将库存放到缓存,利用 redis 的 incrby 特性来扣减库存,解决了超扣和性能问题。但是一旦缓存丢失需要考虑恢复方案。比如抽奖系统扣奖品库存的时候,初始库存=总的库存数-已经发放的奖励数,但是如果是异步发奖,需要等到 MQ 消息消费完了才能重启 redis 初始化库存,否则也存在库存不一致的问题。

基于 redis 实现扣减库存的具体实现

  • 我们使用 redis 的 lua 脚本来实现扣减库存

  • 由于是分布式环境下所以还需要一个分布式锁来控制只能有一个服务去初始化库存

  • 需要提供一个回调函数,在初始化库存的时候去调用这个函数获取初始化库存

初始化库存回调函数(IStockCallback )

/**

* 获取库存回调

* @author yuhao.wang

*/

public interface IStockCallback {

/**

* 获取库存

* @return

*/

int getStock();

}

扣减库存服务(StockService)

/**

* 扣库存

*

* @author yuhao.wang

*/

@Service

public class StockService {

Logger logger = LoggerFactory.getLogger(StockService.class);

/**

* 不限库存

*/

public static final long UNINITIALIZED_STOCK = -3L;

/**

* Redis 客户端

*/

@Autowired

private RedisTemplate<String, Object> redisTemplate;

/**

* 执行扣库存的脚本

*/

public static final String STOCK_LUA;

static {

/**

*

* @desc 扣减库存 Lua 脚本

* 库存(stock)-1:表示不限库存

* 库存(stock)0:表示没有库存

* 库存(stock)大于 0:表示剩余库存

*

* @params 库存 key

* @return

* -3:库存未初始化

* -2:库存不足

* -1:不限库存

* 大于等于 0:剩余库存(扣减之后剩余的库存)

* redis 缓存的库存(value)是-1 表示不限库存,直接返回 1

*/

StringBuilder sb = new StringBuilder();

sb.append("if (redis.call('exists', KEYS[1]) == 1) then");

sb.append(" local stock = tonumber(redis.call('get', KEYS[1]));");

sb.append(" local num = tonumber(ARGV[1]);");

sb.append(" if (stock == -1) then");

sb.append(" return -1;");

sb.append(" end;");

sb.append(" if (stock >= num) then");

sb.append(" return redis.call('incrby', KEYS[1], 0 - num);");

sb.append(" end;");

sb.append(" return -2;");

sb.append("end;");

sb.append("return -3;");

STOCK_LUA = sb.toString();

}

/**

* @param key 库存 key

* @param expire 库存有效时间,单位秒

* @param num 扣减数量

* @param stockCallback 初始化库存回调函数

* @return -2:库存不足; -1:不限库存; 大于等于 0:扣减库存之后的剩余库存

*/

public long stock(String key, long expire, int num, IStockCallback stockCallback) {

long stock = stock(key, num);

// 初始化库存

if (stock == UNINITIALIZED_STOCK) {

RedisLock redisLock = new RedisLock(redisTemplate, key);

try {

// 获取锁

if (redisLock.tryLock()) {

// 双重验证,避免并发时重复回源到数据库

stock = stock(key, num);

if (stock == UNINITIALIZED_STOCK) {

// 获取初始化库存

final int initStock = stockCallback.getStock();

// 将库存设置到 redis

redisTemplate.opsForValue().set(key, initStock, expire, TimeUnit.SECONDS);

// 调一次扣库存的操作

stock = stock(key, num);

}

}

} catch (Exception e) {

logger.error(e.getMessage(), e);

} finally {

redisLock.unlock();

}

}

return stock;

}

/**

* 加库存(还原库存)

*

* @param key 库存 key

* @param num 库存数量

* @return

*/

public long addStock(String key, int num) {

return addStock(key, null, num);

}

/**

* 加库存

*

* @param key 库存 key

* @param expire 过期时间(秒)

* @param num 库存数量

* @return

*/

public long addStock(String key, Long expire, int num) {

boolean hasKey = redisTemplate.hasKey(key);

// 判断 key 是否存在,存在就直接更新

if (hasKey) {

return redisTemplate.opsForValue().increment(key, num);

}

Assert.notNull(expire,"初始化库存失败,库存过期时间不能为 null");

RedisLock redisLock = new RedisLock(redisTemplate, key);

try {

if (redisLock.tryLock()) {

// 获取到锁后再次判断一下是否有 key

hasKey = redisTemplate.hasKey(key);

if (!hasKey) {

// 初始化库存

redisTemplate.opsForValue().set(key, num, expire, TimeUnit.SECONDS);

}

}

} catch (Exception e) {

logger.error(e.getMessage(), e);

} finally {

redisLock.unlock();

}

return num;

}

/**

* 获取库存

*

* @param key 库存 key

* @return -1:不限库存; 大于等于 0:剩余库存

*/

public int getStock(String key) {

Integer stock = (Integer) redisTemplate.opsForValue().get(key);

return stock == null ? -1 : stock;

}

/**

* 扣库存

*

* @param key 库存 key

* @param num 扣减库存数量

* @return 扣减之后剩余的库存【-3:库存未初始化; -2:库存不足; -1:不限库存; 大于等于 0:扣减库存之后的剩余库存】

*/

private Long stock(String key, int num) {

// 脚本里的 KEYS 参数

List<String> keys = new ArrayList<>();

keys.add(key);

// 脚本里的 ARGV 参数

List<String> args = new ArrayList<>();

args.add(Integer.toString(num));

long result = redisTemplate.execute(new RedisCallback<Long>() {

@Override

public Long doInRedis(RedisConnection connection) throws DataAccessException {

Object nativeConnection = connection.getNativeConnection();

// 集群模式和单机模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行

// 集群模式

if (nativeConnection instanceof JedisCluster) {

return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, args);

}

// 单机模式

else if (nativeConnection instanceof Jedis) {

return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, args);

}

return UNINITIALIZED_STOCK;

}

});

return result;

}

}

调用

/**

* @author yuhao.wang

*/

@RestController

public class StockController {

@Autowired

private StockService stockService;

@RequestMapping(value = "stock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)

public Object stock() {

// 商品 ID

long commodityId = 1;

// 库存 ID

String redisKey = "redis_key:stock:" + commodityId;

long stock = stockService.stock(redisKey, 60 * 60, 2, () -> initStock(commodityId));

return stock >= 0;

}

/**

* 获取初始的库存

*

* @return

*/

private int initStock(long commodityId) {

// TODO 这里做一些初始化库存的操作

return 1000;

}

@RequestMapping(value = "getStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)

public Object getStock() {

// 商品 ID

long commodityId = 1;

// 库存 ID

String redisKey = "redis_key:stock:" + commodityId;

return stockService.getStock(redisKey);

}

@RequestMapping(value = "addStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)

public Object addStock() {

// 商品 ID

long commodityId = 2;

// 库存 ID

String redisKey = "redis_key:stock:" + commodityId;

return stockService.addStock(redisKey, 2);

}

}

用户头像

@零度

关注

关注尚硅谷,轻松学IT 2021.11.23 加入

IT培训 www.atguigu.com

评论

发布
暂无评论
java培训Redis的库存扣减操作_redis_@零度_InfoQ写作社区