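I. Data Synchronization Overview
1. Why data synchronization is needed
The gateway is the entry point for request traffic and plays a critical role in a microservice architecture, so its high availability matters a great deal. In day-to-day use, configuration such as flow-control rules and routing rules often has to change to meet business needs. Being able to change gateway configuration dynamically is therefore an important part of keeping the gateway highly available.
The Soul gateway implements dynamic configuration through data synchronization.
2. Data synchronization strategies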
websocket
zookeeper
http long polling
nacos
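3. How data synchronization works
The figure below shows the Soul data synchronization flow. When the Soul gateway starts, it synchronizes configuration data from the configuration service. When an administrator changes user, rule, plugin, or traffic configuration in the admin console, those changes are synchronized to the Soul gateway as well. Whether the data is pushed or pulled depends on the synchronization strategy.
As the figure below shows, after a user changes configuration, soul-admin publishes a configuration change notification through EventPublisher. EventDispatcher handles the notification and, according to the configured synchronization strategy (http, websocket, zookeeper), sends the configuration to the corresponding event handler:
With the websocket strategy, the changed data is pushed to soul-web, and on the gateway side a corresponding WebsocketCacheHandler processes the data pushed from admin.
With the zookeeper strategy, the changed data is written to zookeeper; ZookeeperSyncCache watches for data changes in zookeeper and processes them.
With the http strategy, soul-web initiates a long-polling request, which has a default timeout of 90s. If soul-admin has no data change, it holds the http request; if data changes, it responds with the changed data; if there is still no change after 60s, it responds with empty data. After receiving the response, the gateway issues another http request, and the cycle repeats.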
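The hold-and-respond behaviour of the http strategy can be sketched as follows. This is a minimal illustration, not soul-admin's implementation: the class LongPollingSketch, its method names, and the use of a BlockingQueue are assumptions made for the example, and the hold time is a parameter rather than the fixed 60s described above.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the admin side of http long polling.
class LongPollingSketch {

    // Names of config groups that changed and have not been reported yet.
    private final BlockingQueue<String> changes = new LinkedBlockingQueue<>();

    // Called when an administrator changes configuration.
    void publishChange(final String changedGroup) {
        changes.offer(changedGroup);
    }

    // Holds the caller until a change arrives or maxHoldMillis elapses.
    // An empty Optional plays the role of the "empty data" response.
    Optional<String> hold(final long maxHoldMillis) {
        try {
            return Optional.ofNullable(changes.poll(maxHoldMillis, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            // Restore the interrupt flag and answer as if nothing changed.
            Thread.currentThread().interrupt();
            return Optional.empty();
        }
    }
}
```

The gateway side would call something like hold(60_000) in a loop and issue the next request as soon as each response comes back, whether or not it carried a change.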
II. The websocket synchronization strategy
1. Using websocket synchronization
1.1 soul-admin module
Edit application.yml and set the data synchronization strategy to websocket:
soul:
  sync:
    websocket:
      enabled: true
1.2 soul-bootstrap module
Edit pom.xml and add the soul-spring-boot-starter-sync-data-websocket dependency:
<dependency>
    <groupId>org.dromara</groupId>
    <artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId>
    <version>${project.version}</version>
</dependency>
Edit application-local.yml and configure the websocket synchronization strategy:
soul:
  sync:
    websocket:
      urls: ws://localhost:9095/websocket
2. websocket synchronization source code analysis
2.1 soul-admin startup
On startup, soul-admin creates instances of org.dromara.soul.admin.listener.websocket.WebsocketDataChangedListener and org.dromara.soul.admin.listener.websocket.WebsocketCollector:
@Configuration
@ConditionalOnProperty(name = "soul.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties(WebsocketSyncProperties.class)
static class WebsocketListener {

    @Bean
    @ConditionalOnMissingBean(WebsocketDataChangedListener.class)
    public DataChangedListener websocketDataChangedListener() {
        return new WebsocketDataChangedListener();
    }

    @Bean
    @ConditionalOnMissingBean(WebsocketCollector.class)
    public WebsocketCollector websocketCollector() {
        return new WebsocketCollector();
    }
    ....
}
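Because the condition uses havingValue = "true" together with matchIfMissing = true, the websocket listener is also created when soul.sync.websocket.enabled is not set at all. The sketch below mimics that decision in plain Java to make the default visible. It is an illustration only: the class and method names are hypothetical, and a java.util.Properties object stands in for the Spring environment.

```java
import java.util.Properties;

// Hypothetical helper that mirrors the property condition on WebsocketListener.
class WebsocketToggleSketch {

    static boolean websocketEnabled(final Properties props) {
        String value = props.getProperty("soul.sync.websocket.enabled");
        if (value == null) {
            // matchIfMissing = true: an absent property still matches.
            return true;
        }
        // havingValue = "true": the configured value must equal "true" (case-insensitive).
        return "true".equalsIgnoreCase(value);
    }
}
```

Under this reading, switching soul-admin to another strategy means setting soul.sync.websocket.enabled to false explicitly, not merely leaving it out.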
WebsocketCollector listens for websocket events (connection opened, connection closed, message received, connection error) and sends data:
@ServerEndpoint("/websocket")
public class WebsocketCollector {

    private static final Set<Session> SESSION_SET = new CopyOnWriteArraySet<>();

    private static Session session;

    @OnOpen
    public void onOpen(final Session session) {
        log.info("websocket on open successful....");
        SESSION_SET.add(session);
    }

    @OnMessage
    public void onMessage(final String message, final Session session) {
        if (message.equals(DataEventTypeEnum.MYSELF.name())) {
            WebsocketCollector.session = session;
            SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);
        }
    }

    @OnClose
    public void onClose(final Session session) {
        SESSION_SET.remove(session);
        WebsocketCollector.session = null;
    }

    @OnError
    public void onError(final Session session, final Throwable error) {
        SESSION_SET.remove(session);
        WebsocketCollector.session = null;
        log.error("websocket collection error: ", error);
    }

    public static void send(final String message, final DataEventTypeEnum type) {
        if (StringUtils.isNotBlank(message)) {
            if (DataEventTypeEnum.MYSELF == type) {
                try {
                    session.getBasicRemote().sendText(message);
                } catch (IOException e) {
                    log.error("websocket send result is exception: ", e);
                }
                return;
            }
            for (Session session : SESSION_SET) {
                try {
                    session.getBasicRemote().sendText(message);
                } catch (IOException e) {
                    log.error("websocket send result is exception: ", e);
                }
            }
        }
    }
}
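The send() method has two paths: a MYSELF message goes only to the session that asked for a full sync, and every other event type is broadcast to all open sessions. The sketch below isolates that distinction. It is an illustration under assumptions, not the Soul code: Peer is a hypothetical stand-in for javax.websocket.Session, and the requester is passed in explicitly instead of being kept in a static field as the original does.

```java
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical registry of connected gateways.
class SessionRegistrySketch {

    // Minimal stand-in for a websocket session.
    interface Peer {
        void sendText(String message);
    }

    private final Set<Peer> peers = new CopyOnWriteArraySet<>();

    void onOpen(final Peer peer) {
        peers.add(peer);
    }

    void onClose(final Peer peer) {
        peers.remove(peer);
    }

    // Full sync (the MYSELF case): only the requesting peer receives the data.
    void sendTo(final Peer requester, final String message) {
        if (peers.contains(requester)) {
            requester.sendText(message);
        }
    }

    // Incremental change: every connected peer receives the data.
    void broadcast(final String message) {
        for (Peer peer : peers) {
            peer.sendText(message);
        }
    }
}
```

Passing the requester as an argument is a design choice of this sketch. With a single shared static field, two gateways that request a full sync at nearly the same time may overwrite each other's session reference, so a per-request reference is one way to avoid that.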
2.2 soul-bootstrap startup
On startup, soul-bootstrap creates an instance of org.dromara.soul.plugin.sync.data.weboscket.WebsocketSyncDataService:
@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")
@Slf4j
public class WebsocketSyncDataConfiguration {

    @Bean
    public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig,
                                                    final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                                    final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers,
                                                    final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
        log.info("you use websocket sync soul data.......");
        return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new),
                pluginSubscriber.getIfAvailable(),
                metaSubscribers.getIfAvailable(Collections::emptyList),
                authSubscribers.getIfAvailable(Collections::emptyList));
    }
    ...
}
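The WebsocketSyncDataService constructor performs the following steps:
Creates a SoulWebsocketClient for each URL configured in soul.sync.websocket.urls and adds it to clients.
Iterates over clients and establishes the websocket connections.
Schedules a task that runs every 30 seconds (after a 10-second initial delay), checks whether the websocket connection has been closed, and reconnects if it has.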
public class WebsocketSyncDataService implements SyncDataService, AutoCloseable {

    private final List<WebSocketClient> clients = new ArrayList<>();

    private final ScheduledThreadPoolExecutor executor;

    public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
                                    final PluginDataSubscriber pluginDataSubscriber,
                                    final List<MetaDataSubscriber> metaDataSubscribers,
                                    final List<AuthDataSubscriber> authDataSubscribers) {
        String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
        executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
        for (String url : urls) {
            try {
                clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
            } catch (URISyntaxException e) {
                log.error("websocket url({}) is error", url, e);
            }
        }
        try {
            for (WebSocketClient client : clients) {
                boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
                if (success) {
                    log.info("websocket connection is successful.....");
                } else {
                    log.error("websocket connection is error.....");
                }
                executor.scheduleAtFixedRate(() -> {
                    try {
                        if (client.isClosed()) {
                            boolean reconnectSuccess = client.reconnectBlocking();
                            if (reconnectSuccess) {
                                log.info("websocket reconnect is successful.....");
                            } else {
                                log.error("websocket reconnection is error.....");
                            }
                        }
                    } catch (InterruptedException e) {
                        log.error("websocket connect is error :{}", e.getMessage());
                    }
                }, 10, 30, TimeUnit.SECONDS);
            }
            /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
        } catch (InterruptedException e) {
            log.info("websocket connection...exception....", e);
        }
    }

    @Override
    public void close() {
        ....
    }
}
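The periodic reconnect check in the constructor can be reduced to the following sketch. It is a simplified illustration, not the Soul implementation: Client is a hypothetical two-method view of a websocket client, and checkOnce and watch are names invented for the example.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical reduction of the "reconnect if closed" task.
class ReconnectSketch {

    // Minimal view of a websocket client for this example.
    interface Client {
        boolean isClosed();

        boolean reconnect();
    }

    // One check: does nothing while the connection is open,
    // otherwise tries to reconnect and reports whether that worked.
    static boolean checkOnce(final Client client) {
        if (!client.isClosed()) {
            return false;
        }
        return client.reconnect();
    }

    // Runs checkOnce repeatedly at a fixed rate.
    static ScheduledFuture<?> watch(final ScheduledExecutorService executor, final Client client,
                                    final long initialDelay, final long period, final TimeUnit unit) {
        return executor.scheduleAtFixedRate(() -> {
            try {
                checkOnce(client);
            } catch (RuntimeException e) {
                // scheduleAtFixedRate suppresses all later runs once a run throws,
                // so unchecked exceptions are swallowed here to keep the check alive.
            }
        }, initialDelay, period, unit);
    }
}
```

Calling watch(executor, client, 10, 30, TimeUnit.SECONDS) would mirror the 10-second initial delay and 30-second period used in the constructor. Catching RuntimeException is a choice made in this sketch; the original task catches only InterruptedException.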
After SoulWebsocketClient connects to the configuration service, it sends a MYSELF event message:
@Slf4j
public final class SoulWebsocketClient extends WebSocketClient {
    .....
    @Override
    public void onOpen(final ServerHandshake serverHandshake) {
        if (!alreadySync) {
            send(DataEventTypeEnum.MYSELF.name());
            alreadySync = true;
        }
    }
    ......
}
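The alreadySync flag means the full-sync request is sent only the first time the connection opens; a later onOpen on the same client object, for example after a reconnect, does not send MYSELF again. A minimal sketch of that "send once" guard follows. The class name and the Consumer used as the send function are assumptions, and AtomicBoolean.compareAndSet is swapped in for the plain flag check as one way to make the guard atomic.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical "request a full sync only once" guard.
class FirstOpenSyncSketch {

    private final AtomicBoolean alreadySync = new AtomicBoolean(false);

    // Stand-in for the websocket client's send(String) method.
    private final Consumer<String> sender;

    FirstOpenSyncSketch(final Consumer<String> sender) {
        this.sender = sender;
    }

    // Sends MYSELF on the first call only; later calls are no-ops.
    void onOpen() {
        if (alreadySync.compareAndSet(false, true)) {
            sender.accept("MYSELF");
        }
    }
}
```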
When the configuration service receives the MYSELF message, it pushes the configuration data to the Soul gateway through the syncAll(final DataEventTypeEnum type) method of SyncDataServiceImpl:
@ServerEndpoint("/websocket")
public class WebsocketCollector {
    ......
    @OnMessage
    public void onMessage(final String message, final Session session) {
        if (message.equals(DataEventTypeEnum.MYSELF.name())) {
            WebsocketCollector.session = session;
            SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);
        }
    }
    ......
}
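2.3 soul-admin after a user changes configuration
Next, we analyze the source code by following the execution flow of updating a Selector of the divide plugin; other configuration changes work similarly.
Updating a Selector sends a request to http://localhost:9095/selector/{selectorId}.
This executes SelectorController.updateSelector(), which calls SelectorServiceImpl.createOrUpdate() to write the configuration to the database and then publishes a SELECTOR update event through eventPublisher: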
@Service("selectorService")
public class SelectorServiceImpl implements SelectorService {

    private final SelectorMapper selectorMapper;

    private final SelectorConditionMapper selectorConditionMapper;

    private final PluginMapper pluginMapper;

    private final ApplicationEventPublisher eventPublisher;

    @Override
    @Transactional(rollbackFor = RuntimeException.class)
    public int createOrUpdate(final SelectorDTO selectorDTO) {
        int selectorCount;
        SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
        List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
        if (StringUtils.isEmpty(selectorDTO.getId())) {
            selectorCount = selectorMapper.insertSelective(selectorDO);
            selectorConditionDTOs.forEach(selectorConditionDTO -> {
                selectorConditionDTO.setSelectorId(selectorDO.getId());
                selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
            });
        } else {
            selectorCount = selectorMapper.updateSelective(selectorDO);
            //delete rule condition then add
            selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));
            selectorConditionDTOs.forEach(selectorConditionDTO -> {
                selectorConditionDTO.setSelectorId(selectorDO.getId());
                SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);
                selectorConditionMapper.insertSelective(selectorConditionDO);
            });
        }
        publishEvent(selectorDO, selectorConditionDTOs);
        return selectorCount;
    }

    private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {
        PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());
        List<ConditionData> conditionDataList = selectorConditionDTOs.stream()
                .map(ConditionTransfer.INSTANCE::mapToSelectorDTO)
                .collect(Collectors.toList());
        // publish change event.
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
                Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));
    }
}
DataChangedEventDispatcher receives the SELECTOR update event just published and passes the data it carries to the onSelectorChanged() method of the WebsocketDataChangedListener instance created when soul-admin started:
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

    private ApplicationContext applicationContext;

    private List<DataChangedListener> listeners;

    public DataChangedEventDispatcher(final ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        for (DataChangedListener listener : listeners) {
            switch (event.getGroupKey()) {
                case APP_AUTH:
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                case PLUGIN:
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE:
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR:
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA:
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }
    ......
}
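The dispatcher fans each event out to every registered listener and picks the callback from the event's group, so each synchronization strategy only has to supply its own listener. The sketch below shows that shape without Spring. It is a reduced illustration under assumptions: the Group enum, the Listener interface with String payloads, and the explicit register method are all hypothetical, and how the real listeners list is filled is elided in the source and not reproduced here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Spring-free reduction of the dispatch step.
class DispatchSketch {

    enum Group { PLUGIN, RULE, SELECTOR }

    // Reduced listener; default no-ops let each strategy override only what it needs.
    interface Listener {
        default void onPluginChanged(List<String> data) { }

        default void onRuleChanged(List<String> data) { }

        default void onSelectorChanged(List<String> data) { }
    }

    private final List<Listener> listeners = new ArrayList<>();

    void register(final Listener listener) {
        listeners.add(listener);
    }

    // Every listener is notified; the group decides which callback runs.
    void dispatch(final Group group, final List<String> data) {
        for (Listener listener : listeners) {
            switch (group) {
                case PLUGIN:
                    listener.onPluginChanged(data);
                    break;
                case RULE:
                    listener.onRuleChanged(data);
                    break;
                case SELECTOR:
                    listener.onSelectorChanged(data);
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + group);
            }
        }
    }
}
```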
WebsocketDataChangedListener.onSelectorChanged() calls WebsocketCollector.send():
public class WebsocketDataChangedListener implements DataChangedListener {
    ....
    @Override
    public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {
        WebsocketData<SelectorData> websocketData =
                new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);
        WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
    }
    ....
}
WebsocketCollector.send() iterates over SESSION_SET and pushes the data to every Soul gateway:
public class WebsocketCollector {

    private static final Set<Session> SESSION_SET = new CopyOnWriteArraySet<>();

    private static Session session;

    @OnOpen
    public void onOpen(final Session session) {
        log.info("websocket on open successful....");
        SESSION_SET.add(session);
    }

    public static void send(final String message, final DataEventTypeEnum type) {
        if (StringUtils.isNotBlank(message)) {
            if (DataEventTypeEnum.MYSELF == type) {
                try {
                    session.getBasicRemote().sendText(message);
                } catch (IOException e) {
                    log.error("websocket send result is exception: ", e);
                }
                return;
            }
            for (Session session : SESSION_SET) {
                try {
                    session.getBasicRemote().sendText(message);
                } catch (IOException e) {
                    log.error("websocket send result is exception: ", e);
                }
            }
        }
    }
}