/**
 * Starts observing network connectivity and exposes the changes as an {@link Observable}.
 *
 * <p>Registration is performed eagerly, at the moment this method is called (not on
 * subscription): a network callback is created and stored in {@code networkCallback}, the idle
 * receiver is registered, and the callback is registered with the {@link ConnectivityManager}
 * for networks that have both the {@code NET_CAPABILITY_INTERNET} and
 * {@code NET_CAPABILITY_NOT_RESTRICTED} capabilities.
 *
 * <p>The returned stream is built from {@code connectivitySubject} using
 * {@link BackpressureStrategy#LATEST}, begins with {@code Connectivity.create(context)}, and
 * suppresses consecutive duplicate values. When the stream is cancelled, the network callback
 * and the idle receiver are unregistered.
 *
 * @param context Android context used to obtain the connectivity service, to register and
 *     unregister the receiver, and to create the initial {@link Connectivity} value
 * @return an observable of connectivity states
 */
@Override
public Observable<Connectivity> observeNetworkConnectivity(final Context context) {
  final ConnectivityManager connectivityManager =
      (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);

  networkCallback = createNetworkCallback(context);
  registerIdleReceiver(context);

  // Only networks offering both capabilities are reported to the callback.
  final NetworkRequest.Builder requestBuilder = new NetworkRequest.Builder();
  requestBuilder.addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET);
  requestBuilder.addCapability(NetworkCapabilities.NET_CAPABILITY_NOT_RESTRICTED);
  connectivityManager.registerNetworkCallback(requestBuilder.build(), networkCallback);

  // Cleanup hook: undo both registrations once the stream is cancelled.
  final Action releaseRegistrations = new Action() {
    @Override
    public void run() {
      tryToUnregisterCallback(connectivityManager);
      tryToUnregisterReceiver(context);
    }
  };

  // Runs after an item has been handed downstream, so while that item is being mapped below,
  // lastConnectivity still holds the value stored for the previous item.
  final Consumer<Connectivity> rememberAsLast = new Consumer<Connectivity>() {
    @Override
    public void accept(final Connectivity emitted) {
      lastConnectivity = emitted;
    }
  };

  // Delegates to propagateAnyConnectedState with the remembered and the current value.
  final Function<Connectivity, Publisher<Connectivity>> expandWithLast =
      new Function<Connectivity, Publisher<Connectivity>>() {
        @Override
        public Publisher<Connectivity> apply(final Connectivity current) {
          return propagateAnyConnectedState(lastConnectivity, current);
        }
      };

  return connectivitySubject
      .toFlowable(BackpressureStrategy.LATEST)
      .doOnCancel(releaseRegistrations)
      .doAfterNext(rememberAsLast)
      .flatMap(expandWithLast)
      .startWith(Connectivity.create(context))
      .distinctUntilChanged()
      .toObservable();
}
评论