soul 数据同步(二)zookeeper 同步策略
基于 zookeeper 的同步原理很简单,主要是依赖 zookeeper
的 watch 机制,soul-web
会监听配置的节点,soul-admin
在启动的时候,会将数据全量写入 zookeeper
,后续数据发生变更时,会增量更新 zookeeper
的节点,与此同时,soul-web
会监听配置信息的节点,一旦有信息变更时,会更新本地缓存
1. 使用 zk 进行数据同步
1.1 soul-admin
修改 application.yml,配置 zk 同步相关属性
soul:
sync:
zookeeper:
url: localhost:2181
sessionTimeout: 5000
connectionTimeout: 2000
1.2 soul-bootstrap
修改 pom.xml,引入 zk 依赖
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-sync-data-zookeeper</artifactId>
<version>${project.version}</version>
</dependency>
修改 application-local.yml,配置 zk 同步相关属性
soul :
sync:
zookeeper:
url: localhost:2181
sessionTimeout: 5000
connectionTimeout: 2000
2. 源码分析
2.1 soul-admin 的启动分析
读取 application.yml 中,zk 同步相关配置属性,创建 ZkClient 实例
// org.dromara.soul.admin.config.ZookeeperConfiguration
@EnableConfigurationProperties(ZookeeperProperties.class)
public class ZookeeperConfiguration {
/**
* register zkClient in spring ioc.
*
* @param zookeeperProp the zookeeper configuration
* @return ZkClient {@linkplain ZkClient}
*/
@Bean
@ConditionalOnMissingBean(ZkClient.class)
public ZkClient zkClient(final ZookeeperProperties zookeeperProp) {
return new ZkClient(zookeeperProp.getUrl(), zookeeperProp.getSessionTimeout(), zookeeperProp.getConnectionTimeout());
}
}
// org.dromara.soul.admin.config.ZookeeperProperties
@Data
@ConfigurationProperties(prefix = "soul.sync.zookeeper")
public class ZookeeperProperties {
private String url;
private Integer sessionTimeout;
private Integer connectionTimeout;
private String serializer;
}
创建org.dromara.soul.admin.listener.zookeeper.ZookeeperDataChangedListener
和org.dromara.soul.admin.listener.zookeeper.ZookeeperDataInit
实例
// org.dromara.soul.admin.config.DataSyncConfiguration
@Configuration
public class DataSyncConfiguration {
.......
/**
* The type Zookeeper listener.
*/
@Configuration
@ConditionalOnProperty(prefix = "soul.sync.zookeeper", name = "url")
@Import(ZookeeperConfiguration.class)
static class ZookeeperListener {
/**
* Config event listener data changed listener.
*
* @param zkClient the zk client
* @return the data changed listener
*/
@Bean
@ConditionalOnMissingBean(ZookeeperDataChangedListener.class)
public DataChangedListener zookeeperDataChangedListener(final ZkClient zkClient) {
return new ZookeeperDataChangedListener(zkClient);
}
/**
* Zookeeper data init zookeeper data init.
*
* @param zkClient the zk client
* @param syncDataService the sync data service
* @return the zookeeper data init
*/
@Bean
@ConditionalOnMissingBean(ZookeeperDataInit.class)
public ZookeeperDataInit zookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {
return new ZookeeperDataInit(zkClient, syncDataService);
}
}
.......
}
ZookeeperDataChangedListener
实现了org.dromara.soul.admin.listener.DataChangedListener
接口,该接口定义了一系列数据变更回调方法,ZookeeperDataChangedListener
实现了这些方法,当数据变更时,会通过内部的ZkClient
写入 zk
// org.dromara.soul.admin.listener.DataChangedListener
public interface DataChangedListener {
default void onAppAuthChanged(List<AppAuthData> changed, DataEventTypeEnum eventType) {
}
default void onPluginChanged(List<PluginData> changed, DataEventTypeEnum eventType) {
}
default void onSelectorChanged(List<SelectorData> changed, DataEventTypeEnum eventType) {
}
default void onMetaDataChanged(List<MetaData> changed, DataEventTypeEnum eventType) {
}
default void onRuleChanged(List<RuleData> changed, DataEventTypeEnum eventType) {
}
}
ZookeeperDataInit
实现了org.springframework.boot.CommandLineRunner
接口,应用初始化后会执行其 run()方法,当 zk 中没有 soul 相关配置时,会调用 syncDataService.syncAll()
// org.dromara.soul.admin.listener.zookeeper.ZookeeperDataInit
public class ZookeeperDataInit implements CommandLineRunner {
private final ZkClient zkClient;
private final SyncDataService syncDataService;
/**
* Instantiates a new Zookeeper data init.
*
* @param zkClient the zk client
* @param syncDataService the sync data service
*/
public ZookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {
this.zkClient = zkClient;
this.syncDataService = syncDataService;
}
@Override
public void run(final String... args) {
String pluginPath = ZkPathConstants.PLUGIN_PARENT;
String authPath = ZkPathConstants.APP_AUTH_PARENT;
String metaDataPath = ZkPathConstants.META_DATA;
if (!zkClient.exists(pluginPath) && !zkClient.exists(authPath) && !zkClient.exists(metaDataPath)) {
syncDataService.syncAll(DataEventTypeEnum.REFRESH);
}
}
}
syncDataService.syncAll(),从数据库中读取 plugin、selector、rule 配置,通过 eventPublisher,发布出去
// org.dromara.soul.admin.service.sync.SyncDataServiceImpl
@Service("syncDataService")
public class SyncDataServiceImpl implements SyncDataService {
private final AppAuthService appAuthService;
private final MetaDataService metaDataService;
private final PluginService pluginService;
private final SelectorService selectorService;
private final RuleService ruleService;
private final ApplicationEventPublisher eventPublisher;
@Override
public boolean syncAll(final DataEventTypeEnum type) {
appAuthService.syncData();
List<PluginData> pluginDataList = pluginService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
List<SelectorData> selectorDataList = selectorService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
List<RuleData> ruleDataList = ruleService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
metaDataService.syncData();
return true;
}
@Override
public boolean syncPluginData(final String pluginId) {
.....
}
}
DataChangedEventDispatcher 接收到 eventPublisher,调用 DataChangedListener 的回调方法,这里的 listener 是前面创建的org.dromara.soul.admin.listener.zookeeper.ZookeeperDataChangedListener
实例,其会调用方法会同步数据到 zk
// org.dromara.soul.admin.listener.DataChangedEventDispatcher
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
private ApplicationContext applicationContext;
private List<DataChangedListener> listeners;
public DataChangedEventDispatcher(final ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
for (DataChangedListener listener : listeners) {
switch (event.getGroupKey()) {
case APP_AUTH:
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN:
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE:
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR:
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA:
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
default:
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}
@Override
public void afterPropertiesSet() {
Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
}
}
2.2 soul-bootstrap 的启动分析
创建 org.dromara.soul.sync.data.zookeeper.ZookeeperSyncDataService 和 ZkClient 实例
// org.dromara.soul.spring.boot.sync.data.zookeeper.ZookeeperSyncDataConfiguration
@Configuration
@ConditionalOnClass(ZookeeperSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.zookeeper", name = "url")
@EnableConfigurationProperties(ZookeeperConfig.class)
@Slf4j
public class ZookeeperSyncDataConfiguration {
@Bean
public SyncDataService syncDataService(final ObjectProvider<ZkClient> zkClient, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
log.info("you use zookeeper sync soul data.......");
return new ZookeeperSyncDataService(zkClient.getIfAvailable(), pluginSubscriber.getIfAvailable(),
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}
@Bean
public ZkClient zkClient(final ZookeeperConfig zookeeperConfig) {
return new ZkClient(zookeeperConfig.getUrl(), zookeeperConfig.getSessionTimeout(), zookeeperConfig.getConnectionTimeout());
}
}
在 ZookeeperSyncDataService 构造器里,执行 watcher*方法,将 zk 数据同步到本地缓存,并对监听 zk 节点
// org.dromara.soul.sync.data.zookeeper.ZookeeperSyncDataService
public class ZookeeperSyncDataService implements SyncDataService, AutoCloseable {
private final ZkClient zkClient;
public ZookeeperSyncDataService(final ZkClient zkClient, final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
this.zkClient = zkClient;
...
watcherData();
watchAppAuth();
watchMetaData();
}
private void watcherData() {
final String pluginParent = ZkPathConstants.PLUGIN_PARENT;
List<String> pluginZKs = zkClientGetChildren(pluginParent);
for (String pluginName : pluginZKs) {
watcherAll(pluginName);
}
zkClient.subscribeChildChanges(pluginParent, (parentPath, currentChildren) -> {
if (CollectionUtils.isNotEmpty(currentChildren)) {
for (String pluginName : currentChildren) {
watcherAll(pluginName);
}
}
});
}
private void watcherAll(final String pluginName) {
watcherPlugin(pluginName);
watcherSelector(pluginName);
watcherRule(pluginName);
}
private void watcherPlugin(final String pluginName) {
String pluginPath = ZkPathConstants.buildPluginPath(pluginName);
if (!zkClient.exists(pluginPath)) {
zkClient.createPersistent(pluginPath, true);
}
cachePluginData(zkClient.readData(pluginPath));
subscribePluginDataChanges(pluginPath, pluginName);
}
private void watcherSelector(final String pluginName) {
...
}
private void watcherRule(final String pluginName) {
...
}
private void subscribePluginDataChanges(final String pluginPath, final String pluginName) {
zkClient.subscribeDataChanges(pluginPath, new IZkDataListener() {
@Override
public void handleDataChange(final String dataPath, final Object data) {
Optional.ofNullable(data)
.ifPresent(d -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSubscribe((PluginData) d)));
}
@Override
public void handleDataDeleted(final String dataPath) {
final PluginData data = new PluginData();
data.setName(pluginName);
Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.unSubscribe(data));
}
});
}
.....
}
2.3 soul-admin 配置变更后源码分析
与上一篇文章分析的流程一样,不一样的是,最终处理的 listener 是 soul-admin 启动时创建的org.dromara.soul.admin.listener.zookeeper.ZookeeperDataChangedListener
,其中的回调方法会将修改后的配置同
xzy
还未添加个人签名 2017.10.17 加入
还未添加个人简介
评论