Как получить текущее значение RxJS Subject или Observable?

207

У меня есть сервис Angular 2:

import {Storage} from './storage';
import {Injectable} from 'angular2/core';
import {Subject}    from 'rxjs/Subject';

@Injectable()
export class SessionStorage extends Storage {
  private _isLoggedInSource = new Subject<boolean>();
  isLoggedIn = this._isLoggedInSource.asObservable();
  constructor() {
    super('session');
  }
  setIsLoggedIn(value: boolean) {
    this.setItem('_isLoggedIn', value, () => {
      this._isLoggedInSource.next(value);
    });
  }
}

Все отлично работает. Но у меня есть другой компонент, которому не нужно подписываться, ему просто нужно получить текущее значение isLoggedIn в определенный момент времени. Как я могу это сделать?

Baconbeastnz
источник

Ответы:

341

А Subjectили Observableне имеет текущего значения. Когда значение передается, оно передается подписчикам, и с Observableним делается.

Если вы хотите иметь текущее значение, используйте BehaviorSubjectкоторое предназначено именно для этой цели. BehaviorSubjectсохраняет последнее выбранное значение и немедленно отправляет его новым подписчикам.

У него также есть метод getValue()для получения текущего значения.

Гюнтер Цохбауэр
источник
1
Привет, это плохо, они теперь то же самое в rxjs 5. Ссылка на исходный код выше ссылалась на rxjs4.
код
84
Важное примечание от автора RxJS 5: использование ОГРОМНОГО getValue()красного флага - вы делаете что-то не так. Это как спасательный люк. Обычно все, что вы делаете с RxJS, должно быть декларативным. getValue()обязательно. Если вы используете getValue(), есть вероятность 99,9%, что вы делаете что-то не так или странно.
Бен Леш
1
@BenLesh, что если у меня есть BehaviorSubject<boolean>(false)и нравится переключать его?
Боб
3
@BenLesh getValue () очень полезен, например, для выполнения мгновенного действия, такого как onClick-> dispatchToServer (x.getValue ()), но не используйте его в наблюдаемых цепочках.
FlavorScape,
2
@ IngoBürk Единственный способ, которым я могу обосновать ваше предложение, - это если вы не знали, что это foo$было BehaviorSubject(то есть оно определено как, Observableа может, горячее или холодное из отложенного источника). Однако , так как вы затем перейти к использованию .nextна $fooсебе , что означает , что ваша реализация зависит от него бытьBehaviorSubject так что нет никаких оснований для не только с помощью , .valueчтобы получить текущее значение в первую очередь.
Simon_Weaver
145

Единственный способ, которым вы должны получать значения «из» Observable / Subject - это подписаться!

Если вы используете, getValue()вы делаете что-то обязательное в декларативной парадигме. Он есть в качестве аварийного люка, но 99,9% времени НЕ следует использовать getValue(). Есть несколько интересных вещей, которые getValue()будут делать: он выдаст ошибку, если субъект был отписан, он не даст вам получить значение, если субъект мертв, потому что он ошибочен, и т. Д. Но, опять же, это как выход люк для редких обстоятельств.

Есть несколько способов получить последнее значение от субъекта или наблюдаемого способом «Rx-y»:

  1. Использование BehaviorSubject: Но на самом деле подписаться на это . Когда вы впервые подписываетесь на BehaviorSubjectнего, он будет синхронно отправлять предыдущее значение, которое он получил или был инициализирован.
  2. Используя a ReplaySubject(N): Это кеширует Nзначения и воспроизводит их новым подписчикам.
  3. A.withLatestFrom(B): Используйте этот оператор, чтобы получить самое последнее значение из наблюдаемого, Bкогда наблюдаемое Aизлучает. Даст вам оба значения в массиве [a, b].
  4. A.combineLatest(B): Используйте этот оператор , чтобы получить самые последние значения от Aи Bкаждый раз , когда либо Aили Bиспускает. Даст вам оба значения в массиве.
  5. shareReplay(): Создает наблюдаемую многоадресную рассылку через ReplaySubject, но позволяет вам повторить наблюдаемое при ошибке. (В основном это дает вам обещанное поведение кеширования).
  6. publishReplay(), publishBehavior(initialValue), multicast(subject: BehaviorSubject | ReplaySubject)И т.д.: Другие операторы , которые используют BehaviorSubjectи ReplaySubject. Различные разновидности одного и того же, они в основном направляют источник, наблюдаемый путем направления всех уведомлений через субъект. Вам нужно позвонить, connect()чтобы подписаться на источник с темой.
Бен Леш
источник
19
Можете ли вы объяснить, почему использование getValue () является красным флагом? Что если мне нужно текущее значение наблюдаемого только один раз, в тот момент, когда пользователь нажимает кнопку? Должен ли я подписаться на стоимость и сразу же отписаться?
Пламя
5
Иногда все в порядке. Но часто это признак того, что автор кода делает что-то очень важное (обычно с побочными эффектами), чего не должно быть. Вы также можете click$.mergeMap(() => behaviorSubject.take(1))решить свою проблему.
Бен Леш
1
Отличное объяснение! На самом деле, если вы можете сохранить Observable и использовать методы, это гораздо лучший способ. В контексте машинописного текста с Observable, используемого повсеместно, но лишь немногие из них определены как BehaviourSubject, тогда этот код будет менее последовательным. Методы, которые вы предложили, позволили мне хранить наблюдаемые типы везде.
JLavoie
2
хорошо, если вы хотите просто отправить текущее значение (например, отправить его на сервер по щелчку), выполнение getValue () очень удобно. но не используйте его при объединении наблюдаемых операторов.
FlavorScape,
1
У меня есть, AuthenticationServiceкоторый использует BehaviourSubjectдля хранения текущего вошедшего в систему состояния ( boolean trueили false). Это предоставляет isLoggedIn$обозримое для подписчиков, которые хотят знать, когда состояние меняется. Он также предоставляет get isLoggedIn()свойство, которое возвращает текущее состояние входа в систему, вызывая getValue()базовый объект BehaviourSubject- это используется моим средством проверки подлинности для проверки текущего состояния. Это кажется разумным использованием getValue()для меня ...?
Дэн Кинг,
13

У меня была похожая ситуация, когда поздние подписчики подписывались на Предмет после того, как его ценность прибыла.

Я обнаружил, что ReplaySubject, который похож на BehaviorSubject, в этом случае работает как шарм. А вот ссылка на лучшее объяснение: http://reactivex.io/rxjs/manual/overview.html#replaysubject

Кфир Эрез
источник
1
это помогло мне в моем приложении Angular4 - мне пришлось перенести подписку из конструктора компонента в ngOnInit () (такой компонент является общим для всех маршрутов), просто оставить его на случай, если у кого-то
Luca
1
У меня была проблема с приложением Angular 5, где я использовал сервис для получения значений из API и установки переменных в разных компонентах. Я использовал предметы / наблюдаемые, но это не будет выдвигать значения после изменения маршрута. ReplaySubject был каплей замены для Subject и решил все.
Jafaircl
1
Убедитесь, что вы используете ReplaySubject (1), иначе новые подписчики получат каждое ранее отправленное значение в последовательности - это не всегда очевидно во время выполнения
Drenai
@ Насколько я понимаю, ReplaySubject (1) ведет себя так же, как BehaviorSubject ()
Kfir Erez
Не совсем то же самое, большая разница в том, что ReplaySubject не сразу генерирует значение по умолчанию при подписке, если его next()функция еще не была вызвана, в то время как BehaviourSubject делает. Немедленный выброс очень удобен для применения значений по умолчанию к представлению, когда BehaviourSubject используется в качестве источника данных, наблюдаемого в службе, например
Drenai
5
const observable = of('response')

function hasValue(value: any) {
  return value !== null && value !== undefined;
}

function getValue<T>(observable: Observable<T>): Promise<T> {
  return observable
    .pipe(
      filter(hasValue),
      first()
    )
    .toPromise();
}

const result = await getValue(observable)
// Do the logic with the result
// .................
// .................
// .................

Вы можете проверить полную статью о том, как реализовать это здесь. https://www.imkrish.com/how-to-get-current-value-of-observable-in-a-clean-way/

Keerati Limkulphong
источник
3

Я сталкивался с той же проблемой в дочерних компонентах, где первоначально он должен иметь текущее значение Subject, а затем подписаться на Subject для прослушивания изменений. Я просто поддерживаю текущее значение в Сервисе, чтобы оно было доступно для компонентов, например:

import {Storage} from './storage';
import {Injectable} from 'angular2/core';
import {Subject}    from 'rxjs/Subject';

@Injectable()
export class SessionStorage extends Storage {

  isLoggedIn: boolean;

  private _isLoggedInSource = new Subject<boolean>();
  isLoggedIn = this._isLoggedInSource.asObservable();
  constructor() {
    super('session');
    this.currIsLoggedIn = false;
  }
  setIsLoggedIn(value: boolean) {
    this.setItem('_isLoggedIn', value, () => {
      this._isLoggedInSource.next(value);
    });
    this.isLoggedIn = value;
  }
}

Компонент, которому нужно текущее значение, может просто получить к нему доступ из службы, то есть:

sessionStorage.isLoggedIn

Не уверен, что это правильная практика :)

Molp Burnbright
источник
2
Если вам нужно значение наблюдаемого в вашем компонентном представлении, вы можете просто использовать asyncканал.
Инго Бюрк
2

Аналогичный ищет ответ был downvoted. Но я думаю, что могу оправдать то, что я предлагаю здесь для ограниченных случаев.


Хотя верно, что наблюдаемая не имеет текущего значения, очень часто она будет иметь сразу доступное значение. Например, в хранилищах redux / flux / akita вы можете запрашивать данные из центрального хранилища, основываясь на количестве наблюдаемых, и это значение обычно будет сразу же доступно.

Если это так, то когда вы subscribe, значение вернется немедленно.

Допустим, у вас был вызов в службу, и по окончании вы хотите получить из своего магазина самую свежую ценность, которая потенциально может не генерироваться :

Вы можете попытаться сделать это (и вы должны как можно больше держать вещи «в трубах»):

 serviceCallResponse$.pipe(withLatestFrom(store$.select(x => x.customer)))
                     .subscribe(([ serviceCallResponse, customer] => {

                        // we have serviceCallResponse and customer 
                     });

Проблема в том, что он будет блокироваться до тех пор, пока вторичная наблюдаемая не выдаст значение, которое потенциально может никогда не быть.

В последнее время я обнаружил, что мне нужно оценивать наблюдаемое только в том случае, если значение было немедленно доступно , и, что более важно, мне нужно было уметь определять, не было ли оно. Я закончил тем, что сделал это:

 serviceCallResponse$.pipe()
                     .subscribe(serviceCallResponse => {

                        // immediately try to subscribe to get the 'available' value
                        // note: immediately unsubscribe afterward to 'cancel' if needed
                        let customer = undefined;

                        // whatever the secondary observable is
                        const secondary$ = store$.select(x => x.customer);

                        // subscribe to it, and assign to closure scope
                        sub = secondary$.pipe(take(1)).subscribe(_customer => customer = _customer);
                        sub.unsubscribe();

                        // if there's a delay or customer isn't available the value won't have been set before we get here
                        if (customer === undefined) 
                        {
                           // handle, or ignore as needed
                           return throwError('Customer was not immediately available');
                        }
                     });

Обратите внимание, что для всего вышеперечисленного я использую, subscribeчтобы получить значение (как обсуждает @Ben). Не используя .valueсобственность, даже если бы у меня был BehaviorSubject.

Simon_Weaver
источник
1
Кстати, это работает, потому что по умолчанию «schedule» использует текущий поток.
Simon_Weaver
1

Хотя это может показаться излишним, но это просто еще одно «возможное» решение, позволяющее сохранить наблюдаемый тип и уменьшить шаблон…

Вы всегда можете создать расширение для получения текущего значения Observable.

Чтобы сделать это, вам нужно расширить Observable<T>интерфейс в global.d.tsфайле объявления типографий. Затем осуществить расширение геттер в observable.extension.tsфайле и , наконец , включают в себя как типизации и файл расширения для вашего приложения.

Вы можете обратиться к этому ответу StackOverflow, чтобы узнать, как включить расширения в ваше приложение Angular.

// global.d.ts
declare module 'rxjs' {
  interface Observable<T> {
    /**
     * _Extension Method_ - Returns current value of an Observable.
     * Value is retrieved using _first()_ operator to avoid the need to unsubscribe.
     */
    value: Observable<T>;
  }
}

// observable.extension.ts
Object.defineProperty(Observable.prototype, 'value', {
  get <T>(this: Observable<T>): Observable<T> {
    return this.pipe(
      filter(value => value !== null && value !== undefined),
      first());
  },
});

// using the extension getter example
this.myObservable$.value
  .subscribe(value => {
    // whatever code you need...
  });
j3ff
источник
0

Вы можете хранить последнее переданное значение отдельно от Observable. Тогда прочитайте это когда необходимо.

let lastValue: number;

const subscription = new Service().start();
subscription
    .subscribe((data) => {
        lastValue = data;
    }
);
Slawa
источник
1
Это не реактивный подход, чтобы хранить некоторые вещи вне наблюдаемого. Вместо этого вы должны иметь как можно больше данных, проходящих внутри наблюдаемых потоков.
ganqqwerty
0

Лучший способ сделать это с помощью Behaviur Subject, вот пример:

var sub = new rxjs.BehaviorSubject([0, 1])
sub.next([2, 3])
setTimeout(() => {sub.next([4, 5])}, 1500)
sub.subscribe(a => console.log(a)) //2, 3 (current value) -> wait 2 sec -> 4, 5
Яя
источник
0

Подписка может быть создана и после получения первого испущенного элемента уничтожена. Труба - это функция, которая использует Observable в качестве входных данных и возвращает другой Observable в качестве выходных данных, не изменяя первую наблюдаемую. Угловой 8.1.0. Пакеты: "rxjs": "6.5.3","rxjs-observable": "0.0.7"

  ngOnInit() {

    ...

    // If loading with previously saved value
    if (this.controlValue) {

      // Take says once you have 1, then close the subscription
      this.selectList.pipe(take(1)).subscribe(x => {
        let opt = x.find(y => y.value === this.controlValue);
        this.updateValue(opt);
      });

    }
  }
SushiGuy
источник