Я хочу создать ThreadPoolExecutor
такой, чтобы при достижении максимального размера и заполнении очереди submit()
метод блокировался при попытке добавить новые задачи. Нужно ли мне реализовать RejectedExecutionHandler
для этого индивидуальный заказ или есть способ сделать это с помощью стандартной библиотеки Java?
java
concurrency
executor
Fixpoint
источник
источник
Ответы:
Одно из возможных решений, которое я только что нашел:
Есть ли другие решения? Я бы предпочел что-то на основе,
RejectedExecutionHandler
поскольку это кажется стандартным способом решения таких ситуаций.источник
throw e;
это НЕ в книге. JCIP правильный!Вы можете использовать ThreadPoolExecutor и blockingQueue:
источник
Вы должны использовать
CallerRunsPolicy
, который выполняет отклоненную задачу в вызывающем потоке. Таким образом, он не может отправлять какие-либо новые задачи исполнителю, пока эта задача не будет выполнена, после чего появятся несколько свободных потоков пула или процесс повторится.http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html
Из документов:
Кроме того, при вызове
ThreadPoolExecutor
конструктора обязательно используйте ограниченную очередь, такую как ArrayBlockingQueue . Иначе ничего не будет отклонено.Изменить: в ответ на ваш комментарий установите размер ArrayBlockingQueue равным максимальному размеру пула потоков и используйте AbortPolicy.
Изменить 2: Хорошо, я понимаю, к чему вы клоните. Как насчет этого: переопределить
beforeExecute()
метод, чтобы проверить, чтоgetActiveCount()
не превышаетgetMaximumPoolSize()
, и если это так, спать и повторить попытку?источник
Hibernate имеет
BlockPolicy
простой элемент, который может делать то, что вы хотите:См .: Executors.java
источник
ThreadPoolExecutor
даже говорится буквально: «Метод getQueue () разрешает доступ к рабочей очереди для целей мониторинга и отладки. Использование этого метода для любых других целей категорически не рекомендуется». Абсолютно грустно видеть то, что это доступно в столь широко известной библиотеке.Приведенный
BoundedExecutor
выше ответ из Java Concurrency in Practice работает правильно только в том случае, если вы используете неограниченную очередь для Executor или если размер семафора не превышает размер очереди. Семафор является общим состоянием между отправляющим потоком и потоками в пуле, что делает возможным насыщение исполнителя, даже если размер очереди <bound <= (размер очереди + размер пула).Использование
CallerRunsPolicy
допустимо только в том случае, если ваши задачи не выполняются вечно, и в этом случае ваш поток отправки останетсяrejectedExecution
навсегда, и плохая идея, если ваши задачи требуют много времени для выполнения, потому что поток отправки не может отправить какие-либо новые задачи или делать что-нибудь еще, если он сам выполняет задачу.Если это неприемлемо, я предлагаю проверить размер ограниченной очереди исполнителя перед отправкой задачи. Если очередь заполнена, подождите немного перед повторной попыткой отправки. Пропускная способность пострадает, но я предлагаю более простое решение, чем многие другие предлагаемые решения, и вам гарантировано, что ни одна задача не будет отклонена.
источник
Я знаю, что это взлом, но, на мой взгляд, самый чистый из предложенных здесь ;-)
Поскольку ThreadPoolExecutor использует блокирующую очередь "предложение" вместо "положить", позволяет переопределить поведение "предложения" блокирующей очереди:
Я тестировал, и вроде работает. Реализация некоторой политики тайм-аута оставлена как упражнение читателя.
источник
Следующий класс оборачивается вокруг ThreadPoolExecutor и использует семафор для блокировки, когда рабочая очередь заполнена:
Этот класс-оболочка основан на решении, приведенном в книге Брайана Гетца в книге Java Concurrency in Practice. Решение в книге принимает только два параметра конструктора: an
Executor
и границу, используемую для семафора. Это показано в ответе Fixpoint. У такого подхода есть проблема: он может попасть в состояние, когда потоки пула заняты, очередь заполнена, но семафор только что выпустил разрешение. (semaphore.release()
в блоке finally). В этом состоянии новая задача может получить только что выпущенное разрешение, но будет отклонена, поскольку очередь задач заполнена. Конечно, вы этого не хотите; вы хотите заблокировать в этом случае.Чтобы решить эту проблему, мы должны использовать неограниченную очередь, как ясно упоминает JCiP. Семафор действует как защита, создавая эффект размера виртуальной очереди. Это имеет побочный эффект, заключающийся в том, что модуль может содержать
maxPoolSize + virtualQueueSize + maxPoolSize
задачи. Это почему? Из-semaphore.release()
за блока в finally. Если все потоки пула вызывают этот оператор одновременно, тоmaxPoolSize
разрешения освобождаются, позволяя одному и тому же количеству задач войти в модуль. Если бы мы использовали ограниченную очередь, она все равно была бы заполнена, что привело бы к отклонению задачи. Теперь, поскольку мы знаем, что это происходит только тогда, когда поток пула почти завершен, это не проблема. Мы знаем, что поток пула не будет блокироваться, поэтому задача скоро будет снята из очереди.Однако вы можете использовать ограниченную очередь. Только убедитесь, что его размер равен
virtualQueueSize + maxPoolSize
. Большие размеры бесполезны, семафор не позволит впустить больше элементов. Меньшие размеры приведут к отклонению задач. Вероятность отклонения задач увеличивается с уменьшением размера. Например, предположим, что вам нужен ограниченный исполнитель с maxPoolSize = 2 и virtualQueueSize = 5. Затем возьмите семафор с 5 + 2 = 7 разрешениями и фактическим размером очереди 5 + 2 = 7. Реальное количество задач, которые могут быть в отряде, тогда 2 + 5 + 2 = 9. Когда исполнитель заполнен (5 задач в очереди, 2 в пуле потоков, поэтому доступно 0 разрешений) и ВСЕ потоки пула освобождают свои разрешения, то входящие задачи могут получить ровно 2 разрешения.Теперь решение от JCiP несколько громоздко в использовании, поскольку оно не обеспечивает соблюдение всех этих ограничений (неограниченная очередь или ограниченная этими математическими ограничениями и т. Д.). Я думаю, что это служит только хорошим примером, демонстрирующим, как вы можете создавать новые потокобезопасные классы на основе уже доступных частей, но не как полноценный, многоразовый класс. Не думаю, что последнее было намерением автора.
источник
вы можете использовать собственный RejectedExecutionHandler, например
источник
Создайте свою собственную очередь блокировки, которая будет использоваться исполнителем, с требуемым поведением блокировки, всегда возвращая доступную оставшуюся емкость (гарантируя, что исполнитель не будет пытаться создать больше потоков, чем его основной пул, или вызвать обработчик отклонения).
Я верю, что это даст вам желаемое поведение блокировки. Обработчик отказа никогда не будет соответствовать требованиям, поскольку это указывает на то, что исполнитель не может выполнить задачу. Что я мог представить, так это то, что вы получаете некоторую форму «занятого ожидания» в обработчике. Это не то, что вам нужно, вам нужна очередь для исполнителя, которая блокирует вызывающего ...
источник
ThreadPoolExecutor
используетoffer
метод для добавления задач в очередь. Если бы я создал обычай,BlockingQueue
который блокируетсяoffer
, это нарушитBlockingQueue
контракт.ThreadPoolExecutor
было реализовано использование,offer
а неput
(версия блокировки)? Кроме того, если бы у клиентского кода был способ сказать, какой из них использовать и когда, многие люди, пытающиеся вручнуюЧтобы избежать проблем с решением @FixPoint. Можно использовать ListeningExecutorService и освободить семафор onSuccess и onFailure внутри FutureCallback.
источник
Runnable
поскольку эти методы все еще вызываются перед очисткой рабочего в обычном режимеThreadPoolExecutor
. То есть вам все равно придется обрабатывать исключения отклонения.Недавно я обнаружил, что этот вопрос имеет ту же проблему. OP не говорит об этом явно, но мы не хотим использовать,
RejectedExecutionHandler
который выполняет задачу в потоке отправителя, потому что это будет недостаточно использовать рабочие потоки, если эта задача является длительной.Прочитав все ответы и комментарии, в частности ошибочное решение с семафором или его использование,
afterExecute
я внимательно изучил код ThreadPoolExecutor, чтобы увидеть, есть ли выход. Я был поражен, увидев, что существует более 2000 строк (прокомментированного) кода, некоторые из которых вызывают у меня головокружение . Учитывая довольно простое требование, которое у меня действительно есть - один производитель, несколько потребителей, пусть производитель блокирует, когда ни один из потребителей не может выполнять работу - я решил развернуть свое собственное решение. Это не файл,ExecutorService
а просто файлExecutor
. И он не адаптирует количество потоков к рабочей нагрузке, а поддерживает только фиксированное количество потоков, что также соответствует моим требованиям. Вот код. Не стесняйтесь разглагольствовать об этом :-)источник
Я считаю, что есть довольно элегантный способ решить эту проблему, используя
java.util.concurrent.Semaphore
и делегируя поведениеExecutor.newFixedThreadPool
. Новая служба исполнителя будет выполнять новую задачу, только если для этого есть поток. Блокирование управляется семафором с количеством разрешений, равным количеству потоков. Когда задача завершена, она возвращает разрешение.источник
У меня была такая же потребность в прошлом: своего рода блокирующая очередь с фиксированным размером для каждого клиента, поддерживаемая общим пулом потоков. В итоге я написал свой собственный ThreadPoolExecutor:
UserThreadPoolExecutor (очередь блокировки (для каждого клиента) + пул потоков (общий для всех клиентов))
См .: https://github.com/d4rxh4wx/UserThreadPoolExecutor
Каждому UserThreadPoolExecutor предоставляется максимальное количество потоков из общего ThreadPoolExecutor.
Каждый UserThreadPoolExecutor может:
источник
Я нашел эту политику отказа в клиенте эластичного поиска. Он блокирует вызывающий поток в очереди блокировки. Код ниже-
источник
Недавно мне потребовалось добиться чего-то подобного, но на
ScheduledExecutorService
.Я также должен был убедиться, что я обрабатываю задержку, передаваемую методу, и гарантирую, что либо задача будет отправлена для выполнения в то время, которое ожидает вызывающий объект, либо просто не удастся, таким образом, бросив файл
RejectedExecutionException
.Другие методы из
ScheduledThreadPoolExecutor
для выполнения или отправки задачи изнутри вызывают,#schedule
которые, в свою очередь, будут вызывать переопределенные методы.У меня есть код, буду благодарен за любой отзыв. https://github.com/AmitabhAwasthi/BlockingScheduler
источник
Вот решение, которое, кажется, действительно работает. Это называется NotifyingBlockingThreadPoolExecutor .
Демо-программа.
Изменить: есть проблема с этим кодом, метод await () ошибочен. Вызов shutdown () + awaitTermination (), похоже, работает нормально.
источник
Мне не всегда нравится CallerRunsPolicy, тем более что он позволяет отклоненной задаче «пропускать очередь» и выполняться до задач, которые были отправлены ранее. Более того, выполнение задачи в вызывающем потоке может занять гораздо больше времени, чем ожидание освобождения первого слота.
Я решил эту проблему с помощью специального RejectedExecutionHandler, который на некоторое время просто блокирует вызывающий поток, а затем снова пытается отправить задачу:
Этот класс можно просто использовать в исполнителе пула потоков как RejectedExecutinHandler, как и любой другой, например:
Единственный недостаток, который я вижу, заключается в том, что вызывающий поток может быть заблокирован немного дольше, чем это необходимо (до 250 мс). Более того, поскольку этот исполнитель фактически вызывается рекурсивно, очень долгое ожидание доступности потока (часы) может привести к переполнению стека.
Тем не менее лично мне этот метод нравится. Он компактен, прост для понимания и хорошо работает.
источник