Maison  >  Article  >  Java  >  Un exemple de code de RxJava_02 [Profondeur d'abonnement et gestion des exceptions]

Un exemple de code de RxJava_02 [Profondeur d'abonnement et gestion des exceptions]

黄舟
黄舟original
2017-03-04 09:41:191148parcourir

Ce tutoriel est basé sur RxJava1 >8. Connectable ObservablesLes sections précédentes mentionnaient trois détails dans le processus de candidature : Observables, Observateurs et Abonnements. La section suivante continue de comprendre d'autres points de connaissances sur l'abonnement.
Nous introduisons ici un observable connectable appelé ConnectableObservable. Un Observable connectable est comme un Observable normal, à l'exception de ceci : un Observable connectable ne commence pas à émettre de données lorsqu'il est abonné, uniquement lorsque son connect() est appelé. De cette façon, vous pouvez attendre que tous les abonnés potentiels se soient abonnés à l'Observable avant de commencer à émettre des données.

Publier

Cet opérateur peut convertir un Observable normal en un Observable connectable.

Un Observable connectable (connectable Observable) est similaire à un Observable normal, mais il ne commence pas à émettre de données lorsqu'il est abonné, mais ne démarre pas tant que l'opérateur Connect n'est pas utilisé. De cette façon, vous pouvez demander à un Observable de commencer à émettre des données à tout moment.

Connect dans RxJava est une méthode de l'interface ConnectableObservable Vous pouvez utiliser l'opérateur de publication pour convertir un Observable ordinaire en ConnectableObservable.

Différent des abonnements d'objets observables ordinaires, la méthode call() de l'objet Action1 n'est pas directement appelée dans le code ci-dessus.

Connect

Un observable connectable (connectable Observable) est similaire à un observable normal, mais il ne commence pas à émettre des données lorsqu'il est abonné, mais pas tant que l'opérateur Connect ne démarre pas. . Avec cette méthode, vous pouvez attendre que tous les observateurs soient abonnés à l'Observable avant de commencer à émettre des données.
//创建了一个普通的Observable对象
Observable<Integer> observable =
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

//将一个被观察者转换成一个可连接的被观察者
ConnectableObservable<Integer> connectableObservable =observable.publish();

//为可连接的被观察者订阅事件,但这里并不会马上发送事件
connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: "+integer);
    }
});

Sortie :

RefCount

//创建了一个普通的Observable对象
Observable<Integer> observable =
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

//将一个被观察者转换成一个可连接的被观察者
ConnectableObservable<Integer> connectableObservable =observable.publish();

//为可连接的被观察者订阅事件,但这里并不会马上发送事件
connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: "+integer);
    }
});

//当调用可连接的被观察者connect()方法后 开始发送所有的数据。
connectableObservable.connect(new Action1() {
    @Override
    public void call(Subscription subscription) {
        Log.i(TAG, "call: "+subscription);
    }
});
L'opérateur RefCount peut convertir un observable connectable en un observable ordinaire.

IT520: call: OperatorPublish$PublishSubscriber@20dce78
IT520: call: 1
IT520: call: 2
IT520: call: 3
Replay

Garantit que tous les observateurs reçoivent la même séquence de données, même s'ils se sont abonnés après que l'Observable ait commencé à émettre des données

Regardons d'abord un exemple :
//创建了一个普通的Observable对象
Observable<Integer> observable =
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

//将一个被观察者转换成一个可连接的被观察者
ConnectableObservable<Integer> connectableObservable =observable.publish();

//将一个可链接的被观察者转换成一个普通观察者
Observable<Integer> integerObservable = connectableObservable.refCount();


//为可连接的被观察者订阅事件,一订阅就马上发送数据并打印出 1 2 3...
integerObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: "+integer);
    }
});

Sortie :

Nous convertissons d'abord un Observable ordinaire en un ConnectableObservable via publi(). Lorsque connect() est appelé, les observateurs abonnés ci-dessus connect() recevront les données. Les observateurs abonnés après connect() ne peuvent pas recevoir de données. Si nous voulons que tous les observateurs reçoivent des données en même temps lors de l'appel de connect(), quel que soit l'ordre des abonnements, nous devons utiliser replay().

Observable<Integer> observable =
            Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onNext(1);
                    subscriber.onNext(2);
                    subscriber.onNext(3);
                }
            });

ConnectableObservable<Integer> connectableObservable =observable.publish();

connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--1--: "+integer);
    }
});

connectableObservable.connect();

connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--2--: "+integer);
    }
});

Sortie :

com.m520it.rxjava I/IT520: call--1--: 1
com.m520it.rxjava I/IT520: call--1--: 2
com.m520it.rxjava I/IT520: call--1--: 3

9. "Froid Observable" & "Hot Observable"

Observable<Integer> observable =
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

//这里将publish()改为replay()
ConnectableObservable<Integer> connectableObservable =observable.replay();

connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--1--: "+integer);
    }
});

connectableObservable.connect();

connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--2--: "+integer);
    }
});
Comme nous l'avons mentionné plus tôt, lors de l'abonnement (si l'observateur a Envoi données), certains observateurs reçoivent des données directement et d’autres attendent un certain temps avant de recevoir des données.

com.m520it.rxjava I/IT520: call--1--: 1
com.m520it.rxjava I/IT520: call--1--: 2
com.m520it.rxjava I/IT520: call--1--: 3
com.m520it.rxjava I/IT520: call--2--: 1
com.m520it.rxjava I/IT520: call--2--: 2
com.m520it.rxjava I/IT520: call--2--: 3

Nous appelons un observateur qui peut recevoir des données immédiatement après s'être abonné à l'observateur un "observable chaud".

    Par exemple, le ConnectableObservable ci-dessus ne peut pas envoyer de données même après avoir été abonné. Ce n'est qu'en appelant connect() que l'observateur peut recevoir les données. Nous appelons cet observateur un "Observable froid"
  • 10. Gestion des erreurs
  • De nombreux opérateurs peuvent être utilisés pour répondre aux notifications onError émises par un Observable ou aux erreurs La récupération

  • L'opérateur Catch intercepte la notification onError de l'Observable d'origine et la remplace par d'autres éléments de données ou séquences de données, afin que l'Observable généré puisse se terminer normalement ou pas du tout.

RxJava implémente Catch sous la forme de trois opérateurs différents :

onErrorReturn

La méthode onErrorReturn renvoie un nouvel observable qui reflète le comportement de l'observable d'origine. ce dernier ignorera l'appel onError du premier et ne transmettra pas l'erreur à l'observateur, il émettra plutôt un élément spécial et appellera la méthode onCompleted de l'observateur.

Le code suivant envoie 1, 2, 3 et simule l'envoi d'une exception pendant le processus d'envoi. Tant qu'une exception est envoyée, onErrorReturn() sera appelé et 44 sera envoyé. :

Sortie :

onErrorResumeNext
Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onError(new NullPointerException("mock exception !"));
            subscriber.onNext(3);
        }
    }).onErrorReturn(new Func1<Throwable, Integer>() {
        @Override
        public Integer call(Throwable throwable) {
            return 44;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call: "+integer);
        }
    });

Laissez l'Observable commencer à émettre la séquence de données du deuxième Observable lorsqu'il rencontre un erreur.
com.m520it.rxjava I/IT520: call: 1
com.m520it.rxjava I/IT520: call: 2
com.m520it.rxjava I/IT520: call: 44

Le code suivant simule l'envoi d'une exception lors de l'envoi. Ensuite, onErrorResumeNext sera appelé et commencera à émettre de nouveaux objets observables.

Sortie :

onExceptionResumeNext
Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(1);
        subscriber.onNext(2);
        subscriber.onError(new NullPointerException("mock exception !"));
        subscriber.onNext(3);
    }
}).onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() {
    @Override
    public Observable<? extends Integer> call(Throwable throwable) {
            Observable<Integer> innerObservable =
                    Observable.create(new Observable.OnSubscribe<Integer>() {
                        @Override
                        public void call(Subscriber<? super Integer> subscriber) {
                            subscriber.onNext(4);
                            subscriber.onNext(5);
                        }
                    });
        return innerObservable;
    }
}).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: "+integer);
    }
});

Laissez l'Observable continuer à émettre les éléments de données suivants lorsqu'il rencontre une erreur.
com.m520it.rxjava I/IT520: call: 1
com.m520it.rxjava I/IT520: call: 2
com.m520it.rxjava I/IT520: call: 3
com.m520it.rxjava I/IT520: call: 4
com.m520it.rxjava I/IT520: call: 5

Sortie :

Mécanisme de nouvelle tentative

//创建一个错误处理的Observable对象
Observable<Integer> exceptionObserver = Observable
        .create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(55);
                subscriber.onNext(66);
            }
        });

Observable
        .create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onError(new NullPointerException("mock exception !"));
                subscriber.onNext(3);
            }
        })
        //上面的代码发送的过程中出现了异常,该方法就会被调用 并发射exceptionObserver
        .onExceptionResumeNext(exceptionObserver)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.i(TAG, "call: "+integer);
            }
        });
Si l'Observable d'origine rencontre une erreur, réabonnez-vous et attendez-vous à ce qu'il se termine correctement.

L'implémentation dans RxJava est retry et retryWhen.
com.m520it.rxjava I/IT520: call: 1
com.m520it.rxjava I/IT520: call: 2
com.m520it.rxjava I/IT520: call: 55
com.m520it.rxjava I/IT520: call: 66

Les fonctions similaires incluent :

Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onError(new NullPointerException("mock exception !"));
            subscriber.onNext(3);
        }
    })
    .retry(3)//重复3次订阅
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call: "+integer);
        }
    });

Javadoc : retry()) Quel que soit le nombre de notifications onError reçues, l'abonnement continuera et émettent l'Observable original.

    Javadoc : retry(long)) réessayer se réabonnera jusqu'au nombre de fois spécifié. Si le nombre de fois dépasse, il ne tentera pas de se réabonner
  • Javadoc : retry(Func2))
  • retryWhen

Ce qui précède est le contenu de l'exemple de code de RxJava_02 [profondeur d'abonnement et gestion des exceptions] en termes simples. Pour plus de contenu connexe, veuillez faire attention au site Web PHP chinois (www.php.cn). )!


Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn