RxJava Explained in Depth_06 [Conditional & Combining Operations in the Emission Process]

黄舟 (Original)
2017-03-04 09:58:12


This tutorial is based on RxJava 1.x. Later installments in the series will follow, so stay tuned.

The following operators test whether the emitted data meets certain conditions.

  1. all - Determines whether every item emitted by the Observable meets a condition. If any item fails the condition, the result is false.

  2. Amb - Given two or more Observables, emits only the items of the Observable that emits first; the others are ignored.

  3. Contains - Determines whether an Observable emits a specific value.

  4. DefaultIfEmpty - Emits the items of the original Observable; if the original Observable emits nothing, emits a default value instead.

  5. SequenceEqual - Determines whether two Observables emit the same sequence of items.

  6. SkipUntil - Subscribes to the original Observable but ignores its emissions until a second Observable emits an item; from then on it emits the items of the original Observable.

  7. SkipWhile - Subscribes to the original Observable but ignores its emissions until a condition you specify becomes false; from then on it emits the items of the original Observable.

  8. TakeUntil - Emits the items of the original Observable until a stop condition is met. The variant covered here takes a predicate function instead of a second Observable; its behavior is similar to TakeWhile.

  9. TakeWhile - Emits the items of the original Observable until a condition you specify becomes false, then stops emitting and completes.

1.all

Determines whether every item emitted by the Observable meets a condition. If any item emitted by the original Observable fails the condition, false is emitted.

Suppose there is a group of students with the following data:

private ArrayList<Student> initStudents() {
    ArrayList<Student> persons = new ArrayList<>();
    persons.add(new Student("张三", 16));
    persons.add(new Student("李四", 21));
    persons.add(new Student("王二麻子", 18));
    return persons;
}

Enrollment requires every student to be under 20 years old. The all operator can check whether the whole group qualifies.

Observable<Student> observable = Observable.from(initStudents());
Observable<Boolean> booleanObservable = 
                observable.all(new Func1<Student, Boolean>() {
                    // Define the condition each item must meet
                    @Override
                    public Boolean call(Student student) {
                        return student.age<20;
                    }
                });
booleanObservable.subscribe(new Action1<Boolean>() {
    @Override
    public void call(Boolean aBoolean) {
        // Prints false because 李四 is 21
        Log.i(TAG, "call: "+aBoolean);
    }
});
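
For comparison, the same check can be sketched on a plain list with Java 8 streams. This is only an analogue of what the all operator computes, not RxJava code; the `Student` class and the placeholder names below are assumptions, since the article does not show the class definition.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java analogue of the all() check; no RxJava involved.
class AllCheckSketch {
    // Hypothetical minimal Student class (not shown in the article).
    static class Student {
        final String name;
        final int age;

        Student(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    // True only if every student is under 20; evaluation stops at the first failure.
    static boolean allUnder20(List<Student> students) {
        return students.stream().allMatch(s -> s.age < 20);
    }

    public static void main(String[] args) {
        List<Student> students = Arrays.asList(
                new Student("student-1", 16),
                new Student("student-2", 21),
                new Student("student-3", 18));
        // false: the second student is 21
        System.out.println(allUnder20(students));
        // true: an empty group contains no student that violates the rule
        System.out.println(allUnder20(Collections.<Student>emptyList()));
    }
}
```

As with `allMatch`, the all operator reports true for a source that completes without emitting anything, because no item has failed the condition.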

2.Amb

Given two or more Observables, Amb emits all the data of whichever Observable is first to emit an item or a notification. It ignores and discards the emissions of all the other Observables.

Observable<String> o1 = Observable.just("a", "b", "c");
Observable<String> o2 = Observable.just("d", "f", "e");

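// Whichever of the two Observables above emits first wins; the slower one is ignored.
// Both sources here emit synchronously, so the first-listed o2 wins and d, f, e are printed.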
Observable<String> observable = Observable.amb(o2,o1);

observable.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.i(TAG, "call: "+s);
    }
});

3.Contains

Pass a specified value to Contains. If the original Observable emits that value, the Observable it returns will emit true, otherwise it will emit false.

Observable<String> o1 = Observable.just("a", "b", "c");
Observable<Boolean> observable = o1.contains("a");

observable.subscribe(new Action1<Boolean>() {
    @Override
    public void call(Boolean aBoolean) {
        //true
        Log.i(TAG, "call: "+aBoolean);
    }
});

4.DefaultIfEmpty

DefaultIfEmpty passes the items of the original Observable through unchanged. If the original Observable terminates normally (via onCompleted) without emitting any data, the Observable returned by DefaultIfEmpty emits the default value you provide.

Observable<String> o1 = Observable
        .create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                // No onNext is sent here; onCompleted() is called directly
                subscriber.onCompleted();
            }
        });
// If o1 completes without sending any onNext, the default value "d" is emitted
Observable<String> o2 = o1.defaultIfEmpty("d");
o2.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        // Prints d
        Log.i(TAG, "call: "+s);
    }
});

For example, if a network request completes without the server returning any data, a default value can be used instead.
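
The fallback idea can be sketched in plain Java on a list, as a rough analogue rather than RxJava code. The helper name `defaultIfEmpty` and the empty "response" list are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java analogue of defaultIfEmpty; no RxJava involved.
class DefaultIfEmptySketch {
    // Returns the items unchanged, or a one-element list holding the fallback if there are none.
    static <T> List<T> defaultIfEmpty(List<T> items, T fallback) {
        return items.isEmpty() ? Collections.singletonList(fallback) : items;
    }

    public static void main(String[] args) {
        // Stands in for a response that completed without any data
        List<String> emptyResponse = Collections.emptyList();
        System.out.println(defaultIfEmpty(emptyResponse, "d"));            // [d]
        System.out.println(defaultIfEmpty(Arrays.asList("a", "b"), "d")); // [a, b]
    }
}
```

Note that the default only applies to a normal, empty completion. If the original Observable terminates with an error, DefaultIfEmpty passes the error through and does not substitute the default value.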

5.SequenceEqual

Determines whether two Observables emit the same data sequence. It compares the emissions of the two Observables: if the two sequences are the same (the same data, in the same order, with the same termination status), it emits true; otherwise it emits false.

Observable<Integer> o1 = Observable.just(1,2,3);
Observable<Integer> o2 = Observable.just(1,2,3);
Observable<Boolean> observable = Observable.sequenceEqual(o1, o2);

observable.subscribe(new Action1<Boolean>() {
    @Override
    public void call(Boolean aBoolean) {
        Log.i(TAG, "call: "+aBoolean);
    }
});

6.1 SkipUntil

SkipUntil subscribes to the original Observable but ignores its emissions until the second Observable emits an item, at which point it starts emitting the items of the original Observable.

Observable<Integer> o1 = Observable.just(1,2,3);
Observable<Integer> o2 = o1.skipUntil(Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
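        // If this second Observable emits nothing, none of 1, 2, 3 is emitted.
        // Once it emits 4, the source items 1, 2, 3 pass through; 4 and 5 themselves are never delivered to the subscriber.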
        subscriber.onNext(4);
        subscriber.onNext(5);
    }
}));

o2.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer value) {
        Log.i(TAG, "call: "+value);
    }
});

Output:

call: 1
call: 2
call: 3

6.2 SkipWhile

SkipWhile subscribes to the original Observable but ignores its emissions until a condition you specify becomes false, at which point it starts emitting the items of the original Observable.

Observable<Integer> o1 = Observable.just(1,2,3,4,5,6);
Observable<Integer> o2 = o1.skipWhile(new Func1<Integer, Boolean>() {
    @Override
    public Boolean call(Integer value) {
        // Skips items until one equals 4, then emits from 4 onward; prints 4, 5, 6
        return value!=4;
    }
});

o2.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer value) {
        Log.i(TAG, "call: "+value);
    }
});

Output:

call: 4
call: 5
call: 6

7.1 TakeUntil

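Pass a predicate function to takeUntil to decide when to stop emitting. Items are emitted until the predicate returns true for one of them; that item is still emitted, and then the Observable completes. The code is as follows: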

Observable<Integer> o1 = Observable.just(1,2,3,4,5,6);
Observable<Integer> o2 = o1.takeUntil(new Func1<Integer, Boolean>() {
    @Override
    public Boolean call(Integer value) {
        // Emit items until one equals 4; the 4 itself is still emitted, then emission stops
        return value==4;
    }
});

o2.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer value) {
        Log.i(TAG, "call: "+value);
    }
});

The above code prints 1 to 4, similar to the following while loop:

ArrayList<Integer> datas=new ArrayList<>();
for (int i = 1; i <= 6 ; i++) {
    datas.add(i);
}
int index=0;
while(datas.get(index)<=4){
    Log.i(TAG, "onCreate: "+datas.get(index));
    index++;
}

7.2 TakeWhile

TakeWhile emits the items of the original Observable until a condition you specify no longer holds. At that moment it stops emitting and completes its own Observable.

Observable<Integer> o1 = Observable.just(1,2,3,4,5,6);
Observable<Integer> o2 = o1.takeWhile(new Func1<Integer, Boolean>() {
    @Override
    public Boolean call(Integer integer) {
        // Emit items until one equals 5, then stop; the 5 itself is not emitted
        return integer!=5;
    }
});

o2.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer value) {
        Log.i(TAG, "call: "+value);
    }
});

The above code prints out 1~4, similar to the following while statement:

ArrayList<Integer> datas=new ArrayList<>();
for (int i = 1; i <= 6 ; i++) {
    datas.add(i);
}
int index=0;
while(datas.get(index)!=5){
    Log.i(TAG, "onCreate: "+datas.get(index));
    index++;
}
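
The while loops above rely on the data containing the stop value. As a slightly more general illustration, the predicate-based takeWhile, takeUntil and skipWhile can be sketched on plain lists as follows. This is a minimal sketch of the semantics only, not the RxJava implementation; the class and method names are chosen here for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java analogues of the predicate-based operators; no RxJava involved.
class TakeSkipSketch {
    // Like takeWhile: keep items while the condition holds; the first failing item is dropped.
    static List<Integer> takeWhile(List<Integer> src, Predicate<Integer> keep) {
        List<Integer> out = new ArrayList<>();
        for (Integer v : src) {
            if (!keep.test(v)) break;
            out.add(v);
        }
        return out;
    }

    // Like takeUntil(predicate): the item that triggers the stop is still kept.
    static List<Integer> takeUntil(List<Integer> src, Predicate<Integer> stop) {
        List<Integer> out = new ArrayList<>();
        for (Integer v : src) {
            out.add(v);
            if (stop.test(v)) break;
        }
        return out;
    }

    // Like skipWhile: drop items while the condition holds, then keep everything after.
    static List<Integer> skipWhile(List<Integer> src, Predicate<Integer> skip) {
        List<Integer> out = new ArrayList<>();
        boolean skipping = true;
        for (Integer v : src) {
            if (skipping && skip.test(v)) continue;
            skipping = false;
            out.add(v);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(takeWhile(data, v -> v != 5)); // [1, 2, 3, 4]
        System.out.println(takeUntil(data, v -> v == 4)); // [1, 2, 3, 4]
        System.out.println(skipWhile(data, v -> v != 4)); // [4, 5, 6]
        System.out.println(takeWhile(data, v -> v != 4)); // [1, 2, 3]
    }
}
```

The second and fourth calls show the difference between the two take operators: with 4 as the boundary, takeUntil still emits the 4, while takeWhile stops just before it.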


The operators below combine multiple Observables.

  1. Merge - Merges the emissions of multiple Observables; the merged items may be interleaved.

  2. StartWith - Inserts the specified items at the beginning of the data sequence.

  3. Zip - Combines the emissions of two Observables pairwise using a function you provide. The number of items emitted equals the number emitted by the Observable that emits the fewest.

8.Merge

Merge the emissions of multiple Observables. Using the Merge operator you can merge the output of multiple Observables as if they were a single Observable.

Merge may cause the data emitted by merged Observables to be interleaved (there is a similar operator Concat that will not interleave the data, it will emit the emissions of multiple Observables one after another in order).

Sample code

// someScheduler is a placeholder for any rx.Scheduler instance, for example Schedulers.io()
Observable<Integer> odds = Observable.just(1, 3, 5).subscribeOn(someScheduler);
Observable<Integer> evens = Observable.just(2, 4, 6);

Observable.merge(odds, evens)
          .subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });

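Output (one possible ordering; because odds is subscribed on a separate scheduler, the order in which the two sources are merged is not guaranteed)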

Next: 1
Next: 3
Next: 5
Next: 2
Next: 4
Next: 6
Sequence complete.

9.StartWith

Inserts the specified items at the beginning of the data sequence.

If you want an Observable to emit a specified data sequence before it emits its own data, use the StartWith operator. (If you want to append a data sequence to the end of the data emitted by an Observable, use the Concat operator.)

Observable<String> o1 = Observable.just("simulated network request");
Observable<String> o2 = o1.startWith("preparation before the network request");
o2.subscribe(new Action1<String>() {
    @Override
    public void call(String value) {
        Log.i(TAG, value);
    }
});

Output:

preparation before the network request
simulated network request

Related overloads:

  • Javadoc: startWith(Iterable)

  • Javadoc: startWith(T) (accepts up to nine arguments)

  • Javadoc: startWith(Observable)

10.Zip

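Suppose Observable o1 emits 1, 2 and Observable o2 emits 3, 4, 5. Zip then emits as many items as the source with the fewest, which is 2 here. Each emitted item is the value returned by the Func2.

  • Javadoc: zip(Observable1, Observable2, Func2)

Sample code: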

Observable<Integer> o1 = Observable.just(1,2);
Observable<Integer> o2 = Observable.just(3,4,5);

Observable<Integer> zipObservable = Observable
        .zip(o1, o2, new Func2<Integer, Integer, Integer>() {
            @Override
            // a comes from o1 and b from o2 (renamed so the parameters do not shadow the Observable o2)
            public Integer call(Integer a, Integer b) {
                return a + b;
            }
        });

zipObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer s) {
        Log.i(TAG, "call: "+s);
    }
});

Output:

call: 4 // 1+3
call: 6 // 2+4
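
The pairing rule can also be sketched on plain lists. This is a minimal analogue of the semantics, assuming both inputs are fully available up front, and is not how RxJava implements zip; the `zip` helper below is written for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java analogue of zip; no RxJava involved.
class ZipSketch {
    // Combines elements pairwise; the result is as long as the shorter input.
    static <A, B, R> List<R> zip(List<A> first, List<B> second, BiFunction<A, B, R> combiner) {
        int n = Math.min(first.size(), second.size());
        List<R> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            out.add(combiner.apply(first.get(i), second.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> o1 = Arrays.asList(1, 2);
        List<Integer> o2 = Arrays.asList(3, 4, 5);
        // The trailing 5 has no partner in o1, so it is dropped
        List<Integer> sums = zip(o1, o2, (Integer a, Integer b) -> a + b);
        System.out.println(sums); // [4, 6]
    }
}
```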

