写点什么

深入浅出 Apache Pulsar(2):Pulsar 消息机制

  • 2022 年 1 月 19 日
  • 本文字数:4787 字

    阅读完需:约 16 分钟

深入浅出Apache Pulsar(2):Pulsar消息机制

消息机制

Pulsar 采用发布-订阅(pub-sub)的设计模式 。 该设计模式中,producer 发布消息到 topic, Consumer 订阅 topic、处理发布的消息,并在处理完成后发送确认。


一旦创建订阅,即使 consumer 断开连接,Pulsar 仍然可以保存所有消息。 在 consumer 确认消息已处理成功后,才会删除消息。

主题(Topic)

逻辑上一个 Topic 是日志结构,每个消息都在这个日志结构中有一个偏移量。Apache Pulsar 使用游标来跟踪偏移量(Cursor Tracking)。


Pulsar 支持两种基本的 topic 类型:持久 topic 与非持久 topic。


{persistent|non-persistent}://tenant/namespace/topic
复制代码


  • Non-Partitioned topics


$ $PULSAR_HOME/bin/pulsar-admin topics \list public/default$ $PULSAR_HOME/bin/pulsar-admin topics \create persistent://public/default/input-seed-avro-topic$ $PULSAR_HOME/bin/pulsar-admin topics \lookup persistent://public/default/input-seed-avro-topic$ $PULSAR_HOME/bin/pulsar-admin topics \delete persistent://public/default/input-seed-avro-topic$ $PULSAR_HOME/bin/pulsar-admin topics \stats persistent://public/default/input-seed-avro-topic$ curl http://server-101:8080/admin/v2/persistent/public/default/exclamation-input/stats | python -m json.tool
复制代码

Partitioned topics

$ $PULSAR_HOME/bin/pulsar-admin topics \create-partitioned-topic persistent://public/default/output-seed-avro-topic \--partitions 2$ $PULSAR_HOME/bin/pulsar-admin topics \list-partitioned-topics public/default$ $PULSAR_HOME/bin/pulsar-admin topics \get-partitioned-topic-metadata persistent://public/default/output-seed-avro-topic$ $PULSAR_HOME/bin/pulsar-admin topics \delete-partitioned-topic persistent://public/default/output-seed-avro-topic
复制代码

消息(Message)

Messages are the basic "unit" of Pulsar.


public interface Message<T> {    Map<String, String> getProperties();    boolean hasProperty(String var1);    String getProperty(String var1);    byte[] getData();    T getValue();    MessageId getMessageId();    long getPublishTime();    long getEventTime();    long getSequenceId();    String getProducerName();    boolean hasKey();    String getKey();    boolean hasBase64EncodedKey();    byte[] getKeyBytes();    boolean hasOrderingKey();    byte[] getOrderingKey();    String getTopicName();    Optional<EncryptionContext> getEncryptionCtx();    int getRedeliveryCount();    byte[] getSchemaVersion();    boolean isReplicated();    String getReplicatedFrom();}
复制代码

生产者(Producer)

public void send() throws PulsarClientException {    final String serviceUrl = "pulsar://server-100:6650";    // final String serviceUrl = "pulsar://server-101:6650,server-102:6650,server-103:6650";    // http://pulsar.apache.org/docs/en/client-libraries-java/#client    final PulsarClient client = PulsarClient.builder()            .serviceUrl(serviceUrl)            .connectionTimeout(10000, TimeUnit.MILLISECONDS)            .build();    final String topic = "persistent://public/default/topic-sensor-temp";    // http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer    final Producer<byte[]> producer = client.newProducer()            .producerName("sensor-temp")            .topic(topic)            .compressionType(CompressionType.LZ4)            .enableChunking(true)            .enableBatching(true)            .batchingMaxBytes(1024)            .batchingMaxMessages(10)            .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)            .blockIfQueueFull(true)            .maxPendingMessages(512)            .sendTimeout(1, TimeUnit.SECONDS)            .create();    MessageId mid = producer.send("sensor-temp".getBytes());    System.out.printf("\nmessage with ID %s successfully sent", mid);    mid = producer.newMessage()            .key("sensor-temp-key")            .value("sensor-temp-key".getBytes())            .property("my-key", "my-value")            .property("my-other-key", "my-other-value")            .send();    System.out.printf("message-key with ID %s successfully sent", mid);    producer.close();    client.close();}
复制代码

消费者(Consumer)

public void consume() throws PulsarClientException {    final String serviceUrl = "pulsar://server-101:6650";    final String topic = "input-seed-avro-topic";    final PulsarClient client = PulsarClient.builder()            .serviceUrl(serviceUrl)            .enableTcpNoDelay(true)            .build();    final Consumer<byte[]> consumer = client            .newConsumer()            .consumerName("seed-avro-consumer")            .subscriptionName("seed-avro-subscription")            .subscriptionType(SubscriptionType.Exclusive)            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)            .topic(topic)            .receiverQueueSize(10)            .subscribe();    final AvroSchema<SeedEvent> schema = AvroSchema.of(SeedEvent.class);    while (true) {        try {            final Message<byte[]> msg = consumer.receive();            LOG.info("接收消息:[{}] topic:{} mid:{} sid:{} event:{} publish:{} producer:{} key:{} value:{}",                    Thread.currentThread().getId(),                    msg.getTopicName(),                    msg.getMessageId(),                    msg.getSequenceId(),                    msg.getEventTime(),                    msg.getPublishTime(),                    msg.getProducerName(),                    msg.getKey(), schema.decode(msg.getValue()));            try {                consumer.acknowledge(msg);            } catch (final PulsarClientException e) {                consumer.negativeAcknowledge(msg);                LOG.error("acknowledge:" + e.getLocalizedMessage(), e);            }        } catch (final PulsarClientException e) {            LOG.error("receive:" + e.getLocalizedMessage(), e);        }    }}
复制代码

订阅(Subscriptions)

消费者通过订阅来消费 Topic 中的消息。订阅是游标(跟踪偏移量)的逻辑实体,一个 Topic 可以添加多个订阅。订阅不包含消息的数据,只包含元数据和游标。


每个 Subscription 都存储一个 Cursor。Cursor 是日志中的当前偏移量。Subscription 将其 Cursor 存储至 BookKeeper 的 Ledger 中。这使 Cursor 跟踪可以像 Topic 一样进行扩展。

订阅类型(subscription-type)


  • Exclusive 独享


一个订阅只能有一个消息者消费消息。



  • Failover 灾备


一个订阅同时只有一个消费者,可以有多个备份消费者。一旦主消费者故障则备份消费者接管。不会出现同时有两个活跃的消费者。



  • Shared 共享


一个订阅中同时可以有多个消费者,多个消费者共享 Topic 中的消息。



  • Key_Shared


有序性保证(Ordering guarantee)

如果对顺序性有要求,可以使用 Exclusive 和 Failover 的订阅模式,这样同一个 Topic 只有一个 Consumer 在消费,可以保证顺序性。


如果使用 Shared 订阅模式,多个 Consumer 可以并发消费同一个 Topic。通过动态增加 Consumer 的数量,可以加速 Topic 的消费,减少消息在服务端的堆积。


KeyShared 模式保证在 Shared 模式下同一个 Key 的消息也会发送到同一个 Consumer,在并发的同时也保证了顺序性。

多主题订阅(Multi-topic subscriptions)

Pattern:


  • persistent://public/default/.*

  • persistent://public/default/foo.*

Reader

public void read() throws IOException {    final String serviceUrl = "pulsar://server-101:6650";    final PulsarClient client = PulsarClient.builder()            .serviceUrl(serviceUrl)            .build();    // http://pulsar.apache.org/docs/en/client-libraries-java/#reader    final Reader<byte[]> reader = client.newReader()            .topic("my-topic")            .startMessageId(MessageId.earliest()) // MessageId.latest            .create();    while (true)         final Message<byte[]> message = reader.readNext();        System.out.println(new String(message.getData()));    }}
复制代码


分片主题(Partitioned topics)

消息保留和过期(Message retention and expiry)

如果没有对 Topic 设置数据保留策略,一旦一个 Topic 的所有订阅的游标都已经成功消费到一个偏移量时,此偏移量前面的消息就会被自动删除。


如果 Topic 设置了数据保留策略,已经消费确认的消息超过保留策略阈值(Topic 的消息存储大小、Topic 中消息保留的时间)后会被删除。


conf/broker.conf

# Default message retention time# 默认0, 修改为3天=60*24*3defaultRetentionTimeInMinutes=4320# Default retention size# 默认为0, 修改为10GdefaultRetentionSizeInMB=10240# Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0)ttlDurationDefaultInSeconds=0
复制代码

retention policy (for a namespace)

$ $PULSAR_HOME/bin/pulsar-admin namespaces \get-retention public/default$ curl -X GET http://server-101:8080/admin/v2/namespaces/public/default/retention | python -m json.tool$ $PULSAR_HOME/bin/pulsar-admin namespaces \set-retention public/default \--size 1024M \--time 5m$ curl -X POST http://server-101:8080/admin/v2/namespaces/public/default/retention \--header "Content-Type:application/json" \--data '{  "retentionTimeInMinutes" : 5,  "retentionSizeInMB" : 1024}'
复制代码

message expiry / message-ttl

$ $PULSAR_HOME/bin/pulsar-admin namespaces \get-message-ttl public/default$ curl -X GET http://server-101:8080/admin/v2/namespaces/public/default/messageTTL$ $PULSAR_HOME/bin/pulsar-admin namespaces \set-message-ttl public/default \--messageTTL 1800$ curl -X POST http://server-101:8080/admin/v2/namespaces/public/default/messageTTL \--header "Content-Type:application/json" \--data '1800'
复制代码

更多福利

云智慧已开源集轻量级、聚合型、智能运维为一体的综合运维管理平台 OMP(Operation Management Platform) ,具备 纳管、部署、监控、巡检、自愈、备份、恢复 等功能,可为用户提供便捷的运维能力和业务管理,在提高运维人员等工作效率的同时,极大提升了业务的连续性和安全性。点击下方地址链接,欢迎大家给 OMP 点赞送 star,了解更多相关内容~


GitHub 地址: https://github.com/CloudWise-OpenSource/OMPGitee 地址:https://gitee.com/CloudWise/OMP


微信扫描识别下方二维码,备注【OMP】加入 AIOps 社区运维管理平台 OMP 开发者交流群,与更多行业大佬一起交流学习~


系列阅读

深入浅出Apache Pulsar(1):Pulsar vs Kafka

用户头像

全栈智能业务运维服务商 2021.03.10 加入

我们秉承Make Digital Online的使命,致力于通过先进的产品技术,为企业数字化转型和提升IT运营效率持续赋能。 https://www.cloudwise.com/

评论

发布
暂无评论
深入浅出Apache Pulsar(2):Pulsar消息机制