首頁 >Java >java教程 >深入淺出RxJava_01[什麼是RxJava] 的詳細介紹

深入淺出RxJava_01[什麼是RxJava] 的詳細介紹

黄舟
黄舟原創
2017-03-04 09:38:141425瀏覽


本教學基於RxJava1.x版本進行全面講解,後續課程將陸續更新,敬請關注…

1.什麼是RxJava

  • Rx是Reactive Extensions的簡寫,翻譯為回應的擴充。也就是透過由一方發出訊息,另一方回應訊息並作出處理的核心框架代碼。

  • 該框架由微軟的架構師Erik Meijer領導的團隊開發,並在2012年11月開源。

  • Rx函式庫支援.NET、JavaScript和C++等,現在已經支援幾乎全部的流行程式語言了。

  • Rx的語言函式庫大多由ReactiveX這個組織負責維護,比較流行的有RxJava/RxJS/Rx.NET,社群網站是 reactivex.io。

  • RxJava作為一個流行的框架,其原始碼依託在GitHub,除了支援RxJava,針對安卓系統也除了一個支援框架RxAndroid

2.RxJava簡化程式碼

一般我們在安卓專案中,如果想從後台取得資料並刷新介面,程式碼大概如下,下面我們來看一個例子:

new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            imageCollectorView.addImage(bitmap);
                        }
                    });
                }
            }
        }
    }
}.start();

上面的程式碼經過多層嵌套後可讀性太差了!如果你用了RxJava 可以這樣寫:

Observable.from(folders)
    .flatMap(new Func1<File, Observable<File>>() {
        @Override
        public Observable<File> call(File file) {
            return Observable.from(file.listFiles());
        }
    })
    .filter(new Func1<File, Boolean>() {
        @Override
        public Boolean call(File file) {
            return file.getName().endsWith(".png");
        }
    })
    .map(new Func1<File, Bitmap>() {
        @Override
        public Bitmap call(File file) {
            return getBitmapFromFile(file);
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) {
            imageCollectorView.addImage(bitmap);
        }
    });

這樣寫的好處就是減少層次嵌套 提高了程式碼的可讀性,除了簡化程式碼,RxJava還可以為每個方法提供特定的執行執行緒。

3.引入框架

目前RxJava已經升級為2.0版本,但為了能夠更好的理解RxJava,我們可以從1.0版本開始學習。也為了讓我們的安卓專案能夠更好的使用RxJava,可以在專案中引入gradle腳本依賴:

compile &#39;io.reactivex:rxandroid:1.2.1&#39;
compile &#39;io.reactivex:rxjava:1.1.6&#39;

現在 我們的專案已經支援RxJava的功能了。

4.響應式的核心

所謂的響應式,無非就是存在這樣的2個部分,一部分負責發送事件/訊息,另一部分負責回應事件/訊息。

以前如果我們想看新聞,一般需要透過看報紙。例如,你對某個報紙雜誌比較有興趣,那你首先要做3件事:

  1. 提供你家的地址

  2. 找到對應的報社

  3. 去報社訂閱整個月的報紙

經過了上面的流程,以後每天只要有新的報紙資料出來了,報社都會將雜誌送到你家。

深入淺出RxJava_01[什麼是RxJava] 的詳細介紹

將上面的範例進行程式碼抽象,步驟如下:

  1. 提供觀察者(因為你是關心雜誌內容的人所以你是觀察該事件的人)

  2. 提供被觀察者(只要有新的雜誌出來就需要通知關心的人所以報社是被觀察的對象)

  3. 訂閱(也就是觀察者&被觀察者之間要相互關聯以便被觀察的對像一變化就會馬上通知觀察該事件的對象)

深入淺出RxJava_01[什麼是RxJava] 的詳細介紹

上面範例的示範程式碼如下:

//1.创建被观察者
Observable<String> observable = 
        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                //4.开始发送事件 
                //事件有3个类型 分别是onNext() onCompleted() onError()
                //onCompleted() onError() 一般都是用来通知观察者 事件发送完毕了,两者只取其一。
                subscriber.onNext("Hello Android !");
                subscriber.onNext("Hello Java !");
                subscriber.onNext("Hello C !");
                subscriber.onCompleted();
            }
        });

//2.创建观察者
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted ");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError: "+e.getLocalizedMessage());
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: "+s);
    }
};

//3.订阅
observable.subscribe(subscriber);

輸出如下:

com.m520it.rxjava I/IT520: onNext: Hello Android !
com.m520it.rxjava I/IT520: onNext: Hello Java !
com.m520it.rxjava I/IT520: onNext: Hello C !
com.m520it.rxjava I/IT520: onCompleted

程式碼運行的原則

  • 上面的程式碼中,當觀察者subscriber訂閱了被觀察者observable之後,系統會自動回呼observable物件內部的call()。

  • 在observable的call()方法實體中,發送瞭如onNext/onCompleted/onError事件後。

  • 接著subscriber就能回呼到對應的方法。

5.被觀察者變種

普通的Observable發送需要三個方法onNext, onError, onCompleted,而Single作為Observable的變種,只需要兩個方法:

  • onSuccess - Single發射單一的值到這個方法

  • onError - 如果無法發射所需的值,Single發射一個Throwable物件到這個方法

Single只會呼叫這兩個方法中的一個,而且只會呼叫一次,呼叫了任何一個方法之後,訂閱關係終止。

final Single<String> single = Single.create(new Single.OnSubscribe<String>() {
            @Override
            public void call(SingleSubscriber<? super String> singleSubscriber) {
                //先调用onNext() 最后调用onCompleted() 
                //singleSubscriber.onSuccess("Hello Android !");
                //只调用onError();
                singleSubscriber.onError(new NullPointerException("mock Exception !"));
            }
        });

Observer<String> observer = new Observer<String>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted ");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError: "+e.getLocalizedMessage());
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: "+s);
    }
};
single.subscribe(observer);

6.觀察者變種

Observer觀察者對象,上面我們用Subscriber物件取代。因為物件本身就是繼承了Observer。

該物件實作了onNext()&onCompleted()&onError()事件,我們如果對哪個事件比較關心,只需要實現對應的方法即可,程式碼如下:

//创建观察者
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted ");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError: "+e.getLocalizedMessage());
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: "+s);
    }
};

//订阅
observable.subscribe(subscriber);

上面的程式碼中,如果你只關心onNext()事件,但卻不得不實作onCompleted()&onError()事件.這樣的程式碼就顯得很臃腫。鑑於這種需求,RxJava框架在訂閱方面做了特定的調整,程式碼如下:

//为指定的onNext事件创建独立的接口
Action1<String> onNextAction = new Action1<String>() {
    @Override
    public void call(String s) {
        Log.i(TAG, "call: "+s);
    }
};

//订阅
observable.subscribe(onNextAction);

不知道大家注意到沒有,subscribe()訂閱的不再是觀察者,而是特定的onNext接口對象。類似的函數如下,我們可以根據需要實作對應的訂閱:

  • public Subscription subscribe(final Observer observer)

  • public Subscription subscribe(final Action1 onNext)

  • public Subscription subscribe(final Action1 onNext, Action1 onError)

  • public Subscription subscribe(final Action1 onNext, Action1 onError, Action0 onCompleted)

这里还有一个forEach函数有类似的功能:

  • public void forEach(final Action1 onNext)

  • public void forEach(final Action1 onNext, Action1 onError)

  • public void forEach(final Action1 onNext, Action1 onError, Action0 onComplete)

7.Subject变种

上面2节中既介绍了被观察者变种,又介绍了观察者变种,这里再介绍一种雌雄同体的对象(既作为被观察者使用,也可以作为观察者)。

针对不同的场景一共有四种类型的Subject。他们并不是在所有的实现中全部都存在。

AsyncSubject

一个AsyncSubject只在原始Observable完成后,发射来自原始Observable的最后一个值。它会把这最后一个值发射给任何后续的观察者。

以下贴出代码:

//创建被观察者
final AsyncSubject<String> subject = AsyncSubject.create();
//创建观察者
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "s:" + s);

    }
};
//订阅事件
subject.subscribe(subscriber);
//被观察者发出事件 如果调用onCompleted(),onNext()则会打印最后一个事件;如果没有,onNext()则不打印任何事件。
subject.onNext("Hello Android ");
subject.onNext("Hello Java ");
subject.onCompleted();

输出:

s:Hello Java 
onCompleted

然而,如果原始的Observable因为发生了错误而终止,AsyncSubject将不会发射任何数据,只是简单的向前传递这个错误通知。

上面的观察者被观察者代码相同,现在发出一系列信号,并在最后发出异常 代码如下:

subject.onNext("Hello Android ");
subject.onNext("Hello Java ");
//因为发送了异常 所以onNext()无法被打印
subject.onError(null);

BehaviorSubject

当观察者订阅BehaviorSubject时,他会将订阅前最后一次发送的事件和订阅后的所有发送事件都打印出来,如果订阅前无发送事件,则会默认接收构造器create(T)里面的对象和订阅后的所有事件,代码如下:

BehaviorSubject subject=BehaviorSubject.create("NROMAL");

Subscriber subscriber = new Subscriber() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError");
    }

    @Override
    public void onNext(Object o) {
        Log.i(TAG, "onNext: " + o);
    }
};

//subject.onNext("Hello Android !");
//subject.onNext("Hello Java !");
//subject.onNext("Hello C !");
//这里开始订阅 如果上面的3个注释没去掉,则Hello C的事件和订阅后面的事件生效
//如果上面的三个注释去掉 则打印构造器NORMAL事件生效后和订阅后面的事件生效
subject.subscribe(subscriber);

subject.onNext("Hello CPP !");
subject.onNext("Hello IOS !");

PublishSubject

PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。

需要注意的是,PublishSubject可能会一创建完成就立刻开始发射数据,因此这里有一个风险:在Subject被创建后到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失。

代码如下:

PublishSubject subject= PublishSubject.create();

Action1<String> onNextAction1 = new Action1<String>(){

    @Override
    public void call(String s) {
        Log.i(TAG, "onNextAction1 call: "+s);
    }
};

Action1<String> onNextAction2 = new Action1<String>(){

    @Override
    public void call(String s) {
        Log.i(TAG, "onNextAction2 call: "+s);
    }
};

subject.onNext("Hello Android !");
subject.subscribe(onNextAction1);
subject.onNext("Hello Java !");
subject.subscribe(onNextAction2);
subject.onNext("Hello IOS !");

输出如下:

onNextAction1 call: Hello Java !
onNextAction1 call: Hello IOS !
onNextAction2 call: Hello IOS !

ReplaySubject

ReplaySubject会发射所有来自原始Observable的数据给观察者,无论它们是何时订阅的。

代码如下:

ReplaySubject subject= ReplaySubject.create();

Action1<String> onNextAction1 = new Action1<String>(){

    @Override
    public void call(String s) {
        Log.i(TAG, "onNextAction1 call: "+s);
    }
};

Action1<String> onNextAction2 = new Action1<String>(){

    @Override
    public void call(String s) {
        Log.i(TAG, "onNextAction2 call: "+s);
    }
};

subject.onNext("Hello Android !");
subject.subscribe(onNextAction1);
subject.onNext("Hello Java !");
subject.subscribe(onNextAction2);
subject.onNext("Hello IOS !");

输出如下:

onNextAction1 call: Hello Android !
onNextAction1 call: Hello Java !
onNextAction2 call: Hello Android !
onNextAction2 call: Hello Java !
onNextAction1 call: Hello IOS !
onNextAction2 call: Hello IOS !

Subject总结

  • AsyncSubject无论何时订阅 只会接收最后一次onNext()事件,如果最后出现异常,则不会打印任何onNext()

  • BehaviorSubject会从订阅前最后一次oNext()开始打印直至结束。如果订阅前无调用onNext(),则调用默认creat(T)传入的对象。如果异常后才调用,则不打印onNext()

  • PublishSubject只会打印订阅后的任何事件。

  • ReplaySubject无论订阅在何时都会调用发送的事件。

 以上就是深入浅出RxJava_01[什么是RxJava] 的详细介绍的内容,更多相关内容请关注PHP中文网(www.php.cn)!


陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn