Скажем, у меня есть Observable
такой:
var one = someObservable.take(1);
one.subscribe(function(){ /* do something */ });
Затем у меня есть второй Observable
:
var two = someOtherObservable.take(1);
Теперь я хочу это subscribe()
сделать two
, но я хочу убедиться, что это one
было выполнено до того, как two
подписчик будет уволен.
Какой метод буферизации я могу использовать, two
чтобы второй ожидал завершения первого?
Полагаю, я хочу сделать паузу two
до one
завершения.
javascript
observable
rxjs
Стивен
источник
источник
Ответы:
Я могу придумать пару способов
import {take, publish} from 'rxjs/operators' import {concat} from 'rxjs' //Method one var one = someObservable.pipe(take(1)); var two = someOtherObservable.pipe(take(1)); concat(one, two).subscribe(function() {/*do something */}); //Method two, if they need to be separate for some reason var one = someObservable.pipe(take(1)); var two = someOtherObservable.pipe(take(1), publish()); two.subscribe(function(){/*do something */}); one.subscribe(function(){/*do something */}, null, two.connect.bind(two));
источник
pause
иresume
вместоpublish
иconnect
, но пример два - это, по сути, путь, который я выбрал.one
разрешать первый observable ( ) перед вторым (two
) внутри функции subscribe ()?Observable.forkJoin()
? См. Ссылку learnrxjs.io/operators/combination/forkjoin.htmlforkJoin
подписывается одновременно.Если вы хотите убедиться, что порядок выполнения сохраняется, вы можете использовать flatMap в качестве следующего примера.
const first = Rx.Observable.of(1).delay(1000).do(i => console.log(i)); const second = Rx.Observable.of(11).delay(500).do(i => console.log(i)); const third = Rx.Observable.of(111).do(i => console.log(i)); first .flatMap(() => second) .flatMap(() => third) .subscribe(()=> console.log('finished'));
Результат будет:
"1" "11" "111" "finished"
источник
skipUntil () с last ()
skipUntil: игнорировать испускаемые элементы до тех пор, пока не будет испущен другой наблюдаемый объект
last: испустить последнее значение из последовательности (т.е. дождаться его завершения, затем испустить)
Обратите внимание, что все, что испускается из переданного наблюдаемого объекта
skipUntil
, отменяет пропуск, поэтому нам нужно добавитьlast()
- чтобы дождаться завершения потока.Официально: https://rxjs-dev.firebaseapp.com/api/operators/skipUntil
Возможная проблема: обратите внимание, что
last()
само по себе приведет к ошибке, если ничего не передается. Уlast()
оператора естьdefault
параметр, но только когда он используется вместе с предикатом. Я думаю, что если эта ситуация является для вас проблемой (если онаsequence2$
может быть завершена без излучения), тогда один из них должен работать (в настоящее время не проверено):main$.skipUntil(sequence2$.pipe(defaultIfEmpty(undefined), last())) main$.skipUntil(sequence2$.pipe(last(), catchError(() => of(undefined))
Обратите внимание, что
undefined
это допустимый элемент для отправки, но на самом деле может иметь любое значение. Также обратите внимание, что это труба, к которой прикреплена,sequence2$
а неmain$
труба.источник
Вот еще одна возможность воспользоваться селектором результатов switchMap
var one$ = someObservable.take(1); var two$ = someOtherObservable.take(1); two$.switchMap( /** Wait for first Observable */ () => one$, /** Only return the value we're actually interested in */ (value2, value1) => value2 ) .subscribe((value2) => { /* do something */ });
Поскольку селектор результатов switchMap устарел, вот обновленная версия
const one$ = someObservable.pipe(take(1)); const two$ = someOtherObservable.pipe( take(1), switchMap(value2 => one$.map(_ => value2)) ); two$.subscribe(value2 => { /* do something */ });
источник
Вот способ сделать это многоразовым (это машинописный текст, но вы можете адаптировать его к js):
export function waitFor<T>(signal: Observable<any>) { return (source: Observable<T>) => new Observable<T>(observer => signal.pipe(first()) .subscribe(_ => source.subscribe(observer) ) ); }
и вы можете использовать его как любой оператор:
var two = someOtherObservable.pipe(waitFor(one), take(1));
По сути, это оператор, который откладывает подписку на наблюдаемый источник до тех пор, пока наблюдаемый сигнал не испустит первое событие.
источник
Если вторая наблюдаемая горячая , есть другой способ сделать паузу / возобновить :
var pauser = new Rx.Subject(); var source1 = Rx.Observable.interval(1000).take(1); /* create source and pause */ var source2 = Rx.Observable.interval(1000).pausable(pauser); source1.doOnCompleted(function () { /* resume paused source2 */ pauser.onNext(true); }).subscribe(function(){ // do something }); source2.subscribe(function(){ // start to recieve data });
Также вы можете использовать буферизованную версию pausableBuffered для хранения данных во время паузы.
источник
Вот еще один, но мне кажется более простым и интуитивно понятным (или, по крайней мере, естественным, если вы привыкли к обещаниям) подход. По сути, вы создаете Observable, используя
Observable.create()
для переносаone
иtwo
как один Observable. Это очень похоже на то, какPromise.all()
может работать.var first = someObservable.take(1); var second = Observable.create((observer) => { return first.subscribe( function onNext(value) { /* do something with value like: */ // observer.next(value); }, function onError(error) { observer.error(error); }, function onComplete() { someOtherObservable.take(1).subscribe( function onNext(value) { observer.next(value); }, function onError(error) { observer.error(error); }, function onComplete() { observer.complete(); } ); } ); });
Итак, что здесь происходит? Сначала мы создаем новый Observable. Функция, переданная в
Observable.create()
, с метко названнойonSubscription
, передается наблюдателю (построенному из параметров, которые вы передаетеsubscribe()
), который аналогиченresolve
иreject
объединен в один объект при создании нового обещания. Вот как мы заставляем волшебство работать.В
onSubscription
, мы подписываемся на первый Observable (в приведенном выше примере он был вызванone
). Как мы будем поступать,next
и решатьerror
вам, но значение по умолчанию, указанное в моем примере, должно быть подходящим в целом. Однако, когда мы получаемcomplete
событие, что означает, чтоone
теперь оно выполнено, мы можем подписаться на следующий Observable; тем самым запускает второй Observable после завершения первого.Пример наблюдателя, предоставленный для второго Observable, довольно прост. По сути,
second
теперь действует так же, какtwo
и в OP. В частности,second
будет выдано первое и только первое значение, выданноеsomeOtherObservable
(из-заtake(1)
), а затем завершится, если ошибки нет.пример
Вот полный рабочий пример, который вы можете скопировать / вставить, если хотите, чтобы мой пример работал в реальной жизни:
var someObservable = Observable.from([1, 2, 3, 4, 5]); var someOtherObservable = Observable.from([6, 7, 8, 9]); var first = someObservable.take(1); var second = Observable.create((observer) => { return first.subscribe( function onNext(value) { /* do something with value like: */ observer.next(value); }, function onError(error) { observer.error(error); }, function onComplete() { someOtherObservable.take(1).subscribe( function onNext(value) { observer.next(value); }, function onError(error) { observer.error(error); }, function onComplete() { observer.complete(); } ); } ); }).subscribe( function onNext(value) { console.log(value); }, function onError(error) { console.error(error); }, function onComplete() { console.log("Done!"); } );
Если вы посмотрите на консоль, приведенный выше пример напечатает:
источник
Вот пользовательский оператор, написанный на TypeScript, который ожидает сигнала перед выдачей результатов:
export function waitFor<T>( signal$: Observable<any> ) { return (source$: Observable<T>) => new Observable<T>(observer => { // combineLatest emits the first value only when // both source and signal emitted at least once combineLatest([ source$, signal$.pipe( first(), ), ]) .subscribe(([v]) => observer.next(v)); }); }
Вы можете использовать это так:
two.pipe(waitFor(one)) .subscribe(value => ...);
источник
ну, я знаю, что это довольно старый, но я думаю, что вам может понадобиться следующее:
var one = someObservable.take(1); var two = someOtherObservable.pipe( concatMap((twoRes) => one.pipe(mapTo(twoRes))), take(1) ).subscribe((twoRes) => { // one is completed and we get two's subscription. })
источник
Вы можете использовать результат, полученный из предыдущего Observable, благодаря оператору mergeMap (или его псевдониму flatMap ) следующим образом:
const one = Observable.of('https://api.github.com/users'); const two = (c) => ajax(c);//ajax from Rxjs/dom library one.mergeMap(two).subscribe(c => console.log(c))
источник