Soul Data Synchronization (Part 2): The ZooKeeper Sync Strategy

xzy
Published: January 21, 2021

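The principle behind ZooKeeper-based synchronization is simple: it relies on ZooKeeper's watch mechanism. When soul-admin starts, it writes the full data set to ZooKeeper; whenever data changes afterwards, it incrementally updates the affected ZooKeeper nodes. Meanwhile, soul-web watches the configuration nodes and updates its local cache as soon as any of them changes.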
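The flow described above can be sketched with plain JDK collections. This is only an in-memory illustration of the write-then-notify idea, not ZooKeeper code: the class `WatchSketch`, its `watch` and `write` methods, and the path `/soul/plugin/divide` are all hypothetical names chosen for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** In-memory stand-in for a watched node tree; not a real ZooKeeper client. */
class WatchSketch {
    // node path -> data, playing the role of the ZooKeeper tree
    private final Map<String, String> nodes = new HashMap<>();
    // callbacks notified on every write, playing the role of watches
    private final List<BiConsumer<String, String>> watchers = new ArrayList<>();

    void watch(BiConsumer<String, String> watcher) {
        watchers.add(watcher);
    }

    void write(String path, String data) {
        nodes.put(path, data);
        for (BiConsumer<String, String> watcher : watchers) {
            watcher.accept(path, data);
        }
    }

    public static void main(String[] args) {
        WatchSketch zk = new WatchSketch();
        Map<String, String> gatewayCache = new HashMap<>();

        zk.watch(gatewayCache::put);             // gateway side: keep the local cache current
        zk.write("/soul/plugin/divide", "v1");   // admin side: full write at startup
        zk.write("/soul/plugin/divide", "v2");   // admin side: incremental update

        System.out.println(gatewayCache);        // prints {/soul/plugin/divide=v2}
    }
}
```

The gateway never polls in this sketch; it only reacts to notifications, which is the property the real watch mechanism provides.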


1. Using ZooKeeper for data synchronization


1.1 soul-admin


Edit application.yml and configure the zk sync properties:

    soul:
      sync:
        zookeeper:
          url: localhost:2181
          sessionTimeout: 5000
          connectionTimeout: 2000


1.2 soul-bootstrap


Edit pom.xml and add the zk dependency:

    <dependency>
        <groupId>org.dromara</groupId>
        <artifactId>soul-spring-boot-starter-sync-data-zookeeper</artifactId>
        <version>${project.version}</version>
    </dependency>


Edit application-local.yml and configure the zk sync properties:

    soul:
      sync:
        zookeeper:
          url: localhost:2181
          sessionTimeout: 5000
          connectionTimeout: 2000


2. Source code analysis


2.1 soul-admin startup analysis


soul-admin reads the zk sync properties from application.yml and creates a ZkClient instance:

    // org.dromara.soul.admin.config.ZookeeperConfiguration
    @EnableConfigurationProperties(ZookeeperProperties.class)
    public class ZookeeperConfiguration {

        /**
         * register zkClient in spring ioc.
         *
         * @param zookeeperProp the zookeeper configuration
         * @return ZkClient {@linkplain ZkClient}
         */
        @Bean
        @ConditionalOnMissingBean(ZkClient.class)
        public ZkClient zkClient(final ZookeeperProperties zookeeperProp) {
            return new ZkClient(zookeeperProp.getUrl(),
                    zookeeperProp.getSessionTimeout(),
                    zookeeperProp.getConnectionTimeout());
        }
    }

    // org.dromara.soul.admin.config.ZookeeperProperties
    @Data
    @ConfigurationProperties(prefix = "soul.sync.zookeeper")
    public class ZookeeperProperties {

        private String url;

        private Integer sessionTimeout;

        private Integer connectionTimeout;

        private String serializer;
    }


Next, instances of org.dromara.soul.admin.listener.zookeeper.ZookeeperDataChangedListener and org.dromara.soul.admin.listener.zookeeper.ZookeeperDataInit are created:

    // org.dromara.soul.admin.config.DataSyncConfiguration
    @Configuration
    public class DataSyncConfiguration {
        .......

        /**
         * The type Zookeeper listener.
         */
        @Configuration
        @ConditionalOnProperty(prefix = "soul.sync.zookeeper", name = "url")
        @Import(ZookeeperConfiguration.class)
        static class ZookeeperListener {

            /**
             * Config event listener data changed listener.
             *
             * @param zkClient the zk client
             * @return the data changed listener
             */
            @Bean
            @ConditionalOnMissingBean(ZookeeperDataChangedListener.class)
            public DataChangedListener zookeeperDataChangedListener(final ZkClient zkClient) {
                return new ZookeeperDataChangedListener(zkClient);
            }

            /**
             * Zookeeper data init zookeeper data init.
             *
             * @param zkClient        the zk client
             * @param syncDataService the sync data service
             * @return the zookeeper data init
             */
            @Bean
            @ConditionalOnMissingBean(ZookeeperDataInit.class)
            public ZookeeperDataInit zookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {
                return new ZookeeperDataInit(zkClient, syncDataService);
            }
        }

        .......
    }


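ZookeeperDataChangedListener implements the org.dromara.soul.admin.listener.DataChangedListener interface, which defines a set of data-change callbacks. ZookeeperDataChangedListener implements these callbacks; when data changes, it writes the change to zk through its internal ZkClient.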


    // org.dromara.soul.admin.listener.DataChangedListener
    public interface DataChangedListener {

        default void onAppAuthChanged(List<AppAuthData> changed, DataEventTypeEnum eventType) {
        }

        default void onPluginChanged(List<PluginData> changed, DataEventTypeEnum eventType) {
        }

        default void onSelectorChanged(List<SelectorData> changed, DataEventTypeEnum eventType) {
        }

        default void onMetaDataChanged(List<MetaData> changed, DataEventTypeEnum eventType) {
        }

        default void onRuleChanged(List<RuleData> changed, DataEventTypeEnum eventType) {
        }
    }
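Because every callback in the interface has an empty default body, an implementation only needs to override the callbacks it cares about. The following is a minimal, self-contained sketch of that pattern; `ChangeListener`, `RecordingListener`, and the string payloads are simplified stand-ins invented for this example, not Soul classes.

```java
import java.util.ArrayList;
import java.util.List;

class DefaultListenerSketch {

    /** Simplified analogue of DataChangedListener: every callback is a no-op by default. */
    interface ChangeListener {
        default void onPluginChanged(List<String> changed, String eventType) { }

        default void onRuleChanged(List<String> changed, String eventType) { }
    }

    /** Overrides only the plugin callback and records what it would write out. */
    static class RecordingListener implements ChangeListener {
        final List<String> written = new ArrayList<>();

        @Override
        public void onPluginChanged(List<String> changed, String eventType) {
            for (String plugin : changed) {
                written.add(eventType + ":" + plugin);
            }
        }
    }

    public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        listener.onPluginChanged(List.of("divide"), "UPDATE");
        listener.onRuleChanged(List.of("rule-1"), "UPDATE"); // inherited no-op, nothing recorded
        System.out.println(listener.written);                // prints [UPDATE:divide]
    }
}
```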


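ZookeeperDataInit implements the org.springframework.boot.CommandLineRunner interface, so its run() method executes once the application has initialized. If zk contains no soul configuration (none of the plugin, app-auth, or metadata parent paths exist), it calls syncDataService.syncAll().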

    // org.dromara.soul.admin.listener.zookeeper.ZookeeperDataInit
    public class ZookeeperDataInit implements CommandLineRunner {

        private final ZkClient zkClient;

        private final SyncDataService syncDataService;

        /**
         * Instantiates a new Zookeeper data init.
         *
         * @param zkClient        the zk client
         * @param syncDataService the sync data service
         */
        public ZookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {
            this.zkClient = zkClient;
            this.syncDataService = syncDataService;
        }

        @Override
        public void run(final String... args) {
            String pluginPath = ZkPathConstants.PLUGIN_PARENT;
            String authPath = ZkPathConstants.APP_AUTH_PARENT;
            String metaDataPath = ZkPathConstants.META_DATA;
            if (!zkClient.exists(pluginPath) && !zkClient.exists(authPath) && !zkClient.exists(metaDataPath)) {
                syncDataService.syncAll(DataEventTypeEnum.REFRESH);
            }
        }
    }
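The guard in run() is worth isolating: the full sync fires only when all three parent paths are missing, so a single existing path is enough to skip it. The sketch below reproduces just that condition against a plain set of paths. `InitGuardSketch`, `needsFullSync`, and the `/demo/...` paths are hypothetical; they are not the real ZkPathConstants values.

```java
import java.util.Set;

class InitGuardSketch {

    /** Mirrors the run() guard: a full sync is needed only when none of the parent paths exist. */
    static boolean needsFullSync(Set<String> existingPaths, String... parentPaths) {
        for (String path : parentPaths) {
            if (existingPaths.contains(path)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Illustrative placeholders, not the real ZkPathConstants values.
        String plugin = "/demo/plugin";
        String auth = "/demo/auth";
        String meta = "/demo/metaData";

        System.out.println(needsFullSync(Set.of(), plugin, auth, meta));     // prints true
        System.out.println(needsFullSync(Set.of(auth), plugin, auth, meta)); // prints false
    }
}
```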


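syncDataService.syncAll() reads the plugin, selector, and rule configuration from the database and publishes each as a DataChangedEvent through eventPublisher. App-auth and metadata are handled by appAuthService.syncData() and metaDataService.syncData().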

    // org.dromara.soul.admin.service.sync.SyncDataServiceImpl
    @Service("syncDataService")
    public class SyncDataServiceImpl implements SyncDataService {

        private final AppAuthService appAuthService;

        private final MetaDataService metaDataService;

        private final PluginService pluginService;

        private final SelectorService selectorService;

        private final RuleService ruleService;

        private final ApplicationEventPublisher eventPublisher;

        @Override
        public boolean syncAll(final DataEventTypeEnum type) {
            appAuthService.syncData();
            List<PluginData> pluginDataList = pluginService.listAll();
            eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
            List<SelectorData> selectorDataList = selectorService.listAll();
            eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
            List<RuleData> ruleDataList = ruleService.listAll();
            eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
            metaDataService.syncData();
            return true;
        }

        @Override
        public boolean syncPluginData(final String pluginId) {
            .....
        }
    }


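DataChangedEventDispatcher receives the events published by eventPublisher and invokes the matching DataChangedListener callback. The listener here is the org.dromara.soul.admin.listener.zookeeper.ZookeeperDataChangedListener instance created earlier, whose callbacks sync the data to zk.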

    // org.dromara.soul.admin.listener.DataChangedEventDispatcher
    @Component
    public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

        private ApplicationContext applicationContext;

        private List<DataChangedListener> listeners;

        public DataChangedEventDispatcher(final ApplicationContext applicationContext) {
            this.applicationContext = applicationContext;
        }

        @Override
        @SuppressWarnings("unchecked")
        public void onApplicationEvent(final DataChangedEvent event) {
            for (DataChangedListener listener : listeners) {
                switch (event.getGroupKey()) {
                    case APP_AUTH:
                        listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                        break;
                    case PLUGIN:
                        listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                        break;
                    case RULE:
                        listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                        break;
                    case SELECTOR:
                        listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                        break;
                    case META_DATA:
                        listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                        break;
                    default:
                        throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
                }
            }
        }

        @Override
        public void afterPropertiesSet() {
            Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
            this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
        }
    }
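The dispatcher's job reduces to routing each event to one callback by its group key, for every registered listener. Here is a minimal sketch of that routing without Spring; `DispatcherSketch`, `Group`, `ChangeEvent`, and `Listener` are simplified names made up for this illustration, and the payload is a plain list of strings.

```java
import java.util.ArrayList;
import java.util.List;

class DispatcherSketch {

    enum Group { PLUGIN, SELECTOR, RULE }

    /** Simplified event: a group key plus a payload. */
    static class ChangeEvent {
        final Group group;
        final List<String> payload;

        ChangeEvent(Group group, List<String> payload) {
            this.group = group;
            this.payload = payload;
        }
    }

    interface Listener {
        default void onPluginChanged(List<String> changed) { }

        default void onSelectorChanged(List<String> changed) { }

        default void onRuleChanged(List<String> changed) { }
    }

    /** Routes the event to the matching callback on every registered listener. */
    static void dispatch(ChangeEvent event, List<Listener> listeners) {
        for (Listener listener : listeners) {
            switch (event.group) {
                case PLUGIN:
                    listener.onPluginChanged(event.payload);
                    break;
                case SELECTOR:
                    listener.onSelectorChanged(event.payload);
                    break;
                case RULE:
                    listener.onRuleChanged(event.payload);
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.group);
            }
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Listener ruleOnly = new Listener() {
            @Override
            public void onRuleChanged(List<String> changed) {
                log.add("rule:" + changed);
            }
        };

        dispatch(new ChangeEvent(Group.PLUGIN, List.of("divide")), List.of(ruleOnly)); // hits a no-op
        dispatch(new ChangeEvent(Group.RULE, List.of("r1")), List.of(ruleOnly));
        System.out.println(log); // prints [rule:[r1]]
    }
}
```

Adding another sync strategy in this design means registering another listener; the dispatch loop itself does not change.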


2.2 soul-bootstrap startup analysis


At startup, soul-bootstrap creates the org.dromara.soul.sync.data.zookeeper.ZookeeperSyncDataService and ZkClient instances:

    // org.dromara.soul.spring.boot.sync.data.zookeeper.ZookeeperSyncDataConfiguration
    @Configuration
    @ConditionalOnClass(ZookeeperSyncDataService.class)
    @ConditionalOnProperty(prefix = "soul.sync.zookeeper", name = "url")
    @EnableConfigurationProperties(ZookeeperConfig.class)
    @Slf4j
    public class ZookeeperSyncDataConfiguration {

        @Bean
        public SyncDataService syncDataService(final ObjectProvider<ZkClient> zkClient,
                                               final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                               final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers,
                                               final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
            log.info("you use zookeeper sync soul data.......");
            return new ZookeeperSyncDataService(zkClient.getIfAvailable(),
                    pluginSubscriber.getIfAvailable(),
                    metaSubscribers.getIfAvailable(Collections::emptyList),
                    authSubscribers.getIfAvailable(Collections::emptyList));
        }

        @Bean
        public ZkClient zkClient(final ZookeeperConfig zookeeperConfig) {
            return new ZkClient(zookeeperConfig.getUrl(),
                    zookeeperConfig.getSessionTimeout(),
                    zookeeperConfig.getConnectionTimeout());
        }
    }


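The ZookeeperSyncDataService constructor calls the watch methods (watcherData(), watchAppAuth(), watchMetaData()), which load the data in zk into the local cache and subscribe to the corresponding zk nodes: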

    // org.dromara.soul.sync.data.zookeeper.ZookeeperSyncDataService
    public class ZookeeperSyncDataService implements SyncDataService, AutoCloseable {

        private final ZkClient zkClient;

        public ZookeeperSyncDataService(final ZkClient zkClient,
                                        final PluginDataSubscriber pluginDataSubscriber,
                                        final List<MetaDataSubscriber> metaDataSubscribers,
                                        final List<AuthDataSubscriber> authDataSubscribers) {
            this.zkClient = zkClient;
            ...
            watcherData();
            watchAppAuth();
            watchMetaData();
        }

        private void watcherData() {
            final String pluginParent = ZkPathConstants.PLUGIN_PARENT;
            List<String> pluginZKs = zkClientGetChildren(pluginParent);
            for (String pluginName : pluginZKs) {
                watcherAll(pluginName);
            }
            zkClient.subscribeChildChanges(pluginParent, (parentPath, currentChildren) -> {
                if (CollectionUtils.isNotEmpty(currentChildren)) {
                    for (String pluginName : currentChildren) {
                        watcherAll(pluginName);
                    }
                }
            });
        }

        private void watcherAll(final String pluginName) {
            watcherPlugin(pluginName);
            watcherSelector(pluginName);
            watcherRule(pluginName);
        }

        private void watcherPlugin(final String pluginName) {
            String pluginPath = ZkPathConstants.buildPluginPath(pluginName);
            if (!zkClient.exists(pluginPath)) {
                zkClient.createPersistent(pluginPath, true);
            }
            cachePluginData(zkClient.readData(pluginPath));
            subscribePluginDataChanges(pluginPath, pluginName);
        }

        private void watcherSelector(final String pluginName) {
            ...
        }

        private void watcherRule(final String pluginName) {
            ...
        }

        private void subscribePluginDataChanges(final String pluginPath, final String pluginName) {
            zkClient.subscribeDataChanges(pluginPath, new IZkDataListener() {

                @Override
                public void handleDataChange(final String dataPath, final Object data) {
                    Optional.ofNullable(data)
                            .ifPresent(d -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSubscribe((PluginData) d)));
                }

                @Override
                public void handleDataDeleted(final String dataPath) {
                    final PluginData data = new PluginData();
                    data.setName(pluginName);
                    Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.unSubscribe(data));
                }
            });
        }

        .....
    }
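The shape of watcherData() is "load what already exists, then subscribe for what appears later". A rough in-memory sketch of that two-step pattern follows. `FakeTree` is a hypothetical stand-in for the plugin parent node and is not the ZkClient API; the plugin names `divide` and `rateLimiter` are used purely as sample data.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

class GatewayWatchSketch {

    /** In-memory stand-in for one parent node and its children; not a real ZkClient. */
    static class FakeTree {
        final Map<String, String> children = new LinkedHashMap<>();
        final List<Consumer<List<String>>> childListeners = new ArrayList<>();

        void subscribeChildChanges(Consumer<List<String>> listener) {
            childListeners.add(listener);
        }

        void createChild(String name, String data) {
            children.put(name, data);
            List<String> current = new ArrayList<>(children.keySet());
            childListeners.forEach(listener -> listener.accept(current));
        }
    }

    /** Step 1: cache the children that already exist. Step 2: keep caching as new ones appear. */
    static Map<String, String> startWatching(FakeTree tree) {
        Map<String, String> cache = new TreeMap<>();
        for (String name : tree.children.keySet()) {
            cache.put(name, tree.children.get(name));
        }
        tree.subscribeChildChanges(current -> {
            for (String name : current) {
                cache.put(name, tree.children.get(name));
            }
        });
        return cache;
    }

    public static void main(String[] args) {
        FakeTree tree = new FakeTree();
        tree.createChild("divide", "enabled");         // present before the gateway starts

        Map<String, String> cache = startWatching(tree);
        tree.createChild("rateLimiter", "disabled");   // added while the gateway is running

        System.out.println(cache); // prints {divide=enabled, rateLimiter=disabled}
    }
}
```

As in the excerpt, the child-change callback revisits every current child rather than only the new one, which keeps the handler simple at the cost of some repeated work.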


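2.3 Source code analysis after a soul-admin configuration change


The flow is the same as the one analyzed in the previous article. The difference is that the listener that finally handles the event is the org.dromara.soul.admin.listener.zookeeper.ZookeeperDataChangedListener created when soul-admin starts; its callbacks sync the modified configuration to zk.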

