Pulsar 中的 Topic level Policy 记录了许多参数,如 dispatch-rate、subscription-type-enabled 等等,使用 SystemTopic 来进行状态同步,每个 broker 维护所有的 topicPolicies,当我们需要更高一个 topic 的 topic policy 时,则需要将这个更改同步到其他的 Broker 中,当这个 topic 发生了 unload、ownership 的转换时,其他 broker 则能够直接拿到该 topic 的 policy,本文将介绍 pulsar 中 topic policy 的同步过程。
SystemTopicBasedTopicPoliciesService
Pulsar 中主要使用 SystemTopicBasedTopicPoliciesService 实现基于 system topic 同步的 topic policy。当 Broker 获得了某个 namespace 或者当前 namespace 中不存在相关的 policy cache 时,则会调用 prepareInitPoliciesCache 以及 initPoliciesCache 进行初始化。
初始化过程中会为每个 namespace 创建 system topic client 和 reader,监听 namespace 下的__change_events 的 topic。
这个 reader 将一直监听,只要这个 topic 出现了新的 PolicyEvent,就会去处理该 event,以保证当前 Broker 的 policy cache 是同步的。
当 TopicPolicy 发生变更时,则会调用 sendTopicPolicyEvent 方法将这个 Event 发送到上面所说的 topic 中:
private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, ActionType actionType,
TopicPolicies policies) {
createSystemTopicFactoryIfNeeded();
CompletableFuture<Void> result = new CompletableFuture<>();
SystemTopicClient<PulsarEvent> systemTopicClient =
namespaceEventsSystemTopicFactory.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writerFuture = systemTopicClient.newWriterAsync();
writerFuture.whenComplete((writer, ex) -> {
if (ex != null) {
result.completeExceptionally(ex);
} else {
PulsarEvent event = getPulsarEvent(topicName, actionType, policies);
CompletableFuture<MessageId> actionFuture =
ActionType.DELETE.equals(actionType) ? writer.deleteAsync(event) : writer.writeAsync(event);
actionFuture.whenComplete(((messageId, e) -> {
if (e != null) {
result.completeExceptionally(e);
} else {
if (messageId != null) {
result.complete(null);
} else {
result.completeExceptionally(new RuntimeException("Got message id is null."));
}
}
writer.closeAsync().whenComplete((v, cause) -> {
if (cause != null) {
log.error("[{}] Close writer error.", topicName, cause);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Close writer success.", topicName);
}
}
});
})
);
}
});
return result;
}
复制代码
其他 Broker 的 reader 则会监听到这个事件,然后处理相关的 event:
private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {
// delete policies
if (msg.getValue() == null) {
TopicName topicName = TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName());
if (hasReplicateTo(msg)) {
globalPoliciesCache.remove(topicName);
} else {
policiesCache.remove(topicName);
}
return;
}
if (EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) {
TopicPoliciesEvent event = msg.getValue().getTopicPoliciesEvent();
TopicName topicName =
TopicName.get(event.getDomain(), event.getTenant(), event.getNamespace(), event.getTopic());
switch (msg.getValue().getActionType()) {
case INSERT:
TopicPolicies old = event.getPolicies().isGlobalPolicies()
? globalPoliciesCache.putIfAbsent(topicName, event.getPolicies())
: policiesCache.putIfAbsent(topicName, event.getPolicies());
if (old != null) {
log.warn("Policy insert failed, the topic: {}' policy already exist", topicName);
}
break;
case UPDATE:
if (event.getPolicies().isGlobalPolicies()) {
globalPoliciesCache.put(topicName, event.getPolicies());
} else {
policiesCache.put(topicName, event.getPolicies());
}
break;
case DELETE:
policiesCache.remove(topicName);
SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
systemTopicClient.newWriterAsync().thenAccept(writer
-> writer.deleteAsync(getPulsarEvent(topicName, ActionType.DELETE, null))
.whenComplete((result, e) -> writer.closeAsync().whenComplete((res, ex) -> {
if (ex != null) {
log.error("close writer failed ", ex);
}
})));
break;
case NONE:
break;
default:
log.warn("Unknown event action type: {}", msg.getValue().getActionType());
break;
}
}
}
复制代码
这样,就可以完成 TopicPolicy 的更新。
评论