Варианты использования для планировщиков RxJava

253

В RxJava есть 5 разных планировщиков на выбор:

  1. немедленный () : создает и возвращает планировщик, который выполняет работу немедленно в текущем потоке.

  2. trampoline () : создает и возвращает планировщик, который ставит в очередь работу в текущем потоке, который будет выполнен после завершения текущей работы.

  3. newThread () : создает и возвращает планировщик, который создает новый поток для каждой единицы работы.

  4. computation () : создает и возвращает планировщик, предназначенный для вычислительной работы. Это может быть использовано для обработки событий, обработки обратных вызовов и другой вычислительной работы. Не выполняйте связанную с IO работу с этим планировщиком. Используйте планировщики. IO () вместо.

  5. io () : создает и возвращает планировщик, предназначенный для работы, связанной с вводом-выводом. Реализация поддерживается пулом потоков Executor, который будет расти по мере необходимости. Это может быть использовано для асинхронного выполнения блокировки ввода-вывода. Не выполняйте вычислительную работу с этим планировщиком. Используйте планировщики. вместо вычисления () .

Вопросы:

Первые 3 планировщика говорят сами за себя; Однако, я немного смущен о вычислении и гипергликемии .

  1. Что такое «работа, связанная с IO»? Используется ли он для работы с streams ( java.io) и files ( java.nio.files)? Используется ли он для запросов к базе данных? Используется ли он для загрузки файлов или доступа к API REST?
  2. Как вычисление () отличается от newThread () ? Все ли вызовы computation () каждый раз выполняются в одном (фоновом) потоке, а не в новом (фоновом)?
  3. Почему плохо вызывать computation () при выполнении операций ввода-вывода?
  4. Почему плохо вызывать io () при выполнении вычислительной работы?
bcorso
источник

Ответы:

332

Отличные вопросы, я думаю, что документация могла бы быть более подробной.

  1. io()поддерживается неограниченным пулом потоков и это то, что вы бы использовали для задач, не требующих большого объема вычислений, то есть то, что не сильно нагружает процессор. Таким образом, да, взаимодействие с файловой системой, взаимодействие с базами данных или службами на другом хосте являются хорошими примерами.
  2. computation()поддерживается ограниченным пулом потоков с размером, равным количеству доступных процессоров. Если вы пытались планировать интенсивную работу ЦП параллельно между более чем доступными процессорами (скажем, используяnewThread() ), тогда вы готовы к накладным расходам на создание потоков и переключению контекста, поскольку потоки соперничают за процессор, и это потенциально может сильно снизить производительность.
  3. Лучше всего уйти computation() на интенсивную работу процессора, иначе вы не получите хорошую загрузку процессора.
  4. Плохо требовать io()вычислительной работы, потому что причина, обсуждаемая в 2. io(), неограничена, и если вы запланируете тысячу вычислительных задач io()параллельно, то каждая из этих тысяч задач будет иметь свой собственный поток и будет конкурировать за ЦП, что влечет за собой затраты на переключение контекста.
Дейв Мотен
источник
5
Благодаря знакомству с источником RxJava. Это было источником путаницы для меня долгое время, и я думаю, что документация должна быть увеличена в этом отношении.
Дейв Мотен
2
@IgorGanapolsky Я думаю, это то, что вы редко хотели бы сделать. Создание нового потока для каждой единицы работы редко способствует эффективности, поскольку создавать и разрушать потоки дорого. Обычно вы хотите повторно использовать потоки, которые выполняет computation () и другие планировщики. Единственное время, когда newThread () может иметь законное использование (по крайней мере, я могу придумать), это запуск отдельных, нечастых и длительных задач. Даже тогда я мог бы использовать io () для этого сценария.
tmn
4
Не могли бы вы показать пример, где trampoline () будет полезен? Я понимаю концепцию, но не могу понять сценарий, который бы использовал на практике. Это единственный планировщик, который до сих пор остается для меня загадкой
tmn
32
Для сетевых вызовов используйте Schedulers.io (), а если вам нужно ограничить количество одновременных сетевых вызовов, используйте Scheduler.from (Executors.newFixedThreadPool (n)).
Дейв Мотен
4
Вы можете подумать, что установка timeoutпо умолчанию на computation()вас будет блокировать поток, но это не так. Под прикрытиями computation()используются ScheduledExecutorServiceтак запаздывающие действия, которые не блокируют. Учитывая этот факт computation(), это хорошая идея, потому что, если бы он был в другом потоке, мы были бы подвержены затратам на переключение потоков.
Дейв Мотен
3

Наиболее важным моментом является то, что и Schedulers.io, и Schedulers.computation поддерживаются неограниченными пулами потоков, в отличие от других, упомянутых в вопросе. Эта характеристика является общей для Schedulers.from (Executor) только в том случае, если Исполнитель создан с помощью newCachedThreadPool (без ограничений с пулом потоков с автоматическим возвратом ).

Как обильно объяснено в предыдущих ответах и ​​многочисленных статьях в Интернете, Schedulers.io и Schedulers.computation должны использоваться осторожно, поскольку они оптимизированы для типа работы от их имени. Но, на мой взгляд, они играют важнейшую роль в обеспечении реального параллелизма реактивным потокам .

Вопреки убеждению новичков, реактивные потоки по своей природе не параллельны, а асинхронны и последовательны. По этой самой причине Schedulers.io должен использоваться только тогда, когда операция ввода-вывода блокируется (например, с помощью команды блокировки, такой как Apache IOUtils FileUtils.readFileAsString (...) ), таким образом, будет вызывающий поток до тех пор, пока операция не будет завершена. сделано.

Использование асинхронного метода, такого как Java AsynchronousFileChannel (...) не будет блокировать вызывающий поток во время операции, поэтому нет смысла использовать отдельный поток. На самом деле, Schedulers.io потоки не очень подходят для асинхронных операций, так как они не запускают цикл обработки событий и обратный вызов никогда ... не будет вызван.

Та же логика применяется для доступа к базе данных или удаленных вызовов API. Не используйте Schedulers.io если вы можете использовать асинхронный или реактивный API для выполнения вызова.

Вернуться к параллелизму. У вас может не быть доступа к асинхронному или реактивному API для асинхронного или одновременного выполнения операций ввода-вывода, поэтому единственной альтернативой является отправка нескольких вызовов в отдельном потоке. Увы, реактивные потоки являются последовательными на своих концах, но хорошая новость заключается в том, что оператор flatMap () может вводить параллелизм в своей основе .

Параллелизм должен быть встроен в потоковую конструкцию, обычно используя оператор flatMap () . Этот мощный оператор может быть настроен на внутреннее предоставление многопоточного контекста для встроенной функции flatMap () <T, R>. Этот контекст предоставляется многопоточным планировщиком, таким как Scheduler.io или Scheduler.computation. .

Найти больше деталей в статьях на RxJava2 планировщиках и параллелизмегде вы найдете пример кода и подробные объяснения о том, как использовать планировщики последовательно и одновременно.

Надеюсь это поможет,

Softjake

softjake
источник
2

Этот блог дает отличный ответ

Из поста в блоге:

Schedulers.io () поддерживается неограниченным пулом потоков. Он используется для операций ввода-вывода, не требующих интенсивного использования процессора, включая взаимодействие с файловой системой, выполнение сетевых вызовов, взаимодействие с базой данных и т. Д. Этот пул потоков предназначен для асинхронного блокирующего ввода-вывода.

Schedulers.computation () поддерживается ограниченным пулом потоков размером до числа доступных процессоров. Он используется для вычислительной или ресурсоемкой работы, такой как изменение размера изображений, обработка больших наборов данных и т. Д. Будьте осторожны: когда вы выделяете больше вычислительных потоков, чем доступных ядер, производительность снижается из-за переключения контекста и накладных расходов на создание потоков, поскольку потоки соперничают за время процессоров.

Schedulers.newThread () создает новый поток для каждой запланированной единицы работы. Этот планировщик дорог, так как каждый раз создается новый поток, и повторное использование не происходит.

Schedulers.from (Executor executor) создает и возвращает пользовательский планировщик, поддерживаемый указанным исполнителем. Чтобы ограничить количество одновременных потоков в пуле потоков, используйте Scheduler.from (Executors.newFixedThreadPool (n)). Это гарантирует, что если задача запланирована, когда все потоки заняты, она будет поставлена ​​в очередь. Потоки в пуле будут существовать до тех пор, пока он не будет явно отключен.

Основной поток или AndroidSchedulers.mainThread () предоставляется библиотекой расширений RxAndroid для RxJava. Основной поток (также известный как поток пользовательского интерфейса) - это место, где происходит взаимодействие с пользователем. Следует позаботиться о том, чтобы не перегружать этот поток, чтобы не допустить дергания неотвечающего пользовательского интерфейса или, что еще хуже, диалога «приложение не отвечает» (ANR).

Schedulers.single () является новым в RxJava 2. Этот планировщик поддерживается одним потоком, выполняющим задачи последовательно в запрошенном порядке.

Schedulers.trampoline () выполняет задачи в порядке FIFO (First In, First Out) одним из участвующих рабочих потоков. Он часто используется при реализации рекурсии, чтобы избежать увеличения стека вызовов.

Джо
источник