Можно ли указать пользовательский пул потоков для параллельного потока Java 8 ? Я не могу найти это нигде.
Представьте, что у меня есть серверное приложение, и я хотел бы использовать параллельные потоки. Но приложение большое и многопоточное, поэтому я хочу разделить его. Я не хочу медленное выполнение задачи в одном модуле задач приложения блока из другого модуля.
Если я не могу использовать разные пулы потоков для разных модулей, это означает, что я не могу безопасно использовать параллельные потоки в большинстве реальных ситуаций.
Попробуйте следующий пример. Есть несколько задач с интенсивным использованием процессора, выполняемых в отдельных потоках. Задачи используют параллельные потоки. Первое задание не выполняется, поэтому каждый шаг занимает 1 секунду (имитируется спящий поток). Проблема в том, что другие потоки застревают и ждут, пока не завершится неработающая задача. Это надуманный пример, но представьте себе приложение сервлета и того, кто отправляет долгосрочную задачу в общий пул соединений ветвлений.
public class ParallelTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
es.execute(() -> runTask(1000)); //incorrect task
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.shutdown();
es.awaitTermination(60, TimeUnit.SECONDS);
}
private static void runTask(int delay) {
range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
.ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
}
public static boolean isPrime(long n) {
return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
}
}
Ответы:
На самом деле есть хитрость в том, как выполнить параллельную операцию в определенном пуле разветвления. Если вы выполняете его как задачу в пуле разветвления, он остается там и не использует общий.
Уловка основана на ForkJoinTask.fork, который указывает: «Обеспечивает асинхронное выполнение этой задачи в пуле, в котором выполняется текущая задача, если это применимо, или с использованием ForkJoinPool.commonPool (), если не inForkJoinPool ()»
источник
ForkJoinPool
или это детали реализации? Ссылка на документацию была бы хороша.ForkJoinPool
экземпляр должен быть,shutdown()
когда он больше не нужен, чтобы избежать утечки потока. (пример)Параллельные потоки используют значение по умолчанию
ForkJoinPool.commonPool
которое по умолчанию имеет на один меньше, чем у вас процессоров , как это возвращаетRuntime.getRuntime().availableProcessors()
(это означает, что параллельные потоки используют все ваши процессоры, потому что они также используют основной поток):Это также означает, что если у вас есть параллельные потоки или несколько параллельных потоков, запущенных одновременно, все они будут использовать один и тот же пул. Преимущество: вы никогда не будете использовать больше, чем по умолчанию (количество доступных процессоров). Недостаток: вы можете не получить «все процессоры», назначенные каждому параллельному потоку, который вы инициируете (если у вас их больше одного). (Очевидно, вы можете использовать ManagedBlocker, чтобы обойти это.)
Чтобы изменить способ выполнения параллельных потоков, вы можете либо
yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get();
илиSystem.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20")
для целевого параллелизма 20 потоков. Тем не менее, это больше не работает после исправленного патча https://bugs.openjdk.java.net/browse/JDK-8190974 .Пример последнего на моей машине, которая имеет 8 процессоров. Если я запускаю следующую программу:
Выход:
Итак, вы можете видеть, что параллельный поток обрабатывает 8 элементов одновременно, то есть использует 8 потоков. Однако, если я раскомментирую закомментированную строку, результат будет:
На этот раз параллельный поток использовал 20 потоков, и все 20 элементов в потоке были обработаны одновременно.
источник
commonPool
Имеет фактически один меньшеavailableProcessors
, что приводит к полной параллельности равняться ,availableProcessors
потому вызывающими подсчеты нитей , как один.ForkJoinTask
. Для подражанияparallel()
get()
необходимо:stream.parallel().forEach(soSomething)).get();
ForkJoinPool.submit(() -> stream.forEach(...))
мои потоки будут работать с данными даннымиForkJoinPool
. Я ожидаю, что все Stream-Action выполняется в ForJoinPool как ОДНО действие, но внутренне все еще использует стандартный / общий ForkJoinPool. Где вы видели, что ForkJoinPool.submit () будет делать то, что вы говорите, что делает?В качестве альтернативы хитрости запуска параллельных вычислений внутри вашего собственного forkJoinPool вы также можете передать этот пул в метод CompletableFuture.supplyAsync, как показано в:
источник
Исходное решение (установка свойства общего параллелизма ForkJoinPool) больше не работает. Глядя на ссылки в исходном ответе, обновление, которое ломает это, было обратно перенесено на Java 8. Как упоминалось в связанных потоках, это решение не гарантировало работать вечно. Исходя из этого, решение представляет собой forkjoinpool.submit с решением .get, обсуждаемым в принятом ответе. Я думаю, что backport также исправляет ненадежность этого решения.
источник
ForkJoinPool.commonPool().getParallelism()
в режиме отладки.unreported exception InterruptedException; must be caught or declared to be thrown
даже со всемиcatch
исключениями в цикле.Мы можем изменить параллелизм по умолчанию, используя следующее свойство:
который может быть настроен на использование большего параллелизма.
источник
Чтобы измерить фактическое количество используемых потоков, вы можете проверить
Thread.activeCount()
:Это может привести к 4-ядерному процессору, например:
Без
.parallel()
этого дает:источник
До сих пор я использовал решения, описанные в ответах на этот вопрос. Для этого я разработал небольшую библиотеку под названием « Поддержка параллельного потока» :
Но, как отметил @PabloMatiasGomez в комментариях, существуют недостатки в отношении механизма разделения параллельных потоков, который сильно зависит от размера общего пула. См. Параллельный поток из HashSet не работает параллельно .
Я использую это решение только для того, чтобы иметь отдельные пулы для разных типов работы, но я не могу установить размер общего пула равным 1, даже если я его не использую.
источник
Замечания: Похоже, в JDK 10 реализовано исправление, обеспечивающее использование ожидаемого числа потоков в пуле пользовательских потоков.
Параллельное выполнение потока в пользовательском ForkJoinPool должно подчиняться параллелизму https://bugs.openjdk.java.net/browse/JDK-8190974
источник
Я попробовал пользовательский ForkJoinPool следующим образом, чтобы настроить размер пула:
Вот вывод о том, что пул использует больше потоков, чем по умолчанию 4 .
Но на самом деле есть чудак , когда я пытался добиться того же результата, используя
ThreadPoolExecutor
следующее:но я потерпел неудачу.
Он только запустит параллельный поток в новом потоке, а затем все остальное будет таким же, что еще раз доказывает, что он
parallelStream
будет использовать ForkJoinPool для запуска своих дочерних потоков.источник
Иди, чтобы получить AbacusUtil . Номер потока может быть указан для параллельного потока. Вот пример кода:
Раскрытие информации: я разработчик AbacusUtil.
источник
Если вы не хотите полагаться на хаки реализации, всегда есть способ добиться того же самого путем реализации пользовательских сборщиков, которые будут комбинировать
map
иcollect
семантику ... и вы не будете ограничены ForkJoinPool:К счастью, это уже сделано и доступно на Maven Central: http://github.com/pivovarit/parallel-collectors
Отказ от ответственности: я написал это и беру на себя ответственность за это.
источник
Если вы не возражаете против использования сторонней библиотеки, с помощью cyclops-реагировать вы можете смешивать последовательные и параллельные потоки в одном конвейере и предоставлять собственные ForkJoinPools. Например
Или, если мы хотим продолжить обработку в последовательном потоке
[Раскрытие Я ведущий разработчик циклоп-реакции]
источник
Если вам не нужен пользовательский ThreadPool, но вы хотите ограничить количество одновременных задач, вы можете использовать:
(Дубликат вопроса об этом заблокирован, поэтому, пожалуйста, несите меня сюда)
источник
Вы можете попробовать реализовать этот ForkJoinWorkerThreadFactory и внедрить его в класс Fork-Join.
Вы можете использовать этот конструктор пула Fork-Join для этого.
примечания: - 1. если вы используете это, примите во внимание, что на основе вашей реализации новых потоков это повлияет на планирование из JVM, которое обычно планирует потоки fork-join к различным ядрам (рассматриваются как вычислительный поток). 2. Планирование задач с помощью fork-join к потокам не пострадает. 3. На самом деле не понял, как параллельный поток выбирает потоки из fork-join (не смог найти соответствующую документацию по нему), поэтому попробуйте использовать другую фабрику threadNaming, чтобы убедиться, что потоки в параллельном потоке выбираются от customThreadFactory, которую вы предоставляете. 4. commonThreadPool не будет использовать этот customThreadFactory.
источник