写点什么

Soul 学习笔记 --- 数据同步 websocket 连接建立过程分析(五)

用户头像
fightingting
关注
发布于: 2021 年 01 月 20 日
Soul 学习笔记---数据同步 websocket 连接建立过程分析(五)

上一篇讲到 soul 是如何使用 websocket 进行数据同步的,今天来分析下,websocket 连接是什么时候建立的。


上一篇也讲到,启动 soul-admin 时,因为 yml 配置了使用 websocket 进行同步,会加载这三个类,WebsocketCollector 这个类就是开启一个 websocket 服务。



soul-bootstrappom 文件引入了这个 starter


<!--soul data sync start use websocket--><dependency>    <groupId>org.dromara</groupId>    <artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId>    <version>${project.version}</version></dependency>
复制代码


启动 soul-bootstrap 时,就会去寻找 soul-spring-boot-starter-sync-data-websocketresources/META-INF/spring.factories 文件,然后根据文件中配置去加载指定模块。


//spring.factories 文件内容org.springframework.boot.autoconfigure.EnableAutoConfiguration=\org.dromara.soul.spring.boot.starter.sync.data.websocket.WebsocketSyncDataConfiguration
复制代码


这个文件配置的就是 WebsocketSyncDataConfiguration 类。



看到这个类的代码,我先去查了下 ObjectProvider 相关知识。


spring4.3 之前,我们需要在一个类 A 里注入另一个类 B 时,会使用 @Autowired 注解,不加程序会报异常。4.3 后,引入了一个新特性,我们只需要在类 A 加一个构造函数,B 作为构造函数的参数传进来,就可以不加 @Autowired,但 B 必须要在 spring 容器中,否则会出现异常,此时我们就需要 引入 ObjectProvider


//4.3 之前@Servicepublic class A {    private final B b;    @Autowired    public A (B b) {        this.b = b    }}//4.3 之后,不需要加 @Autowired 注解,但如果 B 不在 spring 容器,会报异常@Servicepublic class A {    private final B b;    public A (B b) {        this.b = b    }}//引入 ObjectProvider@Servicepublic class A {    private final B b;    public A(ObjectProvider<B> bProvider) {    	//如果不可用或不唯一(没有指定primary)则返回null。否则,返回对象。        this.b = bProvider.getIfUnique();    }}
复制代码


在下面这个类里,使用的是 ObjectProvidergetIfAvailable(Supplier<T> defaultSupplier) 方法,这个方法是说 如果对象存在直接返回,对象不存在,就进行回调,回调对象由 defaultSupplier 提供。


@Configuration@ConditionalOnClass(WebsocketSyncDataService.class)@ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")@Slf4jpublic class WebsocketSyncDataConfiguration {
/** * Websocket sync data service. */ @Bean public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber, final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) { log.info("you use websocket sync soul data......."); return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(), metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList)); }
/** * Config websocket config. * * @return the websocket config */ @Bean @ConfigurationProperties(prefix = "soul.sync.websocket") public WebsocketConfig websocketConfig() { return new WebsocketConfig(); }}
复制代码


WebsocketSyncDataConfiguration 这个类的加载依赖于 soul.sync.websocket,而我们在 soul-bootstrap 配置如下,所以会加载这个类。



可以看到这个类创建了一个 WebsocketSyncDataService 对象,就是在这个 service,创建了一个 webSocket 客户端,和我们在 soul-admin 创建的 websocket 服务建立了连接,这里还有一个线程池 ScheduledThreadPoolExecutor,如果客户端连接关闭,会定时尝试重新连接。


    public WebsocketSyncDataService(final WebsocketConfig websocketConfig,                                    final PluginDataSubscriber pluginDataSubscriber,                                    final List<MetaDataSubscriber> metaDataSubscribers,                                    final List<AuthDataSubscriber> authDataSubscribers) {        String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");        executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));        for (String url : urls) {            try {                clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));            } catch (URISyntaxException e) {                log.error("websocket url({}) is error", url, e);            }        }        try {            for (WebSocketClient client : clients) {                boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);                if (success) {                    log.info("websocket connection is successful.....");                } else {                    log.error("websocket connection is error.....");                }                executor.scheduleAtFixedRate(() -> {                    try {                        if (client.isClosed()) {                            boolean reconnectSuccess = client.reconnectBlocking();                            if (reconnectSuccess) {                                log.info("websocket reconnect is successful.....");                            } else {                                log.error("websocket reconnection is error.....");                            }                        }                    } catch (InterruptedException e) {                        log.error("websocket connect is error :{}", e.getMessage());                    }                }, 10, 30, TimeUnit.SECONDS);            }            /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/        } catch (InterruptedException e) {            log.info("websocket connection...exception....", e);        }
}
复制代码


因此 soul-adminsoul-bootstrap 项目启动完成,websocket 连接也建立成功了,接下来有数据变动,soul-admin 服务端就会向客户端发送消息,数据就及时的同步到内存中。


参考资料


spring ObjectProvider 源码分析


用户头像

fightingting

关注

还未添加个人签名 2018.09.17 加入

还未添加个人简介

评论

发布
暂无评论
Soul 学习笔记---数据同步 websocket 连接建立过程分析(五)