RxJava : What is RxJava, how RxJava is designed and works in android.

This article is not a guide on how to use RxJava features but a more fundamental exploration, what it is that RxJava is solving in our codebase and how it is doing it?

RxJava has become the single most important skill for Android development. Most of you must have worked with it in some form, either in your own codebase or through other third party libraries, like Fast Android Networking and Retrofit.

There are a huge number of articles and tutorials written on how to use RxJava, then why I am writing this redundant article?

Let’s start with the official definition of RxJava from its GitHub repository:

Definition of RxJava:

RxJava is a Java VM implementation of Reactive Extensions.

So, what is Reactive Extension?

The official doc describes Reactive Extension(ReactiveX) as a library for composing asynchronous and event-based programs by using observable sequences.

The definition presents three different terms: asynchronous, event-based, and observable sequences. It also says composing programs.

  1. Asynchronous: It implies that the different parts of a program run simultaneously.
  2. Event-Based: The program executes the codes based on the events generated while the program is running. For Example, a Button click triggers an event and then the program’s event handler receives this event and does some work.
  3. Observable sequences: It will be better understood by the mechanics of it. So, we will come back to it later in this article.
  4. RxJava free us from the callback hell by providing the composing style of programming. We can plug in various transformations that resembles the Functional programming.

The Reactive Extension is further elaborated: It extends the observer pattern to support sequences of data and/or events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O.

Here, observer pattern is the key to understand the architecture, which is a software design pattern, where Subject maintains its Observers and notifies them automatically of any state change, usually by calling one of their methods.

Now you must think, what’s new about it? We have been doing these for decades. These are just fancy words for a Publisher-Subscriber library. Also to make things look more enhanced, Functional is associated with the RxJava.

The doc says: It is sometimes called “functional reactive programming” but this is a misnomer. ReactiveX may be functional, and it may be reactive, but “functional reactive programming” is a different animal. One main point of difference is that functional reactive programming operates on values that change continuously over time, while ReactiveX operates on discrete values that are emitted over time.

So, RxJava only has some patterns similar to the principles of Functional programming.

Now, you must be thinking why RxJava is so popular then?

You will really appreciate the power of RxJava when you write a thread management program. My deep appreciations for RxJava developed when I later explored it in comparison to the JPost library I wrote. JPost is an extended publisher-subscriber library for Java and Android which I created sometime back.

In the JPost subscribers communicates with each other over controlled channels synchronously/asynchronously. The thread management was done using ExecutorService, carefully designed for preventing memory leaks and enforce Thread safety. If I were to design this system using RxJava now then it would take fewer lines of code and much less effort(but less learning of course).

Below is the architectural diagram of my JPost library.

RxJAVA

Now, that I have expresses my deep appreciation for RxJava, let’s try to see and understand the anatomy of it.

The RxJava is an implementation of a Reactive Stream specification. I found below paragraph as the best definition or rather summation of a Reactive Stream.

Reactive Streams is a programming concept for handling asynchronous data streams in a non-blocking manner while providing backpressure to stream publishers. It has evolved into a specification that is based on the concept of Publisher<T> and Subscriber<T>. A Publisher is the source of events T in the stream, and a Subscriber is a consumer for those events. A Subscriber subscribes to a Publisher by invoking a “factory method” in the Publisher that will push the stream items <T> starting a new Subscription. This is also called Reactor Pattern.

There are a number of terms in the above definition.

  1. Data Stream: It is the flow of data(events/objects/items) in a sequence.
  2. non-blocking: The thread does not have to wait for the monitor lock. In concurrent programming, a monitor is a synchronization construct that allows threads to have both mutual exclusion and the ability to wait (block) for a certain condition to become true.
  3. Backpressure: We will try to understand it later when context become favorable for its discussion.
  4. Mechanism of Publisher and Subscriber: It is described below:

How does this Reactive Stream implement?

RxJava: Observable subscriber

The base implementation is based on 4 API components:

Publisher: These interfaces(APIs) define the structure of RxJava.

Code:

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

Subscriber: A Subscriber exposes the following callback methods.

Code:
public interface Subscriber<T> {
    //signals to the Publisher to start sending events
    public void onSubscribe(Subscription s);     
    
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

3. Subscription: When the Subscriber becomes ready to start handling events, it signals the Publisher through its Subscription

Note:

To avoid the name clash, the RxJava 1 rx.Subscription has been renamed into io.reactivex.Disposable in RxJava 2.

Code:
public interface Subscription {
    public void request(long n); //request n items
    public void cancel();
}

Upon receiving this signal, the Publisher begins to invoke Subscriber::onNext(T) for each event T. This continues until either completion of the stream (Subscriber::onComplete()) or an error occurs during processing (Subscriber::onError(Throwable)).

4. Processor: It represents a processing stage—which is both a Subscriber and a Publisher and obeys the contracts of both.

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}

In the Reactive-Streams specification, the Subject-like behavior, namely being a consumer and supplier of events at the same time, is done by the Processor interface.

Publishers

Let’s see how publishers are structured in RxJava. RxJava provides following types of event publishers:

  • Flowable: A Publisher that emits 0..N elements, and then completes successfully or with an error.
  • Observable: Similar to Flowables but without a backpressure strategy. They were introduced in RxJava 1.x.
  • Single: It completes with a value successfully or an error.(doesn’t have onComplete callback, instead onSuccess(val)).
  • Maybe: It completes with/without a value or completes with an error.
  • Completable: It just signals if it has completed successfully or with an error.

To create a stream in RxJava we can use one of the Publisher’s factory methods.

Flowable<Integer> flowable = Flowable.just(6, 4, 10, 20);
Flowable<String> flowable = Flowable.fromIterable(List.of("red", "green", "blue"));
There are a number of factory methods or we can use Observable.create(…) to implement the emissions of events by calling onNext(val), onComplete(), onError(throwable).

Observable<Integer> stream = Observable.create(subscriber -> {
    System.out.println("Started emitting");

    System.out.println("Emitting 1st");
    subscriber.onNext(1);

    System.out.println("Emitting 2nd");
    subscriber.onNext(2);
    System.out.println("Emitting onComplete");
    subscriber.onComplete();
});

Remember we came across Observable sequences when defining Reactive Extension. Now we can see that the all Publishers like Observable and Flowable take some items and pass onto its subscribers. So, these items are called as Observable sequences or Data Stream.

An important property of the Stream is that it’s lazy, meaning that the code inside create() doesn’t get executed without subscribing to the stream.

Cold and Hot Publishers

Cold publishers: They only begin emitting when there is a subscriber, and all subscribers receive the exact set of historical data, regardless of when they subscribe. The subscriber is taking elements only when it is ready to process that item, and items do not need to be buffered in a publishers because they are requested in a pull fashion. The Flowable and the Observable we created above are cold publishers.

Hot publishers: They begins generating items and emits them immediately when they are created. They emits items at its own pace, and it is up to its subscriber to keep up. When the subscriber is not able to consume items as quickly as they are produced by a publisher, they need to be buffered or handled in some other way, as they will fill up the memory, finally causing OutOfMemoryException. They can be considered as push scenarios.

Subjects are one way to handle hot observables. Subjects keep the reference to their subscribers and allow ‘multicasting’ an event to them.

Note:z

The AsyncSubject, BehaviorSubject, PublishSubject, ReplaySubject and UnicastSubject in 2.x don’t support backpressure (as part of the 2.x Observable family).

The AsyncProcessor, BehaviorProcessor, PublishProcessor, ReplayProcessor and UnicastProcessor are backpressure-aware.

Subscriber

The Subscriber as mentioned in the above text subscribes to a Publisher and then receives events in one of its callback methods.

Canceling subscription

When a Subscriber subscribes a Publisher then in RxJava2, a Disposable instance is returned which can be used to cancelled/disposed a Subscriber externally via Disposable::dispose().

Inside the create() method, we can check if there are still active subscribers to our Flowable/Observable.

There are operators that also unsubscribe from the stream so the source knows to stop producing events. For example, take(limit) is a simple operator. Its role is to count the number of events and then unsubscribes from its source once it received the specified amount and calls onComplete() to its subscriber.

Error handling

In reactive, exceptions are first class citizens. Since streams are generally asynchronous, it doesn’t make sense to throw an exception, so instead any exception is passed as an event in the stream. The Observable contract specifies that exceptions are terminal operations. That means in case an error reaches the Subscriber, after invoking the ‘onError’ handler, it also unsubscribes. There are operators to continue the subscription in case it is required.

Operators

The real power of RxJava is realized in the manipulation of the data streams. There are a lot of operators available in RxJava that simplifies this process. They are applied between the source Observable / Flowable and the Subscriber.

Example:

flatMap: It takes the emissions (objects, collections, or arrays) from an Observable, and maps those elements to individual Observables, then flattens the emissions from all of those into a single Observable.

Observable.fromArray("janishar ali anwar")
        .flatMap(word -> Observable.fromArray(word.split(" ")))
        .zipWith(Observable.range(1, Integer.MAX_VALUE),
                (string, count) -> String.format("%2d. %s", count, string))
        .subscribe(
                val -> System.out.print(val + " "),
                err -> {
                    System.out.println("\nerror ");
                },
                () -> System.out.println("\ncompleted"));
 

Output: 1. janishar 2. ali 3. anwar
completed

Other important operators are:

filter, map, delay, interval, reduce, collect, defer, zipWith, merge, concat, timeout, retry, onErrorReturn, toList.

Schedulers

As I mentioned about my library JPost, where I had to manage the Thread at a low level. RxJava Schedulers are for this purpose. They provide high-level constructs to manage with concurrency and deal with details by itself. They create workers who are responsible for scheduling and running code. By default RxJava will not introduce concurrency and will run the operations on the subscription thread.

There are two methods through which we can introduce Schedulers into our chain of operations:

  • subscribeOn: It specify which Scheduler invokes the code contained in Observable.create().
  • observeOn: It allows control to which Scheduler executes the code in the downstream operators.

RxJava provides some general use Schedulers:

  • Schedulers.computation() : Used for CPU intensive tasks.
  • Schedulers.io(): Used for IO bound tasks.
  • Schedulers.from(Executor): Use with custom ExecutorService.
  • Schedulers.newThread(): It always creates a new thread when a worker is needed. Since it’s not thread pooled and always creates a new thread instead of reusing one, this scheduler is not very useful.
Backpressure

In cases where a publisher is emitting items more rapidly than an operator or subscriber can consume them, then the items that are overflowing from the publisher need to be handled. If for example we try to maintain an ever-expanding buffer of items emitted by the faster publisher to eventually combine with items emitted by the slower one. This could result in OutOfMemoryError.

Backpressure relates to a feedback mechanism through which the subscriber can signal to the producer how much data it can consume and so to produce only that amount.

There are a number of way to handle the backpressure like buffering, batching, skipping etc. and there are operators to deal with them.

Let’s see some scenarios:

The Subscriber has an onSubscribe(Subscription) method, Subscription::request(long n), it is through this it can signal upstream that it’s ready to receive a number of items and after it processes the items request another batch. There is a default implementation which requests Long.MAX_VALUE which basically means “send all you have”. But we have not seen the code in the producer that takes consideration of the number of items requested by the subscriber.

Let’s see the Flowable example:

Flowable.create(subscriber -> {
    int count = 0;
    while (true) {
        count++;
        subscriber.onNext(count + "\n");
    }

}, BackpressureStrategy.DROP)
        .observeOn(Schedulers.newThread(), false, 3)
        .subscribe(
                val -> {
                    Thread.sleep(1000);
                    System.out.println(val);
                },
                err -> {
                    System.out.println("error ");
                    err.printStackTrace();
                },
                () -> System.out.println("completed")
        );
 

The 2nd parameter BackpressureStrategy allows us to specify what to do in the case of overproduction.

  • BackpressureStrategy.BUFFER: It buffers in memory the events that overflow. If we don’t provide some threshold, it might lead to OutOfMemoryError.
  • BackpressureStrategy.DROP: It simply drop the overflowing events
  • BackpressureStrategy.LATEST: It keeps only recent event and discards previous unconsumed events.
  • BackpressureStrategy.ERROR: We get an error in the Subscriber immediately.
  • BackpressureStrategy.MISSING: We use this when we don’t care about backpressure(we let one of the downstream operators onBackpressureXXX handle it)

By default the subscriber requests Long.MAX_VALUE since the code flowable.subscribe(onNext, onError, onComplete) uses a default onSubscribe. So unless we override onSubscribe, it would overflow. We can control this through operator observeOn() that make its own request to the upstream Publisher(256 by default), but can take a parameter to specify the request size.

In the above example, initially, the Subscriber requests Long.MAX_VALUE, but as the subscription travels upstream through the operators to the source Flowable, the operator observeOn subscribes to the source and requests just 3 items from the source instead. Since we used BackpressureStrategy.DROP, all the items emitted outside the 3, get discarded and thus never reach our subscriber.

There are also specialized operators to handle backpressure the onBackpressureXXX operators:

  1. onBackpressureBuffer
  2. onBackpressureDrop
  3. onBackpressureLatest
.onBackpressureDrop(val -> System.out.println("Dropping " + val))
These operators request Long.MAX_VALUE(unbounded amount) from upstream and then take it upon themselves to manage the requests from downstream. In the case of onBackpressureBuffer it adds in an internal queue and sends downstream the events as requested, onBackpressureDrop just discards events that are received from upstream more than requested from downstream, onBackpressureLatest also drops emitted events excluding the last emitted event(most recent). The last onBackpressureXXX operator overrides the previous one if they are chained.

Save

Save

Leave a Reply

Your email address will not be published. Required fields are marked *