ReactiveX – asynchronous programming done right?
- Processes, standards and quality
- Technologies
- Others
Today applications need to be more and more asynchronous and process more information in parallel. Every programming language has its own way of dealing with asynchronous programming. Some are designed for such purposes (like Erlang) but the others, more popular ones, are not. Is there no hope for Java, .Net or JavaScript developers? I think there is.
Asynchronicity – current state
Nowadays it is not uncommon for a processor to have multiple cores – even our smartphones have such processors (not to mention those used on servers and in data centers). So, developers want to use them all to perform multiple actions in parallel. Also, front-end developers deal with a lot of events that drive their applications, such as mouse clicks, keyboard typing, push messages from servers, etc.
The first problem with mainstream programming languages it that they all have different APIs for dealing with asynchronicity. So, we have Thread in Java, async/await in .NET, callbacks, Promise in JavaScript, and so on.
Different APIs haven’t solved other difficulties that developers had and those are common to many languages. There is no easy way to handle exceptions on multiple threads (simple try/catch can’t help us here).
Dealing with events and defining callbacks for them usually leads to callback hell (that’s what JavaScript is famous for) and code that is hard to maintain and understand. And what if we want to combine different events with push messages, so that they can work together and react to them in a proper way? All of us (no matter what our favorite language is) know how hard it is to design and implement such a scenario.
ReactiveX – the answer to current needs
An API for asynchronous programming with observable streams
On their main page the authors of ReactiveX tell us that it is exactly what we need: API for asynchronous programming. However, they add one thing that is not so obvious for someone who hasn’t heard about ReactiveX before – observable streams. They are the heart of ReactiveX and the very thing that makes it so powerful. Let’s learn more about them.
Observable streams
Observable stream is just a simple stream of data but… with special powers. It can emit three things:
- an element
- an error
- a termination state
The element can be of any type – from simple int value to complicated objects. For example, we can have a stream containing subsequent numbers or a stream containing rows from database query results. The error is emitted when something bad happens while creating the stream elements and the termination state is emitted when there are no more elements left to return. We can clearly see similarities to the Iterator pattern here.
Such a stream can be observed (the API calls it subscribing) to capture those events and consume them – this reminds us of the Observer pattern. That’s nothing new, we all know that. But where are those special forces then? Once again, let’s hear from the authors:
ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming
There’s one more thing in ReactiveX besides Iterator and Observer patterns – functional programming. Observable streams in ReactiveX are like functions in functional programming – one stream (or more) can be an input to another one – they can be combined or their elements can be filtered/mapped. This ability to combine multiple streams into one, to filter them, etc. is what gives ReactiveX so much power.
Let’s take, for example, two streams from which we would like to create another stream, containing pairs of corresponding elements from both of them. This can be any data (e.g. from two different database queries).
It is only a matter of using one operator, which takes both streams and creates the one that we need. Both input streams can emit elements in parallel and the zip operator takes care of waiting for next corresponding element in each stream. This really simplifies things, as we all know how complicated it is to implement such scenario by hand, even for only two input streams.
There are a lot of predefined operators: from simple ones (such as map, filter, skip, average), to more complex and even language specific ones (we will see them later in action). They can be found on ReactiveX page and some of them are explained using interactive diagrams.
Real life example
Let’s take a look at some code and the example that is closer to our daily work. It shows multiple operators chained together to simplify the implementation of a common feature, with only a couple lines of code.
The task is to optimise the number of requests that are made to retrieve search suggestions from the server. It is clear that making a request each time the user types a letter is not very efficient.
var keyups = Rx.Observable.fromEvent($suggestionInput, 'keyup');
First of all, we are creating an Observable object (which represents the observable stream described before) from keyup events on some text input field. There are a lot of constructors which let us create observable objects from different things.
var plucked = keyups.pluck('target', 'value');
Next, we use pluck operator which emits value of a specified property (target.value in our example) for each element of the source stream (keyup events in our example). This gives us a stream with values from our text input, after each keyup. Making a request for each such value will generate a lot of unnecessary traffic, as it yields even very short values and can emit multiple same values when using arrow keys in suggestion input. Therefore, we filter the stream and allow only values which are longer than two:
var filtered = plucked.filter(text => text.length > 2);
But this does not solve our problem with repeated values while using „arrow keys”, so lets do something to prevent that:
var debounced = filtered.debounce(300);
The debounce operator emits an item from the source stream (in our example these are the values longer than two characters) after 300 milliseconds have passed, without the source stream emitting any item. So, we let the user type whatever he likes in the input field and only emit an element 300 milliseconds after he stops typing. This significantly reduces the number of elements, but still allows for two same elements to be emitted subsequently, so we need one more operator:
var distinct = debounced.distinctUntilChanged();
The name of the distinctUntilChanged operator speaks for itself and it indeed transforms the source stream into the stream that doesn’t emit two same elements next to each other.
In the five lines of code we have a non-trivial stream of user-typed values. How is this stream useful for us? What can we do with it?
As already mentioned before, we can observe a stream and react to the values it emits. To observe a stream we need to subscribe to it.
distinct.subscribe(
function(element) {
console.log('Next element: ' + element);
},
function(error) {
console.log('Error: ' + error);
},
function() {
console.log('Stream completed');
}
);
The subscribe method takes three functions as parameters. The first one is called each time a new element is emitted by the observed stream. This is the place where we can react to stream’s elements. The second function is called when the stream emits an error element and gives us the ability to handle that error. Finally, the third function is called when the stream has no more elements to emit. This is not the case for our stream of user events, as this one is infinite. If we don’t need the completed handler, or the error handler, then we can just omit those parameters in the subscribe method.
Besides observing a stream, we can do something else with it. As we remember, we can combine two streams together. Let’s see how our stream of user-typed values can be combined with another one.
As I’ve already said, we want to retrieve search suggestions from the server, while the user is typing in our input field. One possible solution would be to make a HTTP request to our server in the subscribe method each time the user stops entering a new value. But we can do it in a more reactive way.
There is an operator called flatMap, which projects each element of source stream into an observable stream and flattens the sequence of resulting streams into one. Given the fact that RxJS can automatically convert Promise into an observable stream, this allows us to create a stream of search suggestions that emits a new value (a list of suggestions) each time the user stops typing a string longer than two characters and different than the previous one:
var suggestions = distinct.flatMap(value => $.ajax({
url: 'http://example.com/suggestions?v=' + value,
dataType: 'json'
}).promise());
We can now subscribe to the suggestions stream and show them to the user:
suggestions.subscribe(
function(listOfSuggestions) {
$suggestionsContainer
.empty()
.append($.map(listOfSuggestions, s => $('').text(s)));
});
This code simply clears the suggestions container and appends each suggestion from the list returned by the server as a new <li> element (using jQuery map function).
Finally, let’s look at the whole example with method chaining to see how simple, short and convenient it is:
Rx.Observable.fromEvent($suggestionInput, 'keyup')
.pluck('target', 'value')
.filter(text => text.length > 2)
.debounce(300)
.distinctUntilChanged()
.flatMap(value => $.ajax({
url: 'http://example.com/suggestions?v=' + value,
dataType: 'json'
}).promise())
.subscribe(
function(listOfSuggestions) {
$suggestionsContainer
.empty()
.append($.map(listOfSuggestions, s => $('
').text(s))); });
Cross-language
ReactiveX is not only for JavaScript. It is for many languages and in each of them it has a similar API and the same core operators. I want to show one more example, this time in Java, to prove that:
String[] usernames = usersRepository.getUsernamesForPart(usernamePart); Observable users = Observable.from(usernames) .flatMap(name -> getUserObservable(webService, name));
In this code fragment we retrieve an array of user names for a given user name part (it can be a user-typed value from a search box to find users). We create an observable from it (notice that naming is the same as in JavaScript) and use the already known flatMap operator. We only need to know what exactly the getUserObservable() method does:
private Observable getUserObservable(WebService webService, String username) {
Observable fullnameObservable = webService
.getUserFullName(username)
.subscribeOn(Schedulers.io());
Observable positionObservable = webService
.getUserPosition(username)
.subscribeOn(Schedulers.io());
return fullnameObservable.zipWith(positionObservable,
(fullname, userPosition) -> new User(username, fullname, userPosition));
}
It makes two requests to the webService (which already returns them as observables, enabling us to use all the ReactiveX magic with them). The subscribeOn() method basically says where the execution of that observable should take place and in this example both requests are made on separate threads, thus run in parallel.
Finally, we use the zip operator we saw at the begging on a graphical diagram, to combine the results of those two requests into one object, once both requests are completed.
So, the users observable now emits the User object every time it manages to retrieve user details for each user from the input array.
Summary
In this article I showed you only a little part of what ReactiveX can do. But I hope that even such a small part with the fundamental ideas that drive ReactiveX gives you hope that asynchronous programming can be simple and manageable again, even in mainstream languages, such as Java, C# or JavaScript.
ReactiveX is also another example of how ideas from functional programming can be used to make imperative languages easier and more powerful.