Тайм-аут на вызов функции

300

Я вызываю функцию в Python, которая, я знаю, может остановить и заставить меня перезапустить скрипт.

Как мне вызвать функцию или как ее обернуть, чтобы, если это заняло более 5 секунд, скрипт отменил ее и сделал что-то еще?

Teifion
источник

Ответы:

227

Вы можете использовать пакет сигналов , если вы работаете в UNIX:

In [1]: import signal

# Register an handler for the timeout
In [2]: def handler(signum, frame):
   ...:     print("Forever is over!")
   ...:     raise Exception("end of time")
   ...: 

# This function *may* run for an indetermined time...
In [3]: def loop_forever():
   ...:     import time
   ...:     while 1:
   ...:         print("sec")
   ...:         time.sleep(1)
   ...:         
   ...:         

# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0

# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0

In [6]: try:
   ...:     loop_forever()
   ...: except Exception, exc: 
   ...:     print(exc)
   ....: 
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time

# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0

Через 10 секунд после вызова вызывается alarm.alarm(10)обработчик. Это вызывает исключение, которое вы можете перехватить из обычного кода Python.

Этот модуль плохо работает с потоками (но тогда, кто делает?)

Обратите внимание, что, поскольку мы генерируем исключение, когда происходит тайм-аут, оно может в конечном итоге быть пойманным и проигнорировано внутри функции, например, одной такой функции:

def loop_forever():
    while 1:
        print('sec')
        try:
            time.sleep(10)
        except:
            continue
Пиро
источник
5
Я использую Python 2.5.4. Возникает такая ошибка: Traceback (последний вызов был последним): файл "aa.py", строка 85, в func signal.signal (signal.SIGALRM, handler) AttributeError: у объекта 'module' нет атрибута 'SIGALRM'
flypen
11
@flypen потому что signal.alarmи связанные с ними SIGALRMнедоступны на платформах Windows.
Двойной AA
2
Если процессов много, и каждый из них вызывает signal.signal--- все ли они будут работать правильно? Разве каждый signal.signalзвонок не отменяет «одновременный»?
броуновский
1
Предупреждение для тех, кто хочет использовать это с расширением C: обработчик сигнала Python не будет вызываться, пока функция C не вернет управление интерпретатору Python. Для этого варианта использования используйте ответ ATOzTOA: stackoverflow.com/a/14924210/1286628
wkschwartz
13
Я второе предупреждение о потоках. signal.alarm работает только в основном потоке. Я попытался использовать это в представлениях Django - немедленный сбой со словоборством только о главном потоке.
JL Peyret
154

Вы можете использовать, multiprocessing.Processчтобы сделать именно это.

Код

import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."

        # Terminate
        p.terminate()
        p.join()
ATOzTOA
источник
36
Как я могу получить возвращаемое значение целевого метода?
bad_keypoints
4
Это не работает, если вызываемая функция застревает в блоке ввода-вывода.
Судо
4
@bad_keypoints Смотрите этот ответ: stackoverflow.com/a/10415215/1384471 По сути, вы передаете список, в который вы помещаете ответ.
Питер
1
@sudo затем удалите join(). это заставляет ваше число одновременных подпроцессов работать до тех пор, пока они не завершат свою работу, или количество, указанное в join(10). В случае, если у вас есть блокирующий ввод / вывод для 10 процессов, используя join (10), вы установили их так, чтобы они ждали каждого из них максимум 10, пока КАЖДЫЙ процесс не запустился. Используйте флаг демона, как в этом примере stackoverflow.com/a/27420072/2480481 . Конечно, вы можете передать флаг daemon=Trueнепосредственно в multiprocessing.Process()функцию.
m3nda
2
@ATOzTOA проблема с этим решением, по крайней мере для моих целей, заключается в том, что оно потенциально не позволяет детям чистить протекторы после себя. Из документации о функции завершенияterminate() ... Note that exit handlers and finally clauses, etc., will not be executed. Note that descendant processes of the process will not be terminated – they will simply become orphaned.
abalcerek
78

Как мне вызвать функцию или как ее обернуть, чтобы, если это займет больше 5 секунд, скрипт отменит ее?

Я опубликовал суть, которая решает этот вопрос / проблему с декоратором и threading.Timer. Вот это с разбивкой.

Импорт и настройки для совместимости

Он был протестирован с Python 2 и 3. Он также должен работать под Unix / Linux и Windows.

Сначала импорт. Эти попытки сохранить код непротиворечивым независимо от версии Python:

from __future__ import print_function
import sys
import threading
from time import sleep
try:
    import thread
except ImportError:
    import _thread as thread

Используйте независимый от версии код:

try:
    range, _print = xrange, print
    def print(*args, **kwargs): 
        flush = kwargs.pop('flush', False)
        _print(*args, **kwargs)
        if flush:
            kwargs.get('file', sys.stdout).flush()            
except NameError:
    pass

Теперь мы импортировали нашу функциональность из стандартной библиотеки.

exit_after декоратор

Далее нам нужна функция для завершения main()дочернего потока:

def quit_function(fn_name):
    # print to stderr, unbuffered in Python 2.
    print('{0} took too long'.format(fn_name), file=sys.stderr)
    sys.stderr.flush() # Python 3 stderr is likely buffered.
    thread.interrupt_main() # raises KeyboardInterrupt

А вот и сам декоратор:

def exit_after(s):
    '''
    use as decorator to exit process if 
    function takes longer than s seconds
    '''
    def outer(fn):
        def inner(*args, **kwargs):
            timer = threading.Timer(s, quit_function, args=[fn.__name__])
            timer.start()
            try:
                result = fn(*args, **kwargs)
            finally:
                timer.cancel()
            return result
        return inner
    return outer

использование

И вот использование, которое прямо отвечает на ваш вопрос о выходе через 5 секунд !:

@exit_after(5)
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        sleep(1)
    print('countdown finished')

Демо-версия:

>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 6, in countdown
KeyboardInterrupt

Второй вызов функции не завершится, вместо этого процесс должен завершиться с отслеживанием!

KeyboardInterrupt не всегда останавливает спящую нить

Обратите внимание, что сон не всегда прерывается прерыванием клавиатуры, на Python 2 в Windows, например:

@exit_after(1)
def sleep10():
    sleep(10)
    print('slept 10 seconds')

>>> sleep10()
sleep10 took too long         # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 3, in sleep10
KeyboardInterrupt

и при этом он не может прерывать код, выполняемый в расширениях, если он явно не проверяется PyErr_CheckSignals(), см. Cython, Python и KeyboardInterrupt игнорируются

В любом случае, я бы не спал нить больше секунды - это время процессора.

Как мне вызвать функцию или как ее обернуть, чтобы, если это заняло более 5 секунд, скрипт отменил ее и сделал что-то еще?

Чтобы поймать это и сделать что-то еще, вы можете поймать KeyboardInterrupt.

>>> try:
...     countdown(10)
... except KeyboardInterrupt:
...     print('do something else')
... 
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else
Аарон Холл
источник
Я еще не прочитал весь ваш пост, но мне просто стало интересно: а что, если flush равен 0? Это будет интерпретировано как Ложь в нижеследующем утверждении if, верно?
Koenraad van Duin
2
Почему мне нужно позвонить thread.interrupt_main(), почему я не могу напрямую вызвать исключение?
Анирбан Наг 'tintinmj'
Есть мысли об упаковке multiprocessing.connection.Clientс этим? - Пытаюсь решить: stackoverflow.com/questions/57817955/…
Второй мировой войны
51

У меня есть другое предложение, которое является чистой функцией (с тем же API, что и предложение по созданию потоков) и, кажется, работает нормально (основываясь на предложениях в этой теме)

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result
Alex
источник
3
Вам также следует восстановить оригинальный обработчик сигнала. См. Stackoverflow.com/questions/492519/…
Мартин Конечни
9
Еще одно замечание: метод сигналов Unix работает только в том случае, если вы применяете его в основном потоке. Применение этого в подпотоке выдает исключение и не будет работать.
Мартин Конечни
12
Это не лучшее решение, потому что оно работает только на Linux.
максимум
17
Макс, не правда - работает на любом POSIX-совместимом Unix. Я думаю, что ваш комментарий должен быть более точным, не работает на Windows.
Крис Джонсон
6
Вы должны избегать установки kwargs на пустой диктовку. Обычная ошибка Python в том, что аргументы по умолчанию для функций являются изменяемыми. Таким образом, этот словарь будет общим для всех вызовов timeout. Гораздо лучше установить значение по умолчанию Noneи в первой строке функции добавить kwargs = kwargs or {}. С Args все в порядке, потому что кортежи не изменяемы.
scottmrogowski
32

Я наткнулся на эту ветку при поиске таймаута в модульных тестах. Я не нашел ничего простого в ответах или сторонних пакетах, поэтому я написал декоратор ниже, вы можете перейти прямо в код:

import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator

Тогда это так же просто, как время ожидания теста или любой функции, которая вам нравится:

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...
Богатый
источник
14
Будьте осторожны, так как это не завершает функцию после истечения времени ожидания!
Сильвен
Обратите внимание, что в Windows это порождает совершенно новый процесс, который значительно сокращает время ожидания, возможно, очень много, если для установки зависимостей требуется много времени.
Аарон Холл
1
Да, это нуждается в некоторой настройке. Это оставляет потоки, идущие навсегда.
Судо
2
IDK, если это лучший способ, но вы можете попробовать / перехватить Exceptionвнутри func_wrapper и сделать pool.close()после перехвата, чтобы убедиться, что поток всегда умирает, несмотря ни на что. Затем вы можете бросить TimeoutErrorили все, что вы хотите после. Кажется, работает на меня.
Судо
2
Это полезно, но как только я сделал это много раз, я получаю RuntimeError: can't start new thread. Будет ли это работать, если я проигнорирую это или есть что-то еще, что я могу сделать, чтобы обойти это? Заранее спасибо!
Бенджи
20

stopitПакет, найденный на PyPI, кажется обрабатывать таймаут хорошо.

Мне нравится @stopit.threading_timeoutableдекоратор, который добавляет timeoutпараметр к оформленной функции, которая делает то, что вы ожидаете, она останавливает функцию.

Проверьте это на pypi: https://pypi.python.org/pypi/stopit

Egeland
источник
1
Это очень удобно и поточно-ориентировано! Спасибо и Плюс один! Это лучший вариант, который я нашел, и даже лучше, чем принятый ответ !!
Яхья
Библиотека утверждает, что некоторые функции не работают в Windows.
Стефан Симик
16

Есть много предложений, но ни одно из них не использует concurrent.futures, что, я думаю, является наиболее разборчивым способом справиться с этим.

from concurrent.futures import ProcessPoolExecutor

# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
    with ProcessPoolExecutor() as p:
        f = p.submit(fnc, *args, **kwargs)
        return f.result(timeout=5)

Супер просто читать и поддерживать.

Мы создаем пул, отправляем один процесс и затем ждем до 5 секунд, прежде чем вызвать TimeoutError, которую вы можете перехватить и обработать так, как вам нужно.

Родной для python 3.2+ и перенесенной в 2.7 (pip install futures).

Переключение между потоками и процессами так же просто, как замена ProcessPoolExecutorнаThreadPoolExecutor .

Если вы хотите прекратить процесс по тайм-ауту, я бы посоветовал заглянуть в Pebble .

Брайан
источник
2
Что означает «Предупреждение: это не завершает функцию, если тайм-аут»?
Скотт Стаффорд
5
@ScottStafford Процессы / потоки не заканчиваются только потому, что была вызвана ошибка TimeoutError. Таким образом, процесс или поток все равно будут пытаться завершиться и не будут автоматически возвращать вам контроль по истечении времени ожидания.
Брайан
Позволит ли это мне сохранить какие-либо промежуточные результаты в то время? например, если у меня есть рекурсивная функция, для которой я установил тайм-аут на 5, и за это время у меня есть частичные результаты, как мне написать функцию, которая будет возвращать частичные результаты по тайм-ауту?
SumNeuron
Я использую именно это, однако у меня есть 1000 заданий, каждое разрешено за 5 секунд до истечения времени ожидания. Моя проблема в том, что ядра засоряются в задачах, которые никогда не заканчиваются, потому что тайм-аут применяется только к общему количеству задач, а не к отдельным задачам. concurrent.futures не предоставляет решение этой проблемы.
Бастиан
12

Отличный, простой в использовании и надежный тайм-декоратор проекта PyPi ( https://pypi.org/project/timeout-decorator/ )

установка :

pip install timeout-decorator

Использование :

import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print "Start"
    for i in range(1,10):
        time.sleep(1)
        print "%d seconds have passed" % i

if __name__ == '__main__':
    mytest()
Гил
источник
2
Я ценю четкое решение. Но кто-нибудь может объяснить, как работает эта библиотека, особенно когда речь идет о многопоточности. Лично я боюсь использовать неизвестный механизм для обработки потоков или сигналов.
wsysuper
@wsysuper В библиотеке есть 2 режима работы: открыть новый поток или новый подпроцесс (который предполагается потокобезопасным)
Gil
это сработало очень хорошо для меня!
Флориан Хейгл
6

Я являюсь автором wrapt_timeout_decorator

На первый взгляд, большинство решений, представленных здесь, прекрасно работают под Linux - потому что у нас есть fork () и signal (), но в Windows все выглядит немного иначе. А когда речь идет о подпотоках в Linux, вы больше не можете использовать сигналы.

Чтобы порождать процесс под Windows, он должен быть выбираемым - а многие декорированные функции или методы класса - нет.

Таким образом, вам нужно использовать лучший инструмент для выбора, такой как укроп и многопроцессный режим (а не процесс рассола и многопроцессорный режим) - вот почему вы не можете использовать ProcessPoolExecutor (или только с ограниченной функциональностью).

Для самого тайм-аута - вам нужно определить, что означает тайм-аут - потому что в Windows потребуется значительное (и не определяемое) время, чтобы запустить процесс. Это может быть сложно на коротких таймаутах. Предположим, порождение процесса занимает около 0,5 секунды (легко !!!). Если вы даете тайм-аут 0,2 секунды, что должно произойти? Должна ли функция отключаться через 0,5 + 0,2 секунды (так что метод должен работать в течение 0,2 секунды)? Или время вызова вызываемого процесса истечет через 0,2 секунды (в этом случае декорированная функция ВСЕГДА будет иметь тайм-аут, потому что в это время она даже не порождается)?

Также вложенные декораторы могут быть неприятными, и вы не можете использовать сигналы в подпотоке. Если вы хотите создать действительно универсальный, кроссплатформенный декоратор, все это необходимо принять во внимание (и протестировать).

Другие проблемы - передача исключений обратно вызывающей стороне, а также проблемы с ведением журнала (если используется в оформленной функции - запись в файлы в другом процессе НЕ поддерживается)

Я попытался охватить все крайние случаи. Вы можете заглянуть в пакет wrapt_timeout_decorator или, по крайней мере, протестировать свои собственные решения, вдохновленные используемыми там тестами юнитов.

@Alexis Eggermont - к сожалению, у меня нет достаточно комментариев, чтобы прокомментировать - может быть, кто-то еще может уведомить вас - я думаю, что я решил вашу проблему с импортом.

bitranox
источник
3

timeout-decorator не работает в системе Windows, так как Windows не поддерживает signal хорошо.

Если вы используете timeout-decorator в системе Windows, вы получите следующее

AttributeError: module 'signal' has no attribute 'SIGALRM'

Некоторые предложили использовать use_signals=False но у меня не получалось.

Автор @bitranox создал следующий пакет:

pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip

Пример кода:

import time
from wrapt_timeout_decorator import *

@timeout(5)
def mytest(message):
    print(message)
    for i in range(1,10):
        time.sleep(1)
        print('{} seconds have passed'.format(i))

def main():
    mytest('starting')


if __name__ == '__main__':
    main()

Дает следующее исключение:

TimeoutError: Function mytest timed out after 5 seconds
как будто
источник
Это звучит как очень хорошее решение. Как ни странно, линия, from wrapt_timeout_decorator import * кажется, убивает некоторые из моих других импортов. Например, я получаю ModuleNotFoundError: No module named 'google.appengine', но не получаю эту ошибку, если не импортирую wrapt_timeout_decorator
Алексис Эггермонт
@AlexisEggermont Я собирался использовать это с appengine ... поэтому мне очень любопытно, если эта ошибка сохранилась?
PascalVKooten
2

Мы можем использовать сигналы для того же. Я думаю, что приведенный ниже пример будет полезен для вас. Это очень просто по сравнению с потоками.

import signal

def timeout(signum, frame):
    raise myException

#this is an infinite loop, never ending under normal circumstances
def main():
    print 'Starting Main ',
    while 1:
        print 'in main ',

#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)

#change 5 to however many seconds you need
signal.alarm(5)

try:
    main()
except myException:
    print "whoops"
Арканзас
источник
1
Было бы лучше выбрать конкретное исключение и поймать только его. Голые try: ... except: ...всегда плохая идея.
Привет
Я согласен с тобой привет.
AR
Хотя я понимаю причину, как системный администратор / интегратор, я не согласен - код на python печально известен тем, что пренебрегает обработкой ошибок, а обработка одной вещи, которую вы ожидаете, недостаточно хороша для качественного программного обеспечения. Вы можете справиться с 5 вещами, которые вы планируете И общей стратегией для других вещей. «Traceback, None» - это не стратегия, это оскорбление.
Флориан Хейгл
2
#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)
Хэл Кэнэри
источник
7
В то время как этот код может ответить на вопрос, предоставление дополнительного контекста относительно того, как и / или почему он решает проблему, улучшит долгосрочную ценность ответа
Дэн Корнилеску,
1

У меня была потребность в нестабильных прерываниях по времени (что SIGALARM не может сделать), которые не будут блокироваться time.sleep (что не может сделать подход, основанный на потоках). В итоге я скопировал и слегка изменил код здесь: http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

Сам код:

#!/usr/bin/python

# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/


"""alarm.py: Permits multiple SIGALRM events to be queued.

Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""

import heapq
import signal
from time import time

__version__ = '$Revision: 2539 $'.split()[1]

alarmlist = []

__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))


class TimeoutError(Exception):
    def __init__(self, message, id_=None):
        self.message = message
        self.id_ = id_


class Timeout:
    ''' id_ allows for nested timeouts. '''
    def __init__(self, id_=None, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        self.id_ = id_
    def handle_timeout(self):
        raise TimeoutError(self.error_message, self.id_)
    def __enter__(self):
        self.this_alarm = alarm(self.seconds, self.handle_timeout)
    def __exit__(self, type, value, traceback):
        try:
            cancel(self.this_alarm) 
        except ValueError:
            pass


def __clear_alarm():
    """Clear an existing alarm.

    If the alarm signal was set to a callable other than our own, queue the
    previous alarm settings.
    """
    oldsec = signal.alarm(0)
    oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
    if oldsec > 0 and oldfunc != __alarm_handler:
        heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))


def __alarm_handler(*zargs):
    """Handle an alarm by calling any due heap entries and resetting the alarm.

    Note that multiple heap entries might get called, especially if calling an
    entry takes a lot of time.
    """
    try:
        nextt = __next_alarm()
        while nextt is not None and nextt <= 0:
            (tm, func, args, keys) = heapq.heappop(alarmlist)
            func(*args, **keys)
            nextt = __next_alarm()
    finally:
        if alarmlist: __set_alarm()


def alarm(sec, func, *args, **keys):
    """Set an alarm.

    When the alarm is raised in `sec` seconds, the handler will call `func`,
    passing `args` and `keys`. Return the heap entry (which is just a big
    tuple), so that it can be cancelled by calling `cancel()`.
    """
    __clear_alarm()
    try:
        newalarm = __new_alarm(sec, func, args, keys)
        heapq.heappush(alarmlist, newalarm)
        return newalarm
    finally:
        __set_alarm()


def cancel(alarm):
    """Cancel an alarm by passing the heap entry returned by `alarm()`.

    It is an error to try to cancel an alarm which has already occurred.
    """
    __clear_alarm()
    try:
        alarmlist.remove(alarm)
        heapq.heapify(alarmlist)
    finally:
        if alarmlist: __set_alarm()

и пример использования:

import alarm
from time import sleep

try:
    with alarm.Timeout(id_='a', seconds=5):
        try:
            with alarm.Timeout(id_='b', seconds=2):
                sleep(3)
        except alarm.TimeoutError as e:
            print 'raised', e.id_
        sleep(30)
except alarm.TimeoutError as e:
    print 'raised', e.id_
else:
    print 'nope.'
Джеймс
источник
Это также использует сигнал, следовательно, не будет работать, если вызывается из потока.
garg10may
0

Вот небольшое улучшение данного решения на основе потоков.

Код ниже поддерживает исключения :

def runFunctionCatchExceptions(func, *args, **kwargs):
    try:
        result = func(*args, **kwargs)
    except Exception, message:
        return ["exception", message]

    return ["RESULT", result]


def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **kwargs)
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return default

    if it.result[0] == "exception":
        raise it.result[1]

    return it.result[1]

Вызов с 5-секундным таймаутом:

result = timeout(remote_calculate, (myarg,), timeout_duration=5)
diemacht
источник
1
Это вызовет новое исключение, скрывающее исходную трассировку. Смотрите мою версию ниже ...
Meitham
1
Это также небезопасно, как если бы внутри runFunctionCatchExceptions()определенных функций Python вызывался GIL. Например , следующее никогда, или очень долго, если возвращение вызывается в функции: eval(2**9999999999**9999999999). См. Stackoverflow.com/questions/22138190/…
Микко Охтамаа,