Home >Java >javaTutorial >An in-depth explanation of the techniques of RxJava_04 [data transmission filtering operation]
This tutorial is a comprehensive explanation based on the RxJava1.x version. Subsequent courses will be updated one after another, so stay tuned...
When the observer sends data to the observer, the data may need to be further filtered during data transmission. The following tutorial covers functions for most filtering operations.
Distinct - Remove duplicate data sent
Filter - Filter specific data based on conditions
First - Get the first data in the sending queue
Last - Get the last data in the sending queue
ElementAt - According to the index Send specific data in the queue
Take - Get the first n items of data in the queue
TakeLast - Get the last n items of data in the queue
Sample - Send by sending time samples based on the data sent
IgnoreElements - Ignore the sent data queue
There is now a class, and the student information has been entered into the student system. It turned out that the same data was repeated. The student data queue is as follows:
private ArrayList<Student> initPersons() { ArrayList<Student> persons = new ArrayList<>(); persons.add(new Student("张三", 16)); persons.add(new Student("李四", 17)); persons.add(new Student("王二麻子", 18)); persons.add(new Student("张三", 22)); return persons; }
The current need is to intercept objects with duplicate names. And print it out:
Observable .from(initPersons()) //过滤重复名字的项 .distinct(new Func1<Student, String>() { @Override public String call(Student student) { return student.name; } }) .subscribe(new Action1<Student>() { @Override public void call(Student student) { Log.i(TAG, "call: "+student); } });
The output is as follows:
IT520: call: Student{name='张三', age=16} IT520: call: Student{name='李四', age=17} IT520: call: Student{name='王二麻子', age=18}
In addition to filtering out the repeated parts of the passed data, you can also clear specific ones according to the rules through filter The data.
The data sent by the following code is 12345. The filtering rule is that the data bar must be less than 4. The sample code is as follows:
Observable.just(1, 2, 3, 4, 5) .filter(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer item) { return( item < 4 ); } }) .subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
Output
Next: 1 Next: 2 Next: 3 Sequence complete.
Calling this function allows the sent data queue to send only the first item.
Sample code
Observable.just(1, 2, 3) .first() .subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
Output
Next: 1 Sequence complete.
Similar functions include firstOrDefault(). This function is used to find if the queue is null during the queue sending process. If the first item is not reached, a default value will be sent.
Calling this function allows the sent data queue to send only the last item.
Sample code
Observable.just(1, 2, 3) .last() .subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
Output
Next: 3 Sequence complete.
Similar functions include lastOrDefault(T)
ElementAt operator obtains the data item at the specified index position of the data sequence emitted by the original Observable, and then emits it as its own unique data.
If you pass a negative number, or the number of data items in the original Observable is less than index+1, an IndexOutOfBoundsException will be thrown.
The sample code is as follows:
Observable.just(1, 2, 3, 4, 1, 4) .elementAt(3) .subscribe(new Action1<Integer>() { @Override public void call(Integer value) { Log.i(TAG, "call: " + value); } });
Output
com.m520it.rxjava I/IT520: call: 4
Using the Take operator allows you to modify the behavior of the Observable and only return the previous N items of data, and then transmit the completion notification, ignoring the remaining data.
Sample code
Observable.just(1, 2, 3, 4, 5, 6, 7, 8) .take(4) .subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
Output
Next: 1 Next: 2 Next: 3 Next: 4 Sequence complete.
Use the TakeLast operator to modify the original Observable, you can only emit the Observable's emitted last N item data, ignoring the previous data.
Sample code
Observable.just(1, 2, 3, 4, 5, 6, 7, 8) .takeLast(4) .subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
Output
Next: 5 Next: 6 Next: 7 Next: 8 Sequence complete.
Sample and send the sent data at a certain frequency
Observable .interval(1000, TimeUnit.MILLISECONDS)//每秒发送1个数字 .sample(2000,TimeUnit.MILLISECONDS)//每2秒采样一次 .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { Log.i(TAG, "call: "+aLong); } });
Above The code prints: 0, 2, 4, 6, 8. . . .
If you don't care about the data emitted by an Observable, but want to be notified when it completes or terminates with an error, you can use the ignoreElements operator on the Observable, It ensures that the observer's onNext() method is never called.
Observable.just(1, 2, 3, 4, 1) .ignoreElements()//不发送任何信息 直接发送onCompleted() .subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { Log.i(TAG, "onCompleted: "); } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { Log.i(TAG, "onNext: "+integer); } });
The above is an in-depth introduction to the techniques of RxJava_04 [data transmission filtering operation]. For more related content, please pay attention to the PHP Chinese website (www.php.cn)!