RxConfusions - Part 1 : Map and FlatMap
If you don't know what RxJava is or you aren't comfortable with the terms Observable
, Observer
, Operator
and Subscriber
, I suggest you have a look at Grokking RxJava series by Dan Lew.
If you have surfed through some RxJava content on the web, you should have come accross the map()
operator. It is exactly what you think it is. But you might have seen people using the flatMap()
operator in similar senarios which might have confused you a bit. Let's use them both in a small example and observe the effects of these operators.
Let's use this simple Observer
for logging
public void onCompleted() {
System.out.println("onCompleted");
}
public void onError(Throwable e) {
System.out.printf("onError: %s\n", e);
}
public void onNext(String s) {
System.out.printf("onNext: %s\n", s);
}
Simple use of map operator
public static void main(String[] args) {
Printer printer = new Printer();
String[] names = new String[] {"John", "Samuel", "Micheal", "Tom"};
printer.printTitlesUsingMap(names);
}
private void printTitlesUsingMap(String[] names) {
Observable.from(names)
.map(Printer::getTitle)
.subscribe(this);
}
private static String getTitle(String name) {
return String.format("%s - %d", name, name.length());
}
Output :
onNext: John - 4
onNext: Micheal - 7
onNext: Tom - 3
onNext: Samuel - 6
onCompleted
Lets make it a little interesting
private static String getTitle(String name) throws IllegalArgumentException{
+ if("Micheal".equals(name)) {
+ throw new IllegalArgumentException("Too many Micheals");
+ }
return String.format("%s - %d", name, name.length());
}
Output :
onNext: John - 4
onNext: Samuel - 6
onError: java.lang.IllegalArgumentException: Too many Micheals
Even though there are too many Micheals, you don't want to stop printing once a Micheal arrives. You might be thinking lets use filter()
to filter out Micheal
but let us assume the printer doesn't know when getTitle()
fails. Operators like map()
can't change the flow items, they just modify the items which are emitted. This is when operators like flatMap()
come in handly
Enter the FlatMap
private void printTitlesUsingFlatMap(String[] names) {
Observable.from(names)
.flatMap(Printer::getTitleObservable)
.subscribe(this);
}
private static Observable<String> getTitleObservable(String name) {
return Observable.create(subscriber -> {
if(!subscriber.isUnsubscribed()) {
+ try {
+ subscriber.onNext(getTitle(name));
+ } catch (IllegalArgumentException e) {
+ //do nothing
+ }
+ subscriber.onCompleted();
}
});
}
Notice that we are calling subscriber.onComplete()
for every name. Guess the output... guessed it? Now have a look
Output :
onNext: John - 4
onNext: Samuel - 6
onNext: Tom - 3
onCompleted
Is that what you expected? Probably not. Now have a look at what happens if don't call subscriber.onComplete()
when there is an exception.
return Observable.create(subscriber -> {
if(!subscriber.isUnsubscribed()) {
try {
subscriber.onNext(getTitle(name));
+ subscriber.onCompleted();
} catch (IllegalArgumentException e) {
//do nothing
}
}
});
Make a guess before you look at the output.
onNext: John - 4
onNext: Samuel - 6
onNext: Tom - 3
No onComplete()
at all.
Now make a guess who is the subscriber
to our getTitleObservable()
function.
The
flatMap()
operator is our subscriber
If you are thiking - 'Wait, flatMap()
is an Operator not a Subscriber', you should watch Demistifying RxJava Subscribers by Jake Wharton. Basically, everything is a subscriber.
FlatMap collects all the observables returned for each item in the stream and emits all the items emitted by those observables. Simply put, it is a merge of all observables returned by your mapping function.
It calls onComplete()
only when all the observables are completed.
In this example getTitleObservale("Micheal")
wasn't completed. So FlatMap is waiting for it to complete(Yes, it is in the memory).
Visualise what happened in the above example. Now think about the map()
operator. If you feel like - 'They are two very different operators', then I have done my job. If you don't, try confusing me as well.
I am no expert in this topic. Just sharing as I learn. Please drop me a tweet or a comment if I got anything wrong. And share it to people who you think will find this useful.
EDIT
Artem Zinnatullin's post suggests a better way to for our getTitleObservable().
return Observable.fromCallable(() -> getTitle(name));
By using this method, you don't need to worry about calling the right functions to the subscriber. Thanks to pakoito for pointing that out.