上一篇讲到 soul 是如何使用 websocket 进行数据同步的,今天来分析下,websocket 连接是什么时候建立的。
上一篇也讲到,启动 soul-admin 时,因为 yml 配置了使用 websocket 进行同步,会加载这三个类,WebsocketCollector 这个类就是开启一个 websocket 服务。
soul-bootstrap 的 pom 文件引入了这个 starter。
<!--soul data sync start use websocket--><dependency> <groupId>org.dromara</groupId> <artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId> <version>${project.version}</version></dependency>
复制代码
启动 soul-bootstrap 时,就会去寻找 soul-spring-boot-starter-sync-data-websocket 包 resources/META-INF/spring.factories 文件,然后根据文件中配置去加载指定模块。
//spring.factories 文件内容org.springframework.boot.autoconfigure.EnableAutoConfiguration=\org.dromara.soul.spring.boot.starter.sync.data.websocket.WebsocketSyncDataConfiguration
复制代码
这个文件配置的就是 WebsocketSyncDataConfiguration 类。
看到这个类的代码,我先去查了下 ObjectProvider 相关知识。
在 spring4.3 之前,我们需要在一个类 A 里注入另一个类 B 时,会使用 @Autowired 注解,不加程序会报异常。4.3 后,引入了一个新特性,我们只需要在类 A 加一个构造函数,B 作为构造函数的参数传进来,就可以不加 @Autowired,但 B 必须要在 spring 容器中,否则会出现异常,此时我们就需要 引入 ObjectProvider。
//4.3 之前@Servicepublic class A { private final B b; @Autowired public A (B b) { this.b = b }}//4.3 之后,不需要加 @Autowired 注解,但如果 B 不在 spring 容器,会报异常@Servicepublic class A { private final B b; public A (B b) { this.b = b }}//引入 ObjectProvider@Servicepublic class A { private final B b; public A(ObjectProvider<B> bProvider) { //如果不可用或不唯一(没有指定primary)则返回null。否则,返回对象。 this.b = bProvider.getIfUnique(); }}
复制代码
在下面这个类里,使用的是 ObjectProvider 的 getIfAvailable(Supplier<T> defaultSupplier) 方法,这个方法是说 如果对象存在直接返回,对象不存在,就进行回调,回调对象由 defaultSupplier 提供。
@Configuration@ConditionalOnClass(WebsocketSyncDataService.class)@ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")@Slf4jpublic class WebsocketSyncDataConfiguration {
/** * Websocket sync data service. */ @Bean public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber, final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) { log.info("you use websocket sync soul data......."); return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(), metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList)); }
/** * Config websocket config. * * @return the websocket config */ @Bean @ConfigurationProperties(prefix = "soul.sync.websocket") public WebsocketConfig websocketConfig() { return new WebsocketConfig(); }}
复制代码
WebsocketSyncDataConfiguration 这个类的加载依赖于 soul.sync.websocket,而我们在 soul-bootstrap 配置如下,所以会加载这个类。
可以看到这个类创建了一个 WebsocketSyncDataService 对象,就是在这个 service,创建了一个 webSocket 客户端,和我们在 soul-admin 创建的 websocket 服务建立了连接,这里还有一个线程池 ScheduledThreadPoolExecutor,如果客户端连接关闭,会定时尝试重新连接。
public WebsocketSyncDataService(final WebsocketConfig websocketConfig, final PluginDataSubscriber pluginDataSubscriber, final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) { String[] urls = StringUtils.split(websocketConfig.getUrls(), ","); executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true)); for (String url : urls) { try { clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers)); } catch (URISyntaxException e) { log.error("websocket url({}) is error", url, e); } } try { for (WebSocketClient client : clients) { boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS); if (success) { log.info("websocket connection is successful....."); } else { log.error("websocket connection is error....."); } executor.scheduleAtFixedRate(() -> { try { if (client.isClosed()) { boolean reconnectSuccess = client.reconnectBlocking(); if (reconnectSuccess) { log.info("websocket reconnect is successful....."); } else { log.error("websocket reconnection is error....."); } } } catch (InterruptedException e) { log.error("websocket connect is error :{}", e.getMessage()); } }, 10, 30, TimeUnit.SECONDS); } /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/ } catch (InterruptedException e) { log.info("websocket connection...exception....", e); }
}
复制代码
因此 soul-admin,soul-bootstrap 项目启动完成,websocket 连接也建立成功了,接下来有数据变动,soul-admin 服务端就会向客户端发送消息,数据就及时的同步到内存中。
参考资料
spring ObjectProvider 源码分析
评论