RxJS is a library for composing asynchronous and event-based programs by using observable sequences. It provides one core type, the Observable, satellite types (Observer, Schedulers, Subjects), and operators inspired by Array#extras (map, filter, reduce, every, etc.) to allow handling asynchronous events as collections.
ReactiveX combines the Observer pattern with the Iterator pattern and functional programming with collections to fill the need for an ideal way of managing sequences of events.
The essential concepts in RxJS which solve async event management are as follows.
Schedulers: are centralised dispatchers to control concurrency, allowing us to coordinate when computation happens on e.g. setTimeout or requestAnimationFrame or others.
The first call for sure is to go to RxJS main web page:
The documentation is very detailed and well presented, most of the concepts are covered deeply, which is great but sometimes it can make it harder to catch on quickly.
So as I was reading through some of the more complex concepts I found that there is much more first-time rx user-friendly tutorial:
As for combining React with Rx, there are plenty of articles you might want to have a look at:
Other articles:
The issue with RxJS might be that it is just a utility, and it doesn't show people how to architect their app so that it solves some common difficult problems:
Since RxJS is modeled after monadic programming, all those could be solved if architected correctly, but they are non-trivial without proper experience.
MobX is a state manager, and RxJS is a library for handling async events.
As for MobX, documentation says:
Can MobX be combined with RxJS?
Yes, you can use toStream and fromStream from mobx-utils to use RxJS and other TC 39 compatible observables with mobx.
When to use RxJS instead of MobX?
For anything that involves explicitly working with the concept of time, or when you need to reason about the historical values / events of an observable (and not just the latest), RxJS is recommended as it provides more low-level primitives. Whenever you want to react to state instead of events, MobX offers an easier and more high-level approach. In practice, combining RxJS and MobX might result in really powerful constructions. Use for example RxJS to process and throttle user events and as a result of that update the state. If the state has been made observable by MobX, it will then take care of updating the UI and other derivations accordingly.
It may also be a good idea to use MobX with RxJS, but you will need to install some other dependencies.
In root project directory install necessary dependencies:
npm install rxjs
npm install rxjs-compat
Let's focus on a real-life example with a fetch call that requires a developer to perform multiple data manipulation steps and additional API calls to get the necessary data to load up the application.
Here is my Main component connected to a redux state;
class Main extends Component {
subscription;
constructor() {
super();
this.search$ = new Subject();
this.search = this.search$.asObservable().pipe(
debounceTime(500),
);
}
componentDidMount() {
this.subscription = this.search.subscribe((text) => {
this.callApiToGetHeroes(text);
});
}
componentWillUnmount() {
this.subscription.unsubscribe();
}
onSearch = (e) => {
this.search$.next(e.target.value);
}
render() {
const { listOfHeroes, team } = this.props;
return (
<div className="main_view">
<Header />
<SearchBar onSearch={this.onSearch} />
<div className="main_view_list_container">
<HeroList heroesList={listOfHeroes} />
<ChosenList chosenList={team} />
</div>
</div>
);
}
}
Main.propTypes = {
listOfHeroes: PropTypes.arrayOf(PropTypes.object),
team: PropTypes.arrayOf(PropTypes.object),
getHeroes: PropTypes.func,
};
Main.defaultProps = {
listOfHeroes: [],
team: [],
getHeroes: null,
};
const mapStateToProps = (state) => ({ team: state.team, listOfHeroes: state.list });
const mapDispatchToProps = (dispatch) => ({
getHeroes: () => dispatch(getHeroesAction()),
});
export default connect(mapStateToProps, mapDispatchToProps)(Main);
To create reactive input in the Main component, it needs to have an observable creator – in this case, a Subject.
To throttle API requests, I added debounceTime(500)to the observable so that it will wait 500 ms before another call.
The Subject is just one way to create an observable (more on the topic);
Important notice: unsubscribe() from an observable on componentWillUnmount is used to prevent observables from existing outside the component after its being destroyed so that we can avoid memory leaks.
API call:
import 'babel-polyfill';
import { fromFetch } from 'rxjs/fetch';
import {
switchMap,
map,
tap,
flatMap,
} from 'rxjs/operators';
import { forkJoin } from 'rxjs';
import 'rxjs/add/observable/of';
import { ADD_TO_CHOSEN, REMOVE_FROM_CHOSEN, FETCH_HEROES_SUCCESS } from './types';
const fetchUrl = 'https://swapi.co/api/people/';
const imgUrl = 'https://i.pravatar.cc/200?img=';
export const addHero = (hero) => ({ type: ADD_TO_CHOSEN, hero });
export const removeHero = (hero) => ({ type: REMOVE_FROM_CHOSEN, hero });
export const getHeros = (heros) => ({ type: FETCH_HEROES_SUCCESS, heros });
const getWorlds = (list) =>
forkJoin(list.map((hero) =>
fromFetch(hero.homeworld)
.pipe(
switchMap((resp) => resp.json()),
map((resp) => ({ ...hero, homeworld: resp.name })),
)));
const getSpecies = (list) =>
forkJoin(list.map((hero) =>
fromFetch(hero.species[0])
.pipe(
switchMap((resp) => resp.json()),
map((resp) => ({ ...hero, species: [resp.name] })),
)));
const handleResponse = (resp) =>{
if (resp.ok) {
return resp.json();
} else {
return of({ error: true, message: `Error ${resp.status}` });
}
}
export const getHeroesAction = () =>
(dispatch) =>
fromFetch(fetchUrl)
.pipe(
switchMap((response) => handleResponse(response)),
map((response) => response.results),
flatMap((response) => getWorlds(response)),
flatMap((response) => getSpecies(response)),
tap((completeHeroes) => dispatch(getHeros(completeHeroes))),
catchError((err) => {
console.error(err);
return of({ error: true, message: err.message });
}),
);
So I will try to explain what's going on there (click here to check operators documentation for full description)
We start with fromFetch witch converts basic fetch call to Observable<Response> (<type of observable>) it can also be done with fromPromise(deprecated works for version < 6.) or from.
The pipe operator allows manipulating the stream of data before observable emits with some of Rx operators. In this case, switchMap handles the response from the server (it also cancels previous observables). And the map operator allows transforming observable.
(If Rx maps are a bit confusing go here) or here
Next, the Rx magic begins - the problem is the structure of response data - some of the values in-hero object are just urls to get the values from another source - with Rx we can do it all in one stream! <B
forkJoin works a little like Promise.all - it joins and waits for all inner observables to emit and then returns new observable with data coming from inner observables. It's perfect for this case with multiple http requests.
flatMap is used here to flatten observables created with forkJoin and continue on the stream;
And the next two operators are quite straight forward - tap
allows to create side effects - but it doesn't change the observable in any way and
catchError allows to catch an error and transform it in the way we would like to.
Well, it’s pretty straight forward - just to test some stream inside react application I think that the best approach would be to take our pipe out of the component and keep it in a separate file with other operators - then testing the effects of each stream becomes quite easy. Just for example:
export const simpleMapTest = (observable$) => {
return observable$.pipe(
map(data => data + ' search'),
)
};
And the test:
it('should change value of the observable', (done) => {
const observable$ = of('text');
simpleMapTest(observable$).subscribe((value) => {
expect(value).toBe('text search');
done();
});
});
As for testing components, we can’t really subscribe to component observables but we can test the effects of our streams on this component. Let's take the debounce operator from the input we create earlier. The test for this action would look something like this:
it('should execute call API to get Heroes only once', () => {
const wrapper = shallow(<MainComponent />);
const shallowComponent = wrapper.instance();
shallowComponent.callApiToGetHeroes = jest.fn();
wrapper.update();
shallowComponent.onSearch({ target: { value: 'text' } });
shallowComponent.onSearch({ target: { value: 'text1' } });
shallowComponent.onSearch({ target: { value: 'text3' } });
setTimeout(() => {
expect(shallowComponent.callApiToGetHeroes).toHaveBeenCalledTimes(1);
expect(shallowComponent.callApiToGetHeroes).toHaveBeenCalledWidth('text3');
}, 0);
});
Rx.js is a great tool in the hands of experienced developers, it can be implemented in React environment quite easily but the downside is for someone who never worked with Rx in any project the first contact with reactive approach might be a quite hard event.
Rx.js is much more than topics covered in this article so I encourage you all to do some research and after you grab the basic concepts you can go into rx.js marbles.
Photo by Artem Sapegin on Unsplash