1
Soul 源码阅读 02|WebSocket 建立连接的过程
发布于: 2021 年 01 月 22 日
一、soul-admin
在 application.yml 中配置同步方式为:websocket
启动 soul-admin 服务,根据上一步的配置,会执行到这里
@Configuration @ConditionalOnProperty(name = "soul.sync.websocket.enabled", havingValue = "true", matchIfMissing = true) @EnableConfigurationProperties(WebsocketSyncProperties.class) static class WebsocketListener { @Bean @ConditionalOnMissingBean(WebsocketDataChangedListener.class) public DataChangedListener websocketDataChangedListener() { return new WebsocketDataChangedListener(); } @Bean @ConditionalOnMissingBean(WebsocketCollector.class) public WebsocketCollector websocketCollector() { return new WebsocketCollector(); }
@Bean @ConditionalOnMissingBean(ServerEndpointExporter.class) public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }复制代码
通过 WebsocketCollector 开启一个 websocket 服务
二、网关
在网关的 pom 文件中配置使用 WebSocket 连接:
<!-- Soul data sync: use the websocket sync starter -->
<dependency>
    <groupId>org.dromara</groupId>
    <artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId>
    <version>${project.version}</version>
</dependency>
在 application.yml 中配置要建立连接的 websocket 地址(上面 soul-admin 开启的 websocket 服务地址)
通过 soul-spring-boot-starter-sync-data-websocket,找到 Bean 方法最终会获得一个 WebsocketSyncDataService 对象。
WebsocketSyncDataService 中做了什么事情?
使用了 java-websocket 这个第三方库来进行 WebSocket 连接。
通过 ScheduledThreadPoolExecutor 实现每 30s 一次的心跳检测(首次检测延迟 10s,发现连接已关闭时自动重连)。
@Slf4jpublic class WebsocketSyncDataService implements SyncDataService, AutoCloseable {
private final List<WebSocketClient> clients = new ArrayList<>();
private final ScheduledThreadPoolExecutor executor; /** * Instantiates a new Websocket sync cache. * * @param websocketConfig the websocket config * @param pluginDataSubscriber the plugin data subscriber * @param metaDataSubscribers the meta data subscribers * @param authDataSubscribers the auth data subscribers */ public WebsocketSyncDataService(final WebsocketConfig websocketConfig, final PluginDataSubscriber pluginDataSubscriber, final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) { String[] urls = StringUtils.split(websocketConfig.getUrls(), ","); executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true)); for (String url : urls) { try { clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers)); } catch (URISyntaxException e) { log.error("websocket url({}) is error", url, e); } } try { for (WebSocketClient client : clients) { boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS); if (success) { log.info("websocket connection is successful....."); } else { log.error("websocket connection is error....."); } executor.scheduleAtFixedRate(() -> { try { if (client.isClosed()) { boolean reconnectSuccess = client.reconnectBlocking(); if (reconnectSuccess) { log.info("websocket reconnect is successful....."); } else { log.error("websocket reconnection is error....."); } } } catch (InterruptedException e) { log.error("websocket connect is error :{}", e.getMessage()); } }, 10, 30, TimeUnit.SECONDS); } /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/ } catch (InterruptedException e) { log.info("websocket connection...exception....", e); }
} @Override public void close() { for (WebSocketClient client : clients) { if (!client.isClosed()) { client.close(); } } if (Objects.nonNull(executor)) { executor.shutdown(); } }}复制代码
划线
评论
复制
发布于: 2021 年 01 月 22 日阅读数: 27
哼干嘛
关注
早日自由! 2018.09.30 加入
本职工作是后端开发,偶尔也写写前端和小程序











评论