本教學基於RxJava1.x版本進行全面講解,後續課程將陸續更新,敬請關注…
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> observer) { try { if (!observer.isUnsubscribed()) { for (int i = 1; i < 5; i++) { observer.onNext(i); } observer.onCompleted(); } } catch (Exception e) { observer.onError(e); } } } ).subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });輸出:
Next: 1 Next: 2 Next: 3 Next: 4 Sequence complete.2.Defer直到有觀察者訂閱時才建立Observable,並且為每個觀察者創建一個新的Observable。 Defer運算元會一直等待直到有觀察者訂閱它,然後它使用Observable工廠方法產生一個Observable。它對每個觀察者都這樣做,因此儘管每個訂閱者都以為自己訂閱的是同一個Observable,事實上每個訂閱者獲取的是它們自己的單獨的資料序列。 程式碼如下:
Observable<String> defer = Observable.defer(new Func0<Observable<String>>() { //当observable被创建的时候顺便调用observable内部的call()方法并在方法中发送消息 //每subscribe()就会在call()中返回一个新的实例对象 @Override public Observable<String> call() { Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello Android !"); } }); return observable; } }); defer.subscribe(new Action1<String>() { @Override public void call(String s) { } }); defer.subscribe(new Action1<String>() { @Override public void call(String s) { } });3.Empty/Never/ThrowEmpty建立一個不發射任何資料但是正常終止的Observable
Never建立一個不發射資料也不終止的ObservableThrow建立一個不發射資料以錯誤終止的Observable#這三個運算子產生的Observable行為非常特殊且受限。測試的時候很有用,有時也用於結合其它的Observables,或作為其它需要Observable的操作符的參數。 RxJava將這些運算元實作為 empty,never和error。 error運算子需要一個Throwable參數,你的Observable會以此終止。這些操作符預設不在任何特定的調度器上執行,但是empty和error有一個可選參數是Scheduler,如果你傳遞了Scheduler參數,它們會在這個調度器上發送通知。 4.Just如果被觀察者想發射一個資料給觀察者,那麼程式碼是這樣:
Observable .create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("hello Android"); } }) .subscribe(new Action1<String>() { @Override public void call(String s) { Log.i(TAG, "call: "+s); } });上面的程式碼如想傳送資料首先要實作Observable .OnSubscribe接口,我們可以使用just函數代替。範例程式碼如下:
Observable .just("hello Android") .subscribe(new Action1<String>() { @Override public void call(String s) { Log.i(TAG, "call: "+s); } });輸出:
onNext: hello Android Sequence complete.Just將單一資料轉換為發射該資料的Observable。 類似的函數有:
public static class Student { public String name; public int age; public Student(String name, int age) { this.name = name; this.age = age; } @Override public String toString() { return "Student{" + "name='" + name + '\'' + ", age=" + age + '}'; } }
private ArrayList initPersons() { ArrayList<Student> persons = new ArrayList<>(); persons.add(new Student("张三", 16)); persons.add(new Student("李四", 17)); persons.add(new Student("王二麻子", 18)); return persons; }
ArrayList persons = initPersons(); for (int i = 0; i < persons.size(); i++) { //打印每个同学 Log.i(TAG,persons.get(i).toString()); }
//1.定义被观察者,用来发送一个队列的事件 Observable<Student> observable = Observable.from(persons); //2.当开始订阅的时候 接收被观察者发送过来的一系列事件 observable.subscribe(new Action1<Student>() { @Override public void call(Student student) { Log.i(TAG, "call: "+student.toString()); } });
//3000毫米发送一个请求 该请求包含了一个自增长的整数型变量 Observable<Long> observable = Observable.interval(3000, TimeUnit.MILLISECONDS, Schedulers.io()); observable.subscribe(new Action1<Long>() { @Override public void call(Long i) { // such as printf RxIoScheduler-2call: 685 Log.i(TAG, Thread.currentThread().getName()+"call: "+i); } });輸出:
com.m520it.rxjava I/IT520: RxIoScheduler-2call: 0 com.m520it.rxjava I/IT520: RxIoScheduler-2call: 1 com.m520it.rxjava I/IT520: RxIoScheduler-2call: 2 com.m520it.rxjava I/IT520: RxIoScheduler-2call: 3 ...上面的程式碼在子執行緒中根據某種時間間隔不斷列印資料。這個動作類似於Timer任務,你也可以認為它是用來取代Timer計時器的。 類似的函數還有timer()函數。
Observable<Long> timer = Observable.timer(3000, 2000, TimeUnit.MILLISECONDS); timer.subscribe(new Action1<Long>() { @Override public void call(Long aLong) { Log.i(TAG, "call: "+aLong); } });7.Range建立一個發射特定整數序列的Observable。 Range運算子發射一個範圍內的有序整數序列,你可以指定範圍的起始和長度。
比如下面这个班级的所有学生都在一个集合里,现在要打印出来,可以是这样的:
private ArrayList initPersons() { ArrayList<Student> persons = new ArrayList<>(); persons.add(new Student("张三", 16)); persons.add(new Student("李四", 17)); persons.add(new Student("王二麻子", 18)); return persons; } final ArrayListstudents = initPersons(); //这里发射一个起始值0,长度students.size()的索引 用来遍历队列 Observable .range(0,students.size()) .subscribe(new Action1 () { @Override public void call(Integer index) { Log.i(TAG, "call: "+students.get(index)); } });
创建一个发射特定数据重复多次的Observable
Repeat重复地发射数据。某些实现允许你重复的发射某个数据序列,还有一些允许你限制重复的次数。
//设置重复发送3次 Observable<String> observable = Observable.just("Hello Android").repeat(3); Action1<String> action1 = new Action1<String>() { @Override public void call(String s) { Log.i(TAG, "call: " + s); } }; observable.subscribe(action1);
以上就是深入浅出RxJava_03[被观察者创建操作]的详细介绍的内容,更多相关内容请关注PHP中文网(www.php.cn)!