写点什么

[Pulsar] TopicPolicy 的同步过程

作者:Zike Yang
  • 2021 年 12 月 21 日
  • 本文字数:2490 字

    阅读完需:约 8 分钟

Pulsar 中的 Topic level Policy 记录了许多参数,如 dispatch-rate、subscription-type-enabled 等等,使用 SystemTopic 来进行状态同步,每个 broker 维护所有的 topicPolicies,当我们需要更高一个 topic 的 topic policy 时,则需要将这个更改同步到其他的 Broker 中,当这个 topic 发生了 unload、ownership 的转换时,其他 broker 则能够直接拿到该 topic 的 policy,本文将介绍 pulsar 中 topic policy 的同步过程。


SystemTopicBasedTopicPoliciesService

Pulsar 中主要使用 SystemTopicBasedTopicPoliciesService 实现基于 system topic 同步的 topic policy。当 Broker 获得了某个 namespace 或者当前 namespace 中不存在相关的 policy cache 时,则会调用 prepareInitPoliciesCache 以及 initPoliciesCache 进行初始化。

初始化过程中会为每个 namespace 创建 system topic client 和 reader,监听 namespace 下的__change_events 的 topic。

这个 reader 将一直监听,只要这个 topic 出现了新的 PolicyEvent,就会去处理该 event,以保证当前 Broker 的 policy cache 是同步的。


当 TopicPolicy 发生变更时,则会调用 sendTopicPolicyEvent 方法将这个 Event 发送到上面所说的 topic 中:

private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, ActionType actionType,                                                     TopicPolicies policies) {  createSystemTopicFactoryIfNeeded();
CompletableFuture<Void> result = new CompletableFuture<>();
SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writerFuture = systemTopicClient.newWriterAsync(); writerFuture.whenComplete((writer, ex) -> { if (ex != null) { result.completeExceptionally(ex); } else { PulsarEvent event = getPulsarEvent(topicName, actionType, policies); CompletableFuture<MessageId> actionFuture = ActionType.DELETE.equals(actionType) ? writer.deleteAsync(event) : writer.writeAsync(event); actionFuture.whenComplete(((messageId, e) -> { if (e != null) { result.completeExceptionally(e); } else { if (messageId != null) { result.complete(null); } else { result.completeExceptionally(new RuntimeException("Got message id is null.")); } } writer.closeAsync().whenComplete((v, cause) -> { if (cause != null) { log.error("[{}] Close writer error.", topicName, cause); } else { if (log.isDebugEnabled()) { log.debug("[{}] Close writer success.", topicName); } } }); }) ); } }); return result;}
复制代码


其他 Broker 的 reader 则会监听到这个事件,然后处理相关的 event:

private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {  // delete policies  if (msg.getValue() == null) {    TopicName topicName = TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName());    if (hasReplicateTo(msg)) {      globalPoliciesCache.remove(topicName);    } else {      policiesCache.remove(topicName);    }    return;  }  if (EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) {    TopicPoliciesEvent event = msg.getValue().getTopicPoliciesEvent();    TopicName topicName =      TopicName.get(event.getDomain(), event.getTenant(), event.getNamespace(), event.getTopic());    switch (msg.getValue().getActionType()) {      case INSERT:        TopicPolicies old = event.getPolicies().isGlobalPolicies()          ? globalPoliciesCache.putIfAbsent(topicName, event.getPolicies())          : policiesCache.putIfAbsent(topicName, event.getPolicies());        if (old != null) {          log.warn("Policy insert failed, the topic: {}' policy already exist", topicName);        }        break;      case UPDATE:        if (event.getPolicies().isGlobalPolicies()) {          globalPoliciesCache.put(topicName, event.getPolicies());        } else {          policiesCache.put(topicName, event.getPolicies());        }        break;      case DELETE:        policiesCache.remove(topicName);        SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory          .createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());        systemTopicClient.newWriterAsync().thenAccept(writer                                                      -> writer.deleteAsync(getPulsarEvent(topicName, ActionType.DELETE, null))                                                      .whenComplete((result, e) -> writer.closeAsync().whenComplete((res, ex) -> {                                                        if (ex != null) {                                                          log.error("close writer failed ", ex);                                                        }                                                      })));        break;      case NONE:        break;      default:        log.warn("Unknown event action type: {}", msg.getValue().getActionType());        break;    }  }}
复制代码

这样,就可以完成 TopicPolicy 的更新。

发布于: 24 分钟前阅读数: 3
用户头像

Zike Yang

关注

还未添加个人签名 2020.10.20 加入

Apache Pulsar Contributor

评论

发布
暂无评论
[Pulsar] TopicPolicy的同步过程