ホームページ >バックエンド開発 >PHPチュートリアル >RxJava オペレーター (3) Filtering_PHP チュートリアル
throttleWithTimeout
と debounce の 2 つの演算子に分割されます。まず、throttleWithTimeOut を見てみましょう。以下の図に示すように、このオペレーターは、ソース Observable がデータを送信するたびに、設定された時間より前に新しいデータがある場合にその時間を制限します。終了すると、このデータは破棄され、計時が再開されます。毎回タイマーが期限切れになる前にデータが出力される場合、この現在の制限は極端になり、最後のデータのみが出力されます。 <br><img src="http://www.bkjia.com/uploads/allimg/151205/1151453N3-0.png" style="max-width:90%" style="max-width:90%" alt=""><br><br> まず、Observable を作成し、100 ミリ秒ごとにデータを送信します。送信されるデータが 3 の倍数の場合、次のデータは 300 ミリ秒遅れて送信されます。 <br><br><p></p><pre class="code"><ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>private Observable<Integer> createObserver() {<br /></li><li>return Observable.create(new Observable.OnSubscribe<Integer>() {<br /></li><li>@Override<br /></li><li>public void call(Subscriber<? super Integer> subscriber) {<br /></li><li>for (int i = 0; i < 10; i++) {<br /></li><li>if (!subscriber.isUnsubscribed()) {<br /></li><li>subscriber.onNext(i);<br /></li><li>}<br /></li><li>int sleep = 100;<br /></li><li>if (i % 3 == 0) {<br /></li><li>sleep = 300;<br /></li><li>}<br /></li><li>try {<br /></li><li>Thread.sleep(sleep);<br /></li><li>} catch (InterruptedException e) {<br /></li><li>e.printStackTrace();<br /></li><li>}<br /></li><li><br /></li><li>}<br /></li><li>subscriber.onCompleted();<br /></li><li>}<br /></li><li>}).subscribeOn(Schedulers.computation());<br /></li><li>}</li></ol></pre> 次に、throttleWithTimeOut を使用して、この Observable をフィルターします。設定したフィルター時間は 200 ミリ秒です。これは、発行間隔が 200 ミリ秒未満のデータがフィルターで除外されることを意味します。 <br><br><p></p><pre class="code"><ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>private Observable<Integer> throttleWithTimeoutObserver() {<br /></li><li>return createObserver().throttleWithTimeout(200, TimeUnit.MILLISECONDS)<br /></li><li>.observeOn(AndroidSchedulers.mainThread());<br /></li><li><br /></li><li>}</li></ol></pre> 購読してください <br><br><p></p><pre class="code"><ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>mLButton.setText("throttleWithTimeout");<br /></li><li>mLButton.setOnClickListener(e -> throttleWithTimeoutObserver().subscribe(i -> log("throttleWithTimeout:" + i)));</li></ol></pre> 実行結果は次のとおりです。3 の倍数ではないデータは、発行後 200 ミリ秒以内に新しいデータを発行するため、フィルターで除外されることがわかります。 <br><img src="http://www.bkjia.com/uploads/allimg/151205/1151453122-1.png" style="max-width:90%" style="max-width:90%" alt=""><br> デバウンス オペレーターは時間をフィルターに使用することもできます。この場合、それは throttleWithTimeOut と同じように使用されますが、デバウンス オペレーターは関数に基づいてフローを制限することもできます。この関数の戻り値は一時的な Observable です。ソース Observable が新しいデータを発行し、関数によって生成された一時的な Observable に従って前のデータが終了していない場合、前のデータはフィルターで除外されます。 <br><img src="http://www.bkjia.com/uploads/allimg/151205/115145L93-2.png" style="max-width:90%" style="max-width:90%" alt=""><br> Observable を生成し、デバウンスを使用してフィルタリングします。発行されたデータが偶数の場合にのみ、onCompleted メソッドが呼び出され、この一時的な Observable が終了したことを示します。 <br><br><p></p><pre class="code"><ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>private Observable<Integer> debounceObserver() {<br /></li><li>return Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9).debounce(integer -> {<br /></li><li>log(integer);<br /></li><li>return Observable.create(new Observable.OnSubscribe<Integer>() {<br /></li><li>@Override<br /></li><li>public void call(Subscriber<? super Integer> subscriber) {<br /></li><li>if (integer % 2 == 0 && !subscriber.isUnsubscribed()) {<br /></li><li>log("complete:" + integer);<br /></li><li>subscriber.onNext(integer);<br /></li><li>subscriber.onCompleted();<br /></li><li>}<br /></li><li>}<br /></li><li>});<br /></li><li>})<br /></li><li>.observeOn(AndroidSchedulers.mainThread());<br /></li><li>}</li></ol></pre> 購読してください<br><br><p></p><pre class="code"><ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>mRButton.setOnClickListener(e -> debounceObserver().subscribe(i -> log("debounce:" + i)));</li></ol></pre> 実行結果は次のようになります。onCompleted メソッドを呼び出したデータのみが出力され、その他はフィルターで除外されていることがわかります。 <br><img src="http://www.bkjia.com/uploads/allimg/151205/1151451P1-3.png" style="max-width:90%" style="max-width:90%" alt=""><br><br> 2. Distinct<br> Distinct 演算子の目的は重複を削除することであり、これは非常に理解しやすいです。次の図に示すように、重複するデータはすべて除外されます。また、連続する重複データをフィルターで除外するために使用される演算子 uniqueUntilChanged もあります。 <br><img src="http://www.bkjia.com/uploads/allimg/151205/1151452R6-4.png" style="max-width:90%" alt=""><img src="http://www.bkjia.com/uploads/allimg/151205/1151455E3-5.png" style="max-width:90%" alt=""><br> 2 つの Observable を作成し、Distinct 演算子と DistinctUtilChanged 演算子を使用してそれぞれをフィルター処理します <br><br><p></p><pre class="code"><ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>private Observable<Integer> distinctObserver() {<br /></li><li>return Observable.just(1, 2, 3, 4, 5, 4, 3, 2, 1).distinct();<br /></li><li><br /></li><li>}<br /></li><li><br /></li><li>private Observable<Integer> distinctUntilChangedObserver() {<br /></li><li>return Observable.just(1, 2, 3, 3, 3, 1, 2, 3, 3).distinctUntilChanged();<br /></li><li><br /></li><li>}</li></ol></pre> をサブスクライブします <br><br><p></p><pre class="code"><ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>mLButton.setText("distinct");<br /></li><li>mLButton.setOnClickListener(e -> distinctObserver().subscribe(i -> log("distinct:" + i)));<br /></li><li>mRButton.setText("UntilChanged");<br /></li><li>mRButton.setOnClickListener(e -> distinctUntilChangedObserver().subscribe(i -> log("UntilChanged:" + i)));</li></ol></pre> 実行結果は次のとおりで、Distinct が重複する数値 , 2 をすべて除外していることがわかります。 DistinctUtilChanged は重複する数値のみをフィルターします <br><img src="http://www.bkjia.com/uploads/allimg/151205/11514535Z-6.png" style="max-width:90%" style="max-width:90%" alt=""><br><br> 3. ElementAt と Filter<br> これら 2 つの演算子は理解しやすいですが、ElementAt は指定された位置のデータのみを返し、Filter はフィルター条件を満たすデータのみを返します。フォローします<br><img src="http://www.bkjia.com/uploads/allimg/151205/115145E34-7.png" style="max-width:90%" alt=""><img src="http://www.bkjia.com/uploads/allimg/151205/1151453D1-8.png" style="max-width:90%" alt=""><br> 2 つの Observable オブジェクトを作成し、それぞれ ElementAt 演算子と Filter 演算子を使用してフィルタリングします<br><br><p></p><pre class="code"><ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>private Observable<Integer> elementAtObserver() {<br /></li><li>return Observable.just(0, 1, 2, 3, 4, 5).elementAt(2);<br /></li><li>}<br /></li><li><br /></li><li>private Observable<Integer> FilterObserver() {<br /></li><li>return Observable.just(0, 1, 2, 3, 4, 5).filter(i -> i < 3);<br /></li><li>}</li></ol></pre> それぞれをサブスクライブします<br><br><p></p><pre class="code"><ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>mLButton.setText("elementAt");<br /></li><li>mLButton.setOnClickListener(e -> elementAtObserver().subscribe(i -> log("elementAt:" + i)));<br /></li><li>mRButton.setText("Filter");<br /></li><li>mRButton.setOnClickListener(e -> FilterObserver().subscribe(i -> log("Filter:" + i)));</li></ol></pre> 运行结果如下<br><img src="http://www.bkjia.com/uploads/allimg/151205/1151451453-9.png" style="max-width:90%" style="max-width:90%" alt=""><br><br> 四、First、Last<br> First操作符只会返回第一条数据,并且还可以返回满足条件的第一条数据。如果你看过我以前的博客,就会发现在我们使用Rxjava实现三级缓存的例子里,就是使用first操作符来选择所要使用的缓存。与First相反,Last操作符只返回最后一条满足条件的数据。<br><img src="http://www.bkjia.com/uploads/allimg/151205/115145DO-10.png" style="max-width:90%" alt=""><img src="http://www.bkjia.com/uploads/allimg/151205/115145L02-11.png" style="max-width:90%" alt=""><br> 另外还有一个BlockingObservable方法,这个方法不会对Observable做任何处理,只会阻塞住,当满足条件的数据发射出来的时候才会返回一个BlockingObservable对象。可以使用<code style="box-sizing:border-box;padding:2px 4px;border-radius:4px;white-space:normal;">Observable.toBlocking
或者BlockingObservable.from
方法来将一个Observable对象转化为BlockingObservable对象。BlockingObservable可以和first操作符进行配合使用。<ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>private Observable<Integer> FirstObserver() {<br /></li><li>return Observable.just(0, 1, 2, 3, 4, 5).first(i -> i > 1);<br /></li><li>}<br /></li><li><br /></li><li>private BlockingObservable<Integer> FilterObserver() {<br /></li><li>return Observable.create(new Observable.OnSubscribe<Integer>() {<br /></li><li>@Override<br /></li><li>public void call(Subscriber<? super Integer> subscriber) {<br /></li><li>for (int i = 0; i < 5; i++) {<br /></li><li>try {<br /></li><li>Thread.sleep(500);<br /></li><li>} catch (InterruptedException e) {<br /></li><li>e.printStackTrace();<br /></li><li>}<br /></li><li>if (!subscriber.isUnsubscribed()) {<br /></li><li>log("onNext:" + i);<br /></li><li>subscriber.onNext(i);<br /></li><li>}<br /></li><li>}<br /></li><li>if (!subscriber.isUnsubscribed()) {<br /></li><li>subscriber.onCompleted();<br /></li><li>}<br /></li><li>}<br /></li><li>}).toBlocking();<br /></li><li>}</li></ol>分别进行订阅
<ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>mLButton.setText("First");<br /></li><li>mLButton.setOnClickListener(e -> FirstObserver().subscribe(i -> log("First:" + i)));<br /></li><li>mRButton.setText(" Blocking");<br /></li><li>mRButton.setOnClickListener(e -> {<br /></li><li>log("blocking:" + FilterObserver().first(i -> i > 1));<br /></li><li>});</li></ol>运行结果如下。可以看到first操作符返回了第一个大于1的数2,而BlockingObservable则一直阻塞着,直到第一个大于1的数据发射出来。
<ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>private Observable<Integer> skipObserver() {<br /></li><li>return Observable.just(0, 1, 2, 3, 4, 5).skip(2);<br /></li><li>}<br /></li><li>private Observable<Integer> takeObserver() {<br /></li><li>return Observable.just(0, 1, 2, 3, 4, 5).take(2);<br /></li><li>}</li></ol>分别进行订阅
<ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>mLButton.setText("Skip");<br /></li><li>mLButton.setOnClickListener(e -> skipObserver().subscribe(i -> log("Skip:" + i)));<br /></li><li>mRButton.setText("Take");<br /></li><li>mRButton.setOnClickListener(e -> takeObserver().subscribe(i -> log("Take:" + i)));</li></ol>运行结果如下,可以看到skip过滤掉了前两项,而take则过滤掉了除了前两项的其他所有项。
<ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>private Observable<Integer> sampleObserver() {<br /></li><li>return createObserver().sample(1000, TimeUnit.MILLISECONDS);<br /></li><li>}<br /></li><li><br /></li><li>private Observable<Integer> throttleFirstObserver() {<br /></li><li>return createObserver().throttleFirst(1000, TimeUnit.MILLISECONDS);<br /></li><li>}<br /></li><li><br /></li><li><br /></li><li>private Observable<Integer> createObserver() {<br /></li><li>return Observable.create(new Observable.OnSubscribe<Integer>() {<br /></li><li>@Override<br /></li><li>public void call(Subscriber<? super Integer> subscriber) {<br /></li><li>for (int i = 0; i < 20; i++) {<br /></li><li>try {<br /></li><li>Thread.sleep(200);<br /></li><li>} catch (InterruptedException e) {<br /></li><li>e.printStackTrace();<br /></li><li>}<br /></li><li>subscriber.onNext(i);<br /></li><li>}<br /></li><li>subscriber.onCompleted();<br /></li><li>}<br /></li><li>});<br /></li><li>}</li></ol>分别进行订阅
<ol style="margin:0 1px 0 0px;padding-left:40px;" start="1" class="dp-css"><li>mLButton.setText("sample");<br /></li><li>mLButton.setOnClickListener(e -> sampleObserver().subscribe(i -> log("sample:" + i)));<br /></li><li>mRButton.setText("throttleFirst");<br /></li><li>mRButton.setOnClickListener(e -> throttleFirstObserver().subscribe(i -> log("throttleFirst:" + i)));</li></ol>运行结果如下,可以看到sample操作符会每隔5个数字发射出一个数据来,而throttleFirst则会每隔5个数据发射第一个数据。