RxJava in Android

A lot of people might already be using Rxjava.It’s quite hard to figure out when you need RxJava.The context for this blog is the little bit test of What RxJava can look like, How you can get started with it, and also show you real life examples.It’s impossible to tell you everything about RxJava in this one blog.

Why Reactive?

The asynchronous source is going to break imperative programming.I don’t mean that it become impossible to write your program with you know the single asynchronous source or multiple. It’s just that the amount of complexity become so much and compounds. Such that it become unmaintained when you get into this like state soup.That’s exactly what we’re going to look at trying to solve.

Typical Code

In above code, Users needs to come from the network or needs to come from a database.Something that you can’t really synchronously call to because you don’t want to block the main thread.In that case, you can resort to a callback interface, but the callback interface has a couple of problems:

  1. It’s hard to control lifecycle.
  2. You can’t generally cancel it.
  3. It’s hard to change multiple callbacks.(if you have to do multiple things you have to coordinate all these callbacks)
  4. There’s no real standard way to do an error handling.
  5. Then there is also the question of what thread is this callback returning? You really want to return the original on the main thread usually.

AsyncTask is sort of solution to that is still not great because there is still the problem of changing and there’s still the problem of error handling.Error handling with AsyncTask is kind of ugly.

Let’s turn it to reactive. We want the user to become a stream of users. So, anytime users updated it’s just going to notify us that there’s a new value.

When we apply RxJava to this problem, we create an observable that returns us, users. Once you invoke this method, nothing is actually executed. It’s setting up a stream where once you subscribe to it, it will start giving you these user objects once you execute, or subscribe to this code.

An observable also has built-in error handling. You can cancel it, it can be either synchronous or asynchronous.Error handling is part of the stream and it propagates through the stream automatically there’s specialize call.Whenever data changed it gets pushed into you.You never have to pull it out.

The Stream

Subscribing is the way to get the data out of these observables. You just have to call this method, then subscribe to it and give it a subscriber. Every time user is produced, we get an onNext event. If something happens, you get an error with a trouble, so it might be any exception, including exceptions that happened when you were handling stuff within your observable. If it’s completed, it will tell you as well.

Subscription

Subscriptions(Disposables) is  kind of like a handle on an active stream and by storing that in some collection we can then use the lifecycle to automatically disconnect from all the streams at the appropriate moment (we can disconnect from that stream and we don’t end up leaking anything)

Unsubscribe

If you are done with a stream, you want to unsubscribe. For example, if you have a fragment or an activity, you’re dealing with lifecycle, you want to cancel the subscription. You want to cancel the observable emitting events. If you don’t unsubscribe, you might leak memory.

Schedulers

If you want to multithreading into your Observable, you can do so by Scheduler operators. The SubscribeOn operator changes this behavior by specifying a different Scheduler on which the Observable should operate. The ObserveOn operator specifies a different Scheduler that the Observable will use to send notifications to its observers.

The SubscribeOn operator designates which thread the Observable will begin operating on, no matter at what point in the chain of operators that operator is called. ObserveOn, on the other hand, affects the thread that the Observable will use below where that operator appears. For this reason, you may call ObserveOnmultiple times at various points during the chain of Observable operators in order to change on which threads certain of those operators operate.

RecyclerView Pagination using Rxjava

A common application feature is to load automatically more items as the user scrolls through the items. This is done by triggering a request for more data once the user crosses a threshold of remaining items before they’ve hit the end.

Define a dependency to RxJava 2.0 and retrofit

Our application to access the network we need to declare the INTERNET permission in the Android manifest file.

We have the following User model class:

Observing Users

One of the most common operations when dealing with asynchronous tasks on Android is to observe the task’s result or outcome on the main thread. With RxJava instead, you would declare you’re Flowable to be observed on the main thread

Now, to use this interface during runtime, we’ll need to build a Retrofit object

CompositeDisposable is a disposable container that can hold onto multiple other disposables and offers add and removal disposable.

PublishProcessor

The subject does not coordinate backpressure for its subscribers and implements a weaker onSubscribe which calls onLoad requests from the incoming Subscriptions. This makes it possible to subscribe the PublishSubject to multiple sources unlike the standard contract on Subscriber.

Due to the nature Flowables are constructed, the PublishProcessor can’t be instantiated through new but must be created via the create() method. Example usage.

Flowable

Flowable is the producer’s class that implemented Publisher.It is introduced from Rxjava2 and provides almost the same function as Observable which exists from Rxjava1. The difference from Observable is that the function of Backpressure is available.

Also, Flowable is a cold producer. A cold producer creates a user stream that informs data every time it is subscribed. Basically, Rxjava deals with cold producers.

Backpressure

Backpressure is a function that controls the amount of data notification. In Rxjava2, only Flowable has this function. Observable does not have this function.

This function uses when notification speed of data is earlier than the processing of receiving side. If the notification speed is faster than the receiving side processing, the processing on the receiving side can not make it in time, and data waiting to be processed accumulates. Backpressure solves this problem.

From Rxjava2, let’s also realize that the function of Backpressure is no longer available in Observable.

concatMap

Maps each value to an Observable, then flattens all of these inner Observables using concatAll.

concatMap is the way that you take item emission and kind turn them into observables that get folded back into the stream.So, once we get that data we put it into you know the network call.concatMap get back into the stream that’s move back to the main thread.

Projects each source value to an Observable which is merged in the output Observable, in a serialized fashion waiting for each one to complete before merging the next.

Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable. Each new inner Observable is concatenated with the previous inner Observable.

RecyclerView Pagination

A common application feature automatically loads more items as the user scrolls through the items (infinite scroll). This is done by triggering a request for more data once the user crosses a threshold of remaining items before they’ve hit the end.

RecyclerView Pagnation

 

It’s important to fetch data before the user gets to the end of the list. Adding a threshold value. therefore, helps anticipate the need to append more data.

Implementing with RecycleView

Every Adapter  has support for binding to the OnScrollListener events which are triggered whenever user scrolls. Using this, we can define a basic UserPagnation which supports most use cases by creating our own class that extends OnScrollListener.

This is an abstract class, and that in order to use this, you must extend this base class and define the onLoadMoremethod to actually retrieve the new data. For example:

 

Download this project from GitHub.