ホームページ >Java >&#&チュートリアル >RxJava の基本的な使用例共有ソースコード分析 [コード付き]

RxJava の基本的な使用例共有ソースコード分析 [コード付き]

php是最好的语言
php是最好的语言オリジナル
2018-07-24 15:02:231556ブラウズ

この記事では、RxJava の基本的な使用法について詳しく説明します。RxJava は非常に簡単な使用法を備えた魔法のフレームワークですが、内部実装は少し複雑で、コード ロジックは少し複雑です。 RxJava のソース コード分析に関するオンライン記事は少なく、完全なソース コード分析は参考のために以下にリストされています。

1. RxJava の基本的な使い方

  Observable.create(new Observable.OnSubscribe<Object>() {
            @Override
            public void call(Subscriber<? super Object> subscriber) {
                
            }
        }).subscribe(new Observer<Object>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Object o) {

            }
        });

2. まずは .subscribe(new Observera87fdacec66f0909fc0757c19f2d2b1d()) メソッドを見てみましょう

これは、渡されたオブザーバー パラメーター (ObserverableSubscriber) の単なるカプセル化です

サブスクライブメソッドの読み取り

 public final Subscription subscribe(final Observer<? super T> observer) {
        if (observer instanceof Subscriber) {
            return subscribe((Subscriber<? super T>)observer);
        }
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        return subscribe(new ObserverSubscriber<T>(observer));
    }
 public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

ここだけ注意する必要があります

 static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
     // validate and proceed
        if (subscriber == null) {
            throw new IllegalArgumentException("subscriber can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
            /*
             * the subscribe function can also be overridden but generally that&#39;s not the appropriate approach
             * so I won&#39;t mention that in the exception
             */
        }

        // new Subscriber so onStart it
        subscriber.onStart();

        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        // The code below is exactly the same an unsafeSubscribe but not used because it would
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // in case the subscriber can&#39;t listen to exceptions anymore
            if (subscriber.isUnsubscribed()) {
                RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
            } else {
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try {
                    subscriber.onError(RxJavaHooks.onObservableError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    RxJavaHooks.onObservableError(r);
                    // TODO why aren&#39;t we throwing the hook&#39;s return value.
                    throw r; // NOPMD
                }
            }
            return Subscriptions.unsubscribed();
        }
    }
  RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);

ここでのサブスクライバーは渡されるパラメータです

    public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
        Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
        if (f != null) {
            return f.call(instance, onSubscribe);
        }
        return onSubscribe;
    }
RxJavaHooks.onObservableStart(observable, observable.onSubscribe) 
这个方法返回的是它的第二个参数,也就是Observable它自己的onSubscribe 对象,
所以在subscribe 方法里面调用了  onSubscribe.call(subscriber)方法

onSubscribe オブジェクトが create によって渡されるパラメータであることがわかり、プロセス全体が非常に明確です

subscribe メソッドのみが呼び出されます プロセス全体が実行されます: submit ===> onSubscribe.call(observer) メソッドを呼び出し、オブザーバーも渡します。 Aggregate_PHPチュートリアル

ビデオ:

JavaScript基礎強化ビデオチュートリアル

以上がRxJava の基本的な使用例共有ソースコード分析 [コード付き]の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。