>  기사  >  Java  >  RxJava_01에 대한 자세한 소개[RxJava란?]

RxJava_01에 대한 자세한 소개[RxJava란?]

黄舟
黄舟원래의
2017-03-04 09:38:141375검색


이 튜토리얼은 RxJava1을 기반으로 합니다. >1. RxJava란

Rx는 Reactive의 약자입니다. 확장은 응답 확장으로 번역됩니다. 즉, 한쪽이 정보를 보내고 다른 쪽은 이에 응답하고 처리하는 핵심 프레임워크 코드를 통해 이루어집니다.

  • 이 프레임워크는 Microsoft 설계자 Erik Meijer가 이끄는 팀에 의해 개발되었으며 2012년 11월에 오픈 소스로 공개되었습니다.

  • Rx 라이브러리는 .NET, JavaScript, C++ 등을 지원하며 이제 거의 모든 인기 있는 프로그래밍 언어를 지원합니다.

  • 2. RxJava 단순화된 코드
  • 일반적으로 Android 프로젝트에서 백그라운드에서 데이터를 가져와 인터페이스를 새로 고치려는 경우 코드는 대략 다음과 같습니다. :

    new Thread() {
        @Override
        public void run() {
            super.run();
            for (File folder : folders) {
                File[] files = folder.listFiles();
                for (File file : files) {
                    if (file.getName().endsWith(".png")) {
                        final Bitmap bitmap = getBitmapFromFile(file);
                        getActivity().runOnUiThread(new Runnable() {
                            @Override
                            public void run() {
                                imageCollectorView.addImage(bitmap);
                            }
                        });
                    }
                }
            }
        }
    }.start();
  • 위 코드는 여러 단계 중첩 후 가독성이 너무 떨어집니다! RxJava를 사용하는 경우 다음과 같이 작성할 수 있습니다.
  • Observable.from(folders)
        .flatMap(new Func1<File, Observable<File>>() {
            @Override
            public Observable<File> call(File file) {
                return Observable.from(file.listFiles());
            }
        })
        .filter(new Func1<File, Boolean>() {
            @Override
            public Boolean call(File file) {
                return file.getName().endsWith(".png");
            }
        })
        .map(new Func1<File, Bitmap>() {
            @Override
            public Bitmap call(File file) {
                return getBitmapFromFile(file);
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<Bitmap>() {
            @Override
            public void call(Bitmap bitmap) {
                imageCollectorView.addImage(bitmap);
            }
        });

    이런 방식으로 작성하면 중첩 수준이 줄어들고 코드 가독성이 향상된다는 장점이 있습니다. RxJava는 코드를 단순화할 수도 있습니다. 각 메서드에 대해 특정 실행 스레드를 제공합니다.

  • 3. 프레임워크 소개
  • 현재 RxJava는 2.0 버전으로 업그레이드 되었지만, RxJava를 더 잘 이해하기 위해 1.0 버전부터 학습을 시작해보겠습니다. Android 프로젝트에서 RxJava를 더 잘 사용할 수 있도록 프로젝트에 Gradle 스크립트 종속성을 도입할 수 있습니다.

    compile &#39;io.reactivex:rxandroid:1.2.1&#39;
    compile &#39;io.reactivex:rxjava:1.1.6&#39;
  • 이제 우리 프로젝트는 이미 RxJava 기능을 지원합니다.

4. 반응성의 핵심

소위 반응성은 이벤트/메시지 전송을 담당하는 부분과 다른 부분으로 구성됩니다. 이벤트/메시지에 응답하는 역할을 담당합니다.

과거에는 뉴스를 보려면 주로 신문을 읽어야 했습니다. 예를 들어, 신문이나 잡지에 관심이 있다면 먼저

집 주소 제공

3가지 작업을 수행해야 합니다. 해당 신문

신문에 가서 한 달간 신문을 구독하세요

    위의 과정을 거쳐 새로운 신문자료가 매일 발행됩니다. 신문에서 잡지를 집으로 보내드립니다.
  1. 위 예의 코드 추상화, 단계는 다음과 같습니다.

  2. 관찰자를 제공합니다(왜냐하면 당신은 매거진의 내용에 관심을 갖는 사람이므로 이벤트를 관찰하는 사람입니다)

관찰되는 사람을 제공합니다(새로운 잡지가 나오는 한, 관심있는 사람들에게 알려야하므로 신문사를 관찰하고 있습니다 개체)

RxJava_01에 대한 자세한 소개[RxJava란?]

구독(즉, 관찰자와 관찰자가 서로 연관되어 있어야 합니다. 관찰된 객체가 변경됨에 따라 이벤트를 관찰하는 객체에게 즉시 통보됩니다)

  1. 위 예제의 데모 코드는 다음과 같습니다.
  2. //1.创建被观察者
    Observable<String> observable = 
            Observable.create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    //4.开始发送事件 
                    //事件有3个类型 分别是onNext() onCompleted() onError()
                    //onCompleted() onError() 一般都是用来通知观察者 事件发送完毕了,两者只取其一。
                    subscriber.onNext("Hello Android !");
                    subscriber.onNext("Hello Java !");
                    subscriber.onNext("Hello C !");
                    subscriber.onCompleted();
                }
            });
    
    //2.创建观察者
    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onCompleted() {
            Log.i(TAG, "onCompleted ");
        }
    
        @Override
        public void onError(Throwable e) {
            Log.i(TAG, "onError: "+e.getLocalizedMessage());
        }
    
        @Override
        public void onNext(String s) {
            Log.i(TAG, "onNext: "+s);
        }
    };
    
    //3.订阅
    observable.subscribe(subscriber);
  3. 출력은 다음과 같습니다:

    com.m520it.rxjava I/IT520: onNext: Hello Android !
    com.m520it.rxjava I/IT520: onNext: Hello Java !
    com.m520it.rxjava I/IT520: onNext: Hello C !
    com.m520it.rxjava I/IT520: onCompleted

    코드 실행 방법
  4. 위 코드에서 관찰자 구독자가 관찰된 Observable을 구독하면 시스템은 관찰 가능한 객체 내에서 call()을 자동으로 콜백합니다.

RxJava_01에 대한 자세한 소개[RxJava란?]Observable의 call() 메소드 엔터티에서 onNext/onCompleted/onError와 같은 이벤트를 보낸 후.

그러면 구독자는 해당 메서드를 다시 호출할 수 있습니다.

    5. Observable 변형
  • 일반적인 Observable 전송에는 onNext, onError, onCompleted 세 가지 메서드가 필요한 반면, Single은 Observable의 변형으로 두 가지 메서드만 필요합니다.

  • onSuccess - Single이 이 메서드에 단일 값을 내보냅니다.

  • onError - 필요한 값을 내보낼 수 없는 경우 Single은 이 메서드에 Throwable 개체를 내보냅니다.

싱글은 이 두 메서드 중 하나만 호출하며, 어떤 메서드든 호출한 후에는 구독 관계가 종료됩니다.

final Single<String> single = Single.create(new Single.OnSubscribe<String>() {
            @Override
            public void call(SingleSubscriber<? super String> singleSubscriber) {
                //先调用onNext() 最后调用onCompleted() 
                //singleSubscriber.onSuccess("Hello Android !");
                //只调用onError();
                singleSubscriber.onError(new NullPointerException("mock Exception !"));
            }
        });

Observer<String> observer = new Observer<String>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted ");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError: "+e.getLocalizedMessage());
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: "+s);
    }
};
single.subscribe(observer);

6. Observer 변형

    Observer 관찰자 개체, 위에서는 대신 Subscriber 개체를 사용합니다. 객체 자체가 Observer를 상속하기 때문입니다.
  • 이 개체는 onNext()&onCompleted()&onError() 이벤트를 구현합니다. 어떤 이벤트가 더 중요하다면 해당 메서드만 구현하면 됩니다.

    //创建观察者
    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onCompleted() {
            Log.i(TAG, "onCompleted ");
        }
    
        @Override
        public void onError(Throwable e) {
            Log.i(TAG, "onError: "+e.getLocalizedMessage());
        }
    
        @Override
        public void onNext(String s) {
            Log.i(TAG, "onNext: "+s);
        }
    };
    
    //订阅
    observable.subscribe(subscriber);
  • 위 코드에서 onNext() 이벤트에만 관심이 있지만 onCompleted()&onError() 이벤트를 구현해야 한다면 이러한 코드는 매우 부풀려 보일 것입니다. 이러한 요구를 고려하여 RxJava 프레임워크는 구독에서 특정 조정을 수행했습니다. 코드는 다음과 같습니다:
  • //为指定的onNext事件创建独立的接口
    Action1<String> onNextAction = new Action1<String>() {
        @Override
        public void call(String s) {
            Log.i(TAG, "call: "+s);
        }
    };
    
    //订阅
    observable.subscribe(onNextAction);

    subscribe()가 더 이상 관찰자를 구독하지 않는다는 것을 눈치채셨는지 모르겠지만 특정 onNext 인터페이스. 비슷한 기능은 다음과 같습니다. 필요에 따라 해당 구독을 구현할 수 있습니다.

공개 구독 구독(최종 관찰자 관찰자)

  • public Subscription subscribe(final Action1 onNext)

  • public Subscription subscribe(final Action1 onNext, Action1 onError)

  • public Subscription subscribe(final Action1 onNext, Action1 onError, Action0 onCompleted)

  • 这里还有一个forEach函数有类似的功能:

    • public void forEach(final Action1 onNext)

    • public void forEach(final Action1 onNext, Action1 onError)

    • public void forEach(final Action1 onNext, Action1 onError, Action0 onComplete)

    7.Subject变种

    上面2节中既介绍了被观察者变种,又介绍了观察者变种,这里再介绍一种雌雄同体的对象(既作为被观察者使用,也可以作为观察者)。

    针对不同的场景一共有四种类型的Subject。他们并不是在所有的实现中全部都存在。

    AsyncSubject

    一个AsyncSubject只在原始Observable完成后,发射来自原始Observable的最后一个值。它会把这最后一个值发射给任何后续的观察者。

    以下贴出代码:

    //创建被观察者
    final AsyncSubject<String> subject = AsyncSubject.create();
    //创建观察者
    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onCompleted() {
            Log.i(TAG, "onCompleted");
        }
    
        @Override
        public void onError(Throwable e) {
            Log.i(TAG, "onError");
        }
    
        @Override
        public void onNext(String s) {
            Log.i(TAG, "s:" + s);
    
        }
    };
    //订阅事件
    subject.subscribe(subscriber);
    //被观察者发出事件 如果调用onCompleted(),onNext()则会打印最后一个事件;如果没有,onNext()则不打印任何事件。
    subject.onNext("Hello Android ");
    subject.onNext("Hello Java ");
    subject.onCompleted();

    输出:

    s:Hello Java 
    onCompleted

    然而,如果原始的Observable因为发生了错误而终止,AsyncSubject将不会发射任何数据,只是简单的向前传递这个错误通知。

    上面的观察者被观察者代码相同,现在发出一系列信号,并在最后发出异常 代码如下:

    subject.onNext("Hello Android ");
    subject.onNext("Hello Java ");
    //因为发送了异常 所以onNext()无法被打印
    subject.onError(null);

    BehaviorSubject

    当观察者订阅BehaviorSubject时,他会将订阅前最后一次发送的事件和订阅后的所有发送事件都打印出来,如果订阅前无发送事件,则会默认接收构造器create(T)里面的对象和订阅后的所有事件,代码如下:

    BehaviorSubject subject=BehaviorSubject.create("NROMAL");
    
    Subscriber subscriber = new Subscriber() {
        @Override
        public void onCompleted() {
            Log.i(TAG, "onCompleted");
        }
    
        @Override
        public void onError(Throwable e) {
            Log.i(TAG, "onError");
        }
    
        @Override
        public void onNext(Object o) {
            Log.i(TAG, "onNext: " + o);
        }
    };
    
    //subject.onNext("Hello Android !");
    //subject.onNext("Hello Java !");
    //subject.onNext("Hello C !");
    //这里开始订阅 如果上面的3个注释没去掉,则Hello C的事件和订阅后面的事件生效
    //如果上面的三个注释去掉 则打印构造器NORMAL事件生效后和订阅后面的事件生效
    subject.subscribe(subscriber);
    
    subject.onNext("Hello CPP !");
    subject.onNext("Hello IOS !");

    PublishSubject

    PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。

    需要注意的是,PublishSubject可能会一创建完成就立刻开始发射数据,因此这里有一个风险:在Subject被创建后到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失。

    代码如下:

    PublishSubject subject= PublishSubject.create();
    
    Action1<String> onNextAction1 = new Action1<String>(){
    
        @Override
        public void call(String s) {
            Log.i(TAG, "onNextAction1 call: "+s);
        }
    };
    
    Action1<String> onNextAction2 = new Action1<String>(){
    
        @Override
        public void call(String s) {
            Log.i(TAG, "onNextAction2 call: "+s);
        }
    };
    
    subject.onNext("Hello Android !");
    subject.subscribe(onNextAction1);
    subject.onNext("Hello Java !");
    subject.subscribe(onNextAction2);
    subject.onNext("Hello IOS !");

    输出如下:

    onNextAction1 call: Hello Java !
    onNextAction1 call: Hello IOS !
    onNextAction2 call: Hello IOS !

    ReplaySubject

    ReplaySubject会发射所有来自原始Observable的数据给观察者,无论它们是何时订阅的。

    代码如下:

    ReplaySubject subject= ReplaySubject.create();
    
    Action1<String> onNextAction1 = new Action1<String>(){
    
        @Override
        public void call(String s) {
            Log.i(TAG, "onNextAction1 call: "+s);
        }
    };
    
    Action1<String> onNextAction2 = new Action1<String>(){
    
        @Override
        public void call(String s) {
            Log.i(TAG, "onNextAction2 call: "+s);
        }
    };
    
    subject.onNext("Hello Android !");
    subject.subscribe(onNextAction1);
    subject.onNext("Hello Java !");
    subject.subscribe(onNextAction2);
    subject.onNext("Hello IOS !");

    输出如下:

    onNextAction1 call: Hello Android !
    onNextAction1 call: Hello Java !
    onNextAction2 call: Hello Android !
    onNextAction2 call: Hello Java !
    onNextAction1 call: Hello IOS !
    onNextAction2 call: Hello IOS !

    Subject总结

    • AsyncSubject无论何时订阅 只会接收最后一次onNext()事件,如果最后出现异常,则不会打印任何onNext()

    • BehaviorSubject会从订阅前最后一次oNext()开始打印直至结束。如果订阅前无调用onNext(),则调用默认creat(T)传入的对象。如果异常后才调用,则不打印onNext()

    • PublishSubject只会打印订阅后的任何事件。

    • ReplaySubject无论订阅在何时都会调用发送的事件。

     以上就是深入浅出RxJava_01[什么是RxJava] 的详细介绍的内容,更多相关内容请关注PHP中文网(www.php.cn)!


    성명:
    본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.