>  기사  >  Java  >  RxJava 기본 사용 사례 공유 소스 코드 분석 [코드 포함]

RxJava 기본 사용 사례 공유 소스 코드 분석 [코드 포함]

php是最好的语言
php是最好的语言원래의
2018-07-24 15:02:231472검색

이 기사에서는 RxJava의 기본 사용법을 자세히 설명합니다. RxJava는 사용법이 매우 간단한 마법의 프레임워크이지만 내부 구현이 약간 복잡하고 코드 로직이 약간 복잡합니다. RxJava 소스 코드 분석에 대한 온라인 기사, 소스 코드 게시물은 거의 없고 불완전합니다. 전체 소스 코드 분석은 참고용으로 아래에 나열되어 있습니다.

1. RxJava의 기본 사용법

  Observable.create(new Observable.OnSubscribe<Object>() {
            @Override
            public void call(Subscriber<? super Object> subscriber) {
                
            }
        }).subscribe(new Observer<Object>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Object o) {

            }
        });

2 먼저 .subscribe(new Observera87fdacec66f0909fc0757c19f2d2b1d()) 메서드를 살펴보세요

 public final Subscription subscribe(final Observer<? super T> observer) {
        if (observer instanceof Subscriber) {
            return subscribe((Subscriber<? super T>)observer);
        }
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        return subscribe(new ObserverSubscriber<T>(observer));
    }

이것은 관찰자 매개변수(ObserverableSubscriber)에 전달된 간단한 캡슐화일 뿐입니다. 구독 방법 읽어보기

 public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }
 static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
     // validate and proceed
        if (subscriber == null) {
            throw new IllegalArgumentException("subscriber can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
            /*
             * the subscribe function can also be overridden but generally that&#39;s not the appropriate approach
             * so I won&#39;t mention that in the exception
             */
        }

        // new Subscriber so onStart it
        subscriber.onStart();

        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        // The code below is exactly the same an unsafeSubscribe but not used because it would
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // in case the subscriber can&#39;t listen to exceptions anymore
            if (subscriber.isUnsubscribed()) {
                RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
            } else {
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try {
                    subscriber.onError(RxJavaHooks.onObservableError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    RxJavaHooks.onObservableError(r);
                    // TODO why aren&#39;t we throwing the hook&#39;s return value.
                    throw r; // NOPMD
                }
            }
            return Subscriptions.unsubscribed();
        }
    }

여기만 주의하시면 됩니다

  RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
    public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
        Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
        if (f != null) {
            return f.call(instance, onSubscribe);
        }
        return onSubscribe;
    }
RxJavaHooks.onObservableStart(observable, observable.onSubscribe) 
这个方法返回的是它的第二个参数,也就是Observable它自己的onSubscribe 对象,
所以在subscribe 方法里面调用了  onSubscribe.call(subscriber)方法

여기의 구독자는 전달된 매개변수입니다

protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }
public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }

onSubscribe 객체가 create에서 전달된 매개변수임을 알 수 있습니다. 그러면 전체 프로세스가 매우 명확해집니다.

subscribe 메소드만 호출됩니다. 전체 프로세스가 실행됩니다. subscribe ===> onSubscribe.call(observer) 메소드를 호출하고 관찰자도 전달합니다.

관련 권장 사항:

RxJava Operator (8) Aggregate_PHP 튜토리얼

동영상:

JavaScript Basic 강화 동영상 튜토리얼

위 내용은 RxJava 기본 사용 사례 공유 소스 코드 분석 [코드 포함]의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.