一、数据同步概述
1. 为什么需要数据同步
网关是流量请求的入口,在微服务架构中承担了非常重要的角色,网关高可用的重要性不言而喻。在使用网关的过程中,为了满足业务诉求,经常需要变更配置,比如流控规则、路由规则等等。因此,网关动态配置是保障网关高可用的重要因素。
soul 网关通过数据同步的方式,实现了网关的动态配置。
2. 数据的同步策略
websocket
zookeeper
http 长轮询
nacos
3. 数据同步实现原理
下图展示了 Soul
数据同步的流程,Soul
网关在启动时,会从配置服务同步配置数据,管理员在管理后台,变更用户、规则、插件、流量配置,也会同步到 Soul
网关。同步配置数据使用push
模式,还是pull
模式,取决于数据同步策略。
如下图所示,soul-admin
在用户发生配置变更后,会通过 EventPublisher
发出配置变更通知,由 EventDispatcher
处理该变更通知,然后根据配置的同步策略(http、weboscket、zookeeper),将配置发送给对应的事件处理器
如果是 websocket
同步策略,则将变更后的数据主动推送给 soul-web
,并且在网关层,会有对应的 WebsocketCacheHandler
处理器处理来处 admin
的数据推送
如果是 zookeeper
同步策略,将变更数据更新到 zookeeper
,而 ZookeeperSyncCache
会监听到 zookeeper
的数据变更,并予以处理
如果是 http
同步策略,soul-web
主动发起长轮询请求,默认有 90s 超时时间,如果 soul-admin
没有数据变更,则会阻塞 http 请求,如果有数据发生变更则响应变更的数据信息,如果超过 60s 仍然没有数据变更则响应空数据,网关层接到响应后,继续发起 http 请求,反复同样的请求
二、websoket 同步策略
1. 使用 websocket 同步
1.1 soul-admin 模块
修改 application.yml,配置数据同步策略为 websocket
soul:
sync:
websocket:
enabled: true
复制代码
1.2 soul-bootstrap 模块
修改 pom.xml,引入soul-spring-boot-starter-sync-data-websocke
t 依赖
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId>
<version>${project.version}</version>
</dependency>
复制代码
修改 application-local.yml,配置 websocket 数据同步策略
soul :
sync:
websocket :
urls: ws://localhost:9095/websocket
复制代码
2. websocket 同步源码分析
2.1 soul-admin 启动源码分析
创建org.dromara.soul.admin.listener.websocket.WebsocketDataChangedListener
和 org.dromara.soul.admin.listener.websocket.WebsocketCollector
实例
@Configuration
@ConditionalOnProperty(name = "soul.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties(WebsocketSyncProperties.class)
static class WebsocketListener {
@Bean
@ConditionalOnMissingBean(WebsocketDataChangedListener.class)
public DataChangedListener websocketDataChangedListener() {
return new WebsocketDataChangedListener();
}
@Bean
@ConditionalOnMissingBean(WebsocketCollector.class)
public WebsocketCollector websocketCollector() {
return new WebsocketCollector();
}
....
}
复制代码
WebsocketCollector
,负责监听websocket
事件(打开连接、关闭连接、接受消息、连接错误)以及数据发送
@ServerEndpoint("/websocket")
public class WebsocketCollector {
private static final Set<Session> SESSION_SET = new CopyOnWriteArraySet<>();
private static Session session;
@OnOpen
public void onOpen(final Session session) {
log.info("websocket on open successful....");
SESSION_SET.add(session);
}
@OnMessage
public void onMessage(final String message, final Session session) {
if (message.equals(DataEventTypeEnum.MYSELF.name())) {
WebsocketCollector.session = session;
SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);
}
}
@OnClose
public void onClose(final Session session) {
SESSION_SET.remove(session);
WebsocketCollector.session = null;
}
@OnError
public void onError(final Session session, final Throwable error) {
SESSION_SET.remove(session);
WebsocketCollector.session = null;
log.error("websocket collection error: ", error);
}
public static void send(final String message, final DataEventTypeEnum type) {
if (StringUtils.isNotBlank(message)) {
if (DataEventTypeEnum.MYSELF == type) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("websocket send result is exception: ", e);
}
return;
}
for (Session session : SESSION_SET) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("websocket send result is exception: ", e);
}
}
}
}
}
复制代码
2.2 soul-bootstrap 启动源码分析
创建org.dromara.soul.plugin.sync.data.weboscket.WebsocketSyncDataService
实例
@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")
@Slf4j
public class WebsocketSyncDataConfiguration {
@Bean
public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
log.info("you use websocket sync soul data.......");
return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}
...
}
复制代码
WebsocketSyncDataService
构造器执行了以下逻辑
根据配置 soul.sync.websocket.urls
的配置创建SoulWebsocketClient
对象,放入 clients。
遍历 clients,监理websocket
连接。
创建定时任务,每 30 秒执行一次,判断 websocket 连接是否断开,断开时进行重新连接。
public class WebsocketSyncDataService implements SyncDataService, AutoCloseable {
private final List<WebSocketClient> clients = new ArrayList<>();
private final ScheduledThreadPoolExecutor executor;
public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers) {
String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
for (String url : urls) {
try {
clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
} catch (URISyntaxException e) {
log.error("websocket url({}) is error", url, e);
}
}
try {
for (WebSocketClient client : clients) {
boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
if (success) {
log.info("websocket connection is successful.....");
} else {
log.error("websocket connection is error.....");
}
executor.scheduleAtFixedRate(() -> {
try {
if (client.isClosed()) {
boolean reconnectSuccess = client.reconnectBlocking();
if (reconnectSuccess) {
log.info("websocket reconnect is successful.....");
} else {
log.error("websocket reconnection is error.....");
}
}
} catch (InterruptedException e) {
log.error("websocket connect is error :{}", e.getMessage());
}
}, 10, 30, TimeUnit.SECONDS);
}
/* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
} catch (InterruptedException e) {
log.info("websocket connection...exception....", e);
}
}
@Override
public void close() {
....
}
}
复制代码
SoulWebsocketClient
与配置服务连接成功后,会发送一条 MYSELF
事件消息
@Slf4j
public final class SoulWebsocketClient extends WebSocketClient {
.....
@Override
public void onOpen(final ServerHandshake serverHandshake) {
if (!alreadySync) {
send(DataEventTypeEnum.MYSELF.name());
alreadySync = true;
}
}
......
}
复制代码
配置服务,接受到 MYSELF
消息,通过 SyncDataServiceImpl 的 syncAll(final DataEventTypeEnum type)方法将配置数据 push
到 Soul
网关
@ServerEndpoint("/websocket")
public class WebsocketCollector {
......
@OnMessage
public void onMessage(final String message, final Session session) {
if (message.equals(DataEventTypeEnum.MYSELF.name())) {
WebsocketCollector.session = session;
SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);
}
}
......
}
复制代码
2.3 soul-admin 在用户发生配置变更后源码分析
接下来,通过描述更新 divide 插件的 Selector 配置时的执行流程来分析源码,其他配置的修改也是类似的。
当更新Selector
时,会向 http://localhost:9095/selector/{selectorId}
发送请求
执行SelectorController.updateSelector()
方法,然后调用SelectorServiceImpl.createOrUpdate()
,更新配置到数据库,然后通过eventPublisher
发送 SELECTOR
更新event
。
@Service("selectorService")
public class SelectorServiceImpl implements SelectorService {
private final SelectorMapper selectorMapper;
private final SelectorConditionMapper selectorConditionMapper;
private final PluginMapper pluginMapper;
private final ApplicationEventPublisher eventPublisher;
@Override
@Transactional(rollbackFor = RuntimeException.class)
public int createOrUpdate(final SelectorDTO selectorDTO) {
int selectorCount;
SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
if (StringUtils.isEmpty(selectorDTO.getId())) {
selectorCount = selectorMapper.insertSelective(selectorDO);
selectorConditionDTOs.forEach(selectorConditionDTO -> {
selectorConditionDTO.setSelectorId(selectorDO.getId());
selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
});
} else {
selectorCount = selectorMapper.updateSelective(selectorDO);
//delete rule condition then add
selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));
selectorConditionDTOs.forEach(selectorConditionDTO -> {
selectorConditionDTO.setSelectorId(selectorDO.getId());
SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);
selectorConditionMapper.insertSelective(selectorConditionDO);
});
}
publishEvent(selectorDO, selectorConditionDTOs);
return selectorCount;
}
private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {
PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());
List<ConditionData> conditionDataList =
selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());
// publish change event.
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));
}
}
复制代码
DataChangedEventDispatcher 接受到刚才发布的 SELECTOR 更新event
,将该 event 包含的数据传递给
soul-admin 启动时创建的WebsocketDataChangedListener
实例的onSelectorChanged
()方法
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
private ApplicationContext applicationContext;
private List<DataChangedListener> listeners;
public DataChangedEventDispatcher(final ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
for (DataChangedListener listener : listeners) {
switch (event.getGroupKey()) {
case APP_AUTH:
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN:
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE:
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR:
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA:
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
default:
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}
......
}
复制代码
WebsocketDataChangedListener.onSelectorChanged()调用 WebsocketCollector.send()方法
public class WebsocketDataChangedListener implements DataChangedListener {
....
@Override
public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {
WebsocketData<SelectorData> websocketData =
new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);
WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
}
....
}
复制代码
WebsocketCollector.send 方法,遍历 SESSION_SET,将数据 push 到所有的 soul 网关
public class WebsocketCollector {
private static final Set<Session> SESSION_SET = new CopyOnWriteArraySet<>();
private static Session session;
@OnOpen
public void onOpen(final Session session) {
log.info("websocket on open successful....");
SESSION_SET.add(session);
}
public static void send(final String message, final DataEventTypeEnum type) {
if (StringUtils.isNotBlank(message)) {
if (DataEventTypeEnum.MYSELF == type) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("websocket send result is exception: ", e);
}
return;
}
for (Session session : SESSION_SET) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("websocket send result is exception: ", e);
}
}
}
}
}
复制代码
评论