Я сталкиваюсь с проблемой, при которой, если я пытаюсь изменить размер ThreadPoolExecutor
основного пула на другой номер после создания пула, то периодически, некоторые задачи отклоняются сRejectedExecutionException
даже если я никогда не отправляю больше, чем queueSize + maxPoolSize
количество задач.
Проблема, которую я пытаюсь решить, заключается в расширении, ThreadPoolExecutor
которое изменяет размеры своих основных потоков на основе ожидающих выполнений, находящихся в очереди пула потоков. Мне это нужно, потому что по умолчанию a ThreadPoolExecutor
создаст новый, Thread
только если очередь заполнена.
Вот небольшая автономная программа Pure Java 8, которая демонстрирует проблему.
import static java.lang.Math.max;
import static java.lang.Math.min;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolResizeTest {
public static void main(String[] args) throws Exception {
// increase the number of iterations if unable to reproduce
// for me 100 iterations have been enough
int numberOfExecutions = 100;
for (int i = 1; i <= numberOfExecutions; i++) {
executeOnce();
}
}
private static void executeOnce() throws Exception {
int minThreads = 1;
int maxThreads = 5;
int queueCapacity = 10;
ThreadPoolExecutor pool = new ThreadPoolExecutor(
minThreads, maxThreads,
0, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(queueCapacity),
new ThreadPoolExecutor.AbortPolicy()
);
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> resizeThreadPool(pool, minThreads, maxThreads),
0, 10, TimeUnit.MILLISECONDS);
CompletableFuture<Void> taskBlocker = new CompletableFuture<>();
try {
int totalTasksToSubmit = queueCapacity + maxThreads;
for (int i = 1; i <= totalTasksToSubmit; i++) {
// following line sometimes throws a RejectedExecutionException
pool.submit(() -> {
// block the thread and prevent it from completing the task
taskBlocker.join();
});
// Thread.sleep(10); //enabling even a small sleep makes the problem go away
}
} finally {
taskBlocker.complete(null);
scheduler.shutdown();
pool.shutdown();
}
}
/**
* Resize the thread pool if the number of pending tasks are non-zero.
*/
private static void resizeThreadPool(ThreadPoolExecutor pool, int minThreads, int maxThreads) {
int pendingExecutions = pool.getQueue().size();
int approximateRunningExecutions = pool.getActiveCount();
/*
* New core thread count should be the sum of pending and currently executing tasks
* with an upper bound of maxThreads and a lower bound of minThreads.
*/
int newThreadCount = min(maxThreads, max(minThreads, pendingExecutions + approximateRunningExecutions));
pool.setCorePoolSize(newThreadCount);
pool.prestartAllCoreThreads();
}
}
Зачем пулу когда-либо выдавать RejectedExecutionException, если я никогда не отправлю больше, чем queueCapacity + maxThreads. Я никогда не меняю максимальное количество потоков, поэтому по определению ThreadPoolExecutor он должен либо разместить задачу в потоке, либо в очереди.
Конечно, если я никогда не изменяю размер пула, то пул потоков никогда не отклоняет любые представления. Это также трудно отладить, поскольку добавление каких-либо задержек в представлениях устраняет проблему.
Любые указатели о том, как исправить RejectedExecutionException?
источник
ExecutorService
путем обертывания существующей, которая повторно отправляет задачи, которые не удалось отправить из-за изменения размера?ThreadPoolExecutor
скорее всего, плохая идея, и не нужно ли вам в этом случае изменять существующий код? Было бы лучше, если бы вы предоставили пример того, как ваш реальный код обращается к исполнителю. Я был бы удивлен, если бы он использовал много методов, специфичных дляThreadPoolExecutor
(то есть не вExecutorService
).Ответы:
Вот сценарий, почему это происходит:
В моем примере я использую minThreads = 0, maxThreads = 2 и queueCapacity = 2, чтобы сделать его короче. Первая команда отправляется, это делается в методе execute:
для этой команды выполняется workQueue.offer (команда), а не addWorker (null, false). Рабочий поток сначала выводит эту команду из очереди в методе выполнения потока, поэтому в данный момент в очереди все еще есть одна команда,
На этот раз отправляется вторая команда. WorkQueue.offer (команда) выполняется. Теперь очередь заполнена
Теперь ScheduledExecutorService выполняет метод resizeThreadPool, который вызывает setCorePoolSize с maxThreads. Вот метод setCorePoolSize:
Этот метод добавляет одного работника, используя addWorker (null, true). Нет, есть 2 рабочих очереди, максимум и очередь заполнена.
Третья команда отправляется и завершается неудачей, потому что workQueue.offer (команда) и addWorker (команда, ложь) завершаются неудачей, что приводит к исключению:
Я думаю, что для решения этой проблемы вы должны установить емкость очереди на максимум команд, которые вы хотите выполнить.
источник
Не уверен, что это квалифицируется как ошибка. Это поведение, когда дополнительные рабочие потоки создаются после того, как очередь заполнена, но в документах java было отчасти отмечено, что вызывающая сторона должна иметь дело с отклоняемыми задачами.
Java документы
Когда вы изменяете размер основного пула, скажем, увеличивается, создаются дополнительные работники (
addWorker
метод insetCorePoolSize
), и вызов для создания дополнительной работы (addWorker
метод fromexecute
) отклоняется, когдаaddWorker
возвращается false (add Worker
последний фрагмент кода), так как достаточно дополнительных работников уже созданный,setCorePoolSize
но еще не запущенный для отражения обновления в очереди .Соответствующие части
сравнить
Используйте пользовательский обработчик отклонения повторных попыток (это должно работать для вашего случая, поскольку у вас есть верхняя граница в качестве максимального размера пула). Пожалуйста, настройте по мере необходимости.
Также обратите внимание, что вы используете shutdown не правильно, так как это не будет ждать завершения поставленной задачи, но
awaitTermination
вместо этого используйте с .источник