1
Soul 源码阅读 02|WebSocket 建立连接的过程
发布于: 2021 年 01 月 22 日
一、soul-admin
在 application.yml 中配置同步方式为:websocket
启动 soul-admin 服务,根据上一步的配置,会执行到这里
@Configuration
@ConditionalOnProperty(name = "soul.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties(WebsocketSyncProperties.class)
static class WebsocketListener {
@Bean
@ConditionalOnMissingBean(WebsocketDataChangedListener.class)
public DataChangedListener websocketDataChangedListener() {
return new WebsocketDataChangedListener();
}
@Bean
@ConditionalOnMissingBean(WebsocketCollector.class)
public WebsocketCollector websocketCollector() {
return new WebsocketCollector();
}
@Bean
@ConditionalOnMissingBean(ServerEndpointExporter.class)
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
复制代码
通过 WebsocketCollector
开启一个 websocket 服务
二、网关
在网关的 pom 文件中配置使用 WebSocket 连接:
<!--soul data sync start use websocket-->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId>
<version>${project.version}</version>
</dependency>
复制代码
在 application.yml 中配置要建立连接的 websocket 地址(上面 soul-admin 开启的 websocket 服务地址)
通过 soul-spring-boot-starter-sync-data-websocket,找到 Bean 方法最终会获得一个 WebsocketSyncDataService 对象。
WebsocketSyncDataService 中做了什么事情?
使用了
java-websocket
这个第三方库来进行 WebSocket 连接通过
ScheduledThreadPoolExecutor
实现每 30s 一次的心跳检测
@Slf4j
public class WebsocketSyncDataService implements SyncDataService, AutoCloseable {
private final List<WebSocketClient> clients = new ArrayList<>();
private final ScheduledThreadPoolExecutor executor;
/**
* Instantiates a new Websocket sync cache.
*
* @param websocketConfig the websocket config
* @param pluginDataSubscriber the plugin data subscriber
* @param metaDataSubscribers the meta data subscribers
* @param authDataSubscribers the auth data subscribers
*/
public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers) {
String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
for (String url : urls) {
try {
clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
} catch (URISyntaxException e) {
log.error("websocket url({}) is error", url, e);
}
}
try {
for (WebSocketClient client : clients) {
boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
if (success) {
log.info("websocket connection is successful.....");
} else {
log.error("websocket connection is error.....");
}
executor.scheduleAtFixedRate(() -> {
try {
if (client.isClosed()) {
boolean reconnectSuccess = client.reconnectBlocking();
if (reconnectSuccess) {
log.info("websocket reconnect is successful.....");
} else {
log.error("websocket reconnection is error.....");
}
}
} catch (InterruptedException e) {
log.error("websocket connect is error :{}", e.getMessage());
}
}, 10, 30, TimeUnit.SECONDS);
}
/* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
} catch (InterruptedException e) {
log.info("websocket connection...exception....", e);
}
}
@Override
public void close() {
for (WebSocketClient client : clients) {
if (!client.isClosed()) {
client.close();
}
}
if (Objects.nonNull(executor)) {
executor.shutdown();
}
}
}
复制代码
划线
评论
复制
发布于: 2021 年 01 月 22 日阅读数: 27
哼干嘛
关注
早日自由! 2018.09.30 加入
本职工作是后端开发,偶尔也写写前端和小程序
评论