Как я могу `await` на Rx Observable?

111

Я бы хотел дождаться наблюдаемого, например

const source = Rx.Observable.create(/* ... */)
//...
await source;

Наивная попытка приводит к немедленному разрешению ожидания и не блокирует выполнение

Изменить: псевдокод для моего полного предполагаемого варианта использования:

if (condition) {
  await observable;
}
// a bunch of other code

Я понимаю, что могу переместить другой код в другую отдельную функцию и передать его в обратный вызов подписки, но я надеюсь, что смогу этого избежать.

CheapSteaks
источник
Можете ли вы не переместить оставшийся код (который вы хотите дождаться источника) в .subscribe()вызов метода?
StriplingWarrior

Ответы:

133

Вы должны передать обещание await. Преобразуйте следующее событие наблюдаемого в обещание и дождитесь этого.

if (condition) {
  await observable.first().toPromise();
}

Примечание для редактирования: в этом ответе изначально использовался .take (1), но он был изменен на использование .first (), что позволяет избежать проблемы, связанной с тем, что Promise никогда не разрешается, если поток заканчивается до того, как будет получено значение.

Macil
источник
3
Вы могли бы использовать вместо take (1) await observable.first().toPromise();?
apricity
15
@apricity Если при завершении не было значений, first()приведет к отклонению и take(1)приведет к отложенному обещанию.
Estus Flask
6
@apricity @AgentME На самом деле вы НЕ должны использовать ни то, take(1)ни другое first()в подобных случаях. Поскольку вы ожидаете, что произойдет ровно ОДНО событие, вы должны использовать его, single()который вызовет исключение, если их больше 1, но не выбрасывает исключение, если его нет. Если их несколько, вероятно, что-то не так в вашем коде / модели данных и т. Д. Если вы не используете single, вы в конечном итоге произвольно выберете первый элемент, который возвращается без предупреждения о том, что их больше. Вам нужно будет позаботиться о своем предикате в вышестоящем источнике данных, чтобы всегда поддерживать один и тот же порядок.
ntziolis
3
Не забудьте про импорт:import 'rxjs/add/operator/first';
Стефани
7
Теперь, когда toPromise () устарела, как нам это сделать?
Jus10 03
26

Вероятно, это должно быть

await observable.first().toPromise();

Как было отмечено в комментариях прежде, есть существенная разница между take(1)и first()операторами , когда пусто завершен наблюдаемым.

Observable.empty().first().toPromise()приведет к отклонению с EmptyErrorэтим можно обработать соответствующим образом, потому что действительно не было никакой ценности.

И Observable.empty().take(1).toPromise()приведет к разрешению со undefinedзначением.

Фляга Эстуса
источник
На самом деле take(1)будет не давать отложенное обещание. Это даст обещание, разрешенное с помощью undefined.
Йохан т Харт,
Спасибо, что заметили, это правильно. Я не уверен, почему пост отличался, возможно, в какой-то момент поведение изменилось.
Estus Flask
8

Вам понадобится awaitобещание, поэтому вы захотите его использовать toPromise(). См. Это для более подробной информации toPromise().

Джош Дарем
источник
5

Если toPromiseосуждается для вас, вы можете использовать , .pipe(take(1)).toPromiseно как вы можете видеть здесь , это не рекомендуется.

Поэтому, пожалуйста, используйте toPromise(RxJs 6), как сказано:

//return basic observable
const sample = val => Rx.Observable.of(val).delay(5000);
//convert basic observable to promise
const example = sample('First Example')
  .toPromise()
  //output: 'First Example'
  .then(result => {
    console.log('From Promise:', result);
  });

Пример async / await:

//return basic observable
const sample = val => Rx.Observable.of(val).delay(5000);
//convert basic observable to promise
const example = await sample('First Example').toPromise()
// output: 'First Example'
console.log('From Promise:', result);

Подробнее читайте здесь .

И удалите эту неправильную заявку, в которой говорится, что toPromiseона устарела.

Эмерика
источник
1

Используйте новый firstValueFrom()или lastValueFrom()вместо toPromise(), который, как указано здесь, устарел, начиная с RxJS 7, и будет удален в RxJS 8.

import { firstValueFrom} from 'rxjs';
import { lastValueFrom } from 'rxjs';

this.myProp = await firstValueFrom(myObservable$);
this.myProp = await lastValueFrom(myObservable$);

Это доступно в RxJS 7+

См .: https://indepth.dev/rxjs-heads-up-topromise-is-being-deprecated/

TetraDev
источник