首頁 >Java >java教程 >【Android】RxJava之初始篇

【Android】RxJava之初始篇

高洛峰
高洛峰原創
2016-11-15 10:12:121220瀏覽

關於RxJava

RxJava是ReactiveX推出在Java VM環境下使用的非同步操作庫。除了在Java環境ReactiveX也為其他程式語言推出Rx函式庫,例如Py、Js、Go等。網路上有很多關於對RxJava的介紹和使用,在Android開發中也有很多專案使用RxJava。那為什麼還要使用RxJava呢,Android開發也有提供非同步操作的方法供開發者使用,我想應該是RxJava比起Handle、AsyncTask簡潔優雅。

1 RxJava採用鍊式調用,在程式邏輯上清晰簡潔

2 採用擴展式觀察者設計模式

關於觀察者模式以及其他RxJava的介紹這個就不做重複,下面內容主要圍繞RxJava和RxAndroid使用。對於RxJava官方文件已經有詳細介紹,本節是以學習討論為主,如有錯誤的地方希望大家可以指出。

被觀察者Observable

使用RxJava需要建立Observable,Observable用來發射資料。如下Observable的create方法需要傳入一個OnSubscribe,其繼承於Action1>,Action中的Subscriber就是訂閱者。

public static <T> Observable<T> create(OnSubscribe<T> f) { 
    return new Observable<T>(RxJavaHooks.onCreate(f)); 
}

另外create方法中需要實作介面call,回傳subscriber物件。 call方法實作在observable訂閱後要執行的事件流。 subscriber.onNext發射data,subscriber.onCompleted可以表示發射事件結束。接著呼叫observable的subscribe方法實作被訂閱後執行的事件流。

Observable<String> observable = Observable 
                .create(new Observable.OnSubscribe<String>() { 
 
            @Override 
            public void call(Subscriber<? super String> subscriber) { 
                subscriber.onNext("1"); 
                subscriber.onNext("2"); 
                subscriber.onNext("3"); 
                subscriber.onNext("4"); 
                subscriber.onNext("5"); 
            } 
}); 
Subscriber<String> subscriber = new  Subscriber<String>() { 
            @Override 
            public void onCompleted() { 
 
            } 
 
            @Override 
            public void onError(Throwable e) { 
 
            } 
 
            @Override 
            public void onNext(String s) { 
                System.out.print(s + &#39;\n&#39;); 
            } 
}; 
observable.subscribe(subscriber); 
//输出结果 print: 
//1 
//2 
//3 
//4 
//5

Observable除了使用create方法建立外還可以使用from或just快速設定發射的事件流,簡化了create的步驟。

【Android】RxJava之初始篇

Observable<String> o = Observable.from("a", "b", "c");

【Android】RxJava之初始篇

Observable<String> o = Observable.just("one object");

說好的非同步操作

RxJava的執行緒由Schedulers調度者控制,並透過它來控制具體操作在什麼執行緒中進行。

Schedulers.immediate() 在當前執行緒中執行

Schedulers.newThread() 為每個任務開闢執行緒執行

Schedulers.computation() 計算任務執行的執行緒

....

AndroidSchedulers.mainThread() Android 主執行緒運行

對於執行緒的控制主要由subscribeOn()和observeOn()兩個方法控制:

subscribeOn 控制Observable.OnSubscribe所處的執行緒,等同於Oservbable. create、just、from時所處的線程。

observeOn 控制Subscriber的線程,也可以說是控制事件被執行時所在的線程。

Observable 
        .just(1,2,3) 
        .subscribeOn(Schedulers.io()) 
        .observeOn(AndroidSchedulers.mainThread()) 
        .subscribe(new Subscriber<Integer>() { 
            @Override 
            public void onCompleted() { 
 
            } 
 
            @Override 
            public void onError(Throwable e) { 
 
            } 
 
            @Override 
            public void onNext(Integer integer) { 
                 System.out.print(integer + &#39;\n&#39;);                        
            } 
}); 
//输出结果 print: 
//1 
//2 
//3

寫下上面的RxJava鍊式調用的代碼,有沒有覺得比以前使用的異步調用清爽許多,對處女座還說這很治愈!

操作符Operators

ReactiveX提供超級多的操作符,提供超級多的操作符每個操作符都有不同的功能,但目的都是在Observable和Subscribe之間變換和修改發射出去的事件流。這節介紹幾個比較常見簡單的操作符,之後有機會再寫一節操作符篇詳細說說每個操作符的作用。附上官方操作符文件看看就知道有多少多了。

Map() 

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) { 
        return create(new OnSubscribeMap<T, R>(this, func)); 
    }

【Android】RxJava之初始篇

先介紹一個操作符map,map實作Func1介面將T型別資料轉換為R型數據,回傳R型數據。例如傳入Integer類型的事件佇列,經過map加工後再以String類型傳回。

Observable 
                .just(1,2,3) 
                .subscribeOn(Schedulers.io()) 
                .observeOn(AndroidSchedulers.mainThread()) 
                .map(new Func1<Integer, String>() { 
                    @Override 
                    public String call(Integer integer) { 
                        return integer + ""; 
                    } 
                }) 
                .subscribe(new Subscriber<String>() { 
                    ...... 
                    @Override 
                    public void onNext(String str) { 
                        System.out.print(str + &#39;\n&#39;); 
                    } 
                }); 
//输出结果 print: 
//1 
//2 
//3

Filter() 

public final Observable<T> filter(Func1<? super T, Boolean> predicate) { 
       return create(new OnSubscribeFilter<T>(this, predicate)); 
   }

【Android】RxJava之初始篇

filter和map一樣實現Func1介面不過它變換之後的類型為boolean,對發射的事件流進行篩選,當變換後的boolean值為true,訂閱者才能收到透過篩選的事件,反之該事件不被消費。例如事件流篩選要求當int值可被2整除才能繼續傳遞,所以最後訂閱者可消費的事件為2,4,6,8,10。

Observable 
                .just(1,2,3,4,5,6,7,8,9,10) 
                .subscribeOn(Schedulers.io()) 
                .observeOn(AndroidSchedulers.mainThread()) 
                .filter(new Func1<Integer, Boolean>() { 
                    @Override 
                    public Boolean call(Integer integer) { 
                        return integer % 2 == 0; 
                    } 
                }) 
                .map(new Func1<Integer, String>() { 
                    @Override 
                    public String call(Integer integer) { 
                        return integer + ""; 
                    } 
                }) 
                .subscribe(new Subscriber<String>() { 
                   ...... 
                    @Override 
                    public void onNext(String str) { 
                        System.out.print(str + &#39;\n&#39;); 
                        Log.i("subscribe", str); 
                    } 
                }); 
//输出结果 print: 
//2 
//3 
//4 
//6 
//8 
//10

Skip()

public final Observable<T> skip(int count) { 
        return lift(new OperatorSkip<T>(count)); 
    }

【Android】RxJava之初始篇

skip操作符表示跳過前幾個事件從某一個事件開始發射事件,下標從0開始。

Observable 
                .just(1,2,3,4,5,6,7,8,9,10) 
                .subscribeOn(Schedulers.io()) 
                .observeOn(AndroidSchedulers.mainThread()) 
                .skip(3) 
                .map(new Func1<Integer, String>() { 
                    @Override 
                    public String call(Integer integer) { 
                        return integer + ""; 
                    } 
                }) 
                .subscribe(new Subscriber<String>() { 
                    ...... 
                    @Override 
                    public void onNext(String s) { 
                        System.out.print(s + &#39;\n&#39;); 
                        Log.i("subscribe", s); 
                    } 
                }); 
//输出结果 print: 
//4 
//5 
//6 
//7 
//8 
//9 
//10

Range()

public static Observable<Integer> range(int start, int count) { 
        if (count < 0) { 
            throw new IllegalArgumentException("Count can not be negative"); 
        } 
        if (count == 0) { 
            return Observable.empty(); 
        } 
        if (start > Integer.MAX_VALUE - count + 1) { 
            throw new IllegalArgumentException("start + count can not exceed Integer.MAX_VALUE"); 
        } 
        if(count == 1) { 
            return Observable.just(start); 
        } 
        return Observable.create(new OnSubscribeRange(start, start + (count - 1))); 
    }

【Android】RxJava之初始篇

range運算子可以理解為just,from傳遞一個連續的int型別待發射數組,n為起始int值,m為Count。例如n = 1,m = 5 int數組就是{1,2,3,4,5}

結尾

這節先學習到這,算是對RxJava的初步認識與學習。其實運用RxJava主要還是依賴對操作符的使用,前面所介紹的操作符屬於最簡單基礎的,還有很多特別有用的操作符沒有介紹。之後再繼續介紹一些操作符。 RxJava在Android開發中很受歡迎,就是因為它的強大,同時RxJava可以和Retrofit組合使用,更有效率處理網路請求回值。另外GitHub GoogleSample上的android-architecture也有使用RxJava框架的TODO項目,可以看看理解RxJava在專案中的實作應用。


陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn