Observable creators (e.g. static operators)
of
immediately emit all arguments provided
// `numbers` emits each of the arguments given to of(): 10, 20 and 30.
var numbers = Rx.Observable.of(10, 20, 30);
from
convert a promise, array, iterable, or Observable-like object into an Observable
// Build an Observable from the array [10, 20, 30].
var observable = Rx.Observable.from([10, 20, 30]);
// Log every value that is delivered and keep the handle returned by subscribe().
var subscription = observable.subscribe(x => console.log(x));
or
/**
 * Endless generator of successive doublings.
 *
 * Yields `seed` itself first, then 2 * seed, 4 * seed, and so on. The
 * generator never finishes on its own, so the consumer has to decide when
 * to stop pulling values.
 *
 * @param {number} seed - First value to yield.
 * @yields {number} The current value; it is doubled after every yield.
 */
function* generateDoubles(seed) {
  for (let current = seed; ; current *= 2) {
    yield current;
  }
}
// generateDoubles(3) is an endless iterator, so take(10) is what bounds the
// stream to its first ten values.
var iterator = generateDoubles(3);
var result = Rx.Observable.from(iterator).take(10);
result.subscribe(x => console.log(x));
// Results in the following:
// 3 6 12 24 48 96 192 384 768 1536
interval
// Emits an increasing counter (0, 1, 2, ...) once per period; the period here is 1000 ms.
var observable = Rx.Observable.interval(1000 /* number of milliseconds */);
create
var observable = Rx.Observable.create(observer => {
  // These three values are pushed synchronously, while subscribe() is running.
  for (const value of [1, 2, 3]) {
    observer.next(value);
  }
  // The final value and the completion notification are pushed from a timer,
  // i.e. asynchronously, one second later.
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

console.log('just before subscribe');
observable.subscribe({
  next(x) {
    console.log('got value ' + x);
  },
  error(err) {
    console.error('something wrong occurred: ' + err);
  },
  complete() {
    console.log('done');
  },
});
console.log('just after subscribe');

// Console output, in order:
// just before subscribe
// got value 1
// got value 2
// got value 3
// just after subscribe
// got value 4
// done
merge
// observable1 and observable2 stand for two already-created Observables.
// NOTE(review): merge presumably forwards the values of both inputs through
// `merged` — confirm against the creation-operators documentation.
var merged = Rx.Observable.merge(observable1, observable2);
more Creation Operators: http://reactivex.io/rxjs/manual/overview.html#creation-operators
Observable operators: Persistence
scan
var button = document.querySelector('button');
// scan keeps a running accumulator (it works like 'reduce'): starting from
// the seed 0, every click event adds 1 and the new total is passed downstream.
Rx.Observable.fromEvent(button, 'click')
  .scan((total) => total + 1, 0)
  .subscribe((total) => {
    console.log(`Clicked ${total} times`);
  });
Observable operators: Flow control
throttleTime
var button = document.querySelector('button');
// Count clicks, but let at most one click per second reach the counter.
Rx.Observable.fromEvent(button, 'click')
  .throttleTime(1000)
  .scan((total) => total + 1, 0)
  .subscribe((total) => {
    console.log(`Clicked ${total} times`);
  });
filter
delay
debounceTime
take
takeUntil
distinct
distinctUntilChanged
Observable operators: Pure Transforms
map
var button = document.querySelector('button');
// Throttled clicks are projected to their clientX coordinate, and scan keeps a
// running sum of those coordinates (seeded with 0) which is then logged.
Rx.Observable.fromEvent(button, 'click')
  .throttleTime(1000)
  .map((clickEvent) => clickEvent.clientX)
  .scan((sum, x) => sum + x, 0)
  .subscribe((sum) => {
    console.log(sum);
  });
pluck
pairwise
sample
Interesting examples
An observable that emits values both synchronously AND asynchronously
var foo = Rx.Observable.create(observer => {
  console.log('Hello');
  // Pushed synchronously, before subscribe() returns.
  for (const value of [42, 100, 200]) {
    observer.next(value);
  }
  // Pushed from a timer one second later, so it happens asynchronously.
  setTimeout(() => {
    observer.next(300);
  }, 1000);
});

console.log('before');
foo.subscribe((x) => {
  console.log(x);
});
console.log('after');

// Console output, in order:
// "before"
// "Hello"
// 42
// 100
// 200
// "after"
// 300
Subscribe/Unsubscribe interface implemented in pure JavaScript (no RxJS)
/**
 * Minimal hand-written "observable": starts a repeating timer that calls
 * observer.next('hi') every 1000 ms.
 *
 * @param {{next: function(string): void}} observer - Receives 'hi' on every tick.
 * @returns {function(): void} `unsubscribe` — calling it clears the timer so
 *   that no further values are delivered.
 */
function subscribe(observer) {
  const emitGreeting = () => {
    observer.next('hi');
  };
  const timerHandle = setInterval(emitGreeting, 1000);

  function unsubscribe() {
    clearInterval(timerHandle);
  }
  return unsubscribe;
}
// Start the stream: the observer's next() is called with 'hi' every 1000 ms.
var unsubscribe = subscribe({next: (x) => console.log(x)});
// Later:
unsubscribe(); // dispose the resources (clears the interval started by subscribe)
So why use RxJS if the same thing can be done in plain JavaScript? “The reason why we use Rx types like Observable, Observer, and Subscription is to get safety (such as the Observable Contract) and composability with Operators.”
adding/removing Child subscriptions
var observable1 = Rx.Observable.interval(400);
var observable2 = Rx.Observable.interval(300);

var subscription = observable1.subscribe((x) => {
  console.log('first: ' + x);
});
var childSubscription = observable2.subscribe((x) => {
  console.log('second: ' + x);
});

// Attach the second subscription to the first one as a child.
subscription.add(childSubscription);

// After one second a single unsubscribe() call on the parent
// unsubscribes BOTH subscription and childSubscription.
setTimeout(() => subscription.unsubscribe(), 1000);

// Console output, in order:
// second: 0
// first: 0
// second: 1
// first: 1
// second: 2