写点什么

Reactive Spring 实战 -- 响应式 Redis 交互

用户头像
binecy
关注
发布于: 2021 年 01 月 31 日

本文分享 Reactive Spring 中如何实现响应式​Redis 交互模式。


本文将模拟一个用户服务,并使用 Redis 作为数据存储服务器。

本文涉及两个 java bean,用户与权益

public class User {    private long id;    private String name;    // 标签    private String label;    // 收货地址经度    private Double deliveryAddressLon;    // 收货地址维度    private Double deliveryAddressLat;    // 最新签到日    private String lastSigninDay;    // 积分    private Integer score;    // 权益    private List<Rights> rights;    ...}
public class Rights { private Long id; private Long userId; private String name; ...}
复制代码


启动

引入依赖

    <dependency>      <groupId>org.springframework.boot</groupId>      <artifactId>spring-boot-starter-data-redis-reactive</artifactId>    </dependency>
复制代码


添加 Redis 配置

spring.redis.host=192.168.56.102spring.redis.port=6379spring.redis.password=spring.redis.timeout=5000
复制代码


SpringBoot 启动

@SpringBootApplicationpublic class UserServiceReactive {    public static void main(String[] args) {        new SpringApplicationBuilder(                UserServiceReactive.class)                .web(WebApplicationType.REACTIVE).run(args);    }}
复制代码


应用启动后,Spring 会自动生成 ReactiveRedisTemplate(它的底层框架是 Lettuce)。

ReactiveRedisTemplate 与 RedisTemplate 使用类似,但它提供的是异步的,响应式 Redis 交互方式。

这里再强调一下,响应式编程是异步的,ReactiveRedisTemplate 发送 Redis 请求后不会阻塞线程,当前线程可以去执行其他任务。

等到 Redis 响应数据返回后,ReactiveRedisTemplate 再调度线程处理响应数据。

响应式编程可以通过优雅的方式实现异步调用以及处理异步结果,正是它的最大的意义。


序列化

ReactiveRedisTemplate 默认使用的序列化是 Jdk 序列化,我们可以配置为 json 序列化

@Beanpublic RedisSerializationContext redisSerializationContext() {    RedisSerializationContext.RedisSerializationContextBuilder builder = RedisSerializationContext.newSerializationContext();    builder.key(StringRedisSerializer.UTF_8);    builder.value(RedisSerializer.json());    builder.hashKey(StringRedisSerializer.UTF_8);    builder.hashValue(StringRedisSerializer.UTF_8);
return builder.build();}
@Beanpublic ReactiveRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) { RedisSerializationContext serializationContext = redisSerializationContext(); ReactiveRedisTemplate reactiveRedisTemplate = new ReactiveRedisTemplate(connectionFactory,serializationContext); return reactiveRedisTemplate;}
复制代码

builder.hashValue 方法指定 Redis 列表值的序列化方式,由于本文 Redis 列表值只存放字符串,所以还是设置为 StringRedisSerializer.UTF_8。


基本数据类型

ReactiveRedisTemplate 支持 Redis 字符串,散列,列表,集合,有序集合等基本的数据类型。

本文使用散列保存用户信息,列表保存用户权益,其他基本数据类型的使用本文不展开。

public Mono<Boolean>  save(User user) {    ReactiveHashOperations<String, String, String> opsForHash = redisTemplate.opsForHash();    Mono<Boolean>  userRs = opsForHash.putAll("user:" + user.getId(), beanToMap(user));    if(user.getRights() != null) {        ReactiveListOperations<String, Rights> opsForRights = redisTemplate.opsForList();        opsForRights.leftPushAll("user:rights:" + user.getId(), user.getRights()).subscribe(l -> {            logger.info("add rights:{}", l);        });    }    return userRs;}
复制代码

beanToMap 方法负责将 User 类转化为 map。


HyperLogLog

Redis HyperLogLog 结构可以统计一个集合内不同元素的数量。

使用 HyperLogLog 统计每天登录的用户量

public Mono<Long>  login(User user) {    ReactiveHyperLogLogOperations<String, Long> opsForHyperLogLog = redisTemplate.opsForHyperLogLog();    return opsForHyperLogLog.add("user:login:number:" + LocalDateTime.now().toString().substring(0, 10), user.getId());}
复制代码


BitMap

Redis BitMap(位图)通过一个 Bit 位表示某个元素对应的值或者状态。由于 Bit 是计算机存储中最小的单位,使用它进行储存将非常节省空间。

使用 BitMap 记录用户本周是否有签到

public void addSignInFlag(long userId) {    String key = "user:signIn:" + LocalDateTime.now().getDayOfYear()/7 + (userId >> 16);    redisTemplate.opsForValue().setBit(            key, userId & 0xffff , true)    .subscribe(b -> logger.info("set:{},result:{}", key, b));}
复制代码

userId 高 48 位用于将用户划分到不同的 key,低 16 位作为位图偏移参数 offset。

offset 参数必须大于或等于 0,小于 2^32(bit 映射被限制在 512 MB 之内)。


Geo

Redis Geo 可以存储地理位置信息,并对地理位置进行计算。

如查找给定范围内的仓库信息

public Flux getWarehouseInDist(User u, double dist) {    ReactiveGeoOperations<String, String> geo = redisTemplate.opsForGeo();    Circle circle = new Circle(new Point(u.getDeliveryAddressLon(), u.getDeliveryAddressLat()), dist);    RedisGeoCommands.GeoRadiusCommandArgs args =            RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().sortAscending();    return geo.radius("warehouse:address", circle, args);}
复制代码

warehouse:address这个集合中需要先保存好仓库地理位置信息。

ReactiveGeoOperations#radius 方法可以查找集合中地理位置在给定范围内的元素,它中还支持添加元素到集合,计算集合中两个元素地理位置距离等操作。


Lua

ReactiveRedisTemplate 也可以执行 Lua 脚本。

下面通过 Lua 脚本完成用户签到逻辑:如果用户今天未签到,允许签到,积分加 1,如果用户今天已签到,则拒接操作。

public Flux<String> addScore(long userId) {    DefaultRedisScript<String> script = new DefaultRedisScript<>();    script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/signin.lua")));    List<String> keys = new ArrayList<>();    keys.add(String.valueOf(userId));    keys.add(LocalDateTime.now().toString().substring(0, 10));    return redisTemplate.execute(script, keys);}
复制代码


signin.lua 内容如下

local score=redis.call('hget','user:'..KEYS[1],'score')local day=redis.call('hget','user:'..KEYS[1],'lastSigninDay')if(day==KEYS[2])    then    return '0'else    redis.call('hset','user:'..KEYS[1],'score', score+1,'lastSigninDay',KEYS[2])    return '1'end
复制代码


Stream

Redis Stream 是 Redis 5.0 版本新增加的数据类型。该类型可以实现消息队列,并提供消息的持久化和主备复制功能,并且可以记住每一个客户端的访问位置,还能保证消息不丢失。


Redis 借鉴了 kafka 的设计,一个 Stream 内可以存在多个消费组,一个消费组内可以存在多个消费者。

如果一个消费组内某个消费者消费了 Stream 中某条消息,则这消息不会被该消费组其他消费者消费到,当然,它还可以被其他消费组中某个消费者消费到。


下面定义一个 Stream 消费者,负责处理接收到的权益数据

@Componentpublic class RightsStreamConsumer implements ApplicationRunner, DisposableBean {    private static final Logger logger = LoggerFactory.getLogger(RightsStreamConsumer.class);
@Autowired private RedisConnectionFactory redisConnectionFactory;
private StreamMessageListenerContainer<String, ObjectRecord<String, Rights>> container; // Stream队列 private static final String STREAM_KEY = "stream:user:rights"; // 消费组 private static final String STREAM_GROUP = "user-service"; // 消费者 private static final String STREAM_CONSUMER = "consumer-1";
@Autowired @Qualifier("reactiveRedisTemplate") private ReactiveRedisTemplate redisTemplate;
public void run(ApplicationArguments args) throws Exception {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Rights>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() .batchSize(100) //一批次拉取的最大count数 .executor(Executors.newSingleThreadExecutor()) //线程池 .pollTimeout(Duration.ZERO) //阻塞式轮询 .targetType(Rights.class) //目标类型(消息内容的类型) .build(); // 创建一个消息监听容器 container = StreamMessageListenerContainer.create(redisConnectionFactory, options);
// prepareStreamAndGroup查找Stream信息,如果不存在,则创建Stream prepareStreamAndGroup(redisTemplate.opsForStream(), STREAM_KEY , STREAM_GROUP) .subscribe(stream -> { // 为Stream创建一个消费者,并绑定处理类 container.receive(Consumer.from(STREAM_GROUP, STREAM_CONSUMER), StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()), new StreamMessageListener()); container.start(); }); }
@Override public void destroy() throws Exception { container.stop(); }
// 查找Stream信息,如果不存在,则创建Stream private Mono<StreamInfo.XInfoStream> prepareStreamAndGroup(ReactiveStreamOperations<String, ?, ?> ops, String stream, String group) { // info方法查询Stream信息,如果该Stream不存在,底层会报错,这时会调用onErrorResume方法。 return ops.info(stream).onErrorResume(err -> { logger.warn("query stream err:{}", err.getMessage()); // createGroup方法创建Stream return ops.createGroup(stream, group).flatMap(s -> ops.info(stream)); }); }
// 消息处理对象 class StreamMessageListener implements StreamListener<String, ObjectRecord<String, Rights>> { public void onMessage(ObjectRecord<String, Rights> message) { // 处理消息 RecordId id = message.getId(); Rights rights = message.getValue(); logger.info("receive id:{},rights:{}", id, rights); redisTemplate.opsForList().leftPush("user:rights:" + rights.getUserId(), rights).subscribe(l -> { logger.info("add rights:{}", l); }); } }}
复制代码


下面看一下如何发送信息


public Mono<RecordId> addRights(Rights r) {    String streamKey = "stream:user:rights";//stream key    ObjectRecord<String, Rights> record = ObjectRecord.create(streamKey, r);    Mono<RecordId> mono = redisTemplate.opsForStream().add(record);    return mono;}
复制代码

创建一个消息记录对象 ObjectRecord,并通过 ReactiveStreamOperations 发送信息记录。


Sentinel、Cluster

ReactiveRedisTemplate 也支持 Redis Sentinel、Cluster 集群模式,只需要调整配置即可。

Sentinel 配置如下

spring.redis.sentinel.master=mymasterspring.redis.sentinel.nodes=172.17.0.4:26379,172.17.0.5:26379,172.17.0.6:26379spring.redis.sentinel.password=
复制代码

spring.redis.sentinel.nodes配置的是 Sentinel 节点 IP 地址和端口,不是 Redis 实例节点 IP 地址和端口。


Cluster 配置如下

spring.redis.cluster.nodes=172.17.0.2:6379,172.17.0.3:6379,172.17.0.4:6379,172.17.0.5:6379,172.17.0.6:6379,172.17.0.7:6379spring.redis.lettuce.cluster.refresh.period=10000spring.redis.lettuce.cluster.refresh.adaptive=true
复制代码

如 Redis Cluster 中 node2 是 node1 的从节点,Lettuce 中会缓存该信息,当 node1 宕机后,Redis Cluster 会将 node2 升级为主节点。但 Lettuce 不会自动将请求切换到 node2,因为它的缓冲没有刷新。

开启spring.redis.lettuce.cluster.refresh.adaptive配置,Lettuce 可以定时刷新 Redis Cluster 集群缓存信息,动态改变客户端的节点情况,完成故障转移。


暂时未发现 ReactiveRedisTemplate 实现 pipeline,事务的方案。


官方文档:https://docs.spring.io/spring-data/redis/docs/current/reference/html/#redis:reactive

文章完整代码:https://gitee.com/binecy/bin-springreactive/tree/master/user-service


如果您觉得本文不错,欢迎关注我的微信公众号,系列文章持续更新中。您的关注是我坚持的动力!


发布于: 2021 年 01 月 31 日阅读数: 26
用户头像

binecy

关注

还未添加个人签名 2020.08.26 加入

还未添加个人简介

评论

发布
暂无评论
Reactive Spring实战 -- 响应式Redis交互