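I. Data Synchronization Overview
1. Why data synchronization is needed
The gateway is the entry point for request traffic and plays a critical role in a microservice architecture, so its high availability matters a great deal. In day-to-day use, gateway configuration such as rate-limiting rules and routing rules has to change frequently to meet business needs. Dynamic configuration is therefore a key factor in keeping the gateway highly available.
The Soul gateway implements dynamic configuration through data synchronization.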
2. Data synchronization strategies
- websocket
- zookeeper
- http long polling
- nacos
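3. How data synchronization works
The figure below shows the Soul data synchronization flow. When the Soul gateway starts, it synchronizes configuration data from the configuration service. When an administrator changes user, rule, plugin, or traffic configuration in the admin console, those changes are also synchronized to the Soul gateway. Whether configuration data is pushed or pulled depends on the synchronization strategy.
As the figure below shows, after a user changes configuration, soul-admin publishes a change notification through EventPublisher. EventDispatcher handles the notification and, according to the configured synchronization strategy (http, websocket, zookeeper), passes the configuration to the corresponding event handler: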
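- With the websocket strategy, the changed data is pushed proactively to soul-web, and on the gateway side a WebsocketCacheHandler processes the data pushed from admin.

- With the zookeeper strategy, the changed data is written to zookeeper; ZookeeperSyncCache watches zookeeper for data changes and processes them.

- With the http strategy, soul-web initiates a long-polling request, which has a default timeout of 90 s. If soul-admin has no data change, it holds the http request. If data changes, it responds with the changed data. If there is still no change after 60 s, it responds with empty data. After receiving the response, the gateway issues another http request, and the cycle repeats.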
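
The long-polling behaviour in the last bullet can be modelled with a blocking queue: the admin side holds each request until a change arrives or the hold time elapses, then answers with the change or with an empty response. This is a minimal sketch, not Soul's implementation. `LongPollSketch`, `publish`, and `poll` are hypothetical names, and the hold time is given in milliseconds so the example finishes quickly.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the admin side of HTTP long polling (not Soul's actual classes).
class LongPollSketch {

    // Configuration changes waiting to be delivered to a polling gateway.
    private final BlockingQueue<String> changes = new LinkedBlockingQueue<>();

    // Called when an administrator changes configuration.
    void publish(final String change) {
        changes.offer(change);
    }

    // Holds the "request" for at most holdMillis; returns the change, or empty on timeout.
    Optional<String> poll(final long holdMillis) {
        try {
            return Optional.ofNullable(changes.poll(holdMillis, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        }
    }

    public static void main(final String[] args) {
        LongPollSketch admin = new LongPollSketch();
        // No change yet: the call blocks for the hold time, then yields an empty response.
        System.out.println(admin.poll(100));
        admin.publish("selector updated");
        // A change is pending: the call returns it without waiting for the timeout.
        System.out.println(admin.poll(100));
    }
}
```

In a real deployment the gateway would call the equivalent of `poll` in a loop, issuing the next request as soon as the previous one returns.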
 
II. websocket synchronization strategy
1. Using websocket synchronization
1.1 soul-admin module
Edit application.yml and set the data synchronization strategy to websocket:
    soul:
      sync:
        websocket:
          enabled: true
 
1.2 soul-bootstrap module
Edit pom.xml and add the soul-spring-boot-starter-sync-data-websocket dependency:
    <dependency>
        <groupId>org.dromara</groupId>
        <artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId>
        <version>${project.version}</version>
    </dependency>
 
Edit application-local.yml and configure the websocket data synchronization strategy:
    soul:
      sync:
        websocket:
          urls: ws://localhost:9095/websocket
 
2. websocket synchronization source code analysis
2.1 soul-admin startup source code analysis
At startup, soul-admin creates instances of org.dromara.soul.admin.listener.websocket.WebsocketDataChangedListener and org.dromara.soul.admin.listener.websocket.WebsocketCollector. Because matchIfMissing = true, this configuration is also active when soul.sync.websocket.enabled is not set at all.
    @Configuration
    @ConditionalOnProperty(name = "soul.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
    @EnableConfigurationProperties(WebsocketSyncProperties.class)
    static class WebsocketListener {

        @Bean
        @ConditionalOnMissingBean(WebsocketDataChangedListener.class)
        public DataChangedListener websocketDataChangedListener() {
            return new WebsocketDataChangedListener();
        }

        @Bean
        @ConditionalOnMissingBean(WebsocketCollector.class)
        public WebsocketCollector websocketCollector() {
            return new WebsocketCollector();
        }
        ....
    }
 
WebsocketCollector listens for websocket events (connection opened, connection closed, message received, connection error) and sends data:
    @ServerEndpoint("/websocket")
    public class WebsocketCollector {

        // All currently connected gateway sessions.
        private static final Set<Session> SESSION_SET = new CopyOnWriteArraySet<>();

        // The session that most recently requested a full sync (MYSELF).
        private static Session session;

        @OnOpen
        public void onOpen(final Session session) {
            log.info("websocket on open successful....");
            SESSION_SET.add(session);
        }

        @OnMessage
        public void onMessage(final String message, final Session session) {
            if (message.equals(DataEventTypeEnum.MYSELF.name())) {
                WebsocketCollector.session = session;
                SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);
            }
        }

        @OnClose
        public void onClose(final Session session) {
            SESSION_SET.remove(session);
            WebsocketCollector.session = null;
        }

        @OnError
        public void onError(final Session session, final Throwable error) {
            SESSION_SET.remove(session);
            WebsocketCollector.session = null;
            log.error("websocket collection error: ", error);
        }

        public static void send(final String message, final DataEventTypeEnum type) {
            if (StringUtils.isNotBlank(message)) {
                if (DataEventTypeEnum.MYSELF == type) {
                    try {
                        session.getBasicRemote().sendText(message);
                    } catch (IOException e) {
                        log.error("websocket send result is exception: ", e);
                    }
                    return;
                }
                for (Session session : SESSION_SET) {
                    try {
                        session.getBasicRemote().sendText(message);
                    } catch (IOException e) {
                        log.error("websocket send result is exception: ", e);
                    }
                }
            }
        }
    }
 
2.2 soul-bootstrap startup source code analysis
At startup, soul-bootstrap creates an instance of org.dromara.soul.plugin.sync.data.weboscket.WebsocketSyncDataService:
    @Configuration
    @ConditionalOnClass(WebsocketSyncDataService.class)
    @ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")
    @Slf4j
    public class WebsocketSyncDataConfiguration {

        @Bean
        public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig,
                                                        final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                                        final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers,
                                                        final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
            log.info("you use websocket sync soul data.......");
            return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new),
                    pluginSubscriber.getIfAvailable(),
                    metaSubscribers.getIfAvailable(Collections::emptyList),
                    authSubscribers.getIfAvailable(Collections::emptyList));
        }
        ...
    }
 
The WebsocketSyncDataService constructor performs the following steps:
- Create a SoulWebsocketClient for each address configured in soul.sync.websocket.urls (comma-separated) and add it to clients.

- Iterate over clients and establish the websocket connections.

- Schedule a task that runs every 30 seconds (after an initial delay of 10 seconds), checks whether the websocket connection has dropped, and reconnects if it has.
    public class WebsocketSyncDataService implements SyncDataService, AutoCloseable {

        private final List<WebSocketClient> clients = new ArrayList<>();

        private final ScheduledThreadPoolExecutor executor;

        public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
                                        final PluginDataSubscriber pluginDataSubscriber,
                                        final List<MetaDataSubscriber> metaDataSubscribers,
                                        final List<AuthDataSubscriber> authDataSubscribers) {
            String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
            executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
            // Step 1: one client per configured URL.
            for (String url : urls) {
                try {
                    clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
                } catch (URISyntaxException e) {
                    log.error("websocket url({}) is error", url, e);
                }
            }
            try {
                for (WebSocketClient client : clients) {
                    // Step 2: connect, waiting at most 3 seconds.
                    boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
                    if (success) {
                        log.info("websocket connection is successful.....");
                    } else {
                        log.error("websocket connection is error.....");
                    }
                    // Step 3: periodic check that reconnects a closed client.
                    executor.scheduleAtFixedRate(() -> {
                        try {
                            if (client.isClosed()) {
                                boolean reconnectSuccess = client.reconnectBlocking();
                                if (reconnectSuccess) {
                                    log.info("websocket reconnect is successful.....");
                                } else {
                                    log.error("websocket reconnection is error.....");
                                }
                            }
                        } catch (InterruptedException e) {
                            log.error("websocket connect is error :{}", e.getMessage());
                        }
                    }, 10, 30, TimeUnit.SECONDS);
                }
                /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
            } catch (InterruptedException e) {
                log.info("websocket connection...exception....", e);
            }
        }

        @Override
        public void close() {
            ....
        }
    }
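
The reconnect step can be tried in isolation with a stand-in client. The sketch below is illustrative only: `ReconnectableClient`, `FakeClient`, and `ReconnectSketch` are hypothetical names, the fake client never touches the network, and the delays are in milliseconds rather than the 10 s / 30 s used above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a websocket client; only the calls used below are modelled.
interface ReconnectableClient {
    boolean isClosed();

    boolean reconnect();
}

// Fake client that starts out closed and counts how often it is reconnected.
class FakeClient implements ReconnectableClient {
    private volatile boolean closed = true;
    private final AtomicInteger reconnects = new AtomicInteger();

    @Override
    public boolean isClosed() {
        return closed;
    }

    @Override
    public boolean reconnect() {
        reconnects.incrementAndGet();
        closed = false;
        return true;
    }

    int reconnectCount() {
        return reconnects.get();
    }
}

class ReconnectSketch {

    // Splits a comma-separated URL list; trimming and skipping blanks are additions of this sketch.
    static List<String> parseUrls(final String urls) {
        List<String> result = new ArrayList<>();
        for (String url : urls.split(",")) {
            if (!url.trim().isEmpty()) {
                result.add(url.trim());
            }
        }
        return result;
    }

    // Schedules a periodic check on a daemon thread that reconnects the client whenever it is closed.
    static ScheduledThreadPoolExecutor watch(final ReconnectableClient client,
                                             final long initialDelayMs, final long periodMs) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, runnable -> {
            Thread thread = new Thread(runnable, "websocket-connect");
            thread.setDaemon(true);
            return thread;
        });
        executor.scheduleAtFixedRate(() -> {
            if (client.isClosed()) {
                client.reconnect();
            }
        }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
        return executor;
    }

    public static void main(final String[] args) throws InterruptedException {
        FakeClient client = new FakeClient();
        ScheduledThreadPoolExecutor executor = watch(client, 10, 30);
        Thread.sleep(200);
        executor.shutdownNow();
        System.out.println("closed=" + client.isClosed() + ", reconnects=" + client.reconnectCount());
    }
}
```

Because the check only acts when `isClosed()` is true, a healthy connection costs one cheap call per period and is never reconnected needlessly.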
 
Once SoulWebsocketClient has connected to the configuration service, it sends a MYSELF event message:
    @Slf4j
    public final class SoulWebsocketClient extends WebSocketClient {
        .....

        @Override
        public void onOpen(final ServerHandshake serverHandshake) {
            // Request a full sync only the first time the connection opens.
            if (!alreadySync) {
                send(DataEventTypeEnum.MYSELF.name());
                alreadySync = true;
            }
        }
        ......
    }
 
When the configuration service receives the MYSELF message, it pushes the configuration data to the Soul gateway through the syncAll(final DataEventTypeEnum type) method of SyncDataServiceImpl:
    @ServerEndpoint("/websocket")
    public class WebsocketCollector {
        ......

        @OnMessage
        public void onMessage(final String message, final Session session) {
            if (message.equals(DataEventTypeEnum.MYSELF.name())) {
                WebsocketCollector.session = session;
                SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);
            }
        }
        ......
    }
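
The effect of the alreadySync flag on the gateway side can be shown with a small model. This is a sketch under stated assumptions: `OnceSyncClient` is a hypothetical class that records messages in a list instead of writing to a socket, and it assumes `onOpen` is invoked again each time the connection is re-established.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a gateway-side client that asks for a full sync only once.
class OnceSyncClient {

    // Messages "sent" to the admin; a real client would write them to the socket.
    private final List<String> sent = new ArrayList<>();

    // Plays the role of the alreadySync flag in the excerpt above.
    private volatile boolean alreadySync;

    // Invoked every time the connection opens (assumed to include reconnects).
    void onOpen() {
        if (!alreadySync) {
            sent.add("MYSELF");
            alreadySync = true;
        }
    }

    List<String> sentMessages() {
        return sent;
    }

    public static void main(final String[] args) {
        OnceSyncClient client = new OnceSyncClient();
        client.onOpen(); // first connection: requests a full sync
        client.onOpen(); // simulated reconnect: no second request
        System.out.println(client.sentMessages());
    }
}
```

Under that assumption, a gateway that reconnects keeps its already-synchronized data and relies on later incremental pushes rather than asking for everything again.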
 
2.3 soul-admin source code analysis after a user changes configuration
Next, we analyze the source code by following what happens when the Selector configuration of the divide plugin is updated. Changes to other configuration follow a similar path.
When a Selector is updated, a request is sent to http://localhost:9095/selector/{selectorId}.
This executes SelectorController.updateSelector(), which calls SelectorServiceImpl.createOrUpdate(). That method writes the configuration to the database and then publishes a SELECTOR update event through eventPublisher.
    @Service("selectorService")
    public class SelectorServiceImpl implements SelectorService {

        private final SelectorMapper selectorMapper;

        private final SelectorConditionMapper selectorConditionMapper;

        private final PluginMapper pluginMapper;

        private final ApplicationEventPublisher eventPublisher;

        @Override
        @Transactional(rollbackFor = RuntimeException.class)
        public int createOrUpdate(final SelectorDTO selectorDTO) {
            int selectorCount;
            SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
            List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
            if (StringUtils.isEmpty(selectorDTO.getId())) {
                selectorCount = selectorMapper.insertSelective(selectorDO);
                selectorConditionDTOs.forEach(selectorConditionDTO -> {
                    selectorConditionDTO.setSelectorId(selectorDO.getId());
                    selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
                });
            } else {
                selectorCount = selectorMapper.updateSelective(selectorDO);
                //delete rule condition then add
                selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));
                selectorConditionDTOs.forEach(selectorConditionDTO -> {
                    selectorConditionDTO.setSelectorId(selectorDO.getId());
                    SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);
                    selectorConditionMapper.insertSelective(selectorConditionDO);
                });
            }
            publishEvent(selectorDO, selectorConditionDTOs);
            return selectorCount;
        }

        private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {
            PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());
            List<ConditionData> conditionDataList =
                    selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());
            // publish change event.
            eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
                    Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));
        }
    }
 
DataChangedEventDispatcher receives the SELECTOR update event just published and passes the data it carries to the onSelectorChanged() method of the WebsocketDataChangedListener instance that soul-admin created at startup:
    public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

        private ApplicationContext applicationContext;

        private List<DataChangedListener> listeners;

        public DataChangedEventDispatcher(final ApplicationContext applicationContext) {
            this.applicationContext = applicationContext;
        }

        @Override
        @SuppressWarnings("unchecked")
        public void onApplicationEvent(final DataChangedEvent event) {
            for (DataChangedListener listener : listeners) {
                switch (event.getGroupKey()) {
                    case APP_AUTH:
                        listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                        break;
                    case PLUGIN:
                        listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                        break;
                    case RULE:
                        listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                        break;
                    case SELECTOR:
                        listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                        break;
                    case META_DATA:
                        listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                        break;
                    default:
                        throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
                }
            }
        }
        ......
    }
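
To experiment with the fan-out logic without a Spring context, the dispatch step can be reduced to plain Java. Everything in this sketch is hypothetical (`GroupKey`, `ChangeListener`, `RecordingListener`, `DispatcherSketch`); it keeps only three group keys and uses strings in place of the real data types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, reduced set of configuration groups.
enum GroupKey { PLUGIN, RULE, SELECTOR }

// Hypothetical listener contract: one callback per configuration group.
interface ChangeListener {
    void onPluginChanged(List<String> data, String eventType);

    void onRuleChanged(List<String> data, String eventType);

    void onSelectorChanged(List<String> data, String eventType);
}

// Listener that records which callback was invoked, for inspection.
class RecordingListener implements ChangeListener {
    final List<String> calls = new ArrayList<>();

    @Override
    public void onPluginChanged(final List<String> data, final String eventType) {
        calls.add("PLUGIN:" + eventType + ":" + data.size());
    }

    @Override
    public void onRuleChanged(final List<String> data, final String eventType) {
        calls.add("RULE:" + eventType + ":" + data.size());
    }

    @Override
    public void onSelectorChanged(final List<String> data, final String eventType) {
        calls.add("SELECTOR:" + eventType + ":" + data.size());
    }
}

class DispatcherSketch {
    private final List<ChangeListener> listeners;

    DispatcherSketch(final List<ChangeListener> listeners) {
        this.listeners = listeners;
    }

    // Every registered listener receives the event through the callback matching its group key.
    void dispatch(final GroupKey key, final List<String> data, final String eventType) {
        for (ChangeListener listener : listeners) {
            switch (key) {
                case PLUGIN:
                    listener.onPluginChanged(data, eventType);
                    break;
                case RULE:
                    listener.onRuleChanged(data, eventType);
                    break;
                case SELECTOR:
                    listener.onSelectorChanged(data, eventType);
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + key);
            }
        }
    }

    public static void main(final String[] args) {
        RecordingListener first = new RecordingListener();
        RecordingListener second = new RecordingListener();
        DispatcherSketch dispatcher = new DispatcherSketch(Arrays.asList(first, second));
        dispatcher.dispatch(GroupKey.SELECTOR, Arrays.asList("selector-1"), "UPDATE");
        System.out.println(first.calls + " " + second.calls);
    }
}
```

The dispatcher itself knows nothing about websocket, zookeeper, or http; which transports receive a change depends only on which listeners are registered.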
 
WebsocketDataChangedListener.onSelectorChanged() wraps the data in a WebsocketData object, serializes it to JSON, and calls WebsocketCollector.send():
    public class WebsocketDataChangedListener implements DataChangedListener {
        ....

        @Override
        public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {
            WebsocketData<SelectorData> websocketData =
                    new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);
            WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
        }
        ....
    }
 
WebsocketCollector.send() iterates over SESSION_SET and pushes the data to every connected Soul gateway. Only MYSELF events are sent to a single session instead:
    public class WebsocketCollector {

        private static final Set<Session> SESSION_SET = new CopyOnWriteArraySet<>();

        private static Session session;

        @OnOpen
        public void onOpen(final Session session) {
            log.info("websocket on open successful....");
            SESSION_SET.add(session);
        }

        public static void send(final String message, final DataEventTypeEnum type) {
            if (StringUtils.isNotBlank(message)) {
                if (DataEventTypeEnum.MYSELF == type) {
                    try {
                        session.getBasicRemote().sendText(message);
                    } catch (IOException e) {
                        log.error("websocket send result is exception: ", e);
                    }
                    return;
                }
                for (Session session : SESSION_SET) {
                    try {
                        session.getBasicRemote().sendText(message);
                    } catch (IOException e) {
                        log.error("websocket send result is exception: ", e);
                    }
                }
            }
        }
    }
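
The routing rule in send() can be exercised without a websocket container. The following is a minimal sketch, not the admin's code: `FakeSession` and `BroadcastSketch` are hypothetical, sessions simply record the text they receive, event types are plain strings, and the null check on the requesting session is an addition of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical stand-in for a websocket session: it records what it receives.
class FakeSession {
    final List<String> received = new ArrayList<>();

    void sendText(final String message) {
        received.add(message);
    }
}

// Hypothetical model of the rule in send(): MYSELF goes to one session, everything else to all.
class BroadcastSketch {

    // All connected sessions; copy-on-write makes iteration safe while sessions join or leave.
    private final Set<FakeSession> sessions = new CopyOnWriteArraySet<>();

    // The session that most recently asked for a full sync.
    private FakeSession requester;

    void onOpen(final FakeSession session) {
        sessions.add(session);
    }

    void onMyselfRequest(final FakeSession session) {
        requester = session;
    }

    void send(final String message, final String eventType) {
        if (message == null || message.trim().isEmpty()) {
            return;
        }
        if ("MYSELF".equals(eventType)) {
            // Full sync: answer only the gateway that asked (null check added in this sketch).
            if (requester != null) {
                requester.sendText(message);
            }
            return;
        }
        // Incremental change: push to every connected gateway.
        for (FakeSession session : sessions) {
            session.sendText(message);
        }
    }

    public static void main(final String[] args) {
        BroadcastSketch admin = new BroadcastSketch();
        FakeSession gatewayA = new FakeSession();
        FakeSession gatewayB = new FakeSession();
        admin.onOpen(gatewayA);
        admin.onOpen(gatewayB);
        admin.onMyselfRequest(gatewayB);
        admin.send("full data", "MYSELF");
        admin.send("selector update", "UPDATE");
        System.out.println(gatewayA.received + " " + gatewayB.received);
    }
}
```

Sending the full data set only to the requesting session keeps a newly started gateway from triggering redundant full pushes to gateways that are already up to date.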
 