Прерывания клавиатуры с помощью многопроцессорного пула Python

136

Как мне обработать события KeyboardInterrupt с помощью многопроцессорных пулов Python? Вот простой пример:

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

При запуске кода выше, KeyboardInterruptкогда я нажимаю ^C, возникает ошибка , но в этот момент процесс просто зависает, и мне приходится убивать его извне.

Я хочу быть в состоянии нажать ^Cв любое время и заставить все процессы завершиться изящно.

Fragsworth
источник
Я решил свою проблему с помощью psutil, вы можете увидеть решение здесь: stackoverflow.com/questions/32160054/…
Tiago Albineli Motta

Ответы:

137

Это ошибка Python. При ожидании условия в threading.Condition.wait () KeyboardInterrupt никогда не отправляется. Репро:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

Исключение KeyboardInterrupt не будет доставлено до тех пор, пока wait () не вернется, и никогда не вернется, поэтому прерывание никогда не происходит. KeyboardInterrupt почти наверняка должен прервать ожидание условия.

Обратите внимание, что это не происходит, если указан тайм-аут; cond.wait (1) немедленно получит прерывание. Таким образом, обходной путь должен указать время ожидания. Для этого замените

    results = pool.map(slowly_square, range(40))

с участием

    results = pool.map_async(slowly_square, range(40)).get(9999999)

или похожие.

Гленн Мейнард
источник
3
Эта ошибка в официальном трекере Python где-нибудь? У меня проблемы с поиском, но я, вероятно, просто не использую лучшие условия поиска.
Джозеф Гарвин
18
Эта ошибка была зарегистрирована как [выпуск 8296] [1]. [1]: bugs.python.org/issue8296
Андрей Власовских,
1
Вот хак, который исправляет pool.imap () таким же образом, делая возможным Ctrl-C при итерации по imap. Перехватите исключение и вызовите pool.terminate (), и ваша программа завершит работу. gist.github.com/626518
Александр Юнгберг
6
Это не совсем исправляет вещи. Иногда я получаю ожидаемое поведение, когда нажимаю Control + C, а иногда нет. Я не уверен, почему, но похоже, что KeyboardInterrupt получен одним из процессов случайным образом, и я получаю правильное поведение, только если родительский процесс перехватывает его.
Райан К. Томпсон
6
Это не работает для меня с Python 3.6.1 на Windows. Я получаю тонны следов стека и другого мусора, когда я делаю Ctrl-C, то есть так же, как без такого обходного пути. На самом деле ни одно из решений, которые я пробовал в этой теме, похоже, не работает ...
szx
56

Из того, что я недавно обнаружил, лучшее решение - настроить рабочие процессы на полное игнорирование SIGINT и ограничить весь код очистки родительским процессом. Это устраняет проблему как для незанятых, так и для занятых рабочих процессов и не требует кода обработки ошибок в ваших дочерних процессах.

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

Объяснение и полный пример кода можно найти по адресу http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/ и http://github.com/jreese/multiprocessing-keyboardinterrupt соответственно.

Джон Риз
источник
4
Привет, Джон. Ваше решение не выполняет то же самое, что и мое, да, к сожалению, сложное решение. Он прячется за time.sleep(10)основным процессом. Если вы должны были удалить этот спящий режим или подождать, пока процесс не попытается присоединиться к пулу, что необходимо сделать, чтобы гарантировать, что задания завершены, то вы все равно будете страдать от той же проблемы, что и основной процесс. не получил KeyboardInterrupt во время ожидания joinоперации опроса .
Bboe
В случае, когда я использовал этот код в производстве, time.sleep () был частью цикла, который проверял бы состояние каждого дочернего процесса, а затем перезапускал некоторые процессы с задержкой, если это необходимо. Вместо join (), который будет ожидать завершения всех процессов, он будет проверять их по отдельности, гарантируя, что основной процесс останется отзывчивым.
Джон Риз,
2
Таким образом, это было скорее занятое ожидание (возможно, с небольшим сном между проверками), которое запрашивало завершение процесса другим методом, а не соединением? Если это так, возможно, было бы лучше включить этот код в ваше сообщение в блоге, так как вы можете гарантировать, что все работники завершили работу, прежде чем пытаться присоединиться.
bboe
4
Это не работает Только дети посылают сигнал. Родитель никогда не получает его, поэтому pool.terminate()никогда не исполняется. Если дети игнорируют сигнал, это ничего не значит. Ответ @ Гленна решает проблему.
Cerin
1
Моя версия этого по адресу gist.github.com/admackin/003dd646e5fadee8b8d6 ; он не вызывает, .join()кроме как по прерыванию - он просто вручную проверяет результат .apply_async()использования, AsyncResult.ready()чтобы увидеть, готов ли он, что означает, что мы чисто закончили.
Энди Маккинлей
29

По некоторым причинам, только исключения, унаследованные от базового Exceptionкласса, обрабатываются нормально. В качестве обходного пути вы можете повторно поднять свой KeyboardInterruptв качестве Exceptionпримера:

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

Обычно вы получите следующий вывод:

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

Так что если вы нажмете ^C, вы получите:

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end
Андрей Власовских
источник
2
Кажется, что это не полное решение. Если a KeyboardInterruptприбывает, когда multiprocessingвыполняет собственный обмен данными IPC, тогда try..catchактивация не будет (очевидно).
Андрей Власовских
Вы можете заменить raise KeyboardInterruptErrorна return. Вам просто нужно убедиться, что дочерний процесс завершается сразу после получения KeyboardInterrupt. Возвращаемое значение, кажется, игнорируется, в то mainже время получено KeyboardInterrupt.
Бернхард
8

Обычно это простая структура работает Ctrl- Cна бассейн:

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

Как было сказано в нескольких похожих постах:

Захватить прерывание клавиатуры в Python без try-кроме

КГНУ
источник
1
Это должно быть сделано и для каждого из рабочих процессов, и все равно может завершиться ошибкой, если KeyboardInterrupt вызывается во время инициализации многопроцессорной библиотеки.
MarioVilas
7

Голосованный ответ не решает основную проблему, но похожий побочный эффект.

Джесси Ноллер, автор многопроцессорной библиотеки, объясняет, как правильно обращаться с CTRL + C при использовании multiprocessing.Poolв старом сообщении в блоге .

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()
noxdafox
источник
Я обнаружил, что ProcessPoolExecutor также имеет ту же проблему. Единственное исправление, которое я смог найти, это позвонить os.setpgrp()изнутри будущего
portforwardpodcast
1
Конечно, единственное отличие состоит в том, ProcessPoolExecutorчто не поддерживает функции инициализатора. В Unix вы можете использовать эту forkстратегию, отключив sighandler в главном процессе перед созданием пула, а затем снова включив его. В pebbleSIGINT по умолчанию я отключаю дочерние процессы. Я не знаю причину, по которой они не делают то же самое с Python Pools. В конце пользователь может переустановить SIGINTобработчик в случае, если он / она хочет навредить себе.
noxdafox
Это решение, по-видимому, не позволяет Ctrl-C прерывать основной процесс.
Пол Прайс
1
Я только что протестировал на Python 3.5, и он работает, какую версию Python вы используете? Какая ОС?
noxdafox
5

Кажется, есть две проблемы, которые делают исключения при раздражающей многопроцессорности. Первый (отмеченный Гленном) заключается в том, что вам нужно использовать map_asyncтайм-аут вместо mapтого, чтобы получить немедленный ответ (т. Е. Не завершать обработку всего списка). Второй (отмеченный Андреем) заключается в том, что многопроцессорная обработка не захватывает исключения, которые не наследуются от Exception(например, SystemExit). Итак, вот мое решение, которое касается обоих из них:

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        import traceback
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results
Пол Прайс
источник
1
Я не заметил какого-либо снижения производительности, но в моем случае functionэто довольно долгое время (сотни секунд).
Пол Прайс
Это на самом деле уже не так, по крайней мере, на мой взгляд и опыт. Если вы перехватываете исключение клавиатуры в отдельных дочерних процессах и перехватываете его еще раз в основном процессе, то вы можете продолжать использовать, mapи все хорошо. @Linux Cli Aikниже приведено решение, которое вызывает такое поведение. Использование map_asyncне всегда желательно, если основной поток зависит от результатов дочерних процессов.
Код Догго
4

Я обнаружил, что на данный момент лучшее решение - не использовать функцию multiprocessing.pool, а использовать собственную функцию пула. Я представил пример, демонстрирующий ошибку с apply_async, а также пример, показывающий, как вообще не использовать функциональность пула.

http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/

bboe
источник
Работает как шарм. Это чистое решение, а не какой-то хак (/ мне кажется) .btw, трюк с .get (99999), как предлагают другие, сильно ухудшает производительность.
Уолтер
Я не заметил какого-либо снижения производительности при использовании тайм-аута, хотя я использовал 9999 вместо 999999. Исключением является случай, когда возникает исключение, которое не наследуется от класса Exception: тогда вам нужно подождать, пока тайм-аут не будет ударить. Решение этой проблемы - перехват всех исключений (см. Мое решение).
Пол Прайс
1

Я новичок в Python. Я искал ответ везде и наткнулся на этот и несколько других блогов и видео на YouTube. Я попытался скопировать вставить код автора выше и воспроизвести его на моем Python 2.7.13 в Windows 7 64-битной. Это близко к тому, чего я хочу достичь.

Я заставил свои дочерние процессы игнорировать ControlC и заставить родительский процесс завершаться. Похоже, что обход дочернего процесса помогает избежать этой проблемы для меня.

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()

Часть, начинающаяся с pool.terminate()никогда, кажется, не выполняется.

Linux Cli Aik
источник
Я только что понял это! Я искренне считаю, что это лучшее решение для такой проблемы. Принятое решение заставляет map_asyncпользователя, что мне не особенно нравится. Во многих ситуациях, как у меня, основной поток должен ждать завершения отдельных процессов. Это одна из причин, почему mapсуществует!
Код Догго
1

Вы можете попробовать использовать метод apply_async объекта Pool, например так:

import multiprocessing
import time
from datetime import datetime


def test_func(x):
    time.sleep(2)
    return x**2


def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)

Вывод:

100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000

Преимущество этого метода в том, что результаты, обработанные до прерывания, будут возвращены в словарь результатов:

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
bparker856
источник
Славный и полный пример
eMTy
-5

Как ни странно, похоже, что вы должны обращаться KeyboardInterruptс детьми. Я бы ожидал, что это будет работать как написано ... попробуйте изменить slowly_squareна:

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0

Это должно работать, как вы ожидали.

D.Shawley
источник
1
Я попробовал это, и это фактически не завершает весь набор заданий. Он завершает текущие задания, но сценарий по-прежнему назначает оставшиеся задания в вызове pool.map, как будто все в порядке.
Fragsworth
это нормально, но вы можете потерять отслеживание возникающих ошибок. возврат ошибки с помощью трассировки стека может сработать, поэтому родительский процесс может определить, что произошла ошибка, но он все равно не завершается сразу же, когда возникает ошибка.
Мехтунгух