By Subham Aggarwal | 7/19/2017 | General | Beginners

Meet RxJava in Android

If you’re an active Android developer, chances are good you’ve heard of RxJava. It’s one of the most talked-about libraries for enabling Reactive Programming in Android development, and it’s widely regarded as the go-to framework for simplifying the concurrent and asynchronous tasks inherent in mobile programming.

Why Consider RxJava?

At its core, RxJava simplifies development because it raises the level of abstraction around threading. In other words, as a developer you don’t have to be overly concerned about the details of how to perform operations that should occur on different threads. This is particularly attractive since threading is a challenge to get right and, if not correctly implemented, can cause some of the most problematic bugs to debug and fix.

 

Of course, this doesn’t mean RxJava is bulletproof when it comes to threading and it is still important to understand what’s happening behind the scenes; however, RxJava can definitely make things easier.

Let’s look at an example.

Network Call - RxJava vs AsyncTask

Imagine we want to get some data over the network and then update the UI as a result. One way to do this is to first create an inner AsyncTask subclass in our Activity/Fragment, then perform the network operation in the background, and finally take the result of that operation and update the UI in the main thread.

public class NetworkRequestTask extends AsyncTask<Void, Void, User> {

   private final int userId;

   public NetworkRequestTask(int userId) {
       this.userId = userId;
   }

   @Override protected User doInBackground(Void... params) {
       return networkService.getUser(userId);
   }

   @Override protected void onPostExecute(User user) {
       nameTextView.setText(user.getName());
       // ...set other views
   }
}
  
private void onButtonClicked(Button button) {
  new NetworkRequestTask(123).execute();
}

 

This approach may seem harmless, but it has some issues and limitations. Chief among them, memory/context leaks are easy to create: NetworkRequestTask is a non-static inner class, so it holds an implicit reference to the outer Activity/Fragment for as long as the task is running. Also, what if we want to chain another long operation after the network call? We’d have to nest two AsyncTasks, which can significantly reduce readability.

 

In contrast, an RxJava approach to performing a network call might look something like this:

private Subscription subscription;

private void onButtonClicked(Button button) {
  subscription = networkService.getObservableUser(123)
                     .subscribeOn(Schedulers.io())
                     .observeOn(AndroidSchedulers.mainThread())
                     .subscribe(new Action1<User>() {
                         @Override public void call(User user) {
                             nameTextView.setText(user.getName());
                             // ... set other views
                         }
                     });
}

 

Using this approach, we address the problem of potential memory leaks (a running thread holding a reference to the outer context) by keeping a reference to the returned Subscription object. Calling Subscription#unsubscribe() on it from the Activity/Fragment object’s #onDestroy() method ensures that the Action1#call operation does not execute once the Activity/Fragment is being destroyed.

 

Also, notice that the return type of #getObservableUser(...) (i.e. an Observable<User>) lets us chain further calls onto it. This fluent API solves the second issue we had with AsyncTask: further network calls or long-running operations can be chained without nesting. Pretty neat, huh?
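For intuition about the thread hopping that subscribeOn(...) and observeOn(...) arrange in the snippet above, here is a rough analogy built only on the JDK's CompletableFuture. It is not how RxJava is implemented, and the class name, thread names, and the fake "network call" are all hypothetical: the work runs on one executor, a chained step transforms the result, and the final step is delivered on a second executor standing in for the main thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Analogy only: plain JDK executors standing in for Schedulers.io() and
// AndroidSchedulers.mainThread(). All names here are hypothetical.
final class ThreadHopSketch {

    /** Returns {thread that did the work, what the "UI" thread displayed}. */
    static String[] fetchThenDisplay() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-thread"));
        try {
            String[] seen = new String[2];
            CompletableFuture
                // Pretend network call, executed on the background executor.
                .supplyAsync(() -> {
                    seen[0] = Thread.currentThread().getName();
                    return "Jane";
                }, io)
                // A chained follow-up step; no nesting is required.
                .thenApply(String::toUpperCase)
                // Deliver the result on the executor that plays the main thread.
                .thenAcceptAsync(
                    name -> seen[1] = Thread.currentThread().getName() + " shows " + name, ui)
                .join();
            return seen;
        } finally {
            io.shutdown();
            ui.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = fetchThenDisplay();
        System.out.println(seen[0]); // io-thread
        System.out.println(seen[1]); // ui-thread shows JANE
    }
}
```

The point of the comparison is only the shape: where the work happens and where the result is consumed are declared once, in the chain, rather than being spread across callback methods.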

 

Let’s dive deeper into some RxJava concepts.

Observable, Observer, and Operator - The 3 O’s of RxJava Core

In the RxJava world, everything can be modeled as streams. A stream emits an item or items over time, and each emission can be consumed/observed.

If you think about it, a stream is not a new concept: click events, location updates, push notifications, and so on can all be modeled as streams.

 

The stream abstraction is implemented through 3 core constructs which we can call “the 3 O’s”; namely: the Observable, Observer, and the Operator. The Observable emits items (the stream); the Observer consumes those items. Emissions from Observable objects can further be modified, transformed, and manipulated by chaining Operator calls.

Observable

An Observable is the stream abstraction in RxJava. It is similar to an Iterator in that, given a sequence, it iterates through and produces those items in an orderly fashion. A consumer can then consume those items through the same interface, regardless of the underlying sequence.

 

Say we wanted to emit the numbers 1, 2, 3, in that order. To do so, we can use the Observable<T>#create(OnSubscribe<T>) method.

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
  @Override public void call(Subscriber<? super Integer> subscriber) {
      subscriber.onNext(1);
      subscriber.onNext(2);
      subscriber.onNext(3);
      subscriber.onCompleted();
  }
});

 

Invoking subscriber.onNext(Integer) emits an item in the stream and, when the stream is finished emitting, subscriber.onCompleted() is then invoked.

 

This approach to creating an Observable is fairly verbose. For this reason, there are convenience methods for creating Observable instances which should be preferred in almost all cases.

The simplest way to create an Observable is using Observable#just(...). As the method name suggests, it just emits the item(s) that you pass into it as method arguments.

Observable.just(1, 2, 3); // 1, 2, 3 will be emitted, respectively
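To make the relationship between create(...) and just(...) concrete, the following is a minimal plain-Java sketch of a push-based stream. MiniObservable, MiniObserver, and Emitter are hypothetical stand-ins invented for this illustration, not RxJava classes, and the real library does considerably more (threading, unsubscription, backpressure). The sketch only shows that a convenience factory like just can be thought of as pre-packaged emission logic, and that nothing is emitted until someone subscribes.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: hypothetical miniature types, not the RxJava API.
final class PushStreamSketch {

    /** The three callbacks a consumer receives. */
    interface MiniObserver<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Emission logic that is run once per subscriber. */
    interface Emitter<T> {
        void call(MiniObserver<? super T> observer);
    }

    /** Stores the emission logic until subscribe(...) is called. */
    static final class MiniObservable<T> {
        private final Emitter<T> emitter;

        private MiniObservable(Emitter<T> emitter) {
            this.emitter = emitter;
        }

        static <T> MiniObservable<T> create(Emitter<T> emitter) {
            return new MiniObservable<>(emitter);
        }

        /** Convenience factory: emit each argument in order, then complete. */
        @SafeVarargs
        static <T> MiniObservable<T> just(T... items) {
            return create(observer -> {
                for (T item : items) {
                    observer.onNext(item);
                }
                observer.onCompleted();
            });
        }

        void subscribe(MiniObserver<? super T> observer) {
            emitter.call(observer); // emission starts here, not at creation time
        }
    }

    /** Subscribes to just(1, 2, 3) and records every callback in order. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        MiniObservable.just(1, 2, 3).subscribe(new MiniObserver<Integer>() {
            @Override public void onNext(Integer item) { events.add("onNext " + item); }
            @Override public void onError(Throwable error) { events.add("onError"); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [onNext 1, onNext 2, onNext 3, onCompleted]
    }
}
```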

Observer

The next component of the Observable stream is the Observer (or Observers) subscribed to it. Observers are notified whenever something “interesting” happens in the stream, via the following events:

  • Observer#onNext(T) - invoked when an item is emitted from the stream
  • Observer#onError(Throwable) - invoked when an error has occurred within the stream
  • Observer#onCompleted() - invoked when the stream is finished emitting items.
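One detail worth keeping in mind about these events is that onError and onCompleted are terminal: a stream ends with at most one of them, and nothing is delivered afterwards. The plain-Java sketch below illustrates that rule with a small guarding wrapper. The MiniObserver and SafeObserver types are hypothetical and written for this example; they are not RxJava's own classes.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: hypothetical miniature types, not the RxJava API.
final class TerminalEventSketch {

    interface MiniObserver<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Forwards events downstream, but drops everything after the first terminal event. */
    static final class SafeObserver<T> implements MiniObserver<T> {
        private final MiniObserver<T> downstream;
        private boolean done;

        SafeObserver(MiniObserver<T> downstream) {
            this.downstream = downstream;
        }

        @Override public void onNext(T item) {
            if (!done) downstream.onNext(item);
        }

        @Override public void onError(Throwable error) {
            if (!done) { done = true; downstream.onError(error); }
        }

        @Override public void onCompleted() {
            if (!done) { done = true; downstream.onCompleted(); }
        }
    }

    /** Parses strings as integers; the third one fails and ends the stream with onError. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        MiniObserver<Integer> observer = new SafeObserver<>(new MiniObserver<Integer>() {
            @Override public void onNext(Integer item) { events.add("onNext " + item); }
            @Override public void onError(Throwable e) { events.add("onError " + e.getClass().getSimpleName()); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        });

        String[] raw = {"1", "2", "oops", "4"};
        try {
            for (String s : raw) {
                observer.onNext(Integer.parseInt(s));
            }
            observer.onCompleted();
        } catch (NumberFormatException e) {
            observer.onError(e);
        }
        observer.onNext(99);    // ignored: the stream has already terminated
        observer.onCompleted(); // ignored for the same reason
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [onNext 1, onNext 2, onError NumberFormatException]
    }
}
```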

To subscribe to a stream, simply call Observable<T>#subscribe(...) and pass in an Observer instance.

Observable<Integer> observable = Observable.just(1, 2, 3);

observable.subscribe(new Observer<Integer>() {
  @Override public void onCompleted() {
      Log.d("Test", "In onCompleted()");
  }

  @Override public void onError(Throwable e) {
      Log.d("Test", "In onError()");
  }

  @Override public void onNext(Integer integer) {
      Log.d("Test", "In onNext(): " + integer);
  }
});

 

The above code will emit the following in Logcat:

In onNext(): 1
In onNext(): 2
In onNext(): 3
In onCompleted()

 

There may also be some instances where we are no longer interested in the emissions of an Observable. This is particularly relevant in Android when, for example, an Activity/Fragment is about to be destroyed and its memory reclaimed.

 

To stop observing items, we simply need to call Subscription#unsubscribe() on the returned Subscription object.

Subscription subscription = someInfiniteObservable.subscribe(new Observer<Integer>() {
  @Override public void onCompleted() {
      // ...
  }

  @Override public void onError(Throwable e) {
      // ...
  }

  @Override public void onNext(Integer integer) {
      // ...
  }
});

// Call unsubscribe when appropriate
subscription.unsubscribe();

 

As seen in the code above, upon subscribing to an Observable, we hold a reference to the returned Subscription object and later invoke Subscription#unsubscribe() when necessary. In Android, this is best invoked within Activity#onDestroy() or Fragment#onDestroy().
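Why unsubscribing stops an otherwise endless stream can be sketched in plain Java as follows. This is a simplified, single-threaded illustration under the assumption that the producer cooperates by checking a cancellation flag before each emission. MiniSubscription and countForever are hypothetical names invented here and are not part of RxJava.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustration only: hypothetical miniature types, not the RxJava API.
final class UnsubscribeSketch {

    /** A cancellation handle shared by the producer and the consumer. */
    static final class MiniSubscription {
        private volatile boolean unsubscribed;

        void unsubscribe() { unsubscribed = true; }

        boolean isUnsubscribed() { return unsubscribed; }
    }

    /** Emits 1, 2, 3, ... and only stops once the subscription is cancelled. */
    static void countForever(MiniSubscription subscription, Consumer<Integer> onNext) {
        for (int i = 1; !subscription.isUnsubscribed(); i++) {
            onNext.accept(i);
        }
    }

    /** The consumer loses interest after the third item and unsubscribes. */
    static List<Integer> demo() {
        List<Integer> received = new ArrayList<>();
        MiniSubscription subscription = new MiniSubscription();
        countForever(subscription, item -> {
            received.add(item);
            if (item == 3) {
                subscription.unsubscribe();
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [1, 2, 3]
    }
}
```

On Android the call to unsubscribe would come from a lifecycle callback rather than from inside the consumer, but the effect on the producer is the same: it observes the flag and stops emitting.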

Operator

Items emitted by an Observable can be transformed, modified, and filtered through Operators before notifying the subscribed Observer object(s). Some of the most common operations found in functional programming (such as map, filter, reduce, etc.) can also be applied to an Observable stream. Let’s look at map as an example:

Observable.just(1, 2, 3, 4, 5).map(new Func1<Integer, Integer>() {
  @Override public Integer call(Integer integer) {
      return integer * 3;
  }
}).subscribe(new Observer<Integer>() {
  @Override public void onCompleted() {
      // ...
  }

  @Override public void onError(Throwable e) {
      // ...
  }

  @Override public void onNext(Integer integer) {
      // ...
  }
});

 

The code above takes each emission from the Observable and multiplies it by 3, producing the stream 3, 6, 9, 12, 15, in that order. Applying an Operator typically returns another Observable, which is convenient because it allows us to chain multiple operations to obtain the desired result.

Given the stream above, say we wanted to receive only the even numbers. This can be achieved by chaining a filter operation, which here leaves only 6 and 12.

Observable.just(1, 2, 3, 4, 5).map(new Func1<Integer, Integer>() {
  @Override public Integer call(Integer integer) {
      return integer * 3;
  }
}).filter(new Func1<Integer, Boolean>() {
  @Override public Boolean call(Integer integer) {
      return integer % 2 == 0;
  }
}).subscribe(new Observer<Integer>() {
  @Override public void onCompleted() {
      // ...
  }

  @Override public void onError(Throwable e) {
      // ...
  }

  @Override public void onNext(Integer integer) {
      // ...
  }
});
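The idea that each operator returns a new stream wrapping the previous one can be sketched in a few lines of plain Java. MiniStream below is a hypothetical, heavily simplified type made up for this illustration (it has no error or completion events and is not RxJava), but it runs the same map-then-filter chain and makes the resulting values easy to check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Illustration only: a hypothetical miniature stream, not the RxJava API.
final class OperatorSketch {

    /** Subscribing pushes every item of the stream into the given consumer. */
    interface MiniStream<T> {
        void subscribe(Consumer<? super T> onNext);

        @SafeVarargs
        static <T> MiniStream<T> just(T... items) {
            return onNext -> {
                for (T item : items) {
                    onNext.accept(item);
                }
            };
        }

        /** Returns a new stream that transforms each item of this one. */
        default <R> MiniStream<R> map(Function<? super T, ? extends R> mapper) {
            return onNext -> subscribe(item -> onNext.accept(mapper.apply(item)));
        }

        /** Returns a new stream that forwards only the items passing the test. */
        default MiniStream<T> filter(Predicate<? super T> keep) {
            return onNext -> subscribe(item -> {
                if (keep.test(item)) {
                    onNext.accept(item);
                }
            });
        }
    }

    /** Multiplies 1..5 by 3, keeps the even results, and collects them. */
    static List<Integer> demo() {
        List<Integer> received = new ArrayList<>();
        MiniStream.just(1, 2, 3, 4, 5)
                .map(i -> i * 3)
                .filter(i -> i % 2 == 0)
                .subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [6, 12]
    }
}
```

Because map and filter each hand back another MiniStream, the chain can be extended indefinitely, which mirrors why Operator calls on an Observable compose so naturally.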

 

There are many operators built into RxJava that modify the Observable stream; if you can think of a way to modify the stream, chances are there’s an Operator for it. Unlike most technical documentation, the RxJava/ReactiveX docs are fairly simple and to the point. Each operator in the documentation comes with a visualization of how the Operator affects the stream. These visualizations are called “marble diagrams.”
