写点什么

Kafka-on-Pulsar 的前世今生,新秀 Pulsar 到底好在哪?

用户头像
极客good
关注
发布于: 刚刚


Pulsar 协议本身是一个很轻量级的东西,即上图中的 Pulsar protocol handler。它主要是处理 TCP 过来的请求格式,然后将请求转化和读取的操作。所以 Pulsar 协议最核心部分在存储层面、分布式均衡层面等。


将 Pulsar protocol handler 抽象出来,变成一个框架/接口。利用这个框架,可以直接访问 Pulsar 已经构建好的存储系统,剩下要做的只是协议的解析和转换。


所以依据这个构想,将 Kafka 协议带入去实践。在 Pulsar 2.5 版本时新加了一个「Pluggable protocol handler」的概念(PIP-41),将接口单独抽离了出来。


Pulsar protocol handler 的使用是类似 Pulsar function/connector,只需将其插入到 Pulsar broker 中,就可以让 Pulsar 具有读取和解析其他协议的能力。这个机制只需要调整两个配置:



配置完成后,重启集群即可支持「其他类型协议」的处理能力。当然这个特性只在 Pulsar 2.5 版本后才支持,所以如需尝试,可以先将 Pulsar 系统升级到 2.5 版本。



所以在此机制下过程就会变得更加明了简单。只需在 Pulsar 里实现 Kafka protocol handler 即可,剩下的上图实线绿色部分是 Kafka 原生客户端。只需将数据接入到 Pulsar 集群,就可以处理 Kafka 请求。


为什么选取 Kafka 作为实践对象?


======================================================================================


应为 Pulsar 和 Kafka 在一些层面有很多相似之处。比如日志层,Pulsar 和 Kafka 都采用非常相似的数据模型,用于发布/订阅消息和事件流,Pulsar 和 Kafka 都采用分布式日志。


通过对比 Pulsar 和 Kafka,我们发现这两种系统有很多相似之处。这两种系统都包括以下操作:


  • Topic 查找所有客户端都连接到任一 broker 以查找 Topic 的元数据(即 owner broker)。获取元数据之后,客户端与 owner broker 建立持久的 TCP 连接。

  • 发布客户端与 Topic 区的 owner broker 进行对话,以将消息追加到分布式日志中。

  • 消费客户端与 Topic 分区的 owner broker 进行对话,以便从分布式日志中读取消息。

  • 偏移量为发布给 Topic 分区的消息分配偏移量。在 Pulsar 中,偏移量被称为 MessageId。consumer 可以使用偏移量来查找日志中的给定位置,以便读取消息。

  • 消费状态这两个系统都维护订阅中的 consumer( Kafka 称之为消费组)的消费状态。Kafka 将消费状态存储在 __offsets Topic,而 Pulsar 将消费状态存储在 cursors


实现方式


1. Topic


============================================================================


Kafka 将所有 Topic 存储在扁平的命名空间。但是,Pulsar 将 Topic 存储在层次化、多租户的命名空间。我们在 broker 配置中添加了 kafkaNamespace 配置,这样管理员就可以将 Kafka Topic 映射到 Pulsar Topic。


为了方便 Kafka 用户使用 Apache Pulsar 的多租户特性,当 Kafka 用户使用 SASL 验证机制来验证 Kafka 客户端的时候,可以指定一个 Pulsar 租户和命名空间作为其 SASL 用户名。


2. 消息 ID 和偏移量


=================================================================================


Kafka 为每条被成功发布到 Topic 分区的消息都指定了一个偏移量。Pulsar 为每条消息指定了一个 MessageID。消息 ID 由 ledger-identry-idbatch-index 组成。我们在 Pulsar-Kafka wrapper 中使用相同的方法将 Pulsar 的消息 ID 转换为偏移量,反之亦然。


3. 消息


=========================================================================


Kafka 和 Pulsar 的消息都包含键、值、时间戳和 header(在 Pulsar 中被称作 ‘properties’)。我们自动在 Kafka 消息和 Pulsar 消息之间转换这些字段。


4. Topic 查找


===============================================================================


我们为 Kafka 和 Pulsar 的请求处理插件提供相同的 Topic 查找方法。请求处理插件发现 Topic,查找所请求的 Topic 分区的全部所有权,然后将包含所有权信息的 Kafka TopicMetadata 返回给 Kafka 客户端。


5. 发布消息


===========================================================================


当收到 Kafka 客户端发布的消息后,Kafka 请求处理插件逐一将多个字段(例如键、值、时间戳和 headers)进行映射,从而将 Kafka 消息转换为 Pulsar 消息。


同时,Kafka 请求处理插件利用 ManagedLedger append API 将这些已转化的 Pulsar 消息存储在 BookKeeper。Kafka 请求处理插件将 Kafka 消息转换为 Pulsar 消息后,现有的 Pulsar 应用程序就可以接收 Kafka 客户端发布的消息。


6. 消费消息


===========================================================================


当收到 Kafka 客户端的 consumer 请求时,Kafka 请求处理插件打开一个非持久 cursor,然后从请求的偏移量开始读取 entries。


Kafka 请求处理插件将 Pulsar 消息转换回 Kafka 消息后,现有的 Kafka 应用程序就可以接收 Pulsar 客户端发布的消息。


7. Group coordinator & 偏移量管理


================================================================================================


最大的挑战是实现 group coordinator 和偏移量管理。Pulsar 不支持集中的 group coordinator,无法为消费组里的 consumer 分配分区,也无法管理每个消费组的偏移量。


Pulsar broker 基于分区来管理分区分配,而分区的 owner broker 通过将确认信息存储在 cursors 来管理偏移量。


我们很难让 Pulsar 模型与 Kafka 模型保持一致。因此,为了完全兼容 Kafka 客户端,我们将 coordinator group 的更改和偏移量存储在 Pulsar 名为 public/kafka/__offsets 系统 Topic 中,从而实现 Kafka coordinator group。


这样,我们能够在 Pulsar 和 Kafka 之间建立桥梁,并允许用户使用现有的 Pulsar 工具和策略来管理订阅并监控 Kafka consumer。我们在已实现的 coordinator group 中添加一个后台线程,定期将偏移量更新从系统 Topic 同步到 Pulsar cursor。


因此,实际上 Kafka 消费组被认为是 Pulsar 订阅。所有现有的 Pulsar 工具也可以用于管理 Kafka 消费组。


KoP 生产化


==========================================================================


如果将 KoP 应用到实际场景中,就需要考虑以下多个方面:


  • 多租户

  • 安全性

  • 跨机房复制

  • 分层存储

  • Schema

  • 与已有的数据环境(如 Flink、Spark、Presto)集成


Q & A


========================================================================


1. Pulsar 有多种扩展,这些扩展有统一的管理方式吗?


目前在做一个项目:Pulsar Registry,类似于 DocHub。也可以看作一个应用商店,会集中一些组件/插件合集,可以期待一下。


2. Kafka 0.11 以下的版本是否能平滑升级到高版本?如果消息格式变了,是不是没法平滑升级?


不能,0.10/0.11 版本以上才可以平滑升级。


总 结


======================================================================


KoP 最终的目的,是方便用户将 Kafka 上已有的应用迁移到 Pulsar 上,同时通过 KoP 的方式让用户可以更方便地构建产品。未来 KoP 也会加大对 schema 和 Kafka 版本的支持与多兼容性。


使用 KoP


KoP 使用 Apache License V2 许可证,项目地址如下:

用户头像

极客good

关注

还未添加个人签名 2021.03.18 加入

还未添加个人简介

评论

发布
暂无评论
Kafka-on-Pulsar 的前世今生,新秀 Pulsar 到底好在哪?