RxJava Operators — Understanding Map, FlatMap, SwitchMap and ConcatMap

Today we are going to look at four important mapping operators of RxJava: Map, FlatMap, ConcatMap, and SwitchMap. This article summarises how each operator is used, how they differ, and which use cases help you choose the operator that best fits your requirements.

If you are new to RxJava operators, Operators Introduction is a good place to get started.

In short, Map, FlatMap, ConcatMap, and SwitchMap each apply a function to the items emitted by an Observable and emit the transformed result.

  • Map modifies each item emitted by a source Observable and emits the modified item.
  • FlatMap, SwitchMap, and ConcatMap also apply a function to each emitted item, but instead of returning the modified item, the function returns an Observable, which can in turn emit items of its own.
  • FlatMap and ConcatMap work in much the same way. Both flatten the items emitted by these inner Observables into a single Observable.
  • The difference between FlatMap and ConcatMap is the order in which the items are emitted.
  • FlatMap can interleave items while emitting, i.e. the order of the source items is not guaranteed to be maintained.
  • ConcatMap preserves the order of items. Its main disadvantage is that it has to wait for each inner Observable to complete before subscribing to the next one, so the inner Observables do not run concurrently.
  • SwitchMap is a bit different from FlatMap and ConcatMap. It unsubscribes from the previous inner Observable whenever the source emits a new item, so it only ever emits items from the most recent inner Observable.
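
The one-to-one versus one-to-many distinction in the list above can be illustrated without any RxJava dependency. The sketch below is only an analogy built on java.util.stream, not RxJava itself: Stream.flatMap is synchronous and keeps the encounter order, so with respect to ordering it resembles ConcatMap rather than FlatMap. The class name MapVsFlatMapShape and the example.com addresses are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MapVsFlatMapShape {

    // map: each input item produces exactly one output item
    static List<String> upperCase(List<String> names) {
        return names.stream()
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    // flatMap: each input item produces a whole stream of items,
    // and all of those streams are flattened into a single stream
    static List<String> nameAndEmail(List<String> names) {
        return names.stream()
                .flatMap(name -> Stream.of(name, name + "@example.com"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("mark", "john");
        System.out.println(upperCase(names));    // [MARK, JOHN]
        System.out.println(nameAndEmail(names)); // [mark, mark@example.com, john, john@example.com]
    }
}
```

In RxJava the function passed to flatMap(), concatMap(), or switchMap() returns an Observable instead of a Stream, which is what allows the inner work to run asynchronously.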

Now, let’s see how each operator works with the help of an example.

1. Map()

The Map operator transforms each item emitted by an Observable and emits the modified item.

Let’s say we have an Observable that makes a network call (simulated in this example) and emits User objects with a name and gender. Our requirement, however, is that each user also has an email address, which is missing from the network response. We can alter each User object by applying the map() operator.

  • getUsersObservable(): assume this method makes a network call and fetches user objects. It returns an Observable that emits User objects with name and gender.
  • The map() operator applies a Function<User, User> to each User object, adds an email address, converts the name to uppercase, and returns the modified User object.

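    import android.os.Bundle;
    import android.support.v7.app.AppCompatActivity;
    import android.util.Log;

    import java.util.ArrayList;
    import java.util.List;

    import info.androidhive.rxandroidexamples.R;
    import info.androidhive.rxandroidexamples.operators.model.User;
    import io.reactivex.Observable;
    import io.reactivex.ObservableEmitter;
    import io.reactivex.ObservableOnSubscribe;
    import io.reactivex.Observer;
    import io.reactivex.android.schedulers.AndroidSchedulers;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.functions.Function;
    import io.reactivex.schedulers.Schedulers;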

    public class MapOperatorActivity extends AppCompatActivity {

        private static final String TAG = MapOperatorActivity.class.getSimpleName();

        private Disposable disposable;

        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_map_operator);

            getUsersObservable()
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .map(new Function<User, User>() {
                        @Override
                        public User apply(User user) throws Exception {
                            // modifying user object by adding email address
                            // turning user name to uppercase
                            user.setEmail(String.format("%s@rxjava.wtf", user.getName()));
                            user.setName(user.getName().toUpperCase());
                            return user;
                        }
                    })
                    .subscribe(new Observer<User>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.e(TAG, "onSubscribe");
                            disposable = d;
                        }

                        @Override
                        public void onNext(User user) {
                            // Log the email added by map(); no address is set in this example
                            Log.e(TAG, "onNext: " + user.getName() + ", " + user.getGender() + ", " + user.getEmail());
                        }

                        @Override
                        public void onError(Throwable e) {
                            Log.e(TAG, "onError: " + e.getMessage());
                        }

                        @Override
                        public void onComplete() {
                            Log.e(TAG, "All users emitted!");
                        }
                    });
        }

        /**
         * Assume this method makes a network call and fetches Users.
         * Returns an Observable that emits the users one by one.
         * Each User has a name and gender, but the email is missing.
         */
        private Observable<User> getUsersObservable() {
            String[] names = new String[]{"mark", "john", "trump", "obama"};

            final List<User> users = new ArrayList<>();
            for (String name : names) {
                User user = new User();
                user.setName(name);
                user.setGender("male");
                users.add(user);
            }

            return Observable
                    .create(new ObservableOnSubscribe<User>() {
                        @Override
                        public void subscribe(ObservableEmitter<User> emitter) throws Exception {
                            for (User user : users) {
                                if (!emitter.isDisposed()) {
                                    emitter.onNext(user);
                                }
                            }

                            if (!emitter.isDisposed()) {
                                emitter.onComplete();
                            }
                        }
                    }).subscribeOn(Schedulers.io());
        }

        @Override
        protected void onDestroy() {
            super.onDestroy();
            // Stop receiving emissions once the Activity is destroyed
            if (disposable != null) {
                disposable.dispose();
            }
        }
    }

    public class User {
        String name;
        String email;
        String gender;
        Address address;

        // getters and setters
    }

If you run the example, you will see that an email address has been added to each User and that each name has been converted to uppercase.

Output

    onSubscribe
    onNext: MARK, male, mark@rxjava.wtf
    onNext: JOHN, male, john@rxjava.wtf
    onNext: TRUMP, male, trump@rxjava.wtf
    onNext: OBAMA, male, obama@rxjava.wtf
    All users emitted!

2. FlatMap()

To better understand FlatMap, consider a scenario where one network call fetches Users with name and gender, and a second network call gives you the address of each user. The requirement is to create an Observable that emits Users with name, gender, and address properties.

To achieve this, you need to get the users first and then make a separate network call for each user to fetch their address. This can be done easily with the FlatMap operator.

  • getUsersObservable(): assume it makes a network call and returns an Observable that emits User objects (name and gender).
  • getAddressObservable(): assume it makes another network call just to fetch the user's address. It also returns an Observable, which emits the same User with the address added to the existing name and gender.
  • The flatMap() operator calls getAddressObservable() each time a User is emitted and subscribes to the returned Observable, which emits the User including the address field.
  • Finally, flatMap() merges the items from all of these inner Observables into the single Observable it returns.
  • Thread.sleep(sleepTime) is added to simulate network latency.

    import android.os.Bundle;
    import android.support.v7.app.AppCompatActivity;
    import android.util.Log;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    import info.androidhive.rxandroidexamples.R;
    import info.androidhive.rxandroidexamples.operators.model.Address;
    import info.androidhive.rxandroidexamples.operators.model.User;
    import io.reactivex.Observable;
    import io.reactivex.ObservableEmitter;
    import io.reactivex.ObservableOnSubscribe;
    import io.reactivex.Observer;
    import io.reactivex.android.schedulers.AndroidSchedulers;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.functions.Function;
    import io.reactivex.schedulers.Schedulers;

    public class FlatMapActivity extends AppCompatActivity {

        private static final String TAG = FlatMapActivity.class.getSimpleName();

        private Disposable disposable;

        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_flat_map);

            getUsersObservable()
                    .subscribeOn(Schedulers.io())
                    .flatMap(new Function<User, Observable<User>>() {
                        @Override
                        public Observable<User> apply(User user) throws Exception {
                            // getting each user address by making another network call
                            return getAddressObservable(user);
                        }
                    })
                    // observeOn comes after flatMap so that the merged results, which the
                    // inner Observables emit on io threads, are delivered on the main thread
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<User>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.e(TAG, "onSubscribe");
                            disposable = d;
                        }

                        @Override
                        public void onNext(User user) {
                            Log.e(TAG, "onNext: " + user.getName() + ", " + user.getGender() + ", " + user.getAddress().getAddress());
                        }

                        @Override
                        public void onError(Throwable e) {
                            Log.e(TAG, "onError: " + e.getMessage());
                        }

                        @Override
                        public void onComplete() {
                            Log.e(TAG, "All users emitted!");
                        }
                    });
        }

        /**
         * Assume this is a network call.
         * Returns the User with the address field added.
         */
        private Observable<User> getAddressObservable(final User user) {

            final String[] addresses = new String[]{
                    "1600 Amphitheatre Parkway, Mountain View, CA 94043",
                    "2300 Traverwood Dr. Ann Arbor, MI 48105",
                    "500 W 2nd St Suite 2900 Austin, TX 78701",
                    "355 Main Street Cambridge, MA 02142"
            };

            return Observable
                    .create(new ObservableOnSubscribe<User>() {
                        @Override
                        public void subscribe(ObservableEmitter<User> emitter) throws Exception {
                            Address address = new Address();
                            // Pick one of the first two sample addresses at random
                            address.setAddress(addresses[new Random().nextInt(2)]);

                            if (!emitter.isDisposed()) {
                                user.setAddress(address);

                                // Generate network latency of random duration
                                int sleepTime = new Random().nextInt(1000) + 500;
                                Thread.sleep(sleepTime);

                                emitter.onNext(user);
                                emitter.onComplete();
                            }
                        }
                    }).subscribeOn(Schedulers.io());
        }

        /**
         * Assume this is a network call to fetch users.
         * Returns Users with name and gender but missing address.
         */
        private Observable<User> getUsersObservable() {
            String[] maleUsers = new String[]{"Mark", "John", "Trump", "Obama"};

            final List<User> users = new ArrayList<>();
            for (String name : maleUsers) {
                User user = new User();
                user.setName(name);
                user.setGender("male");
                users.add(user);
            }

            return Observable
                    .create(new ObservableOnSubscribe<User>() {
                        @Override
                        public void subscribe(ObservableEmitter<User> emitter) throws Exception {
                            for (User user : users) {
                                if (!emitter.isDisposed()) {
                                    emitter.onNext(user);
                                }
                            }

                            if (!emitter.isDisposed()) {
                                emitter.onComplete();
                            }
                        }
                    }).subscribeOn(Schedulers.io());
        }

        @Override
        protected void onDestroy() {
            super.onDestroy();
            // Stop receiving emissions once the Activity is destroyed
            if (disposable != null) {
                disposable.dispose();
            }
        }
    }

If you run this example, you will see output like the one below. Here, name and gender are fetched from one Observable and the address from another. Also notice that the items do not arrive in the order of the source Observable, and that the order can change each time you run the example.

Output

    onSubscribe
    onNext: John, male, 2300 Traverwood Dr. Ann Arbor, MI 48105
    onNext: Obama, male, 2300 Traverwood Dr. Ann Arbor, MI 48105
    onNext: Mark, male, 1600 Amphitheatre Parkway, Mountain View, CA 94043
    onNext: Trump, male, 2300 Traverwood Dr. Ann Arbor, MI 48105
    All users emitted!

3. ConcatMap()

Now consider the same example as for FlatMap, but with the operator replaced by ConcatMap.

Both operators emit the same set of items; only the order in which the items are emitted changes.

  • concatMap() maintains the order of items: it waits for the current inner Observable to complete before subscribing to the next one.
  • ConcatMap is the better fit when you need to maintain the order of execution.

    import android.support.v7.app.AppCompatActivity;
    import android.os.Bundle;
    import android.util.Log;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    import info.androidhive.rxandroidexamples.R;
    import info.androidhive.rxandroidexamples.operators.model.Address;
    import info.androidhive.rxandroidexamples.operators.model.User;
    import io.reactivex.Observable;
    import io.reactivex.ObservableEmitter;
    import io.reactivex.ObservableOnSubscribe;
    import io.reactivex.Observer;
    import io.reactivex.android.schedulers.AndroidSchedulers;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.functions.Function;
    import io.reactivex.schedulers.Schedulers;

    public class ConcatMapOperatorActivity extends AppCompatActivity {

        private static final String TAG = ConcatMapOperatorActivity.class.getSimpleName();

        private Disposable disposable;

        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_concat_map);

            getUsersObservable()
                    .subscribeOn(Schedulers.io())
                    .concatMap(new Function<User, Observable<User>>() {
                        @Override
                        public Observable<User> apply(User user) throws Exception {
                            // getting each user address by making another network call
                            return getAddressObservable(user);
                        }
                    })
                    // observeOn comes after concatMap so that the results, which the
                    // inner Observables emit on io threads, are delivered on the main thread
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<User>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.e(TAG, "onSubscribe");
                            disposable = d;
                        }

                        @Override
                        public void onNext(User user) {
                            Log.e(TAG, "onNext: " + user.getName() + ", " + user.getGender() + ", " + user.getAddress().getAddress());
                        }

                        @Override
                        public void onError(Throwable e) {
                            Log.e(TAG, "onError: " + e.getMessage());
                        }

                        @Override
                        public void onComplete() {
                            Log.e(TAG, "All users emitted!");
                        }
                    });
        }

        /**
         * Assume this is a network call.
         * Returns the User with the address field added.
         */
        private Observable<User> getAddressObservable(final User user) {

            final String[] addresses = new String[]{
                    "1600 Amphitheatre Parkway, Mountain View, CA 94043",
                    "2300 Traverwood Dr. Ann Arbor, MI 48105",
                    "500 W 2nd St Suite 2900 Austin, TX 78701",
                    "355 Main Street Cambridge, MA 02142"
            };

            return Observable
                    .create(new ObservableOnSubscribe<User>() {
                        @Override
                        public void subscribe(ObservableEmitter<User> emitter) throws Exception {
                            Address address = new Address();
                            // Pick one of the first two sample addresses at random
                            address.setAddress(addresses[new Random().nextInt(2)]);

                            if (!emitter.isDisposed()) {
                                user.setAddress(address);

                                // Generate network latency of random duration
                                int sleepTime = new Random().nextInt(1000) + 500;
                                Thread.sleep(sleepTime);

                                emitter.onNext(user);
                                emitter.onComplete();
                            }
                        }
                    }).subscribeOn(Schedulers.io());
        }

        /**
         * Assume this is a network call to fetch users.
         * Returns Users with name and gender but missing address.
         */
        private Observable<User> getUsersObservable() {
            String[] maleUsers = new String[]{"Mark", "John", "Trump", "Obama"};

            final List<User> users = new ArrayList<>();
            for (String name : maleUsers) {
                User user = new User();
                user.setName(name);
                user.setGender("male");
                users.add(user);
            }

            return Observable
                    .create(new ObservableOnSubscribe<User>() {
                        @Override
                        public void subscribe(ObservableEmitter<User> emitter) throws Exception {
                            for (User user : users) {
                                if (!emitter.isDisposed()) {
                                    emitter.onNext(user);
                                }
                            }

                            if (!emitter.isDisposed()) {
                                emitter.onComplete();
                            }
                        }
                    }).subscribeOn(Schedulers.io());
        }

        @Override
        protected void onDestroy() {
            super.onDestroy();
            // Stop receiving emissions once the Activity is destroyed
            if (disposable != null) {
                disposable.dispose();
            }
        }
    }

If you run the example, you will see that the items arrive in the same order as in the source Observable, i.e. Mark, John, Trump, Obama, on every run.

Output

    onSubscribe
    onNext: Mark, male, 1600 Amphitheatre Parkway, Mountain View, CA 94043
    onNext: John, male, 2300 Traverwood Dr. Ann Arbor, MI 48105
    onNext: Trump, male, 2300 Traverwood Dr. Ann Arbor, MI 48105
    onNext: Obama, male, 1600 Amphitheatre Parkway, Mountain View, CA 94043
    All users emitted!
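
The difference between the FlatMap and ConcatMap outputs comes down to simple arithmetic on the latencies of the inner calls. The following is a rough plain-Java model, not RxJava code. It assumes that FlatMap starts every inner call at the same moment with no concurrency limit, that ConcatMap starts each call only after the previous one has completed, and that the latencies are the made-up values shown in main(). The class name OrderingModel is hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OrderingModel {

    // FlatMap-like: all inner calls start together, so results arrive by ascending latency
    static List<String> mergedOrder(Map<String, Integer> latencyMs) {
        List<String> names = new ArrayList<>(latencyMs.keySet());
        names.sort(Comparator.comparingInt(latencyMs::get));
        return names;
    }

    // ConcatMap-like: inner calls run one at a time, so results keep the source order
    static List<String> concatenatedOrder(Map<String, Integer> latencyMs) {
        return new ArrayList<>(latencyMs.keySet());
    }

    // Concurrent calls are done when the slowest one is done
    static int mergedTotalMs(Map<String, Integer> latencyMs) {
        int max = 0;
        for (int ms : latencyMs.values()) {
            max = Math.max(max, ms);
        }
        return max;
    }

    // Sequential calls take the sum of all latencies
    static int concatenatedTotalMs(Map<String, Integer> latencyMs) {
        int sum = 0;
        for (int ms : latencyMs.values()) {
            sum += ms;
        }
        return sum;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, which stands in for the source order
        Map<String, Integer> latencyMs = new LinkedHashMap<>();
        latencyMs.put("Mark", 900);
        latencyMs.put("John", 600);
        latencyMs.put("Trump", 1200);
        latencyMs.put("Obama", 700);

        System.out.println(mergedOrder(latencyMs));         // [John, Obama, Mark, Trump]
        System.out.println(concatenatedOrder(latencyMs));   // [Mark, John, Trump, Obama]
        System.out.println(mergedTotalMs(latencyMs));       // 1200
        System.out.println(concatenatedTotalMs(latencyMs)); // 3400
    }
}
```

Under these assumptions the concurrent variant finishes in roughly the time of the slowest call, while the sequential variant needs the sum of all calls. That is the price ConcatMap pays for a stable order.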

FlatMap & ConcatMap — Flight Tickets Listing

Flight Ticket Listing is a good example of FlatMap & ConcatMap operators when used with Retrofit networking calls.

4. SwitchMap()

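SwitchMap(), on the other hand, behaves quite differently from FlatMap and ConcatMap.

Whenever the source emits a new item, SwitchMap unsubscribes from the Observable created for the previous item and emits items only from the latest one. In the example below, six integers are emitted in quick succession and each is mapped to an Observable delayed by one second, so only the last one gets the chance to emit.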

    import android.support.v7.app.AppCompatActivity;
    import android.os.Bundle;
    import android.util.Log;

    import java.util.concurrent.TimeUnit;

    import info.androidhive.rxandroidexamples.R;
    import io.reactivex.Observable;
    import io.reactivex.ObservableSource;
    import io.reactivex.Observer;
    import io.reactivex.android.schedulers.AndroidSchedulers;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.functions.Function;
    import io.reactivex.schedulers.Schedulers;

    public class SwitchMapOperatorActivity extends AppCompatActivity {

        private static final String TAG = SwitchMapOperatorActivity.class.getSimpleName();

        private Disposable disposable;

        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_switch_map_operator);

            Observable<Integer> integerObservable =
                    Observable.fromArray(new Integer[]{1, 2, 3, 4, 5, 6});

            // only 6 is emitted, because switchMap unsubscribes from the previous
            // inner Observable each time a new item arrives
            integerObservable
                    .subscribeOn(Schedulers.io())
                    .switchMap(new Function<Integer, ObservableSource<Integer>>() {
                        @Override
                        public ObservableSource<Integer> apply(Integer integer) throws Exception {
                            return Observable.just(integer).delay(1, TimeUnit.SECONDS);
                        }
                    })
                    // observeOn comes after switchMap because delay() emits on the
                    // computation scheduler; this moves the result to the main thread
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "onSubscribe");
                            disposable = d;
                        }

                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG, "onNext: " + integer);
                        }

                        @Override
                        public void onError(Throwable e) {
                            Log.e(TAG, "onError: " + e.getMessage());
                        }

                        @Override
                        public void onComplete() {
                            Log.d(TAG, "All users emitted!");
                        }
                    });
        }

        @Override
        protected void onDestroy() {
            super.onDestroy();
            // Stop receiving emissions once the Activity is destroyed
            if (disposable != null) {
                disposable.dispose();
            }
        }
    }

Output

    onSubscribe
    onNext: 6
    All users emitted!
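
Why only 6 appears in the output above can be modelled in a few lines of plain Java. This is a simplified, single-threaded sketch and not how RxJava implements switchMap: the real operator disposes the previous inner Observable, whereas this model hands out a token per item and ignores any result whose token is no longer the newest. The class LatestOnlyModel and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class LatestOnlyModel {

    private long currentToken = 0;
    private final List<Integer> delivered = new ArrayList<>();

    // Called when the source emits an item: every earlier in-flight result becomes stale
    long start() {
        return ++currentToken;
    }

    // Called when a delayed result is ready: accepted only if no newer item has arrived
    boolean deliver(long token, int value) {
        if (token != currentToken) {
            return false;
        }
        delivered.add(value);
        return true;
    }

    static List<Integer> run(int[] items) {
        LatestOnlyModel model = new LatestOnlyModel();
        long[] tokens = new long[items.length];

        // All items arrive back to back, before any one-second delay has elapsed
        for (int i = 0; i < items.length; i++) {
            tokens[i] = model.start();
        }

        // The delays elapse afterwards; only the newest token is still valid
        for (int i = 0; i < items.length; i++) {
            model.deliver(tokens[i], items[i]);
        }
        return model.delivered;
    }

    public static void main(String[] args) {
        System.out.println(run(new int[]{1, 2, 3, 4, 5, 6})); // [6]
    }
}
```

If the source items were spaced further apart than the one-second delay, each result would be delivered before the next item invalidated it, and all six values would come through.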

Choosing between Map operators

  • Consider using the Map operator when an offline (non-network) operation needs to be performed on the emitted data. As shown in this article, we received data from the server that did not fully meet our requirement, and Map was used to alter the emitted data.
  • Choose FlatMap when the order is not important. Let’s say you are building an airline ticket fare app that fetches the prices of each airline separately and displays them on the screen. Both FlatMap and ConcatMap can be used for this. But if the order is not important and you want to send all the network calls simultaneously, FlatMap is the better choice. With ConcatMap, fetching the prices takes much longer, because ConcatMap runs the calls one after another in order to maintain the item order.
  • SwitchMap is best suited when you want to discard earlier responses and consider only the latest one. Let’s say you are writing an instant search app that sends a search query to the server each time the user types something. Multiple requests with different queries are sent to the server, but we only want to show the result of the most recently typed query. In this case, SwitchMap is the best operator to use.
  • Another use case for SwitchMap is a feed screen in which the feed is refreshed each time the user pulls down to refresh. SwitchMap fits well here because it ignores the older feed responses and considers only the latest request.

Thanks for reading.


Senior Software Engineer | Android | Java | Kotlin |Xamarin Native Android |Flutter |Go

Abhishek Srivastava
