Почему в реактивных расширениях .NET не рекомендуются темы?

112

В настоящее время я занимаюсь фреймворком Reactive Extensions для .NET и прорабатываю различные вводные ресурсы, которые я нашел (в основном http://www.introtorx.com )

Наше приложение включает в себя несколько аппаратных интерфейсов, которые обнаруживают сетевые фреймы, это будут мои IObservables, а затем у меня будет множество компонентов, которые будут использовать эти фреймы или выполнять какое-либо преобразование данных и создавать новый тип фрейма. Также будут другие компоненты, которые, например, должны отображать каждый n-й кадр. Я убежден, что Rx будет полезен для нашего приложения, однако я борюсь с деталями реализации интерфейса IObserver.

В большинстве (если не во всех) ресурсов, которые я читал, говорится, что я не должен сам реализовывать интерфейс IObservable, а должен использовать одну из предоставленных функций или классов. Из моих исследований видно, что создание a Subject<IBaseFrame>предоставит мне то, что мне нужно, у меня будет мой единственный поток, который считывает данные с аппаратного интерфейса, а затем вызывает функцию OnNext моего Subject<IBaseFrame>экземпляра. Затем различные компоненты IObserver будут получать свои уведомления от этого субъекта.

Мое замешательство вызвано советом, приведенным в приложении к этому руководству, где говорится:

Избегайте использования тематических типов. Rx - это, по сути, парадигма функционального программирования. Использование субъектов означает, что теперь мы управляем состоянием, которое потенциально может изменяться. Правильно справиться и с изменяющимся состоянием, и с асинхронным программированием одновременно очень сложно. Более того, многие операторы (методы расширения) были тщательно написаны, чтобы обеспечить правильное и постоянное время жизни подписок и последовательностей; когда вы вводите предметы, вы можете сломать это. В будущих выпусках также может наблюдаться значительное снижение производительности, если вы явно используете темы.

Мое приложение весьма критично к производительности, я, очевидно, собираюсь протестировать производительность использования шаблонов Rx, прежде чем оно попадет в производственный код; однако меня беспокоит, что я делаю что-то, что противоречит духу структуры Rx, используя класс Subject, и что будущая версия инфраструктуры может снизить производительность.

Есть ли лучший способ делать то, что я хочу? Поток аппаратного опроса будет работать постоянно, независимо от того, присутствуют ли какие-либо наблюдатели или нет (в противном случае будет выполнено резервное копирование HW-буфера), так что это очень горячая последовательность. Затем мне нужно передать полученные кадры нескольким наблюдателям.

Любой совет будет очень признателен.

Энтони
источник
1
Это действительно помогло мне понять предмет, я просто начинаю думать о том, как использовать это в моем приложении. Я знаю, что это правильно - у меня есть конвейер компонентов, которые очень ориентированы на push, и мне нужно выполнять все виды фильтрации и вызова потока пользовательского интерфейса для отображения в графическом интерфейсе, а также буферизовать последний полученный кадр и т. Д. и т.д. - Мне просто нужно убедиться, что я делаю это правильно с первого раза!
Энтони

Ответы:

70

Хорошо, если мы проигнорируем мои догматические взгляды и проигнорируем все вместе «предметы хорошие / плохие». Давайте посмотрим на проблемное пространство.

Бьюсь об заклад, у вас либо есть 1 из 2 стилей системы, к которым вам нужно снискать расположение.

  1. Система вызывает событие или обратный вызов при поступлении сообщения
  2. Вам нужно опросить систему, чтобы увидеть, есть ли какие-либо сообщения для обработки

Для варианта 1, просто, мы просто оборачиваем его соответствующим методом FromEvent, и все готово. В паб!

Что касается варианта 2, то теперь нам нужно подумать о том, как мы проводим опрос и как это сделать эффективно. Также, когда мы получаем значение, как мы его публикуем?

Я предполагаю, что вам понадобится выделенная ветка для опроса. Вы бы не хотели, чтобы какой-то другой программист забил ThreadPool / TaskPool и оставил вас в ситуации голодания ThreadPool. В качестве альтернативы вы не хотите хлопот с переключением контекста (я думаю). Итак, предположим, что у нас есть собственный поток, у нас, вероятно, будет какой-то цикл While / Sleep, который мы будем опрашивать. Когда проверка обнаруживает сообщения, мы их публикуем. Что ж, все это звучит идеально для Observable.Create. Теперь мы, вероятно, не можем использовать цикл While, так как он не позволит нам когда-либо вернуть Disposable, чтобы разрешить отмену. К счастью, вы прочитали всю книгу, так что разбираетесь в рекурсивном планировании!

Думаю, что-то подобное могло сработать. #Не проверено

public class MessageListener
{
    private readonly IObservable<IMessage> _messages;
    private readonly IScheduler _scheduler;

    public MessageListener()
    {
        _scheduler = new EventLoopScheduler();

        var messages = ListenToMessages()
                                    .SubscribeOn(_scheduler)
                                    .Publish();

        _messages = messages;
        messages.Connect();
    }

    public IObservable<IMessage> Messages
    {
        get {return _messages;}
    }

    private IObservable<IMessage> ListenToMessages()
    {
        return Observable.Create<IMessage>(o=>
        {
                return _scheduler.Schedule(recurse=>
                {
                    try
                    {           
                        var messages = GetMessages();
                        foreach (var msg in messages)
                        {
                            o.OnNext(msg);
                        }   
                        recurse();
                    }
                    catch (Exception ex)
                    {
                        o.OnError(ex);
                    }                   
                });
        });
    }

    private IEnumerable<IMessage> GetMessages()
    {
         //Do some work here that gets messages from a queue, 
         // file system, database or other system that cant push 
         // new data at us.
         // 
         //This may return an empty result when no new data is found.
    }
}

Причина, по которой мне действительно не нравятся Subjects, заключается в том, что обычно разработчик не имеет четкого представления о проблеме. Взломайте тему, ткните ее здесь, там и везде, а затем пусть бедный разработчик поддержки догадывается, что происходит WTF. Когда вы используете методы Create / Generate и т. Д., Вы локализуете эффекты в последовательности. Вы можете увидеть все это в одном методе, и вы знаете, что никто другой не вызывает неприятных побочных эффектов. Если я вижу поля темы, мне нужно искать все места в классе, в которых они используются. Если какой-то MFer выставляет один публично, то все ставки отключены, кто знает, как эта последовательность используется! Async / Concurrency / Rx - это сложно. Вам не нужно усложнять задачу, позволяя побочным эффектам и программированию причинно-следственной связи еще больше вскружить вам голову.

Ли Кэмпбелл
источник
10
Я просто читаю этот ответ сейчас, но чувствовал, что должен указать, что я никогда не стал бы рассматривать раскрытие интерфейса Subject! Я использую его для обеспечения реализации IObservable <> в закрытом классе (который предоставляет IObservable <>). Я определенно понимаю, почему раскрытие интерфейса Subject <> было бы плохой вещью ™
Энтони
эй, извини за то, что был толстым, но я просто не совсем понимаю твой код. что делают и возвращают ListenToMessages () и GetMessages ()?
user10479 01
1
Для вашего личного проекта @jeromerg это может подойти. Однако, по моему опыту, разработчики борются с WPF, MVVM, дизайном графического интерфейса модульного тестирования, а затем добавление Rx может усложнить ситуацию. Я пробовал использовать шаблон BehaviourSubject-as-a-property. Однако я обнаружил, что это было бы гораздо более приемлемо для других, если бы мы использовали стандартные свойства INPC, а затем использовали простой метод расширения для преобразования их в IObservable. Кроме того, вам потребуются настраиваемые привязки WPF для работы с вашими объектами поведения. Теперь ваша бедная команда должна изучить WPF, MVVM, Rx, а также ваш новый фреймворк.
Ли Кэмпбелл,
2
@LeeCampbell, чтобы выразить это в терминах вашего примера кода, нормальным способом было бы создание MessageListener системой (вы, вероятно, каким-то образом зарегистрируете имя класса), и вам сообщают, что затем система вызовет OnCreate () и OnGoodbye () и будет вызывать message1 (), message2 () и message3 () по мере создания сообщений. Похоже, messageX [123] вызовет OnNext по теме, но есть ли способ лучше?
Джеймс Мур
1
@JamesMoore, поскольку эти вещи намного легче объяснить на конкретных примерах. Если вы знаете приложение для Android с открытым исходным кодом, которое использует Rx и Subjects, то, возможно, я найду время, чтобы посмотреть, могу ли я предложить лучший способ. Я понимаю, что стоять на пьедестале и говорить, что предметы плохи, не очень полезно. Но я думаю, что такие вещи, как IntroToRx, RxCookbook и ReactiveTrader, дают различные уровни примеров того, как использовать Rx.
Ли Кэмпбелл,
38

В целом вам следует избегать использования Subject, однако для того, что вы здесь делаете, я думаю, они работают достаточно хорошо. Я задал аналогичный вопрос, когда наткнулся на сообщение «Избегайте тем» в руководствах по Rx.

Процитирую Дэйва Секстона (из Rxx)

«Субъекты - это компоненты Rx с отслеживанием состояния. Они полезны, когда вам нужно создать подобный событию наблюдаемый объект в виде поля или локальной переменной».

Я предпочитаю использовать их как точку входа в Rx. Поэтому, если у меня есть код, который должен сказать «что-то произошло» (как у вас), я бы использовал Subjectи call OnNext. Затем представьте это как возможность IObservableдля подписки другим пользователям (вы можете использовать это AsObservable()в своей теме, чтобы убедиться, что никто не может применить к Subject и все испортить).

Вы также можете добиться этого с помощью события .NET и использовать его FromEventPattern, но если я все IObservableравно собираюсь превратить событие в событие , я не вижу преимущества наличия события вместо Subject(что может означать, что мне не хватает что-то здесь)

Однако, чего вам следует избегать, так это подписки на IObservablea Subject, т.е. не передавать a Subjectв IObservable.Subscribeметод.

Wilka
источник
Зачем вам вообще государство? Как показал мой ответ, если вы разбиваете проблему на отдельные части, вам вообще не нужно управлять состоянием. Испытуемые не должны быть использованы в данном случае.
casperOne
8
@casperOne Вам не нужно состояние за пределами Subject <T> или события (в обоих случаях есть коллекции вещей для вызова, наблюдателей или обработчиков событий). Я просто предпочитаю использовать Subject, если единственная причина для добавления события - обернуть его FromEventPattern. Помимо изменения схемы исключений, которое может быть важным для вас, я не вижу преимуществ в том, чтобы избегать Subject таким образом. Опять же, мне может здесь не хватать чего-то еще, что предпочтительнее события, чем Тема. Упоминание о состоянии было лишь частью цитаты, и казалось, что лучше оставить его. Может быть, без этой части будет понятнее?
Вилька
@casperOne - но вы также не должны создавать событие только для того, чтобы обернуть его FromEventPattern. Очевидно, это ужасная идея.
Джеймс Мур
3
Я объяснил свою цитату более подробно в этом сообщении в блоге .
Дэйв Секстон,
Я предпочитаю использовать их как точку входа в Rx. Это попало мне в точку. У меня есть ситуация, когда есть API, который при вызове генерирует события, которые я хотел бы передать через конвейер реактивной обработки. Тема была для меня ответом, поскольку FromEventPattern, похоже, не существует в RxJava AFAICT.
scorpiodawg
31

Часто, когда вы управляете предметом, вы на самом деле просто переопределяете функции, уже существующие в Rx, и, вероятно, не таким надежным, простым и расширяемым способом.

Когда вы пытаетесь адаптировать некоторый асинхронный поток данных к Rx (или создать асинхронный поток данных из потока, который в настоящее время не является асинхронным), наиболее распространенными случаями обычно являются:

  • Источником данных является событие : как говорит Ли, это самый простой случай: используйте FromEvent и отправляйтесь в паб.

  • Источником данных является синхронная операция, и вам нужны опрашиваемые обновления (например, веб-сервис или вызов базы данных): в этом случае вы можете использовать подход, предложенный Ли, или для простых случаев вы можете использовать что-то вроде Observable.Interval.Select(_ => <db fetch>). Вы можете использовать DistinctUntilChanged (), чтобы предотвратить публикацию обновлений, когда в исходных данных ничего не изменилось.

  • Источником данных является своего рода асинхронный api, который вызывает ваш обратный вызов : в этом случае используйте Observable.Create, чтобы подключить ваш обратный вызов для вызова OnNext / OnError / OnComplete для наблюдателя.

  • Источником данных является вызов, который блокируется до тех пор, пока не станут доступны новые данные (например, некоторые синхронные операции чтения из сокета): в этом случае вы можете использовать Observable.Create, чтобы обернуть императивный код, который читает из сокета и публикует в Observer.OnNext при чтении данных. Это может быть похоже на то, что вы делаете с темой.

Использование Observable.Create по сравнению с созданием класса, который управляет Subject, довольно эквивалентно использованию ключевого слова yield по сравнению с созданием всего класса, который реализует IEnumerator. Конечно, вы можете написать IEnumerator, чтобы он был таким же чистым и хорошим гражданином, как код yield, но какой из них лучше инкапсулирован и выглядит более аккуратным? То же самое верно для Observable.Create и управления Subject.

Observable.Create дает вам чистый шаблон для ленивой установки и чистого разрыва. Как этого добиться с помощью класса, обертывающего тему? Вам нужен какой-то метод Start ... как узнать, когда его вызвать? Или вы просто всегда запускаете его, даже когда его никто не слушает? И когда вы закончите, как вы заставите его прекратить чтение из сокета / опрос базы данных и т. Д.? У вас должен быть какой-то метод Stop, и у вас все еще должен быть доступ не только к IObservable, на который вы подписаны, но и к классу, который изначально создал Subject.

С Observable.Create все это собрано в одном месте. Тело Observable.Create не запускается, пока кто-то не подпишется, поэтому, если никто не подписывается, вы никогда не используете свой ресурс. И Observable.Create возвращает Disposable, который может полностью отключить ваш ресурс / обратные вызовы и т.д. - это вызывается, когда Observer отменяет подписку. Время жизни ресурсов, которые вы используете для создания Observable, аккуратно привязано к времени жизни самого Observable.

Найл Коннотон
источник
1
Очень четкое объяснение Observable.Create. Спасибо!
Эван Моран,
1
У меня все еще есть случаи, когда я использую субъект, когда объект брокера предоставляет наблюдаемое (скажем, это просто изменяемое свойство). Различные компоненты будут вызывать брокера, сообщая, когда это свойство изменяется (с помощью вызова метода), и этот метод выполняет OnNext. Потребители подписываются. Я думаю, что в этом случае я бы использовал BehaviorSubject, это уместно?
Франк Швитерман,
1
Это зависит от ситуации. Хороший дизайн Rx имеет тенденцию преобразовывать систему в сторону асинхронной / реактивной архитектуры. Трудно полностью интегрировать небольшие компоненты реактивного кода с системой, имеющей императивный дизайн. Решающим решением является использование субъектов для превращения императивных действий (вызовов функций, наборов свойств) в наблюдаемые события. Тогда у вас останутся маленькие карманы реактивного кода и никакого настоящего «ага!» момент. Изменение дизайна для моделирования потока данных и реакции на него обычно дает лучший дизайн, но это повсеместное изменение, требующее изменения мышления и поддержки команды.
Найл Коннотон
1
Я бы сказал здесь (как неопытный Rx), что: Используя Subjects, вы можете войти в мир Rx в рамках выросшего императивного приложения и медленно его трансформировать. Также для получения первого опыта .... и, конечно же, позже измените ваш код так, как он должен был быть с самого начала (смеется). Но для начала я думаю, что стоит использовать предметы.
Robetto
9

Цитируемый текст блока в значительной степени объясняет, почему вы не должны использовать Subject<T>, но, говоря проще, вы комбинируете функции наблюдателя и наблюдаемого, вводя какое-то состояние между ними (независимо от того, инкапсулируете ли вы или расширяете).

Здесь вы столкнетесь с проблемой; эти обязанности должны быть отдельными и отличными друг от друга.

Тем не менее, в вашем конкретном случае я бы рекомендовал вам разбить ваши проблемы на более мелкие части.

Во-первых, у вас есть активный поток, и вы всегда отслеживаете оборудование для сигналов, для которых нужно создавать уведомления. Как бы вы это сделали обычно? События . Итак, начнем с этого.

Определим, EventArgsчто будет срабатывать ваше событие.

// The event args that has the information.
public class BaseFrameEventArgs : EventArgs
{
    public BaseFrameEventArgs(IBaseFrame baseFrame)
    {
        // Validate parameters.
        if (baseFrame == null) throw new ArgumentNullException("IBaseFrame");

        // Set values.
        BaseFrame = baseFrame;
    }

    // Poor man's immutability.
    public IBaseFrame BaseFrame { get; private set; }
}

Теперь класс, который запускает событие. Обратите внимание, что это может быть статический класс (так как вы всегда есть поток , работающий мониторинг аппаратного буфера), или то , что вы звоните по требованию , который подписывается на что . Вам нужно будет изменить это соответствующим образом.

public class BaseFrameMonitor
{
    // You want to make this access thread safe
    public event EventHandler<BaseFrameEventArgs> HardwareEvent;

    public BaseFrameMonitor()
    {
        // Create/subscribe to your thread that
        // drains hardware signals.
    }
}

Итак, теперь у вас есть класс, который предоставляет событие. Наблюдаемые хорошо работают с событиями. Настолько , что есть поддержка первого класса для преобразования потоков событий (думать о потоке событий как несколько включений события) в IObservable<T>реализацию , если вы будете следовать стандартной схеме событий, через статический FromEventPatternметод на Observableклассе .

Имея источник ваших событий и FromEventPatternметод, мы можем IObservable<EventPattern<BaseFrameEventArgs>>легко создать ( EventPattern<TEventArgs>класс воплощает то, что вы видите в событии .NET, в частности, экземпляр, производный от EventArgsи объект, представляющий отправителя), например:

// The event source.
// Or you might not need this if your class is static and exposes
// the event as a static event.
var source = new BaseFrameMonitor();

// Create the observable.  It's going to be hot
// as the events are hot.
IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
    FromEventPattern<BaseFrameEventArgs>(
        h => source.HardwareEvent += h,
        h => source.HardwareEvent -= h);

Конечно, вам нужен IObservable<IBaseFrame>, но это легко, используя Selectметод расширения в Observableклассе для создания проекции (точно так же, как и в LINQ, и мы можем обернуть все это простым в использовании методом):

public IObservable<IBaseFrame> CreateHardwareObservable()
{
    // The event source.
    // Or you might not need this if your class is static and exposes
    // the event as a static event.
    var source = new BaseFrameMonitor();

    // Create the observable.  It's going to be hot
    // as the events are hot.
    IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
        FromEventPattern<BaseFrameEventArgs>(
            h => source.HardwareEvent += h,
            h => source.HardwareEvent -= h);

    // Return the observable, but projected.
    return observable.Select(i => i.EventArgs.BaseFrame);
}
casperOne
источник
7
Спасибо за ваш ответ @casperOne, это был мой первоначальный подход, но мне показалось «неправильным» добавлять событие, чтобы я мог обернуть его Rx. В настоящее время я использую делегатов (и да, я знаю, что это именно то, что представляет собой событие!), Чтобы соответствовать коду, используемому для загрузки и сохранения конфигурации, это должно иметь возможность перестраивать конвейеры компонентов, и система делегатов дала мне больше всего гибкость. Rx сейчас вызывает у меня головную боль в этой области, но мощь всего остального фреймворка делает решение проблемы конфигурации очень полезным.
Энтони
@Anthony: Если вы можете заставить его образец кода работать, отлично, но, как я уже сказал, в этом нет смысла. Что касается чувства «неправильности», я не знаю, почему разделение вещей на логические части кажется «неправильным», но вы не дали достаточно подробностей в своем исходном сообщении, чтобы указать, как лучше всего перевести это на « IObservable<T>отсутствие информации о том, как вы» дается текущая сигнализация с этой информацией.
casperOne
@casperOne По вашему мнению, будет ли использование Subjects подходящим для агрегатора шины сообщений / событий?
kitsune
1
@kitsune Нет, я не понимаю, зачем им это делать. Если вы думаете об «оптимизации», вы должны задать вопрос, является ли это проблемой, измерили ли вы Rx как причину проблемы?
casperOne
2
Я согласен с casperOne, что разделение проблем - хорошая идея. Я хотел бы отметить, что если вы выберете шаблон Hardware to Event to Rx, вы потеряете семантику ошибок. Любые потерянные соединения или сеансы и т. Д. Не будут видны потребителю. Теперь потребитель не может решить, хочет ли он повторить попытку, отключиться, подписаться на другую последовательность или что-то еще.
Ли Кэмпбелл
0

Плохо обобщать, что субъекты не подходят для публичного интерфейса. Хотя это, безусловно, правда, что подход реактивного программирования должен выглядеть не так, это определенно хороший вариант улучшения / рефакторинга для вашего классического кода.

Если у вас есть обычное свойство с общедоступным набором средств доступа и вы хотите уведомлять об изменениях, ничто не говорит против замены его на BehaviorSubject. INPC или другие другие мероприятия не так уж чисты, и это меня утомляет. Для этого вы можете и должны использовать BehaviorSubjects в качестве общедоступных свойств вместо обычных свойств и отказаться от INPC или других событий.

Кроме того, Subject-interface делает пользователей вашего интерфейса более осведомленными о функциональности ваших свойств и с большей вероятностью подписывается, а не просто получает значение.

Лучше всего использовать, если вы хотите, чтобы другие слушали / подписывались на изменения свойства.

Феликс Кейл
источник