Как заставить ThreadPoolExecutor увеличивать потоки до максимума перед постановкой в ​​очередь?

100

Некоторое время я был разочарован поведением по умолчанию, ThreadPoolExecutorкоторое поддерживает ExecutorServiceпулы потоков, которые используют многие из нас. Цитата из Javadocs:

Если количество запущенных потоков больше corePoolSize, но меньше maximumPoolSize, новый поток будет создан только в том случае, если очередь заполнена .

Это означает, что если вы определите пул потоков с помощью следующего кода, он никогда не запустит 2-й поток, потому что LinkedBlockingQueueон неограничен.

ExecutorService threadPool =
   new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
      TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));

Только если у вас есть ограниченная очередь и она заполнена , запускаются любые потоки с номером ядра выше. Я подозреваю, что большое количество молодых многопоточных программистов на Java не знают о таком поведении ThreadPoolExecutor.

Теперь у меня есть конкретный вариант использования, когда это неоптимально. Я ищу способы обойти это без написания собственного класса TPE.

Мои требования касаются веб-службы, которая выполняет обратный вызов, возможно, ненадежной третьей стороне.

  • Я не хочу выполнять обратный вызов синхронно с веб-запросом, поэтому я хочу использовать пул потоков.
  • Я обычно получаю пару таких в минуту, поэтому я не хочу иметь newFixedThreadPool(...)большое количество потоков, которые в основном бездействуют.
  • Время от времени я получаю всплеск этого трафика, и я хочу увеличить количество потоков до некоторого максимального значения (скажем, 50).
  • Мне нужно сделать все возможное, чтобы выполнить все обратные вызовы, поэтому я хочу поставить в очередь любые дополнительные, превышающие 50. Я не хочу перегружать остальную часть моего веб-сервера, используя newCachedThreadPool().

Как я могу обойти это ограничение, ThreadPoolExecutorкогда очередь должна быть ограничена и заполнена до того, как будут запущены другие потоки? Как я могу заставить его запускать больше потоков перед постановкой задач в очередь?

Редактировать:

@Flavio хорошо замечает использование тега ThreadPoolExecutor.allowCoreThreadTimeOut(true)для тайм-аута и выхода основных потоков. Я подумал об этом, но мне все еще нужна функция core-thread. Я не хотел, чтобы количество потоков в пуле упало ниже размера ядра, если это возможно.

Серый
источник
1
Учитывая, что в вашем примере создается максимум 10 потоков, есть ли реальная экономия при использовании чего-то, что растет / сжимается в пуле потоков фиксированного размера?
bstempi
Хороший момент @bstempi. Число было несколько произвольным. Я увеличил его в вопросе до 50. Не совсем уверен, сколько параллельных потоков я действительно хочу работать, теперь, когда у меня есть это решение.
Грей
1
Ох, блин! 10 голосов, если бы я мог здесь, точно в той же позиции, что и я.
Евгений

Ответы:

51

Как я могу обойти это ограничение, ThreadPoolExecutorкогда очередь должна быть ограничена и заполнена, прежде чем будут запущены другие потоки.

Я считаю, что наконец-то нашел несколько элегантное (возможно, немного хакерское) решение этого ограничения с помощью ThreadPoolExecutor. Она включает в себя расширение , LinkedBlockingQueueчтобы он вернуться falseк queue.offer(...)когда уже есть некоторые задачи , поставленные в очередь. Если текущие потоки не успевают за задачами в очереди, TPE добавит дополнительные потоки. Если пул уже имеет максимальное количество потоков, RejectedExecutionHandlerбудет вызываться. Затем обработчик помещает запрос put(...)в очередь.

Конечно, странно писать очередь, в которой offer(...)можно возвращаться falseи put()никогда не блокироваться, так что это часть взлома. Но это хорошо работает с использованием очереди TPE, поэтому я не вижу в этом никаких проблем.

Вот код:

// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    private static final long serialVersionUID = -6903933921423432194L;
    @Override
    public boolean offer(Runnable e) {
        // Offer it to the queue if there is 0 items already queued, else
        // return false so the TPE will add another thread. If we return false
        // and max threads have been reached then the RejectedExecutionHandler
        // will be called which will do the put into the queue.
        if (size() == 0) {
            return super.offer(e);
        } else {
            return false;
        }
    }
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
        60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // This does the actual put into the queue. Once the max threads
            //  have been reached, the tasks will then queue up.
            executor.getQueue().put(r);
            // we do this after the put() to stop race conditions
            if (executor.isShutdown()) {
                throw new RejectedExecutionException(
                    "Task " + r + " rejected from " + e);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
});

С помощью этого механизма, когда я отправляю задачи в очередь, ThreadPoolExecutorони:

  1. Изначально увеличьте количество потоков до размера ядра (здесь 1).
  2. Предложите в очередь. Если очередь пуста, она будет помещена в очередь для обработки существующими потоками.
  3. Если в очереди уже есть 1 или более элементов, offer(...)функция вернет false.
  4. Если возвращается false, увеличивайте количество потоков в пуле, пока они не достигнут максимального числа (здесь 50).
  5. Если на максимуме, то он вызывает RejectedExecutionHandler
  6. В RejectedExecutionHandlerто ставит задачу в очередь для обработки первого доступного потока в FIFO порядке.

Хотя в моем примере кода выше очередь не ограничена, вы также можете определить ее как ограниченную очередь. Например, если вы добавите емкость 1000 к, LinkedBlockingQueueто это будет:

  1. масштабируйте резьбу до макс.
  2. затем встаньте в очередь, пока она не заполнится 1000 задачами
  3. затем заблокируйте вызывающего абонента до тех пор, пока для очереди не станет доступным пространство.

Кроме того, если вам нужно использовать offer(...)в, RejectedExecutionHandlerвы можете использовать offer(E, long, TimeUnit)метод вместо Long.MAX_VALUEтайм-аута.

Предупреждение:

Если вы ожидаете, что задачи будут добавлены к исполнителю после того, как он был выключен, тогда вы можете быть более умными, выбрасывая RejectedExecutionExceptionнашу кастомную программу, RejectedExecutionHandlerкогда служба-исполнитель была выключена. Спасибо @RaduToader за указание на это.

Редактировать:

Еще одна настройка этого ответа может заключаться в том, чтобы спросить TPE, есть ли незанятые потоки, и поставить элемент в очередь только в том случае, если он есть. Вам нужно будет создать для этого настоящий класс и добавить к нему ourQueue.setThreadPoolExecutor(tpe);метод.

Тогда ваш offer(...)метод может выглядеть примерно так:

  1. Проверьте, если tpe.getPoolSize() == tpe.getMaximumPoolSize()в каком случае просто позвоните super.offer(...).
  2. В противном случае tpe.getPoolSize() > tpe.getActiveCount()вызовите if, super.offer(...)поскольку кажется, что есть незанятые потоки.
  3. В противном случае вернитесь falseк вилке другого потока.

Может быть, это:

int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
    return super.offer(e);
} else {
    return false;
}

Обратите внимание, что методы get в TPE дороги, поскольку они обращаются к volatileполям или (в случае getActiveCount()) блокируют TPE и просматривают список потоков. Кроме того, здесь есть условия гонки, которые могут привести к неправильной постановке задачи в очередь или разветвлению другого потока, когда был незанятый поток.

Серый
источник
Я также боролся с той же проблемой, в конечном итоге переопределил метод выполнения. Но это действительно хорошее решение. :)
Бэтти
Как бы мне ни не нравилась идея разорвать контракт Queueради достижения этой цели, вы определенно не одиноки в своей идее: groovy-programming.com/post/26923146865
bstempi
3
Разве вы не видите здесь странности в том, что первая пара задач будет поставлена ​​в очередь, и только после этого появятся новые потоки? Например, если ваш единственный основной поток занят одной длительно выполняющейся задачей, и вы вызываете его execute(runnable), он runnableпросто добавляется в очередь. Если звонишь execute(secondRunnable), то secondRunnableдобавляется в очередь. Но если сейчас позвоните execute(thirdRunnable), то thirdRunnableбудет запущен новый поток. Выполнение runnableи secondRunnableтолько один раз thirdRunnable(или исходная длительная задача) завершены.
Роберт Тупело-Шнек
1
Да, Роберт прав, в многопоточной среде очередь иногда увеличивается, когда есть свободные потоки для использования. Решение ниже, которое расширяет TPE - работает намного лучше. Я думаю, что предложение Роберта следует отметить как ответ, даже несмотря на то, что вышеупомянутый прием интересен
хочу знать все
1
RejectedExecutionHandler помог исполнителю при завершении работы. Теперь вы вынуждены использовать shutdownNow (), поскольку shutdown () не предотвращает добавление новых задач (из-за запроса)
Radu
28

Установите одинаковое значение для размера ядра и максимального размера и разрешите удаление основных потоков из пула с помощью allowCoreThreadTimeOut(true).

Флавио
источник
+1 Да, я подумал об этом, но мне все еще хотелось иметь функцию core-thread. Я не хотел, чтобы пул потоков переходил в 0 потоков в периоды бездействия. Я отредактирую свой вопрос, чтобы указать на это. Но отличный момент.
Грей
Спасибо! Это самый простой способ сделать это.
Дмитрий Овчинников
28

У меня уже есть два других ответа на этот вопрос, но я подозреваю, что это лучший.

Он основан на методе принятого в настоящее время ответа , а именно:

  1. Переопределите метод очереди, offer()чтобы (иногда) возвращать false,
  2. который заставляет ThreadPoolExecutorлибо порождать новый поток, либо отклонять задачу, и
  3. установить, RejectedExecutionHandlerчтобы фактически поставить задачу в очередь при отклонении.

Проблема в том, когда offer()должно возвращаться false. Принятый в настоящее время ответ возвращает false, если в очереди есть несколько задач, но, как я указал в своем комментарии, это вызывает нежелательные эффекты. В качестве альтернативы, если вы всегда возвращаете false, вы продолжите порождать новые потоки, даже если у вас есть потоки, ожидающие в очереди.

Решение - использовать Java 7 LinkedTransferQueueи иметь offer()call tryTransfer(). Когда есть ожидающий потребительский поток, задача просто передается этому потоку. В противном случае offer()вернет false и ThreadPoolExecutorсоздаст новый поток.

    BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() {
        @Override
        public boolean offer(Runnable e) {
            return tryTransfer(e);
        }
    };
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue);
    threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });
Роберт Тупело-Шнек
источник
Я должен согласиться, мне это кажется самым чистым. Единственным недостатком этого решения является то, что LinkedTransferQueue неограничен, поэтому вы не получите очередь задач с ограниченной емкостью без дополнительной работы.
Yeroc
Возникает проблема при увеличении пула до максимального размера. Скажем, пул увеличен до максимального размера, и каждый поток в настоящее время выполняет задачу, при отправке этого предложения для запуска это предложение impl вернет false, а ThreadPoolExecutor пытается добавить поток addWorker, но пул уже достиг максимума, поэтому runnable будет просто отклонен. Согласно rejectedExceHandler, который вы написали, он будет снова предложен в очередь, в результате чего этот танец обезьяны снова произойдет с самого начала.
Sudheera 01
1
@Sudheera Я считаю, что вы ошибаетесь. queue.offer(), поскольку он действительно вызывает LinkedTransferQueue.tryTransfer(), вернет false и не поставит задачу в очередь. Однако RejectedExecutionHandlerвызовы queue.put(), которые не терпят неудачу и ставят задачу в очередь.
Роберт Тупело-Шнек
1
@ RobertTupelo-Schneck очень полезно и приятно!
Евгений
1
@ RobertTupelo-Schneck Работает как шарм! Я не знаю, почему в java нет чего-то подобного из коробки
Георгий Пеев
7

Примечание: теперь я предпочитаю и рекомендую свой другой ответ .

Вот версия, которая кажется мне более простой: увеличивайте corePoolSize (до предела maximumPoolSize) всякий раз, когда выполняется новая задача, затем уменьшайте corePoolSize (до предела, указанного пользователем «размер основного пула»), когда задача завершена.

Другими словами, отслеживайте количество запущенных или поставленных в очередь задач и убедитесь, что corePoolSize равен количеству задач, пока он находится между заданным пользователем «размером основного пула» и максимальным размером пула.

public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor {
    private int userSpecifiedCorePoolSize;
    private int taskCount;

    public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        userSpecifiedCorePoolSize = corePoolSize;
    }

    @Override
    public void execute(Runnable runnable) {
        synchronized (this) {
            taskCount++;
            setCorePoolSizeToTaskCountWithinBounds();
        }
        super.execute(runnable);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
        super.afterExecute(runnable, throwable);
        synchronized (this) {
            taskCount--;
            setCorePoolSizeToTaskCountWithinBounds();
        }
    }

    private void setCorePoolSizeToTaskCountWithinBounds() {
        int threads = taskCount;
        if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize;
        if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize();
        setCorePoolSize(threads);
    }
}

Как написано, класс не поддерживает изменение заданного пользователем corePoolSize или maximumPoolSize после построения и не поддерживает управление рабочей очередью напрямую или через remove()или purge().

Роберт Тупело-Шнек
источник
Мне нравится кроме synchronizedблоков. Можете ли вы дозвониться до очереди, чтобы узнать количество задач. Или, может быть, использовать AtomicInteger?
Gray
Я хотел их избежать, но проблема вот в чем. Если есть несколько execute()вызовов в отдельных потоках, каждый из них (1) вычислит, сколько потоков необходимо, (2) setCorePoolSizeдо этого числа и (3) вызовет super.execute(). Если шаги (1) и (2) не синхронизированы, я не уверен, как предотвратить неудачный порядок, когда вы устанавливаете размер основного пула на меньшее число после большего числа. При прямом доступе к полю суперкласса это можно было бы сделать с помощью метода сравнения и установки, но я не вижу чистого способа сделать это в подклассе без синхронизации.
Роберт Тупело-Шнек
Я думаю, что штрафы за это состояние гонки относительно невелики, пока taskCountполе является действительным (т.е. a AtomicInteger). Если два потока пересчитывают размер пула сразу после друг друга, он должен получить правильные значения. Если второй сжимает основные потоки, значит, он видел падение очереди или что-то в этом роде.
Gray
1
К сожалению, я думаю, что это еще хуже. Допустим, задачи 10 и 11 звонят execute(). Каждый позвонит, atomicTaskCount.incrementAndGet()и они получат 10 и 11 соответственно. Но без синхронизации (после получения количества задач и установки размера основного пула) вы можете получить: (1) задача 11 устанавливает размер основного пула равным 11, (2) задача 10 устанавливает размер основного пула равным 10, (3) задача 10 вызывает super.execute(), (4) задача 11 вызывает super.execute()и ставится в очередь.
Роберт Тупело-Шнек
2
Я провел серьезное тестирование этого решения, и оно определенно лучшее. В многопоточной среде он по-прежнему иногда ставится в очередь, когда есть свободные потоки (из-за природы TPE.execute со свободными потоками), но это происходит редко, в отличие от решения с пометкой как ответ, где состояние гонки имеет больше шансов случаются, так что это происходит в значительной степени при каждом многопоточном запуске.
Хочу знать всех
6

У нас есть подкласс, ThreadPoolExecutorкоторый принимает дополнительные creationThresholdи переопределяет execute.

public void execute(Runnable command) {
    super.execute(command);
    final int poolSize = getPoolSize();
    if (poolSize < getMaximumPoolSize()) {
        if (getQueue().size() > creationThreshold) {
            synchronized (this) {
                setCorePoolSize(poolSize + 1);
                setCorePoolSize(poolSize);
            }
        }
    }
}

может быть, это тоже помогает, но ваш, конечно, выглядит более вычурно…

Ральф Х
источник
Интересный. Спасибо за это. На самом деле я не знал, что размер ядра изменяемый.
Грей
Теперь, когда я думаю об этом еще раз, это решение лучше моего с точки зрения проверки размера очереди. Я изменил свой ответ, чтобы offer(...)метод возвращался только falseусловно. Спасибо!
Грей
4

Рекомендуемый ответ решает только одну (1) проблему с пулом потоков JDK:

  1. Пулы потоков JDK смещены в сторону очередей. Поэтому вместо создания нового потока они поставят задачу в очередь. Только если очередь достигает своего предела, пул потоков порождает новый поток.

  2. Ухода нити не происходит при уменьшении нагрузки. Например, если у нас есть всплеск заданий, попадающих в пул, что приводит к тому, что пул достигает максимума, за которым следует небольшая нагрузка максимум 2 задачи за раз, пул будет использовать все потоки для обслуживания небольшой нагрузки, предотвращающей выход из эксплуатации. (потребуется всего 2 потока…)

Недовольный описанным выше поведением, я пошел дальше и реализовал пул, чтобы преодолеть указанные выше недостатки.

Решение 2) Использование расписания Lifo решает проблему. Эту идею представил Бен Маурер на конференции ACM Applicative 2015: Systems @ Facebook scale.

Так родилась новая реализация:

LifoThreadPoolExecutorSQP

Пока эта реализация улучшает производительность асинхронного выполнения для ZEL .

Эта реализация позволяет сократить накладные расходы на переключение контекста, обеспечивая превосходную производительность для определенных случаев использования.

Надеюсь, поможет...

PS: JDK Fork Join Pool реализует ExecutorService и работает как «обычный» пул потоков, реализация является производительной, она использует планирование потоков LIFO, однако нет контроля над размером внутренней очереди, тайм-аутом вывода на пенсию ... и, что наиболее важно, задачи не могут быть прерывается при их отмене

user2179737
источник
1
Жаль, что у этой реализации так много внешних зависимостей. Делает это бесполезным для меня: - /
Мартин Л.
1
Это действительно хороший момент (2-й). К сожалению, реализация не ясна из-за внешних зависимостей, но все же может быть принята, если вы хотите.
Алексей Власов
1

Примечание: теперь я предпочитаю и рекомендую свой другой ответ .

У меня есть еще одно предложение, продолжающее первоначальную идею изменить очередь, чтобы вернуть false. В этом случае все задачи могут поступать в очередь, но всякий раз, когда задача ставится в очередь после execute(), мы следуем за ней с помощью дозорной задачи без операции, которую очередь отклоняет, вызывая порождение нового потока, который немедленно выполнит операцию без операции, за которой следует что-то из очереди.

Поскольку рабочие потоки могут запрашивать LinkedBlockingQueueновую задачу, задача может быть поставлена ​​в очередь даже при наличии доступного потока. Чтобы не порождать новые потоки даже при наличии доступных потоков, нам нужно отслеживать, сколько потоков ожидают новых задач в очереди, и создавать новый поток только тогда, когда в очереди задач больше, чем ожидающих потоков.

final Runnable SENTINEL_NO_OP = new Runnable() { public void run() { } };

final AtomicInteger waitingThreads = new AtomicInteger(0);

BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    @Override
    public boolean offer(Runnable e) {
        // offer returning false will cause the executor to spawn a new thread
        if (e == SENTINEL_NO_OP) return size() <= waitingThreads.get();
        else return super.offer(e);
    }

    @Override
    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.poll(timeout, unit);
        } finally {
            waitingThreads.decrementAndGet();
        }
    }

    @Override
    public Runnable take() throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.take();
        } finally {
            waitingThreads.decrementAndGet();
        }
    }
};

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue) {
    @Override
    public void execute(Runnable command) {
        super.execute(command);
        if (getQueue().size() > waitingThreads.get()) super.execute(SENTINEL_NO_OP);
    }
};
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r == SENTINEL_NO_OP) return;
        else throw new RejectedExecutionException();            
    }
});
Роберт Тупело-Шнек
источник
0

Лучшее решение, которое я могу придумать, - это расширить.

ThreadPoolExecutorпредлагает несколько методов перехвата: beforeExecuteи afterExecute. В своем расширении вы можете использовать ограниченную очередь для подачи задач и вторую неограниченную очередь для обработки переполнения. Когда кто-то звонит submit, вы можете попытаться поместить запрос в ограниченную очередь. Если вы встретились с исключением, вы просто помещаете задачу в свою очередь переполнения. Затем вы можете использовать afterExecuteловушку, чтобы увидеть, есть ли что-нибудь в очереди переполнения после завершения задачи. Таким образом, исполнитель сначала позаботится о материалах в своей ограниченной очереди и автоматически выберет из этой неограниченной очереди, если позволит время.

Кажется, что работы больше, чем у вашего решения, но, по крайней мере, это не связано с неожиданным поведением очереди. Я также полагаю, что есть лучший способ проверить состояние очереди и потоков, чем полагаться на исключения, которые довольно медленно генерируются.

bstempi
источник
Я не люблю это решение. Я почти уверен, что ThreadPoolExecutor не предназначен для наследования.
Скотт
На самом деле, прямо в JavaDoc есть пример расширения. Они заявляют, что, скорее всего, будут просто реализовывать методы перехвата, но они говорят вам, на что еще нужно обратить внимание при расширении.
bstempi
0

Примечание. Для JDK ThreadPoolExecutor, когда у вас есть ограниченная очередь, вы создаете новые потоки только тогда, когда предложение возвращает false. Вы можете получить что-то полезное с помощью CallerRunsPolicy, который создает немного BackPressure и напрямую вызывает run в потоке вызывающего.

Мне нужно, чтобы задачи выполнялись из потоков, созданных пулом, и имели неограниченную очередь для планирования, в то время как количество потоков в пуле может увеличиваться или уменьшаться между corePoolSize и maximumPoolSize, поэтому ...

Я закончил тем, что сделал полную копию из ThreadPoolExecutor и немного изменил метод выполнения, потому что, к сожалению, это невозможно сделать с помощью расширения (он вызывает частные методы).

Я не хотел создавать новые потоки сразу же, когда приходит новый запрос и все потоки заняты (потому что у меня обычно недолговечные задачи). Я добавил порог, но не стесняйтесь изменять его в соответствии с вашими потребностями (возможно, для большей части ввода-вывода лучше удалить этот порог)

private final AtomicInteger activeWorkers = new AtomicInteger(0);
private volatile double threshold = 0.7d;

protected void beforeExecute(Thread t, Runnable r) {
    activeWorkers.incrementAndGet();
}
protected void afterExecute(Runnable r, Throwable t) {
    activeWorkers.decrementAndGet();
}
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

        if (isRunning(c) && this.workQueue.offer(command)) {
            int recheck = this.ctl.get();
            if (!isRunning(recheck) && this.remove(command)) {
                this.reject(command);
            } else if (workerCountOf(recheck) == 0) {
                this.addWorker((Runnable) null, false);
            }
            //>>change start
            else if (workerCountOf(recheck) < maximumPoolSize //
                && (activeWorkers.get() > workerCountOf(recheck) * threshold
                    || workQueue.size() > workerCountOf(recheck) * threshold)) {
                this.addWorker((Runnable) null, false);
            }
            //<<change end
        } else if (!this.addWorker(command, false)) {
            this.reject(command);
        }
    }
Раду Тоадер
источник