本教學基於RxJava1.x版本進行全面講解,後續課程將陸續更新,敬請關注…
//创建了一个普通的Observable对象 Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); } }); //将一个被观察者转换成一个可连接的被观察者 ConnectableObservable<Integer> connectableObservable =observable.publish(); //为可连接的被观察者订阅事件,但这里并不会马上发送事件 connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });與普通的Observable物件訂閱不同,上面的程式碼中並沒直接呼叫Action1物件的call()方法。 Connect可連接的Observable (connectable Observable)與普通的Observable差不多,不過它並不會在被訂閱時開始發射數據,而是直到使用了Connect操作符時才會開始。用這個方法,你可以等待所有的觀察者都訂閱了Observable之後再開始發射資料。
//创建了一个普通的Observable对象 Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); } }); //将一个被观察者转换成一个可连接的被观察者 ConnectableObservable<Integer> connectableObservable =observable.publish(); //为可连接的被观察者订阅事件,但这里并不会马上发送事件 connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } }); //当调用可连接的被观察者connect()方法后 开始发送所有的数据。 connectableObservable.connect(new Action1輸出:() { @Override public void call(Subscription subscription) { Log.i(TAG, "call: "+subscription); } });
IT520: call: OperatorPublish$PublishSubscriber@20dce78 IT520: call: 1 IT520: call: 2 IT520: call: 3RefCountRefCount運算子可以讓一個可連接的Observable轉換成普通的Observable。
//创建了一个普通的Observable对象 Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); } }); //将一个被观察者转换成一个可连接的被观察者 ConnectableObservable<Integer> connectableObservable =observable.publish(); //将一个可链接的被观察者转换成一个普通观察者 Observable<Integer> integerObservable = connectableObservable.refCount(); //为可连接的被观察者订阅事件,一订阅就马上发送数据并打印出 1 2 3... integerObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });Replay保證所有的觀察者收到相同的資料序列,即使它們在Observable開始發射資料之後才訂閱.先來看一個例子:
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); } }); ConnectableObservable<Integer> connectableObservable =observable.publish(); connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call--1--: "+integer); } }); connectableObservable.connect(); connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call--2--: "+integer); } });輸出:
com.m520it.rxjava I/IT520: call--1--: 1 com.m520it.rxjava I/IT520: call--1--: 2 com.m520it.rxjava I/IT520: call--1--: 3首先我們透過publish()將一個普通的Observable轉換成ConnectableObservable。當呼叫connect()的時候,則connect()上面已經訂閱的觀察者會收到資料。而connect()後面訂閱的觀察者則無法接收到資料。 如果我們想要讓所有的觀察者在呼叫connect()的時候同時接收到資料而跟訂閱的順序無關,則需要透過replay()。
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); } }); //这里将publish()改为replay() ConnectableObservable<Integer> connectableObservable =observable.replay(); connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call--1--: "+integer); } }); connectableObservable.connect(); connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call--2--: "+integer); } });輸出:
com.m520it.rxjava I/IT520: call--1--: 1 com.m520it.rxjava I/IT520: call--1--: 2 com.m520it.rxjava I/IT520: call--1--: 3 com.m520it.rxjava I/IT520: call--2--: 1 com.m520it.rxjava I/IT520: call--2--: 2 com.m520it.rxjava I/IT520: call--2--: 39.「冷Observable」&「熱Observable」前面我們提到,訂閱的時候(如果觀察者有發送資料的) ,觀察者有直接接收資料的,有等過了一段時間才接收資料的。
onErrorReturn
onErrorReturn方法傳回鏡像原有Observable行為的新Observable,後者會忽略前者的onError調用,不會將錯誤傳遞給觀察者,作為替代,它會發出一個特殊的項並調用觀察者的onCompleted方法。 下面的程式碼發送1,2,3 並在發送的過程中模擬發送一個異常,只要有異常發送,onErrorReturn()就會被調用並發送44.代碼如下:Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onError(new NullPointerException("mock exception !")); subscriber.onNext(3); } }).onErrorReturn(new Func1<Throwable, Integer>() { @Override public Integer call(Throwable throwable) { return 44; } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });輸出:
com.m520it.rxjava I/IT520: call: 1 com.m520it.rxjava I/IT520: call: 2 com.m520it.rxjava I/IT520: call: 44
onErrorResumeNext
#讓Observable在遇到錯誤時開始發射第二個Observable的資料序列。 下面的程式碼在發送的時候,模擬發送一個異常。接著onErrorResumeNext就會被呼叫 並開始發射新的Observable物件。Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onError(new NullPointerException("mock exception !")); subscriber.onNext(3); } }).onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() { @Override public Observable<? extends Integer> call(Throwable throwable) { Observable<Integer> innerObservable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(4); subscriber.onNext(5); } }); return innerObservable; } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });輸出:
com.m520it.rxjava I/IT520: call: 1 com.m520it.rxjava I/IT520: call: 2 com.m520it.rxjava I/IT520: call: 3 com.m520it.rxjava I/IT520: call: 4 com.m520it.rxjava I/IT520: call: 5
onExceptionResumeNext
讓Observable在遇到錯誤時繼續發射後面的資料項。//创建一个错误处理的Observable对象 Observable<Integer> exceptionObserver = Observable .create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(55); subscriber.onNext(66); } }); Observable .create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onError(new NullPointerException("mock exception !")); subscriber.onNext(3); } }) //上面的代码发送的过程中出现了异常,该方法就会被调用 并发射exceptionObserver .onExceptionResumeNext(exceptionObserver) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });輸出:
com.m520it.rxjava I/IT520: call: 1 com.m520it.rxjava I/IT520: call: 2 com.m520it.rxjava I/IT520: call: 55 com.m520it.rxjava I/IT520: call: 66Retry重試機制如果原始Observable遇到錯誤,重新訂閱它並期望它能正常終止。 RxJava中的實作為retry和retryWhen。
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onError(new NullPointerException("mock exception !")); subscriber.onNext(3); } }) .retry(3)//重复3次订阅 .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });
類似的函數還有:
以上就是深入淺出RxJava_02[訂閱深入與異常處理]的程式碼範例的內容,更多相關內容請關注PHP中文網(www.php.cn)!