/**
* 消息监听
* <p>
* 可以传入多个 MessageListenerAdapter
*/ @Bean
RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer(); // 监听所有库的key过期事件 container.setConnectionFactory(connectionFactory); // 可以添加多个 messageListener,配置不同的通道 container.addMessageListener(messageListenerAdapter, new PatternTopic("user"));
return container;
}
复制代码
所有的订阅消息,都需要在这里进行注册绑定,new PatternTopic(“user”),表示发布的主题信息
@Value("${spring.redis.database}")
public String redisDatabaseIndex;
复制代码
先拿到我们项目中使用的 Redis 的库索引
// 监听当前库的key过期 container.addMessageListener(messageListenerAdapter, new PatternTopic("__keyevent@" + redisDatabaseIndex + "__:expired"));
复制代码
然后使用发布/订阅模式,订阅主题为:keyevent@0:expired 的消息,则表示订阅数据库索引为 0 的 key 过期事件,监听所有的库则为:keyevent@*:expired
/**
* 消息监听器适配器,绑定消息处理器
* <p>
* 可以配置多个 listenerAdapter,监听不同的通道
*/ @Bean
MessageListenerAdapter listenerAdapter(RedisMessageListener receiver) {
return new MessageListenerAdapter(receiver, "onMessage");
}
复制代码
也就是说,当我们订阅的频道,当有消息进来时,指定它的处理类以及处理方法
三、注入消息处理器
上面我们已经注入了 RedisMessageListener 消息处理器,并指定了处理方法 onMessage(),代码如下:
package com.zyxx.common.redis;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component; /**
* Redis 消息接收
*
* @Author Lizhou
**/ @Slf4j
@Component
public class RedisMessageListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) { // 接收的topic log.info("channel:" + new String(pattern)); // 消息的POJO log.info("message:" + message.toString());
}
}
复制代码
需要实现 MessageListener 接口,重写 onMessage() 方法,然后就可以获取到通道以及消息了,从而进行我们的一些业务逻辑处理
四、操作 API
在 RedisUtils 中,我们增加一个操作方法
/**
* 向通道发布消息
*/ public boolean convertAndSend(String channel, Object message) {
if (StringUtils.isBlank(channel)) {
return false;
}
try {
## 总结:绘上一张Kakfa架构思维大纲脑图(xmind)
![image](https://static001.geekbang.org/infoq/41/41200c51356163fab4609dafbcf19f67.jpeg)
其实关于Kafka,能问的问题实在是太多了,扒了几天,最终筛选出44问:基础篇17问、进阶篇15问、高级篇12问,个个直戳痛点,不知道如果你不着急看答案,又能答出几个呢?
若是对Kafka的知识还回忆不起来,不妨先看我手绘的知识总结脑图(xmind不能上传,文章里用的是图片版)进行整体架构的梳理
**[CodeChina开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频】](https://ali1024.coding.net/public/P7/Java/git)**
梳理了知识,刷完了面试,如若你还想进一步的深入学习解读kafka以及源码,那么接下来的这份《手写“kafka”》将会是个不错的选择。
* Kafka入门
* 为什么选择Kafka
* Kafka的安装、管理和配置
* Kafka的集群
* 第一个Kafka程序
* Kafka的生产者
* Kafka的消费者
* 深入理解Kafka
* 可靠的数据传递
* Spring和Kafka的整合
* SpringBoot和Kafka的整合
* Kafka实战之削峰填谷
* 数据管道和流式处理(了解即可)
![image](https://static001.geekbang.org/infoq/35/35179b11d194aa72ab751ca02e0cac4d.png)
![image](https://static001.geekbang.org/infoq/59/590a9b0b2087f29418154626c33a3141.png)
复制代码
评论