Невозможно создать пул кэшированных потоков с ограничением размера?

127

Кажется, невозможно создать пул кэшированных потоков с ограничением количества потоков, которые он может создать.

Вот как статический Executors.newCachedThreadPool реализован в стандартной библиотеке Java:

 public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Итак, используя этот шаблон для создания пула кэшированных потоков фиксированного размера:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronusQueue<Runable>());

Теперь, если вы воспользуетесь этим и отправите 3 задачи, все будет хорошо. Отправка любых дальнейших задач приведет к отклонению исключений выполнения.

Пробуем это:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runable>());

В результате все потоки будут выполняться последовательно. То есть пул потоков никогда не будет создавать более одного потока для обработки ваших задач.

Это ошибка в методе выполнения ThreadPoolExecutor? А может это намеренно? Или есть другой способ?

Изменить: мне нужно что-то точно такое же, как пул кэшированных потоков (он создает потоки по запросу, а затем убивает их после некоторого тайм-аута), но с ограничением количества потоков, которые он может создать, и возможностью продолжать ставить в очередь дополнительные задачи после того, как он достигнет своего предела потока. Согласно ответу Sjlee, это невозможно. Глядя на метод execute () ThreadPoolExecutor, это действительно невозможно. Мне нужно было бы создать подкласс ThreadPoolExecutor и переопределить execute () примерно так, как это делает SwingWorker, но то, что SwingWorker делает в своем execute (), - это полный взлом.

Мэтт Кринкло-Фогт
источник
1
Какой у Вас вопрос? Разве ваш второй пример кода не является ответом на ваш заголовок?
rsp
4
Мне нужен пул потоков, который будет добавлять потоки по запросу по мере роста количества задач, но никогда не будет добавлять больше некоторого максимального количества потоков. CachedThreadPool уже делает это, за исключением того, что он добавит неограниченное количество потоков и не остановится на каком-то заранее определенном размере. Размер, который я определяю в примерах, равен 3. Во втором примере добавляется 1 поток, но не добавляются еще два по мере поступления новых задач, в то время как другие задачи еще не завершены.
Мэтт Кринкло-Фогт,
Проверьте это, он это решает, отладкаisfun.blogspot.com
2012/05/…
Связано с: stackoverflow.com/questions/19528304/…
Серый

Ответы:

235

ThreadPoolExecutor имеет следующие несколько основных поведений, и ваши проблемы можно объяснить этим поведением.

Когда задачи отправлены,

  1. Если пул потоков не достиг размера ядра, он создает новые потоки.
  2. Если достигнут размер ядра и нет свободных потоков, он ставит задачи в очередь.
  3. Если достигнут размер ядра, нет простаивающих потоков и очередь заполняется, создаются новые потоки (пока не достигнут максимальный размер).
  4. Если достигнут максимальный размер, нет незанятых потоков и очередь заполняется, срабатывает политика отклонения.

В первом примере обратите внимание, что размер SynchronousQueue по существу равен 0. Поэтому, как только вы достигнете максимального размера (3), сработает политика отклонения (# 4).

Во втором примере предпочтительной очередью является LinkedBlockingQueue неограниченного размера. Следовательно, вы застреваете в поведении №2.

Вы не можете сильно повозиться с кешированным или фиксированным типом, поскольку их поведение почти полностью определено.

Если вы хотите иметь ограниченный и динамический пул потоков, вам необходимо использовать положительный размер ядра и максимальный размер в сочетании с очередью конечного размера. Например,

new ThreadPoolExecutor(10, // core size
    50, // max size
    10*60, // idle timeout
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(20)); // queue with a size

Приложение : это довольно старый ответ, и похоже, что JDK изменил свое поведение, когда дело доходит до размера ядра 0. Начиная с JDK 1.6, если размер ядра равен 0 и в пуле нет потоков, ThreadPoolExecutor добавит поток для выполнения этой задачи. Следовательно, размер ядра 0 является исключением из приведенного выше правила. Благодаря Steve для чего , что мое внимание.

sjlee
источник
4
Вы должны написать несколько слов о методе, allowCoreThreadTimeOutчтобы этот ответ был идеальным. См. Ответ @ user1046052
hsestupin
1
Отличный ответ! Еще одно замечание: стоит упомянуть и другие правила отказа. См. Ответ @brianegge
Джефф
1
Поведение 2 не должно говорить: «Если достигнут размер maxThread и нет свободных потоков, он ставит задачи в очередь». ?
Zoltán
1
Не могли бы вы пояснить, что подразумевает размер очереди? Означает ли это, что только 20 задач могут быть поставлены в очередь, прежде чем они будут отклонены?
Zoltán
1
@ Zoltán Я написал это некоторое время назад, так что есть вероятность, что с тех пор какое-то поведение могло измениться (я не слишком внимательно следил за последними действиями), но при условии, что это поведение не изменилось, # 2 является правильным, как указано, и это пожалуй, самый важный (и несколько удивительный) момент в этом. По достижении размера ядра TPE предпочитает создание очередей созданию новых потоков. Размер очереди буквально равен размеру очереди, переданной в TPE. Если очередь заполняется, но не достигла максимального размера, она создает новый поток (не отклоняет задачи). См. № 3. Надеюсь, это поможет.
sjlee 01
60

Если я что-то не упустил, ответ на исходный вопрос прост. Следующий код реализует желаемое поведение, как описано на исходном плакате. Он создаст до 5 потоков для работы с неограниченной очередью, а незанятые потоки завершатся через 60 секунд.

tp = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>());
tp.allowCoreThreadTimeOut(true);

источник
1
Ты прав. Этот метод был добавлен в jdk 1.6, поэтому не так много людей знают о нем. Кроме того, у вас не может быть «минимального» размера основного пула, что прискорбно.
jtahlborn
4
Единственное, что меня беспокоит (из документации JDK 8): «Когда новая задача отправляется в методе execute (Runnable) и выполняется меньше потоков corePoolSize, создается новый поток для обработки запроса, даже если другой рабочий потоки простаивают ".
veegee,
Уверен, что это на самом деле не работает. В прошлый раз, когда я смотрел, выполняя описанное выше, на самом деле ваша работа выполняется только в одном потоке, даже если вы создаете 5. Опять же, прошло несколько лет, но когда я погрузился в реализацию ThreadPoolExecutor, он отправлялся в новые потоки только после того, как ваша очередь была заполнена. Использование неограниченной очереди приводит к тому, что этого никогда не происходит. Вы можете протестировать, отправив работу и войдя в систему под именем потока, а затем перейдя в режим сна. Каждый runnable будет печатать одно и то же имя / не будет запущен ни в каком другом потоке.
Мэтт Кринкло-Фогт,
2
Это действительно работает, Мэтт. Вы устанавливаете размер ядра равным 0, поэтому у вас был только 1 поток. Хитрость здесь в том, чтобы установить максимальный размер ядра.
T-Gergely
1
@vegee прав - это на самом деле не очень хорошо работает - ThreadPoolExecutor будет повторно использовать потоки только тогда, когда выше corePoolSize. Поэтому, когда corePoolSize равен maxPoolSize, вы только выиграете от кеширования потоков, когда ваш пул будет заполнен (так что, если вы собираетесь использовать это, но обычно остаетесь ниже максимального размера пула, вы также можете уменьшить тайм-аут потока до минимума значение; и имейте в виду, что нет кеширования - всегда новые потоки)
Крис Ридделл,
7

Была такая же проблема. Поскольку никакой другой ответ не объединяет все проблемы, я добавляю свой:

Теперь это четко написано в документации : если вы используете очередь, которая не блокирует ( LinkedBlockingQueue), настройка максимального количества потоков не имеет никакого эффекта, используются только основные потоки.

так:

public class MyExecutor extends ThreadPoolExecutor {

    public MyExecutor() {
        super(4, 4, 5,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        allowCoreThreadTimeOut(true);
    }

    public void setThreads(int n){
        setMaximumPoolSize(Math.max(1, n));
        setCorePoolSize(Math.max(1, n));
    }

}

У этого исполнителя есть:

  1. Нет концепции максимальных потоков, поскольку мы используем неограниченную очередь. Это хорошо, потому что такая очередь может привести к тому, что исполнитель создаст огромное количество неосновных, дополнительных потоков, если он будет следовать своей обычной политике.

  2. Очередь максимального размера Integer.MAX_VALUE. Submit()будет выбрасывать, RejectedExecutionExceptionесли количество ожидающих задач превышает Integer.MAX_VALUE. Не уверен, что сначала у нас закончится память, или это произойдет.

  3. Имеет 4 возможных основных потока. Неактивные основные потоки автоматически завершаются, если они простаивают в течение 5 секунд. Итак, да, потоки строго по требованию. Число может быть изменено с помощью setThreads()метода.

  4. Следит за тем, чтобы минимальное количество основных потоков никогда не было меньше единицы, иначе submit()будет отклонять каждую задачу. Поскольку основные потоки должны быть> = max потоков, метод также setThreads()устанавливает максимальное количество потоков, хотя настройка максимального потока бесполезна для неограниченной очереди.

SD
источник
Я думаю , вы также должны установить «allowCoreThreadTimeOut» к «истинной», в противном случае, когда потоки создаются, вы будете держать их вокруг навсегда: gist.github.com/ericdcobb/46b817b384f5ca9d5f5d
Эрику
ой, я просто пропустил это, извините, тогда ваш ответ идеален!
Эрик
6

В вашем первом примере последующие задачи отклоняются, потому что AbortPolicyпо умолчанию RejectedExecutionHandler. ThreadPoolExecutor содержит следующие политики, которые вы можете изменить с помощью setRejectedExecutionHandlerметода:

CallerRunsPolicy
AbortPolicy
DiscardPolicy
DiscardOldestPolicy

Похоже, вам нужен кешированный пул потоков с CallerRunsPolicy.

brianegge
источник
5

Ни один из ответов здесь не устранил мою проблему, которая была связана с созданием ограниченного количества HTTP-соединений с использованием HTTP-клиента Apache (версия 3.x). Поскольку мне потребовалось несколько часов, чтобы найти хорошую настройку, я поделюсь:

private ExecutorService executor = new ThreadPoolExecutor(5, 10, 60L,
  TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
  Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

Это создает объект, ThreadPoolExecutorкоторый начинается с пяти и содержит максимум десять одновременно работающих потоков, используемых CallerRunsPolicyдля выполнения.

Павел
источник
Проблема с этим решением заключается в том, что если вы увеличите количество производителей, вы увеличите количество потоков, выполняющих фоновые потоки. Во многих случаях это не то, что вам нужно.
Грей
3

Согласно Javadoc для ThreadPoolExecutor:

Если запущено больше, чем corePoolSize, но меньше, чем maximumPoolSize потоков, новый поток будет создан только в том случае, если очередь заполнена . Устанавливая одинаковые значения corePoolSize и maximumPoolSize, вы создаете пул потоков фиксированного размера.

(Акцент мой.)

Ответ джиттера - это то, что вы хотите, хотя мой ответ на ваш другой вопрос. :)

Джонатан Файнберг
источник
2

есть еще один вариант. Вместо использования новой SynchronousQueue вы также можете использовать любую другую очередь, но вы должны убедиться, что ее размер равен 1, так что это заставит Exectorservice создать новый поток.

Ashkrit
источник
Я думаю, вы имеете в виду размер 0 (по умолчанию), чтобы не было задач в очереди и действительно заставляло службу исполнителя каждый раз создавать новый поток.
Leonmax
2

Не похоже, что какой-либо из ответов действительно отвечает на вопрос - на самом деле я не вижу способа сделать это - даже если вы подклассифицируете PooledExecutorService, поскольку многие из методов / свойств являются частными, например, создание addIfUnderMaximumPoolSize было защищено, вы могли сделать следующее:

class MyThreadPoolService extends ThreadPoolService {
    public void execute(Runnable run) {
        if (poolSize() == 0) {
            if (addIfUnderMaximumPoolSize(run) != null)
                return;
        }
        super.execute(run);
    }
}

Самое близкое, что я получил, было это - но даже это не очень хорошее решение

new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
    public void execute(Runnable command) {
        if (getPoolSize() == 0 && getActiveCount() < getMaximumPoolSize()) {        
            super.setCorePoolSize(super.getCorePoolSize() + 1);
        }
        super.execute(command);
    }

    protected void afterExecute(Runnable r, Throwable t) {
         // nothing in the queue
         if (getQueue().isEmpty() && getPoolSize() > min) {
             setCorePoolSize(getCorePoolSize() - 1);
         }
    };
 };

пс не проверял выше

Стюарт
источник
2

Вот еще одно решение. Я думаю, что это решение ведет себя так, как вы хотите (хотя и не горжусь этим решением):

final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    public boolean offer(Runnable o) {
        if (size() > 1)
            return false;
        return super.offer(o);
    };

    public boolean add(Runnable o) {
        if (super.offer(o))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
};

RejectedExecutionHandler handler = new RejectedExecutionHandler() {         
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        queue.add(r);
    }
};

dbThreadExecutor =
        new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, queue, handler);
Стюарт
источник
2

Это то, что вы хотите (по крайней мере, я так думаю). Для объяснения проверьте ответ Джонатана Файнберга

Executors.newFixedThreadPool(int n)

Создает пул потоков, который повторно использует фиксированное количество потоков, работающих в общей неограниченной очереди. В любой момент не более nThreads потоков будут активными задачами обработки. Если дополнительные задачи отправляются, когда все потоки активны, они будут ждать в очереди, пока поток не станет доступным. Если какой-либо поток завершается из-за сбоя во время выполнения перед завершением работы, новый поток займет его место, если это необходимо для выполнения последующих задач. Потоки в пуле будут существовать до тех пор, пока он не будет явно отключен.

дрожание
источник
4
Конечно, я мог бы использовать фиксированный пул потоков, но это оставило бы n потоков навсегда или до тех пор, пока я не вызову выключение. Мне нужно что-то в точности как пул кэшированных потоков (он создает потоки по запросу, а затем убивает их по истечении некоторого времени ожидания), но с ограничением количества потоков, которые он может создать.
Мэтт Кринкло-Фогт,
0
  1. Вы можете использовать, ThreadPoolExecutorкак предлагает @sjlee

    Вы можете управлять размером пула динамически. Взгляните на этот вопрос для получения более подробной информации:

    Динамический пул потоков

    ИЛИ

  2. Вы можете использовать API newWorkStealingPool , который был представлен в java 8.

    public static ExecutorService newWorkStealingPool()

    Создает пул рабочих потоков, использующий все доступные процессоры в качестве целевого уровня параллелизма.

По умолчанию уровень параллелизма равен количеству ядер ЦП на вашем сервере. Если у вас есть сервер с 4-ядерным процессором, размер пула потоков будет равен 4. Этот API возвращает ForkJoinPoolтип ExecutorService и разрешает работу кражи незанятых потоков путем кражи задач из занятых потоков в ForkJoinPool.

Равиндра бабу
источник
0

Проблема была резюмирована следующим образом:

Мне нужно что-то точно такое же, как пул кэшированных потоков (он создает потоки по запросу, а затем убивает их после некоторого тайм-аута), но с ограничением количества потоков, которые он может создать, и возможностью продолжать ставить в очередь дополнительные задачи после того, как он достигнет своего ограничение потока.

Прежде чем указать на решение, я объясню, почему следующие решения не работают:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());

Это не будет ставить в очередь задачи, когда будет достигнуто ограничение в 3, потому что SynchronousQueue, по определению, не может содержать никаких элементов.

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

Это не создаст более одного потока, потому что ThreadPoolExecutor создает потоки, превышающие corePoolSize, только если очередь заполнена. Но LinkedBlockingQueue никогда не бывает заполнен.

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS,
    new LinkedBlockingQueue<Runnable>());
executor.allowCoreThreadTimeOut(true);

Это не будет повторно использовать потоки, пока не будет достигнут corePoolSize, потому что ThreadPoolExecutor увеличивает количество потоков до достижения corePoolSize, даже если существующие потоки простаивают. Если вы можете смириться с этим недостатком, то это самое простое решение проблемы. Это также решение, описанное в «Java Concurrency in Practice» (сноска на стр. 172).

Единственное полное решение описанной проблемы, по-видимому, связано с переопределением offerметода очереди и написанием, RejectedExecutionHandlerкак описано в ответах на этот вопрос: как заставить ThreadPoolExecutor увеличить потоки до максимального значения перед постановкой в ​​очередь?

Стефан Фейерхан
источник
0

Это работает для Java8 + (и других пока что ..)

     Executor executor = new ThreadPoolExecutor(3, 3, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>()){{allowCoreThreadTimeOut(true);}};

где 3 - это предел количества потоков, а 5 - тайм-аут для незанятых потоков.

Если вы хотите проверить, работает ли он самостоятельно , вот код для выполнения работы:

public static void main(String[] args) throws InterruptedException {
    final int DESIRED_NUMBER_OF_THREADS=3; // limit of number of Threads for the task at a time
    final int DESIRED_THREAD_IDLE_DEATH_TIMEOUT=5; //any idle Thread ends if it remains idle for X seconds

    System.out.println( java.lang.Thread.activeCount() + " threads");
    Executor executor = new ThreadPoolExecutor(DESIRED_NUMBER_OF_THREADS, DESIRED_NUMBER_OF_THREADS, DESIRED_THREAD_IDLE_DEATH_TIMEOUT, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>()) {{allowCoreThreadTimeOut(true);}};

    System.out.println(java.lang.Thread.activeCount() + " threads");

    for (int i = 0; i < 5; i++) {
        final int fi = i;
        executor.execute(() -> waitsout("starting hard thread computation " + fi, "hard thread computation done " + fi,2000));
    }
    System.out.println("If this is UP, it works");

    while (true) {
        System.out.println(
                java.lang.Thread.activeCount() + " threads");
        Thread.sleep(700);
    }

}

static void waitsout(String pre, String post, int timeout) {
    try {
        System.out.println(pre);
        Thread.sleep(timeout);
        System.out.println(post);
    } catch (Exception e) {
    }
}

вывод кода выше для меня

1 threads
1 threads
If this is UP, it works
starting hard thread computation 0
4 threads
starting hard thread computation 2
starting hard thread computation 1
4 threads
4 threads
hard thread computation done 2
hard thread computation done 0
hard thread computation done 1
starting hard thread computation 3
starting hard thread computation 4
4 threads
4 threads
4 threads
hard thread computation done 3
hard thread computation done 4
4 threads
4 threads
4 threads
4 threads
3 threads
3 threads
3 threads
1 threads
1 threads
Ондржей
источник