Many people already use RxJava in their applications for reactive programming. This post gives a brief taste of what RxJava is and walks through a real-life example. It is impossible to cover everything about RxJava in a single post.
Typical Code
    // Simple synchronous code: blocks the calling thread
    public List<User> getUsers() throws Exception {}

    // Asynchronous callback
    public void getUsers(UserCallback callback) {}

    // Asynchronous execution with the callback delivered on the main thread
    public AsyncTask<Void, Void, User> fetchUser() {}
In the code above, the User data comes from the network or a database. That is something you can't call synchronously, because it would block the main thread. In that case you can resort to a callback interface, but callbacks have a couple of problems:
- It's hard to control the lifecycle.
- You generally can't cancel it.
- It's hard to chain multiple callbacks (if you have to do several things, you have to coordinate all of these callbacks yourself).
- There's no standard way to handle errors.
- Then there is the question of which thread the callback returns on. You usually want the result delivered back on the main thread.
AsyncTask is a partial solution, but it is still not great: the chaining problem remains, and error handling with AsyncTask is ugly. Let's make it reactive. We want the user to become a stream of users, so that any time the users are updated, the stream notifies us that there is a new value.
    @GET("/users")
    Flowable<Response<List<User>>> getUser(@Query("since") int since,
                                           @Query("per_page") int perPage);
When we apply RxJava to this problem, we create an observable that emits users. Invoking this method does not execute anything; it only sets up a stream, and the stream starts delivering user objects once you subscribe to it. An observable also has built-in error handling, it can be cancelled, and it can be either synchronous or asynchronous. Error handling is part of the stream: errors propagate through it automatically and arrive in a dedicated callback. Whenever the data changes, it is pushed to you; you never have to pull it.
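The "nothing runs until you subscribe" behaviour can be illustrated without RxJava. The following is a minimal sketch in plain Java; `LazySource` and `LazySourceDemo` are hypothetical names invented for this illustration and are not RxJava types. The work is wrapped in a supplier and only executed inside `subscribe`, and a failure is routed to the error callback instead of being thrown at the caller.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for a cold stream; this is NOT an RxJava class.
final class LazySource<T> {
    private final Supplier<List<T>> work;

    LazySource(Supplier<List<T>> work) {
        this.work = work; // the work is only stored here, not executed
    }

    void subscribe(Consumer<T> onNext, Consumer<Throwable> onError, Runnable onComplete) {
        try {
            for (T item : work.get()) { // the work runs only now
                onNext.accept(item);
            }
        } catch (RuntimeException e) {
            onError.accept(e);          // errors travel through the same subscription
            return;                     // no completion signal after an error
        }
        onComplete.run();
    }
}

final class LazySourceDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        LazySource<String> users = new LazySource<>(() -> {
            log.add("fetch");
            return Arrays.asList("alice", "bob");
        });
        log.add("created"); // "fetch" has not been logged at this point
        users.subscribe(log::add, e -> log.add("error"), () -> log.add("done"));
        return log;
    }

    static List<String> runFailing() {
        List<String> log = new ArrayList<>();
        LazySource<String> users = new LazySource<>(() -> {
            throw new IllegalStateException("network down");
        });
        users.subscribe(log::add, e -> log.add("error: " + e.getMessage()), () -> log.add("done"));
        return log;
    }
}
```

In `run()`, the log shows "created" before "fetch", which is the point: constructing the source does no work. A real RxJava stream adds scheduling, cancellation and operators on top of this idea.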
    // Note: Subscription, Subscriber and onCompleted() are RxJava 1.x names.
    Subscription s = getUser().subscribe(new Subscriber<User>() {
        @Override
        public void onNext(User user) {
            // this can be called multiple times
        }

        @Override
        public void onError(Throwable e) {
            // no more events, and automatically unsubscribed
        }

        @Override
        public void onCompleted() {
            // no more events, and automatically unsubscribed
        }
    });
Unsubscribe
s.unsubscribe();
When you are done with a stream, unsubscribe from it. For example, a fragment or an activity is tied to a lifecycle, so you want to cancel the subscription when the component goes away; this stops the observable from emitting events to you. If you don't unsubscribe, you might leak memory. In RxJava 2 the equivalent handle is a Disposable, which is cancelled with dispose().
RecyclerView Pagination using RxJava
A common application feature is to automatically load more items as the user scrolls through a list. This is done by triggering a request for more data once the user crosses a threshold of remaining items, before they hit the end.
Define the dependencies on RxJava 2.0 and Retrofit
    android {
        ....
        // for lambda expressions
        compileOptions {
            sourceCompatibility JavaVersion.VERSION_1_8
            targetCompatibility JavaVersion.VERSION_1_8
        }
        ...
    }

    ext {
        retrofitVersion = "2.2.0"
    }

    dependencies {
        ...
        compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
        compile 'io.reactivex.rxjava2:rxjava:2.0.9'
        // An adapter for adapting RxJava 2.x types.
        compile 'com.squareup.retrofit2:adapter-rxjava2:2.2.0'
        compile "com.squareup.retrofit2:retrofit:${retrofitVersion}"
        compile "com.squareup.retrofit2:converter-gson:${retrofitVersion}"
        compile 'com.github.bumptech.glide:glide:3.7.0'
        testCompile 'junit:junit:4.12'
        compile 'com.jakewharton:butterknife:8.5.1'
        annotationProcessor 'com.jakewharton:butterknife-compiler:8.5.1'
    }
For our application to access the network, we need to declare the INTERNET permission in the Android manifest file.
<uses-permission android:name="android.permission.INTERNET" />
We have the following User model class:
    public class User implements Parcelable {
        public int id;
        public String login;
        public String avatar_url;

        // Parcelable members (describeContents, writeToParcel, CREATOR) omitted for brevity
    }
Observing Users
Observe the task's result or outcome on the main thread. To do that, declare that your Flowable is to be observed on the main thread.
    public interface GitHubApi {
        // https://api.github.com/users?since=0&per_page=10
        @GET("/users")
        Flowable<Response<List<User>>> getUser(@Query("since") int since,
                                               @Query("per_page") int perPage);
    }
Now, to use this interface at runtime, we need to build a Retrofit object:
    public static GitHubApi createGitHubService() {
        Retrofit.Builder builder = new Retrofit.Builder()
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .addConverterFactory(GsonConverterFactory.create())
                .baseUrl("https://api.github.com");
        return builder.build().create(GitHubApi.class);
    }
CompositeDisposable
CompositeDisposable is a disposable container that can hold multiple other disposables and supports adding and removing them.
    private CompositeDisposable compositeDisposable;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        ...
        compositeDisposable = new CompositeDisposable();
        ...
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        // prevent memory leak
        compositeDisposable.dispose();
    }
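To make the container idea concrete, here is a minimal, library-free sketch. `CancelBag` is a hypothetical name invented for this illustration; it is not the RxJava class and is not thread-safe. It mirrors two behaviours of CompositeDisposable: disposing the container cancels everything it holds, and anything added after disposal is cancelled immediately.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded imitation of a disposable container.
final class CancelBag {
    private final List<Runnable> cancelActions = new ArrayList<>();
    private boolean disposed;

    /** Returns false (and cancels right away) if the bag was already disposed. */
    boolean add(Runnable cancelAction) {
        if (disposed) {
            cancelAction.run();
            return false;
        }
        cancelActions.add(cancelAction);
        return true;
    }

    /** Cancels everything held so far; calling it again is a no-op. */
    void dispose() {
        if (disposed) {
            return;
        }
        disposed = true;
        for (Runnable cancel : cancelActions) {
            cancel.run();
        }
        cancelActions.clear();
    }

    boolean isDisposed() {
        return disposed;
    }
}

final class CancelBagDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        CancelBag bag = new CancelBag();
        bag.add(() -> log.add("cancel A"));
        bag.add(() -> log.add("cancel B"));
        bag.dispose();                       // e.g. from onDestroy()
        bag.add(() -> log.add("cancel C"));  // late add is cancelled immediately
        return log;
    }
}
```

This is also why the real container is created in onCreate() and disposed in onDestroy(): once disposed it cannot be reused. CompositeDisposable additionally offers clear(), which disposes the current contents but keeps the container usable.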
PublishProcessor
PublishProcessor can't be instantiated through new but must be created via the create() method. Example usage:
private PublishProcessor<Integer> pagination = PublishProcessor.create();
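A PublishProcessor is "hot": it forwards an item only to the subscribers present at the moment the item is emitted and does not replay earlier items. The following plain-Java sketch shows why the order of subscribing and emitting matters; `HotSubject` is a hypothetical class written for this illustration, not part of RxJava.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical minimal "hot" source: no replay, no backpressure, not thread-safe.
final class HotSubject<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    void onNext(T item) {
        // Only the subscribers registered right now receive the item.
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(item);
        }
    }
}

final class HotSubjectDemo {
    static List<Integer> run() {
        List<Integer> seen = new ArrayList<>();
        HotSubject<Integer> pagination = new HotSubject<>();
        pagination.onNext(0);            // nobody is listening yet, so this value is lost
        pagination.subscribe(seen::add);
        pagination.onNext(30);
        pagination.onNext(60);
        return seen;
    }
}
```

For the pagination use case this means the first page request has to be emitted after the processor has been subscribed to, otherwise it is never seen.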
Flowable
The difference from Observable is that Flowable supports backpressure.
    private Flowable<Response<List<User>>> getUserList(int fromId) {
        return gitHubApi.getUser(fromId, Constants.PAGE_LIMIT)
                .subscribeOn(Schedulers.io())               // run the request on an I/O thread
                .observeOn(AndroidSchedulers.mainThread()); // deliver the result on the main thread
    }
The Flowable represents a stream of users that delivers data each time it is subscribed to.
    Disposable disposable = pagination
            .onBackpressureDrop()
            .doOnNext(fromId -> {
                requestOnWay = true;
                loadUser.setVisibility(View.VISIBLE);
            })
            // one request at a time, in emission order
            .concatMap(this::getUserList)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(gitHubUsers -> {
                if (gitHubUsers.isSuccessful()) {
                    mAdapter.setUsers(gitHubUsers.body());
                } else {
                    Log.e(TAG, gitHubUsers.code() + " " + gitHubUsers.message());
                }
                requestOnWay = false;
                loadUser.setVisibility(View.INVISIBLE);
            }, throwable -> {
                // An error terminates the stream. Handle it here: calling subscribe()
                // without an error consumer would route it to RxJavaPlugins.onError.
                if (throwable instanceof HttpException) {
                    Response<?> response = ((HttpException) throwable).response();
                    Log.d(TAG, response.message());
                }
                requestOnWay = false;
                loadUser.setVisibility(View.INVISIBLE);
            });

    compositeDisposable.add(disposable);
    pagination.onNext(0);
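The idea behind onBackpressureDrop() is that an item arriving while the downstream has no outstanding demand is discarded instead of being queued. The sketch below models only that rule in plain Java; `DropGate` is a hypothetical class invented for this illustration, and RxJava's real operator (with its request accounting and threading) is considerably more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical demand-tracking gate; single-threaded for clarity.
final class DropGate<T> {
    private final Consumer<T> downstream;
    private long demand;   // how many items the downstream is ready to accept
    private long dropped;  // how many items were discarded

    DropGate(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    /** The downstream signals that it can take n more items. */
    void request(long n) {
        demand += n;
    }

    /** Delivers the item if there is demand, otherwise drops it. */
    boolean offer(T item) {
        if (demand == 0) {
            dropped++;
            return false;
        }
        demand--;
        downstream.accept(item);
        return true;
    }

    long droppedCount() {
        return dropped;
    }
}

final class DropGateDemo {
    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();
        DropGate<Integer> gate = new DropGate<>(received::add);
        gate.request(1);
        gate.offer(0);   // delivered
        gate.offer(30);  // dropped: the downstream has not asked for more yet
        gate.request(1);
        gate.offer(60);  // delivered
        return received;
    }
}
```

Dropping is a reasonable policy for scroll-triggered page requests, because a request that arrives while the consumer is busy is likely to be triggered again by the next scroll event.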
concatMap
With concatMap, each emitted item is mapped to a new stream, and those inner streams are folded back into the main stream one at a time, in the order of the original emissions.
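The ordering guarantee of concatMap can be sketched with the JDK's CompletableFuture: each "request" starts only after the previous one has completed, so results never interleave even when an earlier request is slower. The class `SequentialPages`, its `concatMap` helper and the fake `fetchPage` function are hypothetical and exist only for this illustration; this is not how RxJava implements the operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class SequentialPages {

    // Maps every item to an asynchronous page and concatenates the pages in item order.
    static <T, R> CompletableFuture<List<R>> concatMap(
            List<T> items, Function<T, CompletableFuture<List<R>>> mapper) {
        CompletableFuture<List<R>> chain = CompletableFuture.completedFuture(new ArrayList<>());
        for (T item : items) {
            // thenCompose runs mapper.apply(item) only after the previous step has finished.
            chain = chain.thenCompose(acc ->
                    mapper.apply(item).thenApply(page -> {
                        acc.addAll(page);
                        return acc;
                    }));
        }
        return chain;
    }

    static List<String> demo() {
        // Fake request: the first page (since = 0) answers more slowly than the second.
        Function<Integer, CompletableFuture<List<String>>> fetchPage = since ->
                CompletableFuture.supplyAsync(() -> {
                    sleep(since == 0 ? 50 : 0);
                    return Arrays.asList("user" + (since + 1), "user" + (since + 2));
                });
        return concatMap(Arrays.asList(0, 2), fetchPage).join();
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

An operator that subscribes to all inner streams at once (flatMap in RxJava) would not give this guarantee, which is why concatMap suits page-by-page loading.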
RecyclerView Pagination
A common application feature is to automatically load more items as the user scrolls through a list (infinite scroll). This is done by triggering a request for more data once the user crosses a threshold of remaining items, before they hit the end. It's important to fetch data before the user gets to the end of the list. Adding a threshold value therefore helps anticipate the need to append more data.
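The threshold rule boils down to a single comparison. The following sketch isolates it as a pure function so it can be reasoned about without a RecyclerView; the class and parameter names are illustrative assumptions, and the position is taken to be the zero-based index of the last visible item.

```java
final class LoadMorePolicy {

    /**
     * @param lastVisiblePosition zero-based index of the last item currently on screen
     * @param visibleThreshold    how many items before the end loading should start
     * @param totalItemCount      number of items currently in the list
     * @return true when the user is close enough to the end to request more data
     */
    static boolean shouldLoadMore(int lastVisiblePosition, int visibleThreshold, int totalItemCount) {
        return lastVisiblePosition + visibleThreshold > totalItemCount;
    }
}
```

With 30 items loaded and a threshold of 5, `shouldLoadMore(20, 5, 30)` is false, `shouldLoadMore(25, 5, 30)` is still false (25 + 5 is not greater than 30), and `shouldLoadMore(26, 5, 30)` is true, so the request fires a few items before the user reaches the end.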
Implementing with RecyclerView
A RecyclerView supports binding an OnScrollListener, whose events are triggered whenever the user scrolls. Using this, we can define a basic UserPagination class that supports most use cases by extending RecyclerView.OnScrollListener.
    public abstract class UserPagination extends RecyclerView.OnScrollListener {

        private LinearLayoutManager layoutManager;
        // Minimum number of items below the current scroll position before loading more
        private int visibleThreshold = 5;
        // Index of the page loaded most recently
        private int currentPage = 0;
        // Total number of items in the data set after the last load
        private int previousTotalItemCount = 0;
        // True while we are still waiting for the last set of data to load
        private boolean loading = true;
        private int startingPageIndex = 0;

        public UserPagination(RecyclerView.LayoutManager layoutManager) {
            this.layoutManager = (LinearLayoutManager) layoutManager;
        }

        @Override
        public void onScrolled(RecyclerView recyclerView, int dx, int dy) {
            super.onScrolled(recyclerView, dx, dy);
            int lastVisibleItemPosition = layoutManager.findLastVisibleItemPosition();
            int totalItemCount = layoutManager.getItemCount();

            // Fewer items than before: the list was reset, so start over
            if (totalItemCount < previousTotalItemCount) {
                this.currentPage = this.startingPageIndex;
                this.previousTotalItemCount = totalItemCount;
                if (totalItemCount == 0) {
                    this.loading = true;
                }
            }

            // The item count grew while loading: the load has finished
            if (loading && (totalItemCount > previousTotalItemCount)) {
                loading = false;
                previousTotalItemCount = totalItemCount;
            }

            // Not loading and within the threshold of the end: request more data
            if (!loading && (lastVisibleItemPosition + visibleThreshold) > totalItemCount) {
                currentPage++;
                onLoadMore(currentPage, totalItemCount, recyclerView);
                loading = true;
            }
        }

        public abstract void onLoadMore(int currentPage, int totalItemCount, View view);
    }
This is an abstract class: in order to use it, you must extend this base class and define the onLoadMore method to actually retrieve the new data. For example:
    recyclerView.addOnScrollListener(new UserPagination(layoutManager) {
        @Override
        public void onLoadMore(int currentPage, int totalItemCount, View view) {
            // skip if a request is already in flight
            if (!requestOnWay) {
                pagination.onNext(mAdapter.getLastVisibleItemId());
            }
        }
    });
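The value pushed into the processor is a cursor, not a page number: GitHub's `since` parameter is a user id, and the endpoint returns users whose id is greater than it. The adapter's getLastVisibleItemId() is not shown in this post, so the following is only a sketch of what such bookkeeping might look like, assuming pages are appended to a list and the cursor is the id of the last loaded user. `UserPages` and its nested `User` are hypothetical names for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical backing store for an adapter; not the adapter used in this post.
final class UserPages {

    static final class User {
        final int id;
        final String login;

        User(int id, String login) {
            this.id = id;
            this.login = login;
        }
    }

    private final List<User> users = new ArrayList<>();

    /** Appends a newly loaded page to the end of the list. */
    void append(List<User> page) {
        users.addAll(page);
    }

    /** Cursor for the next request: the id of the last loaded user, or 0 when empty. */
    int nextSince() {
        return users.isEmpty() ? 0 : users.get(users.size() - 1).id;
    }

    int size() {
        return users.size();
    }

    static int demo() {
        UserPages pages = new UserPages();
        pages.append(Arrays.asList(new User(1, "mojombo"), new User(2, "defunkt")));
        pages.append(Arrays.asList(new User(3, "pjhyett"), new User(4, "wycats")));
        return pages.nextSince();
    }
}
```

Starting from an empty list the cursor is 0, which matches the initial `pagination.onNext(0)` call; after two pages the next request would use the id of the last user received.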