上一篇讲到 soul
是如何使用 websocket
进行数据同步的,今天来分析下,websocket
连接是什么时候建立的。
上一篇也讲到,启动 soul-admin
时,因为 yml
配置了使用 websocket
进行同步,会加载这三个类,WebsocketCollector
这个类就是开启一个 websocket
服务。
soul-bootstrap
的 pom
文件引入了这个 starter
。
<!--soul data sync start use websocket-->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId>
<version>${project.version}</version>
</dependency>
复制代码
启动 soul-bootstrap
时,就会去寻找 soul-spring-boot-starter-sync-data-websocket
包 resources/META-INF/spring.factories
文件,然后根据文件中配置去加载指定模块。
//spring.factories 文件内容
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.dromara.soul.spring.boot.starter.sync.data.websocket.WebsocketSyncDataConfiguration
复制代码
这个文件配置的就是 WebsocketSyncDataConfiguration
类。
看到这个类的代码,我先去查了下 ObjectProvider
相关知识。
在 spring4.3
之前,我们需要在一个类 A 里注入另一个类 B 时,会使用 @Autowired
注解,不加程序会报异常。4.3 后,引入了一个新特性,我们只需要在类 A 加一个构造函数,B 作为构造函数的参数传进来,就可以不加 @Autowired
,但 B 必须要在 spring
容器中,否则会出现异常,此时我们就需要 引入 ObjectProvider
。
//4.3 之前
@Service
public class A {
private final B b;
@Autowired
public A (B b) {
this.b = b
}
}
//4.3 之后,不需要加 @Autowired 注解,但如果 B 不在 spring 容器,会报异常
@Service
public class A {
private final B b;
public A (B b) {
this.b = b
}
}
//引入 ObjectProvider
@Service
public class A {
private final B b;
public A(ObjectProvider<B> bProvider) {
//如果不可用或不唯一(没有指定primary)则返回null。否则,返回对象。
this.b = bProvider.getIfUnique();
}
}
复制代码
在下面这个类里,使用的是 ObjectProvider
的 getIfAvailable(Supplier<T> defaultSupplier)
方法,这个方法是说 如果对象存在直接返回,对象不存在,就进行回调,回调对象由 defaultSupplier
提供。
@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")
@Slf4j
public class WebsocketSyncDataConfiguration {
/**
* Websocket sync data service.
*/
@Bean
public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
log.info("you use websocket sync soul data.......");
return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}
/**
* Config websocket config.
*
* @return the websocket config
*/
@Bean
@ConfigurationProperties(prefix = "soul.sync.websocket")
public WebsocketConfig websocketConfig() {
return new WebsocketConfig();
}
}
复制代码
WebsocketSyncDataConfiguration
这个类的加载依赖于 soul.sync.websocket
,而我们在 soul-bootstrap
配置如下,所以会加载这个类。
可以看到这个类创建了一个 WebsocketSyncDataService
对象,就是在这个 service
,创建了一个 webSocket
客户端,和我们在 soul-admin
创建的 websocket
服务建立了连接,这里还有一个线程池 ScheduledThreadPoolExecutor
,如果客户端连接关闭,会定时尝试重新连接。
public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers) {
String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
for (String url : urls) {
try {
clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
} catch (URISyntaxException e) {
log.error("websocket url({}) is error", url, e);
}
}
try {
for (WebSocketClient client : clients) {
boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
if (success) {
log.info("websocket connection is successful.....");
} else {
log.error("websocket connection is error.....");
}
executor.scheduleAtFixedRate(() -> {
try {
if (client.isClosed()) {
boolean reconnectSuccess = client.reconnectBlocking();
if (reconnectSuccess) {
log.info("websocket reconnect is successful.....");
} else {
log.error("websocket reconnection is error.....");
}
}
} catch (InterruptedException e) {
log.error("websocket connect is error :{}", e.getMessage());
}
}, 10, 30, TimeUnit.SECONDS);
}
/* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
} catch (InterruptedException e) {
log.info("websocket connection...exception....", e);
}
}
复制代码
因此 soul-admin
,soul-bootstrap
项目启动完成,websocket
连接也建立成功了,接下来有数据变动,soul-admin
服务端就会向客户端发送消息,数据就及时的同步到内存中。
参考资料
spring ObjectProvider 源码分析
评论