首頁 >Java >java教程 >深入淺出RxJava_07[多執行緒&輔助操作(完)]

深入淺出RxJava_07[多執行緒&輔助操作(完)]

黄舟
黄舟原創
2017-03-04 10:00:261725瀏覽

在安卓APP中,我們經常需要透過存取網路來取得數據,請求網路數據需要在子執行緒中操作,以下將這需求分解:

  1. 將網路請求放在被觀察者中(子執行緒)。

  2. 網路請求結果處理放在觀察者中(主執行緒)。

  3. 訂閱(當網路請求完成後,方便被觀察者通知觀察者)

為了更好的實現上面的需求,我們需要知道如何在被觀察者與觀察者如何使用特定的線程來處理。下面的文章將介紹RxJava中執行緒相關的操作。

1.ObserveOn

指定一個觀察者在哪個調度器上觀察這個Observable。

RxJava中,要指定Observable應該在哪個調度器上呼叫觀察者的onNext, onCompleted, onError方法,你需要使用observeOn操作符,傳遞給它一個適當的Scheduler。

  • Javadoc: observeOn(Scheduler))

    Observable
        .create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                //main
                Log.i(TAG, "call: "+Thread.currentThread().getName());
                subscriber.onNext("Hello Android !");
            }
        })
        .observeOn(Schedulers.io())
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                //RxIoScheduler-2
                Log.i(TAG, "subscribe call: "+Thread.currentThread().getName());
                Log.i(TAG, "subscribe call: "+s);
            }
        });

上面的程式碼中主要是在main執行緒中呼叫的,所以被觀察者列印的是main線程,而使用了observeOn函數,使得觀察者跳到RxIoScheduler-2線程中運行。

不知道大家注意沒有,上面的程式碼中Schedulers.io()指定了子執行緒的型別。除此之外還有很多起來的線程類型。如下表:

##Schedulers.computation( )用於計算任務,如事件循環或和回調處理,不要用於IO操作(IO操作請使用Schedulers.io());預設執行緒數等於處理器的數量Schedulers.from(executor)使用指定的Executor作為調度器Schedulers.immediate()在當前執行緒立即開始執行任務Schedulers.io()#用於IO密集型任務,如非同步阻塞IO操作,這個調度器的執行緒池會根據需要成長;對於普通的運算任務,請使用Schedulers.computation();Schedulers.io()預設是一個CachedThreadScheduler,很像一個有執行緒快取的新執行緒調度器# Schedulers.newThread()為每個任務建立一個新執行緒Schedulers.trampoline()當其它排隊的任務完成後,在目前執行緒排隊開始執行AndroidSchedulers.mainThread()Android指定的主執行緒##

2.SubscribeOn

指定Observable自身在哪个调度器上执行。

ObserveOn操作符的作用类似,但是功能很有限,它指示Observable在一个指定的调度器上给观察者发通知。

示例代码如下:

Observable
    .create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            //RxComputationScheduler-1
            Log.i(TAG, "call: "+Thread.currentThread().getName());
            subscriber.onNext("Hello Android !");
        }
    })
    //指定被观察者在哪个线程中运行
    .subscribeOn(Schedulers.computation())
    //指定观察者在哪个线程中运行
    .observeOn(Schedulers.io())
    .subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            //RxIoScheduler-2
            Log.i(TAG, "subscribe call: "+Thread.currentThread().getName());
            Log.i(TAG, "subscribe call: "+s);
        }
    });

3.使用调度器

除了上面介绍的2个函数,你也可以用它们调度你自己的任务。下面的示例展示了Scheduler.Worker的用法:

//模拟在子线程执行任务
Scheduler.Worker worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {
    @Override
    public void call() {
        //call:---RxNewThreadScheduler-1
        Log.i(TAG, "call:---"+Thread.currentThread().getName());
    }
});

检查或设置取消订阅状态

Worker类的对象实现了Subscription接口,使用它的isUnsubscribed和unsubscribe方法,所以你可以在订阅取消时停止任务,或者从正在调度的任务内部取消订阅,示例:

final Scheduler.Worker worker = Schedulers.newThread().createWorker();
Subscription mySubscription = worker.schedule(new Action0() {

    @Override
    public void call() {
        while(!worker.isUnsubscribed()) {
            Log.i(TAG, "do your work !");
            //执行完任务后取消订阅状态
            worker.unsubscribe();
        }
    }

});

下面的一系列函数作为RxJava的辅助函数,不一定常用,仅仅帮助我们增强代码的功能。

4.Delay

Delay操作符让原始Observable在发射每项数据之前都暂停一段指定的时间段。效果是Observable发射的数据项在时间上向前整体平移了一个增量。

示例代码:

Observable
    .create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("Hello");
            subscriber.onNext("Android");
            //如果发送异常 则直接抛出异常 上面的发送无效
            //subscriber.onError(new NullPointerException("MOCK"));
            subscriber.onNext("Android2");
        }
    })
    //整体延迟2秒
    .delay(2000, TimeUnit.MILLISECONDS)
    .subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            Log.i(TAG, "call: " + s);
        }
    });

delaySubscription

还有一个操作符delaySubscription让你你可以延迟订阅原始Observable。它结合搜一个定义延时的参数。

Observable
    .create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("Hello");
            subscriber.onNext("Android");
        }
    })
    //延迟2秒订阅
    .delaySubscription(2,TimeUnit.SECONDS)
    .subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            Log.i(TAG, "call: " + s);
        }
    });

5.Do

此操作符可以认为是监听器的一种,它监听onNext()事件和subcribe()事件,会在此两个事件前被调用。此函数分别为:doOnEach(),doOnNext(),doOnSubscribe().

doOnEach

doOnEach操作符让你可以注册一个回调,它产生的Observable每发射一项数据就会调用它一次。

示例代码:

    Observable
        .just("Hello","Android")
        //每发送一次 就会现在Observer的onNext()中调用一次
        .doOnEach(new Observer<String>() {
            @Override
            public void onCompleted() {
                Log.i(TAG, "onCompleted: ");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError: ");

            }

            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext: "+s);
            }
        })
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.i(TAG, "call: " + s);
            }
        });

输出:

com.m520it.rxjava I/IT520: onNext: Hello
com.m520it.rxjava I/IT520: call: Hello
com.m520it.rxjava I/IT520: onNext: Android
com.m520it.rxjava I/IT520: call: Android
com.m520it.rxjava I/IT520: onCompleted:

类似的函数还有:

  • Javadoc: doOnEach(Action1))

  • Javadoc: doOnEach(Observer))

doOnNext

doOnNext操作符类似于doOnEach(Action1),但是它的Action不是接受一个Notification参数,而是接受发射的数据项。

示例代码

Observable.just(1, 2, 3)
    .doOnNext(new Action1<Integer>() {
        @Override
        public void call(Integer item) {
            Log.i(TAG, "doOnNext call: "+item);
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call: "+integer);
        }
    });

输出

com.m520it.rxjava I/IT520: doOnNext call: 1
com.m520it.rxjava I/IT520: call: 1
com.m520it.rxjava I/IT520: doOnNext call: 2
com.m520it.rxjava I/IT520: call: 2
com.m520it.rxjava I/IT520: doOnNext call: 3
com.m520it.rxjava I/IT520: call: 3

doOnSubscribe

doOnSubscribe操作符注册一个动作,当观察者订阅它生成的Observable它就会被调用。

Javadoc: doOnSubscribe(Action0))

Observable.just(1, 2, 3)
    // 订阅之前调用
    .doOnSubscribe(new Action0() {
        @Override
        public void call() {
            Log.i(TAG, "doOnSubscribe call ");
        }
    })
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call: "+integer);
        }
    });

输出

 I/IT520: doOnSubscribe call 
 I/IT520: call: 1
 I/IT520: call: 2
 I/IT520: call: 3

6.Serialize

强制一个Observable连续调用并保证行为正确

一个Observable可以异步调用它的观察者的方法,可能是从不同的线程调用。这可能会让Observable行为不正确,它可能会在某一个onNext调用之前尝试调用onCompleted或onError方法,或者从两个不同的线程同时调用onNext方法。使用Serialize操作符,你可以纠正这个Observable的行为,保证它的行为是正确的且是同步的。

RxJava提供具有此功能的函数为serialize()

7.Timeout

如果原始Observable过了指定的一段时长没有发射任何数据,Timeout操作符会以一个onError通知终止这个Observable。

RxJava中的实现为timeout函数,我们可以使用该函数作为网络请求的超时异常处理。

示例代码:

//每次发送之后 下一次发送不能超过2秒 如果超过则跳转到onError()
Observable.interval(2, TimeUnit.SECONDS)
        .timeout(2, TimeUnit.SECONDS)
        .subscribe(new Action1<Long>() {
            @Override
            public void call(Long aLong) {
                Log.i(TAG, "call: "+aLong);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                //call throwable: null
                Log.i(TAG, "call throwable: " + throwable.getLocalizedMessage());
            }
        });

8.Using

Using操作符让你可以指示Observable创建一个只在它的生命周期内存在的资源,当Observable终止时这个资源会被自动释放。

using(Func0,Func1,Action1)操作符接受三个参数:

  1. 一个用于创建一次性资源的工厂函数

  2. 一个用于创建Observable的工厂函数

  3. 一个用于释放资源的函数

当一个观察者订阅using返回的Observable时,using将会使用Observable工厂函数创建观察者要观察的Observable,同时使用资源工厂函数创建一个你想要创建的资源。当观察者取消订阅这个Observable时,或者当观察者终止时(无论是正常终止还是因错误而终止),using使用第三个函数释放它创建的资源。

示例代码:

final Observable<Long> observable = Observable.using(new Func0<String>() {
    //创建一次性资源
    @Override
    public String call() {
        return "Hello Android !";
    }
}, new Func1<String, Observable<Long>>() {
    //创建被观察者
    @Override
    public Observable<Long> call(String s) {
        Log.i(TAG, "Func1 call: " + s);
        return Observable.interval(1, TimeUnit.SECONDS);
    }
}, new Action1<String>() {
    //用于销毁一次性资源
    @Override
    public void call(String s) {
        Log.i(TAG, "Action1 call: " + s);
    }
});

observable.subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted: ");
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(Long aLong) {
        Log.i(TAG, "onNext: "+aLong);
        //取消订阅后 才能执行被观察者的销毁资源方法
        unsubscribe();
    }
});

在安卓APP中,我们经常需要通过访问网络获取数据,请求网络数据需要在子线程中操作,以下将这需求进行分解:

  1. 将网络请求放在被观察者中(子线程)。

  2. 网络请求结果处理放在观察者中(主线程)。

  3. 订阅(当网络请求完成后,方便被观察者通知观察者)

为了更好的实现上面的需求,我们需要知道如何在被观察者与观察者如何使用特定的线程来处理。下面的文章将介绍RxJava中线程相关的操作。

 以上就是深入浅出RxJava_07[多线程&辅助操作(完)]的内容,更多相关内容请关注PHP中文网(www.php.cn)!


調度器類型 效果
陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn