>  기사  >  Java  >  RxJava_02의 코드 예시 [구독 깊이 및 예외 처리]

RxJava_02의 코드 예시 [구독 깊이 및 예외 처리]

黄舟
黄舟원래의
2017-03-04 09:41:191102검색

이 튜토리얼은 RxJava1을 기반으로 합니다. >8. 연결 가능한 Observable이전 섹션에서는 Observable, Observer 및 Subscription의 세 가지 세부 사항을 언급했습니다. 다음 섹션에서는 구독에 대한 다른 지식 포인트를 계속해서 이해합니다.
여기에서는 ConnectableObservable이라는 연결 가능한 Observable을 소개합니다. 연결 가능한 Observable은 다음을 제외하고 일반 Observable과 같습니다. 연결 가능한 Observable은 구독할 때 데이터 방출을 시작하지 않고 connect()가 호출될 때만 데이터를 방출하기 시작합니다. 이런 방식으로 데이터 방출을 시작하기 전에 모든 잠재적 구독자가 Observable을 구독할 때까지 기다릴 수 있습니다.

Publish

이 연산자는 일반 Observable을 연결 가능한 Observable로 변환할 수 있습니다.

연결 가능한 Observable(연결 가능한 Observable)은 일반 Observable과 유사하지만 구독 시 데이터 방출을 시작하지 않지만 Connect 연산자를 사용할 때까지 시작되지 않습니다. 이런 방식으로 Observable이 언제든지 데이터를 방출하기 시작하도록 할 수 있습니다.

RxJava의 Connect는 ConnectableObservable 인터페이스의 메서드입니다. 게시 연산자를 사용하여 일반 Observable을 ConnectableObservable로 변환할 수 있습니다.

//创建了一个普通的Observable对象
Observable<Integer> observable =
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

//将一个被观察者转换成一个可连接的被观察者
ConnectableObservable<Integer> connectableObservable =observable.publish();

//为可连接的被观察者订阅事件,但这里并不会马上发送事件
connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: "+integer);
    }
});

일반적인 Observable 객체 구독과 달리 위 코드에서는 Action1 객체의 call() 메서드가 직접 호출되지 않습니다.

Connect

연결 가능한 Observable(연결 가능한 Observable)은 일반 Observable과 유사하지만 구독할 때 데이터를 방출하기 시작하지 않지만 Connect 연산자가 사용될 때까지는 데이터를 방출하지 않습니다. . 이 방법을 사용하면 데이터 방출을 시작하기 전에 모든 관찰자가 Observable을 구독할 때까지 기다릴 수 있습니다.

//创建了一个普通的Observable对象
Observable<Integer> observable =
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

//将一个被观察者转换成一个可连接的被观察者
ConnectableObservable<Integer> connectableObservable =observable.publish();

//为可连接的被观察者订阅事件,但这里并不会马上发送事件
connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: "+integer);
    }
});

//当调用可连接的被观察者connect()方法后 开始发送所有的数据。
connectableObservable.connect(new Action1() {
    @Override
    public void call(Subscription subscription) {
        Log.i(TAG, "call: "+subscription);
    }
});

출력:

IT520: call: OperatorPublish$PublishSubscriber@20dce78
IT520: call: 1
IT520: call: 2
IT520: call: 3

RefCount

RefCount 연산자는 연결 가능한 Observable을 일반 Observable로 변환할 수 있습니다.

//创建了一个普通的Observable对象
Observable<Integer> observable =
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

//将一个被观察者转换成一个可连接的被观察者
ConnectableObservable<Integer> connectableObservable =observable.publish();

//将一个可链接的被观察者转换成一个普通观察者
Observable<Integer> integerObservable = connectableObservable.refCount();


//为可连接的被观察者订阅事件,一订阅就马上发送数据并打印出 1 2 3...
integerObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: "+integer);
    }
});

Replay

Observable이 데이터 방출을 시작한 후에 구독하더라도 모든 관찰자가 동일한 데이터 시퀀스를 수신하도록 보장합니다.

먼저 예를 살펴보겠습니다.

Observable<Integer> observable =
            Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onNext(1);
                    subscriber.onNext(2);
                    subscriber.onNext(3);
                }
            });

ConnectableObservable<Integer> connectableObservable =observable.publish();

connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--1--: "+integer);
    }
});

connectableObservable.connect();

connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--2--: "+integer);
    }
});

출력:

com.m520it.rxjava I/IT520: call--1--: 1
com.m520it.rxjava I/IT520: call--1--: 2
com.m520it.rxjava I/IT520: call--1--: 3

먼저 게시()를 통해 일반 Observable을 ConnectableObservable로 변환합니다. connect()가 호출되면 connect() 위에 구독된 관찰자는 데이터를 수신하게 됩니다. connect() 이후에 구독된 관찰자는 데이터를 수신할 수 없습니다. connect()를 호출할 때 구독 순서에 관계없이 모든 관찰자가 동시에 데이터를 수신하도록 하려면 replay()를 사용해야 합니다.

Observable<Integer> observable =
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

//这里将publish()改为replay()
ConnectableObservable<Integer> connectableObservable =observable.replay();

connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--1--: "+integer);
    }
});

connectableObservable.connect();

connectableObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--2--: "+integer);
    }
});

출력:

com.m520it.rxjava I/IT520: call--1--: 1
com.m520it.rxjava I/IT520: call--1--: 2
com.m520it.rxjava I/IT520: call--1--: 3
com.m520it.rxjava I/IT520: call--2--: 1
com.m520it.rxjava I/IT520: call--2--: 2
com.m520it.rxjava I/IT520: call--2--: 3

9. "Cold Observable" & "Hot Observable"

앞서 구독할 때(관찰자가 데이터를 보내는 경우) 일부 관찰자는 데이터를 직접 전송하고 일부는 데이터를 수신하기 전에 일정 시간 동안 기다립니다.

옵저버를 구독하는 즉시 데이터를 받을 수 있는 옵저버를 "hot Observable"이라고 부릅니다.

    예를 들어 위의 ConnectableObservable은 구독 후에도 데이터를 보낼 수 없습니다. connect()를 호출해야만 관찰자가 데이터를 받을 수 있습니다. 우리는 이 관찰자를 "cold Observable"이라고 부릅니다.
  • 10. 오류 처리
  • Observable에서 발생하는 onError 알림이나 오류로 인해 응답하는 데 많은 연산자를 사용할 수 있습니다.

  • Catch 연산자는 원본 Observable의 onError 알림을 가로채서 이를 다른 데이터 항목이나 데이터 시퀀스로 대체하여 생성된 Observable이 정상적으로 종료되거나 전혀 종료되지 않도록 합니다.

RxJava는 세 가지 다른 연산자로 Catch를 구현합니다.

onErrorReturn

onErrorReturn 메소드는 원래 Observable의 동작을 미러링하는 새로운 Observable을 반환합니다. 후자는 전자의 onError 호출을 무시하고 관찰자에게 오류를 전달하지 않습니다. 대신 특수 항목을 내보내고 관찰자의 onCompleted 메서드를 호출합니다.

다음 코드는 1, 2, 3을 보내고 전송 과정에서 예외 전송을 시뮬레이션합니다. 예외가 전송되면 onErrorReturn()이 호출되고 44가 전송됩니다. :

Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onError(new NullPointerException("mock exception !"));
            subscriber.onNext(3);
        }
    }).onErrorReturn(new Func1<Throwable, Integer>() {
        @Override
        public Integer call(Throwable throwable) {
            return 44;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call: "+integer);
        }
    });
출력:

com.m520it.rxjava I/IT520: call: 1
com.m520it.rxjava I/IT520: call: 2
com.m520it.rxjava I/IT520: call: 44

onErrorResumeNext

오류가 발생하면 Observable이 두 번째 Observable의 데이터 시퀀스를 내보내도록 합니다.

다음 코드는 전송 시 예외 전송을 시뮬레이션합니다. 그런 다음 onErrorResumeNext가 호출되고 새로운 Observable 객체를 방출하기 시작합니다.

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(1);
        subscriber.onNext(2);
        subscriber.onError(new NullPointerException("mock exception !"));
        subscriber.onNext(3);
    }
}).onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() {
    @Override
    public Observable<? extends Integer> call(Throwable throwable) {
            Observable<Integer> innerObservable =
                    Observable.create(new Observable.OnSubscribe<Integer>() {
                        @Override
                        public void call(Subscriber<? super Integer> subscriber) {
                            subscriber.onNext(4);
                            subscriber.onNext(5);
                        }
                    });
        return innerObservable;
    }
}).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: "+integer);
    }
});
출력:

com.m520it.rxjava I/IT520: call: 1
com.m520it.rxjava I/IT520: call: 2
com.m520it.rxjava I/IT520: call: 3
com.m520it.rxjava I/IT520: call: 4
com.m520it.rxjava I/IT520: call: 5

onExceptionResumeNext

오류가 발생할 때 Observable이 계속해서 후속 데이터 항목을 내보내도록 합니다.

//创建一个错误处理的Observable对象
Observable<Integer> exceptionObserver = Observable
        .create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(55);
                subscriber.onNext(66);
            }
        });

Observable
        .create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onError(new NullPointerException("mock exception !"));
                subscriber.onNext(3);
            }
        })
        //上面的代码发送的过程中出现了异常,该方法就会被调用 并发射exceptionObserver
        .onExceptionResumeNext(exceptionObserver)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.i(TAG, "call: "+integer);
            }
        });

출력:

com.m520it.rxjava I/IT520: call: 1
com.m520it.rxjava I/IT520: call: 2
com.m520it.rxjava I/IT520: call: 55
com.m520it.rxjava I/IT520: call: 66
재시도 메커니즘

원래 Observable에 오류가 발생하면 다시 구독하고 정상적으로 종료될 것으로 예상하세요.

RxJava의 구현은 retry와 retryWhen입니다.

Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onError(new NullPointerException("mock exception !"));
            subscriber.onNext(3);
        }
    })
    .retry(3)//重复3次订阅
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call: "+integer);
        }
    });

유사한 기능은 다음과 같습니다:

Javadoc: retry()) onError 알림을 몇 번이나 수신하더라도 계속 구독됩니다. Primitive Observable을 방출합니다.

    Javadoc: retry(long)) retry는 지정된 횟수만큼 재구독을 시도합니다.
  • Javadoc: retry(Func2))
  • retryWhen

위 내용은 RxJava_02 [구독 깊이 및 예외 처리] 코드 예제의 내용을 간단히 정리한 내용이며, 더 많은 관련 내용은 PHP 중국어 홈페이지(www.php.cn)를 참고하시기 바랍니다. )!


성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.