@Override public Observable<Connectivity> observeNetworkConnectivity(final Context context) { final String service = Context.CONNECTIVITY_SERVICE; final ConnectivityManager manager = (ConnectivityManager) context.getSystemService(service); networkCallback = createNetworkCallback(context);
registerIdleReceiver(context);
final NetworkRequest request = new NetworkRequest.Builder().addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET) .addCapability(NetworkCapabilities.NET_CAPABILITY_NOT_RESTRICTED) .build();
manager.registerNetworkCallback(request, networkCallback);
return connectivitySubject.toFlowable(BackpressureStrategy.LATEST).doOnCancel(new Action() { @Override public void run() { tryToUnregisterCallback(manager); tryToUnregisterReceiver(context); } }).doAfterNext(new Consumer<Connectivity>() { @Override public void accept(final Connectivity connectivity) { lastConnectivity = connectivity; } }).flatMap(new Function<Connectivity, Publisher<Connectivity>>() { @Override public Publisher<Connectivity> apply(final Connectivity connectivity) { return propagateAnyConnectedState(lastConnectivity, connectivity); } }).startWith(Connectivity.create(context)).distinctUntilChanged().toObservable(); }
评论