Как я могу создать наблюдаемое с задержкой

92

Вопрос

В целях тестирования я создаю Observableобъекты, которые заменяют наблюдаемое, которое будет возвращено фактическим http-вызовом с Http.

Моя наблюдаемая создается с помощью следующего кода:

fakeObservable = Observable.create(obs => {
  obs.next([1, 2, 3]);
  obs.complete();
});

Дело в том, что эта наблюдаемая излучает немедленно. Есть ли способ добавить настраиваемую задержку к его излучению?


Трек

Я пробовал это:

fakeObservable = Observable.create(obs => {
  setTimeout(() => {
    obs.next([1, 2, 3]);
    obs.complete();
  }, 100);
});

Но похоже, что это не работает.

Адриен Брунелат
источник
Я пытался цепи .create(...)с , .delay(1000)но это не работа:. Observable_1.Observable.create (...) задержка не является функцией.
Адриан Брунелат
1
Чего именно вы пытаетесь достичь?
Günter Zöchbauer
ты подписываешься на наблюдаемое?
shusson
Подделайте задержку ответа Http с моей собственной наблюдаемой. @shusson да, класс, который я тестирую, вызывает сервис (я пытаюсь высмеять) для наблюдаемого, чтобы подписаться на него.
Адриан Брунелат

Ответы:

144

Используя следующий импорт:

import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/delay';

Попробуй это:

let fakeResponse = [1,2,3];
let delayedObservable = Observable.of(fakeResponse).delay(5000);
delayedObservable.subscribe(data => console.log(data));

ОБНОВЛЕНИЕ: RXJS 6

Вышеупомянутое решение больше не работает в более новых версиях RXJS (и, например, angular).

Итак, сценарий таков, что у меня есть массив элементов, которые нужно проверить с помощью API. API принимает только один элемент, и я не хочу убивать API, отправляя все запросы сразу. Так что мне нужен синхронизированный выпуск элементов в потоке Observable с небольшой задержкой между ними.

Используйте следующий импорт:

import { from, of } from 'rxjs';
import { delay } from 'rxjs/internal/operators';
import { concatMap } from 'rxjs/internal/operators';

Затем используйте следующий код:

const myArray = [1,2,3,4];

from(myArray).pipe(
        concatMap( item => of(item).pipe ( delay( 1000 ) ))
    ).subscribe ( timedItem => {
        console.log(timedItem)
    });

По сути, он создает новый «отложенный» Observable для каждого элемента в вашем массиве. Вероятно, есть много других способов сделать это, но это сработало для меня и соответствует «новому» формату RXJS.

MikeOne
источник
2
Свойство «of» не существует для типа typeof Observable. Вы импортируете свой Observable с помощью import {Observable} from 'rxjs/Observable';?
Адриен Брунелат
1
На этой странице: npmjs.com/package/rxjs . Я пришел к выводу, что мне нужно явно импортировать файлы import 'rxjs/add/observable/of';. Вы случайно делаете то же самое? Это все еще странно, так как он не связывается с .delay (...), и он показывает ошибку, когда я пытаюсь rxjs/add/observable/delay...
Адриен Брунелат
4
должен пытаться of(item.pipe ( delay( 1000 ) ))передать of(item))).pipe(delay(1000)массив дал мне ошибки
Дон Томас Бойл
1
Вот что у меня сработало с rxjs6: from ([1, 2, 3, 4, 5, 6, 7]). Pipe (concatMap (num => of (num) .pipe (delay (1000)))). подписаться (x => console.log (x));
Роберт
1
Решение @MikeOne тоже сработало для меня. Печально, что для такого простого дела необходимо так много кода ...
Codev
103

В RxJS 5+ это можно сделать так

import { Observable } from "rxjs/Observable";
import { of } from "rxjs/observable/of";
import { delay } from "rxjs/operators";

fakeObservable = of('dummy').pipe(delay(5000));

В RxJS 6+

import { of } from "rxjs";
import { delay } from "rxjs/operators";

fakeObservable = of('dummy').pipe(delay(5000));

Если вы хотите отложить каждое испускаемое значение, попробуйте

from([1, 2, 3]).pipe(concatMap(item => of(item).pipe(delay(1000))));
Адриан Бер
источник
4
На мой взгляд, самое чистое решение.
Maayao
Это «решение» работает, только если вы испускаете один элемент. Оператор задержки не вызывается для каждого элемента наблюдаемого. Вот почему требуется ужасное решение concatMap.
Рик О'Ши,
1
@ RickO'Shea, вопрос касается одного излучаемого значения, поэтому это решение.
Адриан Бер,
1
Такой свежий и такой чистый!
Nahn
Я обновил свой ответ на несколько задержек @ RickO'Shea
Адриан Бер
12

Вам нужен таймер:

// RxJS v6+
import { timer } from 'rxjs';

//emit [1, 2, 3] after 1 second.
const source = timer(1000).map(([1, 2, 3]);
//output: [1, 2, 3]
const subscribe = source.subscribe(val => console.log(val));
Пеллет
источник
3
Хороший ответ, не забудьте отказаться от подписки
Sami
8

Немного поздно отвечать ... но на всякий случай может кто-нибудь вернется к этому вопросу в поисках ответа

«задержка» - это свойство (функция) наблюдаемого

fakeObservable = Observable.create(obs => {
  obs.next([1, 2, 3]);
  obs.complete();
}).delay(3000);

Это сработало для меня ...

микрочип78
источник
1
import 'rxjs/add/operator/delay' теперь выдает эту ошибку: Модуль не найден: Ошибка: не удается разрешить 'rxjs / add / operator / delay'
Агги Джон из 87,
Почему вы назвали себя наблюдаемой фальшивкой, когда это вполне реально? :)
lagoman
0

import * as Rx from 'rxjs/Rx';

Мы должны добавить указанный выше импорт, чтобы код удара работал

Let obs = Rx.Observable
    .interval(1000).take(3);

obs.subscribe(value => console.log('Subscriber: ' + value));
Нарендра Кумар Ачари
источник