Как заставить одну наблюдаемую последовательность ждать завершения другой перед отправкой?

86

Скажем, у меня есть Observableтакой:

var one = someObservable.take(1);

one.subscribe(function(){ /* do something */ });

Затем у меня есть второй Observable:

var two = someOtherObservable.take(1);

Теперь я хочу это subscribe()сделать two, но я хочу убедиться, что это oneбыло выполнено до того, как twoподписчик будет уволен.

Какой метод буферизации я могу использовать, twoчтобы второй ожидал завершения первого?

Полагаю, я хочу сделать паузу twoдо oneзавершения.

Стивен
источник
1
Я считаю, что ответ на этот вопрос - метод .exhaustMap (), однако я бы не стал притворяться, что знаю, как его реализовать - полное описание здесь: blog.angular-university.io/rxjs-higher-order-mapping
Питер Никси,

Ответы:

53

Я могу придумать пару способов

import {take, publish} from 'rxjs/operators'
import {concat} from 'rxjs'

//Method one

var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1));
concat(one, two).subscribe(function() {/*do something */});

//Method two, if they need to be separate for some reason
var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1), publish());
two.subscribe(function(){/*do something */});
one.subscribe(function(){/*do something */}, null, two.connect.bind(two));
Paulpdaniels
источник
1
В итоге я использовал pauseи resumeвместо publishи connect, но пример два - это, по сути, путь, который я выбрал.
Стивен
1
Будет ли этот метод всегда oneразрешать первый observable ( ) перед вторым ( two) внутри функции subscribe ()?
Джон
Почему бы не использовать Observable.forkJoin()? См. Ссылку learnrxjs.io/operators/combination/forkjoin.html
mspasiuk
16
@mspasiuk в соответствии с требованиями OP, они хотели, чтобы второй подписался только после завершения первого. forkJoinподписывается одновременно.
paulpdaniels 06
17

Если вы хотите убедиться, что порядок выполнения сохраняется, вы можете использовать flatMap в качестве следующего примера.

const first = Rx.Observable.of(1).delay(1000).do(i => console.log(i));
const second = Rx.Observable.of(11).delay(500).do(i => console.log(i));
const third = Rx.Observable.of(111).do(i => console.log(i));

first
  .flatMap(() => second)
  .flatMap(() => third)
  .subscribe(()=> console.log('finished'));

Результат будет:

"1"
"11"
"111"
"finished"
Никос Цокос
источник
15

skipUntil () с last ()

skipUntil: игнорировать испускаемые элементы до тех пор, пока не будет испущен другой наблюдаемый объект

last: испустить последнее значение из последовательности (т.е. дождаться его завершения, затем испустить)

Обратите внимание, что все, что испускается из переданного наблюдаемого объекта skipUntil, отменяет пропуск, поэтому нам нужно добавить last()- чтобы дождаться завершения потока.

main$.skipUntil(sequence2$.pipe(last()))

Официально: https://rxjs-dev.firebaseapp.com/api/operators/skipUntil


Возможная проблема: обратите внимание, что last()само по себе приведет к ошибке, если ничего не передается. У last()оператора есть defaultпараметр, но только когда он используется вместе с предикатом. Я думаю, что если эта ситуация является для вас проблемой (если она sequence2$может быть завершена без излучения), тогда один из них должен работать (в настоящее время не проверено):

main$.skipUntil(sequence2$.pipe(defaultIfEmpty(undefined), last()))
main$.skipUntil(sequence2$.pipe(last(), catchError(() => of(undefined))

Обратите внимание, что undefinedэто допустимый элемент для отправки, но на самом деле может иметь любое значение. Также обратите внимание, что это труба, к которой прикреплена, sequence2$а не main$труба.

Simon_Weaver
источник
Очень неуклюжая демонстрация: angular-vgznak.stackblitz.io Вам нужно щелкнуть, чтобы открыть лоток консоли
Simon_Weaver
Ваш синтаксис неверен. skipUntil нельзя напрямую прикрепить к наблюдаемому объекту, иначе вы получите следующую ошибку: "Свойство" skipUntil "не существует для типа" Observable <any> ". Вам необходимо сначала запустить его через .pipe ()
London804
Да, это старый ответ до того, как потребовалась труба. Спасибо, что упомянули об этом. Я бы обновил его сейчас, но я на своем телефоне. Не стесняйтесь редактировать ответ.
Simon_Weaver
13

Вот еще одна возможность воспользоваться селектором результатов switchMap

var one$ = someObservable.take(1);
var two$ = someOtherObservable.take(1);
two$.switchMap(
    /** Wait for first Observable */
    () => one$,
    /** Only return the value we're actually interested in */
    (value2, value1) => value2
  )
  .subscribe((value2) => {
    /* do something */ 
  });

Поскольку селектор результатов switchMap устарел, вот обновленная версия

const one$ = someObservable.pipe(take(1));
const two$ = someOtherObservable.pipe(
  take(1),
  switchMap(value2 => one$.map(_ => value2))
);
two$.subscribe(value2 => {
  /* do something */ 
});
Джо Кинг
источник
8

Вот способ сделать это многоразовым (это машинописный текст, но вы можете адаптировать его к js):

export function waitFor<T>(signal: Observable<any>) {
    return (source: Observable<T>) =>
        new Observable<T>(observer =>
            signal.pipe(first())
                .subscribe(_ =>
                    source.subscribe(observer)
                )
        );
}

и вы можете использовать его как любой оператор:

var two = someOtherObservable.pipe(waitFor(one), take(1));

По сути, это оператор, который откладывает подписку на наблюдаемый источник до тех пор, пока наблюдаемый сигнал не испустит первое событие.

Андрей Татар
источник
6

Если вторая наблюдаемая горячая , есть другой способ сделать паузу / возобновить :

var pauser = new Rx.Subject();
var source1 = Rx.Observable.interval(1000).take(1);
/* create source and pause */
var source2 = Rx.Observable.interval(1000).pausable(pauser);

source1.doOnCompleted(function () { 
  /* resume paused source2 */ 
  pauser.onNext(true);
}).subscribe(function(){
  // do something
});

source2.subscribe(function(){
  // start to recieve data 
});

Также вы можете использовать буферизованную версию pausableBuffered для хранения данных во время паузы.

Антон
источник
2

Вот еще один, но мне кажется более простым и интуитивно понятным (или, по крайней мере, естественным, если вы привыкли к обещаниям) подход. По сути, вы создаете Observable, используя Observable.create()для переноса oneи twoкак один Observable. Это очень похоже на то, как Promise.all()может работать.

var first = someObservable.take(1);
var second = Observable.create((observer) => {
  return first.subscribe(
    function onNext(value) {
      /* do something with value like: */
      // observer.next(value);
    },
    function onError(error) {
      observer.error(error);
    },
    function onComplete() {
      someOtherObservable.take(1).subscribe(
        function onNext(value) {
          observer.next(value);
        },
        function onError(error) {
          observer.error(error);
        },
        function onComplete() {
          observer.complete();
        }
      );
    }
  );
});

Итак, что здесь происходит? Сначала мы создаем новый Observable. Функция, переданная в Observable.create(), с метко названной onSubscription, передается наблюдателю (построенному из параметров, которые вы передаете subscribe()), который аналогичен resolveи rejectобъединен в один объект при создании нового обещания. Вот как мы заставляем волшебство работать.

В onSubscription, мы подписываемся на первый Observable (в приведенном выше примере он был вызван one). Как мы будем поступать, nextи решать errorвам, но значение по умолчанию, указанное в моем примере, должно быть подходящим в целом. Однако, когда мы получаем completeсобытие, что означает, что oneтеперь оно выполнено, мы можем подписаться на следующий Observable; тем самым запускает второй Observable после завершения первого.

Пример наблюдателя, предоставленный для второго Observable, довольно прост. По сути, secondтеперь действует так же, как twoи в OP. В частности, secondбудет выдано первое и только первое значение, выданное someOtherObservable(из-за take(1)), а затем завершится, если ошибки нет.

пример

Вот полный рабочий пример, который вы можете скопировать / вставить, если хотите, чтобы мой пример работал в реальной жизни:

var someObservable = Observable.from([1, 2, 3, 4, 5]);
var someOtherObservable = Observable.from([6, 7, 8, 9]);

var first = someObservable.take(1);
var second = Observable.create((observer) => {
  return first.subscribe(
    function onNext(value) {
      /* do something with value like: */
      observer.next(value);
    },
    function onError(error) {
      observer.error(error);
    },
    function onComplete() {
      someOtherObservable.take(1).subscribe(
        function onNext(value) {
          observer.next(value);
        },
        function onError(error) {
          observer.error(error);
        },
        function onComplete() {
          observer.complete();
        }
      );
    }
  );
}).subscribe(
  function onNext(value) {
    console.log(value);
  },
  function onError(error) {
    console.error(error);
  },
  function onComplete() {
    console.log("Done!");
  }
);

Если вы посмотрите на консоль, приведенный выше пример напечатает:

1

6

Выполнено!

c1moore
источник
Это был прорыв, который мне понадобился для создания моего собственного оператора «кластера (T, X, D)», который обрабатывает только первые X, излучаемые источником в течение промежутка времени T, и выдает результаты с интервалом D-задержки. Спасибо!
wonkim00
Я рад, что это помогло, это было очень поучительно, когда я тоже это понял.
c1moore
2

Вот пользовательский оператор, написанный на TypeScript, который ожидает сигнала перед выдачей результатов:

export function waitFor<T>(
    signal$: Observable<any>
) {
    return (source$: Observable<T>) =>
        new Observable<T>(observer => {
            // combineLatest emits the first value only when
            // both source and signal emitted at least once
            combineLatest([
                source$,
                signal$.pipe(
                    first(),
                ),
            ])
                .subscribe(([v]) => observer.next(v));
        });
}

Вы можете использовать это так:

two.pipe(waitFor(one))
   .subscribe(value => ...);
Серджиу
источник
1
красивый узор! Вы даже можете сделать three.pipe (waitFor (one), waitFor (two), take (1))
Дэвид Ринк
1

ну, я знаю, что это довольно старый, но я думаю, что вам может понадобиться следующее:

var one = someObservable.take(1);

var two = someOtherObservable.pipe(
  concatMap((twoRes) => one.pipe(mapTo(twoRes))),
  take(1)
).subscribe((twoRes) => {
   // one is completed and we get two's subscription.
})
Itay Oded
источник
0

Вы можете использовать результат, полученный из предыдущего Observable, благодаря оператору mergeMap (или его псевдониму flatMap ) следующим образом:

 const one = Observable.of('https://api.github.com/users');
 const two = (c) => ajax(c);//ajax from Rxjs/dom library

 one.mergeMap(two).subscribe(c => console.log(c))
Ткторза
источник
отсюда: learnrxjs.io/learn-rxjs/operators/transformation/mergemap - «Если важен порядок выдачи и подписки внутренних наблюдаемых, попробуйте concatMap!»
gsziszi,