Queue.Queue vs. Коллекции.deque

181

Мне нужна очередь, в которую может помещаться несколько потоков, и из которых могут считываться несколько потоков.

В Python есть как минимум два класса очереди, Queue.Queue и collection.deque, причем первый, по-видимому, использует второй для внутреннего использования. Оба утверждают, что они являются потокобезопасными в документации.

Тем не менее, документы очереди также утверждают:

collection.deque - это альтернативная реализация неограниченных очередей с быстрыми атомарными операциями append () и popleft (), которые не требуют блокировки.

Что, я думаю, я не совсем понимаю: означает ли это, что deque в конце концов не является полностью поточно-ориентированным?

Если это так, я не могу полностью понять разницу между двумя классами. Я вижу, что очередь добавляет функциональность блокировки. С другой стороны, он теряет некоторые скрытые функции, такие как поддержка оператора.

Прямой доступ к внутреннему объекту deque

х в очереди (). deque

потокобезопасный?

Кроме того, почему Queue использует мьютекс для своих операций, когда deque уже поточно-ориентирован?

miracle2k
источник
RuntimeError: deque mutated during iterationто, что вы могли бы получить, это использование общего dequeпотока между несколькими потоками и отсутствие блокировки ...
toine
4
@toine, который не имеет ничего общего с потоками, хотя. Вы можете получить эту ошибку всякий раз, когда добавляете / удаляете dequeвремя, повторяя даже в том же потоке. Единственная причина, по которой вы не можете получить эту ошибку, Queueзаключается в том, что Queueона не поддерживает итерацию.
макс

Ответы:

281

Queue.Queueи collections.dequeслужат различным целям. Queue.Queue предназначен для обеспечения возможности взаимодействия различных потоков с использованием сообщений / данных, находящихся в очереди, тогда как collections.dequeон просто предназначен в качестве структуры данных. Вот почему Queue.Queueесть такие методы , как put_nowait(), get_nowait()и join(), в то время как collections.dequeне делает. Queue.Queueне предназначен для использования в качестве коллекции, поэтому ему не хватает подобных inоператоров.

Это сводится к следующему: если у вас есть несколько потоков, и вы хотите, чтобы они могли общаться без необходимости блокировок, вы ищете Queue.Queue; если вы просто хотите использовать очередь или двустороннюю очередь в качестве структуры данных, используйте collections.deque.

Наконец, доступ к внутренней деке a и манипулирование ею - Queue.Queueэто игра с огнем - вы действительно не хотите этого делать.

Кит Гоган
источник
6
Нет, это не очень хорошая идея. Если вы посмотрите на источник Queue.Queue, он использует dequeпод капотом. collections.dequeэто коллекция, а Queue.Queueэто механизм связи. Queue.QueueСверхзадача в том, чтобы сделать его поточно-ориентированным. Использование dequeдля общения между потоками приведет только к болезненным гонкам. Всякий раз, когда dequeоказывается поточнобезопасным, это счастливая случайность того, как реализован интерпретатор, а не то, на что можно положиться. Вот почему Queue.Queueсуществует в первую очередь.
Кит Гоган
2
Просто помните, что если вы общаетесь между потоками, вы играете с огнем, используя deque. deque является потокобезопасным случайно из-за существования GIL. Реализация без GIL будет иметь совершенно другие характеристики производительности, поэтому игнорировать другие реализации нецелесообразно. Кроме того, вы рассчитали время очереди для использования между потоками в отличие от наивного теста его использования в одном потоке? Если ваш код , который чувствителен к скорости очереди против дека, Python не может быть языком , который вы ищете.
Кит Гоган
3
@KeithGaughan deque is threadsafe by accident due to the existence of GIL; Это правда, что dequeGIL обеспечивает безопасность потоков, но это не так by accident. В официальной документации на python четко указано, что методы deque pop*/ append*являются потокобезопасными. Таким образом, любая действительная реализация на python должна предоставлять такую ​​же гарантию (реализации без GIL должны будут выяснить, как это сделать без GIL). Вы можете смело полагаться на эти гарантии.
максимум
2
@fantabolous мой предыдущий комментарий, несмотря на то, я не совсем понимаю, как вы будете использовать dequeдля общения. Если вы обернетесь popв try/except, вы закончите с занятым циклом, поглощающим огромное количество процессоров, ожидающих новых данных. Это кажется ужасно неэффективным подходом по сравнению с предлагаемыми блокирующими вызовами Queue, которые гарантируют, что поток, ожидающий данные, перейдет в спящий режим и не будет тратить время ЦП.
максимум
3
Вы можете взять чтение исходного кода Queue.Queueто, потому что это написано с помощью collections.deque: hg.python.org/cpython/file/2.7/Lib/Queue.py - он использует переменные условия для эффективного позволить dequeего обтекает доступ за границы потока безопасно и эффективно. Объяснение того, как вы использовали бы dequeдля общения, прямо там, в источнике.
Кит Гоган,
44

Если все, что вам нужно, это потокобезопасный способ переноса объектов между потоками , тогда оба будут работать (как для FIFO, так и для LIFO). Для FIFO:

Примечание:

  • Другие операции dequeне могут быть потокобезопасными, я не уверен.
  • dequeне блокируется pop()или, popleft()таким образом, вы не можете основывать поток своих потребительских потоков на блокировке, пока не прибудет новый элемент.

Однако, похоже, что deque имеет значительное преимущество в эффективности . Вот некоторые результаты тестов в секундах с использованием CPython 2.7.3 для вставки и удаления 100 тыс. Элементов

deque 0.0747888759791
Queue 1.60079066852

Вот эталонный код:

import time
import Queue
import collections

q = collections.deque()
t0 = time.clock()
for i in xrange(100000):
    q.append(1)
for i in xrange(100000):
    q.popleft()
print 'deque', time.clock() - t0

q = Queue.Queue(200000)
t0 = time.clock()
for i in xrange(100000):
    q.put(1)
for i in xrange(100000):
    q.get()
print 'Queue', time.clock() - t0
Джонатан
источник
1
Вы утверждаете, что «Другие операции dequeне могут быть потокобезопасными». Откуда ты это взял?
Мэтт
@Matt - перефразировано, чтобы лучше передать мой смысл
Джонатан
3
Хорошо, спасибо. Это мешало мне использовать deque, потому что я думал, что вы знаете что-то, чего я не знал. Я предполагаю, что просто предположу, что это потокобезопасно, пока не узнаю иначе.
Мэтт
@Matt "Операции deque: append (), appendleft (), pop (), popleft () и len (d) поточно-ориентированы в CPython." источник: bugs.python.org/issue15329
Филиппо Витале
7

Для информации есть билет Python, на который ссылаются для безопасности потоков deque ( https://bugs.python.org/issue15329 ). Заголовок «уточнить, какие методы deque являются поточно-ориентированными»

Итог здесь: https://bugs.python.org/issue15329#msg199368

Операции deque append (), appendleft (), pop (), popleft () и len (d) являются поточно-ориентированными в CPython. Методы добавления имеют DECREF в конце (для случаев, когда был установлен maxlen), но это происходит после того, как все обновления структуры были сделаны и инварианты были восстановлены, поэтому можно рассматривать эти операции как атомарные.

В любом случае, если вы не уверены на 100% и предпочитаете надежность, а не производительность, просто поставьте «лайк»;)

Злой волк
источник
6

Все одноэлементные методы dequeявляются атомарными и потокобезопасными. Все остальные методы также поточно-ориентированы. Такие вещи, как len(dq), dq[4]дают мгновенные правильные значения. Но подумайте, например, о том, что dq.extend(mylist): вы не получаете гарантию того, что все элементы mylistнаходятся в строке, когда другие потоки также добавляют элементы на одной стороне, но обычно это не является обязательным требованием для взаимодействия между потоками и для поставленной задачи.

Таким образом, a dequeв ~ 20 раз быстрее, чем Queue(который использует dequeскрытую структуру), и если вам не нужен «удобный» API синхронизации (блокировка / тайм-аут), строгое maxsizeвыполнение или «Переопределите эти методы (_put, _get, ..»). ) для реализации поведения подклассов в других организациях очередей , или когда вы сами позаботитесь о таких вещах, тогда голое dequeявляется хорошим и эффективным решением для высокоскоростной связи между потоками.

Фактически интенсивное использование дополнительных мьютексов, дополнительных методов ._get()и т. Д., Вызываемых методами Queue.py, обусловлено ограничениями обратной совместимости, прошлым чрезмерным дизайном и отсутствием заботы о предоставлении эффективного решения этой важной проблемы узкого места в скорости при межпотоковой связи. Список использовался в более старых версиях Python, но даже list.append () /. Pop (0) был и является атомарным и безопасным для потоков ...

KXR
источник
3

Добавление notify_all()к каждому deque appendи popleftприводит к гораздо худшим результатам для dequeчем улучшение 20х достигается по умолчанию dequeповедения :

deque + notify_all: 0.469802
Queue:              0.667279

@Jonathan немного изменил свой код, и я получил эталонный тест с использованием cPython 3.6.2 и добавил условие в цикл deque для имитации поведения, которое делает Queue.

import time
from queue import Queue
import threading
import collections

mutex = threading.Lock()
condition = threading.Condition(mutex)
q = collections.deque()
t0 = time.clock()
for i in range(100000):
    with condition:
        q.append(1)
        condition.notify_all()
for _ in range(100000):
    with condition:
        q.popleft()
        condition.notify_all()
print('deque', time.clock() - t0)

q = Queue(200000)
t0 = time.clock()
for _ in range(100000):
    q.put(1)
for _ in range(100000):
    q.get()
print('Queue', time.clock() - t0)

И, похоже, производительность ограничена этой функцией condition.notify_all()

collection.deque - это альтернативная реализация неограниченных очередей с быстрыми атомарными операциями append () и popleft (), которые не требуют блокировки. очередь документов

nikan1996
источник
2

dequeПотокобезопасен. «Операции, которые не требуют блокировки» означает, что вам не нужно делать блокировку самостоятельно, все dequeзаботится об этом.

Взглянув на Queueисточник, вызывается внутренняя deque, которая self.queueиспользует мьютекс для аксессоров и мутаций, поэтому неQueue().queue является поточно-ориентированным для использования.

Если вы ищете оператор «in», то, вероятно, не самая подходящая структура данных для вашей проблемы - очередь или очередь.

Брайен-бразилия
источник
1
Что я хочу сделать, так это убедиться, что дубликаты не добавляются в очередь. Разве это не то, что очередь может потенциально поддерживать?
miracle2k
1
Вероятно, было бы лучше иметь отдельный набор и обновлять его, когда вы добавляете / удаляете что-то из очереди. Это будет O (log n), а не O (n), но вы должны быть осторожны, чтобы синхронизировать набор и очередь (т. Е. Блокировку).
Брайан, Бразилия,
Набор Python - это хеш-таблица, поэтому это будет O (1). Но да, вам все равно придется делать блокировку.
AFoglia
1

(кажется, у меня нет репутации, чтобы комментировать ...) Вы должны быть осторожны, какие методы deque вы используете из разных потоков.

deque.get () кажется потокобезопасным, но я обнаружил, что делать

for item in a_deque:
   process(item)

может потерпеть неудачу, если другой поток добавляет элементы одновременно. Я получил RuntimeException, который жаловался на "deque muted во время итерации".

Проверьте Коллекции модуля.c, чтобы увидеть, какие операции влияют на это

Элиот Бленнерхассетт
источник
такого рода ошибки не являются особенными для потоков и основной безопасности потоков. Это происходит, например, просто делая >>> di = {1:None} >>> for x in di: del di[x]
KXR
1
По сути, вы никогда не должны зацикливаться на том, что может быть изменено другим потоком (хотя в некоторых случаях вы можете сделать это, добавив собственную защиту). Как и в случае с очередью, вы должны извлечь / извлечь элемент из очереди перед его обработкой, и вы обычно делаете это с помощью whileцикла.
Фанатбол