Какую реализацию параллельной очереди следует использовать в Java?

132

Из JavaDocs:

  • ConcurrentLinkedQueue является подходящим выбором , когда много потоков будет общий доступ к общей коллекции. Эта очередь не допускает пустых элементов.
  • ArrayBlockingQueue - это классический «ограниченный буфер», в котором массив фиксированного размера содержит элементы, вставленные производителями и извлеченные потребителями. Этот класс поддерживает необязательную политику справедливости для упорядочивания ожидающих потоков производителей и потребителей.
  • LinkedBlockingQueue обычно имеет более высокую пропускную способность, чем очереди на основе массивов, но менее предсказуемую производительность в большинстве параллельных приложений.

У меня есть 2 сценария: один требует, чтобы очередь поддерживала множество производителей (потоков, использующих его) с одним потребителем, а другой - наоборот.

Я не понимаю, какую реализацию использовать. Может кто-нибудь объяснить, в чем разница?

Кроме того, что такое «необязательная политика справедливости» в ArrayBlockingQueue?

Дэвид Хофманн
источник
1
Вы также забыли спросить о PriorityBlockingQueue, который полезен для указания порядка обработки потоков.
Игорь Ганапольский

Ответы:

53

В основном разница между ними заключается в характеристиках производительности и поведении блокировки.

Самое простое ArrayBlockingQueue- это очередь фиксированного размера. Поэтому, если вы установите размер 10 и попытаетесь вставить 11-й элемент, оператор вставки заблокируется, пока другой поток не удалит элемент. Проблема справедливости заключается в том, что происходит, если несколько потоков пытаются одновременно вставлять и удалять (другими словами, в период, когда Очередь была заблокирована). Алгоритм равноправия гарантирует, что первый поток, который запрашивает, будет первым потоком, который получит. В противном случае данный поток может ждать дольше, чем другие потоки, что приводит к непредсказуемому поведению (иногда одному потоку требуется всего несколько секунд, потому что другие потоки, запущенные позже, обрабатываются первыми). Компромисс заключается в том, что для управления справедливостью требуются накладные расходы, что снижает пропускную способность.

Наиболее важное различие между LinkedBlockingQueueи ConcurrentLinkedQueueзаключается в том, что если вы запрашиваете элемент из a, LinkedBlockingQueueа очередь пуста, ваш поток будет ждать, пока там что-то не будет. A сразу ConcurrentLinkedQueueже вернется с поведением пустой очереди.

Какой из них зависит от того, нужна ли вам блокировка. Когда у вас много производителей и один потребитель, это звучит так. С другой стороны, если у вас много потребителей и только один производитель, вам может не понадобиться блокирующее поведение, и вы можете просто попросить потребителей проверить, пуста ли очередь, и продолжить, если это так.

Ишай
источник
67
Ответ вводит в заблуждение. Как LinkedBlockingQueue, так и ConcurrentLinkedQueue имеют метод poll (), который удаляет заголовок очереди или возвращает значение null (не блокирует), и метод offer (E e), который вставляется в конец очереди и не блокирует. Разница в том, что только LinkedBlockingQueue имеет блокирующие операции в дополнение к неблокирующим операциям - и за эту привилегию вы платите цену, которую LinkedBlockingQueue фактически имеет некоторую блокировку. Другой ответ объясняет это.
Nakedible
123

ConcurrentLinkedQueue означает, что блокировки не выполняются (т.е. не выполняются вызовы synchronized (this) или Lock.lock ). Он будет использовать операцию CAS - Compare and Swap во время модификаций, чтобы увидеть, остается ли головной / хвостовой узел таким же, как при запуске. Если это так, операция завершается успешно. Если узел голова / хвост отличается, он будет вращаться и повторить попытку.

LinkedBlockingQueue будет заблокирован перед любым изменением. Таким образом, ваши звонки с предложениями будут блокироваться, пока они не получат блокировку. Вы можете использовать перегрузку предложения, которая требует TimeUnit, чтобы сказать, что вы готовы подождать только X количество времени, прежде чем отказаться от добавления (обычно хорошо для очередей типов сообщений, где сообщение устарело после X количества миллисекунд).

Справедливость означает, что реализация Lock будет поддерживать порядок потоков. Это означает, что если поток A входит, а затем входит поток B, поток A первым получит блокировку. Без всякой справедливости, на самом деле не известно, что происходит. Скорее всего, это будет следующий запланированный поток.

Что касается того, что использовать, это зависит. Я предпочитаю использовать ConcurrentLinkedQueue, потому что время, необходимое моим производителям, чтобы поставить работу в очередь, разнообразно. У меня не так много продюсеров работают одновременно. Но со стороны потребителя сложнее, потому что опрос не переходит в состояние хорошего сна. Вы должны сами справиться с этим.

Джастин Радд
источник
1
И при каких условиях ArrayBlockingQueue лучше LinkedBlockingQueue?
колобок
@akapelko ArrayBlockingQueue позволяет упорядочивать более детально.
Игорь Ганапольский
2
Что это значит - «он закрутится и попробует еще раз». ?
Лестер
9

В заголовке вашего вопроса упоминаются очереди с блокировкой. Однако ConcurrentLinkedQueueэто не очередь на блокировку.

В BlockingQueues являются ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, и SynchronousQueue.

Некоторые из них явно не подходит для ваших целей ( DelayQueue, PriorityBlockingQueueи SynchronousQueue). LinkedBlockingQueueи LinkedBlockingDequeидентичны, за исключением того, что последняя представляет собой двустороннюю очередь (она реализует интерфейс Deque).

поскольку ArrayBlockingQueue это полезно, только если вы хотите ограничить количество элементов, я бы придерживался LinkedBlockingQueue.

Powerlord
источник
Я удалил слово "Блокировка" из заголовка, спасибо. Позвольте мне посмотреть, понял ли я, то, что вы сказали, означает, что LinkedBlockingQueue может использоваться в сценариях нескольких потребителей / производителей для одного и того же объекта?
Дэвид Хофманн,
1
Я думал, что ArrayBlockingQueue позволяет более детально упорядочивать потоки? Отсюда его преимущество.
Игорь Ганапольский
4

ArrayBlockingQueue имеет меньший объем памяти, он может повторно использовать узел элемента, в отличие от LinkedBlockingQueue, который должен создавать объект LinkedBlockingQueue $ Node для каждой новой вставки.

Дэн
источник
1
хорошая точка зрения! Я предпочитаю ArrayBlockingQueue больше, чем LinkedBlockingQueue
триллионы
2
Это не обязательно так - если ваша очередь большую часть времени близка к пустой, но должна иметь возможность расти, у ArrayBlockingQueueнее будет гораздо худший объем памяти - у нее все еще есть большой массив, выделенный в памяти все время, в то время как LinkedBlockingQueueбудет иметь незначительный объем памяти , когда близко к пустой.
Krease 08
1
  1. SynchronousQueue(Взято из другого вопроса )

SynchronousQueueбольше похоже на передачу обслуживания, тогда LinkedBlockingQueueкак он позволяет использовать только один элемент. Разница в том, что put()вызов a SynchronousQueueне вернется, пока не будет соответствующего take()вызова, но с LinkedBlockingQueueразмером 1 put()вызов (в пустую очередь) вернется немедленно. По сути, это BlockingQueueреализация, когда вам действительно не нужна очередь (вы не хотите поддерживать какие-либо ожидающие данные).

  1. LinkedBlockingQueue( LinkedListРеализация, но не совсем JDK. Реализация в LinkedListнем использует статический внутренний класс Node для поддержания связей между элементами)

Конструктор для LinkedBlockingQueue

public LinkedBlockingQueue(int capacity) 
{
        if (capacity < = 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node< E >(null);   // Maintains a underlying linkedlist. ( Use when size is not known )
}

Класс узла, используемый для поддержки ссылок

static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
}

3 ArrayBlockingQueue (реализация массива)

Конструктор для ArrayBlockingQueue

public ArrayBlockingQueue(int capacity, boolean fair) 
{
            if (capacity < = 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity]; // Maintains a underlying array
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
}

IMHO Самая большая разница между ArrayBlockingQueueи LinkedBlockingQueueочевидна из конструктора, у которого есть основная структура данных Array и другой связанныйList .

ArrayBlockingQueueиспользует алгоритм двойного условия одиночной блокировки и LinkedBlockingQueueявляется вариантом алгоритма «очереди с двумя блокировками» и имеет 2 блокировки 2 условия (takeLock, putLock)

Випин
источник
0

ConcurrentLinkedQueue не блокируется, а LinkedBlockingQueue - нет. Каждый раз, когда вы вызываете LinkedBlockingQueue.put () или LinkedBlockingQueue.take (), вам нужно сначала получить блокировку. Другими словами, у LinkedBlockingQueue плохой параллелизм. Если вам важна производительность, попробуйте ConcurrentLinkedQueue + LockSupport.

user319609
источник