
Soul Source Code Reading 02 | How the WebSocket Connection Is Established

哼干嘛
Published: January 22, 2021

I. soul-admin

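  1. In application.yml, set the data sync method to websocket.

  2. Start the soul-admin service. With the configuration from the previous step, the following configuration class is loaded: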

    @Configuration
    @ConditionalOnProperty(name = "soul.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
    @EnableConfigurationProperties(WebsocketSyncProperties.class)
    static class WebsocketListener {

        @Bean
        @ConditionalOnMissingBean(WebsocketDataChangedListener.class)
        public DataChangedListener websocketDataChangedListener() {
            return new WebsocketDataChangedListener();
        }

        @Bean
        @ConditionalOnMissingBean(WebsocketCollector.class)
        public WebsocketCollector websocketCollector() {
            return new WebsocketCollector();
        }

        @Bean
        @ConditionalOnMissingBean(ServerEndpointExporter.class)
        public ServerEndpointExporter serverEndpointExporter() {
            return new ServerEndpointExporter();
        }
    }

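WebsocketCollector is what opens the WebSocket service on the admin side. The ServerEndpointExporter bean is Spring's mechanism for registering @ServerEndpoint-annotated beans, which is how the WebsocketCollector endpoint gets exposed.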
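To make the condition on the configuration class above concrete, here is a minimal sketch of how havingValue = "true" combined with matchIfMissing = true resolves. It is a simplified model written for this note, not Spring Boot's implementation: the names SyncConditionSketch and websocketSyncEnabled are made up, and the case-insensitive comparison is my assumption about Spring's behaviour.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Simplified model of a property-gated configuration class.
 * Illustration only; this is NOT how Spring Boot implements the condition.
 */
final class SyncConditionSketch {

    private SyncConditionSketch() {
    }

    /**
     * Models havingValue = "true", matchIfMissing = true:
     * a missing property counts as a match, otherwise the value must be "true".
     */
    static boolean websocketSyncEnabled(final Map<String, String> properties) {
        String value = properties.get("soul.sync.websocket.enabled");
        if (value == null) {
            // matchIfMissing = true: no property at all still enables the configuration
            return true;
        }
        // havingValue = "true" (case-insensitive comparison is an assumption)
        return "true".equalsIgnoreCase(value);
    }

    public static void main(final String[] args) {
        Map<String, String> properties = new HashMap<>();
        System.out.println(websocketSyncEnabled(properties)); // property missing
        properties.put("soul.sync.websocket.enabled", "false");
        System.out.println(websocketSyncEnabled(properties)); // explicitly disabled
    }
}
```

The practical consequence is that soul-admin loads WebsocketListener even when soul.sync.websocket.enabled is absent from application.yml; only an explicit value other than true switches it off.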

II. Gateway

  1. In the gateway's pom file, add the dependency that enables WebSocket data sync:

    <!--soul data sync start use websocket-->
    <dependency>
        <groupId>org.dromara</groupId>
        <artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId>
        <version>${project.version}</version>
    </dependency>
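  2. In application.yml, configure the WebSocket address to connect to (the address of the WebSocket service that soul-admin opened above).

  3. Follow soul-spring-boot-starter-sync-data-websocket to its @Bean method; it ultimately produces a WebsocketSyncDataService object.

  4. What does WebsocketSyncDataService do?

  • It uses the third-party java-websocket library to establish the WebSocket connection.

  • It uses a ScheduledThreadPoolExecutor to run a heartbeat check every 30 s (first run after 10 s): if a client's connection has closed, it reconnects.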

    @Slf4j
    public class WebsocketSyncDataService implements SyncDataService, AutoCloseable {

        private final List<WebSocketClient> clients = new ArrayList<>();

        private final ScheduledThreadPoolExecutor executor;

        /**
         * Instantiates a new Websocket sync cache.
         *
         * @param websocketConfig      the websocket config
         * @param pluginDataSubscriber the plugin data subscriber
         * @param metaDataSubscribers  the meta data subscribers
         * @param authDataSubscribers  the auth data subscribers
         */
        public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
                                        final PluginDataSubscriber pluginDataSubscriber,
                                        final List<MetaDataSubscriber> metaDataSubscribers,
                                        final List<AuthDataSubscriber> authDataSubscribers) {
            // One client (and one scheduler thread) per comma-separated admin URL.
            String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
            executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
            for (String url : urls) {
                try {
                    clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
                } catch (URISyntaxException e) {
                    log.error("websocket url({}) is error", url, e);
                }
            }
            try {
                for (WebSocketClient client : clients) {
                    // Initial connection attempt, waiting at most 3 seconds.
                    boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
                    if (success) {
                        log.info("websocket connection is successful.....");
                    } else {
                        log.error("websocket connection is error.....");
                    }
                    // After a 10 s initial delay, check every 30 s and reconnect if the client is closed.
                    executor.scheduleAtFixedRate(() -> {
                        try {
                            if (client.isClosed()) {
                                boolean reconnectSuccess = client.reconnectBlocking();
                                if (reconnectSuccess) {
                                    log.info("websocket reconnect is successful.....");
                                } else {
                                    log.error("websocket reconnection is error.....");
                                }
                            }
                        } catch (InterruptedException e) {
                            log.error("websocket connect is error :{}", e.getMessage());
                        }
                    }, 10, 30, TimeUnit.SECONDS);
                }
                /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
            } catch (InterruptedException e) {
                log.info("websocket connection...exception....", e);
            }
        }

        @Override
        public void close() {
            // Close any client that is still open, then stop the reconnect scheduler.
            for (WebSocketClient client : clients) {
                if (!client.isClosed()) {
                    client.close();
                }
            }
            if (Objects.nonNull(executor)) {
                executor.shutdown();
            }
        }
    }
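The first thing the constructor does is turn the configured address string into one client per URL. Below is a rough JDK-only sketch of that step, assuming a comma-separated value such as ws://localhost:9095/websocket (the host and port are placeholders, not taken from the source). WebsocketUrlsSketch and parseUrls are hypothetical names, and String.split plus trimming stands in for the StringUtils.split call in the original, so edge cases may behave differently.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: parse a comma-separated list of WebSocket addresses. */
final class WebsocketUrlsSketch {

    private WebsocketUrlsSketch() {
    }

    /** Returns the entries that parse as URIs; invalid entries are reported and skipped. */
    static List<URI> parseUrls(final String urls) {
        List<URI> result = new ArrayList<>();
        if (urls == null || urls.trim().isEmpty()) {
            return result;
        }
        for (String url : urls.split(",")) {
            String trimmed = url.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            try {
                result.add(new URI(trimmed));
            } catch (URISyntaxException e) {
                // Same policy as the constructor above: log the bad URL and carry on.
                System.err.println("websocket url(" + trimmed + ") is invalid: " + e.getMessage());
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        // Placeholder addresses for two soul-admin instances.
        System.out.println(parseUrls("ws://localhost:9095/websocket,ws://localhost:9096/websocket"));
    }
}
```

As in the original, a malformed URL only produces a log line, so the gateway still starts with whatever valid addresses remain.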
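The scheduled reconnect is the part of the class most worth isolating. Here is a small sketch of the same pattern using only the JDK. The Connection interface is a hypothetical stand-in for the two WebSocketClient methods the task calls, and ReconnectSketch, checkOnce and schedule are names invented for this note. The extra catch for RuntimeException is my own addition, not something the Soul code does.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Sketch of a periodic "reconnect if closed" check; not the Soul implementation. */
final class ReconnectSketch {

    private ReconnectSketch() {
    }

    /** Hypothetical stand-in for the WebSocketClient methods used by the check. */
    interface Connection {
        boolean isClosed();

        boolean reconnectBlocking() throws InterruptedException;
    }

    /** One pass of the check: reconnect only when the connection is closed. */
    static void checkOnce(final Connection connection) {
        try {
            if (connection.isClosed()) {
                boolean ok = connection.reconnectBlocking();
                System.out.println(ok ? "reconnect succeeded" : "reconnect failed");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it.
            Thread.currentThread().interrupt();
        } catch (RuntimeException e) {
            // Addition in this sketch: keep the periodic task alive on unexpected errors.
            System.err.println("reconnect check failed: " + e.getMessage());
        }
    }

    /** Schedules checkOnce at a fixed rate, e.g. (10, 30, SECONDS) as in the code above. */
    static ScheduledFuture<?> schedule(final ScheduledThreadPoolExecutor executor,
                                       final Connection connection,
                                       final long initialDelay,
                                       final long period,
                                       final TimeUnit unit) {
        return executor.scheduleAtFixedRate(() -> checkOnce(connection), initialDelay, period, unit);
    }

    public static void main(final String[] args) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        Connection alwaysOpen = new Connection() {
            @Override
            public boolean isClosed() {
                return false;
            }

            @Override
            public boolean reconnectBlocking() {
                return true;
            }
        };
        schedule(executor, alwaysOpen, 10, 30, TimeUnit.SECONDS);
        // Shut down immediately so the demo exits; a real service would do this in close().
        executor.shutdownNow();
    }
}
```

The reason for the extra catch: scheduleAtFixedRate suppresses all later runs once one run throws, so an unchecked exception escaping the task would silently end the reconnect checks for that client.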

