写点什么

Redis 应用—7. 大 Value 处理方案

作者:EquatorCoco
  • 2024-12-20
    福建
  • 本文字数:10515 字

    阅读完需:约 34 分钟

1.⽅案设计



步骤一:首先需要配置一个 crontab 定时调度 shell 脚本,然后该脚本每天凌晨会通过 rdbtools⼯具解析 Redis 的 RDB⽂件,接着对解析出的内容进行过滤,把 RDB⽂件中的⼤key 导出到 CSV⽂件。

 

步骤二:使⽤SQL 导⼊CSV⽂件到 MySQL 数据库中,同时使⽤Canal 监听 MySQL 的 binlog⽇志。

 

步骤三:Canal 会发送增量的大 key 数据消息到 RocketMQ,RocketMQ 的消费者系统会对增量的大 key 数据消息进⾏消费,消息中便会包含⼤key 的详情信息。这样消费者就可以将⼤key 的信息通过邮件等⽅式,通知开发⼈员。

 

为什么要把⼤key 的 CSV⽂件导⼊到 MySQL 存储?为什么不直接监听⼤key 的 CSV⽂件进⾏通知?

 

原因一:如果不导⼊MySQL,那么就⽆法使⽤Canal 来监听。这样就要开发⼀个程序,定时去扫描 Redis 节点下解析出来的 CSV⽂件。如果 Redis 集群中有多个节点,那么每⼀个节点都要去扫描。⽽将 CSV 导⼊到 MySQL 后,只需要使⽤Canal 去监听 MySQL 表的 binlog,就可以把增量数据同步到 RocketMQ 中,由消费者统⼀进⾏处理。

 

原因二:解析 CSV⽂件⽐直接从 MySQL 中查询复杂很多,尤其是需要进行信息过滤。导⼊到 MySQL 后可以通过 SQL 轻松的对⼤key 的记录进⾏条件筛选,并且可以对每天产⽣的⼤key 数据进⾏存储分析。

 

RDB 解析⽣成的 CSV⽂件结构如下:


database,type,key,size_in_bytes,encoding,num_elements,len_largest_element, expiry 0,string,key1-string,20536,string,17280,17280, 0,list,key1-list,4006,quicklist,24,1530,
复制代码


2.安装与配置环境


(1)依赖环境


Python3、pip3、rdb-tools、Redis、MySQL、JDK、RocketMQ、Canal。

 

rdb-tools 是开源的⼀个 python 项⽬,它可以⽤来解析 Redis 的 RDB⽂件,但是要先安装 Python 环境。


连接地址:https://github.com/sripathikrishnan/redis-rdb-tools
复制代码


pip 是 Python 的包管理⼯具,安装 Python 后,这个⼯具就会配套安装好。


(2)安装 Python3 & pip3


# 安装编译⼯具$ yum -y groupinstall "Development tools"$ yum -y install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gdbm-devel db4-devel libpcap-devel xz-devel $ yum install libffi-devel -y# 下载python3.7.0$ cd /usr/local$ wget https://www.python.org/ftp/python/3.7.0/Python-3.7.0.tar.xz$ tar -xvJf Python-3.7.0.tar.xz# 编译$ mkdir /usr/local/python3$ cd Python-3.7.0$ ./configure --prefix=/usr/local/python3$ make && make install
复制代码


(3)安装 rdb-tools


# 使⽤pip包管理程序安装rdb-tools$ pip3 install rdbtools python-lzf# 配置环境变量$ vim /etc/profile# 在⽂件底部最末尾,追加如下两⾏内容PATH=/usr/local/python3/bin:$PATHexport PATH
复制代码


验证安装:


# 验证rdbtools是否安装成功$ rdb -h
复制代码


(4)安装 RocketMQ


一.下载安装包


# 下载安装包,要注意版本与项⽬中依赖的RocketMQ版本兼容$ wget https://archive.apache.org/dist/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip
复制代码


二.修改默认配置


# 解压$ unzip rocketmq-all-4.7.1-bin-release.zip# 切换⽬录$ cd /usr/local/rocketmq-all-4.7.1-bin-release# 修改nameserver默认堆栈⼤⼩$ vim ./bin/runserver.sh# 修改brokerserver默认堆栈⼤⼩$ vim bin/runbroker.sh
复制代码


NameServer 默认配置如下:


 -server -Xms4g -Xmx4g -Xmn2g  -XX:MetaspacesSize=128m -XX:MaxMetaspaceSize=320m
复制代码


BrokerServer 默认配置如下:


 -server -Xms8g -Xmx8g -Xmn4g
复制代码


三.修改 BrokerServer 的 IP 地址


$ vim /usr/local/rocketmq-all-4.7.1-bin-release/conf/broker.conf
复制代码


在 broker.conf⽂件中追加如下内容:


# brokerserver所在机器的公⽹IP地址 brokerIP1=192.168.95.129
复制代码


四.启动 RocketMQ


# 启动nameserver$ nohup sh ./bin/mqnamesrv &# 查看nameserver启动⽇志$ tailf ~/logs/rocketmqlogs/namesrv.log# 启动brokerserver$ nohup sh bin/mqbroker -n 127.0.0.1:9876 -c conf/broker.conf &# 查看brokerserver启动⽇志$ tailf ~/logs/rocketmqlogs/broker.log
复制代码


五.启动 RocketMQ 控制台


github地址:https://github.com/apache/rocketmq-externals/tree/release-rocketmq-console-1.0.0
复制代码


可以从 github 上 clone 下载,然后使⽤maven 命令打包,然后如下启动:


$ nohup java -jar -server -Xms256m -Xmx256m \-Drocketmq.config.namesrvAddr=127.0.0.1:9876 -Dserver.port=8080 \/usr/local/rocketmq-console-ng-1.0.1.jar &
复制代码


(5)安装 Canal


一.下载安装包


# 下载canal-admin$ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.admin-1.1.5.tar.gz# 下载canal-deployer$ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
复制代码


二.安装 canal-admin


# 创建解压⽬录$ mkdir /usr/local/canal-admin# 解压$ tar -zxvf canal.admin-1.1.5.tar.gz -c /usr/local/canal-admin
复制代码


目录结构如下:



三.初始化 Canal 数据库


执⾏conf⽬录下的 canal_manager.sql⽂件


# 执⾏conf⽬录下的canal_manager.sql⽂件$ cd /usr/local/canal-admin/conf$ mysql -u⽤户名 -p密码 -hIP地址 -P端⼝号 < canal_manager.sql
复制代码


canal_manger.sql⽂件内容如下:


CREATE DATABASE /*!32312 IF NOT EXISTS*/ `canal_manager` /*!40100 DEFAULTCHARACTER SET utf8 COLLATE utf8_bin */;USE `canal_manager`;SET NAMES utf8;SET FOREIGN_KEY_CHECKS = 0;-- ------------------------------ Table structure for canal_adapter_config-- ----------------------------DROP TABLE IF EXISTS `canal_adapter_config`;CREATE TABLE `canal_adapter_config` (    `id` bigint(20) NOT NULL AUTO_INCREMENT,    `category` varchar(45) NOT NULL,    `name` varchar(45) NOT NULL,    `status` varchar(45) DEFAULT NULL,    `content` text NOT NULL,    `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE    CURRENT_TIMESTAMP,    PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- ------------------------------ Table structure for canal_cluster-- ----------------------------DROP TABLE IF EXISTS `canal_cluster`;CREATE TABLE `canal_cluster` (    `id` bigint(20) NOT NULL AUTO_INCREMENT,    `name` varchar(63) NOT NULL,    `zk_hosts` varchar(255) NOT NULL,    `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE    CURRENT_TIMESTAMP,    PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- ------------------------------ Table structure for canal_config-- ----------------------------DROP TABLE IF EXISTS `canal_config`;CREATE TABLE `canal_config` (    `id` bigint(20) NOT NULL AUTO_INCREMENT,    `cluster_id` bigint(20) DEFAULT NULL,    `server_id` bigint(20) DEFAULT NULL,    `name` varchar(45) NOT NULL,    `status` varchar(45) DEFAULT NULL,    `content` text NOT NULL,    `content_md5` varchar(128) NOT NULL,    `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE    CURRENT_TIMESTAMP,    PRIMARY KEY (`id`),    UNIQUE KEY `sid_UNIQUE` (`server_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- ------------------------------ Table structure for canal_instance_config-- ----------------------------DROP TABLE IF EXISTS `canal_instance_config`;CREATE TABLE `canal_instance_config` (    `id` bigint(20) NOT NULL AUTO_INCREMENT,    `cluster_id` bigint(20) DEFAULT NULL,    `server_id` bigint(20) DEFAULT NULL,    `name` varchar(45) NOT NULL,    `status` varchar(45) DEFAULT NULL,    `content` text NOT NULL,    `content_md5` varchar(128) DEFAULT NULL,    `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE    CURRENT_TIMESTAMP,    PRIMARY KEY (`id`),    UNIQUE KEY `name_UNIQUE` (`name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- ------------------------------ Table structure for canal_node_server-- ----------------------------DROP TABLE IF EXISTS `canal_node_server`;CREATE TABLE `canal_node_server` (    `id` bigint(20) NOT NULL AUTO_INCREMENT,    `cluster_id` bigint(20) DEFAULT NULL,    `name` varchar(63) NOT NULL,    `ip` varchar(63) NOT NULL,    `admin_port` int(11) DEFAULT NULL,    `tcp_port` int(11) DEFAULT NULL,    `metric_port` int(11) DEFAULT NULL,    `status` varchar(45) DEFAULT NULL,    `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE    CURRENT_TIMESTAMP,    PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- ------------------------------ Table structure for canal_user-- ----------------------------DROP TABLE IF EXISTS `canal_user`;CREATE TABLE `canal_user` (    `id` bigint(20) NOT NULL AUTO_INCREMENT,    `username` varchar(31) NOT NULL,    `password` varchar(128) NOT NULL,    `name` varchar(31) NOT NULL,    `roles` varchar(31) NOT NULL,    `introduction` varchar(255) DEFAULT NULL,    `avatar` varchar(255) DEFAULT NULL,    `creation_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE    CURRENT_TIMESTAMP,    PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;SET FOREIGN_KEY_CHECKS = 1;-- ------------------------------ Records of canal_user-- ----------------------------BEGIN;INSERT INTO `canal_user` VALUES (1, 'admin', '6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9', 'Canal Manager', 'admin', NULL, NULL, '2019-07-14 00:05:28');COMMIT;SET FOREIGN_KEY_CHECKS = 1;     
复制代码


四.修改 conf⽬录下的 application.yml⽂件


# 修改conf⽬录下的application.yml⽂件$ vim /usr/local/canal-admin/conf/application.yml
复制代码


application.yml⽂件内容如下:


server:  port: 8089spring:  jackson:    date-format: yyyy-MM-dd HH:mm:ss    time-zone: GMT+8
spring.datasource: address: 127.0.0.1:3306 database: canal_manager username: canal password: canal driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false hikari: maximum-pool-size: 30 minimum-idle: 1
canal: adminUser: admin adminPasswd: admin
复制代码


五.启动 canal-admin


$ cd /usr/local/canal-admin $ bin/startup.sh
复制代码


然后访问 Canal 管理控制台:http://ip:8089,⽤户名和密码分别是:admin | 123456。

 

此时登录进⼊后,会发现⽬前什么数据都没有。但这没有关系,接着会启动 canal-server,因为要⽤canal-admin 来管理每个 canal-server 的实例。

 

采⽤canal-admin 来管理 canal-server:当 canal-server 启动时,canal-server 是会⾃动注册到 canal-admin 上的。

 

六.关闭 canal-admin


$ cd /usr/local/canal.admin-1.1.5$ bin/stop.sh
复制代码


注意:关闭时,请不要使⽤kill 进程号的⽅式来关闭,而使⽤执⾏脚本的⽅式关闭。因为如果 kill 进程后,下次再次执⾏启动脚本时,会出现 found admin.pid , Please run stop.sh first ,then startup.sh 的提示。当然,出现这种情况的时候,可以到 bin⽬录下将 admin.pid 删除掉。

 

七.安装 canal-server


# 创建解压⽬录$ mkdir /usr/local/canal-server# 解压$ tar -zxvf canal.deployer-1.1.5.tar.gz -c /usr/local/canal-server
复制代码


目录结构如下:



八.修改 conf⽬录下的 canal_local.properties


# 修改conf⽬录下的canal_local.properties⽂件$ vim /usr/local/canal-server/conf/canal_local.properties
复制代码


canal_local.properties⽂件内容如下:


# register ip 这⾥的ip选择您本机的ip(也就是启动canal-server机器的所在ip地址) canal.register.ip = 192.168.95.129 
# canal admin config 这⾥是部署canal-admin的所在机器的ip,当然也可以把canal-admin和canal-server部署到⼀台机器 canal.admin.manager = 192.168.95.129:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9 # admin auto register 这⾥⼀定得是true,否则⽆法在启动canal-server时候注册到canal-admin上 canal.admin.register.auto = true canal.admin.register.cluster = # canal-server注册到canal-admin控制台的名称,这⾥⽤canal-server的ip地址,注意改ip canal.admin.register.name = 192.168.95.129
复制代码


九.启动 canal-server


$ cd /usr/local/canal-server# 切记后⾯加⼊local的参数$ bin/startup.sh local
复制代码


这时可以到 canal-admin 的界面中查看 canal-server 是否已经注册成功。

 

十.在 canal-admin 中配置 Canal Instance 来监听数据库


⾸先点击界面右侧的 Instance 管理,再点击新建 Instance 来创建实例。


################################################### mysql serverId , v1.0.26+ will autoGen# canal.instance.mysql.slaveId=0# enable gtid use true/falsecanal.instance.gtidon=false 
# position info# 需要修改# 需要监听数据库的ip加端⼝canal.instance.master.address=192.168.95.129:3306
# mysql主库连接时起始的binlog⽂件,这⾥可以不写,默认为mysql-bincanal.instance.master.journal.name=
# binlog⽇志的位置canal.instance.master.position=
# 开始同步binlog⽇志的时间戳,也就是从哪个时间点开始同步binlog⽇志,13位时间戳格式canal.instance.master.timestamp=
# 如果数据库开启了gtid模式,这⾥填写master节点的gtid我们这⾥不写也是可以的,如果要开启,记得将canal.instance.gtidon改为true canal.instance.master.gtid=
# rds oss binlogcanal.instance.rds.accesskey=canal.instance.rds.secretkey=canal.instance.rds.instanceId=
# table meta tsdb infocanal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb#canal.instance.tsdb.dbUsername=canal#canal.instance.tsdb.dbPassword=canal#canal.instance.standby.address =#canal.instance.standby.journal.name =#canal.instance.standby.position =#canal.instance.standby.timestamp =#canal.instance.standby.gtid=
# 需要修改# username/password这⾥填写要连接数据库的⽤户名和密码canal.instance.dbUsername=root
# 需要修改# canal.instance.dbPassword=canalcanal.instance.connectionCharset = UTF-8
# enable druid Decrypt database passwordcanal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# 需要修改# table regex,这⾥填写订阅的数据库的库与表的相关正则表达式canal.instance.filter.regex=careerplan_eshop_redis.redis_large_key_log
# table black regexcanal.instance.filter.black.regex=
# table field filter(format:schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format:schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# 需要修改 # MQ Config,这⾥填写实例名称即可# 如果canal.serverMode选择的不是tcp模式,这⾥填写相关的topic的名称,kafka和rocketmq默认的主题为example# 同时需要注意,如果期望使⽤的canal-server的⼯作模式是MQ的⽅式来运⾏,那么需要修改canal.properties的配置canal.mq.topic=binlog_monitor_large_key_topic
# dynamic topic route by schema or table regexcanal.mq.partition=0
# hash partition config#canal.mq.partitionsNum=3#canal.mq.partitionHash=test.table:id^name,.*\\..*#################################################
复制代码


接着点击保存,就会跳转到列表页,然后在新创建的 canal-instance 后⾯点击启动,接着点击操作⽇志去查看相关⽇志。⾄此,⼀个 canal-instance 就启动成功了。

 

十一.关闭 canal-server


$ cd /usr/local/canal-server$ bin/stop.sh
复制代码


(6)rdbtools 扫描 RDB⽂件


# 切换⽬录[root@localhost bin]# cd /usr/local/redis/bin
# 在这个⽬录下存放着Redis的rdb⽂件[root@localhost bin]# lsdump.rdb redis-benchmark redis-check-aof redis-check-rdb redis-cli redis-sentinel redis-server
# 使⽤rdbtools⼯具过滤dump.rdb⽂件中的⼤key,⽣成dump.csv⽂件[root@localhost bin]# rdb -c memory dump.rdb --bytes 10240 -f dump.csv# 可以看到⽣成的dump.csv⽂件
[root@localhost bin]# lsdump.csv dump.rdb redis-benchmark redis-check-aof redis-check-rdbredis-cli redis-sentinel redis-server
# 查看dump.csv⽂件的内容[root@localhost bin]# vim dump.csv
复制代码


rdb 参数说明:


rdb -c memory dump.rdb --bytes 10240 -f dump.csv
复制代码


dump.rdb 是指定 Redis 的 rdb⽂件的路径,--bytes 10240 表示过滤出 key 值⼤⼩超过 10240B 的 key,也就是 10K。

 

dump.csv⽂件内容如下:


database,type,key,size_in_bytes,encoding,num_elements,len_largest_element,expiry 0,string,key1-string,20536,string,17280,17280,
复制代码


(7)将 CSV⽂件导⼊MySQL


一.先查看 secure_file_priv 属性是否开启


secure_file_priv 属性指定导⼊⽂件的位置,只有在该属性指定的⽬录下的⽂件才可以导⼊MySQL。


mysql> show variables like '%secure%'; +--------------------------+------------------------------------------------+ | Variable_name            | Value                                          |+--------------------------+------------------------------------------------+ | require_secure_transport | OFF                                            || secure_auth              | ON                                             | | secure_file_priv         | ...                                            | +--------------------------+------------------------------------------------+
复制代码


二.修改 MySQL 配置⽂件⽂件


# 找到mysqld⽂件的位置[root@localhost bin]# find / -name "mysqld"/run/mysqld/usr/sbin/mysqld
# 找到mysql的默认配置⽂件位置[root@localhost bin]# /usr/sbin/mysqld --verbose --help |grep -A 1'Default options'Default options are read from the following files in the given order:/etc/my.cnf /etc/mysql/my.cnf /usr/etc/my.cnf ~/.my.cn
# 修改mysql的默认配置⽂件[root@localhost etc]# vim /etc/my.cnf
复制代码


在[mysqld]模块下,如果存在下列属性就修改,如果不存在就追加


[mysqld]# 关闭安全⽂件导⼊路径secure-file-priv=""# 开启binloglog-bin=mysql-binbinlog-format=ROWserver_id=1 
复制代码


三.重启 MySQL 服务


# 检查mysql服务运⾏状态$ service mysqld status# 重启服务$ service mysqld restart
复制代码


重启服务后重新连接 MySQL,查看 secure_file_priv 和 log_bin 的值:


mysql> show variables like '%secure%';+--------------------------+-------+ | Variable_name            | Value |+--------------------------+-------+| require_secure_transport | OFF   || secure_auth              | ON    || secure_file_priv         |       |+--------------------------+-------+
mysql> show variables like 'log_bin';+---------------+-------+| Variable_name | Value |+---------------+-------+| log_bin | ON |+---------------+-------+
复制代码


四.创建 redis_large_key_log 表


create table redis_large_key_log (    `id` bigint primary key auto_increment,    `database` tinyint comment 'Redis数据库索引',    `type` varchar(20) comment 'Redis数据类型',    `key` varchar(256) comment 'Redis key',    `size_in_bytes` int comment 'value对于的bytes',    `encoding` varchar(30) comment '编码',    `num_elements` int comment '元素数量',    `len_largest_element` int comment '元素⻓度',    `expiry` varchar(30) comment '过期时间',    `create_time` datetime DEFAULT CURRENT_TIMESTAMP,    `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE    CURRENT_TIMESTAMP) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
复制代码


五.编写导⼊csv⽂件的 SQL 脚本


$ cd /usr/local/redis/bin/# 创建SQL脚本⽂件 $ touch csv-transfer-db.sql# 编辑SQL脚本vim csv-transfer-db.sql
复制代码


csv-transfer-db.sql⽂件内容如下:


# 指定数据库USE `careerplan_eshop_redis`;load data infile '/usr/local/redis/bin/dump.csv' into table redis_large_key_log fields terminated by',' lines terminated by'\n' ignore 1 lines (`database`,`type`,`key`,size_in_bytes,encoding,num_elements,len_largest_element,expiry);
复制代码


六.编写定时任务脚本


脚本职能:


调⽤rdbtools⼯具,扫描⼤key,dump 出 csv⽂件。


调⽤SQL 脚本,将 csv⽂件导⼊数据库。


$ touch monitor-large-key-to-db.sh $ vim monitor-large-key-to-db.sh
复制代码


monitor-large-key-to-db.sh⽂件内容如下:


# crontab没有环境变量给你运⾏,所以要在shell开头⼿动添加环境source /etc/profile. ~/.bash_profile#!/bin/bashecho "开始执⾏monitor-large-key-to-db.sh脚本" >> /usr/local/redis/monitor-large-key-log.txtrdb -c memory /usr/local/redis/bin/dump.rdb --bytes 102400 -f /usr/local/redis/bin/dump.csvecho "扫描redis过滤出⼤key,⼤key数据保存到/usr/local/redis/bin/dump.csv⽂件" >> /usr/local/redis/monitor-large-key-log.txtmysql -u⽤户名 -p密码 -hIP地址 -P端⼝号 < /usr/local/redis/bin/csv-transfer-db.sqlecho "csv⽂件数据已导⼊mysql" >> /usr/local/redis/monitor-large-key-log.txt
复制代码


七.创建调度任务


$ crontab -e# 每天凌晨3点进⾏⼀次调度,将会扫描rdb⽂件,将⼤key存储到MySQL0 3 * * * sh /usr/local/redis/bin/monitor-large-key-to-db.sh
复制代码


(8)binlog 数据消费者


一.接⼝说明


消费 redis_large_key_log 表的 binlog 数据,该数据包含 Redis 的⼤key 信息。

 

二.代码位置


com.demo.eshop.monitor.mq.consumer.ConsumerBeanConfig#receiveLargeKeyMonitorConsumer
复制代码


具体实现如下:


@Configurationpublic class ConsumerBeanConfig {    //配置内容对象    @Autowired    private RocketMQProperties rocketMQProperties;
//Redis大key binlog 消费者 @Bean("cookbookLargeKeyMonitorTopic") public DefaultMQPushConsumer receiveLargeKeyMonitorConsumer(CookbookLargeKeyMonitorListener cookbookLargeKeyMonitorListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.BINLOG_MONITOR_LARGE_KEY_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(RocketMqConstant.BINLOG_MONITOR_LARGE_KEY_TOPIC, "*"); consumer.registerMessageListener(cookbookLargeKeyMonitorListener); consumer.start(); return consumer; } ...}
复制代码


三.参数说明


CookbookLargeKeyMonitorListener 表示针对 BINLOG_MONITOR_LARGE_KEY_GROUP 的 Listener,它会监听 Canal 推送的 BINLOG_MONITOR_LARGE_KEY_TOPIC 消息,然后对消息解析,通过邮件、钉钉等推送给开发⼈员。具体实现如下:


@Componentpublic class CookbookLargeKeyMonitorListener implements MessageListenerConcurrently {    @Override    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {        try {            for (MessageExt messageExt : list) {                String msg = new String(messageExt.getBody());                // 解析binlog数据模型                BinlogDataDTO binlogData = BinlogUtils.getBinlogData(msg);                log.info("消费到binlog消息, binlogData: {}", binlogData);                // 推送通知                informByPush(binlogData);            }        } catch (Exception e) {            // 本次消费失败,下次重新消费            return ConsumeConcurrentlyStatus.RECONSUME_LATER;        }        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;    }
//第三方平台推送消息到app private void informByPush(BinlogDataDTO binlogData) { log.info("消息推送中:消息内容:{}", binlogData); }}
复制代码


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18618040

体验地址:http://www.jnpfsoft.com/?from=infoq

用户头像

EquatorCoco

关注

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
Redis应用—7.大Value处理方案_数据库_EquatorCoco_InfoQ写作社区