Many developers already use RxJava for reactive programming in their applications. This post gives a brief taste of what RxJava is and walks through a real-life example. It’s impossible to cover everything about RxJava in a single post.

Typical Code

// Synchronous call: blocks the calling thread until the users are loaded
public List<User> getUsers() throws Exception {}
// Asynchronous call: the result is delivered to a callback
public void getUsers(UserCallback callback) {}
// Asynchronous execution with the callback on the main thread
public AsyncTask<Void, Void, User> fetchUser() {}

In the code above, the user data comes from the network or a database, something you cannot call synchronously because it would block the main thread. In that case you can resort to a callback interface, but callback interfaces have a couple of problems:

  1. It’s hard to control the lifecycle.
  2. You generally can’t cancel it.
  3. It’s hard to chain multiple callbacks (if you have to do several things, you have to coordinate all of these callbacks yourself).
  4. There’s no real standard way to handle errors.
  5. There is also the question of which thread the callback returns on. Usually you want the result delivered back on the main thread.

AsyncTask is a partial solution, but it is still not great: the problem of chaining remains, and error handling with AsyncTask is kind of ugly. Let’s make it reactive. We want the user data to become a stream of users, so that any time the users are updated, the stream notifies us that there is a new value.

@GET("/users")
Flowable<Response<List<User>>>
getUser(@Query("since") int since, @Query("per_page") int perPage);

When we apply RxJava to this problem, we create an observable that returns users. Invoking this method does not execute anything; it only sets up a stream that starts delivering user objects once you subscribe to it. An observable also has built-in error handling, it can be cancelled, and it can be either synchronous or asynchronous. Error handling is part of the stream: errors propagate through it automatically and arrive in a dedicated onError callback. Whenever the data changes, it is pushed to you; you never have to pull it out.

// RxJava 1.x-style API; in RxJava 2 the lambda-based subscribe(...) overloads return a Disposable
Subscription s = getUser().subscribe(new Subscriber<User>() {
    @Override public void onNext(User user) {
        // this can be called multiple times
    }
    @Override public void onError(Throwable e) {
        // no more events, and automatically unsubscribed
    }
    @Override public void onCompleted() {
        // no more events, and automatically unsubscribed
    }
});
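
The laziness and the separate error channel described above can be illustrated without RxJava at all. The following is a minimal plain-Java sketch, not RxJava's implementation: `ColdSource`, its `subscribe` method, and `ColdSourceDemo` are hypothetical names invented for this illustration. It shows that creating the source does no work, that the work starts on subscription, and that a failure would be delivered to an error callback instead of being thrown at the caller.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Consumer;

// Hypothetical stand-in for a cold stream; RxJava's real types do far more.
final class ColdSource<T> {
    private final Callable<List<T>> producer;

    ColdSource(Callable<List<T>> producer) {
        this.producer = producer; // only stored here, nothing is executed yet
    }

    // The producer runs once per subscription; items are pushed to onNext,
    // and any exception is routed to onError instead of being thrown.
    void subscribe(Consumer<T> onNext, Consumer<Throwable> onError) {
        try {
            for (T item : producer.call()) {
                onNext.accept(item);
            }
        } catch (Exception e) {
            onError.accept(e);
        }
    }
}

final class ColdSourceDemo {
    // Returns a log of what happened, so the order of events is visible.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ColdSource<String> users = new ColdSource<>(() -> {
            log.add("fetching");
            return Arrays.asList("alice", "bob");
        });
        log.add("created");
        users.subscribe(name -> log.add("next:" + name),
                        error -> log.add("error:" + error.getMessage()));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In the log, "created" appears before "fetching", which is the point: the fetch only happens once something subscribes.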

Unsubscribe

s.unsubscribe();

If you are done with a stream, unsubscribe from it. For example, a fragment or an activity is bound to a lifecycle, so you want to cancel the subscription when the component goes away and stop the observable from emitting events to it. If you don’t unsubscribe, you might leak memory.

RecyclerView Pagination using RxJava

A common application feature is to automatically load more items as the user scrolls through the list. This is done by triggering a request for more data once the user crosses a threshold of remaining items before they’ve hit the end.

Add the RxJava 2 and Retrofit dependencies

android {
    ....
    // for lambda expressions
    compileOptions {
        sourceCompatibility JavaVersion.VERSION_1_8
        targetCompatibility JavaVersion.VERSION_1_8
    }
    ...
}
ext {
    retrofitVersion = "2.2.0"
}
dependencies {
    ...
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    compile 'io.reactivex.rxjava2:rxjava:2.0.9'
    // An adapter for adapting RxJava 2.x types
    compile "com.squareup.retrofit2:adapter-rxjava2:${retrofitVersion}"
    compile "com.squareup.retrofit2:retrofit:${retrofitVersion}"
    compile "com.squareup.retrofit2:converter-gson:${retrofitVersion}"
    compile 'com.github.bumptech.glide:glide:3.7.0'
    testCompile 'junit:junit:4.12'
    compile 'com.jakewharton:butterknife:8.5.1'
    annotationProcessor 'com.jakewharton:butterknife-compiler:8.5.1'
}

To access the network, the application must declare the INTERNET permission in the Android manifest file.

 <uses-permission android:name="android.permission.INTERNET" />

We have the following User model class:

public class User implements Parcelable {
    public int id;
    public String login;
    public String avatar_url;
    // Parcelable members (describeContents(), writeToParcel(), CREATOR) omitted for brevity
}

Observing Users

The result of the task should be observed on the main thread, so you declare your Flowable to be observed on the main thread. The Retrofit interface that returns the Flowable looks like this:

public interface GitHubApi {
    //https://api.github.com/users?since=0&per_page=10
    @GET("/users")
    Flowable<Response<List<User>>> getUser
            (@Query("since") int since, @Query("per_page") int perPage);
}

Now, to use this interface at runtime, we need to build a Retrofit object:

public static GitHubApi createGitHubService() {
    Retrofit.Builder builder = new Retrofit.Builder()
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
            .addConverterFactory(GsonConverterFactory.create())
            .baseUrl("https://api.github.com");
    return builder.build().create(GitHubApi.class);
}

CompositeDisposable is a disposable container that can hold multiple other disposables and offers methods to add and remove them. Disposing the container disposes everything it holds.

private CompositeDisposable compositeDisposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
     ...
     compositeDisposable = new CompositeDisposable();
     ...
}
@Override
protected void onDestroy() {
    super.onDestroy();
    //prevent memory leak
    compositeDisposable.dispose();
}
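
To make the container idea concrete, here is a simplified plain-Java sketch of what such a container does. It is not RxJava's `CompositeDisposable`, which is thread-safe and has a richer API; `SimpleCompositeSketch` is a hypothetical, single-threaded stand-in that models each disposable as a `Runnable` cancel action. It mirrors two documented behaviours of the real class: `dispose()` cancels everything held, and adding to an already disposed container cancels the new item immediately.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded stand-in for illustration only.
final class SimpleCompositeSketch {
    private final List<Runnable> cancelActions = new ArrayList<>();
    private boolean disposed = false;

    // Registers a cancel action. If the container is already disposed,
    // the action runs immediately and false is returned.
    boolean add(Runnable cancelAction) {
        if (disposed) {
            cancelAction.run();
            return false;
        }
        cancelActions.add(cancelAction);
        return true;
    }

    // Runs every registered cancel action once; later calls do nothing.
    void dispose() {
        if (disposed) {
            return;
        }
        disposed = true;
        for (Runnable action : cancelActions) {
            action.run();
        }
        cancelActions.clear();
    }

    boolean isDisposed() {
        return disposed;
    }

    public static void main(String[] args) {
        SimpleCompositeSketch composite = new SimpleCompositeSketch();
        composite.add(() -> System.out.println("request 1 cancelled"));
        composite.add(() -> System.out.println("request 2 cancelled"));
        composite.dispose(); // e.g. from onDestroy()
    }
}
```

This is why a single call in onDestroy() is enough to release every subscription that was added during the activity's lifetime.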

PublishProcessor

A PublishProcessor can’t be instantiated with new; it must be created via the create() method. Example usage:

 private PublishProcessor<Integer> pagination = PublishProcessor.create();

Flowable

Flowable differs from Observable in that it supports backpressure.

private Flowable<Response<List<User>>> getUserList(int fromId) {
     return gitHubApi.getUser(fromId, Constants.PAGE_LIMIT)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}

The Flowable creates a user stream that emits data each time it is subscribed to.

Disposable disposable = pagination
        .onBackpressureDrop()
        .doOnNext(fromId -> {
            // A request is now in flight: show the progress indicator
            requestOnWay = true;
            loadUser.setVisibility(View.VISIBLE);
        })
        // Run one page request at a time, in the order the ids were emitted
        .concatMap(new Function<Integer, Publisher<Response<List<User>>>>() {
            @Override
            public Publisher<Response<List<User>>> apply(Integer fromId) throws Exception {
                return getUserList(fromId);
            }
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(gitHubUsers -> {
            if (gitHubUsers.isSuccessful()) {
                mAdapter.setUsers(gitHubUsers.body());
            } else {
                Log.e(TAG, gitHubUsers.code() + " " + gitHubUsers.message());
            }
            requestOnWay = false;
            loadUser.setVisibility(View.INVISIBLE);
        }, throwable -> {
            // Handle errors in subscribe(): doOnError() only observes them, and
            // subscribe() without an error consumer reports them as unhandled
            if (throwable instanceof HttpException) {
                Response<?> response = ((HttpException) throwable).response();
                Log.d(TAG, response.message());
            } else {
                Log.e(TAG, "Failed to load users", throwable);
            }
        });
compositeDisposable.add(disposable);
// Kick off the first page, starting from user id 0
pagination.onNext(0);

concatMap

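concatMap takes each emitted item and turns it into a new stream whose emissions are folded back into the main stream. The inner streams are subscribed to one at a time, so results arrive in the same order as the items that triggered them.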
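
The ordering guarantee is the key property here. The plain-Java sketch below is only a synchronous analogy, not RxJava code: `concatMapSketch` and `fakeFetchPage` are hypothetical helpers made up for this illustration. Each input is mapped to an inner list, and the inner lists are appended to the output one after another in input order, which is roughly what concatMap does with inner streams.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class ConcatMapSketch {
    // Maps each input to an inner sequence and appends the sequences in input order.
    static <T, R> List<R> concatMapSketch(List<T> inputs, Function<T, List<R>> mapper) {
        List<R> out = new ArrayList<>();
        for (T input : inputs) {
            // The next inner sequence starts only after the previous one is fully consumed.
            out.addAll(mapper.apply(input));
        }
        return out;
    }

    // Hypothetical page fetch: returns two fake user names following the given id.
    static List<String> fakeFetchPage(int fromId) {
        return Arrays.asList("user" + (fromId + 1), "user" + (fromId + 2));
    }

    public static void main(String[] args) {
        List<String> users = concatMapSketch(Arrays.asList(0, 2), ConcatMapSketch::fakeFetchPage);
        System.out.println(users);
    }
}
```

For pagination this matters because page two should never be appended to the list before page one.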

RecyclerView Pagination

A common application feature is to automatically load more items as the user scrolls through the list (infinite scroll). This is done by triggering a request for more data once the user crosses a threshold of remaining items before they’ve hit the end. It’s important to fetch data before the user gets to the end of the list. Adding a threshold value therefore helps anticipate the need to append more data.
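
One way to express the threshold check is as a small pure function. This is a sketch under assumptions: `LoadMoreThreshold` and `shouldLoadMore` are hypothetical names, positions are zero-based, and the threshold of 5 in the usage is only an example value.

```java
final class LoadMoreThreshold {
    // True when the last visible position plus the threshold reaches past
    // the number of items currently in the list.
    static boolean shouldLoadMore(int lastVisiblePosition, int visibleThreshold, int totalItemCount) {
        return lastVisiblePosition + visibleThreshold > totalItemCount;
    }

    public static void main(String[] args) {
        // With 30 items loaded and a threshold of 5:
        System.out.println(shouldLoadMore(20, 5, 30)); // still far from the end
        System.out.println(shouldLoadMore(26, 5, 30)); // close enough to request more
    }
}
```

A larger threshold starts the request earlier, which hides network latency at the cost of fetching data the user may never scroll to.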

Implementing with RecyclerView

RecyclerView supports binding an OnScrollListener, whose events are triggered whenever the user scrolls. Using this, we can define a basic UserPagination class that supports most use cases by extending RecyclerView.OnScrollListener.

public abstract class UserPagination extends RecyclerView.OnScrollListener {
    private LinearLayoutManager layoutManager;
    // Minimum number of items to have below the current scroll position before loading more
    private int visibleThreshold = 5;
    // Index of the page that was loaded last
    private int currentPage = 0;
    // Total number of items in the data set after the last load
    private int previousTotalItemCount = 0;
    // True while we are still waiting for the last set of data to load
    private boolean loading = true;
    private int startingPageIndex = 0;

    public UserPagination(RecyclerView.LayoutManager layoutManager) {
        this.layoutManager = (LinearLayoutManager) layoutManager;
    }

    @Override
    public void onScrolled(RecyclerView recyclerView, int dx, int dy) {
        super.onScrolled(recyclerView, dx, dy);
        int lastVisibleItemPosition = layoutManager.findLastVisibleItemPosition();
        int totalItemCount = layoutManager.getItemCount();
        // Fewer items than before means the list was reset: go back to the initial state
        if (totalItemCount < previousTotalItemCount) {
            this.currentPage = this.startingPageIndex;
            this.previousTotalItemCount = totalItemCount;
            if (totalItemCount == 0) {
                this.loading = true;
            }
        }
        // The item count grew while loading, so the pending load has finished
        if (loading && (totalItemCount > previousTotalItemCount)) {
            loading = false;
            previousTotalItemCount = totalItemCount;
        }
        // Not loading and within the threshold of the end: request the next page
        if (!loading && (lastVisibleItemPosition + visibleThreshold) > totalItemCount) {
            currentPage++;
            onLoadMore(currentPage, totalItemCount, recyclerView);
            loading = true;
        }
    }

    // Implemented by the caller to actually fetch the next page of data
    public abstract void onLoadMore(int currentPage, int totalItemCount, View view);
}

This is an abstract class; to use it, you must extend it and define the onLoadMore method to actually retrieve the new data. For example:

recyclerView.addOnScrollListener(new UserPagination(layoutManager) {
    @Override
    public void onLoadMore(int currentPage, int totalItemCount, View view) {
        if (!requestOnWay) {
            pagination.onNext(mAdapter.getLastVisibleItemId());
        }
    }
});
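
To trace how the loading flag and the threshold interact in the scroll listener above, it can help to run the same decisions outside Android. The following framework-free sketch copies the three checks from onScrolled into a hypothetical `PaginationStateSketch` class (the class name and the boolean return value are assumptions for illustration; the real listener calls onLoadMore instead of returning a flag).

```java
final class PaginationStateSketch {
    private final int visibleThreshold;
    private int previousTotalItemCount = 0;
    private int currentPage = 0;
    private boolean loading = true;

    PaginationStateSketch(int visibleThreshold) {
        this.visibleThreshold = visibleThreshold;
    }

    // Returns true when this scroll event should trigger loading the next page.
    boolean onScrolled(int lastVisibleItemPosition, int totalItemCount) {
        // The list shrank, so it was reset: start over
        if (totalItemCount < previousTotalItemCount) {
            currentPage = 0;
            previousTotalItemCount = totalItemCount;
            if (totalItemCount == 0) {
                loading = true;
            }
        }
        // The item count grew while loading, so the pending load has finished
        if (loading && totalItemCount > previousTotalItemCount) {
            loading = false;
            previousTotalItemCount = totalItemCount;
        }
        // Idle and close to the end: ask for the next page
        if (!loading && lastVisibleItemPosition + visibleThreshold > totalItemCount) {
            currentPage++;
            loading = true;
            return true;
        }
        return false;
    }

    int currentPage() {
        return currentPage;
    }

    public static void main(String[] args) {
        PaginationStateSketch state = new PaginationStateSketch(5);
        System.out.println(state.onScrolled(3, 10));  // first page arrived, far from the end
        System.out.println(state.onScrolled(6, 10));  // crosses the threshold
        System.out.println(state.onScrolled(7, 10));  // still waiting for the next page
    }
}
```

The loading flag is what prevents a burst of scroll events from firing the same page request several times while the first one is still in flight.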

Download this project from GitHub.