многопроцессорность: как разделить dict между несколькими процессами?

113

Программа, которая создает несколько процессов, работающих в очереди с возможностью присоединения Q, и может в конечном итоге управлять глобальным словарем Dдля сохранения результатов. (поэтому каждый дочерний процесс может использовать Dдля хранения своего результата, а также видеть, какие результаты производят другие дочерние процессы)

Если я распечатаю словарь D в дочернем процессе, я увижу внесенные в него изменения (то есть в D). Но после того, как основной процесс присоединяется к Q, если я печатаю D, это пустой dict!

Я понимаю, что это проблема синхронизации / блокировки. Может кто подскажет, что здесь происходит, и как мне синхронизировать доступ к D?

допинг
источник
1
Это не работает должным образом, по крайней мере, на python 3.7.2 с использованием osx 10.14.4 Dict не синхронизируется, и его содержимое перезаписывается другими процессами. Однако <code> multiprocessing.Manager (). List () </code> работает должным образом.
Андрей Друченко

Ответы:

164

Общий ответ предполагает использование Managerобъекта. Адаптировано из документации:

from multiprocessing import Process, Manager

def f(d):
    d[1] += '1'
    d['2'] += 2

if __name__ == '__main__':
    manager = Manager()

    d = manager.dict()
    d[1] = '1'
    d['2'] = 2

    p1 = Process(target=f, args=(d,))
    p2 = Process(target=f, args=(d,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print d

Вывод:

$ python mul.py 
{1: '111', '2': 6}
senderle
источник
4
Спасибо, отправитель. Действительно, D = multiprocessing.Manager (). Dict () решает мою проблему. Я использовал D = dict ().
Присадка
3
@LorenzoBelli, если вы спрашиваете, синхронизирован ли доступ к менеджеру, я считаю, что ответ - да. multiprocessing.Manager()возвращает экземплярSyncManager , название которого предполагает то же самое!
senderle 02
@senderle Я хочу поделиться множеством случайных состояний родительского процесса с дочерним процессом. Я пробовал использовать, Managerно все равно не повезло. Не могли бы вы взглянуть на мой вопрос здесь и посмотреть, можете ли вы предложить решение? Я все еще могу получать разные случайные числа, если я делаю это np.random.seed(None)каждый раз, когда генерирую случайное число, но это не позволяет мне использовать случайное состояние родительского процесса, чего я не хочу. Любая помощь приветствуется.
Amir
1
@RadioControlled рад написать обновление, но вкратце, хотя я не думаю, что вы можете сделать это напрямую, вы можете легко создать новый управляемый dict с теми же ключами и значениями и использовать его вместо оригинала. Это подходит для вашего случая?
отправитель
1
@senderle, это то, чем я закончил. Итак, ответ был бы таков, что вам нужно сделать именно это.
Radio Controlled
26

многопроцессорность не похожа на многопоточность. Каждый дочерний процесс получит копию памяти основного процесса. Обычно состояние передается через связь (каналы / сокеты), сигналы или разделяемую память.

Многопроцессорность делает некоторые абстракции доступными для вашего варианта использования - общее состояние, которое обрабатывается как локальное с помощью прокси-серверов или общей памяти: http://docs.python.org/library/multiprocessing.html#sharing-state-between-processes

Соответствующие разделы:

Джереми Браун
источник
1
Большое спасибо. Вы привели меня к / решению: multiprocessing.Manager (). Dict ().
Присадка
Может ли кто-нибудь уточнить, что означает утверждение «Каждый дочерний процесс получит копию памяти основного процесса».
Itsme2003 07
@ Itsme2003 по умолчанию порожденный процесс не имеет доступа к памяти родительского процесса (это одно из ключевых отличий от потоков). Поэтому, когда процессу нужен объект родительского процесса, он должен создать его копию (вместо получения ссылки на фактический объект). Ответ выше подробно описывает, как разделять объекты между процессами.
Никлас Мерч
1
Поскольку это часто бывает ошибочным: пока вы не изменяете объект, по крайней мере, в обычной настройке Linux, объект будет фактически сохранен в памяти только один раз. Он будет скопирован, как только он будет изменен. Это может быть очень важно, если вам нужно сэкономить память и не изменять объект.
Radio Controlled
16

Я хотел бы поделиться своей собственной работой, которая работает быстрее, чем dict менеджера, проще и стабильнее, чем библиотека pyshmht, которая использует тонны памяти и не работает для Mac OS. Хотя мой dict работает только для простых строк и в настоящее время неизменен. Я использую реализацию линейного зондирования и храню пары ключей и значений в отдельном блоке памяти после таблицы.

from mmap import mmap
import struct
from timeit import default_timer
from multiprocessing import Manager
from pyshmht import HashTable


class shared_immutable_dict:
    def __init__(self, a):
        self.hs = 1 << (len(a) * 3).bit_length()
        kvp = self.hs * 4
        ht = [0xffffffff] * self.hs
        kvl = []
        for k, v in a.iteritems():
            h = self.hash(k)
            while ht[h] != 0xffffffff:
                h = (h + 1) & (self.hs - 1)
            ht[h] = kvp
            kvp += self.kvlen(k) + self.kvlen(v)
            kvl.append(k)
            kvl.append(v)

        self.m = mmap(-1, kvp)
        for p in ht:
            self.m.write(uint_format.pack(p))
        for x in kvl:
            if len(x) <= 0x7f:
                self.m.write_byte(chr(len(x)))
            else:
                self.m.write(uint_format.pack(0x80000000 + len(x)))
            self.m.write(x)

    def hash(self, k):
        h = hash(k)
        h = (h + (h >> 3) + (h >> 13) + (h >> 23)) * 1749375391 & (self.hs - 1)
        return h

    def get(self, k, d=None):
        h = self.hash(k)
        while True:
            x = uint_format.unpack(self.m[h * 4:h * 4 + 4])[0]
            if x == 0xffffffff:
                return d
            self.m.seek(x)
            if k == self.read_kv():
                return self.read_kv()
            h = (h + 1) & (self.hs - 1)

    def read_kv(self):
        sz = ord(self.m.read_byte())
        if sz & 0x80:
            sz = uint_format.unpack(chr(sz) + self.m.read(3))[0] - 0x80000000
        return self.m.read(sz)

    def kvlen(self, k):
        return len(k) + (1 if len(k) <= 0x7f else 4)

    def __contains__(self, k):
        return self.get(k, None) is not None

    def close(self):
        self.m.close()

uint_format = struct.Struct('>I')


def uget(a, k, d=None):
    return to_unicode(a.get(to_str(k), d))


def uin(a, k):
    return to_str(k) in a


def to_unicode(s):
    return s.decode('utf-8') if isinstance(s, str) else s


def to_str(s):
    return s.encode('utf-8') if isinstance(s, unicode) else s


def mmap_test():
    n = 1000000
    d = shared_immutable_dict({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'mmap speed: %d gets per sec' % (n / (default_timer() - start_time))


def manager_test():
    n = 100000
    d = Manager().dict({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'manager speed: %d gets per sec' % (n / (default_timer() - start_time))


def shm_test():
    n = 1000000
    d = HashTable('tmp', n)
    d.update({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'shm speed: %d gets per sec' % (n / (default_timer() - start_time))


if __name__ == '__main__':
    mmap_test()
    manager_test()
    shm_test()

На моем ноутбуке результаты производительности следующие:

mmap speed: 247288 gets per sec
manager speed: 33792 gets per sec
shm speed: 691332 gets per sec

простой пример использования:

ht = shared_immutable_dict({'a': '1', 'b': '2'})
print ht.get('a')
алексей
источник
14
Github? Документация? как мы можем использовать этот инструмент?
Павлос Пантелиадис,
10

Помимо @senderle здесь, некоторым также может быть интересно, как использовать функциональность multiprocessing.Pool.

Приятно то, что .Pool()в managerэкземпляре есть метод , имитирующий все знакомые API верхнего уровня multiprocessing.

from itertools import repeat
import multiprocessing as mp
import os
import pprint

def f(d: dict) -> None:
    pid = os.getpid()
    d[pid] = "Hi, I was written by process %d" % pid

if __name__ == '__main__':
    with mp.Manager() as manager:
        d = manager.dict()
        with manager.Pool() as pool:
            pool.map(f, repeat(d, 10))
        # `d` is a DictProxy object that can be converted to dict
        pprint.pprint(dict(d))

Вывод:

$ python3 mul.py 
{22562: 'Hi, I was written by process 22562',
 22563: 'Hi, I was written by process 22563',
 22564: 'Hi, I was written by process 22564',
 22565: 'Hi, I was written by process 22565',
 22566: 'Hi, I was written by process 22566',
 22567: 'Hi, I was written by process 22567',
 22568: 'Hi, I was written by process 22568',
 22569: 'Hi, I was written by process 22569',
 22570: 'Hi, I was written by process 22570',
 22571: 'Hi, I was written by process 22571'}

Это немного другой пример, где каждый процесс просто регистрирует свой идентификатор процесса в глобальном DictProxyобъекте d.

Брэд Соломон
источник
3

Возможно, вы можете попробовать pyshmht , расширение хэш-таблицы на основе памяти для Python.

Уведомление

  1. Это не полностью протестировано, просто для справки.

  2. В настоящее время в нем отсутствуют механизмы lock / sem для многопроцессорной обработки.

felix021
источник