Как использовать ConcurrentLinkedQueue?

97

Как использовать ConcurrentLinkedQueueв Java?
Используя это LinkedQueue, мне нужно беспокоиться о параллельности в очереди? Или мне просто нужно определить два метода (один для извлечения элементов из списка, а другой для добавления элементов в список)?
Примечание: очевидно, что эти два метода должны быть синхронизированы. Правильно?


РЕДАКТИРОВАТЬ: Я пытаюсь сделать следующее: у меня есть класс (на Java) с одним методом для извлечения элементов из очереди и другим классом с одним методом для добавления элементов в очередь. Элементы, добавленные и извлеченные из списка, являются объектами моего собственного класса.

Еще один вопрос: нужно ли это делать в методе удаления:

while (queue.size() == 0){ 
  wait(); 
  queue.poll();
}

У меня только один потребитель и один производитель.

Рикардо Фелгейрас
источник
Спасибо за ответ на вопрос mt. Я пытаюсь сделать следующее: у меня есть класс (на Java) с одним методом для извлечения элементов из очереди и другим классом с одним методом для добавления элементов в очередь. Элементы, добавленные и извлеченные из списка, являются объектами моего собственного класса.
Рикардо Фелгейрас,
2
Вам следует отредактировать свой вопрос и внести это уточнение в сам вопрос.
Adam Jaskiewicz 05

Ответы:

158

Нет, методы не нужно синхронизировать, и вам не нужно определять какие-либо методы; они уже находятся в ConcurrentLinkedQueue, просто используйте их. ConcurrentLinkedQueue выполняет все необходимые вам операции блокировки и другие операции; ваш производитель (и) добавляет данные в очередь, и ваши потребители опрашивают их.

Сначала создайте свою очередь:

Queue<YourObject> queue = new ConcurrentLinkedQueue<YourObject>();

Теперь, где бы вы ни создавали свои объекты производителя / потребителя, передайте очередь, чтобы у них было куда поместить свои объекты (вместо этого вы можете использовать сеттер, но я предпочитаю делать такие вещи в конструкторе):

YourProducer producer = new YourProducer(queue);

а также:

YourConsumer consumer = new YourConsumer(queue);

и добавьте в него что-нибудь в своем продюсере:

queue.offer(myObject);

и извлеките материал из своего потребителя (если очередь пуста, poll () вернет null, поэтому проверьте его):

YourObject myObject = queue.poll();

Для получения дополнительной информации см. Javadoc

РЕДАКТИРОВАТЬ:

Если вам нужно заблокировать ожидание, пока очередь не станет пустой, вы, вероятно, захотите использовать LinkedBlockingQueue и использовать метод take (). Однако LinkedBlockingQueue имеет максимальную емкость (по умолчанию Integer.MAX_VALUE, что превышает два миллиарда) и, следовательно, может или не может быть подходящим в зависимости от ваших обстоятельств.

Если у вас есть только один поток, помещающий данные в очередь, а другой поток извлекает данные из очереди, ConcurrentLinkedQueue, вероятно, является излишним. Это больше для случаев, когда у вас могут быть сотни или даже тысячи потоков, обращающихся к очереди одновременно. Ваши потребности, вероятно, будут удовлетворены с помощью:

Queue<YourObject> queue = Collections.synchronizedList(new LinkedList<YourObject>());

Плюс этого в том, что он блокирует экземпляр (очередь), поэтому вы можете синхронизировать по очереди, чтобы гарантировать атомарность составных операций (как объяснил Джаред). Вы НЕ МОЖЕТЕ сделать это с помощью ConcurrentLinkedQueue, поскольку все операции выполняются БЕЗ блокировки экземпляра (с использованием переменных java.util.concurrent.atomic). Вам НЕ нужно делать это, если вы хотите заблокировать, пока очередь пуста, потому что poll () просто вернет null, пока очередь пуста, а poll () является атомарным. Проверьте, возвращает ли poll () значение null. Если это так, подождите () и попробуйте еще раз. Не нужно запирать.

В заключение:

Честно говоря, я бы просто использовал LinkedBlockingQueue. Это все еще излишне для вашего приложения, но есть вероятность, что оно будет работать нормально. Если он недостаточно эффективен (ПРОФИЛЬ!), Вы всегда можете попробовать что-то еще, и это означает, что вам не нужно иметь дело с ЛЮБЫМ синхронизированным материалом:

BlockingQueue<YourObject> queue = new LinkedBlockingQueue<YourObject>();

queue.put(myObject); // Blocks until queue isn't full.

YourObject myObject = queue.take(); // Blocks until queue isn't empty.

В остальном все то же самое. Put, вероятно , не будет блокировать, потому что вы вряд ли поместите в очередь два миллиарда объектов.

Адам Яскевич
источник
Спасибо за ваш ответ. Еще один вопрос: нужно ли это делать в методе удаления: while (queue.size () == 0) wait (); queue.poll ();
Рикардо Фелгейрас,
Я отвечу на это как на правку своего ответа, поскольку это очень важно.
Адам Яскевич 05
Поскольку возникла путаница в другом вопросе, Collection.synchronizedListвозвращает, Listкоторый не реализуется Queue.
Tom Hawtin - tackline
@AdamJaskiewicz использует ConcurrentLinkedQueueдля Producer Consumer хорошую идею, я имею в виду этот пост stackoverflow.com/questions/1426754/…
rd22 06
37

Это во многом дублирует другой вопрос .

Вот часть ответа, имеющая отношение к этому вопросу:

Нужно ли мне выполнять собственную синхронизацию, если я использую java.util.ConcurrentLinkedQueue?

Атомарные операции с параллельными коллекциями синхронизируются за вас. Другими словами, каждый отдельный вызов очереди гарантирован потокобезопасным без каких-либо действий с вашей стороны. Потокобезопасность не гарантируется, так как любые операции, которые вы выполняете с коллекцией, не являются атомарными.

Например, это потокобезопасный вариант без каких-либо действий с вашей стороны:

queue.add(obj);

или

queue.poll(obj);

Однако; неатомарные вызовы очереди не являются автоматически потокобезопасными. Например, следующие операции не являются автоматически потокобезопасными:

if(!queue.isEmpty()) {
   queue.poll(obj);
}

Последний поток не является потокобезопасным, так как вполне возможно, что между вызовом isEmpty и вызовом опроса другие потоки будут добавлять или удалять элементы из очереди. Поточно-ориентированный способ сделать это выглядит следующим образом:

synchronized(queue) {
    if(!queue.isEmpty()) {
       queue.poll(obj);
    }
}

Опять же ... атомарные вызовы очереди автоматически становятся потокобезопасными. Неатомарные вызовы - нет.

Джаред
источник
1
Единственное распространенное использование, которое я могу придумать, - это блокировка до тех пор, пока очередь не станет пустой. Разве для этого не лучше использовать реализацию BlockingQueue (take () является атомарным и блокируется до тех пор, пока не будет чего потреблять)?
Адам Яскевич 05
6
Также с этим нужно быть осторожным. В отличие от synchronizedList, ConcurrentLinkedQueue не синхронизируется сам по себе, поэтому в вашем коде производитель все еще может предлагать очереди, пока вы находитесь в синхронизированном блоке.
Адам Яскевич 05
Зачем комбинировать queue.isEmpty () и queue.poll ()? Разве вы не можете просто опросить и проверить, нулевой ли результат? (
Насколько
Я согласен со всем в вашем коде, кроме последнего блока кода. Синхронизация с ConcurrentLInkedQueue ничего вам не гарантирует, потому что другие вызовы не синхронизируются. Таким образом, могло произойти «add (..)» из другого потока между вашими «isEmpty ()» и «poll (...)», даже если вы синхронизировали этот поток
Klitos G.
6

Используйте опрос, чтобы получить первый элемент, и добавьте, чтобы добавить новый последний элемент. Все, ни синхронизации, ни чего еще.

Хэнк Гей
источник
6

Вероятно, это то, что вы ищете с точки зрения безопасности потоков и «привлекательности» при попытке использовать все в очереди:

for (YourObject obj = queue.poll(); obj != null; obj = queue.poll()) {
}

Это гарантирует, что вы выйдете из очереди, когда очередь пуста, и продолжите выталкивать объекты из нее, пока она не пуста.

марка
источник
Очень полезно для очистки очереди. Спасибо.
DevilCode
2

ConcurentLinkedQueue - очень эффективная реализация без ожидания / блокировки (см. Javadoc для справки), поэтому не только вам не нужно синхронизировать, но и очередь ничего не блокирует, что делает ее практически такой же быстрой, как несинхронизированная (не поток сейф) один.

Сиддхадев
источник
1

Просто используйте его, как не параллельную коллекцию. Классы Concurrent [Collection] обертывают обычные коллекции, так что вам не нужно думать о синхронизации доступа.

Изменить: ConcurrentLinkedList на самом деле не просто оболочка, а лучшая параллельная реализация. В любом случае вам не нужно беспокоиться о синхронизации.

Бен С
источник
ConcurrentLinkedQueue - нет. Он специально построен с нуля для одновременного доступа нескольких производителей и потребителей. Немного красивее, чем простые обертки, возвращаемые Collections.synchronized *
Адам Яскевич
Да, в J2SE 5 было добавлено много
интересных