Heim  >  Artikel  >  Java  >  Eine detaillierte Einführung in RxJava_01[Was ist RxJava]

Eine detaillierte Einführung in RxJava_01[Was ist RxJava]

黄舟
黄舟Original
2017-03-04 09:38:141346Durchsuche


Dieses Tutorial basiert auf RxJava1. >1 Was ist RxJava?

Rx ist die Abkürzung für Reactive Erweiterungen, übersetzt als Antworterweiterung. Das heißt, über den Kern-Framework-Code, bei dem eine Partei Informationen sendet und die andere Partei auf die Informationen reagiert und diese verarbeitet.

  • Das Framework wurde von einem Team unter der Leitung des Microsoft-Architekten Erik Meijer entwickelt und wurde im November 2012 als Open Source veröffentlicht.

  • Die Rx-Bibliothek unterstützt .NET, JavaScript, C++ usw. und unterstützt mittlerweile fast alle gängigen Programmiersprachen.

  • Die meisten Sprachbibliotheken von Rx werden von der Organisation ReactiveX verwaltet. Die beliebtesten sind RxJava/RxJS/Rx.NET und die Community-Website ist reactivex.io.

  • RxJava ist ein beliebtes Framework, dessen Quellcode auf GitHub basiert. Zusätzlich zur Unterstützung von RxJava gibt es auch ein Support-Framework für Android-Systeme, RxAndroid

  • 2. Vereinfachter RxJava-Code

  • Wenn wir in Android-Projekten Daten aus dem Hintergrund abrufen und die Benutzeroberfläche aktualisieren möchten, sehen wir uns im Allgemeinen ein Beispiel an :

Das Obige Die Lesbarkeit des Codes nach mehreren Verschachtelungsebenen ist zu schlecht! Wenn Sie RxJava verwenden, können Sie es so schreiben:

new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            imageCollectorView.addImage(bitmap);
                        }
                    });
                }
            }
        }
    }
}.start();
Der Vorteil dieser Schreibweise besteht darin, dass sie den Verschachtelungsgrad reduziert und die Lesbarkeit des Codes verbessert. RxJava kann auch spezifische Operationen für jeden Methodenthread bereitstellen.

3. Einführung des Frameworks
Observable.from(folders)
    .flatMap(new Func1<File, Observable<File>>() {
        @Override
        public Observable<File> call(File file) {
            return Observable.from(file.listFiles());
        }
    })
    .filter(new Func1<File, Boolean>() {
        @Override
        public Boolean call(File file) {
            return file.getName().endsWith(".png");
        }
    })
    .map(new Func1<File, Bitmap>() {
        @Override
        public Bitmap call(File file) {
            return getBitmapFromFile(file);
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) {
            imageCollectorView.addImage(bitmap);
        }
    });

Derzeit wurde RxJava auf Version 2.0 aktualisiert, aber um RxJava besser zu verstehen, können wir mit dem Lernen ab Version 1.0 beginnen. Damit unser Android-Projekt RxJava besser nutzen kann, können wir die Gradle-Skriptabhängigkeit in das Projekt einführen:

Jetzt unterstützt unser Projekt bereits die Funktionen von RxJava.

4. Der Kern der Reaktionsfähigkeit
compile &#39;io.reactivex:rxandroid:1.2.1&#39;
compile &#39;io.reactivex:rxjava:1.1.6&#39;

Die sogenannte Reaktionsfähigkeit ist nichts anderes als die Existenz von zwei Teilen, ein Teil ist für das Senden von Ereignissen/Nachrichten verantwortlich und der andere Teil ist für die Beantwortung von Ereignissen/Nachrichten verantwortlich.

Wenn wir früher Nachrichten lesen wollten, mussten wir normalerweise Zeitungen lesen. Wenn Sie beispielsweise an einer Zeitung oder Zeitschrift interessiert sind, müssen Sie zunächst drei Dinge tun:

Geben Sie Ihre Privatadresse an

  1. Finden Sie die entsprechende Zeitung

  2. Gehen Sie zur Zeitung und abonnieren Sie die Zeitung für den ganzen Monat

  3. Nachdem Sie den oben genannten Prozess durchlaufen haben, werden neue Zeitungsmaterialien angezeigt erscheint jeden Tag, die Zeitung schickt Ihnen das Magazin nach Hause.

Codeabstraktion des obigen Beispiels, die Schritte sind wie folgt: Eine detaillierte Einführung in RxJava_01[Was ist RxJava]

Stellen Sie Beobachter bereit (weil Sie sind eine Person, der der Inhalt des Magazins am Herzen liegt, also sind Sie die Person, die das Ereignis beobachtet)

  1. Geben Sie die Person an, die beobachtet wird (solange ein neues Magazin herauskommt, Die Personen, die sich darum kümmern, müssen benachrichtigt werden, damit das Zeitungsbüro beobachtet wird. Objekt)

  2. Abonnement (das heißt, der Beobachter und das Beobachtete müssen miteinander in Beziehung gesetzt werden, damit so bald wenn sich das beobachtete Objekt ändert, wird das Objekt, das das Ereignis beobachtet, sofort benachrichtigt)

Der Democode für das obige Beispiel lautet wie folgt:

Eine detaillierte Einführung in RxJava_01[Was ist RxJava]

Die Ausgabe lautet wie folgt:

//1.创建被观察者
Observable<String> observable = 
        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                //4.开始发送事件 
                //事件有3个类型 分别是onNext() onCompleted() onError()
                //onCompleted() onError() 一般都是用来通知观察者 事件发送完毕了,两者只取其一。
                subscriber.onNext("Hello Android !");
                subscriber.onNext("Hello Java !");
                subscriber.onNext("Hello C !");
                subscriber.onCompleted();
            }
        });

//2.创建观察者
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted ");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError: "+e.getLocalizedMessage());
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: "+s);
    }
};

//3.订阅
observable.subscribe(subscriber);
Wie der Code ausgeführt wird

com.m520it.rxjava I/IT520: onNext: Hello Android !
com.m520it.rxjava I/IT520: onNext: Hello Java !
com.m520it.rxjava I/IT520: onNext: Hello C !
com.m520it.rxjava I/IT520: onCompleted

Im obigen Code, wenn der Beobachter-Abonnent den abonniert Beobachtetes Observable, das System ruft automatisch call() innerhalb des Observable-Objekts zurück.

  • In der call()-Methodenentität von Observable, nach dem Senden von Ereignissen wie onNext/onCompleted/onError.

  • Dann kann der Teilnehmer zur entsprechenden Methode zurückrufen.

  • 5. Observable-Varianten

  • Für das normale Observable-Senden sind drei Methoden erforderlich: onNext, onError, onCompleted, während Single als Variante von Observable nur zwei Methoden erfordert:

onSuccess – Single gibt einen einzelnen Wert an diese Methode aus

  • onError – Wenn der erforderliche Wert nicht ausgegeben werden kann, gibt Single ein Throwable-Objekt an diese Methode aus

  • Single ruft nur eine dieser beiden Methoden auf, und zwar nur einmal. Nach dem Aufruf einer beliebigen Methode wird die Abonnementbeziehung beendet.

6. Beobachtervariante

Beobachterobjekt, oben verwenden wir stattdessen das Abonnentenobjekt. Weil das Objekt selbst Observer erbt.
final Single<String> single = Single.create(new Single.OnSubscribe<String>() {
            @Override
            public void call(SingleSubscriber<? super String> singleSubscriber) {
                //先调用onNext() 最后调用onCompleted() 
                //singleSubscriber.onSuccess("Hello Android !");
                //只调用onError();
                singleSubscriber.onError(new NullPointerException("mock Exception !"));
            }
        });

Observer<String> observer = new Observer<String>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted ");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError: "+e.getLocalizedMessage());
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: "+s);
    }
};
single.subscribe(observer);

Dieses Objekt implementiert onNext()&onCompleted()&onError()-Ereignisse. Wenn wir uns mehr Gedanken über das Ereignis machen, müssen wir nur die entsprechende Methode implementieren:

Wenn Sie sich im obigen Code nur um das onNext()-Ereignis kümmern, aber onCompleted()- und onError()-Ereignisse implementieren müssen, erscheint dieser Code sehr aufgebläht. Angesichts dieser Nachfrage hat das RxJava-Framework spezifische Anpassungen im Abonnement vorgenommen. Der Code lautet wie folgt:

//创建观察者
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted ");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError: "+e.getLocalizedMessage());
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: "+s);
    }
};

//订阅
observable.subscribe(subscriber);
Ich weiß nicht, ob Ihnen aufgefallen ist, dass subscribe() keine Beobachter mehr abonniert. sondern auf bestimmte onNext-Schnittstellenobjekte. Ähnliche Funktionen sind wie folgt. Wir können das entsprechende Abonnement nach Bedarf implementieren:

//为指定的onNext事件创建独立的接口
Action1<String> onNextAction = new Action1<String>() {
    @Override
    public void call(String s) {
        Log.i(TAG, "call: "+s);
    }
};

//订阅
observable.subscribe(onNextAction);

public Subscription subscribe(final Observer Observer)

  • public Subscription subscribe(final Action1 onNext)

  • public Subscription subscribe(final Action1 onNext, Action1 onError)

  • public Subscription subscribe(final Action1 onNext, Action1 onError, Action0 onCompleted)

  • 这里还有一个forEach函数有类似的功能:

    • public void forEach(final Action1 onNext)

    • public void forEach(final Action1 onNext, Action1 onError)

    • public void forEach(final Action1 onNext, Action1 onError, Action0 onComplete)

    7.Subject变种

    上面2节中既介绍了被观察者变种,又介绍了观察者变种,这里再介绍一种雌雄同体的对象(既作为被观察者使用,也可以作为观察者)。

    针对不同的场景一共有四种类型的Subject。他们并不是在所有的实现中全部都存在。

    AsyncSubject

    一个AsyncSubject只在原始Observable完成后,发射来自原始Observable的最后一个值。它会把这最后一个值发射给任何后续的观察者。

    以下贴出代码:

    //创建被观察者
    final AsyncSubject<String> subject = AsyncSubject.create();
    //创建观察者
    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onCompleted() {
            Log.i(TAG, "onCompleted");
        }
    
        @Override
        public void onError(Throwable e) {
            Log.i(TAG, "onError");
        }
    
        @Override
        public void onNext(String s) {
            Log.i(TAG, "s:" + s);
    
        }
    };
    //订阅事件
    subject.subscribe(subscriber);
    //被观察者发出事件 如果调用onCompleted(),onNext()则会打印最后一个事件;如果没有,onNext()则不打印任何事件。
    subject.onNext("Hello Android ");
    subject.onNext("Hello Java ");
    subject.onCompleted();

    输出:

    s:Hello Java 
    onCompleted

    然而,如果原始的Observable因为发生了错误而终止,AsyncSubject将不会发射任何数据,只是简单的向前传递这个错误通知。

    上面的观察者被观察者代码相同,现在发出一系列信号,并在最后发出异常 代码如下:

    subject.onNext("Hello Android ");
    subject.onNext("Hello Java ");
    //因为发送了异常 所以onNext()无法被打印
    subject.onError(null);

    BehaviorSubject

    当观察者订阅BehaviorSubject时,他会将订阅前最后一次发送的事件和订阅后的所有发送事件都打印出来,如果订阅前无发送事件,则会默认接收构造器create(T)里面的对象和订阅后的所有事件,代码如下:

    BehaviorSubject subject=BehaviorSubject.create("NROMAL");
    
    Subscriber subscriber = new Subscriber() {
        @Override
        public void onCompleted() {
            Log.i(TAG, "onCompleted");
        }
    
        @Override
        public void onError(Throwable e) {
            Log.i(TAG, "onError");
        }
    
        @Override
        public void onNext(Object o) {
            Log.i(TAG, "onNext: " + o);
        }
    };
    
    //subject.onNext("Hello Android !");
    //subject.onNext("Hello Java !");
    //subject.onNext("Hello C !");
    //这里开始订阅 如果上面的3个注释没去掉,则Hello C的事件和订阅后面的事件生效
    //如果上面的三个注释去掉 则打印构造器NORMAL事件生效后和订阅后面的事件生效
    subject.subscribe(subscriber);
    
    subject.onNext("Hello CPP !");
    subject.onNext("Hello IOS !");

    PublishSubject

    PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。

    需要注意的是,PublishSubject可能会一创建完成就立刻开始发射数据,因此这里有一个风险:在Subject被创建后到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失。

    代码如下:

    PublishSubject subject= PublishSubject.create();
    
    Action1<String> onNextAction1 = new Action1<String>(){
    
        @Override
        public void call(String s) {
            Log.i(TAG, "onNextAction1 call: "+s);
        }
    };
    
    Action1<String> onNextAction2 = new Action1<String>(){
    
        @Override
        public void call(String s) {
            Log.i(TAG, "onNextAction2 call: "+s);
        }
    };
    
    subject.onNext("Hello Android !");
    subject.subscribe(onNextAction1);
    subject.onNext("Hello Java !");
    subject.subscribe(onNextAction2);
    subject.onNext("Hello IOS !");

    输出如下:

    onNextAction1 call: Hello Java !
    onNextAction1 call: Hello IOS !
    onNextAction2 call: Hello IOS !

    ReplaySubject

    ReplaySubject会发射所有来自原始Observable的数据给观察者,无论它们是何时订阅的。

    代码如下:

    ReplaySubject subject= ReplaySubject.create();
    
    Action1<String> onNextAction1 = new Action1<String>(){
    
        @Override
        public void call(String s) {
            Log.i(TAG, "onNextAction1 call: "+s);
        }
    };
    
    Action1<String> onNextAction2 = new Action1<String>(){
    
        @Override
        public void call(String s) {
            Log.i(TAG, "onNextAction2 call: "+s);
        }
    };
    
    subject.onNext("Hello Android !");
    subject.subscribe(onNextAction1);
    subject.onNext("Hello Java !");
    subject.subscribe(onNextAction2);
    subject.onNext("Hello IOS !");

    输出如下:

    onNextAction1 call: Hello Android !
    onNextAction1 call: Hello Java !
    onNextAction2 call: Hello Android !
    onNextAction2 call: Hello Java !
    onNextAction1 call: Hello IOS !
    onNextAction2 call: Hello IOS !

    Subject总结

    • AsyncSubject无论何时订阅 只会接收最后一次onNext()事件,如果最后出现异常,则不会打印任何onNext()

    • BehaviorSubject会从订阅前最后一次oNext()开始打印直至结束。如果订阅前无调用onNext(),则调用默认creat(T)传入的对象。如果异常后才调用,则不打印onNext()

    • PublishSubject只会打印订阅后的任何事件。

    • ReplaySubject无论订阅在何时都会调用发送的事件。

     以上就是深入浅出RxJava_01[什么是RxJava] 的详细介绍的内容,更多相关内容请关注PHP中文网(www.php.cn)!


    Stellungnahme:
    Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn