


This tutorial is a comprehensive explanation based on the RxJava1.x version. Subsequent courses will be updated one after another, so stay tuned...
The following functions are all functions used to create the observed Observable. We can create corresponding functions as needed.
Create - The original Observable creation function
Defer - An Observable created after creating a subscription
-
Empty/Never/Throw - Create an Observable that sends no data/to send data with an exception
Just - Create an Observable that sends 1-9 values
From - Create an Observable that emits a queue
Interval&Timer - Create an Observable similar to a timer
Range - Create an Observable that emits a specific integer type
Repeat - Create an Observable that sets the number of repeated emits
1.Create
You can use the Create operator to create an Observable from scratch, pass this operator a function that accepts an observer as a parameter, and write this function to behave like an Observable - appropriately call the observer's onNext, onError and onCompleted method.
A properly formed finite Observable must attempt to call the observer's onCompleted exactly once or its onError exactly once, and may not call any other methods of the observer thereafter.
It is recommended that you check the isUnsubscribed status of the observer in the function passed to the create method, so that your Observable can stop emitting data or doing expensive operations when there are no observers.
Sample code:
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> observer) { try { if (!observer.isUnsubscribed()) { for (int i = 1; i < 5; i++) { observer.onNext(i); } observer.onCompleted(); } } catch (Exception e) { observer.onError(e); } } } ).subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
Output:
Next: 1 Next: 2 Next: 3 Next: 4 Sequence complete.
2.Defer
The Observable is not created until an observer subscribes, and for each observer Create a new Observable.
The Defer operator waits until an observer subscribes to it, and then it uses the Observable factory method to generate an Observable. It does this for every Observer, so even though each subscriber thinks they are subscribing to the same Observable, in fact each subscriber is getting their own separate sequence of data.
The code is as follows:
Observable<String> defer = Observable.defer(new Func0<Observable<String>>() { //当observable被创建的时候顺便调用observable内部的call()方法并在方法中发送消息 //每subscribe()就会在call()中返回一个新的实例对象 @Override public Observable<String> call() { Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello Android !"); } }); return observable; } }); defer.subscribe(new Action1<String>() { @Override public void call(String s) { } }); defer.subscribe(new Action1<String>() { @Override public void call(String s) { } });
3.Empty/Never/Throw
Empty
Create an Observable that does not emit any data but terminates normally
Never
Create an Observable that does not emit data and terminates with an error
Throw
Create an Observable that does not emit data and terminates with an error
The behavior of the Observable generated by these three operators is very special and restricted. It is useful for testing, and sometimes used in combination with other Observables, or as parameters of other operators that require Observables.
RxJava implements these operators as empty, never and error. The error operator requires a Throwable parameter, with which your Observable will terminate. These operators do not execute on any specific scheduler by default, but empty and error have an optional parameter called Scheduler. If you pass the Scheduler parameter, they will send notifications on this scheduler.
4.Just
If the observer wants to emit data to the observer, the code is as follows:
Observable .create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("hello Android"); } }) .subscribe(new Action1<String>() { @Override public void call(String s) { Log.i(TAG, "call: "+s); } });
If the above code wants to send data, it must first implement Observable .OnSubscribe interface, we can use just function instead. The sample code is as follows:
Observable .just("hello Android") .subscribe(new Action1<String>() { @Override public void call(String s) { Log.i(TAG, "call: "+s); } });
Output:
onNext: hello Android Sequence complete.
Just convert a single data into an Observable that emits that data.
Similar functions are:
- ##just(T t1);
- just(T t1,T t2);
- just(T t1,T t2,T t3);
- just(T t1,T t2,T t3,T t4) ;
- just(T t1,T t2,T t3,T t4,T t5);
- …
- The school opened an interest class, and several classmates came to the class at the beginning of the term. The following is the definition of the classmate class:
public static class Student { public String name; public int age; public Student(String name, int age) { this.name = name; this.age = age; } @Override public String toString() { return "Student{" + "name='" + name + '\'' + ", age=" + age + '}'; } }
- A collection is created here to store the information of each classmate.
private ArrayList initPersons() { ArrayList<Student> persons = new ArrayList<>(); persons.add(new Student("张三", 16)); persons.add(new Student("李四", 17)); persons.add(new Student("王二麻子", 18)); return persons; }
- Next, the teacher comments. Everyone must call out. Here, the sign-in status of each student is explained by printing.
ArrayList persons = initPersons(); for (int i = 0; i < persons.size(); i++) { //打印每个同学 Log.i(TAG,persons.get(i).toString()); }
//1.定义被观察者,用来发送一个队列的事件
Observable<Student> observable = Observable.from(persons);
//2.当开始订阅的时候 接收被观察者发送过来的一系列事件
observable.subscribe(new Action1<Student>() {
@Override
public void call(Student student) {
Log.i(TAG, "call: "+student.toString());
}
});
6.Interval&TimerCreate an Observable that emits a sequence of integers at a fixed time interval. RxJava implements this operator as an interval method. It accepts a parameter representing the time interval and a parameter representing the time unit. //3000毫米发送一个请求 该请求包含了一个自增长的整数型变量 Observable<Long> observable = Observable.interval(3000, TimeUnit.MILLISECONDS, Schedulers.io()); observable.subscribe(new Action1<Long>() { @Override public void call(Long i) { // such as printf RxIoScheduler-2call: 685 Log.i(TAG, Thread.currentThread().getName()+"call: "+i); } });Output:
com.m520it.rxjava I/IT520: RxIoScheduler-2call: 0 com.m520it.rxjava I/IT520: RxIoScheduler-2call: 1 com.m520it.rxjava I/IT520: RxIoScheduler-2call: 2 com.m520it.rxjava I/IT520: RxIoScheduler-2call: 3 ...The above code continuously prints data in the child thread according to a certain time interval. This operation is similar to the Timer task, and you can also think of it as replacing the Timer timer. Similar functions include the timer() function.
Observable<Long> timer = Observable.timer(3000, 2000, TimeUnit.MILLISECONDS); timer.subscribe(new Action1<Long>() { @Override public void call(Long aLong) { Log.i(TAG, "call: "+aLong); } });7.RangeCreate an Observable that emits a specific sequence of integers. The Range operator emits an ordered sequence of integers within a range. You can specify the start and length of the range.
比如下面这个班级的所有学生都在一个集合里,现在要打印出来,可以是这样的:
private ArrayList initPersons() { ArrayList<Student> persons = new ArrayList<>(); persons.add(new Student("张三", 16)); persons.add(new Student("李四", 17)); persons.add(new Student("王二麻子", 18)); return persons; } final ArrayListstudents = initPersons(); //这里发射一个起始值0,长度students.size()的索引 用来遍历队列 Observable .range(0,students.size()) .subscribe(new Action1 () { @Override public void call(Integer index) { Log.i(TAG, "call: "+students.get(index)); } });
8.Repeat
创建一个发射特定数据重复多次的Observable
Repeat重复地发射数据。某些实现允许你重复的发射某个数据序列,还有一些允许你限制重复的次数。
//设置重复发送3次 Observable<String> observable = Observable.just("Hello Android").repeat(3); Action1<String> action1 = new Action1<String>() { @Override public void call(String s) { Log.i(TAG, "call: " + s); } }; observable.subscribe(action1);
以上就是深入浅出RxJava_03[被观察者创建操作]的详细介绍的内容,更多相关内容请关注PHP中文网(www.php.cn)!

This article analyzes the top four JavaScript frameworks (React, Angular, Vue, Svelte) in 2025, comparing their performance, scalability, and future prospects. While all remain dominant due to strong communities and ecosystems, their relative popul

This article addresses the CVE-2022-1471 vulnerability in SnakeYAML, a critical flaw allowing remote code execution. It details how upgrading Spring Boot applications to SnakeYAML 1.33 or later mitigates this risk, emphasizing that dependency updat

Node.js 20 significantly enhances performance via V8 engine improvements, notably faster garbage collection and I/O. New features include better WebAssembly support and refined debugging tools, boosting developer productivity and application speed.

The article discusses implementing multi-level caching in Java using Caffeine and Guava Cache to enhance application performance. It covers setup, integration, and performance benefits, along with configuration and eviction policy management best pra

Java's classloading involves loading, linking, and initializing classes using a hierarchical system with Bootstrap, Extension, and Application classloaders. The parent delegation model ensures core classes are loaded first, affecting custom class loa

This article explores methods for sharing data between Cucumber steps, comparing scenario context, global variables, argument passing, and data structures. It emphasizes best practices for maintainability, including concise context use, descriptive

This article explores integrating functional programming into Java using lambda expressions, Streams API, method references, and Optional. It highlights benefits like improved code readability and maintainability through conciseness and immutability

Iceberg, an open table format for large analytical datasets, improves data lake performance and scalability. It addresses limitations of Parquet/ORC through internal metadata management, enabling efficient schema evolution, time travel, concurrent w


Hot AI Tools

Undresser.AI Undress
AI-powered app for creating realistic nude photos

AI Clothes Remover
Online AI tool for removing clothes from photos.

Undress AI Tool
Undress images for free

Clothoff.io
AI clothes remover

AI Hentai Generator
Generate AI Hentai for free.

Hot Article

Hot Tools

EditPlus Chinese cracked version
Small size, syntax highlighting, does not support code prompt function

ZendStudio 13.5.1 Mac
Powerful PHP integrated development environment

Safe Exam Browser
Safe Exam Browser is a secure browser environment for taking online exams securely. This software turns any computer into a secure workstation. It controls access to any utility and prevents students from using unauthorized resources.

Dreamweaver Mac version
Visual web development tools

VSCode Windows 64-bit Download
A free and powerful IDE editor launched by Microsoft
