Являются ли списки потокобезопасными?

155

Я заметил, что часто предлагается использовать очереди с несколькими потоками вместо списков и .pop(). Это потому, что списки не являются потокобезопасными или по какой-то другой причине?

lemiant
источник
1
Трудно сказать всегда, что именно является поточно-ориентированным в Python, и трудно рассуждать о безопасности потоков в нем. Даже у очень популярного биткойн-кошелька Electrum были ошибки параллелизма, вероятно, вытекающие из этого.
Судо

Ответы:

182

Сами списки являются потокобезопасными. В CPython GIL защищает от одновременного доступа к ним, а другие реализации стараются использовать детализированную блокировку или синхронизированный тип данных для своих реализаций списка. Однако, хотя сами списки не могут испортиться при попытках одновременного доступа, данные списков не защищены. Например:

L[0] += 1

не гарантируется фактическое увеличение L [0] на единицу, если другой поток делает то же самое, потому что +=это не атомарная операция. (Очень, очень мало операций в Python на самом деле являются атомарными, потому что большинство из них может вызвать вызов произвольного кода Python.) Вам следует использовать очереди, потому что, если вы просто используете незащищенный список, вы можете получить или удалить неправильный элемент из-за гонки. условия.

Томас Воутерс
источник
1
Является ли deque также поточно-ориентированным? Это кажется более подходящим для моего использования.
Lemiant
20
Все объекты Python имеют одинаковую безопасность потоков - они сами не портятся, но их данные могут. collection.deque - это то, что стоит за объектами Queue.Queue. Если вы обращаетесь к вещам из двух потоков, вам действительно следует использовать объекты Queue.Queue. В самом деле.
Томас Воутерс
10
lemiant, deque является потокобезопасным. Из главы 2 Fluent Python: «Класс collection.deque - это потокобезопасная двусторонняя очередь, предназначенная для быстрой вставки и удаления с обоих концов. [...] Операции добавления и popleft являются атомарными, поэтому deque безопасен для использовать в качестве LIFO-очереди в многопоточных приложениях без необходимости использования блокировок ».
Аль Суигарт
3
Это ответ о CPython или о Python? Каков ответ на сам Python?
user541686
@Nils: Ну, первая страница , которую вы связаны говорит Python вместо CPython , потому что это описание языка Python. И эта вторая ссылка буквально говорит о том, что существует множество реализаций языка Python, только одна из которых пользуется большей популярностью. Учитывая, что вопрос касался Python, ответ должен описывать то, что может гарантированно произойти в любой соответствующей реализации Python, а не только то, что происходит в CPython в частности.
user541686
90

Чтобы прояснить вопрос в превосходном ответе Томаса, следует упомянуть, что он append() является потокобезопасным.

Это связано с тем, что не нужно беспокоиться о том, что читаемые данные будут в одном и том же месте, как только мы перейдем к записи в них. append()Операция не читает данные, он только записывает данные в список.

dotancohen
источник
1
PyList_Append читает из памяти. Вы имеете в виду, что чтение и запись происходят в одной и той же блокировке GIL? github.com/python/cpython/blob/…
amwinter
1
@amwinter Да, весь вызов PyList_Appendвыполняется в одном GIL-замке. Дается ссылка на объект для добавления. Содержимое этого объекта может быть изменено после его оценки и перед выполнением вызова PyList_Append. Но это все равно будет тот же объект, и он будет безопасно добавлен (если вы это сделаете lst.append(x); ok = lst[-1] is x, то ok, конечно , может быть ложным). Код, на который вы ссылаетесь, не читает из добавленного объекта, кроме как для его INCREF. Он читает и может перераспределять список, к которому добавляется.
Грегго
3
Суть dotancohen заключается в том, что он L[0] += xбудет выполнять __getitem__включение, Lа затем __setitem__включение L- если Lподдерживает, то __iadd__это будет немного по-другому работать на объектном интерфейсе, но Lна уровне интерпретатора python по- прежнему выполняются две отдельные операции (вы увидите их в скомпилированный байт-код). Это appendделается одним вызовом метода в байт-коде.
Грегго
6
Как насчет remove?
18
2
upvoted! поэтому я могу добавлять в один поток непрерывно и вставлять в другой поток?
PirateApp
2

У меня недавно был этот случай, когда мне нужно было непрерывно добавлять в список в одном потоке, циклически проходить по элементам и проверять, готов ли элемент, в моем случае это был AsyncResult и удалять его из списка, только если он был готов. Я не смог найти ни одного примера, который бы четко продемонстрировал мою проблему. Вот пример, демонстрирующий непрерывное добавление в список в одном потоке и непрерывное удаление из этого же списка в другом потоке. Дефектная версия легко работает на меньших числах, но при этом сохраняет достаточно большие числа и запускает несколько раз, и вы увидите ошибку

FLAWED версия

import threading
import time

# Change this number as you please, bigger numbers will get the error quickly
count = 1000
l = []

def add():
    for i in range(count):
        l.append(i)
        time.sleep(0.0001)

def remove():
    for i in range(count):
        l.remove(i)
        time.sleep(0.0001)


t1 = threading.Thread(target=add)
t2 = threading.Thread(target=remove)
t1.start()
t2.start()
t1.join()
t2.join()

print(l)

Вывод при ОШИБКЕ

Exception in thread Thread-63:
Traceback (most recent call last):
  File "/Users/zup/.pyenv/versions/3.6.8/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/Users/zup/.pyenv/versions/3.6.8/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-30-ecfbac1c776f>", line 13, in remove
    l.remove(i)
ValueError: list.remove(x): x not in list

Версия, которая использует блокировки

import threading
import time
count = 1000
l = []
lock = threading.RLock()
def add():
    with lock:
        for i in range(count):
            l.append(i)
            time.sleep(0.0001)

def remove():
    with lock:
        for i in range(count):
            l.remove(i)
            time.sleep(0.0001)


t1 = threading.Thread(target=add)
t2 = threading.Thread(target=remove)
t1.start()
t2.start()
t1.join()
t2.join()

print(l)

Вывод

[] # Empty list

Вывод

Как упоминалось в предыдущих ответах, в то время как процесс добавления или выталкивания элементов из самого списка является потокобезопасным, не является безопасным поток, когда вы добавляете в один поток и вставляете в другой

PirateApp
источник
6
Версия с блокировками имеет то же поведение, что и версия без блокировок. По сути, ошибка возникает потому, что она пытается удалить что-то, чего нет в списке, это никак не связано с безопасностью потоков. Попробуйте запустить версию с блокировками после изменения порядка запуска, то есть начать t2 до t1, и вы увидите ту же ошибку. всякий раз, когда t2 опережает t1, ошибка будет возникать независимо от того, используете вы блокировки или нет.
Дев
1
Кроме того, вам лучше использовать менеджер контекста ( with r:) вместо явного вызова r.acquire()иr.release()
GordonAitchJay
1
@GordonAitchJay 👍
Тимоти К. Куинн