写点什么

直播回顾| Apache Pulsar 2.10.0 新特性概览

作者:Apache Pulsar
  • 2022 年 4 月 07 日
  • 本文字数:5427 字

    阅读完需:约 18 分钟

关于 Apache Pulsar

Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性。GitHub 地址:http://github.com/apache/pulsar/


导语:本文是 Apache Pulsar PMC 成员,StreamNative 首席架构师李鹏辉在 TGIP-CN 037 直播活动的文字整理版本。Pulsar 2.10.0 版本即将发布,本场直播为大家带来 Apache Pulsar 2.10.0 的主要新特性及版本解读,解答大家对新版本对于技术细节的疑问。


点击查看回顾视频


Pulsar 2.10.0 包含来自于 99 位贡献者的 1000+ commits,其中诸多贡献来自于国内的贡献者,感谢大家对 Pulsar 的支持与贡献。本次版本发布是一次新的里程碑,如此多的 commit 数量也为文档带来了升级;Apache Pulsar 网站升级中,新网站 Beta 版本对文档进行了重新归档与完善,欢迎大家试用并提出宝贵意见


Apache Pulsar 2.10.0 版本新特性内容包括:


  • 去除对 ZooKeeper 的强依赖;

  • 新的消费类型 TableView;

  • 多集群自动故障转移;

  • Producer Lazy Loading + Partial RoundRobin;

  • Redeliver Backoff;

  • Init Subscription for DLQ;

  • 引入多集群全局 Topic Policy 设置支持以及 Topic 级别的跨地域复制配置;

  • ChunkMessageId;

  • 增加批量操作 Metadata 服务的支持:可以在大量 Topic 的场景下提升 Pulsar 稳定性;

  • ...


去除对 ZooKeeper API 强依赖

ZooKeeper 是 Pulsar 中使用非常广泛的一个 API,旧版对该 API 的依赖无处不在,但这种依赖不利于用户选择其他类型的元数据服务。为了解决这一问题,Pulsar 经过多个版本的迭代,做了大量准备和测试工作后终于在 2.10.0 版本去除了对 ZooKeeper 的强依赖。


目前新版支持三种元数据服务:


  • ZooKeeper

  • Etcd

  • RocksDB(standalone)


其中需要注意的是 Etcd 目前没有很好的 Java 客户端,综合考量使用要慎重。此外从 benchmark 测试成绩来看 ZooKeeper 与 Etcd 的性能是相近的,用户可以根据自身情况来选择。


该特性的提案是 PIP 45 : Pluggable metadata interface(Metadata Store + Coordination Service)。顾名思义,这里的 Metadata Store 是一个元数据存储,而 Coordination Service 则提供了一个中心化服务来获得全局锁。


2.10 版本还增加了 Metadata 批量操作支持,降低客户端与服务端交互需求,大幅减轻了 metadata 的操作压力。


# The metadata store URL# Examples:# \* zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181# \* my-zk-1:2181,my-zk-2:2181,my-zk-3:2181 (will default to ZooKeeper when the schema is not specified)# \* zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181/my-chroot-path (to add a ZK chroot path) metadataStoreUrl=
# The metadata store URL for the configuration data. If empty, we fall back to use metadataStoreUrl configurationMetadataStoreUrl=
复制代码


以上就是新特性的 API 实现,可以看到上述配置参数已经不再需要 ZooKeeper。但去除对 ZooKeeper 的依赖并不代表会将其移除。考虑到 ZooKeeper 还有大量用户,使用较为广泛,所以短期内官方并不会考虑删除实现,而只是将其插件化来方便使用。


更多减轻 ZooKeeper 依赖细节,可以阅读博客 Apache Pulsar 轻装上阵:迈向轻 ZooKeeper 时代


新的消费类型 TableView

Pulsar 的消费模式比较多样化,如今 2.10 版本再引入了 TableView,这是一个类似于 KV 的表格服务。它是一个不支持写入的纯视图,可以直接在客户端内存构建表格视图,适合数据量不大的场景生成视图。但 TableView 不太合适数据量较大、单台机器的内存都难以承受的场景。


TableView 可用于配合 Topic Compaction,后者可以在服务端做 Key 的压缩。这一特性的原理是只在一个 snapshot 中保存 key 的最新状态,consumer 需要时只需查看这个 snapshot,而无需去消耗更多成本读取原始 backlog。TableView 可以与该特性无缝衔接,恢复 TableView 时就可以直接利用 broker 生成的 snapshot,从而减小恢复开销。原视频 22 分处具体介绍了这种压缩的机制和使用场景



try (TableView<byte[]> tv = client. newTableViewBuilder (Schema.BYTES)        .topic ("public/default/tableview-test")        .autoUpdatePartitionsInterval(60, TimeUnit.SECONDS)        .create()) {    System.out.println("start tv size: " + tv.size());    tv. forEachAndListen((k, v) -> System.out.println(k + "->"+ Arrays. toString(v)));
while (true) f Thread. sleep (20000) ; System.out.println(tv.size)): tv. forEach((k, v) -> System, out.println("checkpoint: "+ k+ "->" + Arrays.toString(v))); }
} catch (Exception ex) { System.out.println("Table view failed: " + ex. getCause());}
复制代码


Cluster 自动故障转移



ServiceUrlProvider provider = AutoClusterFailover.builder()        .primary(primary)        .secondary(Collections.singletonList(secondary))        .failoverDelay(failoverDelay, TimeUnit.SECONDS)        .switchBackDelay(switchBackDelay, TimeUnit.SECONDS)        .checkInterval(checkInterval, TimeUnit.MILLISECONDS)        .build();
复制代码


Pulsar 支持多集群,集群间可同步数据,因此用户经常会有集群间的故障转移需求,所以引入自动故障转移特性。过去做故障转移时需要通过域名切换,或者使用自制的辅助节点,但这些方法往往都需要人工介入,SLA 难以保障。新特性的优势在于自动化与可配置,可设置 primary 与 secondary 集群,配置延迟等参数,通过探活实现集群自动按预期切换。但目前该特性在探活时只探测 Pulsar 端口是否接通,未来版本将继续改进探活方式。


Producer Lazy loading + Partial RoundRobin



当前在一个规模较大的集群中,如果 partition 比较多,则发送消息时 producer 需要轮询所有 partition,且 partition 可能分布在不同 broker 上可能产生巨大的连接压力,如上图上半部分。为此,该新特性实现了 producer 懒加载,这样一来如果不用某个 partition 就不会创建它,减轻了系统负担。而部分轮询会先 List 所有 partition 再将它们 shuffle,实现不同客户端写入不同的 partition,减少 producer 实例与 broker 的连接数量,同样可以降低系统压力。需要注意的是 Shared consumer 暂时不支持这种机制,未来需要社区共同探索这一方面能否实现类似的机制。


PartitionedProducerImpl<byte[]> producerImpl = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()        .topic(topic)        .enableLazyStartPartitionedProducers(true)        .enableBatching(false)        .messageRoutingMode(MessageRoutingMode.CustomPartition)        .messageRouter(new PartialRoundRobinMessageRouterImpl(3))        .create();  
复制代码

Redeliver Backoff

client.newConsumer().negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()        .minDelayMs(1000)        .maxDelayMs(60 \* 1000)        .build()).subscribe();
client.newConsumer().ackTimeout(10, TimeUnit.SECOND)        .ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()        .minDelayMs(1000)        .maxDelayMs(60 \* 1000)        .build()).subscribe();
复制代码


Pulsar 现有 ackTimeout 机制,如果使用 shared 订阅,消费数据时可能在一段时间内无法签收,则 ackTimeout 可以保证超过一定时间仍未签收时客户端自动重新投递消息,将消息重新分发到其他 consumer 上。


消息重新投递的时间很难确定,长短不一,且随着消息处理失败次数越来越多,延迟需要越来越长。为此引入了该 API,可以逐渐延长延迟。相比现有方法,这个特性的优势在于开销更小,不需要经过另一个 topic,且更加灵活。不足之处是一旦客户端宕机会导致消息立即重试。另外该特性使用成本很低,API 简洁,很容易掌握。需要注意的是目前只有 Java 客户端支持该特性,另外它可以配合死信队列使用。


初始化死信队列订阅

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)    .topic(topic)    .subscriptionName(subscriptionName)    .subscriptionType(SubscriptionType.Shared)    .ackTimeout(1, TimeUnit.SECONDS)    .deadLetterPolicy(DeadLetterPolicy.builder()        .maxRedeliverCount(maxRedeliveryCount)        .initialSubscriptionName(my-sub)        .build())    .receiverQueueSize(100)    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest )    .subscribe();
复制代码


死信队列的创建是懒创建策略,这就出现了一个问题:死信消息尚未发生、topic 尚未创建时,就无法给 topic 指定数据保留策略。为此只能给 namespace 创建策略,粒度会很大。新版引入 InitialSubscriptionName,在设置死信队列时可以在 create 时同时创建一个订阅,这样数据就可以保留下来。且对于死信队列,大部分场景仅需一个订阅处理即可,于是 订阅就会和 InitialSubscriptionName 对应,这样无需设置 retention,就可以保留发到死信队列的消息。


跨集群 topic 策略

bin/pulsar-admin topics set-retention -s 1G -t 1d --global my-topic
Message message = MessageBuilder.create()    ...    .setReplicationClusters(restrictDatacenters)    .build();producer.send(message);
复制代码


该特性可以跨集群应用 topic policy,通过 -global 参数对所有集群生效。表面上来看一个全局参数很容易实现,其实背后做了很多工作。主要是底层需要将 schema 同步到所有集群,才能做到跨集群应用。需要注意的是 broker 没有重试策略,以下两种方式任选其一:


  • 主动告知 broker 重试;

  • 断开客户端。


新的消息 ID 类型 ChunkMessageId

public class ChunkMessageIdImpl extends MessageIdImpl implements Messaged {    private final MessageIdImpl firstChunkMsgId;
public ChunkMessageIdImpl(MessageIdImplfirstChunkMsgId,MessageIdImpllastChunkMsgId){ super(lastChunkMsgId.getLedgerId(), lastChunkMsgId.getEntryId(), lastChunkMsgId.getPartitionIndex()); this. firstChunkMsgId = firstChunkMsgId; } public MessageIdImplgetFirstChunkMessageId(){ return firstChunkMsgId; } public MessageIdImplgetLastChunkMessageId(){ return this; }
}
复制代码


之前版本引入的 ChunkMessage 可以有效减轻系统压力,但存在的问题是 Chunk 中只有最后的 MessageId 返回给客户端,这样客户端就无法知晓前面的 Id。这个特性解决了 ChunkMessage 开始到结束对应的 Id 缺失问题,方便用户 seek MessageId 并消费。目前该特性只有 Java 客户端支持。


其他特性

  • Topic properties:给 Topic 附加 name 之外的更多信息,与 metadata 一并存储;

  • Metadata batch operation:提升性能;

  • CPP client:提供 chunk message 支持;

  • Broker graceful shutdown:支持 REST API 优雅地关闭 Broker,先将 Broker 从集群拿掉后再关闭 Topic,避免继续发起对 Broker 的连接;

  • Support creat a consumer in the paused state:在 create consumer 时可以指定暂停状态,不向服务端获取消息;

  • ...


精选 Q&A

新特性介绍完毕后,李老师还对观众弹幕问题一一作了解答,以下为 QA 精选内容概要,详情见视频 54 分钟后。


Q:Pulsar 是否支持 failover?

  • Pulsar 支持 failover 模式,且一个 partition 可以有多个 consumer,开销主要存在于消息签收上。Pulsar 支持维护单条消息签收状态,因此会有一定开销;


Q:Pulsar 对 ack 的操作是 exactly once 吗?

  • Pulsar 对不开启 transaction 的情况默认是 at least once 实现,而不是 exactly once;


Q:ChunkMessage 支持事务吗?

  • 目前暂不支持事务;


Q:Pulsar 的消息发送是否会有中间失败后面成功的情况?

  • Pulsar 中所有消息的发送不会出现中间失败后面成功的情况,其中一个失败后面都会失败;


Q:元数据导致的集群规模限制问题如何解决?

  • 2.10 版本暂未解决元数据导致的集群规模限制问题,未来考虑解决;


Q:KoP 支持 Kafka format 吗?

  • 现在 KoP 可以支持 Pulsar format 与 Kafka format,避免服务端的序列化/反序列化,把工作交给客户端。客户端加载一个 formatter 就可以解析 Kafka format 数据,减轻对 broker 的压力。


关于 PPT

请复制链接到浏览器下载 PPT:https://pan.baidu.com/s/1sqt99KVF7n0jBS_aue2SXw

密码: 6wtk


相关阅读


关注公众号「Apache Pulsar」,获取更多技术干货


加入 Apache Pulsar 中文交流群👇🏻


点击立即观看 TGIP-CN 37:Apache Pulsar 2.10.0 新特征解析 回顾视频!

用户头像

Apache Pulsar

关注

下一代云原生分布式消息流平台 2017.10.17 加入

Apache 软件基金会顶级项目,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展流数据存储特性。

评论

发布
暂无评论
直播回顾| Apache Pulsar 2.10.0 新特性概览_开源_Apache Pulsar_InfoQ写作平台