ホームページ >Java >&#&チュートリアル >RxJava の基本的な使用例共有ソースコード分析 [コード付き]
この記事では、RxJava の基本的な使用法について詳しく説明します。RxJava は非常に簡単な使用法を備えた魔法のフレームワークですが、内部実装は少し複雑で、コード ロジックは少し複雑です。 RxJava のソース コード分析に関するオンライン記事は少なく、完全なソース コード分析は参考のために以下にリストされています。
1. RxJava の基本的な使い方
Observable.create(new Observable.OnSubscribe<Object>() { @Override public void call(Subscriber<? super Object> subscriber) { } }).subscribe(new Observer<Object>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Object o) { } });
2. まずは .subscribe(new Observera87fdacec66f0909fc0757c19f2d2b1d()) メソッドを見てみましょう
これは、渡されたオブザーバー パラメーター (ObserverableSubscriber) の単なるカプセル化です
サブスクライブメソッドの読み取り
public final Subscription subscribe(final Observer<? super T> observer) { if (observer instanceof Subscriber) { return subscribe((Subscriber<? super T>)observer); } if (observer == null) { throw new NullPointerException("observer is null"); } return subscribe(new ObserverSubscriber<T>(observer)); }
public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this); }
ここだけ注意する必要があります
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { // validate and proceed if (subscriber == null) { throw new IllegalArgumentException("subscriber can not be null"); } if (observable.onSubscribe == null) { throw new IllegalStateException("onSubscribe function can not be null."); /* * the subscribe function can also be overridden but generally that's not the appropriate approach * so I won't mention that in the exception */ } // new Subscriber so onStart it subscriber.onStart(); /* * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls * to user code from within an Observer" */ // if not already wrapped if (!(subscriber instanceof SafeSubscriber)) { // assign to `observer` so we return the protected version subscriber = new SafeSubscriber<T>(subscriber); } // The code below is exactly the same an unsafeSubscribe but not used because it would // add a significant depth to already huge call stacks. try { // allow the hook to intercept and/or decorate RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn(subscriber); } catch (Throwable e) { // special handling for certain Throwable/Error/Exception types Exceptions.throwIfFatal(e); // in case the subscriber can't listen to exceptions anymore if (subscriber.isUnsubscribed()) { RxJavaHooks.onError(RxJavaHooks.onObservableError(e)); } else { // if an unhandled error occurs executing the onSubscribe we will propagate it try { subscriber.onError(RxJavaHooks.onObservableError(e)); } catch (Throwable e2) { Exceptions.throwIfFatal(e2); // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); // TODO could the hook be the cause of the error in the on error handling. RxJavaHooks.onObservableError(r); // TODO why aren't we throwing the hook's return value. throw r; // NOPMD } } return Subscriptions.unsubscribed(); } }
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
ここでのサブスクライバーは渡されるパラメータです
public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) { Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart; if (f != null) { return f.call(instance, onSubscribe); } return onSubscribe; }
RxJavaHooks.onObservableStart(observable, observable.onSubscribe) 这个方法返回的是它的第二个参数,也就是Observable它自己的onSubscribe 对象, 所以在subscribe 方法里面调用了 onSubscribe.call(subscriber)方法
onSubscribe オブジェクトが create によって渡されるパラメータであることがわかり、プロセス全体が非常に明確です
subscribe メソッドのみが呼び出されます プロセス全体が実行されます: submit ===> onSubscribe.call(observer) メソッドを呼び出し、オブザーバーも渡します。 Aggregate_PHPチュートリアル
ビデオ:以上がRxJava の基本的な使用例共有ソースコード分析 [コード付き]の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。