首頁  >  文章  >  Java  >  深入淺出RxJava_02[訂閱深入與異常處理]的程式碼範例

深入淺出RxJava_02[訂閱深入與異常處理]的程式碼範例

黄舟
黄舟原創
2017-03-04 09:41:191138瀏覽

本教學基於RxJava1.x版本進行全面講解,後續課程將陸續更新,敬請關注…

# #8.可連結的被觀察者

前幾節提到的了回應用過程中的三個細節:被觀察者觀察者和訂閱。 接下來這一節繼續理解下訂閱的其他知識點。

這裡介紹一個叫做ConnectableObservable的可連接被觀察者。一個可連接的Observable與普通的Observable差不多,除了這一點:可連接的Observable在被訂閱時並不開始發射數據,只有在它的connect()被調用時才開始。用這種方法,你可以等所有的潛在訂閱者都訂閱了這個Observable之後才開始發射資料。

Publish

此運算子可以將普通的Observable轉換成可連接的Observable。

可連接的Observable (connectable Observable)與普通的Observable差不多,不過它並不會在被訂閱時開始發射數據,而是直到使用了Connect操作符時才會開始。用這種方法,你可以在任何時候讓一個Observable開始發射資料。

RxJava中connect是ConnectableObservable介面的一個方法,使用publish運算元可以將一個普通的Observable轉換為一個ConnectableObservable。

//创建了一个普通的Observable对象
Observable<Integer> observable =
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

//将一个被观察者转换成一个可连接的被观察者
ConnectableObservable<Integer> connectableObservable =observable.publish();

//为可连接的被观察者订阅事件,但这里并不会马上发送事件
connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: "+integer);
    }
});

與普通的Observable物件訂閱不同,上面的程式碼中並沒直接呼叫Action1物件的call()方法。

Connect

可連接的Observable (connectable Observable)與普通的Observable差不多,不過它並不會在被訂閱時開始發射數據,而是直到使用了Connect操作符時才會開始。用這個方法,你可以等待所有的觀察者都訂閱了Observable之後再開始發射資料。

//创建了一个普通的Observable对象
Observable<Integer> observable =
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

//将一个被观察者转换成一个可连接的被观察者
ConnectableObservable<Integer> connectableObservable =observable.publish();

//为可连接的被观察者订阅事件,但这里并不会马上发送事件
connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: "+integer);
    }
});

//当调用可连接的被观察者connect()方法后 开始发送所有的数据。
connectableObservable.connect(new Action1() {
    @Override
    public void call(Subscription subscription) {
        Log.i(TAG, "call: "+subscription);
    }
});

輸出:

IT520: call: OperatorPublish$PublishSubscriber@20dce78
IT520: call: 1
IT520: call: 2
IT520: call: 3

RefCount

RefCount運算子可以讓一個可連接的Observable轉換成普通的Observable。

//创建了一个普通的Observable对象
Observable<Integer> observable =
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

//将一个被观察者转换成一个可连接的被观察者
ConnectableObservable<Integer> connectableObservable =observable.publish();

//将一个可链接的被观察者转换成一个普通观察者
Observable<Integer> integerObservable = connectableObservable.refCount();


//为可连接的被观察者订阅事件,一订阅就马上发送数据并打印出 1 2 3...
integerObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: "+integer);
    }
});

Replay

保證所有的觀察者收到相同的資料序列,即使它們在Observable開始發射資料之後才訂閱.

先來看一個例子:

Observable<Integer> observable =
            Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onNext(1);
                    subscriber.onNext(2);
                    subscriber.onNext(3);
                }
            });

ConnectableObservable<Integer> connectableObservable =observable.publish();

connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--1--: "+integer);
    }
});

connectableObservable.connect();

connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--2--: "+integer);
    }
});

輸出:

com.m520it.rxjava I/IT520: call--1--: 1
com.m520it.rxjava I/IT520: call--1--: 2
com.m520it.rxjava I/IT520: call--1--: 3

首先我們透過publish()將一個普通的Observable轉換成ConnectableObservable。當呼叫connect()的時候,則connect()上面已經訂閱的觀察者會收到資料。而connect()後面訂閱的觀察者則無法接收到資料。 如果我們想要讓所有的觀察者在呼叫connect()的時候同時接收到資料而跟訂閱的順序無關,則需要透過replay()。

Observable<Integer> observable =
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

//这里将publish()改为replay()
ConnectableObservable<Integer> connectableObservable =observable.replay();

connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--1--: "+integer);
    }
});

connectableObservable.connect();

connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--2--: "+integer);
    }
});

輸出:

com.m520it.rxjava I/IT520: call--1--: 1
com.m520it.rxjava I/IT520: call--1--: 2
com.m520it.rxjava I/IT520: call--1--: 3
com.m520it.rxjava I/IT520: call--2--: 1
com.m520it.rxjava I/IT520: call--2--: 2
com.m520it.rxjava I/IT520: call--2--: 3

9.「冷Observable」&「熱Observable」

前面我們提到,訂閱的時候(如果觀察者有發送資料的) ,觀察者有直接接收資料的,有等過了一段時間才接收資料的。

  • 我們將一訂閱觀察者就馬上能接收資料的觀察者稱之為「熱Observable」。

  • 如上面的ConnectableObservable就算訂閱後,也沒能傳送數據,只有呼叫connect()才能讓觀察者接收到資料。我們稱該觀察者為「冷Observable」

10.錯誤處理

很多操作符可用於對Observable發射的onError通知做出回應或從錯誤中恢復

Catch操作符攔截原始Observable的onError通知,將它替換為其它的資料項或資料序列,讓產生的Observable能夠正常終止或根本不終止。

RxJava將Catch實作為三個不同的運算元:

onErrorReturn

onErrorReturn方法傳回鏡像原有Observable行為的新Observable,後者會忽略前者的onError調用,不會將錯誤傳遞給觀察者,作為替代,它會發出一個特殊的項並調用觀察者的onCompleted方法。

下面的程式碼發送1,2,3 並在發送的過程中模擬發送一個異常,只要有異常發送,onErrorReturn()就會被調用並發送44.代碼如下:

Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onError(new NullPointerException("mock exception !"));
            subscriber.onNext(3);
        }
    }).onErrorReturn(new Func1<Throwable, Integer>() {
        @Override
        public Integer call(Throwable throwable) {
            return 44;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call: "+integer);
        }
    });

輸出:

com.m520it.rxjava I/IT520: call: 1
com.m520it.rxjava I/IT520: call: 2
com.m520it.rxjava I/IT520: call: 44

onErrorResumeNext

#讓Observable在遇到錯誤時開始發射第二個Observable的資料序列。

下面的程式碼在發送的時候,模擬發送一個異常。接著onErrorResumeNext就會被呼叫 並開始發射新的Observable物件。

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(1);
        subscriber.onNext(2);
        subscriber.onError(new NullPointerException("mock exception !"));
        subscriber.onNext(3);
    }
}).onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() {
    @Override
    public Observable<? extends Integer> call(Throwable throwable) {
            Observable<Integer> innerObservable =
                    Observable.create(new Observable.OnSubscribe<Integer>() {
                        @Override
                        public void call(Subscriber<? super Integer> subscriber) {
                            subscriber.onNext(4);
                            subscriber.onNext(5);
                        }
                    });
        return innerObservable;
    }
}).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: "+integer);
    }
});

輸出:

com.m520it.rxjava I/IT520: call: 1
com.m520it.rxjava I/IT520: call: 2
com.m520it.rxjava I/IT520: call: 3
com.m520it.rxjava I/IT520: call: 4
com.m520it.rxjava I/IT520: call: 5

onExceptionResumeNext

讓Observable在遇到錯誤時繼續發射後面的資料項。

//创建一个错误处理的Observable对象
Observable<Integer> exceptionObserver = Observable
        .create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(55);
                subscriber.onNext(66);
            }
        });

Observable
        .create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onError(new NullPointerException("mock exception !"));
                subscriber.onNext(3);
            }
        })
        //上面的代码发送的过程中出现了异常,该方法就会被调用 并发射exceptionObserver
        .onExceptionResumeNext(exceptionObserver)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.i(TAG, "call: "+integer);
            }
        });

輸出:

com.m520it.rxjava I/IT520: call: 1
com.m520it.rxjava I/IT520: call: 2
com.m520it.rxjava I/IT520: call: 55
com.m520it.rxjava I/IT520: call: 66

Retry重試機制

如果原始Observable遇到錯誤,重新訂閱它並期望它能正常終止。

RxJava中的實作為retry和retryWhen。

Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onError(new NullPointerException("mock exception !"));
            subscriber.onNext(3);
        }
    })
    .retry(3)//重复3次订阅
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call: "+integer);
        }
    });

類似的函數還有:

  • Javadoc: retry())  無論收到多少次onError通知,都會繼續訂閱並發射原始Observable。

  • Javadoc: retry(long)) retry會最多重新訂閱指定的次數,如果次數超了,不會嘗試再次訂閱

  • Javadoc: retry(Func2))

  • retryWhen

 以上就是深入淺出RxJava_02[訂閱深入與異常處理]的程式碼範例的內容,更多相關內容請關注PHP中文網(www.php.cn)!


#
陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn