Heim >Java >javaLernprogramm >Ein Codebeispiel für RxJava_02 [Abonnementtiefe und Ausnahmebehandlung]

Ein Codebeispiel für RxJava_02 [Abonnementtiefe und Ausnahmebehandlung]

黄舟
黄舟Original
2017-03-04 09:41:191180Durchsuche

Dieses Tutorial basiert auf RxJava1 >8.In den vorherigen Abschnitten wurden drei Details im Bewerbungsprozess erwähnt: Observables, Observers und Abonnements. Im nächsten Abschnitt werden weitere Wissenspunkte zum Abonnement erläutert.
Hier stellen wir ein verbindbares Observable namens ConnectableObservable vor. Ein verbindbares Observable ist genau wie ein normales Observable, mit der Ausnahme: Ein verbindbares Observable beginnt nicht mit der Ausgabe von Daten, wenn es abonniert wird, sondern erst, wenn sein connect() aufgerufen wird. Auf diese Weise können Sie warten, bis alle potenziellen Abonnenten das Observable abonniert haben, bevor Sie mit der Datenausgabe beginnen.

Veröffentlichen

Dieser Operator kann ein normales Observable in ein verbindbares Observable umwandeln.

Ein verbindbares Observable (verbindbares Observable) ähnelt einem normalen Observable, beginnt jedoch nicht mit der Ausgabe von Daten, wenn es abonniert wird, sondern erst, wenn der Connect-Operator verwendet wird. Auf diese Weise können Sie ein Observable jederzeit mit der Ausgabe von Daten beginnen lassen.

Connect in RxJava ist eine Methode der ConnectableObservable-Schnittstelle. Sie können den Veröffentlichungsoperator verwenden, um ein gewöhnliches Observable in ein ConnectableObservable umzuwandeln.

Im Gegensatz zu gewöhnlichen Observable-Objektabonnements wird die call()-Methode des Action1-Objekts im obigen Code nicht direkt aufgerufen.

Verbinden

Ein verbindbares Observable (verbindbares Observable) ähnelt einem normalen Observable, beginnt jedoch nicht mit der Ausgabe von Daten, wenn es abonniert wird, sondern erst, wenn der Connect-Operator verwendet wird . Mit dieser Methode können Sie warten, bis alle Beobachter das Observable abonniert haben, bevor Sie mit der Ausgabe von Daten beginnen.
//创建了一个普通的Observable对象
Observable<Integer> observable =
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

//将一个被观察者转换成一个可连接的被观察者
ConnectableObservable<Integer> connectableObservable =observable.publish();

//为可连接的被观察者订阅事件,但这里并不会马上发送事件
connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: "+integer);
    }
});

Ausgabe:

RefCount

//创建了一个普通的Observable对象
Observable<Integer> observable =
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

//将一个被观察者转换成一个可连接的被观察者
ConnectableObservable<Integer> connectableObservable =observable.publish();

//为可连接的被观察者订阅事件,但这里并不会马上发送事件
connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: "+integer);
    }
});

//当调用可连接的被观察者connect()方法后 开始发送所有的数据。
connectableObservable.connect(new Action1() {
    @Override
    public void call(Subscription subscription) {
        Log.i(TAG, "call: "+subscription);
    }
});
Der RefCount-Operator kann ein verbindbares Observable in ein gewöhnliches Observable umwandeln.

IT520: call: OperatorPublish$PublishSubscriber@20dce78
IT520: call: 1
IT520: call: 2
IT520: call: 3
Wiedergabe

Stellt sicher, dass alle Beobachter die gleiche Datensequenz erhalten, auch wenn sie sich angemeldet haben, nachdem das Observable mit der Datenausgabe begonnen hat.

Schauen wir uns zunächst ein Beispiel an:
//创建了一个普通的Observable对象
Observable<Integer> observable =
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

//将一个被观察者转换成一个可连接的被观察者
ConnectableObservable<Integer> connectableObservable =observable.publish();

//将一个可链接的被观察者转换成一个普通观察者
Observable<Integer> integerObservable = connectableObservable.refCount();


//为可连接的被观察者订阅事件,一订阅就马上发送数据并打印出 1 2 3...
integerObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: "+integer);
    }
});

Ausgabe:

Zuerst konvertieren wir ein gewöhnliches Observable in ein ConnectableObservable durch Publish(). Wenn connect() aufgerufen wird, erhalten die oben abonnierten Beobachter connect() die Daten. Beobachter, die nach connect() abonniert wurden, können keine Daten empfangen. Wenn wir möchten, dass alle Beobachter beim Aufruf von connect() unabhängig von der Reihenfolge der Abonnements gleichzeitig Daten empfangen, müssen wir replay() verwenden.

Observable<Integer> observable =
            Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onNext(1);
                    subscriber.onNext(2);
                    subscriber.onNext(3);
                }
            });

ConnectableObservable<Integer> connectableObservable =observable.publish();

connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--1--: "+integer);
    }
});

connectableObservable.connect();

connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--2--: "+integer);
    }
});

Ausgabe:

com.m520it.rxjava I/IT520: call--1--: 1
com.m520it.rxjava I/IT520: call--1--: 2
com.m520it.rxjava I/IT520: call--1--: 3

9. „Cold Observable“ und „Hot Observable“

Observable<Integer> observable =
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

//这里将publish()改为replay()
ConnectableObservable<Integer> connectableObservable =observable.replay();

connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--1--: "+integer);
    }
});

connectableObservable.connect();

connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--2--: "+integer);
    }
});
Wie bereits erwähnt, beim Abonnieren (wenn der Beobachter über „Senden“ verfügt). Daten), einige Beobachter empfangen Daten direkt und andere warten eine gewisse Zeit, bevor sie Daten empfangen.

com.m520it.rxjava I/IT520: call--1--: 1
com.m520it.rxjava I/IT520: call--1--: 2
com.m520it.rxjava I/IT520: call--1--: 3
com.m520it.rxjava I/IT520: call--2--: 1
com.m520it.rxjava I/IT520: call--2--: 2
com.m520it.rxjava I/IT520: call--2--: 3

Wir nennen einen Beobachter, der Daten sofort nach dem Abonnement des Beobachters empfangen kann, ein „heißes Observable“.

    Zum Beispiel kann das obige ConnectableObservable keine Daten senden, selbst nachdem es abonniert wurde. Nur durch Aufrufen von connect() kann der Beobachter die Daten empfangen. Wir nennen diesen Beobachter ein „kaltes Observable“
  • 10. Fehlerbehandlung
  • Viele Operatoren können verwendet werden, um auf onError-Benachrichtigungen zu reagieren, die von einem Observable oder aus Fehlern ausgegeben werden. Die Wiederherstellung

  • Der Catch-Operator fängt die onError-Benachrichtigung des ursprünglichen Observable ab und ersetzt sie durch andere Datenelemente oder Datensequenzen, sodass das generierte Observable normal oder überhaupt nicht enden kann.

RxJava implementiert Catch als drei verschiedene Operatoren:

onErrorReturn

Die onErrorReturn-Methode gibt ein neues Observable zurück, das das Verhalten des ursprünglichen Observable widerspiegelt Letzterer ignoriert den onError-Aufruf des ersteren und gibt den Fehler nicht an den Beobachter weiter. Stattdessen wird ein spezielles Element ausgegeben und die onCompleted-Methode des Beobachters aufgerufen.

Der folgende Code sendet 1, 2, 3 und simuliert das Senden einer Ausnahme während des Sendevorgangs. Solange eine Ausnahme gesendet wird, wird onErrorReturn() aufgerufen und 44 gesendet. Der Code lautet wie folgt :

Ausgabe:

onErrorResumeNext
Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onError(new NullPointerException("mock exception !"));
            subscriber.onNext(3);
        }
    }).onErrorReturn(new Func1<Throwable, Integer>() {
        @Override
        public Integer call(Throwable throwable) {
            return 44;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call: "+integer);
        }
    });

Lassen Sie das Observable beginnen, die Datensequenz des zweiten Observable auszugeben, wenn es auf ein trifft Fehler.
com.m520it.rxjava I/IT520: call: 1
com.m520it.rxjava I/IT520: call: 2
com.m520it.rxjava I/IT520: call: 44

Der folgende Code simuliert das Senden einer Ausnahme beim Senden. Dann wird onErrorResumeNext aufgerufen und beginnt mit der Ausgabe neuer Observable-Objekte.

Ausgabe:

onExceptionResumeNext
Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(1);
        subscriber.onNext(2);
        subscriber.onError(new NullPointerException("mock exception !"));
        subscriber.onNext(3);
    }
}).onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() {
    @Override
    public Observable<? extends Integer> call(Throwable throwable) {
            Observable<Integer> innerObservable =
                    Observable.create(new Observable.OnSubscribe<Integer>() {
                        @Override
                        public void call(Subscriber<? super Integer> subscriber) {
                            subscriber.onNext(4);
                            subscriber.onNext(5);
                        }
                    });
        return innerObservable;
    }
}).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: "+integer);
    }
});

Lassen Sie das Observable weiterhin nachfolgende Datenelemente ausgeben, wenn ein Fehler auftritt.
com.m520it.rxjava I/IT520: call: 1
com.m520it.rxjava I/IT520: call: 2
com.m520it.rxjava I/IT520: call: 3
com.m520it.rxjava I/IT520: call: 4
com.m520it.rxjava I/IT520: call: 5

Ausgabe:

Wiederholungsmechanismus wiederholen

//创建一个错误处理的Observable对象
Observable<Integer> exceptionObserver = Observable
        .create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(55);
                subscriber.onNext(66);
            }
        });

Observable
        .create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onError(new NullPointerException("mock exception !"));
                subscriber.onNext(3);
            }
        })
        //上面的代码发送的过程中出现了异常,该方法就会被调用 并发射exceptionObserver
        .onExceptionResumeNext(exceptionObserver)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.i(TAG, "call: "+integer);
            }
        });
Wenn beim ursprünglichen Observable ein Fehler auftritt, abonnieren Sie es erneut und erwarten Sie, dass es ordnungsgemäß beendet wird.

Die Implementierung in RxJava ist retry und retryWhen.
com.m520it.rxjava I/IT520: call: 1
com.m520it.rxjava I/IT520: call: 2
com.m520it.rxjava I/IT520: call: 55
com.m520it.rxjava I/IT520: call: 66

Ähnliche Funktionen umfassen:

Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onError(new NullPointerException("mock exception !"));
            subscriber.onNext(3);
        }
    })
    .retry(3)//重复3次订阅
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call: "+integer);
        }
    });

Javadoc: retry()) Unabhängig davon, wie viele onError-Benachrichtigungen empfangen werden, wird das Abonnement fortgesetzt und das ursprüngliche Observable ausgeben.

    Javadoc: retry(long)) Bei einem erneuten Versuch wird die angegebene Anzahl von Malen erneut abonniert. Wenn die Anzahl überschritten wird, wird nicht erneut versucht, sich anzumelden
  • Javadoc: retry(Func2))
  • retryWhen

Das Obige ist der Inhalt des Codebeispiels von RxJava_02 [Abonnementtiefe und Ausnahmebehandlung] in einfachen Worten. Weitere verwandte Inhalte finden Sie auf der chinesischen PHP-Website (www.php.cn). )!


Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn