写点什么

soul 数据同步(一)概述及 websocket 同步策略

用户头像
xzy
关注
发布于: 2021 年 01 月 21 日
soul数据同步(一)概述及websocket同步策略

一、数据同步概述

1. 为什么需要数据同步


网关是流量请求的入口,在微服务架构中承担了非常重要的角色,网关高可用的重要性不言而喻。在使用网关的过程中,为了满足业务诉求,经常需要变更配置,比如流控规则、路由规则等等。因此,网关动态配置是保障网关高可用的重要因素。


soul 网关通过数据同步的方式,实现了网关的动态配置。


2. 数据的同步策略


  1. websocket

  2. zookeeper

  3. http 长轮询

  4. nacos


3. 数据同步实现原理

下图展示了 Soul 数据同步的流程,Soul 网关在启动时,会从配置服务同步配置数据,管理员在管理后台,变更用户、规则、插件、流量配置,也会同步到 Soul 网关。同步配置数据使用push模式,还是pull模式,取决于数据同步策略。


如下图所示,soul-admin 在用户发生配置变更后,会通过 EventPublisher 发出配置变更通知,由 EventDispatcher 处理该变更通知,然后根据配置的同步策略(http、weboscket、zookeeper),将配置发送给对应的事件处理器


  • 如果是 websocket 同步策略,则将变更后的数据主动推送给 soul-web,并且在网关层,会有对应的 WebsocketCacheHandler 处理器处理来处 admin 的数据推送

  • 如果是 zookeeper 同步策略,将变更数据更新到 zookeeper,而 ZookeeperSyncCache 会监听到 zookeeper 的数据变更,并予以处理

  • 如果是 http 同步策略,soul-web 主动发起长轮询请求,默认有 90s 超时时间,如果 soul-admin 没有数据变更,则会阻塞 http 请求,如果有数据发生变更则响应变更的数据信息,如果超过 60s 仍然没有数据变更则响应空数据,网关层接到响应后,继续发起 http 请求,反复同样的请求



二、websoket 同步策略

1. 使用 websocket 同步


1.1 soul-admin 模块


修改 application.yml,配置数据同步策略为 websocket

soul:  sync:    websocket:      enabled: true
复制代码


1.2 soul-bootstrap 模块


修改 pom.xml,引入soul-spring-boot-starter-sync-data-websocket 依赖

  <dependency>       <groupId>org.dromara</groupId>       <artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId>       <version>${project.version}</version>  </dependency>
复制代码


修改 application-local.yml,配置 websocket 数据同步策略

soul :    sync:        websocket :             urls: ws://localhost:9095/websocket
复制代码


2. websocket 同步源码分析


2.1 soul-admin 启动源码分析


创建org.dromara.soul.admin.listener.websocket.WebsocketDataChangedListenerorg.dromara.soul.admin.listener.websocket.WebsocketCollector 实例


@Configuration@ConditionalOnProperty(name = "soul.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)@EnableConfigurationProperties(WebsocketSyncProperties.class)static class WebsocketListener {
@Bean @ConditionalOnMissingBean(WebsocketDataChangedListener.class) public DataChangedListener websocketDataChangedListener() { return new WebsocketDataChangedListener(); }
@Bean @ConditionalOnMissingBean(WebsocketCollector.class) public WebsocketCollector websocketCollector() { return new WebsocketCollector(); } ....}
复制代码


WebsocketCollector,负责监听websocket事件(打开连接、关闭连接、接受消息、连接错误)以及数据发送

@ServerEndpoint("/websocket")public class WebsocketCollector {
private static final Set<Session> SESSION_SET = new CopyOnWriteArraySet<>();
private static Session session;
@OnOpen public void onOpen(final Session session) { log.info("websocket on open successful...."); SESSION_SET.add(session); }
@OnMessage public void onMessage(final String message, final Session session) { if (message.equals(DataEventTypeEnum.MYSELF.name())) { WebsocketCollector.session = session; SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF); } }
@OnClose public void onClose(final Session session) { SESSION_SET.remove(session); WebsocketCollector.session = null; }
@OnError public void onError(final Session session, final Throwable error) { SESSION_SET.remove(session); WebsocketCollector.session = null; log.error("websocket collection error: ", error); }
public static void send(final String message, final DataEventTypeEnum type) { if (StringUtils.isNotBlank(message)) { if (DataEventTypeEnum.MYSELF == type) { try { session.getBasicRemote().sendText(message); } catch (IOException e) { log.error("websocket send result is exception: ", e); } return; } for (Session session : SESSION_SET) { try { session.getBasicRemote().sendText(message); } catch (IOException e) { log.error("websocket send result is exception: ", e); } } } }}
复制代码


2.2 soul-bootstrap 启动源码分析


创建org.dromara.soul.plugin.sync.data.weboscket.WebsocketSyncDataService实例

@Configuration@ConditionalOnClass(WebsocketSyncDataService.class)@ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")@Slf4jpublic class WebsocketSyncDataConfiguration {
@Bean public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber, final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) { log.info("you use websocket sync soul data......."); return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(), metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList)); }
...}
复制代码


WebsocketSyncDataService构造器执行了以下逻辑

  1. 根据配置 soul.sync.websocket.urls 的配置创建SoulWebsocketClient对象,放入 clients。

  2. 遍历 clients,监理websocket连接。

  3. 创建定时任务,每 30 秒执行一次,判断 websocket 连接是否断开,断开时进行重新连接。


public class WebsocketSyncDataService implements SyncDataService, AutoCloseable {
private final List<WebSocketClient> clients = new ArrayList<>();
private final ScheduledThreadPoolExecutor executor; public WebsocketSyncDataService(final WebsocketConfig websocketConfig, final PluginDataSubscriber pluginDataSubscriber, final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) { String[] urls = StringUtils.split(websocketConfig.getUrls(), ","); executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true)); for (String url : urls) { try { clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers)); } catch (URISyntaxException e) { log.error("websocket url({}) is error", url, e); } } try { for (WebSocketClient client : clients) { boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS); if (success) { log.info("websocket connection is successful....."); } else { log.error("websocket connection is error....."); } executor.scheduleAtFixedRate(() -> { try { if (client.isClosed()) { boolean reconnectSuccess = client.reconnectBlocking(); if (reconnectSuccess) { log.info("websocket reconnect is successful....."); } else { log.error("websocket reconnection is error....."); } } } catch (InterruptedException e) { log.error("websocket connect is error :{}", e.getMessage()); } }, 10, 30, TimeUnit.SECONDS); } /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/ } catch (InterruptedException e) { log.info("websocket connection...exception....", e); }
} @Override public void close() { .... }}
复制代码


SoulWebsocketClient与配置服务连接成功后,会发送一条 MYSELF 事件消息


@Slf4jpublic final class SoulWebsocketClient extends WebSocketClient {    		.....        @Override    public void onOpen(final ServerHandshake serverHandshake) {        if (!alreadySync) {            send(DataEventTypeEnum.MYSELF.name());            alreadySync = true;        }    }   ......}
复制代码


配置服务,接受到 MYSELF 消息,通过 SyncDataServiceImpl 的 syncAll(final DataEventTypeEnum type)方法将配置数据 pushSoul 网关

@ServerEndpoint("/websocket")public class WebsocketCollector {    ......          @OnMessage    public void onMessage(final String message, final Session session) {        if (message.equals(DataEventTypeEnum.MYSELF.name())) {            WebsocketCollector.session = session;            SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);        }    }        ......}
复制代码


2.3 soul-admin 在用户发生配置变更后源码分析

接下来,通过描述更新 divide 插件的 Selector 配置时的执行流程来分析源码,其他配置的修改也是类似的。


当更新Selector时,会向 http://localhost:9095/selector/{selectorId} 发送请求


执行SelectorController.updateSelector() 方法,然后调用SelectorServiceImpl.createOrUpdate() ,更新配置到数据库,然后通过eventPublisher发送 SELECTOR 更新event

@Service("selectorService")public class SelectorServiceImpl implements SelectorService {
private final SelectorMapper selectorMapper;
private final SelectorConditionMapper selectorConditionMapper; private final PluginMapper pluginMapper;
private final ApplicationEventPublisher eventPublisher;

@Override @Transactional(rollbackFor = RuntimeException.class) public int createOrUpdate(final SelectorDTO selectorDTO) { int selectorCount; SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO); List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions(); if (StringUtils.isEmpty(selectorDTO.getId())) { selectorCount = selectorMapper.insertSelective(selectorDO); selectorConditionDTOs.forEach(selectorConditionDTO -> { selectorConditionDTO.setSelectorId(selectorDO.getId()); selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO)); }); } else { selectorCount = selectorMapper.updateSelective(selectorDO); //delete rule condition then add selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId())); selectorConditionDTOs.forEach(selectorConditionDTO -> { selectorConditionDTO.setSelectorId(selectorDO.getId()); SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO); selectorConditionMapper.insertSelective(selectorConditionDO); }); } publishEvent(selectorDO, selectorConditionDTOs); return selectorCount; } private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) { PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId()); List<ConditionData> conditionDataList = selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList()); // publish change event. eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList)))); }
}
复制代码


DataChangedEventDispatcher 接受到刚才发布的 SELECTOR 更新event,将该 event 包含的数据传递给

soul-admin 启动时创建的WebsocketDataChangedListener实例的onSelectorChanged()方法

public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
private ApplicationContext applicationContext;
private List<DataChangedListener> listeners;
public DataChangedEventDispatcher(final ApplicationContext applicationContext) { this.applicationContext = applicationContext; }
@Override @SuppressWarnings("unchecked") public void onApplicationEvent(final DataChangedEvent event) { for (DataChangedListener listener : listeners) { switch (event.getGroupKey()) { case APP_AUTH: listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType()); break; case PLUGIN: listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType()); break; case RULE: listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType()); break; case SELECTOR: listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType()); break; case META_DATA: listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType()); break; default: throw new IllegalStateException("Unexpected value: " + event.getGroupKey()); } } } ......
}
复制代码


WebsocketDataChangedListener.onSelectorChanged()调用 WebsocketCollector.send()方法


public class WebsocketDataChangedListener implements DataChangedListener {	  	....     @Override    public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {        WebsocketData<SelectorData> websocketData =                new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);        WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);    }		   ....
}
复制代码


WebsocketCollector.send 方法,遍历 SESSION_SET,将数据 push 到所有的 soul 网关

public class WebsocketCollector {        private static final Set<Session> SESSION_SET = new CopyOnWriteArraySet<>();        private static Session session;     @OnOpen    public void onOpen(final Session session) {        log.info("websocket on open successful....");        SESSION_SET.add(session);    }            public static void send(final String message, final DataEventTypeEnum type) {        if (StringUtils.isNotBlank(message)) {            if (DataEventTypeEnum.MYSELF == type) {                try {                    session.getBasicRemote().sendText(message);                } catch (IOException e) {                    log.error("websocket send result is exception: ", e);                }                return;            }            for (Session session : SESSION_SET) {                try {                    session.getBasicRemote().sendText(message);                } catch (IOException e) {                    log.error("websocket send result is exception: ", e);                }            }        }    }}
复制代码


用户头像

xzy

关注

还未添加个人签名 2017.10.17 加入

还未添加个人简介

评论

发布
暂无评论
soul数据同步(一)概述及websocket同步策略