RxJava Operators — Understanding Map, FlatMap, SwitchMap and ConcatMap
--
Today, we are going to learn a few important map operators of RxJava i.e Map, FlatMap, ConcatMap, and SwitchMap. This article summarises the usage of each operator, the difference between them, and use case scenarios that help you choose the best operator that fulfills your requirements.
If you are new to RxJava operators, Operators Introduction is a good place to get started.
In short, Map, FlatMap, ConcatMap, and SwitchMap applies a function or modifies the data emitted by an Observable.
- Map modifies each item emitted by a source Observable and emits the modified item.
- FlatMap, SwitchMap, and ConcatMap also apply a function on each emitted item but instead of returning the modified item, it returns the Observable itself which can emit data again.
- FlatMap and ConcatMap work are pretty much the same. They merge items emitted by multiple Observables and returns a single Observable.
- The difference between FlatMap and ConcatMap is the order in which the items are emitted.
- FlatMap can interleave items while emitting i.e the emitted items order is not maintained.
- ConcatMap preserves the order of items. But the main disadvantage of ConcatMap is, it has to wait for each Observable to complete its work thus asynchronous is not maintained.
- SwitchMap is a bit different from FlatMap and ConcatMap. SwitchMap unsubscribes from the previous source Observable whenever new items started emitting, thus always emitting the items from current Observable.
Now, let’s see how each operator works with the help of an example.
1. Map( )
Map operator transforms each item emitted by an Observable and emits the modified item.
Let’s say we have an Observable that makes a network call (assume the network call is made) and emits the User objects with name and gender. But in our requirement, we need an email address to be present for each user, which is missing in the network response. Then we can alter each User object by applying Map() operation.
- getUsersObservable() : assume this method is making a network call and fetching user objects. This returns an Observable that emits User objects with name and gender.
- map() operator applies Function<User, User> on each User object and adds email address and returns the modified User object.
import
io.reactivex.Observable;
import
io.reactivex.ObservableEmitter;
import
io.reactivex.ObservableOnSubscribe;
import
io.reactivex.Observer;
import
io.reactivex.android.schedulers.AndroidSchedulers;
import
io.reactivex.disposables.Disposable;
import
io.reactivex.functions.Function;
import
io.reactivex.schedulers.Schedulers;
public
class
MapOperatorActivity extends
AppCompatActivity {
private
static
final
String TAG = MapOperatorActivity.class.getSimpleName();
private
Disposable disposable;
@Override
protected
void
onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_map_operator);
getUsersObservable()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(new
Function<User, User>() {
@Override
public
User apply(User user) throws
Exception {
// modifying user object by adding email address
// turning user name to uppercase
user.setEmail(String.format("%s@rxjava.wtf", user.getName()));
user.setName(user.getName().toUpperCase());
return
user;
}
})
.subscribe(new
Observer<User>() {
@Override
public
void
onSubscribe(Disposable d) {
disposable = d;
}
@Override
public
void
onNext(User user) {
Log.e(TAG, "onNext: "
+ user.getName() + ", "
+ user.getGender() + ", "
+ user.getAddress().getAddress());
}
@Override
public
void
onError(Throwable e) {
Log.e(TAG, "onError: "
+ e.getMessage());
}
@Override
public
void
onComplete() {
Log.e(TAG, "All users emitted!");
}
});
}
/**
* Assume this method is making a network call and fetching Users
* an Observable that emits list of users
* each User has name and email, but missing email id
*/
private
Observable<User> getUsersObservable() {
String[] names = new
String[]{"mark", "john", "trump", "obama"};
final
List<User> users = new
ArrayList<>();
for
(String name : names) {
User user = new
User();
user.setName(name);
user.setGender("male");
users.add(user);
}
return
Observable
.create(new
ObservableOnSubscribe<User>() {
@Override
public
void
subscribe(ObservableEmitter<User> emitter) throws
Exception {
for
(User user : users) {
if
(!emitter.isDisposed()) {
emitter.onNext(user);
}
}
if
(!emitter.isDisposed()) {
emitter.onComplete();
}
}
}).subscribeOn(Schedulers.io());
}
@Override
protected
void
onDestroy() {
super.onDestroy();
disposable.dispose();
}
}
public
class
User {
String name;
String email;
String gender;
Address address;
// getters and setters
}
If you run the example, you can notice the email address added to each User. You can also notice that the name is modified to uppercase.
Output
onSubscribe
onNext: MARK, male, mark@rxjava.wtf
onNext: JOHN, male, john@rxjava.wtf
onNext: TRUMP, male, trump@rxjava.wtf
onNext: OBAMA, male, obama@rxjava.wtf
All users emitted!
2. FlatMap()
To better understand FlatMap, consider a scenario where you have a network call to fetch Users with name and gender. Then you have another network that gives you the address of each user. Now the requirement is to create an Observable that emits Users with name, gender, and address properties.
To achieve this, you need to get the users first, then make a separate network call for each user to fetch his address. This can be done easily using FlatMap operator.
- getUsersObservable() : assume it makes a network call and returns an Observable that emits User (name and gender) objects.
- getAddressObservable(): assume it makes another network call just to fetch user address. This also returns an Observable that emits User by adding address node to existing name and gender.
- flatMap() operator makes getAddressObservable() call each time a User is emitted and returns an Observable that emits User including the address filed.
- Finally, flatMap() returns an Observable by merging two Observables together.
- Thread.sleep(sleepTime); added here to simulate network latency.
import
android.os.Bundle;
import
android.support.v7.app.AppCompatActivity;
import
android.util.Log;
import
java.util.ArrayList;
import
java.util.List;
import
java.util.Random;
import
info.androidhive.rxandroidexamples.R;
import
info.androidhive.rxandroidexamples.operators.model.Address;
import
info.androidhive.rxandroidexamples.operators.model.User;
import
io.reactivex.Observable;
import
io.reactivex.ObservableEmitter;
import
io.reactivex.ObservableOnSubscribe;
import
io.reactivex.Observer;
import
io.reactivex.android.schedulers.AndroidSchedulers;
import
io.reactivex.disposables.Disposable;
import
io.reactivex.functions.Function;
import
io.reactivex.schedulers.Schedulers;
public
class
FlatMapActivity extends
AppCompatActivity {
private
static
final
String TAG = FlatMapActivity.class.getSimpleName();
private
Disposable disposable;
@Override
protected
void
onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_flat_map);
getUsersObservable()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new
Function<User, Observable<User>>() {
@Override
public
Observable<User> apply(User user) throws
Exception {
// getting each user address by making another network call
return
getAddressObservable(user);
}
})
.subscribe(new
Observer<User>() {
@Override
public
void
onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
disposable = d;
}
@Override
public
void
onNext(User user) {
Log.e(TAG, "onNext: "
+ user.getName() + ", "
+ user.getGender() + ", "
+ user.getAddress().getAddress());
}
@Override
public
void
onError(Throwable e) {
}
@Override
public
void
onComplete() {
Log.e(TAG, "All users emitted!");
}
});
}
/**
* Assume this as a network call
* returns Users with address filed added
*/
private
Observable<User> getAddressObservable(final
User user) {
final
String[] addresses = new
String[]{
"1600 Amphitheatre Parkway, Mountain View, CA 94043",
"2300 Traverwood Dr. Ann Arbor, MI 48105",
"500 W 2nd St Suite 2900 Austin, TX 78701",
"355 Main Street Cambridge, MA 02142"
};
return
Observable
.create(new
ObservableOnSubscribe<User>() {
@Override
public
void
subscribe(ObservableEmitter<User> emitter) throws
Exception {
Address address = new
Address();
address.setAddress(addresses[new
Random().nextInt(2) + 0]);
if
(!emitter.isDisposed()) {
user.setAddress(address);
// Generate network latency of random duration
int
sleepTime = new
Random().nextInt(1000) + 500;
Thread.sleep(sleepTime);
emitter.onNext(user);
emitter.onComplete();
}
}
}).subscribeOn(Schedulers.io());
}
/**
* Assume this is a network call to fetch users
* returns Users with name and gender but missing address
*/
private
Observable<User> getUsersObservable() {
String[] maleUsers = new
String[]{"Mark", "John", "Trump", "Obama"};
final
List<User> users = new
ArrayList<>();
for
(String name : maleUsers) {
User user = new
User();
user.setName(name);
user.setGender("male");
users.add(user);
}
return
Observable
.create(new
ObservableOnSubscribe<User>() {
@Override
public
void
subscribe(ObservableEmitter<User> emitter) throws
Exception {
for
(User user : users) {
if
(!emitter.isDisposed()) {
emitter.onNext(user);
}
}
if
(!emitter.isDisposed()) {
emitter.onComplete();
}
}
}).subscribeOn(Schedulers.io());
}
@Override
protected
void
onDestroy() {
super.onDestroy();
disposable.dispose();
}
}
If you run this example you can see the output like below. Here, name and gender are fetched from one observable, and address is fetched from another observable. Also, notice that the order of items is not maintained as source observable. You can see the order changed each time you run this example.
Output
onSubscribe
onNext: John, male, 2300 Traverwood Dr. Ann Arbor, MI 48105
onNext: Obama, male, 2300 Traverwood Dr. Ann Arbor, MI 48105
onNext: Mark, male, 1600 Amphitheatre Parkway, Mountain View, CA 94043
onNext: Trump, male, 2300 Traverwood Dr. Ann Arbor, MI 48105
All users emitted!
3. ConcatMap()
Now consider the same example of FlatMap but replacing the operator with ConcatMap.
Technically both operators produce the same output but the sequence the data emitted changes.
- ConcatMap() maintains the order of items and waits for the current Observable to complete its job before emitting the next one.
- ConcatMap is more suitable when you want to maintain the order of execution.
import
android.support.v7.app.AppCompatActivity;
import
android.os.Bundle;
import
android.util.Log;
import
java.util.ArrayList;
import
java.util.List;
import
java.util.Random;
import
info.androidhive.rxandroidexamples.R;
import
info.androidhive.rxandroidexamples.operators.model.Address;
import
info.androidhive.rxandroidexamples.operators.model.User;
import
io.reactivex.Observable;
import
io.reactivex.ObservableEmitter;
import
io.reactivex.ObservableOnSubscribe;
import
io.reactivex.Observer;
import
io.reactivex.android.schedulers.AndroidSchedulers;
import
io.reactivex.disposables.Disposable;
import
io.reactivex.functions.Function;
import
io.reactivex.schedulers.Schedulers;
public
class
ConcatMapOperatorActivity extends
AppCompatActivity {
private
static
final
String TAG = ConcatMapOperatorActivity.class.getSimpleName();
private
Disposable disposable;
@Override
protected
void
onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_concat_map);
getUsersObservable()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.concatMap(new
Function<User, Observable<User>>() {
@Override
public
Observable<User> apply(User user) throws
Exception {
// getting each user address by making another network call
return
getAddressObservable(user);
}
})
.subscribe(new
Observer<User>() {
@Override
public
void
onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
disposable = d;
}
@Override
public
void
onNext(User user) {
Log.e(TAG, "onNext: "
+ user.getName() + ", "
+ user.getGender() + ", "
+ user.getAddress().getAddress());
}
@Override
public
void
onError(Throwable e) {
}
@Override
public
void
onComplete() {
Log.e(TAG, "All users emitted!");
}
});
}
/**
* Assume this as a network call
* returns Users with address filed added
*/
private
Observable<User> getAddressObservable(final
User user) {
final
String[] addresses = new
String[]{
"1600 Amphitheatre Parkway, Mountain View, CA 94043",
"2300 Traverwood Dr. Ann Arbor, MI 48105",
"500 W 2nd St Suite 2900 Austin, TX 78701",
"355 Main Street Cambridge, MA 02142"
};
return
Observable
.create(new
ObservableOnSubscribe<User>() {
@Override
public
void
subscribe(ObservableEmitter<User> emitter) throws
Exception {
Address address = new
Address();
address.setAddress(addresses[new
Random().nextInt(2) + 0]);
if
(!emitter.isDisposed()) {
user.setAddress(address);
// Generate network latency of random duration
int
sleepTime = new
Random().nextInt(1000) + 500;
Thread.sleep(sleepTime);
emitter.onNext(user);
emitter.onComplete();
}
}
}).subscribeOn(Schedulers.io());
}
/**
* Assume this is a network call to fetch users
* returns Users with name and gender but missing address
*/
private
Observable<User> getUsersObservable() {
String[] maleUsers = new
String[]{"Mark", "John", "Trump", "Obama"};
final
List<User> users = new
ArrayList<>();
for
(String name : maleUsers) {
User user = new
User();
user.setName(name);
user.setGender("male");
users.add(user);
}
return
Observable
.create(new
ObservableOnSubscribe<User>() {
@Override
public
void
subscribe(ObservableEmitter<User> emitter) throws
Exception {
for
(User user : users) {
if
(!emitter.isDisposed()) {
emitter.onNext(user);
}
}
if
(!emitter.isDisposed()) {
emitter.onComplete();
}
}
}).subscribeOn(Schedulers.io());
}
@Override
protected
void
onDestroy() {
super.onDestroy();
disposable.dispose();
}
}
If you run the example, you can see the order is maintained as source observable i.e Mark, John, Trump, Obama and it always maintains the same order.
Output
onSubscribe
onNext: Mark, male, 1600 Amphitheatre Parkway, Mountain View, CA 94043
onNext: John, male, 2300 Traverwood Dr. Ann Arbor, MI 48105
onNext: Trump, male, 2300 Traverwood Dr. Ann Arbor, MI 48105
onNext: Obama, male, 1600 Amphitheatre Parkway, Mountain View, CA 94043
All users emitted!
FlatMap & ConcatMap — Flight Tickets Listing
Flight Ticket Listing is a good example of FlatMap & ConcatMap operators when used with Retrofit networking calls.
4. SwitchMap()
SwithMap() on the other hand is completely a different operator from FlatMap and ConcatMap.
SwitchMap always returns the latest Observable and emits the items from it.
import
android.support.v7.app.AppCompatActivity;
import
android.os.Bundle;
import
android.util.Log;
import
java.util.concurrent.TimeUnit;
import
info.androidhive.rxandroidexamples.R;
import
io.reactivex.Observable;
import
io.reactivex.ObservableSource;
import
io.reactivex.Observer;
import
io.reactivex.android.schedulers.AndroidSchedulers;
import
io.reactivex.disposables.Disposable;
import
io.reactivex.functions.Function;
import
io.reactivex.schedulers.Schedulers;
public
class
SwitchMapOperatorActivity extends
AppCompatActivity {
private
static
final
String TAG = SwitchMapOperatorActivity.class.getSimpleName();
private
Disposable disposable;
@Override
protected
void
onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_switch_map_operator);
Observable<Integer> integerObservable =
Observable.fromArray(new
Integer[]{1, 2, 3, 4, 5, 6});
// it always emits 6 as it un-subscribes the before observer
integerObservable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.switchMap(new
Function<Integer, ObservableSource<Integer>>() {
@Override
public
ObservableSource<Integer> apply(Integer integer) throws
Exception {
return
Observable.just(integer).delay(1, TimeUnit.SECONDS);
}
})
.subscribe(new
Observer<Integer>() {
@Override
public
void
onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
disposable = d;
}
@Override
public
void
onNext(Integer integer) {
Log.d(TAG, "onNext: "
+ integer);
}
@Override
public
void
onError(Throwable e) {
}
@Override
public
void
onComplete() {
Log.d(TAG, "All users emitted!");
}
});
}
@Override
protected
void
onDestroy() {
super.onDestroy();
disposable.dispose();
}
}
Output
onSubscribe
onNext: 6
All users emitted!
Choosing between Map operators
- Consider using Map operator where there is offline operations needs to be done on emitted data. As explained in the article, we got something from the server but that doesn’t fulfill our requirement. In that case, Map can be used to alter the emitted data.
- Choose FlatMap when the order is not important. Let’s say you are building an Airline Ticket Fair app that fetches the prices of each airline separately and display on the screen. For this both FlatMap and ConcatMap can be used. But if the order is not important and wants to send all the network calls simultaneously, I would consider FlatMap over ConcatMap. If you consider ConcatMap in this scenario, the time takes to fetch the prices takes a very long time as the ConcatMap won’t make simultaneous calls in order to maintain item order.
- SwitchMap is best suited when you want to discard the response and consider the latest one. Let’s say you are writing an Instant Search app that sends a search query to the server each time user types something. In this case, multiple requests will be sent to the server with multiple queries, but we want to show the result of the latest typed query only. In this case, SwitchMap is the best operator to use.
- Another use case of SwitchMap is, you have a feed screen in which feed is refreshed each time user performs pulldown to refresh. In this scenario, SwitchMap is best suited as it can ignore the older feed response and consider only the latest request.
Thanks for the reading ….