ホームページ >Java >&#&チュートリアル >RxJava_03 [オブザーバー作成操作]をわかりやすく詳しく紹介

RxJava_03 [オブザーバー作成操作]をわかりやすく詳しく紹介

黄舟
黄舟オリジナル
2017-03-04 09:44:261822ブラウズ

このチュートリアルは RxJava1 に基づいています。必要に応じて、対応する関数を作成できます。
Create - 最も独創的なObservable作成関数

    Defer - サブスクリプション作成後に作成されるObservable
  1. Empty/Never/Throw - データを送信しない/異常なデータを送信するObservableを作成します
  2. Just - 1〜9の値を発行するObservableを作成します
  3. From - キューを発行するObservableを作成します
  4. Interval&Timer - タイマーのようなObservableを作成します
  5. Range - 以下のObservableを作成します特定の整数の Observable タイプ
  6. Repeat - 繰り返しの数を設定する Observable を作成します
  7. 1.Create
  8. Create オペレーターを使用して、Observable を最初から作成し、このオペレーターにオブザーバーを渡します。パラメーター関数として、Observable のように動作するようにこの関数を作成し、オブザーバーの onNext、onError、および onCompleted メソッドを適切に呼び出します。

  9. 整形式の有限 Observable は、オブザーバーの onCompleted または onError の呼び出しを 1 回だけ試行する必要があり、それ以降はオブザーバーの他のメソッドを呼び出すことはできません。

オブザーバーが存在しないときに Observable がデータの送信や負荷の高い操作を停止できるように、create メソッドに渡される関数でオブザーバーの isUnsubscribed ステータスを確認することをお勧めします。

サンプルコード:

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> observer) {
        try {
            if (!observer.isUnsubscribed()) {
                for (int i = 1; i < 5; i++) {
                    observer.onNext(i);
                }
                observer.onCompleted();
            }
        } catch (Exception e) {
            observer.onError(e);
        }
    }
 } ).subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });

出力:

Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.

2. Defer

Observable はオブザーバーがサブスクライブするまで作成されず、オブザーバーごとに新しい Observable が作成されます。

Defer オペレーターは、オブザーバーがサブスクライブするまで待機し、その後、Observable ファクトリ メソッドを使用して Observable を生成します。これはすべての Observer に対して行われるため、各サブスクライバーは同じ Observable をサブスクライブしていると思っていても、実際には各サブスクライバーは独自の個別のデータ シーケンスを取得しています。

コードは次のとおりです:

Observable<String> defer = Observable.defer(new Func0<Observable<String>>() {
    //当observable被创建的时候顺便调用observable内部的call()方法并在方法中发送消息
    //每subscribe()就会在call()中返回一个新的实例对象
    @Override
    public Observable<String> call() {
        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Hello Android !");
            }
        });
        return observable;
    }
});
defer.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
    }
});

defer.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
    }
});

3.Empty/Never/Throw

Empty

データを発行せず、正常に終了するObservableを作成します

Never

データを発行せず、実行するObservableを作成しますnot terminate

Throw

データを発行せず、エラーで終了する Observable を作成する

これら 3 つの演算子によって生成される Observable の動作は非常に特殊で制限されています。これはテストに便利で、他の Observable と組み合わせて使用​​したり、Observable を必要とする他の演算子のパラメーターとして使用したりすることもあります。

RxJava は、これらの演算子を empty、never、error として実装します。エラー演算子には Throwable パラメータが必要で、これにより Observable が終了します。これらの演算子はデフォルトでは特定のスケジューラで実行されませんが、empty と error には Scheduler というオプションのパラメータがあり、Scheduler パラメータを渡すと、このスケジューラで通知が送信されます。

4.Just

オブザーバーがデータをオブザーバーに送信したい場合、コードは次のとおりです:

Observable
    .create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("hello Android");
        }
    })
    .subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            Log.i(TAG, "call: "+s);
        }
    });

上記のコードがデータを送信したい場合は、まず Observable.OnSubscribe インターフェイスを実装する必要があります。代わりに just 関数を使用してください。サンプルコードは次のとおりです:

Observable
    .just("hello Android")
    .subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            Log.i(TAG, "call: "+s);
        }
    });

出力:

onNext: hello Android
Sequence complete.

単一のデータを、そのデータを出力する Observable に変換するだけです。

同様の関数は次のとおりです:

just(T t1);

    just(T t1,T t2);
  • just(T t1,T t2,T t3);
  • just (T t1,T t2,T t3,T t4);
  • just(T t1,T t2,T t3,T t4,T t5);
  • RxJava がこの操作を実行します演算子は just 関数として実装されており、1 ~ 9 個のパラメータを受け取り、これらのデータをパラメータ リストの順序で出力する Observable を返します。
  • 5.From

  • 1 から 9 までのデータを送信する just 関数と比較して、From 関数はデータのキューを送信し、出力長は無制限です。

学校は興味のあるクラスを開き、学期の初めに数人のクラスメートがクラスに来ました。以下はクラスメート クラスの定義です:

public static class Student {

    public String name;
    public int age;

    public Student(String name, int age) {
        this.name = name;
        this.age = age;
    }

    @Override
    public String toString() {
        return "Student{" +
                "name=&#39;" + name + &#39;\&#39;&#39; +
                ", age=" + age +
                &#39;}&#39;;
    }
}

    ここでコレクションが作成され、各クラスメートの情報が保存されます。
  1. private ArrayList initPersons() {
        ArrayList<Student> persons = new ArrayList<>();
        persons.add(new Student("张三", 16));
        persons.add(new Student("李四", 17));
        persons.add(new Student("王二麻子", 18));
        return persons;
    }

  2. 次の先生のコメントは、全員に声をかけなさいというもので、各生徒のサインイン状況をプリントで説明します。
  3. ArrayList persons = initPersons();
    for (int i = 0; i < persons.size(); i++) {
        //打印每个同学
        Log.i(TAG,persons.get(i).toString());
    }

  4. RxJava は次のように変換します:
  5. //1.定义被观察者,用来发送一个队列的事件
    Observable<Student> observable = Observable.from(persons);
    
    //2.当开始订阅的时候 接收被观察者发送过来的一系列事件
    observable.subscribe(new Action1<Student>() {
        @Override
        public void call(Student student) {
            Log.i(TAG, "call: "+student.toString());
        }
    });

    6.Interval&Timer

  6. 一定の間隔で一連の整数を発行する Observable を作成します。

RxJava は、この演算子を間隔メソッドとして実装します。時間間隔を表すパラメータと時間単位を表すパラメータを受け取ります。

//3000毫米发送一个请求 该请求包含了一个自增长的整数型变量
Observable<Long> observable = Observable.interval(3000,         TimeUnit.MILLISECONDS, Schedulers.io());

observable.subscribe(new Action1<Long>() {
    @Override
    public void call(Long i) {
        // such as printf RxIoScheduler-2call: 685
        Log.i(TAG, Thread.currentThread().getName()+"call: "+i);
    }
});

出力:

com.m520it.rxjava I/IT520: RxIoScheduler-2call: 0
com.m520it.rxjava I/IT520: RxIoScheduler-2call: 1
com.m520it.rxjava I/IT520: RxIoScheduler-2call: 2
com.m520it.rxjava I/IT520: RxIoScheduler-2call: 3
...

上記のコードは、一定の時間間隔に従って子スレッドのデータを継続的に出力します。この操作はタイマー タスクに似ており、タイマー タイマーを置き換えるものと考えることもできます。

同様の関数には timer() 関数があります。

Observable<Long> timer = Observable.timer(3000, 2000, TimeUnit.MILLISECONDS);
    timer.subscribe(new Action1<Long>() {
        @Override
        public void call(Long aLong) {
            Log.i(TAG, "call: "+aLong);
        }
    });

7.Range

特定の整数シーケンスを出力する Observable を作成します。

Range 演算子は、範囲内の整数の順序付けされたシーケンスを出力します。範囲の開始と長さを指定できます。

比如下面这个班级的所有学生都在一个集合里,现在要打印出来,可以是这样的:

private ArrayList initPersons() {
    ArrayList<Student> persons = new ArrayList<>();
    persons.add(new Student("张三", 16));
    persons.add(new Student("李四", 17));
    persons.add(new Student("王二麻子", 18));
    return persons;
}


final ArrayList students = initPersons();
//这里发射一个起始值0,长度students.size()的索引 用来遍历队列
Observable
        .range(0,students.size())
        .subscribe(new Action1() {
            @Override
            public void call(Integer index) {
                Log.i(TAG, "call: "+students.get(index));
            }
        });

8.Repeat

创建一个发射特定数据重复多次的Observable

Repeat重复地发射数据。某些实现允许你重复的发射某个数据序列,还有一些允许你限制重复的次数。

//设置重复发送3次
Observable<String> observable = Observable.just("Hello Android").repeat(3);

Action1<String> action1 = new Action1<String>() {

    @Override
    public void call(String s) {
        Log.i(TAG, "call: " + s);
    }
};
observable.subscribe(action1);

 以上就是深入浅出RxJava_03[被观察者创建操作]的详细介绍的内容,更多相关内容请关注PHP中文网(www.php.cn)!


声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。