Каковы преимущества использования новой структуры fork / join по сравнению с простым разделением большой задачи на N подзадач вначале, отправкой их в кэшированный пул потоков (от Executors ) и ожиданием завершения каждой задачи? Я не вижу, как использование абстракции fork / join упрощает проблему или делает решение более эффективным по сравнению с тем, что у нас было в течение многих лет.
Например, распараллеленный алгоритм размытия в учебном примере можно реализовать следующим образом:
public class Blur implements Runnable {
private int[] mSource;
private int mStart;
private int mLength;
private int[] mDestination;
private int mBlurWidth = 15; // Processing window size, should be odd.
public ForkBlur(int[] src, int start, int length, int[] dst) {
mSource = src;
mStart = start;
mLength = length;
mDestination = dst;
}
public void run() {
computeDirectly();
}
protected void computeDirectly() {
// As in the example, omitted for brevity
}
}
Разделить в начале и отправить задачи в пул потоков:
// source image pixels are in src
// destination image pixels are in dst
// threadPool is a (cached) thread pool
int maxSize = 100000; // analogous to F-J's "sThreshold"
List<Future> futures = new ArrayList<Future>();
// Send stuff to thread pool:
for (int i = 0; i < src.length; i+= maxSize) {
int size = Math.min(maxSize, src.length - i);
ForkBlur task = new ForkBlur(src, i, size, dst);
Future f = threadPool.submit(task);
futures.add(f);
}
// Wait for all sent tasks to complete:
for (Future future : futures) {
future.get();
}
// Done!
Задачи попадают в очередь пула потоков, из которой они выполняются по мере того, как рабочие потоки становятся доступными. До тех пор, пока разделение является достаточно гранулярным (чтобы не приходилось особенно ждать последней задачи) и в пуле потоков имеется достаточно (по крайней мере N процессоров) потоков, все процессоры работают на полной скорости, пока не будут выполнены все вычисления.
Я что-то упускаю? В чем добавленная стоимость использования инфраструктуры fork / join?
Если у вас есть n загруженных потоков, которые работают на 100% независимо, это будет лучше, чем n потоков в пуле Fork-Join (FJ). Но так никогда не бывает.
Возможно, не удастся точно разделить задачу на n равных частей. Даже если вы это сделаете, планирование потоков будет некорректным. Вы будете ждать самого медленного потока. Если у вас несколько задач, то каждая из них может работать с менее чем n-сторонним параллелизмом (обычно более эффективным), но переходить к n-стороннему параллелизму, когда другие задачи завершены.
Так почему бы нам просто не разрезать проблему на части размером с FJ и заставить пул потоков работать над этим. Типичное использование FJ разрезает проблему на мелкие кусочки. Выполнение этого в случайном порядке требует большой координации на аппаратном уровне. Накладные расходы были бы убийственными. В FJ задачи помещаются в очередь, которую поток считывает в порядке «последним пришел - первым обслужен» (LIFO / стек), а кража работы (обычно в основной работе) выполняется в порядке очереди (FIFO / «очередь»). В результате обработка длинных массивов может выполняться в основном последовательно, даже если она разбита на крошечные фрагменты. (Также бывает, что разбить проблему на небольшие куски одинакового размера одним большим взрывом может быть нетривиально. Скажем, иметь дело с некоторой формой иерархии без балансировки.)
Вывод: FJ позволяет более эффективно использовать аппаратные потоки в нестандартных ситуациях, что всегда будет, если у вас более одного потока.
источник
maxSize
параметра в моем примере приведет к разделению подзадач почти так же, как «двоичное разбиение» в примере FJ (выполняется в рамкахcompute()
метода, который либо что-то вычисляет, либо отправляет подзадачиinvokeAll()
).Конечная цель пулов потоков и Fork / Join одинакова: оба хотят максимально использовать доступную мощность процессора для максимальной пропускной способности. Максимальная пропускная способность означает, что за длительный период времени нужно выполнить как можно больше задач. Что для этого нужно? (В дальнейшем мы будем предполагать, что недостатка в вычислительных задачах нет: всегда достаточно сделать для 100% загрузки ЦП. Кроме того, я использую «ЦП» эквивалентно для ядер или виртуальных ядер в случае гиперпоточности).
Таким образом, мы выяснили, что для максимальной пропускной способности нам нужно иметь такое же количество потоков, что и процессоров. В примере с размытием Oracle вы можете взять пул потоков фиксированного размера с количеством потоков, равным количеству доступных процессоров, или использовать пул потоков. Не будет никакой разницы, вы правы!
Итак, когда у вас возникнут проблемы с пулами потоков? Это происходит, если поток блокируется , потому что ваш поток ожидает завершения другой задачи. Предположим следующий пример:
Здесь мы видим алгоритм, который состоит из трех шагов A, B и C. A и B могут выполняться независимо друг от друга, но для шага C требуется результат шага A И B. Этот алгоритм выполняет задачу A для пул потоков и выполнить задачу b напрямую. После этого поток будет ждать выполнения задачи A и перейдет к шагу C. Если A и B выполняются одновременно, тогда все в порядке. Но что, если A занимает больше времени, чем B? Это может быть связано с тем, что природа задачи A диктует это, но также может быть так, потому что нет потока для задачи A, доступного в начале, и задача A должна ждать. (Если доступен только один процессор и, таким образом, ваш пул потоков имеет только один поток, это даже вызовет тупик, но пока это не главное). Дело в том, что поток, только что выполнивший задачу Bблокирует весь поток . Поскольку у нас такое же количество потоков, что и у процессоров, и один поток заблокирован, это означает, что один процессор простаивает .
Fork / Join решает эту проблему: в структуре fork / join вы должны написать тот же алгоритм, как показано ниже:
Выглядит так же, не правда ли? Однако подсказка в том, что
aTask.join
блокировка не будет . Вместо этого здесь вступает в игру кража работы : поток будет искать другие задачи, которые были разветвлены в прошлом, и продолжит их. Сначала он проверяет, начали ли обрабатываться разветвленные задачи. Поэтому, если A еще не был запущен другим потоком, он выполнит A следующим образом, иначе он проверит очередь других потоков и украдет их работу. Как только эта другая задача другого потока будет завершена, он проверит, завершена ли сейчас A. Если это вышеперечисленный алгоритм, можно позвонитьstepC
. В противном случае он будет искать очередную задачу украсть. Таким образом, пулы fork / join могут достичь 100% -ной загрузки ЦП даже в условиях блокирующих действий .Однако есть ловушка: кража работы возможна только по
join
вызовуForkJoinTask
s. Это невозможно сделать для действий внешней блокировки, таких как ожидание другого потока или ожидание действия ввода-вывода. Так что насчет того, что ожидание завершения ввода-вывода - обычная задача? В этом случае, если бы мы могли добавить дополнительный поток в пул Fork / Join, который будет остановлен снова, как только действие блокировки будет завершено, будет вторым лучшим вариантом. ИForkJoinPool
действительно может это сделать, если мы используемManagedBlocker
s.Фибоначчи
В JavaDoc для RecursiveTask приведен пример вычисления чисел Фибоначчи с использованием Fork / Join. Для классического рекурсивного решения см .:
Как объясняется в JavaDocs, это довольно удобный способ вычисления чисел Фибоначчи, так как этот алгоритм имеет сложность O (2 ^ n), хотя возможны более простые способы. Однако этот алгоритм очень прост и понятен, поэтому мы его придерживаемся. Предположим, мы хотим ускорить это с помощью Fork / Join. Наивная реализация выглядела бы так:
Шаги, на которые разбита эта задача, слишком короткие, и поэтому она будет работать ужасно, но вы можете увидеть, как фреймворк в целом работает очень хорошо: два слагаемых можно вычислить независимо, но тогда нам нужны оба из них, чтобы построить окончательный результат. результат. Итак, одна половина выполняется в другом потоке. Получайте удовольствие, делая то же самое с пулами потоков, не заходя в тупик (возможно, но не так просто).
Просто для полноты: если вы действительно хотите рассчитать числа Фибоначчи, используя этот рекурсивный подход, вот оптимизированная версия:
Это значительно уменьшает размер подзадач, потому что они разделяются только тогда, когда
n > 10 && getSurplusQueuedTaskCount() < 2
истинно, а это означает, что существует значительно больше, чем 100 вызовов методов для do (n > 10
), и не очень много ручных задач уже ожидают (getSurplusQueuedTaskCount() < 2
).На моем компьютере (4 ядра (8 при подсчете Hyper-threading), процессор Intel (R) Core (TM) i7-2720QM @ 2,20 ГГц)
fib(50)
занимает 64 секунды при классическом подходе и всего 18 секунд при подходе Fork / Join, который это довольно заметный выигрыш, хотя и не настолько, насколько теоретически возможно.Резюме
источник
Форк / объединение отличается от пула потоков, потому что он реализует кражу работы. От вилки / присоединения
Допустим, у вас есть два потока и 4 задачи a, b, c, d, которые занимают 1, 1, 5 и 6 секунд соответственно. Первоначально a и b назначаются потоку 1, а c и d - потоку 2. В пуле потоков это займет 11 секунд. С вилкой / соединением поток 1 завершается и может украсть работу из потока 2, поэтому задача d будет выполняться потоком 1. Поток 1 выполняет a, b и d, поток 2 только c. Общее время: 8 секунд, а не 11.
РЕДАКТИРОВАТЬ: как указывает Джунас, задачи не обязательно заранее выделяются потоку. Идея fork / join заключается в том, что поток может разделить задачу на несколько частей. Итак, чтобы повторить вышесказанное:
У нас есть две задачи (ab) и (cd), которые занимают 2 и 11 секунд соответственно. Поток 1 начинает выполнение ab и разбивает его на две подзадачи a и b. Аналогично потоку 2 он разбивается на две подзадачи c и d. Когда поток 1 завершит a & b, он может украсть d из потока 2.
источник
compute()
либо вычисляет задачу, либо разбивает ее на две подзадачи. Какой вариант он выберет, зависит только от размера задачи (if (mLength < sThreshold)...
), поэтому это просто модный способ создания фиксированного количества задач. Для изображения размером 1000x1000 будет ровно 16 подзадач, которые действительно что-то вычисляют. Кроме того, будет 15 (= 16 - 1) «промежуточных» задач, которые только генерируют и вызывают подзадачи и сами ничего не вычисляют.computeDirectly()
метод, у меня больше нет возможности украсть что-либо. Все расщепление делается априори , по крайней мере, в примере.Все вышеперечисленные правы, выгоды достигаются за счет кражи работы, но подробнее почему это так.
Основное преимущество - эффективная координация между рабочими потоками. Работа должна быть разделена и собрана заново, что требует координации. Как вы можете видеть в ответе AH выше, у каждого потока есть свой собственный рабочий список. Важным свойством этого списка является то, что он отсортирован (большие задачи вверху, а маленькие задачи внизу). Каждый поток выполняет задачи из нижней части своего списка и крадет задачи из верхней части других списков потоков.
Результат этого:
Большинство других схем «разделяй и властвуй», использующие пулы потоков, требуют большего взаимодействия и координации между потоками.
источник
В этом примере Fork / Join не добавляет значения, потому что разветвление не требуется и рабочая нагрузка равномерно распределяется между рабочими потоками. Fork / Join только добавляет накладные расходы.
Вот хорошая статья на эту тему. Quote:
источник
Еще одно важное отличие состоит в том, что с FJ вы можете выполнять несколько сложных этапов «соединения». Рассмотрим сортировку слиянием из http://faculty.ycp.edu/~dhovemey/spring2011/cs365/lecture/lecture18.html , для предварительного разделения этой работы потребуется слишком много оркестровки. Например, вам нужно сделать следующее:
Как вы указываете, что должны выполнять сортировку перед объединением, которое их касается и т. Д.
Я искал, как лучше всего сделать определенную вещь для каждого элемента из списка. Думаю, я просто предварительно разделю список и воспользуюсь стандартным ThreadPool. FJ кажется наиболее полезным, когда работа не может быть предварительно разделена на достаточно независимых задач, но может быть рекурсивно разделена на задачи, которые независимы между собой (например, сортировка половинок независима, а объединение 2 отсортированных половин в отсортированное целое - нет).
источник
F / J также имеет явное преимущество, когда у вас есть дорогостоящие операции слияния. Поскольку он разбивается на древовидную структуру, вы выполняете только слияние log2 (n), а не n слияний с линейным разделением потоков. (Это делает теоретическое предположение, что у вас столько же процессоров, сколько потоков, но все же преимущество) Для домашнего задания нам пришлось объединить несколько тысяч 2D-массивов (все одинаковые размеры), суммируя значения по каждому индексу. С процессорами fork join и P время приближается к log2 (n), когда P приближается к бесконечности.
1 2 3 .. 7 3 1 .... 8 5 4
4 5 6 + 2 4 3 => 6 9 9
7 8 9 .. 1 1 0 .... 8 9 9
источник
Вы будете поражены производительностью ForkJoin в таких приложениях, как краулер. вот лучший учебник, из которого вы могли бы научиться.
источник
Если проблема такова, что нам нужно дождаться завершения других потоков (как в случае сортировки массива или суммы массива), следует использовать соединение fork, поскольку Executor (Executors.newFixedThreadPool (2)) будет подавляться из-за ограниченного количество потоков. В этом случае пул forkjoin создаст больше потоков, чтобы покрыть заблокированный поток, чтобы поддерживать тот же параллелизм.
Источник: http://www.oracle.com/technetwork/articles/java/fork-join-422606.html
Проблема с исполнителями для реализации алгоритмов «разделяй и властвуй» не связана с созданием подзадач, потому что Callable может отправить новую подзадачу своему исполнителю и дождаться ее результата синхронно или асинхронно. Проблема заключается в параллелизме: когда вызываемый объект ожидает результата другого вызываемого объекта, он переводится в состояние ожидания, тем самым теряя возможность обработать другой вызываемый объект, поставленный в очередь на выполнение.
Фреймворк fork / join, добавленный в пакет java.util.concurrent в Java SE 7 усилиями Дуга Ли, заполняет этот пробел.
Источник: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinPool.html
Пул пытается поддерживать достаточное количество активных (или доступных) потоков, динамически добавляя, приостанавливая или возобновляя внутренние рабочие потоки, даже если некоторые задачи остановлены в ожидании присоединения к другим. Однако такие корректировки не гарантируются в случае заблокированного ввода-вывода или другой неуправляемой синхронизации.
public int getPoolSize () Возвращает количество запущенных, но еще не завершенных рабочих потоков. Результат, возвращаемый этим методом, может отличаться от getParallelism (), когда потоки создаются для поддержания параллелизма, когда другие совместно блокируются.
источник
Я хотел бы добавить короткий ответ для тех, у кого мало времени читать длинные ответы. Сравнение взято из книги Applied Akka Patterns:
источник