RxJS: Как мне «вручную» обновить Observable?

146

Я думаю, что я неправильно понимаю что-то фундаментальное, потому что, на мой взгляд, это должен быть самый простой случай для наблюдаемого, но на всю жизнь я не могу понять, как это сделать из документации.

По сути, я хочу уметь это делать:

// create a dummy observable, which I would update manually
var eventObservable = rx.Observable.create(function(observer){});
var observer = eventObservable.subscribe(
   function(x){
     console.log('next: ' + x);
   }
...
var my_function = function(){
  eventObservable.push('foo'); 
  //'push' adds an event to the datastream, the observer gets it and prints 
  // next: foo
}

Но мне так и не удалось найти способ вроде push. Я использую это для обработчика кликов, и я знаю, что они есть Observable.fromEventдля этого, но я пытаюсь использовать его с React, и я бы предпочел просто обновить поток данных в обратном вызове, вместо того, чтобы использовать совершенно другой система обработки событий. В общем, я хочу это:

$( "#target" ).click(function(e) {
  eventObservable.push(e.target.text()); 
});

Самое близкое, что у меня было observer.onNext('foo'), было использование , но это, похоже, на самом деле не сработало, и это вызвало наблюдателя, что кажется неправильным. Наблюдатель должен реагировать на поток данных, а не изменять его, верно?

Я просто не понимаю отношения наблюдатель / наблюдаемое?

ЛиамД
источник
Взгляните на это, чтобы прояснить свою идею (введение в реактивное программирование, которое вам не хватало): gist.github.com/staltz/868e7e9bc2a7b8c1f754 . Здесь также есть множество ресурсов, с помощью которых вы можете улучшить свое понимание: github.com/Reactive-Extensions/RxJS#resources
user3743222
Я проверил первый, кажется, надежный ресурс. Второй - отличный список, в нем я нашел aaronstacy.com/writings/reactive-programming-and-mvc, который помог мне открыть Rx.Subject, который решает мою проблему. Так что спасибо! Как только я напишу еще немного приложения, я опубликую свое решение, просто хочу немного его протестировать.
LiamD
Хе-хе, большое спасибо за этот вопрос, я собирался задать тот же вопрос, имея в виду тот же самый образец кода :-)
arturh 01

Ответы:

157

В RX Observer и Observable - разные сущности. Наблюдатель подписывается на Observable. Observable передает элементы своим наблюдателям, вызывая методы наблюдателей. Если вам нужно вызвать методы наблюдателя за пределами области действия, Observable.create()вы можете использовать Subject, который является прокси, который действует как наблюдатель и Observable одновременно.

Сделать можно так:

var eventStream = new Rx.Subject();

var subscription = eventStream.subscribe(
   function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

var my_function = function() {
  eventStream.next('foo'); 
}

Вы можете найти больше информации по предметам здесь:

Луисгабриэль
источник
1
Это именно то, чем я закончил! Я продолжал работать над этим, чтобы посмотреть, смогу ли я найти лучший способ сделать то, что мне нужно, но это определенно жизнеспособное решение. Я впервые увидел это в этой статье: aaronstacy.com/writings/reactive-programming-and-mvc .
LiamD
37

Я считаю, Observable.create()что в качестве параметра обратного вызова не используется наблюдатель, а эмиттер. Поэтому, если вы хотите добавить новое значение в свой Observable, попробуйте вместо этого:

var emitter;
var observable = Rx.Observable.create(e => emitter = e);
var observer = {
  next: function(next) {
    console.log(next);
  },
  error: function(error) {
    console.log(error);
  },
  complete: function() {
    console.log("done");
  }
}
observable.subscribe(observer);
emitter.next('foo');
emitter.next('bar');
emitter.next('baz');
emitter.complete();

//console output
//"foo"
//"bar"
//"baz"
//"done"

Да Subject упрощает задачу, предоставляя Observable и Observer в одном объекте, но это не совсем то же самое, поскольку Subject позволяет вам подписывать нескольких наблюдателей на один и тот же наблюдаемый объект, когда наблюдаемый отправляет данные только последнему подписанному наблюдателю, поэтому используйте его сознательно . Вот JsBin, если вы хотите с ним повозиться.

СвободаБанан
источник
возможность перезаписи свойства эмиттера задокументирована где-то в руководствах по RxJS?
Tomas
В этом случае emitterбудут только next()новые значения для наблюдателя, который подписался последним. Лучшим подходом было бы собрать все emitters в массив и перебрать их все и nextзначение каждого из них
Эрик Гопак