A code example of RxJava_02 [Subscription depth and exception handling]
This tutorial is based on RxJava 1.x. Later installments will be published one after another, so stay tuned.
The previous sections covered three elements of reactive programming: the Observable (the observed), the Observer, and the subscription. This section continues with further points about subscriptions.
Here we introduce ConnectableObservable. A connectable Observable resembles an ordinary Observable, except that it does not begin emitting data when it is subscribed to, only when its connect() method is called. In this way, you can wait until all intended subscribers have subscribed to the Observable before it starts emitting data.
The publish operator converts an ordinary Observable into a connectable Observable.
In RxJava, connect() is a method of ConnectableObservable. Because emission starts only when connect() is called, you decide at which point the Observable begins emitting data.
    // Create an ordinary Observable
    Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onNext(3);
        }
    });
    // Convert the Observable into a connectable Observable
    ConnectableObservable<Integer> connectableObservable = observable.publish();
    // Subscribe to the connectable Observable; no data is emitted yet
    connectableObservable.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call: " + integer);
        }
    });
Unlike a subscription to an ordinary Observable, the code above does not invoke the call() method of the Action1 object right away.
Emission starts only once connect() is called, and the data then goes to the observers that have subscribed so far. The following example adds the connect() call to the previous code:
    // Create an ordinary Observable
    Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onNext(3);
        }
    });
    // Convert the Observable into a connectable Observable
    ConnectableObservable<Integer> connectableObservable = observable.publish();
    // Subscribe to the connectable Observable; no data is emitted yet
    connectableObservable.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call: " + integer);
        }
    });
    // Once connect() is called on the connectable Observable, it starts emitting all the data.
    // The Action1 receives the Subscription that represents the connection.
    connectableObservable.connect(new Action1<Subscription>() {
        @Override
        public void call(Subscription subscription) {
            Log.i(TAG, "call: " + subscription);
        }
    });
Output:
    IT520: call: OperatorPublish$PublishSubscriber@20dce78
    IT520: call: 1
    IT520: call: 2
    IT520: call: 3
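The timing of connect() can also be sketched outside Android. The example below is only an illustration under a few assumptions: RxJava 1.x is on the classpath, a PublishSubject stands in for the data source so that items can be pushed by hand, and the class name ConnectTimingSketch and the list used instead of Log.i are hypothetical. It relies on the no-argument connect(), which returns the Subscription of the connection.

```java
import java.util.ArrayList;
import java.util.List;

import rx.Subscription;
import rx.functions.Action1;
import rx.observables.ConnectableObservable;
import rx.subjects.PublishSubject;

public class ConnectTimingSketch {

    // Returns the items that the observer actually received.
    static List<Integer> run() {
        final List<Integer> received = new ArrayList<Integer>();
        // Assumption: a PublishSubject plays the role of the data source.
        PublishSubject<Integer> source = PublishSubject.create();
        ConnectableObservable<Integer> published = source.publish();

        published.subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer value) {
                received.add(value);
            }
        });

        source.onNext(1);                              // not connected yet: 1 is not delivered
        Subscription connection = published.connect(); // start forwarding data
        source.onNext(2);                              // connected: the observer receives 2
        connection.unsubscribe();                      // disconnect from the source
        source.onNext(3);                              // disconnected: 3 is not delivered
        return received;
    }
}
```

Calling ConnectTimingSketch.run() should return a list containing only 2, the one item pushed between connect() and the unsubscribe of the connection.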
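The refCount operator makes a connectable Observable behave like an ordinary Observable: the Observable it returns connects when the first observer subscribes and disconnects when the last observer unsubscribes.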
    // Create an ordinary Observable
    Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onNext(3);
        }
    });
    // Convert the Observable into a connectable Observable
    ConnectableObservable<Integer> connectableObservable = observable.publish();
    // Convert the connectable Observable back into an ordinary Observable
    Observable<Integer> integerObservable = connectableObservable.refCount();
    // Subscribe to it; data is emitted as soon as it is subscribed, printing 1 2 3...
    integerObservable.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call: " + integer);
        }
    });
The replay operator ensures that all observers receive the same data sequence, even if they subscribe after the Observable has started emitting data.
Let’s look at an example first:
    Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onNext(3);
        }
    });
    ConnectableObservable<Integer> connectableObservable = observable.publish();
    connectableObservable.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call--1--: " + integer);
        }
    });
    connectableObservable.connect();
    connectableObservable.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call--2--: " + integer);
        }
    });
Output:
    com.m520it.rxjava I/IT520: call--1--: 1
    com.m520it.rxjava I/IT520: call--1--: 2
    com.m520it.rxjava I/IT520: call--1--: 3
First we convert an ordinary Observable into a ConnectableObservable with publish(). When connect() is called, the observers that subscribed before connect() receive the data, while observers that subscribe after connect() do not. If we want every observer to receive the data regardless of whether it subscribed before or after connect(), we need replay().
    Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onNext(3);
        }
    });
    // publish() is replaced with replay() here
    ConnectableObservable<Integer> connectableObservable = observable.replay();
    connectableObservable.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call--1--: " + integer);
        }
    });
    connectableObservable.connect();
    connectableObservable.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call--2--: " + integer);
        }
    });
Output:
    com.m520it.rxjava I/IT520: call--1--: 1
    com.m520it.rxjava I/IT520: call--1--: 2
    com.m520it.rxjava I/IT520: call--1--: 3
    com.m520it.rxjava I/IT520: call--2--: 1
    com.m520it.rxjava I/IT520: call--2--: 2
    com.m520it.rxjava I/IT520: call--2--: 3
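As the examples above show, observers do not always receive data at the moment they subscribe: with some Observables the data arrives immediately on subscription, with others only later or not at all.
An Observable that starts emitting for each observer when that observer subscribes, such as one built with Observable.create(), is called a "cold Observable". Every observer receives the full sequence.
An Observable that emits regardless of when observers subscribe is called a "hot Observable", and an observer that subscribes late may miss earlier items. The ConnectableObservable above sends nothing when it is subscribed to and starts emitting only once connect() is called. After that it behaves like a hot Observable, which is why the observer that subscribed after connect() in the publish() example received no data.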
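The difference in when data arrives can be sketched by subscribing two observers to an ordinary Observable and to a published one. This is a minimal illustration, assuming RxJava 1.x on the classpath; the class name SubscriptionTimingSketch, the recorder helper and the labels A and B are hypothetical, and a list stands in for Log.i.

```java
import java.util.ArrayList;
import java.util.List;

import rx.Observable;
import rx.functions.Action1;
import rx.observables.ConnectableObservable;

public class SubscriptionTimingSketch {

    // Builds an observer callback that appends "<label>:<value>" to the log.
    static Action1<Integer> recorder(final String label, final List<String> log) {
        return new Action1<Integer>() {
            @Override
            public void call(Integer value) {
                log.add(label + ":" + value);
            }
        };
    }

    // Ordinary Observable: every subscribe() runs the source again,
    // so both observers receive 1, 2, 3 as soon as they subscribe.
    static List<String> ordinaryRun() {
        List<String> log = new ArrayList<String>();
        Observable<Integer> ordinary = Observable.just(1, 2, 3);
        ordinary.subscribe(recorder("A", log));
        ordinary.subscribe(recorder("B", log));
        return log;
    }

    // Published Observable: nothing is emitted until connect(), and an
    // observer that subscribes after the emission receives no items.
    static List<String> publishedRun() {
        List<String> log = new ArrayList<String>();
        ConnectableObservable<Integer> published = Observable.just(1, 2, 3).publish();
        published.subscribe(recorder("A", log));
        log.add("before connect");
        published.connect();
        published.subscribe(recorder("B", log));
        return log;
    }
}
```

In ordinaryRun() both A and B see the whole sequence. In publishedRun() the "before connect" marker is logged ahead of any item, and only A, which subscribed before connect(), receives data.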
Many operators can be used to respond to onError notifications emitted by an Observable or to recover from errors.
The Catch operator intercepts the onError notification of the original Observable and replaces it with another data item or data sequence, so that the resulting Observable can terminate normally or not terminate at all.
RxJava implements Catch as three different operators:
onErrorReturn
The onErrorReturn method returns a new Observable that mirrors the behavior of the original Observable, except that it ignores the original's onError call and does not pass the error to the observer. Instead, it emits a special item and calls the observer's onCompleted method.
The following code emits 1 and 2 and then simulates an error partway through the sequence. As soon as the error occurs, the function passed to onErrorReturn() is called and 44 is emitted. The code is as follows:
    Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onError(new NullPointerException("mock exception !"));
            subscriber.onNext(3);
        }
    }).onErrorReturn(new Func1<Throwable, Integer>() {
        @Override
        public Integer call(Throwable throwable) {
            return 44;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call: " + integer);
        }
    });
Output:
    com.m520it.rxjava I/IT520: call: 1
    com.m520it.rxjava I/IT520: call: 2
    com.m520it.rxjava I/IT520: call: 44
onErrorResumeNext
onErrorResumeNext makes the Observable switch to emitting the data sequence of a second Observable when it encounters an error.
The following code simulates an error during emission. onErrorResumeNext then calls the supplied function and continues with the items of the Observable that the function returns.
    Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onError(new NullPointerException("mock exception !"));
            subscriber.onNext(3);
        }
    }).onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() {
        @Override
        public Observable<? extends Integer> call(Throwable throwable) {
            Observable<Integer> innerObservable = Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onNext(4);
                    subscriber.onNext(5);
                }
            });
            return innerObservable;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call: " + integer);
        }
    });
Output:
    com.m520it.rxjava I/IT520: call: 1
    com.m520it.rxjava I/IT520: call: 2
    com.m520it.rxjava I/IT520: call: 4
    com.m520it.rxjava I/IT520: call: 5
onExceptionResumeNext
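onExceptionResumeNext also continues with the items of a backup Observable when an error occurs. Unlike onErrorResumeNext, it does so only if the Throwable passed to onError is an Exception; any other Throwable is passed on to the observer's onError.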
    // Create the Observable to fall back on when an exception occurs
    Observable<Integer> exceptionObserver = Observable
        .create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(55);
                subscriber.onNext(66);
            }
        });
    Observable
        .create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onError(new NullPointerException("mock exception !"));
                subscriber.onNext(3);
            }
        })
        // If an exception occurs while the source above is emitting,
        // this operator switches to emitting exceptionObserver
        .onExceptionResumeNext(exceptionObserver)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.i(TAG, "call: " + integer);
            }
        });
Output:
    com.m520it.rxjava I/IT520: call: 1
    com.m520it.rxjava I/IT520: call: 2
    com.m520it.rxjava I/IT520: call: 55
    com.m520it.rxjava I/IT520: call: 66
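One detail of onExceptionResumeNext in RxJava 1.x is that it resumes with the backup Observable only when the Throwable is an Exception, and otherwise forwards the Throwable to the observer's onError. The sketch below illustrates this, assuming RxJava 1.x on the classpath; the class name ExceptionResumeSketch and the run helper are hypothetical, and a list stands in for Log.i.

```java
import java.util.ArrayList;
import java.util.List;

import rx.Observable;
import rx.Subscriber;

public class ExceptionResumeSketch {

    // Emits 1, then fails with the given Throwable. The backup Observable emits 55.
    static List<String> run(final Throwable failure) {
        final List<String> log = new ArrayList<String>();
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onError(failure);
            }
        })
        .onExceptionResumeNext(Observable.just(55))
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer value) {
                log.add("next:" + value);
            }

            @Override
            public void onError(Throwable e) {
                log.add("error:" + e.getClass().getSimpleName());
            }

            @Override
            public void onCompleted() {
                log.add("completed");
            }
        });
        return log;
    }
}
```

With run(new IllegalStateException("mock")) the observer sees 1, then 55, then completion. With run(new Throwable("mock")), which is not an Exception, the observer sees 1 and then the error itself.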
The Retry operator responds to an error from the original Observable by resubscribing to it, in the hope that it will then complete without error.
RxJava implements it as retry and retryWhen.
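    Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onError(new NullPointerException("mock exception !"));
            subscriber.onNext(3);
        }
    })
    .retry(3) // resubscribe up to 3 times
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call: " + integer);
        }
    }, new Action1<Throwable>() {
        // Handles the error that is still delivered once the retries are used up
        @Override
        public void call(Throwable throwable) {
            Log.e(TAG, "error: " + throwable.getMessage());
        }
    });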
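The variants are:
retry(): no matter how many onError notifications are received, it keeps resubscribing to the original Observable.
retry(long): resubscribes up to the specified number of times. Once that number is exceeded, it stops resubscribing and passes the latest onError notification to its observers.
retry(Func2): takes a function that receives the retry count and the Throwable and returns a Boolean that decides whether to resubscribe.
retryWhen: passes the Throwables from onError, as an Observable, to a function; the Observable that this function returns determines whether to resubscribe to the original Observable.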
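The last two variants can be sketched as follows. This is only an illustration, assuming RxJava 1.x on the classpath; the class RetryVariantsSketch, the flakySource and record helpers, and the choice to retry only on IOException are hypothetical. The source fails on its first two subscriptions and succeeds on the third, so both variants have something to retry.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;
import rx.functions.Func2;

public class RetryVariantsSketch {

    // Hypothetical source: fails with an IOException on its first two
    // subscriptions, then emits the subscription number and completes.
    static Observable<Integer> flakySource() {
        final AtomicInteger subscriptions = new AtomicInteger();
        return Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                int n = subscriptions.incrementAndGet();
                if (n < 3) {
                    subscriber.onError(new IOException("mock exception " + n));
                } else {
                    subscriber.onNext(n);
                    subscriber.onCompleted();
                }
            }
        });
    }

    // Records every notification so the result can be inspected afterwards.
    static List<String> record(Observable<Integer> observable) {
        final List<String> log = new ArrayList<String>();
        observable.subscribe(new Subscriber<Integer>() {
            @Override public void onNext(Integer value) { log.add("next:" + value); }
            @Override public void onError(Throwable e) { log.add("error:" + e.getClass().getSimpleName()); }
            @Override public void onCompleted() { log.add("completed"); }
        });
        return log;
    }

    // retry(Func2): resubscribe only while the failure is an IOException.
    static List<String> withPredicate() {
        return record(flakySource().retry(new Func2<Integer, Throwable, Boolean>() {
            @Override
            public Boolean call(Integer retryCount, Throwable error) {
                return error instanceof IOException;
            }
        }));
    }

    // retryWhen: an item from the returned Observable requests a resubscription;
    // an error from it ends the sequence with that error.
    static List<String> withRetryWhen() {
        return record(flakySource().retryWhen(
                new Func1<Observable<? extends Throwable>, Observable<?>>() {
                    @Override
                    public Observable<?> call(Observable<? extends Throwable> errors) {
                        return errors.flatMap(new Func1<Throwable, Observable<Integer>>() {
                            @Override
                            public Observable<Integer> call(Throwable error) {
                                if (error instanceof IOException) {
                                    return Observable.just(0);
                                }
                                return Observable.<Integer>error(error);
                            }
                        });
                    }
                }));
    }
}
```

In both methods the two IOExceptions are absorbed by resubscribing, so the observer only sees the item from the third subscription followed by completion. A predicate based on the error type, rather than on a fixed count as in retry(long), is a design choice made for this sketch.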