search
HomeJavajavaTutorialRxJava basic usage case sharing source code analysis [with code]

This article details the basic usage of RxJava. RxJava is a magical framework with very simple usage, but the internal implementation is a bit complicated and the code logic is a bit convoluted. Online articles about RxJava source code analysis, source code posts are few and incomplete. The complete source code analysis is listed below for reference.

1. Basic usage of RxJava

  Observable.create(new Observable.OnSubscribe<Object>() {
            @Override
            public void call(Subscriber<? super Object> subscriber) {
                
            }
        }).subscribe(new Observer<Object>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Object o) {

            }
        });

2. First look at the .subscribe(new Observer()) method

 public final Subscription subscribe(final Observer<? super T> observer) {
        if (observer instanceof Subscriber) {
            return subscribe((Subscriber<? super T>)observer);
        }
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        return subscribe(new ObserverSubscriber<T>(observer));
    }

This is just the passed in observer parameter A simple encapsulation (ObserverableSubscriber)

Continue to look at the subscribe method

 public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }
 static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
     // validate and proceed
        if (subscriber == null) {
            throw new IllegalArgumentException("subscriber can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
            /*
             * the subscribe function can also be overridden but generally that&#39;s not the appropriate approach
             * so I won&#39;t mention that in the exception
             */
        }

        // new Subscriber so onStart it
        subscriber.onStart();

        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        // The code below is exactly the same an unsafeSubscribe but not used because it would
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // in case the subscriber can&#39;t listen to exceptions anymore
            if (subscriber.isUnsubscribed()) {
                RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
            } else {
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try {
                    subscriber.onError(RxJavaHooks.onObservableError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    RxJavaHooks.onObservableError(r);
                    // TODO why aren&#39;t we throwing the hook&#39;s return value.
                    throw r; // NOPMD
                }
            }
            return Subscriptions.unsubscribed();
        }
    }

You only need to pay attention here

  RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
    public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
        Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
        if (f != null) {
            return f.call(instance, onSubscribe);
        }
        return onSubscribe;
    }
RxJavaHooks.onObservableStart(observable, observable.onSubscribe) 
这个方法返回的是它的第二个参数,也就是Observable它自己的onSubscribe 对象,
所以在subscribe 方法里面调用了  onSubscribe.call(subscriber)方法

The subscriber here is the parameter passed in

protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }
public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }

It can be seen that the onSubscribe object is the parameter passed in by create, then the whole process is very clear

The entire process will only be executed when the subscribe method is called: subscribe ===>Call onSubscribe.call(observer ) method also passes the observer in

Related recommendations:

RxJava Operator (8) Aggregate_PHP Tutorial

Video:JavaScript Basic strengthening video tutorial

The above is the detailed content of RxJava basic usage case sharing source code analysis [with code]. For more information, please follow other related articles on the PHP Chinese website!

Statement
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
How does the JVM manage garbage collection across different platforms?How does the JVM manage garbage collection across different platforms?Apr 28, 2025 am 12:23 AM

JVMmanagesgarbagecollectionacrossplatformseffectivelybyusingagenerationalapproachandadaptingtoOSandhardwaredifferences.ItemploysvariouscollectorslikeSerial,Parallel,CMS,andG1,eachsuitedfordifferentscenarios.Performancecanbetunedwithflagslike-XX:NewRa

Why can Java code run on different operating systems without modification?Why can Java code run on different operating systems without modification?Apr 28, 2025 am 12:14 AM

Java code can run on different operating systems without modification, because Java's "write once, run everywhere" philosophy is implemented by Java virtual machine (JVM). As the intermediary between the compiled Java bytecode and the operating system, the JVM translates the bytecode into specific machine instructions to ensure that the program can run independently on any platform with JVM installed.

Describe the process of compiling and executing a Java program, highlighting platform independence.Describe the process of compiling and executing a Java program, highlighting platform independence.Apr 28, 2025 am 12:08 AM

The compilation and execution of Java programs achieve platform independence through bytecode and JVM. 1) Write Java source code and compile it into bytecode. 2) Use JVM to execute bytecode on any platform to ensure the code runs across platforms.

How does the underlying hardware architecture affect Java's performance?How does the underlying hardware architecture affect Java's performance?Apr 28, 2025 am 12:05 AM

Java performance is closely related to hardware architecture, and understanding this relationship can significantly improve programming capabilities. 1) The JVM converts Java bytecode into machine instructions through JIT compilation, which is affected by the CPU architecture. 2) Memory management and garbage collection are affected by RAM and memory bus speed. 3) Cache and branch prediction optimize Java code execution. 4) Multi-threading and parallel processing improve performance on multi-core systems.

Explain why native libraries can break Java's platform independence.Explain why native libraries can break Java's platform independence.Apr 28, 2025 am 12:02 AM

Using native libraries will destroy Java's platform independence, because these libraries need to be compiled separately for each operating system. 1) The native library interacts with Java through JNI, providing functions that cannot be directly implemented by Java. 2) Using native libraries increases project complexity and requires managing library files for different platforms. 3) Although native libraries can improve performance, they should be used with caution and conducted cross-platform testing.

How does the JVM handle differences in operating system APIs?How does the JVM handle differences in operating system APIs?Apr 27, 2025 am 12:18 AM

JVM handles operating system API differences through JavaNativeInterface (JNI) and Java standard library: 1. JNI allows Java code to call local code and directly interact with the operating system API. 2. The Java standard library provides a unified API, which is internally mapped to different operating system APIs to ensure that the code runs across platforms.

How does the modularity introduced in Java 9 impact platform independence?How does the modularity introduced in Java 9 impact platform independence?Apr 27, 2025 am 12:15 AM

modularitydoesnotdirectlyaffectJava'splatformindependence.Java'splatformindependenceismaintainedbytheJVM,butmodularityinfluencesapplicationstructureandmanagement,indirectlyimpactingplatformindependence.1)Deploymentanddistributionbecomemoreefficientwi

What is bytecode, and how does it relate to Java's platform independence?What is bytecode, and how does it relate to Java's platform independence?Apr 27, 2025 am 12:06 AM

BytecodeinJavaistheintermediaterepresentationthatenablesplatformindependence.1)Javacodeiscompiledintobytecodestoredin.classfiles.2)TheJVMinterpretsorcompilesthisbytecodeintomachinecodeatruntime,allowingthesamebytecodetorunonanydevicewithaJVM,thusfulf

See all articles

Hot AI Tools

Undresser.AI Undress

Undresser.AI Undress

AI-powered app for creating realistic nude photos

AI Clothes Remover

AI Clothes Remover

Online AI tool for removing clothes from photos.

Undress AI Tool

Undress AI Tool

Undress images for free

Clothoff.io

Clothoff.io

AI clothes remover

Video Face Swap

Video Face Swap

Swap faces in any video effortlessly with our completely free AI face swap tool!

Hot Tools

SublimeText3 Chinese version

SublimeText3 Chinese version

Chinese version, very easy to use

Safe Exam Browser

Safe Exam Browser

Safe Exam Browser is a secure browser environment for taking online exams securely. This software turns any computer into a secure workstation. It controls access to any utility and prevents students from using unauthorized resources.

EditPlus Chinese cracked version

EditPlus Chinese cracked version

Small size, syntax highlighting, does not support code prompt function

SublimeText3 Linux new version

SublimeText3 Linux new version

SublimeText3 Linux latest version

SecLists

SecLists

SecLists is the ultimate security tester's companion. It is a collection of various types of lists that are frequently used during security assessments, all in one place. SecLists helps make security testing more efficient and productive by conveniently providing all the lists a security tester might need. List types include usernames, passwords, URLs, fuzzing payloads, sensitive data patterns, web shells, and more. The tester can simply pull this repository onto a new test machine and he will have access to every type of list he needs.