Я пытаюсь закодировать решение, в котором один поток создает задачи с интенсивным вводом-выводом, которые могут выполняться параллельно. Каждая задача имеет важные данные в памяти. Поэтому я хочу иметь возможность ограничивать количество задач, ожидающих выполнения в данный момент.
Если я создам ThreadPoolExecutor следующим образом:
ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(maxQueue));
Потом executor.submit(callable)
выкидывает, RejectedExecutionException
когда очередь заполняется и все потоки уже заняты.
Что я могу сделать, чтобы executor.submit(callable)
заблокировать, когда очередь заполнена и все потоки заняты?
РЕДАКТИРОВАТЬ : Я пробовал это :
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
И это в некоторой степени обеспечивает эффект, которого я хочу достичь, но неэлегантным способом (в основном отклоненные потоки запускаются в вызывающем потоке, поэтому это блокирует вызывающий поток от отправки большего количества).
РЕДАКТИРОВАТЬ: (5 лет после того, как задали вопрос)
Всем, кто читает этот вопрос и ответы на него, не принимайте принятый ответ как одно правильное решение. Пожалуйста, прочтите все ответы и комментарии.
источник
numWorkerThreads
когда вызывающий поток также выполняет задачу. Но более важными проблемами являются то, что если вызывающий поток получает длительную задачу, другие потоки могут бездействовать в ожидании следующей задачи.Ответы:
Я сделал то же самое. Уловка состоит в том, чтобы создать BlockingQueue, в котором метод offer () действительно является put (). (вы можете использовать любую базу BlockingQueue, которую хотите).
public class LimitedQueue<E> extends LinkedBlockingQueue<E> { public LimitedQueue(int maxSize) { super(maxSize); } @Override public boolean offer(E e) { // turn offer() and add() into a blocking calls (unless interrupted) try { put(e); return true; } catch(InterruptedException ie) { Thread.currentThread().interrupt(); } return false; } }
Обратите внимание, что это работает только для пула потоков,
corePoolSize==maxPoolSize
поэтому будьте осторожны (см. Комментарии).источник
corePoolSize==maxPoolSize
. Без этого он больше не позволяет ThreadPoolExecutor иметь заданное поведение. Я искал решение этой проблемы, которое не имело бы этого ограничения; см. мой альтернативный ответ ниже, чтобы узнать, какой подход мы выбрали.Вот как я решил это со своей стороны:
(примечание: это решение блокирует поток, отправляющий Callable, поэтому предотвращает выброс исключения RejectedExecutionException)
public class BoundedExecutor extends ThreadPoolExecutor{ private final Semaphore semaphore; public BoundedExecutor(int bound) { super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); semaphore = new Semaphore(bound); } /**Submits task to execution pool, but blocks while number of running threads * has reached the bound limit */ public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException{ semaphore.acquire(); return submit(task); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); semaphore.release(); } }
источник
corePoolSize < maxPoolSize
...: |corePoolSize < maxPoolSize
. В этих случаях семафор будет доступен, но не будет потока, иSynchronousQueue
функция вернет false. ЗатемThreadPoolExecutor
будет создана новая нить. Проблема этого решения в том, что оно имеет состояние гонки . Послеsemaphore.release()
, но до завершения потокаexecute
, submit () получит разрешение семафора. ЕСЛИ super.submit () запускается доexecute()
завершения, задание будет отклонено.У принятого в настоящее время ответа есть потенциально серьезная проблема - он изменяет поведение ThreadPoolExecutor.execute таким образом, что если у вас есть
corePoolSize < maxPoolSize
, логика ThreadPoolExecutor никогда не будет добавлять дополнительных рабочих за пределы ядра.Из ThreadPoolExecutor .execute (Runnable):
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command);
В частности, последний блок else никогда не будет затронут.
Лучшая альтернатива - сделать что-то похожее на то, что уже делает OP - используйте RejectedExecutionHandler, чтобы выполнить ту же
put
логику:public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { if (!executor.isShutdown()) { executor.getQueue().put(r); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RejectedExecutionException("Executor was interrupted while the task was waiting to put on work queue", e); } }
При таком подходе есть некоторые вещи, на которые следует обратить внимание, как указано в комментариях (со ссылкой на этот ответ ):
corePoolSize==0
, то существует состояние гонки, при котором все потоки в пуле могут умереть до того, как задача станет видимой.ThreadPoolExecutor
), приведет к проблемам, если обработчик также не обернет ее таким же образом.Помня об этих ошибках, это решение будет работать для большинства типичных ThreadPoolExecutors и будет правильно обрабатывать случай, когда
corePoolSize < maxPoolSize
.источник
Я знаю, что это старый вопрос, но у него была аналогичная проблема: создание новых задач было очень быстрым, и если было слишком много OutOfMemoryError, возникла ошибка, потому что существующая задача не была завершена достаточно быстро.
В моем случае
Callables
отправлены, и мне нужен результат, поэтому мне нужно сохранить все, чтоFutures
вернулexecutor.submit()
. Мое решение заключалось в том, чтобы поместитьFutures
файлBlockingQueue
с максимальным размером. После заполнения этой очереди задачи больше не будут создаваться, пока некоторые из них не будут завершены (элементы удалены из очереди). В псевдокоде:final ExecutorService executor = Executors.newFixedThreadPool(numWorkerThreads); final LinkedBlockingQueue<Future> futures = new LinkedBlockingQueue<>(maxQueueSize); try { Thread taskGenerator = new Thread() { @Override public void run() { while (reader.hasNext) { Callable task = generateTask(reader.next()); Future future = executor.submit(task); try { // if queue is full blocks until a task // is completed and hence no future tasks are submitted. futures.put(compoundFuture); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } } executor.shutdown(); } } taskGenerator.start(); // read from queue as long as task are being generated // or while Queue has elements in it while (taskGenerator.isAlive() || !futures.isEmpty()) { Future compoundFuture = futures.take(); // do something } } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } catch (ExecutionException ex) { throw new MyException(ex); } finally { executor.shutdownNow(); }
источник
У меня была аналогичная проблема, и я реализовал ее с помощью
beforeExecute/afterExecute
хуков изThreadPoolExecutor
:import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * Blocks current task execution if there is not enough resources for it. * Maximum task count usage controlled by maxTaskCount property. */ public class BlockingThreadPoolExecutor extends ThreadPoolExecutor { private final ReentrantLock taskLock = new ReentrantLock(); private final Condition unpaused = taskLock.newCondition(); private final int maxTaskCount; private volatile int currentTaskCount; public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, int maxTaskCount) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); this.maxTaskCount = maxTaskCount; } /** * Executes task if there is enough system resources for it. Otherwise * waits. */ @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); taskLock.lock(); try { // Spin while we will not have enough capacity for this job while (maxTaskCount < currentTaskCount) { try { unpaused.await(); } catch (InterruptedException e) { t.interrupt(); } } currentTaskCount++; } finally { taskLock.unlock(); } } /** * Signalling that one more task is welcome */ @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); taskLock.lock(); try { currentTaskCount--; unpaused.signalAll(); } finally { taskLock.unlock(); } } }
Этого должно быть достаточно для вас. Кстати, исходная реализация была основана на размере задачи, потому что одна задача могла быть в 100 раз больше, чем другая, и отправка двух огромных задач убивала коробку, но запуск одной большой и множества маленьких было нормально. Если ваши задачи с интенсивным вводом-выводом имеют примерно такой же размер, вы можете использовать этот класс, в противном случае просто дайте мне знать, и я опубликую реализацию на основе размера.
PS Вы бы хотели проверить
ThreadPoolExecutor
javadoc. Это действительно хорошее руководство пользователя от Дуга Ли о том, как его можно легко настроить.источник
maxTaskCount < currentTaskCount
и начинает ждать поunpaused
условию. В то же время другой поток пытается получить блокировку в afterExecute (), чтобы сигнализировать о завершении задачи. Не будет ли это тупик?RejectedExecutionException
что все еще возможно.Я реализовал решение, следуя шаблону декоратора и используя семафор для управления количеством выполняемых задач. Вы можете использовать его с любыми
Executor
и:RejectedExecutionException
)import static java.util.concurrent.TimeUnit.MILLISECONDS; import java.time.Duration; import java.util.Objects; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import javax.annotation.Nonnull; public class BlockingOnFullQueueExecutorDecorator implements Executor { private static final class PermitReleasingDecorator implements Runnable { @Nonnull private final Runnable delegate; @Nonnull private final Semaphore semaphore; private PermitReleasingDecorator(@Nonnull final Runnable task, @Nonnull final Semaphore semaphoreToRelease) { this.delegate = task; this.semaphore = semaphoreToRelease; } @Override public void run() { try { this.delegate.run(); } finally { // however execution goes, release permit for next task this.semaphore.release(); } } @Override public final String toString() { return String.format("%s[delegate='%s']", getClass().getSimpleName(), this.delegate); } } @Nonnull private final Semaphore taskLimit; @Nonnull private final Duration timeout; @Nonnull private final Executor delegate; public BlockingOnFullQueueExecutorDecorator(@Nonnull final Executor executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout) { this.delegate = Objects.requireNonNull(executor, "'executor' must not be null"); if (maximumTaskNumber < 1) { throw new IllegalArgumentException(String.format("At least one task must be permitted, not '%d'", maximumTaskNumber)); } this.timeout = Objects.requireNonNull(maximumTimeout, "'maximumTimeout' must not be null"); if (this.timeout.isNegative()) { throw new IllegalArgumentException("'maximumTimeout' must not be negative"); } this.taskLimit = new Semaphore(maximumTaskNumber); } @Override public final void execute(final Runnable command) { Objects.requireNonNull(command, "'command' must not be null"); try { // attempt to acquire permit for task execution if (!this.taskLimit.tryAcquire(this.timeout.toMillis(), MILLISECONDS)) { throw new RejectedExecutionException(String.format("Executor '%s' busy", this.delegate)); } } catch (final InterruptedException e) { // restore interrupt status Thread.currentThread().interrupt(); throw new IllegalStateException(e); } this.delegate.execute(new PermitReleasingDecorator(command, this.taskLimit)); } @Override public final String toString() { return String.format("%s[availablePermits='%s',timeout='%s',delegate='%s']", getClass().getSimpleName(), this.taskLimit.availablePermits(), this.timeout, this.delegate); } }
источник
Я думаю, это так же просто, как использовать a
ArrayBlockingQueue
вместо aaLinkedBlockingQueue
.Игнорируй меня ... это совершенно неправильно.
ThreadPoolExecutor
вызываетQueue#offer
не те,put
которые имели бы требуемый эффект.Вы можете расширить
ThreadPoolExecutor
и предоставить реализациюexecute(Runnable)
этих вызововput
вместоoffer
.Боюсь, это не совсем удовлетворительный ответ.
источник