搜索
首页Javajava教程深入浅出RxJava_01[什么是RxJava] 的详细介绍


本教程基于RxJava1.x版本进行全面讲解,后续课程将陆续更新,敬请关注…

1.什么是RxJava

  • Rx是Reactive Extensions的简写,翻译为响应的扩展。也就是通过由一方发出信息,另一方响应信息并作出处理的核心框架代码。

  • 该框架由微软的架构师Erik Meijer领导的团队开发,并在2012年11月开源。

  • Rx库支持.NET、JavaScript和C++等,现在已经支持几乎全部的流行编程语言了。

  • Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是 reactivex.io。

  • RxJava作为一个流行的框架,其源码依托在GitHub,除了支持RxJava,针对安卓系统也除了一个支持框架RxAndroid

2.RxJava简化代码

一般我们在安卓项目中,如果想从后台获取数据并刷新界面,代码大概如下,下面我们来看一个例子:

new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            imageCollectorView.addImage(bitmap);
                        }
                    });
                }
            }
        }
    }
}.start();

上面的代码经过多层嵌套后 可读性太差了!如果你用了RxJava 可以这样写:

Observable.from(folders)
    .flatMap(new Func1<File, Observable<File>>() {
        @Override
        public Observable<File> call(File file) {
            return Observable.from(file.listFiles());
        }
    })
    .filter(new Func1<File, Boolean>() {
        @Override
        public Boolean call(File file) {
            return file.getName().endsWith(".png");
        }
    })
    .map(new Func1<File, Bitmap>() {
        @Override
        public Bitmap call(File file) {
            return getBitmapFromFile(file);
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) {
            imageCollectorView.addImage(bitmap);
        }
    });

这样写的好处就是减少层次嵌套 提高了代码的可读性,除了简化代码,RxJava还可以为每个方法提供特定的运行线程。

3.引入框架

目前RxJava已经升级为2.0版本,但为了能够更好的理解RxJava,我们可以从1.0版本开始学习。也为了让我们的安卓项目能够更好的使用RxJava,可以在项目中引入gradle脚本依赖:

compile &#39;io.reactivex:rxandroid:1.2.1&#39;
compile &#39;io.reactivex:rxjava:1.1.6&#39;

现在 我们的项目已经支持RxJava的功能了。

4.响应式的核心

所谓的响应式,无非就是存在这样的2个部分,一部分负责发送事件/消息,另一部分负责响应事件/消息。

以前如果我们想看新闻,一般需要通过看报纸。比如,你对某个报刊杂志比较感兴趣,那么你首先要做3件事:

  1. 提供你家的地址

  2. 找到对应的报社

  3. 去报社订阅整个月的报纸

经过了上面的流程,以后每天只要有新的报刊资料出来了,报社都会将杂志发送到你家。

这里写图片描述

将上面的例子进行代码抽象,步骤如下:

  1. 提供观察者(因为你是关心杂志内容的人 所以你是观察该事件的人)

  2. 提供被观察者(只要有新的杂志出来 就需要通知关心的人 所以报社是被观察的对象)

  3. 订阅(也就是 观察者&被观察者之间要相互关联 以便被观察的对象一变化 就会马上通知观察该事件的对象)

这里写图片描述

上面示例的演示代码如下:

//1.创建被观察者
Observable<String> observable = 
        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                //4.开始发送事件 
                //事件有3个类型 分别是onNext() onCompleted() onError()
                //onCompleted() onError() 一般都是用来通知观察者 事件发送完毕了,两者只取其一。
                subscriber.onNext("Hello Android !");
                subscriber.onNext("Hello Java !");
                subscriber.onNext("Hello C !");
                subscriber.onCompleted();
            }
        });

//2.创建观察者
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted ");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError: "+e.getLocalizedMessage());
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: "+s);
    }
};

//3.订阅
observable.subscribe(subscriber);

输出如下:

com.m520it.rxjava I/IT520: onNext: Hello Android !
com.m520it.rxjava I/IT520: onNext: Hello Java !
com.m520it.rxjava I/IT520: onNext: Hello C !
com.m520it.rxjava I/IT520: onCompleted

代码运行的原理

  • 上面的代码中,当观察者subscriber订阅了被观察者observable之后,系统会自动回调observable对象内部的call()。

  • 在observable的call()方法实体中,发送了如onNext/onCompleted/onError事件后。

  • 接着subscriber就能回调到到对应的方法。

5.被观察者变种

普通的Observable发送需要三个方法onNext, onError, onCompleted,而Single作为Observable的变种,只需要两个方法:

  • onSuccess - Single发射单个的值到这个方法

  • onError - 如果无法发射需要的值,Single发射一个Throwable对象到这个方法

Single只会调用这两个方法中的一个,而且只会调用一次,调用了任何一个方法之后,订阅关系终止。

final Single<String> single = Single.create(new Single.OnSubscribe<String>() {
            @Override
            public void call(SingleSubscriber<? super String> singleSubscriber) {
                //先调用onNext() 最后调用onCompleted() 
                //singleSubscriber.onSuccess("Hello Android !");
                //只调用onError();
                singleSubscriber.onError(new NullPointerException("mock Exception !"));
            }
        });

Observer<String> observer = new Observer<String>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted ");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError: "+e.getLocalizedMessage());
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: "+s);
    }
};
single.subscribe(observer);

6.观察者变种

Observer观察者对象,上面我们用Subscriber对象代替。因为该对象本身就是继承了Observer。

该对象实现了onNext()&onCompleted()&onError()事件,我们如果对哪个事件比较关心,只需要实现对应的方法即可,代码如下:

//创建观察者
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted ");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError: "+e.getLocalizedMessage());
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: "+s);
    }
};

//订阅
observable.subscribe(subscriber);

上面的代码中,如果你只关心onNext()事件,但却不得不实现onCompleted()&onError()事件.这样的代码就显得很臃肿。鉴于这种需求,RxJava框架在订阅方面做了特定的调整,代码如下:

//为指定的onNext事件创建独立的接口
Action1<String> onNextAction = new Action1<String>() {
    @Override
    public void call(String s) {
        Log.i(TAG, "call: "+s);
    }
};

//订阅
observable.subscribe(onNextAction);

不知道大家注意到没有,subscribe()订阅的不再是观察者,而是特定的onNext接口对象。类似的函数如下,我们可以根据需要实现对应的订阅:

  • public Subscription subscribe(final Observer observer)

  • public Subscription subscribe(final Action1 onNext)

  • public Subscription subscribe(final Action1 onNext, Action1 onError)

  • public Subscription subscribe(final Action1 onNext, Action1 onError, Action0 onCompleted)

这里还有一个forEach函数有类似的功能:

  • public void forEach(final Action1 onNext)

  • public void forEach(final Action1 onNext, Action1 onError)

  • public void forEach(final Action1 onNext, Action1 onError, Action0 onComplete)

7.Subject变种

上面2节中既介绍了被观察者变种,又介绍了观察者变种,这里再介绍一种雌雄同体的对象(既作为被观察者使用,也可以作为观察者)。

针对不同的场景一共有四种类型的Subject。他们并不是在所有的实现中全部都存在。

AsyncSubject

一个AsyncSubject只在原始Observable完成后,发射来自原始Observable的最后一个值。它会把这最后一个值发射给任何后续的观察者。

以下贴出代码:

//创建被观察者
final AsyncSubject<String> subject = AsyncSubject.create();
//创建观察者
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "s:" + s);

    }
};
//订阅事件
subject.subscribe(subscriber);
//被观察者发出事件 如果调用onCompleted(),onNext()则会打印最后一个事件;如果没有,onNext()则不打印任何事件。
subject.onNext("Hello Android ");
subject.onNext("Hello Java ");
subject.onCompleted();

输出:

s:Hello Java 
onCompleted

然而,如果原始的Observable因为发生了错误而终止,AsyncSubject将不会发射任何数据,只是简单的向前传递这个错误通知。

上面的观察者被观察者代码相同,现在发出一系列信号,并在最后发出异常 代码如下:

subject.onNext("Hello Android ");
subject.onNext("Hello Java ");
//因为发送了异常 所以onNext()无法被打印
subject.onError(null);

BehaviorSubject

当观察者订阅BehaviorSubject时,他会将订阅前最后一次发送的事件和订阅后的所有发送事件都打印出来,如果订阅前无发送事件,则会默认接收构造器create(T)里面的对象和订阅后的所有事件,代码如下:

BehaviorSubject subject=BehaviorSubject.create("NROMAL");

Subscriber subscriber = new Subscriber() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError");
    }

    @Override
    public void onNext(Object o) {
        Log.i(TAG, "onNext: " + o);
    }
};

//subject.onNext("Hello Android !");
//subject.onNext("Hello Java !");
//subject.onNext("Hello C !");
//这里开始订阅 如果上面的3个注释没去掉,则Hello C的事件和订阅后面的事件生效
//如果上面的三个注释去掉 则打印构造器NORMAL事件生效后和订阅后面的事件生效
subject.subscribe(subscriber);

subject.onNext("Hello CPP !");
subject.onNext("Hello IOS !");

PublishSubject

PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。

需要注意的是,PublishSubject可能会一创建完成就立刻开始发射数据,因此这里有一个风险:在Subject被创建后到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失。

代码如下:

PublishSubject subject= PublishSubject.create();

Action1<String> onNextAction1 = new Action1<String>(){

    @Override
    public void call(String s) {
        Log.i(TAG, "onNextAction1 call: "+s);
    }
};

Action1<String> onNextAction2 = new Action1<String>(){

    @Override
    public void call(String s) {
        Log.i(TAG, "onNextAction2 call: "+s);
    }
};

subject.onNext("Hello Android !");
subject.subscribe(onNextAction1);
subject.onNext("Hello Java !");
subject.subscribe(onNextAction2);
subject.onNext("Hello IOS !");

输出如下:

onNextAction1 call: Hello Java !
onNextAction1 call: Hello IOS !
onNextAction2 call: Hello IOS !

ReplaySubject

ReplaySubject会发射所有来自原始Observable的数据给观察者,无论它们是何时订阅的。

代码如下:

ReplaySubject subject= ReplaySubject.create();

Action1<String> onNextAction1 = new Action1<String>(){

    @Override
    public void call(String s) {
        Log.i(TAG, "onNextAction1 call: "+s);
    }
};

Action1<String> onNextAction2 = new Action1<String>(){

    @Override
    public void call(String s) {
        Log.i(TAG, "onNextAction2 call: "+s);
    }
};

subject.onNext("Hello Android !");
subject.subscribe(onNextAction1);
subject.onNext("Hello Java !");
subject.subscribe(onNextAction2);
subject.onNext("Hello IOS !");

输出如下:

onNextAction1 call: Hello Android !
onNextAction1 call: Hello Java !
onNextAction2 call: Hello Android !
onNextAction2 call: Hello Java !
onNextAction1 call: Hello IOS !
onNextAction2 call: Hello IOS !

Subject总结

  • AsyncSubject无论何时订阅 只会接收最后一次onNext()事件,如果最后出现异常,则不会打印任何onNext()

  • BehaviorSubject会从订阅前最后一次oNext()开始打印直至结束。如果订阅前无调用onNext(),则调用默认creat(T)传入的对象。如果异常后才调用,则不打印onNext()

  • PublishSubject只会打印订阅后的任何事件。

  • ReplaySubject无论订阅在何时都会调用发送的事件。

 以上就是深入浅出RxJava_01[什么是RxJava] 的详细介绍的内容,更多相关内容请关注PHP中文网(www.php.cn)!


声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
如何将Maven或Gradle用于高级Java项目管理,构建自动化和依赖性解决方案?如何将Maven或Gradle用于高级Java项目管理,构建自动化和依赖性解决方案?Mar 17, 2025 pm 05:46 PM

本文讨论了使用Maven和Gradle进行Java项目管理,构建自动化和依赖性解决方案,以比较其方法和优化策略。

如何使用适当的版本控制和依赖项管理创建和使用自定义Java库(JAR文件)?如何使用适当的版本控制和依赖项管理创建和使用自定义Java库(JAR文件)?Mar 17, 2025 pm 05:45 PM

本文使用Maven和Gradle之类的工具讨论了具有适当的版本控制和依赖关系管理的自定义Java库(JAR文件)的创建和使用。

如何使用咖啡因或Guava Cache等库在Java应用程序中实现多层缓存?如何使用咖啡因或Guava Cache等库在Java应用程序中实现多层缓存?Mar 17, 2025 pm 05:44 PM

本文讨论了使用咖啡因和Guava缓存在Java中实施多层缓存以提高应用程序性能。它涵盖设置,集成和绩效优势,以及配置和驱逐政策管理最佳PRA

如何将JPA(Java持久性API)用于具有高级功能(例如缓存和懒惰加载)的对象相关映射?如何将JPA(Java持久性API)用于具有高级功能(例如缓存和懒惰加载)的对象相关映射?Mar 17, 2025 pm 05:43 PM

本文讨论了使用JPA进行对象相关映射,并具有高级功能,例如缓存和懒惰加载。它涵盖了设置,实体映射和优化性能的最佳实践,同时突出潜在的陷阱。[159个字符]

Java的类负载机制如何起作用,包括不同的类载荷及其委托模型?Java的类负载机制如何起作用,包括不同的类载荷及其委托模型?Mar 17, 2025 pm 05:35 PM

Java的类上载涉及使用带有引导,扩展程序和应用程序类负载器的分层系统加载,链接和初始化类。父代授权模型确保首先加载核心类别,从而影响自定义类LOA

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热门文章

R.E.P.O.能量晶体解释及其做什么(黄色晶体)
3 周前By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳图形设置
3 周前By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您听不到任何人,如何修复音频
3 周前By尊渡假赌尊渡假赌尊渡假赌
WWE 2K25:如何解锁Myrise中的所有内容
1 个月前By尊渡假赌尊渡假赌尊渡假赌

热工具

SublimeText3 Linux新版

SublimeText3 Linux新版

SublimeText3 Linux最新版

适用于 Eclipse 的 SAP NetWeaver 服务器适配器

适用于 Eclipse 的 SAP NetWeaver 服务器适配器

将Eclipse与SAP NetWeaver应用服务器集成。

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

Dreamweaver Mac版

Dreamweaver Mac版

视觉化网页开发工具

DVWA

DVWA

Damn Vulnerable Web App (DVWA) 是一个PHP/MySQL的Web应用程序,非常容易受到攻击。它的主要目标是成为安全专业人员在合法环境中测试自己的技能和工具的辅助工具,帮助Web开发人员更好地理解保护Web应用程序的过程,并帮助教师/学生在课堂环境中教授/学习Web应用程序安全。DVWA的目标是通过简单直接的界面练习一些最常见的Web漏洞,难度各不相同。请注意,该软件中