Поймать исключение потока в потоке вызывающего в Python

209

Я очень плохо знаком с Python и многопоточным программированием в целом. По сути, у меня есть скрипт, который будет копировать файлы в другое место. Я хотел бы, чтобы это было помещено в другой поток, чтобы я мог выводить, ....чтобы указать, что скрипт все еще работает.

Проблема, с которой я сталкиваюсь, заключается в том, что если файлы не могут быть скопированы, это вызовет исключение. Это нормально, если работает в основном потоке; однако следующий код не работает:

try:
    threadClass = TheThread(param1, param2, etc.)
    threadClass.start()   ##### **Exception takes place here**
except:
    print "Caught an exception"

В самом потоке я попытался повторно сгенерировать исключение, но оно не работает. Я видел, как люди здесь задают похожие вопросы, но все они, кажется, делают что-то более конкретное, чем то, что я пытаюсь сделать (и я не совсем понимаю предлагаемые решения). Я видел, как люди упоминают об использовании sys.exc_info(), но я не знаю, где и как его использовать.

Вся помощь очень ценится!

РЕДАКТИРОВАТЬ: код для класса потока ниже:

class TheThread(threading.Thread):
    def __init__(self, sourceFolder, destFolder):
        threading.Thread.__init__(self)
        self.sourceFolder = sourceFolder
        self.destFolder = destFolder

    def run(self):
        try:
           shul.copytree(self.sourceFolder, self.destFolder)
        except:
           raise
Phanto
источник
Можете ли вы дать больше понимания того, что происходит внутри TheThread? Пример кода возможно?
Ятанизм
Конечно. Я отредактирую свой ответ выше, чтобы включить некоторые детали.
Phanto
1
Рассматривали ли вы его переключение таким образом, чтобы основной поток выполнял все, а индикатор прогресса находился в порожденном потоке?
Дан Хед
1
Дэн Хэд, вы имеете в виду, что основной поток сначала вызывает функцию «...», а затем запускает функцию копирования? Это может сработать и избежать проблемы исключений. Но я все еще хотел бы научиться правильно нить в Python.
Phanto

Ответы:

115

Проблема в том, что thread_obj.start()возвращается немедленно. Созданный вами дочерний поток выполняется в своем собственном контексте со своим собственным стеком. Любое исключение возникает в контексте дочернего потока и находится в его собственном стеке. Прямо сейчас я могу подумать о том, чтобы передать эту информацию родительскому потоку, используя своего рода передачу сообщений, так что вы можете посмотреть на это.

Попробуйте это для размера:

import sys
import threading
import Queue


class ExcThread(threading.Thread):

    def __init__(self, bucket):
        threading.Thread.__init__(self)
        self.bucket = bucket

    def run(self):
        try:
            raise Exception('An error occured here.')
        except Exception:
            self.bucket.put(sys.exc_info())


def main():
    bucket = Queue.Queue()
    thread_obj = ExcThread(bucket)
    thread_obj.start()

    while True:
        try:
            exc = bucket.get(block=False)
        except Queue.Empty:
            pass
        else:
            exc_type, exc_obj, exc_trace = exc
            # deal with the exception
            print exc_type, exc_obj
            print exc_trace

        thread_obj.join(0.1)
        if thread_obj.isAlive():
            continue
        else:
            break


if __name__ == '__main__':
    main()
Санта
источник
5
Почему бы не присоединиться к потоку вместо этого уродливого цикла while? См. multiprocessingЭквивалент: gist.github.com/2311116
schlamar
1
Почему бы не использовать шаблон EventHook stackoverflow.com/questions/1092531/event-system-in-python/… на основе ответа @Lasse? Вместо того, чтобы делать петли?
Андре Мирас
1
Очередь - не лучший способ сообщить об ошибке, если вы не хотите иметь полную очередь из них. Гораздо лучшая конструкция - многопоточность.
Событие
1
Это кажется небезопасным для меня. Что происходит, когда поток вызывает исключение сразу после bucket.get()повышения Queue.Empty? Тогда поток join(0.1)завершится isAlive() is Falseи вы пропустите исключение.
Стив
1
Queueв этом простом случае не требуется - вы можете просто сохранить информацию об исключении в качестве свойства, ExcThreadесли только вы убедитесь, что оно run()завершается сразу после исключения (что и происходит в этом простом примере). Затем вы просто повторно поднимаете исключение после (или во время) t.join(). Нет проблем с синхронизацией, потому join()что убедитесь, что поток завершен. Смотрите ответ Рока Стрниша ниже stackoverflow.com/a/12223550/126362
ejm
42

concurrent.futuresМодуль позволяет легко сделать работу в отдельных потоках (или процессов) и обрабатывать любые результирующие исключения:

import concurrent.futures
import shutil

def copytree_with_dots(src_path, dst_path):
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        # Execute the copy on a separate thread,
        # creating a future object to track progress.
        future = executor.submit(shutil.copytree, src_path, dst_path)

        while future.running():
            # Print pretty dots here.
            pass

        # Return the value returned by shutil.copytree(), None.
        # Raise any exceptions raised during the copy process.
        return future.result()

concurrent.futuresвходит в состав Python 3.2 и доступен в качестве futuresмодуля обратного переноса для более ранних версий.

Джон-Эрик
источник
5
Хотя это не делает в точности то, что просил ОП, это именно тот намек, который мне нужен. Спасибо.
Безумный физик
2
Кроме того concurrent.futures.as_completed, вы можете сразу же получать уведомления при
возникновении
1
Этот код блокирует основной поток. Как вы делаете это асинхронно?
Николай Шиндаров
40

На этот вопрос есть много действительно сложных ответов. Я упрощаю это, потому что мне кажется, что этого достаточно для большинства вещей.

from threading import Thread

class PropagatingThread(Thread):
    def run(self):
        self.exc = None
        try:
            if hasattr(self, '_Thread__target'):
                # Thread uses name mangling prior to Python 3.
                self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            else:
                self.ret = self._target(*self._args, **self._kwargs)
        except BaseException as e:
            self.exc = e

    def join(self):
        super(PropagatingThread, self).join()
        if self.exc:
            raise self.exc
        return self.ret

Если вы уверены, что когда-либо будете работать только на одной или другой версии Python, вы можете сократить run()метод до версии с искажениями (если вы будете работать только на версиях Python до 3), или просто чистая версия (если вы будете работать только на версиях Python, начинающихся с 3).

Пример использования:

def f(*args, **kwargs):
    print(args)
    print(kwargs)
    raise Exception('I suck at this')

t = PropagatingThread(target=f, args=(5,), kwargs={'hello':'world'})
t.start()
t.join()

И вы увидите исключение, возникшее в другом потоке, когда вы присоединитесь.

Если вы используете sixили используете только Python 3, вы можете улучшить информацию трассировки стека, полученную при повторном возникновении исключения. Вместо стека в точке соединения вы можете заключить внутреннее исключение в новое внешнее исключение и получить обе трассировки стека с помощью

six.raise_from(RuntimeError('Exception in thread'),self.exc)

или

raise RuntimeError('Exception in thread') from self.exc
ArtOfWarfare
источник
1
Я не уверен, почему этот ответ не так популярен. Здесь есть и другие, которые тоже занимаются простым распространением, но требуют расширения класса и переопределения. Этот просто делает то, что многие ожидают, и требует только перехода с Thread на ProagatingThread. И 4 пробела, так что мое копирование / вставка было тривиальным :-) ... единственное улучшение, которое я бы предложил, это использование six.raise_from (), чтобы вы получили хороший вложенный набор трассировок стека, а не только стек для сайт ререйза.
aggieNick02
Большое спасибо. Очень простое решение.
Сонулохани
Моя проблема в том, что у меня есть несколько дочерних потоков. Объединения выполняются в последовательности, и исключение может быть вызвано из последующих присоединяемых потоков. Есть ли простое решение моей проблемы? запустить объединение одновременно?
Чуан
Спасибо, это работает отлично! Не уверен, почему это не обрабатывается напрямую python tho ...
GG.
Это определенно самый полезный ответ, это решение гораздо более общее, чем другие, но простое. Будет использовать его в проекте!
Константин Секереш
30

Хотя невозможно напрямую перехватить исключение, созданное в другом потоке, вот код, который позволяет совершенно прозрачно получить что-то очень близкое к этой функциональности. Ваш дочерний поток должен наследовать ExThreadкласс вместо, threading.Threadа родительский поток должен вызывать child_thread.join_with_exception()метод вместо того, чтобы child_thread.join()ждать, пока поток завершит свою работу.

Технические подробности этой реализации: когда дочерний поток генерирует исключение, он передается родительскому элементу через a Queueи снова генерируется в родительском потоке. Обратите внимание, что в этом подходе нет ожидания.

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overriden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overriden."""
        try:
            self.run_with_exception()
        except BaseException:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    def run_with_exception(self):
        thread_name = threading.current_thread().name
        raise MyException("An error in thread '{}'.".format(thread_name))

def main():
    t = MyThread()
    t.start()
    try:
        t.join_with_exception()
    except MyException as ex:
        thread_name = threading.current_thread().name
        print "Caught a MyException in thread '{}': {}".format(thread_name, ex)

if __name__ == '__main__':
    main()
оборота Матеуш Кобос
источник
1
Разве вы не хотите поймать BaseException, не так Exceptionли? Все, что вы делаете, это распространяете исключение из одного Threadв другое. Прямо сейчас IE, a KeyboardInterrupt, будет молча игнорироваться, если он будет вызван в фоновом потоке.
ArtOfWarfare
join_with_exceptionзависает на неопределенный срок, если вызывается второй раз на мертвой ветке. Исправление: github.com/fraserharris/threading-extensions/blob/master/…
Фрейзер Харрис
Я не думаю, что Queueэто необходимо; см. мой комментарий к ответу @ Санты. Вы можете упростить его до чего-то вроде ответа Рока Стрниши ниже stackoverflow.com/a/12223550/126362
ejm
22

Если в потоке возникает исключение, лучший способ - повторно вызвать его в потоке вызывающего во время join. Вы можете получить информацию об исключении, которое обрабатывается в данный момент, используя sys.exc_info()функцию. Эта информация может быть просто сохранена как свойство объекта потока, пока не joinбудет вызвана, после чего она может быть повторно вызвана.

Обратите внимание, что a Queue.Queue(как предлагается в других ответах) не является необходимым в этом простом случае, когда поток выбрасывает не более 1 исключения и завершается сразу после создания исключения . Мы избегаем условий гонки, просто ожидая завершения потока.

Например, расширение ExcThread(ниже), переопределение excRun(вместо run).

Python 2.x:

import threading

class ExcThread(threading.Thread):
  def excRun(self):
    pass

  def run(self):
    self.exc = None
    try:
      # Possibly throws an exception
      self.excRun()
    except:
      import sys
      self.exc = sys.exc_info()
      # Save details of the exception thrown but don't rethrow,
      # just complete the function

  def join(self):
    threading.Thread.join(self)
    if self.exc:
      msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1])
      new_exc = Exception(msg)
      raise new_exc.__class__, new_exc, self.exc[2]

Python 3.x:

Форма с тремя аргументами для raiseотсутствует в Python 3, поэтому измените последнюю строку на:

raise new_exc.with_traceback(self.exc[2])
Рок Стрниша
источник
2
Почему вы используете threading.Thread.join (self) вместо super (ExcThread, self) .join ()?
Ричард Мён,
9

concurrent.futures.as_completed

https://docs.python.org/3.7/library/concurrent.futures.html#concurrent.futures.as_completed

Следующее решение:

  • немедленно возвращается к основному потоку при вызове исключения
  • не требует дополнительных пользовательских классов, потому что это не нужно:
    • явный Queue
    • добавить что-то еще, кроме вашего рабочего потока

Источник:

#!/usr/bin/env python3

import concurrent.futures
import time

def func_that_raises(do_raise):
    for i in range(3):
        print(i)
        time.sleep(0.1)
    if do_raise:
        raise Exception()
    for i in range(3):
        print(i)
        time.sleep(0.1)

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = []
    futures.append(executor.submit(func_that_raises, False))
    futures.append(executor.submit(func_that_raises, True))
    for future in concurrent.futures.as_completed(futures):
        print(repr(future.exception()))

Возможный вывод:

0
0
1
1
2
2
0
Exception()
1
2
None

К сожалению, невозможно уничтожить фьючерсы, чтобы отменить другие, так как один не получается:

Если вы делаете что-то вроде:

for future in concurrent.futures.as_completed(futures):
    if future.exception() is not None:
        raise future.exception()

затем withловит его и ждет окончания второго потока, прежде чем продолжить. Следующее ведет себя аналогично:

for future in concurrent.futures.as_completed(futures):
    future.result()

так как future.result()повторно вызывает исключение, если оно произошло.

Если вы хотите выйти из всего процесса Python, вы можете сойти с рук os._exit(0) , но это, вероятно, означает, что вам нужен рефакторинг.

Пользовательский класс с идеальной семантикой исключений

Я закончил программировать идеальный интерфейс для себя на: Правильный способ ограничить максимальное количество одновременно работающих потоков? раздел «Пример очереди с обработкой ошибок». Этот класс призван быть как удобным, так и дать вам полный контроль над отправкой и обработкой результатов / ошибок.

Протестировано на Python 3.6.7, Ubuntu 18.04.

оборота Сиро Сантилли 新疆 改造 中心 996ICU 六四 事件
источник
4

Это была неприятная маленькая проблема, и я хотел бы добавить свое решение. Некоторые другие решения, которые я нашел (например, async.io), выглядели многообещающе, но также представляли собой черную коробку. Подход с использованием очереди / события связывает вас с определенной реализацией. Параллельный исходный код фьючерса, однако, составляет всего около 1000 строк, и его легко понять . Это позволило мне легко решить мою проблему: создавать рабочие потоки ad-hoc без особых настроек и иметь возможность перехватывать исключения в основном потоке.

Мое решение использует API параллельных фьючерсов и API потоков. Это позволяет вам создать работника, который дает вам как поток, так и будущее. Таким образом, вы можете присоединиться к потоку, чтобы дождаться результата:

worker = Worker(test)
thread = worker.start()
thread.join()
print(worker.future.result())

... или вы можете позволить работнику просто отправить обратный вызов, когда закончите:

worker = Worker(test)
thread = worker.start(lambda x: print('callback', x))

... или вы можете выполнить цикл, пока событие не завершится:

worker = Worker(test)
thread = worker.start()

while True:
    print("waiting")
    if worker.future.done():
        exc = worker.future.exception()
        print('exception?', exc)
        result = worker.future.result()
        print('result', result)           
        break
    time.sleep(0.25)

Вот код:

from concurrent.futures import Future
import threading
import time

class Worker(object):
    def __init__(self, fn, args=()):
        self.future = Future()
        self._fn = fn
        self._args = args

    def start(self, cb=None):
        self._cb = cb
        self.future.set_running_or_notify_cancel()
        thread = threading.Thread(target=self.run, args=())
        thread.daemon = True #this will continue thread execution after the main thread runs out of code - you can still ctrl + c or kill the process
        thread.start()
        return thread

    def run(self):
        try:
            self.future.set_result(self._fn(*self._args))
        except BaseException as e:
            self.future.set_exception(e)

        if(self._cb):
            self._cb(self.future.result())

... и тестовая функция:

def test(*args):
    print('args are', args)
    time.sleep(2)
    raise Exception('foo')
оборота Calvin Froedge
источник
2

Как новичок в Threading, мне потребовалось много времени, чтобы понять, как реализовать код Матеуша Кобоса (см. Выше). Вот уточненная версия, чтобы помочь понять, как ее использовать.

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overriden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overriden."""
        try:
            self.run_with_exception()
        except Exception:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    # This overrides the "run_with_exception" from class "ExThread"
    # Note, this is where the actual thread to be run lives. The thread
    # to be run could also call a method or be passed in as an object
    def run_with_exception(self):
        # Code will function until the int
        print "sleeping 5 seconds"
        import time
        for i in 1, 2, 3, 4, 5:
            print i
            time.sleep(1) 
        # Thread should break here
        int("str")
# I'm honestly not sure why these appear here? So, I removed them. 
# Perhaps Mateusz can clarify?        
#         thread_name = threading.current_thread().name
#         raise MyException("An error in thread '{}'.".format(thread_name))

if __name__ == '__main__':
    # The code lives in MyThread in this example. So creating the MyThread 
    # object set the code to be run (but does not start it yet)
    t = MyThread()
    # This actually starts the thread
    t.start()
    print
    print ("Notice 't.start()' is considered to have completed, although" 
           " the countdown continues in its new thread. So you code "
           "can tinue into new processing.")
    # Now that the thread is running, the join allows for monitoring of it
    try:
        t.join_with_exception()
    # should be able to be replace "Exception" with specific error (untested)
    except Exception, e: 
        print
        print "Exceptioon was caught and control passed back to the main thread"
        print "Do some handling here...or raise a custom exception "
        thread_name = threading.current_thread().name
        e = ("Caught a MyException in thread: '" + 
             str(thread_name) + 
             "' [" + str(e) + "]")
        raise Exception(e) # Or custom class of exception, such as MyException
BurningKrome
источник
2

Подобным образом, как у RickardSjogren без Queue, sys и т. Д., Но также без некоторых прослушивателей сигналов: выполняется непосредственно обработчик исключений, который соответствует блоку исключения.

#!/usr/bin/env python3

import threading

class ExceptionThread(threading.Thread):

    def __init__(self, callback=None, *args, **kwargs):
        """
        Redirect exceptions of thread to an exception handler.

        :param callback: function to handle occured exception
        :type callback: function(thread, exception)
        :param args: arguments for threading.Thread()
        :type args: tuple
        :param kwargs: keyword arguments for threading.Thread()
        :type kwargs: dict
        """
        self._callback = callback
        super().__init__(*args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except BaseException as e:
            if self._callback is None:
                raise e
            else:
                self._callback(self, e)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self._target, self._args, self._kwargs, self._callback

Только self._callback и блок исключений в run () являются дополнительными к нормальной работе с потоками. Тема.

Куриная метка
источник
2

Я знаю, что немного опоздал на вечеринку здесь, но у меня была очень похожая проблема, но она включала использование tkinter в качестве графического интерфейса, и основной цикл лишил возможности использовать любое из решений, которые зависят от .join (). Поэтому я адаптировал решение, данное в РЕДАКТИРЕ исходного вопроса, но сделал его более общим, чтобы его было легче понять другим.

Вот новый класс потока в действии:

import threading
import traceback
import logging


class ExceptionThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except Exception:
            logging.error(traceback.format_exc())


def test_function_1(input):
    raise IndexError(input)


if __name__ == "__main__":
    input = 'useful'

    t1 = ExceptionThread(target=test_function_1, args=[input])
    t1.start()

Конечно, вы всегда можете обработать исключение другим способом, например, распечатать его или вывести на консоль.

Это позволяет вам использовать класс ExceptionThread точно так же, как и класс Thread, без каких-либо специальных изменений.

Firo
источник
1

Один метод, который мне нравится, основан на модели наблюдателя . Я определяю класс сигнала, который мой поток использует для выдачи исключений слушателям. Он также может быть использован для возврата значений из потоков. Пример:

import threading

class Signal:
    def __init__(self):
        self._subscribers = list()

    def emit(self, *args, **kwargs):
        for func in self._subscribers:
            func(*args, **kwargs)

    def connect(self, func):
        self._subscribers.append(func)

    def disconnect(self, func):
        try:
            self._subscribers.remove(func)
        except ValueError:
            raise ValueError('Function {0} not removed from {1}'.format(func, self))


class WorkerThread(threading.Thread):

    def __init__(self, *args, **kwargs):
        super(WorkerThread, self).__init__(*args, **kwargs)
        self.Exception = Signal()
        self.Result = Signal()

    def run(self):
        if self._Thread__target is not None:
            try:
                self._return_value = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            except Exception as e:
                self.Exception.emit(e)
            else:
                self.Result.emit(self._return_value)

if __name__ == '__main__':
    import time

    def handle_exception(exc):
        print exc.message

    def handle_result(res):
        print res

    def a():
        time.sleep(1)
        raise IOError('a failed')

    def b():
        time.sleep(2)
        return 'b returns'

    t = WorkerThread(target=a)
    t2 = WorkerThread(target=b)
    t.Exception.connect(handle_exception)
    t2.Result.connect(handle_result)
    t.start()
    t2.start()

    print 'Threads started'

    t.join()
    t2.join()
    print 'Done'

У меня недостаточно опыта работы с потоками, чтобы утверждать, что это абсолютно безопасный метод. Но это сработало для меня, и мне нравится гибкость.

RickardSjogren
источник
вы отключаетесь после join ()?
Ealeon
Я не знаю, но я думаю, это была бы хорошая идея, чтобы у вас не было ссылок на неиспользуемые вещи
RickardSjogren
я заметил, что «handle_exception» все еще является частью дочернего потока. надо как-то передать это ветке вызывающей стороны
ealeon
1

Использование обнаженных исключений не является хорошей практикой, потому что вы обычно ловите больше, чем ожидаете.

Я хотел бы предложить изменить exceptТОЛЬКО исключение, которое вы хотели бы обработать. Я не думаю, что его повышение имеет желаемый эффект, потому что когда вы отправляетесь для создания экземпляра TheThreadво внешнем try, если оно вызывает исключение, назначение никогда не произойдет.

Вместо этого вы можете просто предупредить об этом и двигаться дальше, например:

def run(self):
    try:
       shul.copytree(self.sourceFolder, self.destFolder)
    except OSError, err:
       print err

Затем, когда это исключение будет обнаружено, вы можете обработать его там. Затем, когда внешнее приложение tryловит исключение TheThread, вы знаете, что оно не будет тем, с которым вы уже работали, и поможет вам изолировать поток процесса.

jathanism
источник
1
Хорошо, если в этом потоке вообще есть ошибка, я хочу, чтобы полная программа информировала пользователя о том, что была проблема, и изящно завершила. По этой причине я хочу, чтобы основной поток перехватывал и обрабатывал все исключения. Тем не менее, проблема все еще существует, когда, если TheThread выдает исключение, основная попытка потока / исключение все равно не поймает его. Я мог бы сделать так, чтобы поток обнаружил исключение и возвратил ложное значение, указывающее, что операция была неудачной. Это дало бы тот же желаемый результат, но я все же хотел бы знать, как правильно перехватить исключение из подпотока.
Phanto
1

Простой способ перехвата исключения потока и обратной связи с методом вызывающего может быть путем передачи словаря или списка workerметоду.

Пример (передача словаря в рабочий метод):

import threading

def my_method(throw_me):
    raise Exception(throw_me)

def worker(shared_obj, *args, **kwargs):
    try:
        shared_obj['target'](*args, **kwargs)
    except Exception as err:
        shared_obj['err'] = err

shared_obj = {'err':'', 'target': my_method}
throw_me = "Test"

th = threading.Thread(target=worker, args=(shared_obj, throw_me), kwargs={})
th.start()
th.join()

if shared_obj['err']:
    print(">>%s" % shared_obj['err'])
оборота радо стоянов
источник
1

Wrap Thread с хранилищем исключений.

import threading
import sys
class ExcThread(threading.Thread):

    def __init__(self, target, args = None):
        self.args = args if args else []
        self.target = target
        self.exc = None
        threading.Thread.__init__(self)

    def run(self):
        try:
            self.target(*self.args)
            raise Exception('An error occured here.')
        except Exception:
            self.exc=sys.exc_info()

def main():
    def hello(name):
        print(!"Hello, {name}!")
    thread_obj = ExcThread(target=hello, args=("Jack"))
    thread_obj.start()

    thread_obj.join()
    exc = thread_obj.exc
    if exc:
        exc_type, exc_obj, exc_trace = exc
        print(exc_type, ':',exc_obj, ":", exc_trace)

main()
ahuigo
источник
0

pygolang предоставляет sync.WorkGroup, который, в частности, распространяет исключение из порожденных рабочих потоков в основной поток. Например:

#!/usr/bin/env python
"""This program demostrates how with sync.WorkGroup an exception raised in
spawned thread is propagated into main thread which spawned the worker."""

from __future__ import print_function
from golang import sync, context

def T1(ctx, *argv):
    print('T1: run ... %r' % (argv,))
    raise RuntimeError('T1: problem')

def T2(ctx):
    print('T2: ran ok')

def main():
    wg = sync.WorkGroup(context.background())
    wg.go(T1, [1,2,3])
    wg.go(T2)

    try:
        wg.wait()
    except Exception as e:
        print('Tmain: caught exception: %r\n' %e)
        # reraising to see full traceback
        raise

if __name__ == '__main__':
    main()

дает следующее при запуске:

T1: run ... ([1, 2, 3],)
T2: ran ok
Tmain: caught exception: RuntimeError('T1: problem',)

Traceback (most recent call last):
  File "./x.py", line 28, in <module>
    main()
  File "./x.py", line 21, in main
    wg.wait()
  File "golang/_sync.pyx", line 198, in golang._sync.PyWorkGroup.wait
    pyerr_reraise(pyerr)
  File "golang/_sync.pyx", line 178, in golang._sync.PyWorkGroup.go.pyrunf
    f(pywg._pyctx, *argv, **kw)
  File "./x.py", line 10, in T1
    raise RuntimeError('T1: problem')
RuntimeError: T1: problem

Исходный код из вопроса будет просто:

    wg = sync.WorkGroup(context.background())

    def _(ctx):
        shul.copytree(sourceFolder, destFolder)
    wg.go(_)

    # waits for spawned worker to complete and, on error, reraises
    # its exception on the main thread.
    wg.wait()
Kirr
источник