Я смотрел на новый rx java 2, и я не совсем уверен, что понимаю идею backpressure
...
Я знаю, что у нас есть Observable
то, у кого нет backpressure
поддержки, а у Flowable
того есть.
Так на основе , например, позволяет сказать , что у меня есть flowable
с interval
:
Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
Это выйдет из строя примерно после 128 значений, и это довольно очевидно, что я потребляю медленнее, чем получаю предметы.
Но тогда у нас то же самое с Observable
Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
Это вообще не приведет к сбою, даже если я отложу потребление, он все еще работает. Чтобы заставить Flowable
работать, скажем, я поставил onBackpressureDrop
оператор, сбой исчез, но также не все значения выдаются.
Итак, основной вопрос, на который я не могу найти ответа в настоящее время в моей голове, заключается в том, почему я должен заботиться о том, backpressure
когда я могу использовать plain, Observable
все равно получать все значения без управления buffer
? Или, может быть, с другой стороны, какие преимущества backpressure
дает мне управление и управление потреблением?
Ответы:
На практике противодавление проявляется в ограниченных буферах
Flowable.observeOn
с буфером из 128 элементов, который истощается так быстро, как это может принять нисходящий поток. Вы можете увеличивать этот размер буфера индивидуально для обработки импульсного источника, и все методы управления противодавлением по-прежнему применяются с версии 1.x.Observable.observeOn
имеет неограниченный буфер, который продолжает собирать элементы, и вашему приложению может не хватить памяти.Вы можете использовать,
Observable
например:Вы можете использовать,
Flowable
например:источник
Maybe
,Single
иCompletable
может всегда использоваться вместо того , чтобы,Flowable
когда они семантически соответствующие?Maybe
,Single
иCompletable
являются далеко слишком малы , чтобы иметь какой - либо необходимости концепции противодавления. Нет никакого шанса, что производитель может выпускать предметы быстрее, чем они могут быть потреблены, поскольку 0–1 предмет когда-либо будет произведен или потреблен.Обратное давление - это когда ваш наблюдаемый (издатель) создает больше событий, чем может обработать ваш подписчик. Таким образом, вы можете заставить подписчиков пропустить события или получить огромную очередь событий, которая в конечном итоге приводит к нехватке памяти.
Flowable
учитывает противодавление.Observable
не. Это оно.это напоминает мне воронку, которая, когда в ней слишком много жидкости, переливается. Flowable может помочь избежать этого:
с огромным противодавлением:
но при использовании текучей среды обратное давление намного меньше:
Rxjava2 имеет несколько стратегий противодавления, которые вы можете использовать в зависимости от вашего сценария использования. Под стратегией я подразумеваю, что Rxjava2 предоставляет способ обработки объектов, которые не могут быть обработаны из-за переполнения (противодавления).
вот стратегии. Я не буду перечислять их все, но, например, если вы не хотите беспокоиться о переполненных элементах, вы можете использовать такую стратегию сброса:
observable.toFlowable (BackpressureStrategy.DROP)
Насколько я знаю, в очереди должно быть ограничение в 128 элементов, после чего может произойти переполнение (противодавление). Даже если это не 128, оно близко к этому числу. Надеюсь, это кому-то поможет.
если вам нужно изменить размер буфера со 128, похоже, это можно сделать следующим образом (но обратите внимание на любые ограничения памяти:
при разработке программного обеспечения обычно стратегия обратного давления означает, что вы говорите эмиттеру немного замедлить, поскольку потребитель не может справиться со скоростью, с которой вы излучаете события.
источник
Тот факт, что у вас
Flowable
произошел сбой после выдачи 128 значений без обработки противодавления, не означает, что он всегда будет аварийно завершать работу ровно после 128 значений: иногда он вылетает после 10, а иногда вообще не падает. Я считаю, что это то, что произошло, когда вы попробовали пример сObservable
- не было никакого противодавления, поэтому ваш код работал нормально, в следующий раз может не работать. Разница в RxJava 2 заключается в том, что вObservable
s больше нет концепции противодавления и нет способа справиться с этим. Если вы разрабатываете реактивную последовательность, которая, вероятно, потребует явной обработки противодавления -Flowable
это ваш лучший выбор.источник
interval
безbackpressure
, ожидать ли я какого-то странного поведения или проблем?