首頁  >  文章  >  Java  >  深入淺出RxJava_03[被觀察者創建操作]的詳細介紹

深入淺出RxJava_03[被觀察者創建操作]的詳細介紹

黄舟
黄舟原創
2017-03-04 09:44:261756瀏覽

本教學基於RxJava1.x版本進行全面講解,後續課程將陸續更新,敬請關注…

# #以下的函數都是用來創建被觀察者Observable的函數。我們可以根據需要建立對應的函數。

  1. Create - 最原始的Observable建立函數

  2. Defer    - 建立訂閱後才建立的Observable

  3. #Empty/Never/Throw - 建立不傳送資料/以傳送一個例外的資料的bservable

  4. Just - 建立發送1-9個值的Observable

  5. From - 建立發送一個佇列的Observable

  6. Interval&Timer -建立一個類似定時器的Observable

  7. #Range -建立一個發送特定整數型的Observable

  8. Repeat - 建立一個設定重複發送次數的Observable

1.Create

你可以使用Create運算子從頭開始建立一個Observable,給這個運算子傳遞一個接受觀察者作為參數的函數,寫這個函數讓它的行為表現為一個Observable–恰當的呼叫觀察者的onNext,onError和onCompleted方法。

一個形式正確的有限Observable必須嘗試調用觀察者的onCompleted正好一次或它的onError正好一次,而且此後不能再調用觀察者的任何其它方法。

建議你在傳遞給create方法的函數中檢查觀察者的isUnsubscribed狀態,以便在沒有觀察者的時候,讓你的Observable停止發射資料或做昂貴的運算。

範例程式碼:

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> observer) {
        try {
            if (!observer.isUnsubscribed()) {
                for (int i = 1; i < 5; i++) {
                    observer.onNext(i);
                }
                observer.onCompleted();
            }
        } catch (Exception e) {
            observer.onError(e);
        }
    }
 } ).subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });

輸出:

Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.

2.Defer

直到有觀察者訂閱時才建立Observable,並且為每個觀察者創建一個新的Observable。

Defer運算元會一直等待直到有觀察者訂閱它,然後它使用Observable工廠方法產生一個Observable。它對每個觀察者都這樣做,因此儘管每個訂閱者都以為自己訂閱的是同一個Observable,事實上每個訂閱者獲取的是它們自己的單獨的資料序列。

程式碼如下:

Observable<String> defer = Observable.defer(new Func0<Observable<String>>() {
    //当observable被创建的时候顺便调用observable内部的call()方法并在方法中发送消息
    //每subscribe()就会在call()中返回一个新的实例对象
    @Override
    public Observable<String> call() {
        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Hello Android !");
            }
        });
        return observable;
    }
});
defer.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
    }
});

defer.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
    }
});

3.Empty/Never/Throw

Empty

建立一個不發射任何資料但是正常終止的Observable

Never

建立一個不發射資料也不終止的Observable

Throw

建立一個不發射資料以錯誤終止的Observable

#這三個運算子產生的Observable行為非常特殊且受限。測試的時候很有用,有時也用於結合其它的Observables,或作為其它需要Observable的操作符的參數。

RxJava將這些運算元實作為 empty,never和error。 error運算子需要一個Throwable參數,你的Observable會以此終止。這些操作符預設不在任何特定的調度器上執行,但是empty和error有一個可選參數是Scheduler,如果你傳遞了Scheduler參數,它們會在這個調度器上發送通知。

4.Just

如果被觀察者想發射一個資料給觀察者,那麼程式碼是這樣:

Observable
    .create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("hello Android");
        }
    })
    .subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            Log.i(TAG, "call: "+s);
        }
    });

上面的程式碼如想傳送資料首先要實作Observable .OnSubscribe接口,我們可以使用just函數代替。範例程式碼如下:

Observable
    .just("hello Android")
    .subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            Log.i(TAG, "call: "+s);
        }
    });

輸出:

onNext: hello Android
Sequence complete.

Just將單一資料轉換為發射該資料的Observable。

類似的函數有:

  • just(T t1);

  • just(T t1,T t2);

  • just(T t1,T t2,T t3);

  • just(T t1,T t2,T t3,T t4) ;

  • just(T t1,T t2,T t3,T t4,T t5);

RxJava將此運算子實作為just函數,它接受一到九個參數,傳回一個按參數列表順序發射這些資料的Observable。

5.From

比相比發送一個到9個數據的just函數,From函數發送一個佇列的數據,輸出的長度無限。

  1. 學校開了一個興趣班,開學班裡來了幾個同學。以下就是同學類別的定義:

    public static class Student {
    
        public String name;
        public int age;
    
        public Student(String name, int age) {
            this.name = name;
            this.age = age;
        }
    
        @Override
        public String toString() {
            return "Student{" +
                    "name=&#39;" + name + &#39;\&#39;&#39; +
                    ", age=" + age +
                    &#39;}&#39;;
        }
    }

  2. 這裡建立一個集合 用來儲存每個同學的資訊。

    private ArrayList initPersons() {
        ArrayList<Student> persons = new ArrayList<>();
        persons.add(new Student("张三", 16));
        persons.add(new Student("李四", 17));
        persons.add(new Student("王二麻子", 18));
        return persons;
    }

  3. 接下來老師點評 每個都要喊一聲到,這裡透過列印來說明每個同學的簽到情況。

    ArrayList persons = initPersons();
    for (int i = 0; i < persons.size(); i++) {
        //打印每个同学
        Log.i(TAG,persons.get(i).toString());
    }

RxJava這樣轉換:

//1.定义被观察者,用来发送一个队列的事件
Observable<Student> observable = Observable.from(persons);

//2.当开始订阅的时候 接收被观察者发送过来的一系列事件
observable.subscribe(new Action1<Student>() {
    @Override
    public void call(Student student) {
        Log.i(TAG, "call: "+student.toString());
    }
});

6.Interval&Timer

建立一個以固定時間間隔發射整數序列的Observable。

RxJava將這個運算子實作為interval方法。它接受一個表示時間間隔的參數和一個表示時間單位的參數。

//3000毫米发送一个请求 该请求包含了一个自增长的整数型变量
Observable<Long> observable = Observable.interval(3000,         TimeUnit.MILLISECONDS, Schedulers.io());

observable.subscribe(new Action1<Long>() {
    @Override
    public void call(Long i) {
        // such as printf RxIoScheduler-2call: 685
        Log.i(TAG, Thread.currentThread().getName()+"call: "+i);
    }
});

輸出:

com.m520it.rxjava I/IT520: RxIoScheduler-2call: 0
com.m520it.rxjava I/IT520: RxIoScheduler-2call: 1
com.m520it.rxjava I/IT520: RxIoScheduler-2call: 2
com.m520it.rxjava I/IT520: RxIoScheduler-2call: 3
...

上面的程式碼在子執行緒中根據某種時間間隔不斷列印資料。這個動作類似於Timer任務,你也可以認為它是用來取代Timer計時器的。

類似的函數還有timer()函數。

Observable<Long> timer = Observable.timer(3000, 2000, TimeUnit.MILLISECONDS);
    timer.subscribe(new Action1<Long>() {
        @Override
        public void call(Long aLong) {
            Log.i(TAG, "call: "+aLong);
        }
    });

7.Range

建立一個發射特定整數序列的Observable。

Range運算子發射一個範圍內的有序整數序列,你可以指定範圍的起始和長度。

比如下面这个班级的所有学生都在一个集合里,现在要打印出来,可以是这样的:

private ArrayList initPersons() {
    ArrayList<Student> persons = new ArrayList<>();
    persons.add(new Student("张三", 16));
    persons.add(new Student("李四", 17));
    persons.add(new Student("王二麻子", 18));
    return persons;
}


final ArrayList students = initPersons();
//这里发射一个起始值0,长度students.size()的索引 用来遍历队列
Observable
        .range(0,students.size())
        .subscribe(new Action1() {
            @Override
            public void call(Integer index) {
                Log.i(TAG, "call: "+students.get(index));
            }
        });

8.Repeat

创建一个发射特定数据重复多次的Observable

Repeat重复地发射数据。某些实现允许你重复的发射某个数据序列,还有一些允许你限制重复的次数。

//设置重复发送3次
Observable<String> observable = Observable.just("Hello Android").repeat(3);

Action1<String> action1 = new Action1<String>() {

    @Override
    public void call(String s) {
        Log.i(TAG, "call: " + s);
    }
};
observable.subscribe(action1);

 以上就是深入浅出RxJava_03[被观察者创建操作]的详细介绍的内容,更多相关内容请关注PHP中文网(www.php.cn)!


陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn