Некоторое время я был разочарован поведением по умолчанию, ThreadPoolExecutor
которое поддерживает ExecutorService
пулы потоков, которые используют многие из нас. Цитата из Javadocs:
Если количество запущенных потоков больше corePoolSize, но меньше maximumPoolSize, новый поток будет создан только в том случае, если очередь заполнена .
Это означает, что если вы определите пул потоков с помощью следующего кода, он никогда не запустит 2-й поток, потому что LinkedBlockingQueue
он неограничен.
ExecutorService threadPool =
new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));
Только если у вас есть ограниченная очередь и она заполнена , запускаются любые потоки с номером ядра выше. Я подозреваю, что большое количество молодых многопоточных программистов на Java не знают о таком поведении ThreadPoolExecutor
.
Теперь у меня есть конкретный вариант использования, когда это неоптимально. Я ищу способы обойти это без написания собственного класса TPE.
Мои требования касаются веб-службы, которая выполняет обратный вызов, возможно, ненадежной третьей стороне.
- Я не хочу выполнять обратный вызов синхронно с веб-запросом, поэтому я хочу использовать пул потоков.
- Я обычно получаю пару таких в минуту, поэтому я не хочу иметь
newFixedThreadPool(...)
большое количество потоков, которые в основном бездействуют. - Время от времени я получаю всплеск этого трафика, и я хочу увеличить количество потоков до некоторого максимального значения (скажем, 50).
- Мне нужно сделать все возможное, чтобы выполнить все обратные вызовы, поэтому я хочу поставить в очередь любые дополнительные, превышающие 50. Я не хочу перегружать остальную часть моего веб-сервера, используя
newCachedThreadPool()
.
Как я могу обойти это ограничение, ThreadPoolExecutor
когда очередь должна быть ограничена и заполнена до того, как будут запущены другие потоки? Как я могу заставить его запускать больше потоков перед постановкой задач в очередь?
Редактировать:
@Flavio хорошо замечает использование тега ThreadPoolExecutor.allowCoreThreadTimeOut(true)
для тайм-аута и выхода основных потоков. Я подумал об этом, но мне все еще нужна функция core-thread. Я не хотел, чтобы количество потоков в пуле упало ниже размера ядра, если это возможно.
Ответы:
Я считаю, что наконец-то нашел несколько элегантное (возможно, немного хакерское) решение этого ограничения с помощью
ThreadPoolExecutor
. Она включает в себя расширение ,LinkedBlockingQueue
чтобы он вернутьсяfalse
кqueue.offer(...)
когда уже есть некоторые задачи , поставленные в очередь. Если текущие потоки не успевают за задачами в очереди, TPE добавит дополнительные потоки. Если пул уже имеет максимальное количество потоков,RejectedExecutionHandler
будет вызываться. Затем обработчик помещает запросput(...)
в очередь.Конечно, странно писать очередь, в которой
offer(...)
можно возвращатьсяfalse
иput()
никогда не блокироваться, так что это часть взлома. Но это хорошо работает с использованием очереди TPE, поэтому я не вижу в этом никаких проблем.Вот код:
С помощью этого механизма, когда я отправляю задачи в очередь,
ThreadPoolExecutor
они:offer(...)
функция вернет false.RejectedExecutionHandler
RejectedExecutionHandler
то ставит задачу в очередь для обработки первого доступного потока в FIFO порядке.Хотя в моем примере кода выше очередь не ограничена, вы также можете определить ее как ограниченную очередь. Например, если вы добавите емкость 1000 к,
LinkedBlockingQueue
то это будет:Кроме того, если вам нужно использовать
offer(...)
в,RejectedExecutionHandler
вы можете использоватьoffer(E, long, TimeUnit)
метод вместоLong.MAX_VALUE
тайм-аута.Предупреждение:
Если вы ожидаете, что задачи будут добавлены к исполнителю после того, как он был выключен, тогда вы можете быть более умными, выбрасывая
RejectedExecutionException
нашу кастомную программу,RejectedExecutionHandler
когда служба-исполнитель была выключена. Спасибо @RaduToader за указание на это.Редактировать:
Еще одна настройка этого ответа может заключаться в том, чтобы спросить TPE, есть ли незанятые потоки, и поставить элемент в очередь только в том случае, если он есть. Вам нужно будет создать для этого настоящий класс и добавить к нему
ourQueue.setThreadPoolExecutor(tpe);
метод.Тогда ваш
offer(...)
метод может выглядеть примерно так:tpe.getPoolSize() == tpe.getMaximumPoolSize()
в каком случае просто позвонитеsuper.offer(...)
.tpe.getPoolSize() > tpe.getActiveCount()
вызовите if,super.offer(...)
поскольку кажется, что есть незанятые потоки.false
к вилке другого потока.Может быть, это:
Обратите внимание, что методы get в TPE дороги, поскольку они обращаются к
volatile
полям или (в случаеgetActiveCount()
) блокируют TPE и просматривают список потоков. Кроме того, здесь есть условия гонки, которые могут привести к неправильной постановке задачи в очередь или разветвлению другого потока, когда был незанятый поток.источник
Queue
ради достижения этой цели, вы определенно не одиноки в своей идее: groovy-programming.com/post/26923146865execute(runnable)
, онrunnable
просто добавляется в очередь. Если звонишьexecute(secondRunnable)
, тоsecondRunnable
добавляется в очередь. Но если сейчас позвонитеexecute(thirdRunnable)
, тоthirdRunnable
будет запущен новый поток. Выполнениеrunnable
иsecondRunnable
только один разthirdRunnable
(или исходная длительная задача) завершены.Установите одинаковое значение для размера ядра и максимального размера и разрешите удаление основных потоков из пула с помощью
allowCoreThreadTimeOut(true)
.источник
У меня уже есть два других ответа на этот вопрос, но я подозреваю, что это лучший.
Он основан на методе принятого в настоящее время ответа , а именно:
offer()
чтобы (иногда) возвращать false,ThreadPoolExecutor
либо порождать новый поток, либо отклонять задачу, иRejectedExecutionHandler
чтобы фактически поставить задачу в очередь при отклонении.Проблема в том, когда
offer()
должно возвращаться false. Принятый в настоящее время ответ возвращает false, если в очереди есть несколько задач, но, как я указал в своем комментарии, это вызывает нежелательные эффекты. В качестве альтернативы, если вы всегда возвращаете false, вы продолжите порождать новые потоки, даже если у вас есть потоки, ожидающие в очереди.Решение - использовать Java 7
LinkedTransferQueue
и иметьoffer()
calltryTransfer()
. Когда есть ожидающий потребительский поток, задача просто передается этому потоку. В противном случаеoffer()
вернет false иThreadPoolExecutor
создаст новый поток.источник
queue.offer()
, поскольку он действительно вызываетLinkedTransferQueue.tryTransfer()
, вернет false и не поставит задачу в очередь. ОднакоRejectedExecutionHandler
вызовыqueue.put()
, которые не терпят неудачу и ставят задачу в очередь.Примечание: теперь я предпочитаю и рекомендую свой другой ответ .
Вот версия, которая кажется мне более простой: увеличивайте corePoolSize (до предела maximumPoolSize) всякий раз, когда выполняется новая задача, затем уменьшайте corePoolSize (до предела, указанного пользователем «размер основного пула»), когда задача завершена.
Другими словами, отслеживайте количество запущенных или поставленных в очередь задач и убедитесь, что corePoolSize равен количеству задач, пока он находится между заданным пользователем «размером основного пула» и максимальным размером пула.
Как написано, класс не поддерживает изменение заданного пользователем corePoolSize или maximumPoolSize после построения и не поддерживает управление рабочей очередью напрямую или через
remove()
илиpurge()
.источник
synchronized
блоков. Можете ли вы дозвониться до очереди, чтобы узнать количество задач. Или, может быть, использоватьAtomicInteger
?execute()
вызовов в отдельных потоках, каждый из них (1) вычислит, сколько потоков необходимо, (2)setCorePoolSize
до этого числа и (3) вызоветsuper.execute()
. Если шаги (1) и (2) не синхронизированы, я не уверен, как предотвратить неудачный порядок, когда вы устанавливаете размер основного пула на меньшее число после большего числа. При прямом доступе к полю суперкласса это можно было бы сделать с помощью метода сравнения и установки, но я не вижу чистого способа сделать это в подклассе без синхронизации.taskCount
поле является действительным (т.е. aAtomicInteger
). Если два потока пересчитывают размер пула сразу после друг друга, он должен получить правильные значения. Если второй сжимает основные потоки, значит, он видел падение очереди или что-то в этом роде.execute()
. Каждый позвонит,atomicTaskCount.incrementAndGet()
и они получат 10 и 11 соответственно. Но без синхронизации (после получения количества задач и установки размера основного пула) вы можете получить: (1) задача 11 устанавливает размер основного пула равным 11, (2) задача 10 устанавливает размер основного пула равным 10, (3) задача 10 вызываетsuper.execute()
, (4) задача 11 вызываетsuper.execute()
и ставится в очередь.У нас есть подкласс,
ThreadPoolExecutor
который принимает дополнительныеcreationThreshold
и переопределяетexecute
.может быть, это тоже помогает, но ваш, конечно, выглядит более вычурно…
источник
offer(...)
метод возвращался толькоfalse
условно. Спасибо!Рекомендуемый ответ решает только одну (1) проблему с пулом потоков JDK:
Пулы потоков JDK смещены в сторону очередей. Поэтому вместо создания нового потока они поставят задачу в очередь. Только если очередь достигает своего предела, пул потоков порождает новый поток.
Ухода нити не происходит при уменьшении нагрузки. Например, если у нас есть всплеск заданий, попадающих в пул, что приводит к тому, что пул достигает максимума, за которым следует небольшая нагрузка максимум 2 задачи за раз, пул будет использовать все потоки для обслуживания небольшой нагрузки, предотвращающей выход из эксплуатации. (потребуется всего 2 потока…)
Недовольный описанным выше поведением, я пошел дальше и реализовал пул, чтобы преодолеть указанные выше недостатки.
Решение 2) Использование расписания Lifo решает проблему. Эту идею представил Бен Маурер на конференции ACM Applicative 2015: Systems @ Facebook scale.
Так родилась новая реализация:
LifoThreadPoolExecutorSQP
Пока эта реализация улучшает производительность асинхронного выполнения для ZEL .
Эта реализация позволяет сократить накладные расходы на переключение контекста, обеспечивая превосходную производительность для определенных случаев использования.
Надеюсь, поможет...
PS: JDK Fork Join Pool реализует ExecutorService и работает как «обычный» пул потоков, реализация является производительной, она использует планирование потоков LIFO, однако нет контроля над размером внутренней очереди, тайм-аутом вывода на пенсию ... и, что наиболее важно, задачи не могут быть прерывается при их отмене
источник
Примечание: теперь я предпочитаю и рекомендую свой другой ответ .
У меня есть еще одно предложение, продолжающее первоначальную идею изменить очередь, чтобы вернуть false. В этом случае все задачи могут поступать в очередь, но всякий раз, когда задача ставится в очередь после
execute()
, мы следуем за ней с помощью дозорной задачи без операции, которую очередь отклоняет, вызывая порождение нового потока, который немедленно выполнит операцию без операции, за которой следует что-то из очереди.Поскольку рабочие потоки могут запрашивать
LinkedBlockingQueue
новую задачу, задача может быть поставлена в очередь даже при наличии доступного потока. Чтобы не порождать новые потоки даже при наличии доступных потоков, нам нужно отслеживать, сколько потоков ожидают новых задач в очереди, и создавать новый поток только тогда, когда в очереди задач больше, чем ожидающих потоков.источник
Лучшее решение, которое я могу придумать, - это расширить.
ThreadPoolExecutor
предлагает несколько методов перехвата:beforeExecute
иafterExecute
. В своем расширении вы можете использовать ограниченную очередь для подачи задач и вторую неограниченную очередь для обработки переполнения. Когда кто-то звонитsubmit
, вы можете попытаться поместить запрос в ограниченную очередь. Если вы встретились с исключением, вы просто помещаете задачу в свою очередь переполнения. Затем вы можете использоватьafterExecute
ловушку, чтобы увидеть, есть ли что-нибудь в очереди переполнения после завершения задачи. Таким образом, исполнитель сначала позаботится о материалах в своей ограниченной очереди и автоматически выберет из этой неограниченной очереди, если позволит время.Кажется, что работы больше, чем у вашего решения, но, по крайней мере, это не связано с неожиданным поведением очереди. Я также полагаю, что есть лучший способ проверить состояние очереди и потоков, чем полагаться на исключения, которые довольно медленно генерируются.
источник
Примечание. Для JDK ThreadPoolExecutor, когда у вас есть ограниченная очередь, вы создаете новые потоки только тогда, когда предложение возвращает false. Вы можете получить что-то полезное с помощью CallerRunsPolicy, который создает немного BackPressure и напрямую вызывает run в потоке вызывающего.
Мне нужно, чтобы задачи выполнялись из потоков, созданных пулом, и имели неограниченную очередь для планирования, в то время как количество потоков в пуле может увеличиваться или уменьшаться между corePoolSize и maximumPoolSize, поэтому ...
Я закончил тем, что сделал полную копию из ThreadPoolExecutor и немного изменил метод выполнения, потому что, к сожалению, это невозможно сделать с помощью расширения (он вызывает частные методы).
Я не хотел создавать новые потоки сразу же, когда приходит новый запрос и все потоки заняты (потому что у меня обычно недолговечные задачи). Я добавил порог, но не стесняйтесь изменять его в соответствии с вашими потребностями (возможно, для большей части ввода-вывода лучше удалить этот порог)
источник