前言
2020 年 4 月 30 日,Redis 6.0.0 正式发布,标志着 redis 从此告别单线程。在此之前,在大数据生产环境中使用的是一个 30 个节点的 Codis 集群,SparkStreaming 以此作为缓存,QPS 高峰大概在 2000w/s。
因为 Codis 不再更新迭代,于是在 Redis 6.0.6 版本发布的时候搭建了 Redis Cluster,新的应用将不再使用 Codis。之前连接 Codis 使用的 Java 客户端是 Jedis,通过 Pipeline 方式批次执行命令,以此来提高效率。而 Redis Cluster 的客户端 JedisCluster 没有提供 Pipeline 方式,只能单条执行命令,于是开始考虑其他的 Java 客户端。
这里备选了两个客户端:lettuce 和 Redisson
pipeline 原理
这里先说一下 Jedis 的 pipeline 的原理。通过 pipeline 对 redis 的所有操作命令,都会先放到一个 List 中,当 pipeline 直接执行或者通过 jedis.close()调用 sync()的时候,所有的命令都会一次性地发送到客户端,并且每个操作命令返回一个 response,通过 get 来获取操作结果。
lettuce
lettuce 提供了 async 异步方式来实现 pipeline 的功能,来测试一下是否可按批次处理命令。
测试代码:
public static void main(String[] args) throws Exception {
RedisURI uri = RedisURI.builder()
.withHost("47.102.xxx.xxx")
.withPassword("Redis6.0.6".toCharArray())
.withPort(10001)
.build();
RedisClusterClient client = RedisClusterClient.create(uri);
StatefulRedisClusterConnection<String, String> connect = client.connect();
RedisAdvancedClusterAsyncCommands<String, String> async = connect.async();
// 断点1
async.set("key1", "v1");
Thread.sleep(1000 * 3);
// 断点2
async.set("key2", "v2");
// 断点3
async.flushCommands();
Thread.sleep(1000 * 3);
connect.close();
client.shutdown();
}
复制代码
在程序中设置三个断点。如果是 pipeline 的话,只有执行完断点 3,两条 set 命令才会执行。运行结果:
结果表明还未到 flushCommands(),第一个 set 命令已经执行。到这你可能就会以为 lettuce 其实还是逐条命令执行,只是开启了异步请求模式。其实不然,在 lettuce 异步操作中,默认开启了命令自动刷新功能,所以给你的假象还是逐条执行,在此需要禁用自动刷新来开启 pipeline 功能。
在 set()之前加上一行代码:
async.setAutoFlushCommands(false);
复制代码
运行结果:
Redisson
redisson 提供了 batch 来实现 pipeline 的功能。
测试代码:
Config config = new Config();
config.useClusterServers()
.addNodeAddress("redis://47.102.219.86:10001")
.setPassword("Redis@6.0.6");
RedissonClient redisson = Redisson.create(config);
RBatch batch = redisson.createBatch();
String key = "test";
for (int i = 1; i < 3; i++) {
batch.getMap(key + i).putAsync(String.valueOf(i), String.valueOf(i));
}
// 打上断点
batch.execute();
redisson.shutdown();
复制代码
这里我们在 execute()处打上断点,debug 运行程序。运行结果:
结果表明 Redisson 会将命令放在一个 batch 中,当执行 execute()时,会将命令一次性发送到 redis 执行。虽然 Redisson 实现了 pipeline 的功能,但是我最后还是放弃了它。原因很简单,它的方法不像 jedis 和 lettuce 一样简单明了,和 redis 的操作命令相差太多,导致使用起来比较繁琐。
Jedis Cluster Pipeline
原因
开头也提到了,Jedis 对 Redis Cluster 提供了 JedisCluster 客户端,但是没有 Pipeline 模式,那么 JedisCluster 为什么不支持 Pipeline?
在 redis 中一共有 16384 个 Slot,每个节点负责一部分 Slot,当对 Key 进行操作时,redis 会通过 CRC16 计算出 key 对应的 Slot,将 Key 映射到 Slot 所在节点上执行操作。
因为不同 Key 映射的节点不同,所以 JedisCluster 需要持有 Redis Cluster 每个节点的连接才能执行操作,而 Pipeline 是面向于一个 redis 连接的执行模式,所以 JedisCluster 无法支持 Pipeline。
那么我们自己有没有办法利用 JedisCluster 去封装一个具有 Pipeline 模式的客户端?
思路
刚刚提到,JedisCluster 会持有 Redis Cluster 所有节点的连接。那么,如果我们可以获取到所有节点的连接,对每个节点的连接都开启 Pipeline。首先计算出每个 Key 所在的 Slot,再找到 Slot 对应节点,就可以将 Key 放到对应节点连接的 Pipeline 上,这样不就实现了集群版的 Pipeline 了么!
我们要做的工作就是找到对应关系,将每个 Key 分配到对应的节点连接中。
秉着不重复造轮子的观点,我们先看看 JedisCluster 是如何执行命令的?
JedisCluster
先写样例,并在 get()处打断点。
CRC16
进入 run(),可以看到 JedisClusterCRC16 提供了 getSlot()方法,可以计算出 Key 所在的 Slot。
run()里面调用了 runWithRetries(),这是核心方法之一,Step into
// 据方法调用参数删除了部分代码
private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, JedisRedirectionException redirect) {
Jedis connection = null;
try {
// false
if (tryRandomNode) {
connection = connectionHandler.getConnection();
} else {
// 重点:从方法名看,是根据slot来获取jedis连接!!
connection = connectionHandler.getConnectionFromSlot(slot);
}
return execute(connection);
} catch (JedisNoReachableClusterNodeException jnrcne) {
throw jnrcne;
} catch (JedisConnectionException jce) {
// 释放连接
releaseConnection(connection);
connection = null;
if (attempts <= 1) {
// 刷新slots
this.connectionHandler.renewSlotCache();
}
return runWithRetries(slot, attempts - 1, tryRandomNode, redirect);
}
}
复制代码
从 runWithRetries()可以看到,JedisCluster 通过调用 getConnectionFromSlot(slot)来获取 jedis 连接,这里实现了 Slot 和 Jedis 的关系。
那么 connectionHandler 为什么可以提供 redis 连接?
connectionHandler
查看 connectionHandler 变量信息
可以看到它有一个 JedisClusterInfoCache 类型的成员变量 cache,cache 有两个 HashMap 类型的成员变量 nodes 和 slots,nodes 保存节点和 JedisPool 的映射关系,slots 保存 16384 个 slot 和 JedisPool 的映射关系,这里 slot 和节点实现了映射关系。
接着看一下 getConnectionFromSlot():
可以看出,cache 调用 getSlotPool(),从成员变量 slots 中通过 slot 取到了相应节点的 JedisPool。
简单的画一下流程图:
至此,所有轮子都已经具备,开始造车。
实现 Pipeline
我们只要获取到 connectionHandler 变量,就可以使用它的成员变量 cache 来获取 Jedis。
connectionHandler 是 JedisCluster 的成员变量,在其父类 BinaryJedisCluster 中找到了此变量。
cache 是 connectionHandler 的成员变量,在其父类 JedisClusterConnectionHandler 找到了此变量。
connectionHandler 和 cache 都是 protected 变量,外部类无法直接访问,所以需要定义子类访问变量。
自定义 ConnectionHandler
目的:使用 cache 保存的 Cluster 信息,用其来获取 JedisPool。
public class JedisSlotConnectionHandlerImp extends JedisSlotBasedConnectionHandler implements Serializable {
public JedisSlotConnectionHandlerImp(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password) {
super(nodes, poolConfig, connectionTimeout, soTimeout, password);
}
// 自定义通过slot获取JedisPool的方法
// 为了保证后面一个JedisPool只取一个Jedis
public JedisPool getJedisPoolFromSlot(int slot) {
JedisPool jedisPool = cache.getSlotPool(slot);
if (jedisPool != null) {
return jedisPool;
} else {
renewSlotCache();
jedisPool = cache.getSlotPool(slot);
if (jedisPool != null) {
return jedisPool;
} else {
throw new JedisNoReachableClusterNodeException("No reachable node in cluster for slot " + slot);
}
}
}
}
复制代码
自定义 ClusterPipeline
目的:使用 connectionHandler 来建立 key、slot 以及 JedisPool 之间关系映射
public class JedisClusterPipeline extends JedisCluster implements Serializable {
// 覆盖父类中的connectionHandler
protected JedisSlotConnectionHandlerImp connectionHandler;
public JedisClusterPipeline(HashSet node, int connectionTimeout, int soTimeout, int maxAttempts, String password, GenericObjectPoolConfig poolConfig) {
super(node, connectionTimeout, soTimeout, maxAttempts, password, poolConfig);
connectionHandler = new JedisSlotConnectionHandlerImp(node, poolConfig, connectionTimeout, soTimeout, password);
}
// 通过key转换成slot,再获取JedisPool
public JedisPool getJedisPoolFromSlot(String key) {
return connectionHandler.getJedisPoolFromSlot(JedisClusterCRC16.getSlot(key));
}
}
复制代码
使用
使用自定义的 JedisClusterPipeline,需要自己实现 set、get、hget 等方法来覆盖父类 JedisCluster 对应的方法。最初的目的是应用于 Spark 将维度信息存入 Redis Cluster,当时是用 scala 面向 RDD 的 partition 实现了集群版的 hmset()方法。
这里临时用 Java 实现一下 Pipeline 的 set()方法。
实现 set()
public class JedisClusterPipelineCommand {
/**
* 自定义的pipeline模式set方法
* @param key 存放的key
* @param value 存放的value
* @param clusterPipeline 用来获取JedisPool
* @param pipelines 建立JedisPool和pipeline映射,保证一个JedisPool只开启一个pipeline
* @param jedisMap 建立pipeline和Jedis映射,用来释放Jedis
* @param nums 记录每个pipeline放入key的条数
* @param threshold pipeline进行sync的阈值
*/
public static void setByPipeline(String key, String value, JedisClusterPipeline clusterPipeline, ConcurrentHashMap<JedisPool, Pipeline> pipelines, ConcurrentHashMap<Pipeline, Jedis> jedisMap, ConcurrentHashMap<Pipeline, Integer> nums, int threshold) {
JedisPool jedisPool = clusterPipeline.getJedisPoolFromSlot(key);
// 查看对应节点是否已经开启了pipeline
Pipeline pipeline = pipelines.get(jedisPool);
if (pipeline == null) {
Jedis jedis = jedisPool.getResource();
pipeline = jedis.pipelined();
// 构建映射关系,保证每个节点只有一个jedis来开启pipeline
jedisMap.put(pipeline, jedis);
pipelines.put(jedisPool, pipeline);
nums.put(pipeline, 0);
}else {
int num = nums.get(pipeline);
nums.put(pipeline, num + 1);
if (num % threshold == 0) {
pipeline.sync();
}
}
pipeline.set(key, value);
}
/**
* 释放jedis并强制pipeline sync
*/
public static void releaseConnection(ConcurrentHashMap<Pipeline, Jedis> jedisMap) {
for (Jedis jedis : jedisMap.values()) {
jedis.close();
}
}
}
复制代码
执行类
public static void main(String[] args) throws Exception {
JedisPoolConfig config = new JedisPoolConfig();
HashSet jedisClusterNodes = new java.util.HashSet<HostAndPort>();
jedisClusterNodes.add(new HostAndPort("47.102.xxx.xx", 10001));
JedisClusterPipeline jedisClusterPipeline = new JedisClusterPipeline(jedisClusterNodes, 1000, 1000, 10, "Redis6", config);
ConcurrentHashMap<JedisPool, Pipeline> pipelines = new ConcurrentHashMap<>();
ConcurrentHashMap<Pipeline, Jedis> jedisMap = new ConcurrentHashMap<>();
ConcurrentHashMap<Pipeline, Integer> nums = new ConcurrentHashMap<>();
for (int i = 0; i < 1000; i++) {
JedisClusterPipelineCommand.setByPipeline("k" + i, "v" + i, jedisClusterPipeline, pipelines, jedisMap, nums, 100 );
}
JedisClusterPipelineCommand.releaseConnection(jedisMap);
}
复制代码
执行结果
性能测试
本机环境 1000 条数据
pipeline 模式:2.32s
JedisCluster:68.6s
Spark on Yarn 128w 条 Hash
本机环境测试结果受限于网络和主机配置,仅供比较参考。
结语
最后选择自己实现 pipeline,首先是因为比较了解 pipeline 的原理,说白了就是用习惯了。其次是在本机测试 letttuce 时,出现了一些意料之外的问题,目前还在探索中。下一步的工作就是慢慢的将 Pipeline 其他的方法实现,逐步优化,用于生产。
95 后小程序员,写的都是日常工作中的亲身实践,置身于初学者的角度从 0 写到 1,保证能够真正让大家看懂。文章会在公众号 [入门到放弃之路] 首发,期待你的关注。
评论