ホームページ >Java >&#&チュートリアル >RxJava_02のコード例【サブスクリプションの深さと例外処理】
このチュートリアルは、RxJava1 に基づく包括的な説明です。プロセスの 3 つの詳細: オブザーバー、オブザーバー、サブスクリプション。 次のセクションでは、引き続きサブスクリプションに関するその他の知識ポイントを理解します。 ここでは、ConnectableObservable と呼ばれる接続可能なオブザーバブルを紹介します。接続可能な Observable は通常の Observable と似ていますが、次の点が異なります。接続可能な Observable は、サブスクライブされたときはデータの出力を開始せず、connect() が呼び出されたときにのみデータの出力を開始します。このようにして、すべての潜在的なサブスクライバーが Observable にサブスクライブされるまで、データの送信を開始する前に待つことができます。
Publish
接続可能な Observable (接続可能な Observable) は通常の Observable と似ていますが、サブスクライブされたときにデータの送信を開始せず、Connect オペレーターが使用されるまで開始されません。このようにして、Observable がいつでもデータの出力を開始できるようにします。
RxJava の Connect は ConnectableObservable インターフェースのメソッドであり、publish オペレーターを使用して通常の Observable を ConnectableObservable に変換できます。
//创建了一个普通的Observable对象 Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); } }); //将一个被观察者转换成一个可连接的被观察者 ConnectableObservable<Integer> connectableObservable =observable.publish(); //为可连接的被观察者订阅事件,但这里并不会马上发送事件 connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });
Connect
接続可能な Observable (接続可能な Observable) は通常の Observable と似ていますが、サブスクライブされたときにデータの送信を開始せず、Connect オペレーターが使用されるまで開始されません。このメソッドを使用すると、データの送信を開始する前に、すべてのオブザーバーが Observable にサブスクライブされるのを待つことができます。
//创建了一个普通的Observable对象 Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); } }); //将一个被观察者转换成一个可连接的被观察者 ConnectableObservable<Integer> connectableObservable =observable.publish(); //为可连接的被观察者订阅事件,但这里并不会马上发送事件 connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } }); //当调用可连接的被观察者connect()方法后 开始发送所有的数据。 connectableObservable.connect(new Action1() { @Override public void call(Subscription subscription) { Log.i(TAG, "call: "+subscription); } });
出力:
IT520: call: OperatorPublish$PublishSubscriber@20dce78 IT520: call: 1 IT520: call: 2 IT520: call: 3
RefCount
//创建了一个普通的Observable对象 Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); } }); //将一个被观察者转换成一个可连接的被观察者 ConnectableObservable<Integer> connectableObservable =observable.publish(); //将一个可链接的被观察者转换成一个普通观察者 Observable<Integer> integerObservable = connectableObservable.refCount(); //为可连接的被观察者订阅事件,一订阅就马上发送数据并打印出 1 2 3... integerObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });
Replay
は、Observable がデータの送信を開始した後にサブスクライブした場合でも、すべてのオブザーバーが同じデータ シーケンスを受信することを保証します。
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); } }); ConnectableObservable<Integer> connectableObservable =observable.publish(); connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call--1--: "+integer); } }); connectableObservable.connect(); connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call--2--: "+integer); } });
出力:
com.m520it.rxjava I/IT520: call--1--: 1 com.m520it.rxjava I/IT520: call--1--: 2 com.m520it.rxjava I/IT520: call--1--: 3
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); } }); //这里将publish()改为replay() ConnectableObservable<Integer> connectableObservable =observable.replay(); connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call--1--: "+integer); } }); connectableObservable.connect(); connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call--2--: "+integer); } });
出力:
com.m520it.rxjava I/IT520: call--1--: 1 com.m520it.rxjava I/IT520: call--1--: 2 com.m520it.rxjava I/IT520: call--1--: 3 com.m520it.rxjava I/IT520: call--2--: 1 com.m520it.rxjava I/IT520: call--2--: 2 com.m520it.rxjava I/IT520: call--2--: 3
9.「コールドオブザーバブル」と「ホットオブザーバブル」
サブスクライブするとき(オブザーバーがデータを送信する場合)、オブザーバーはデータを直接受信するか、受信するまでに時間がかかると前述しました。データ。
オブザーバーを購読するとすぐにデータを受信できるオブザーバーを「ホットオブザーバブル」と呼びます。
たとえば、上記の ConnectableObservable は、サブスクライブされた後でもデータを送信できません。connect() を呼び出すことによってのみ、オブザーバーはデータを受信できます。このオブザーバーを「コールド Observable」と呼びます
10. エラー処理
Catch オペレーターは、元の Observable の onError 通知をインターセプトします、生成された Observable が正常に終了するか、まったく終了しないように、他のデータ項目またはデータ シーケンスに置き換えます。
onErrorReturn メソッドは、元の Observable の動作をミラーリングする新しい Observable を返します。後者は、前者の onError 呼び出しを無視し、代わりにエラーをオブザベーションに渡しません。 、特別なアイテムを発行し、オブザーバーの onCompleted メソッドを呼び出します。
次のコードは 1、2、3 を送信し、例外が送信される限り、onErrorReturn() が呼び出され、44 が送信されます。出力:
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onError(new NullPointerException("mock exception !")); subscriber.onNext(3); } }).onErrorReturn(new Func1<Throwable, Integer>() { @Override public Integer call(Throwable throwable) { return 44; } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });
onErrorResumeNext
により、Observable は、エラーが発生したときに 2 番目の Observable のデータ シーケンスの出力を開始できます。
次のコードは、送信時の例外の送信をシミュレートします。次に、onErrorResumeNext が呼び出され、新しい Observable オブジェクトの生成が開始されます。
com.m520it.rxjava I/IT520: call: 1 com.m520it.rxjava I/IT520: call: 2 com.m520it.rxjava I/IT520: call: 44
出力:
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onError(new NullPointerException("mock exception !")); subscriber.onNext(3); } }).onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() { @Override public Observable<? extends Integer> call(Throwable throwable) { Observable<Integer> innerObservable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(4); subscriber.onNext(5); } }); return innerObservable; } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });
onExceptionResumeNext
エラーが発生した場合でも、Observable が後続のデータ項目を出力し続けるようにします。
com.m520it.rxjava I/IT520: call: 1 com.m520it.rxjava I/IT520: call: 2 com.m520it.rxjava I/IT520: call: 3 com.m520it.rxjava I/IT520: call: 4 com.m520it.rxjava I/IT520: call: 5
出力:
//创建一个错误处理的Observable对象 Observable<Integer> exceptionObserver = Observable .create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(55); subscriber.onNext(66); } }); Observable .create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onError(new NullPointerException("mock exception !")); subscriber.onNext(3); } }) //上面的代码发送的过程中出现了异常,该方法就会被调用 并发射exceptionObserver .onExceptionResumeNext(exceptionObserver) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });
再試行メカニズム
元の Observable でエラーが発生した場合は、それを再サブスクライブし、正常に終了することを期待します。 RxJava での実装は retry と retryWhen です。
com.m520it.rxjava I/IT520: call: 1 com.m520it.rxjava I/IT520: call: 2 com.m520it.rxjava I/IT520: call: 55 com.m520it.rxjava I/IT520: call: 66同様の関数は次のとおりです:
Javadoc: retry()) onError 通知を何回受信しても、サブスクライブを続けて元の Observable を発行します。
Javadoc: retry(long)) retryは、指定された回数まで再サブスクライブします
上記は、RxJava_02 [サブスクリプションの深さと例外処理] のコード例の詳細な説明です。さらに関連するコンテンツについては、PHP 中国語 Web サイト (www.php.cn) に注目してください。