Мертвый простой пример использования многопроцессорной очереди, пула и блокировки

92

Я попытался прочитать документацию на http://docs.python.org/dev/library/multiprocessing.html, но все еще борюсь с многопроцессорной обработкой Queue, Pool и Locking. А пока мне удалось построить пример ниже.

Что касается очереди и пула, я не уверен, правильно ли я понял эту концепцию, поэтому поправьте меня, если я ошибаюсь. Я пытаюсь обработать 2 запроса за раз (в этом примере в списке данных 8), так что мне следует использовать? Пул для создания 2 процессов, которые могут обрабатывать две разные очереди (максимум 2), или я должен просто использовать Queue для обработки 2 входов каждый раз? Блокировка заключалась бы в правильной печати выходных данных.

import multiprocessing
import time

data = (['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
        ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)


def mp_handler(var1):
    for indata in var1:
        p = multiprocessing.Process(target=mp_worker, args=(indata[0], indata[1]))
        p.start()


def mp_worker(inputs, the_time):
    print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs

if __name__ == '__main__':
    mp_handler(data)
thclpr
источник

Ответы:

130

Лучшее решение вашей проблемы - использовать Pool. Использование Queues и наличие отдельной функции "подачи очереди", вероятно, излишне.

Вот немного измененная версия вашей программы, на этот раз только с двумя процессами, объединенными в файл Pool. Я считаю, что это самый простой способ с минимальными изменениями исходного кода:

import multiprocessing
import time

data = (
    ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
    ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)

def mp_worker((inputs, the_time)):
    print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs

def mp_handler():
    p = multiprocessing.Pool(2)
    p.map(mp_worker, data)

if __name__ == '__main__':
    mp_handler()

Обратите внимание, что mp_worker()функция теперь принимает один аргумент (кортеж из двух предыдущих аргументов), потому что map()функция разбивает ваши входные данные на подсписки, каждый подсписок передается как единственный аргумент вашей рабочей функции.

Выход:

Processs a  Waiting 2 seconds
Processs b  Waiting 4 seconds
Process a   DONE
Processs c  Waiting 6 seconds
Process b   DONE
Processs d  Waiting 8 seconds
Process c   DONE
Processs e  Waiting 1 seconds
Process e   DONE
Processs f  Waiting 3 seconds
Process d   DONE
Processs g  Waiting 5 seconds
Process f   DONE
Processs h  Waiting 7 seconds
Process g   DONE
Process h   DONE

Отредактируйте в соответствии с комментарием @Thales ниже:

Если вам нужна «блокировка для каждого лимита пула», чтобы ваши процессы выполнялись в тандемных парах, ala:

Ожидание B ожидание | Готово, Б сделано | C ожидания, D ожидания | C сделано, D сделано | ...

затем измените функцию обработчика для запуска пулов (из 2 процессов) для каждой пары данных:

def mp_handler():
    subdata = zip(data[0::2], data[1::2])
    for task1, task2 in subdata:
        p = multiprocessing.Pool(2)
        p.map(mp_worker, (task1, task2))

Теперь ваш результат:

 Processs a Waiting 2 seconds
 Processs b Waiting 4 seconds
 Process a  DONE
 Process b  DONE
 Processs c Waiting 6 seconds
 Processs d Waiting 8 seconds
 Process c  DONE
 Process d  DONE
 Processs e Waiting 1 seconds
 Processs f Waiting 3 seconds
 Process e  DONE
 Process f  DONE
 Processs g Waiting 5 seconds
 Processs h Waiting 7 seconds
 Process g  DONE
 Process h  DONE
Велимир Млакер
источник
Спасибо за простой и прямой пример того, как это сделать, но как я могу применить блокировку для каждого лимита пула? Я имею в виду, если вы выполните код, я бы хотел увидеть что-то вроде «A ожидает B ожидает | A готово, b готово | C ждет, D ожидает | C готово, D готово»
thclpr 02
2
Другими словами, вы не хотите, чтобы C запускался, пока не будут выполнены и A, и B?
Велимир Млакер
Точно, я могу сделать это с помощью multiprocessing.Process, но я не могу понять, как это сделать с помощью пула
thclpr 02
Большое спасибо, работайте по назначению, но в функции mp_handler вы ссылаетесь на данные переменных вместо var1 :)
thclpr 02
Хорошо, спасибо, я удалил var1полностью, dataвместо этого ссылаясь на global .
Велимир Млакер
8

Это может быть не на 100% связано с вопросом, но в моем поиске примера использования многопроцессорной обработки с очередью это сначала появляется в Google.

Это базовый пример класса, который вы можете создавать и помещать элементы в очередь, а также можете ждать, пока очередь не будет завершена. Это все, что мне нужно.

from multiprocessing import JoinableQueue
from multiprocessing.context import Process


class Renderer:
    queue = None

    def __init__(self, nb_workers=2):
        self.queue = JoinableQueue()
        self.processes = [Process(target=self.upload) for i in range(nb_workers)]
        for p in self.processes:
            p.start()

    def render(self, item):
        self.queue.put(item)

    def upload(self):
        while True:
            item = self.queue.get()
            if item is None:
                break

            # process your item here

            self.queue.task_done()

    def terminate(self):
        """ wait until queue is empty and terminate processes """
        self.queue.join()
        for p in self.processes:
            p.terminate()

r = Renderer()
r.render(item1)
r.render(item2)
r.terminate()
Linqu
источник
2
Что такое item1и item2? Это какая-то задача или функции, которые будут выполняться в двух разных процессах?
Zelphir Kaltstahl
2
да, это задачи или входной параметр, которые обрабатываются параллельно.
linqu
8

Вот моя личная инструкция по этой теме:

Суть здесь (запросы на включение приветствуются!): Https://gist.github.com/thorsummoner/b5b1dfcff7e7fdd334ec

import multiprocessing
import sys

THREADS = 3

# Used to prevent multiple threads from mixing thier output
GLOBALLOCK = multiprocessing.Lock()


def func_worker(args):
    """This function will be called by each thread.
    This function can not be a class method.
    """
    # Expand list of args into named args.
    str1, str2 = args
    del args

    # Work
    # ...



    # Serial-only Portion
    GLOBALLOCK.acquire()
    print(str1)
    print(str2)
    GLOBALLOCK.release()


def main(argp=None):
    """Multiprocessing Spawn Example
    """
    # Create the number of threads you want
    pool = multiprocessing.Pool(THREADS)

    # Define two jobs, each with two args.
    func_args = [
        ('Hello', 'World',), 
        ('Goodbye', 'World',), 
    ]


    try:
        # Spawn up to 9999999 jobs, I think this is the maximum possible.
        # I do not know what happens if you exceed this.
        pool.map_async(func_worker, func_args).get(9999999)
    except KeyboardInterrupt:
        # Allow ^C to interrupt from any thread.
        sys.stdout.write('\033[0m')
        sys.stdout.write('User Interupt\n')
    pool.close()

if __name__ == '__main__':
    main()
ТорСаммонер
источник
1
Я не совсем уверен, что .map_async () лучше, чем .map () в любом случае.
ThorSummoner
3
Аргументом для get()является тайм-аут, он не имеет ничего общего с количеством запущенных заданий.
mata
@mata, значит, это должно использоваться в цикле опроса? .get(timeout=1)? и можно ли просто сказать, .get()чтобы получить полный список?
ThorSummoner
Да, .get()ожидает бесконечно, пока не будут доступны все результаты, и возвращает список результатов. Вы можете использовать цикл опроса для проверки доступности результатов погоды или передать в map_async()вызове функцию обратного вызова, которая затем будет вызываться для каждого результата, когда он станет доступным.
mata
2

Для всех, кто использует такие редакторы, как Komodo Edit (win10), добавьте sys.stdout.flush()в:

def mp_worker((inputs, the_time)):
    print " Process %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs
    sys.stdout.flush()

или как первая строка:

    if __name__ == '__main__':
       sys.stdout.flush()

Это помогает увидеть, что происходит во время выполнения сценария; вместо того, чтобы смотреть на черное поле командной строки.

ZF007
источник
1

Вот пример из моего кода (для пула потоков, но просто измените имя класса, и у вас будет пул процессов):

def execute_run(rp): 
   ... do something 

pool = ThreadPoolExecutor(6)
for mat in TESTED_MATERIAL:
    for en in TESTED_ENERGIES:
        for ecut in TESTED_E_CUT:
            rp = RunParams(
                simulations, DEST_DIR,
                PARTICLE, mat, 960, 0.125, ecut, en
            )
            pool.submit(execute_run, rp)
pool.join()

В принципе:

  • pool = ThreadPoolExecutor(6) создает пул на 6 потоков
  • Затем у вас есть несколько for, которые добавляют задачи в пул
  • pool.submit(execute_run, rp) добавляет задачу в пул, первый аргумент - это функция, вызываемая в потоке / процессе, остальные аргументы передаются вызываемой функции.
  • pool.join ждет, пока все задачи не будут выполнены.
jb.
источник
2
Обратите внимание, что вы используете concurrent.futures, но OP спрашивает о multiprocessingPython 2.7.
Тим Питерс