ホームページ  >  記事  >  Java  >  RxJava_02のコード例【サブスクリプションの深さと例外処理】

RxJava_02のコード例【サブスクリプションの深さと例外処理】

黄舟
黄舟オリジナル
2017-03-04 09:41:191102ブラウズ

このチュートリアルは、RxJava1 に基づく包括的な説明です。プロセスの 3 つの詳細: オブザーバー、オブザーバー、サブスクリプション。 次のセクションでは、引き続きサブスクリプションに関するその他の知識ポイントを理解します。 ここでは、ConnectableObservable と呼ばれる接続可能なオブザーバブルを紹介します。接続可能な Observable は通常の Observable と似ていますが、次の点が異なります。接続可能な Observable は、サブスクライブされたときはデータの出力を開始せず、connect() が呼び出されたときにのみデータの出力を開始します。このようにして、すべての潜在的なサブスクライバーが Observable にサブスクライブされるまで、データの送信を開始する前に待つことができます。
Publish

このオペレーターは、通常の Observable を接続可能な Observable に変換できます。

接続可能な Observable (接続可能な Observable) は通常の Observable と似ていますが、サブスクライブされたときにデータの送信を開始せず、Connect オペレーターが使用されるまで開始されません。このようにして、Observable がいつでもデータの出力を開始できるようにします。

RxJava の Connect は ConnectableObservable インターフェースのメソッドであり、publish オペレーターを使用して通常の Observable を ConnectableObservable に変換できます。

//创建了一个普通的Observable对象
Observable<Integer> observable =
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

//将一个被观察者转换成一个可连接的被观察者
ConnectableObservable<Integer> connectableObservable =observable.publish();

//为可连接的被观察者订阅事件,但这里并不会马上发送事件
connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: "+integer);
    }
});

通常の Observable オブジェクトのサブスクリプションとは異なり、上記のコードでは Action1 オブジェクトの call() メソッドは直接呼び出されません。

Connect

接続可能な Observable (接続可能な Observable) は通常の Observable と似ていますが、サブスクライブされたときにデータの送信を開始せず、Connect オペレーターが使用されるまで開始されません。このメソッドを使用すると、データの送信を開始する前に、すべてのオブザーバーが Observable にサブスクライブされるのを待つことができます。

//创建了一个普通的Observable对象
Observable<Integer> observable =
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

//将一个被观察者转换成一个可连接的被观察者
ConnectableObservable<Integer> connectableObservable =observable.publish();

//为可连接的被观察者订阅事件,但这里并不会马上发送事件
connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: "+integer);
    }
});

//当调用可连接的被观察者connect()方法后 开始发送所有的数据。
connectableObservable.connect(new Action1() {
    @Override
    public void call(Subscription subscription) {
        Log.i(TAG, "call: "+subscription);
    }
});

出力:

IT520: call: OperatorPublish$PublishSubscriber@20dce78
IT520: call: 1
IT520: call: 2
IT520: call: 3

RefCount

RefCount オペレーターは、接続可能な Observable を通常の Observable に変換できます。

//创建了一个普通的Observable对象
Observable<Integer> observable =
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

//将一个被观察者转换成一个可连接的被观察者
ConnectableObservable<Integer> connectableObservable =observable.publish();

//将一个可链接的被观察者转换成一个普通观察者
Observable<Integer> integerObservable = connectableObservable.refCount();


//为可连接的被观察者订阅事件,一订阅就马上发送数据并打印出 1 2 3...
integerObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: "+integer);
    }
});

Replay

は、Observable がデータの送信を開始した後にサブスクライブした場合でも、すべてのオブザーバーが同じデータ シーケンスを受信することを保証します。

まず例を見てみましょう:

Observable<Integer> observable =
            Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onNext(1);
                    subscriber.onNext(2);
                    subscriber.onNext(3);
                }
            });

ConnectableObservable<Integer> connectableObservable =observable.publish();

connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--1--: "+integer);
    }
});

connectableObservable.connect();

connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--2--: "+integer);
    }
});

出力:

com.m520it.rxjava I/IT520: call--1--: 1
com.m520it.rxjava I/IT520: call--1--: 2
com.m520it.rxjava I/IT520: call--1--: 3

最初に、publish() を渡します。通常の Observable を ConnectableObservable に変換します。 connect() が呼び出されると、connect() 上でサブスクライブされたオブザーバーがデータを受信します。 connect() の後にサブスクライブされたオブザーバーはデータを受信できません。 connect() を呼び出すときに、サブスクリプションの順序に関係なく、すべてのオブザーバーが同時にデータを受信するようにしたい場合は、replay() を使用する必要があります。

Observable<Integer> observable =
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

//这里将publish()改为replay()
ConnectableObservable<Integer> connectableObservable =observable.replay();

connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--1--: "+integer);
    }
});

connectableObservable.connect();

connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--2--: "+integer);
    }
});

出力:

com.m520it.rxjava I/IT520: call--1--: 1
com.m520it.rxjava I/IT520: call--1--: 2
com.m520it.rxjava I/IT520: call--1--: 3
com.m520it.rxjava I/IT520: call--2--: 1
com.m520it.rxjava I/IT520: call--2--: 2
com.m520it.rxjava I/IT520: call--2--: 3

9.「コールドオブザーバブル」と「ホットオブザーバブル」

サブスクライブするとき(オブザーバーがデータを送信する場合)、オブザーバーはデータを直接受信するか、受信するまでに時間がかかると前述しました。データ。

オブザーバーを購読するとすぐにデータを受信できるオブザーバーを「ホットオブザーバブル」と呼びます。

たとえば、上記の ConnectableObservable は、サブスクライブされた後でもデータを送信できません。connect() を呼び出すことによってのみ、オブザーバーはデータを受信できます。このオブザーバーを「コールド Observable」と呼びます

  • 10. エラー処理

  • 多くのオペレーターを使用して、Observable によって発行された onError 通知に応答したり、エラーから回復したりすることができます
  • Catch オペレーターは、元の Observable の onError 通知をインターセプトします、生成された Observable が正常に終了するか、まったく終了しないように、他のデータ項目またはデータ シーケンスに置き換えます。

  • RxJava は、Catch を 3 つの異なる演算子として実装します。

onErrorReturn

onErrorReturn メソッドは、元の Observable の動作をミラーリングする新しい Observable を返します。後者は、前者の onError 呼び出しを無視し、代わりにエラーをオブザベーションに渡しません。 、特別なアイテムを発行し、オブザーバーの onCompleted メソッドを呼び出します。

次のコードは 1、2、3 を送信し、例外が送信される限り、onErrorReturn() が呼び出され、44 が送信されます。出力:

Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onError(new NullPointerException("mock exception !"));
            subscriber.onNext(3);
        }
    }).onErrorReturn(new Func1<Throwable, Integer>() {
        @Override
        public Integer call(Throwable throwable) {
            return 44;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call: "+integer);
        }
    });

onErrorResumeNext

により、Observable は、エラーが発生したときに 2 番目の Observable のデータ シーケンスの出力を開始できます。

次のコードは、送信時の例外の送信をシミュレートします。次に、onErrorResumeNext が呼び出され、新しい Observable オブジェクトの生成が開始されます。

com.m520it.rxjava I/IT520: call: 1
com.m520it.rxjava I/IT520: call: 2
com.m520it.rxjava I/IT520: call: 44

出力:

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(1);
        subscriber.onNext(2);
        subscriber.onError(new NullPointerException("mock exception !"));
        subscriber.onNext(3);
    }
}).onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() {
    @Override
    public Observable<? extends Integer> call(Throwable throwable) {
            Observable<Integer> innerObservable =
                    Observable.create(new Observable.OnSubscribe<Integer>() {
                        @Override
                        public void call(Subscriber<? super Integer> subscriber) {
                            subscriber.onNext(4);
                            subscriber.onNext(5);
                        }
                    });
        return innerObservable;
    }
}).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: "+integer);
    }
});

onExceptionResumeNext

エラーが発生した場合でも、Observable が後続のデータ項目を出力し続けるようにします。

com.m520it.rxjava I/IT520: call: 1
com.m520it.rxjava I/IT520: call: 2
com.m520it.rxjava I/IT520: call: 3
com.m520it.rxjava I/IT520: call: 4
com.m520it.rxjava I/IT520: call: 5

出力:

//创建一个错误处理的Observable对象
Observable<Integer> exceptionObserver = Observable
        .create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(55);
                subscriber.onNext(66);
            }
        });

Observable
        .create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onError(new NullPointerException("mock exception !"));
                subscriber.onNext(3);
            }
        })
        //上面的代码发送的过程中出现了异常,该方法就会被调用 并发射exceptionObserver
        .onExceptionResumeNext(exceptionObserver)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.i(TAG, "call: "+integer);
            }
        });

再試行メカニズム

元の Observable でエラーが発生した場合は、それを再サブスクライブし、正常に終了することを期待します。 RxJava での実装は retry と retryWhen です。

com.m520it.rxjava I/IT520: call: 1
com.m520it.rxjava I/IT520: call: 2
com.m520it.rxjava I/IT520: call: 55
com.m520it.rxjava I/IT520: call: 66

同様の関数は次のとおりです:

Javadoc: retry()) onError 通知を何回受信しても、サブスクライブを続けて元の Observable を発行します。

Javadoc: retry(long)) retryは、指定された回数まで再サブスクライブします

    Javadoc: retry(Func2))
  • 再試行するとき

上記は、RxJava_02 [サブスクリプションの深さと例外処理] のコード例の詳細な説明です。さらに関連するコンテンツについては、PHP 中国語 Web サイト (www.php.cn) に注目してください。


声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。