Heim >Java >javaLernprogramm >Eine ausführliche Erklärung von RxJava_07 [Multithreading und Hilfsoperationen (Ende)]
In Android-Apps müssen wir häufig auf das Netzwerk zugreifen, um Daten anzufordern. Diese Anforderung ist wie folgt unterteilt:
Netzwerkanfragen werden im Beobachter (untergeordneter Thread) platziert.
Die Verarbeitung der Netzwerkanforderungsergebnisse wird im Beobachter (Hauptthread) platziert.
Abonnement (wenn die Netzwerkanfrage abgeschlossen ist, ist es für den Beobachteten praktisch, den Beobachter zu benachrichtigen)
Um das besser zu realisieren Oben genannten Anforderungen müssen wir wissen, wie ein bestimmter Thread zum Umgang mit dem Beobachteten und dem Beobachter verwendet wird. Im folgenden Artikel werden Thread-bezogene Vorgänge in RxJava vorgestellt.
Gibt den Scheduler an, auf dem ein Beobachter dieses Observable beobachtet.
Um in RxJava den Scheduler anzugeben, auf dem das Observable die Methoden onNext, onCompleted und onError des Beobachters aufrufen soll, müssen Sie den ObserverOn-Operator verwenden und ihm einen geeigneten Scheduler übergeben.
Javadoc: observOn(Scheduler))
Observable .create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { //main Log.i(TAG, "call: "+Thread.currentThread().getName()); subscriber.onNext("Hello Android !"); } }) .observeOn(Schedulers.io()) .subscribe(new Action1<String>() { @Override public void call(String s) { //RxIoScheduler-2 Log.i(TAG, "subscribe call: "+Thread.currentThread().getName()); Log.i(TAG, "subscribe call: "+s); } });
Der obige Code wird hauptsächlich im Hauptthread aufgerufen, sodass er beobachtet wird Was Der Leser druckt den Hauptthread, und die Funktion „ObservOn“ wird verwendet, wodurch der Beobachter zur Ausführung zum RxIoScheduler-2-Thread springt.
Ich weiß nicht, ob Ihnen aufgefallen ist, dass Schedulers.io() im obigen Code den Typ des untergeordneten Threads angibt. Darüber hinaus gibt es viele Thread-Typen. Wie in der folgenden Tabelle gezeigt:
调度器类型 | 效果 |
---|---|
Schedulers.computation() | 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量 |
Schedulers.from(executor) | 使用指定的Executor作为调度器 |
Schedulers.immediate( ) | 在当前线程立即开始执行任务 |
Schedulers.io( ) | 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器 |
Schedulers.newThread() | 为每个任务创建一个新线程 |
Schedulers.trampoline() | 当其它排队的任务完成后,在当前线程排队开始执行 |
AndroidSchedulers.mainThread() | Android指定的主线程 |
指定Observable自身在哪个调度器上执行。
ObserveOn操作符的作用类似,但是功能很有限,它指示Observable在一个指定的调度器上给观察者发通知。
示例代码如下:
Observable .create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { //RxComputationScheduler-1 Log.i(TAG, "call: "+Thread.currentThread().getName()); subscriber.onNext("Hello Android !"); } }) //指定被观察者在哪个线程中运行 .subscribeOn(Schedulers.computation()) //指定观察者在哪个线程中运行 .observeOn(Schedulers.io()) .subscribe(new Action1<String>() { @Override public void call(String s) { //RxIoScheduler-2 Log.i(TAG, "subscribe call: "+Thread.currentThread().getName()); Log.i(TAG, "subscribe call: "+s); } });
除了上面介绍的2个函数,你也可以用它们调度你自己的任务。下面的示例展示了Scheduler.Worker的用法:
//模拟在子线程执行任务 Scheduler.Worker worker = Schedulers.newThread().createWorker(); worker.schedule(new Action0() { @Override public void call() { //call:---RxNewThreadScheduler-1 Log.i(TAG, "call:---"+Thread.currentThread().getName()); } });
Worker类的对象实现了Subscription接口,使用它的isUnsubscribed和unsubscribe方法,所以你可以在订阅取消时停止任务,或者从正在调度的任务内部取消订阅,示例:
final Scheduler.Worker worker = Schedulers.newThread().createWorker(); Subscription mySubscription = worker.schedule(new Action0() { @Override public void call() { while(!worker.isUnsubscribed()) { Log.i(TAG, "do your work !"); //执行完任务后取消订阅状态 worker.unsubscribe(); } } });
Delay操作符让原始Observable在发射每项数据之前都暂停一段指定的时间段。效果是Observable发射的数据项在时间上向前整体平移了一个增量。
示例代码:
Observable .create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello"); subscriber.onNext("Android"); //如果发送异常 则直接抛出异常 上面的发送无效 //subscriber.onError(new NullPointerException("MOCK")); subscriber.onNext("Android2"); } }) //整体延迟2秒 .delay(2000, TimeUnit.MILLISECONDS) .subscribe(new Action1<String>() { @Override public void call(String s) { Log.i(TAG, "call: " + s); } });
还有一个操作符delaySubscription让你你可以延迟订阅原始Observable。它结合搜一个定义延时的参数。
Observable .create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello"); subscriber.onNext("Android"); } }) //延迟2秒订阅 .delaySubscription(2,TimeUnit.SECONDS) .subscribe(new Action1<String>() { @Override public void call(String s) { Log.i(TAG, "call: " + s); } });
此操作符可以认为是监听器的一种,它监听onNext()事件和subcribe()事件,会在此两个事件前被调用。此函数分别为:doOnEach(),doOnNext(),doOnSubscribe().
doOnEach操作符让你可以注册一个回调,它产生的Observable每发射一项数据就会调用它一次。
示例代码:
Observable .just("Hello","Android") //每发送一次 就会现在Observer的onNext()中调用一次 .doOnEach(new Observer<String>() { @Override public void onCompleted() { Log.i(TAG, "onCompleted: "); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: "); } @Override public void onNext(String s) { Log.i(TAG, "onNext: "+s); } }) .subscribe(new Action1<String>() { @Override public void call(String s) { Log.i(TAG, "call: " + s); } });
输出:
com.m520it.rxjava I/IT520: onNext: Hello com.m520it.rxjava I/IT520: call: Hello com.m520it.rxjava I/IT520: onNext: Android com.m520it.rxjava I/IT520: call: Android com.m520it.rxjava I/IT520: onCompleted:
类似的函数还有:
Javadoc: doOnEach(Action1))
Javadoc: doOnEach(Observer))
doOnNext操作符类似于doOnEach(Action1),但是它的Action不是接受一个Notification参数,而是接受发射的数据项。
示例代码
Observable.just(1, 2, 3) .doOnNext(new Action1<Integer>() { @Override public void call(Integer item) { Log.i(TAG, "doOnNext call: "+item); } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });
输出
com.m520it.rxjava I/IT520: doOnNext call: 1 com.m520it.rxjava I/IT520: call: 1 com.m520it.rxjava I/IT520: doOnNext call: 2 com.m520it.rxjava I/IT520: call: 2 com.m520it.rxjava I/IT520: doOnNext call: 3 com.m520it.rxjava I/IT520: call: 3
doOnSubscribe操作符注册一个动作,当观察者订阅它生成的Observable它就会被调用。
Javadoc: doOnSubscribe(Action0))
Observable.just(1, 2, 3) // 订阅之前调用 .doOnSubscribe(new Action0() { @Override public void call() { Log.i(TAG, "doOnSubscribe call "); } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });
输出
I/IT520: doOnSubscribe call I/IT520: call: 1 I/IT520: call: 2 I/IT520: call: 3
强制一个Observable连续调用并保证行为正确
一个Observable可以异步调用它的观察者的方法,可能是从不同的线程调用。这可能会让Observable行为不正确,它可能会在某一个onNext调用之前尝试调用onCompleted或onError方法,或者从两个不同的线程同时调用onNext方法。使用Serialize操作符,你可以纠正这个Observable的行为,保证它的行为是正确的且是同步的。
RxJava提供具有此功能的函数为serialize()
如果原始Observable过了指定的一段时长没有发射任何数据,Timeout操作符会以一个onError通知终止这个Observable。
RxJava中的实现为timeout函数,我们可以使用该函数作为网络请求的超时异常处理。
示例代码:
//每次发送之后 下一次发送不能超过2秒 如果超过则跳转到onError() Observable.interval(2, TimeUnit.SECONDS) .timeout(2, TimeUnit.SECONDS) .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { Log.i(TAG, "call: "+aLong); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { //call throwable: null Log.i(TAG, "call throwable: " + throwable.getLocalizedMessage()); } });
Using操作符让你可以指示Observable创建一个只在它的生命周期内存在的资源,当Observable终止时这个资源会被自动释放。
using(Func0,Func1,Action1)操作符接受三个参数:
一个用于创建一次性资源的工厂函数
一个用于创建Observable的工厂函数
一个用于释放资源的函数
当一个观察者订阅using返回的Observable时,using将会使用Observable工厂函数创建观察者要观察的Observable,同时使用资源工厂函数创建一个你想要创建的资源。当观察者取消订阅这个Observable时,或者当观察者终止时(无论是正常终止还是因错误而终止),using使用第三个函数释放它创建的资源。
示例代码:
final Observable<Long> observable = Observable.using(new Func0<String>() { //创建一次性资源 @Override public String call() { return "Hello Android !"; } }, new Func1<String, Observable<Long>>() { //创建被观察者 @Override public Observable<Long> call(String s) { Log.i(TAG, "Func1 call: " + s); return Observable.interval(1, TimeUnit.SECONDS); } }, new Action1<String>() { //用于销毁一次性资源 @Override public void call(String s) { Log.i(TAG, "Action1 call: " + s); } }); observable.subscribe(new Subscriber<Long>() { @Override public void onCompleted() { Log.i(TAG, "onCompleted: "); } @Override public void onError(Throwable e) { } @Override public void onNext(Long aLong) { Log.i(TAG, "onNext: "+aLong); //取消订阅后 才能执行被观察者的销毁资源方法 unsubscribe(); } });
在安卓APP中,我们经常需要通过访问网络获取数据,请求网络数据需要在子线程中操作,以下将这需求进行分解:
将网络请求放在被观察者中(子线程)。
网络请求结果处理放在观察者中(主线程)。
订阅(当网络请求完成后,方便被观察者通知观察者)
为了更好的实现上面的需求,我们需要知道如何在被观察者与观察者如何使用特定的线程来处理。下面的文章将介绍RxJava中线程相关的操作。
以上就是深入浅出RxJava_07[多线程&辅助操作(完)]的内容,更多相关内容请关注PHP中文网(www.php.cn)!