Java ThreadPoolExecutor: обновление размера основного пула динамически отклоняет входящие задачи периодически

13

Я сталкиваюсь с проблемой, при которой, если я пытаюсь изменить размер ThreadPoolExecutorосновного пула на другой номер после создания пула, то периодически, некоторые задачи отклоняются сRejectedExecutionException даже если я никогда не отправляю больше, чем queueSize + maxPoolSizeколичество задач.

Проблема, которую я пытаюсь решить, заключается в расширении, ThreadPoolExecutorкоторое изменяет размеры своих основных потоков на основе ожидающих выполнений, находящихся в очереди пула потоков. Мне это нужно, потому что по умолчанию a ThreadPoolExecutorсоздаст новый, Threadтолько если очередь заполнена.

Вот небольшая автономная программа Pure Java 8, которая демонстрирует проблему.

import static java.lang.Math.max;
import static java.lang.Math.min;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolResizeTest {

    public static void main(String[] args) throws Exception {
        // increase the number of iterations if unable to reproduce
        // for me 100 iterations have been enough
        int numberOfExecutions = 100;

        for (int i = 1; i <= numberOfExecutions; i++) {
            executeOnce();
        }
    }

    private static void executeOnce() throws Exception {
        int minThreads = 1;
        int maxThreads = 5;
        int queueCapacity = 10;

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                minThreads, maxThreads,
                0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy()
        );

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> resizeThreadPool(pool, minThreads, maxThreads),
                0, 10, TimeUnit.MILLISECONDS);
        CompletableFuture<Void> taskBlocker = new CompletableFuture<>();

        try {
            int totalTasksToSubmit = queueCapacity + maxThreads;

            for (int i = 1; i <= totalTasksToSubmit; i++) {
                // following line sometimes throws a RejectedExecutionException
                pool.submit(() -> {
                    // block the thread and prevent it from completing the task
                    taskBlocker.join();
                });
                // Thread.sleep(10); //enabling even a small sleep makes the problem go away
            }
        } finally {
            taskBlocker.complete(null);
            scheduler.shutdown();
            pool.shutdown();
        }
    }

    /**
     * Resize the thread pool if the number of pending tasks are non-zero.
     */
    private static void resizeThreadPool(ThreadPoolExecutor pool, int minThreads, int maxThreads) {
        int pendingExecutions = pool.getQueue().size();
        int approximateRunningExecutions = pool.getActiveCount();

        /*
         * New core thread count should be the sum of pending and currently executing tasks
         * with an upper bound of maxThreads and a lower bound of minThreads.
         */
        int newThreadCount = min(maxThreads, max(minThreads, pendingExecutions + approximateRunningExecutions));

        pool.setCorePoolSize(newThreadCount);
        pool.prestartAllCoreThreads();
    }
}

Зачем пулу когда-либо выдавать RejectedExecutionException, если я никогда не отправлю больше, чем queueCapacity + maxThreads. Я никогда не меняю максимальное количество потоков, поэтому по определению ThreadPoolExecutor он должен либо разместить задачу в потоке, либо в очереди.

Конечно, если я никогда не изменяю размер пула, то пул потоков никогда не отклоняет любые представления. Это также трудно отладить, поскольку добавление каких-либо задержек в представлениях устраняет проблему.

Любые указатели о том, как исправить RejectedExecutionException?

Сваранга Сарма
источник
Почему бы не обеспечить собственную реализацию ExecutorServiceпутем обертывания существующей, которая повторно отправляет задачи, которые не удалось отправить из-за изменения размера?
Даниу
@daniu это обходной путь. Суть вопроса в том, почему пул должен генерировать исключение RejectedExecutionException, если я никогда не отправлю больше, чем queueCapacity + maxThreads. Я никогда не меняю максимальное количество потоков, поэтому по определению ThreadPoolExecutor он должен либо разместить задачу в потоке, либо в очереди.
Сваранга Сарма
Хорошо, похоже, я неправильно понял ваш вопрос. Что это? Вы хотите знать, почему поведение происходит или как вы обходите его, вызывая проблемы для вас?
Даниу
Да, изменить мою реализацию на службу executor невозможно, поскольку большая часть кода ссылается на ThreadPoolExecutor. Поэтому, если я все еще хотел иметь изменяемый размер ThreadPoolExecutor, мне нужно знать, как я могу это исправить. Возможно, правильным способом сделать что-то подобное является расширение ThreadPoolExecutor и получение доступа к некоторым его защищенным переменным, а также обновление размера пула в синхронизированном блоке для блокировки, совместно используемой суперклассом.
Сваранга Сарма
Расширение, ThreadPoolExecutorскорее всего, плохая идея, и не нужно ли вам в этом случае изменять существующий код? Было бы лучше, если бы вы предоставили пример того, как ваш реальный код обращается к исполнителю. Я был бы удивлен, если бы он использовал много методов, специфичных для ThreadPoolExecutor(то есть не в ExecutorService).
Даниу

Ответы:

5

Вот сценарий, почему это происходит:

В моем примере я использую minThreads = 0, maxThreads = 2 и queueCapacity = 2, чтобы сделать его короче. Первая команда отправляется, это делается в методе execute:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

для этой команды выполняется workQueue.offer (команда), а не addWorker (null, false). Рабочий поток сначала выводит эту команду из очереди в методе выполнения потока, поэтому в данный момент в очереди все еще есть одна команда,

На этот раз отправляется вторая команда. WorkQueue.offer (команда) выполняется. Теперь очередь заполнена

Теперь ScheduledExecutorService выполняет метод resizeThreadPool, который вызывает setCorePoolSize с maxThreads. Вот метод setCorePoolSize:

 public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
    if (workerCountOf(ctl.get()) > corePoolSize)
        interruptIdleWorkers();
    else if (delta > 0) {
        // We don't really know how many new threads are "needed".
        // As a heuristic, prestart enough new workers (up to new
        // core size) to handle the current number of tasks in
        // queue, but stop if queue becomes empty while doing so.
        int k = Math.min(delta, workQueue.size());
        while (k-- > 0 && addWorker(null, true)) {
            if (workQueue.isEmpty())
                break;
        }
    }
}

Этот метод добавляет одного работника, используя addWorker (null, true). Нет, есть 2 рабочих очереди, максимум и очередь заполнена.

Третья команда отправляется и завершается неудачей, потому что workQueue.offer (команда) и addWorker (команда, ложь) завершаются неудачей, что приводит к исключению:

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@24c22fe rejected from java.util.concurrent.ThreadPoolExecutor@cd1e646[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at ThreadPoolResizeTest.executeOnce(ThreadPoolResizeTest.java:60)
at ThreadPoolResizeTest.runTest(ThreadPoolResizeTest.java:28)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:69)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:48)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
at org.junit.runners.ParentRunner.run(ParentRunner.java:292)
at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)

Я думаю, что для решения этой проблемы вы должны установить емкость очереди на максимум команд, которые вы хотите выполнить.

Томас Кригер
источник
Правильный. Мне удалось воспроизвести, скопировав код в свой класс и добавив регистраторы. Обычно, когда очередь заполнена, и я отправляю новое задание, оно пытается создать нового работника. Между тем, если в этот момент мой ресайзер также вызывает setCorePoolSize в 2, это также создает нового работника. На этом этапе два рабочих конкурируют за добавление, но они оба не могут быть добавлены, потому что это нарушило бы ограничение максимального размера пула, поэтому отправка новой задачи отклоняется. Я думаю, что это условие гонки, и я подал отчет об ошибке в OpenJDK. Посмотрим. Но вы ответили на мой вопрос, чтобы вы получили награду. Спасибо.
Сваранга Сарма
2

Не уверен, что это квалифицируется как ошибка. Это поведение, когда дополнительные рабочие потоки создаются после того, как очередь заполнена, но в документах java было отчасти отмечено, что вызывающая сторона должна иметь дело с отклоняемыми задачами.

Java документы

Фабрика для новых тем. Все потоки создаются с использованием этой фабрики (через метод addWorker). Все абоненты должны быть готовы к сбою addWorker, который может отражать политику системы или пользователя, ограничивающую количество потоков. Даже если это не рассматривается как ошибка, сбой при создании потоков может привести к отклонению новых задач или оставлению существующих в очереди.

Когда вы изменяете размер основного пула, скажем, увеличивается, создаются дополнительные работники ( addWorkerметод in setCorePoolSize), и вызов для создания дополнительной работы ( addWorkerметод from execute) отклоняется, когда addWorkerвозвращается false ( add Workerпоследний фрагмент кода), так как достаточно дополнительных работников уже созданный, setCorePoolSize но еще не запущенный для отражения обновления в очереди .

Соответствующие части

сравнить

public void setCorePoolSize(int corePoolSize) {
    ....
    int k = Math.min(delta, workQueue.size());
    while (k-- > 0 && addWorker(null, true)) {
        if (workQueue.isEmpty())
             break;
    }
}

public void execute(Runnable command) {
    ...
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

private boolean addWorker(Runnable firstTask, boolean core) {
....
   if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
     return false;             
}

Используйте пользовательский обработчик отклонения повторных попыток (это должно работать для вашего случая, поскольку у вас есть верхняя граница в качестве максимального размера пула). Пожалуйста, настройте по мере необходимости.

public static class RetryRejectionPolicy implements RejectedExecutionHandler {
    public RetryRejectionPolicy () {}

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
           while(true)
            if(e.getQueue().offer(r)) break;
        }
    }
}

ThreadPoolExecutor pool = new ThreadPoolExecutor(
      minThreads, maxThreads,
      0, TimeUnit.SECONDS,
      new LinkedBlockingQueue<Runnable>(queueCapacity),
      new ThreadPoolResizeTest.RetryRejectionPolicy()
 );

Также обратите внимание, что вы используете shutdown не правильно, так как это не будет ждать завершения поставленной задачи, но awaitTerminationвместо этого используйте с .

Сагар Вирам
источник
Я думаю, что shutdown ожидает уже отправленные задачи, в соответствии с JavaDoc: shutdown () Инициирует упорядоченное завершение, при котором выполняются ранее отправленные задачи, но новые задачи не принимаются.
Томас Кригер
@ThomasKrieger - он выполнит уже отправленные задачи, но не будет ждать их завершения - из документов docs.oracle.com/javase/7/docs/api/java/util/concurrent/… - этот метод не ожидает ранее отправленных задачи для завершения выполнения. Используйте awaitTermination, чтобы сделать это.
Сагар Вирам