Есть ли способ убить нить?

767

Можно ли завершить работающий поток без установки / проверки каких-либо флагов / семафоров / и т. Д.?

Внезапная защита
источник

Ответы:

673

Как правило, это плохая схема - внезапное завершение потока в Python и на любом языке. Подумайте о следующих случаях:

  • поток содержит критический ресурс, который должен быть правильно закрыт
  • поток создал несколько других потоков, которые также должны быть уничтожены.

Хороший способ справиться с этим, если вы можете себе это позволить (если вы управляете своими собственными потоками), - это иметь флаг exit_request, который каждый поток проверяет через регулярные промежутки времени, чтобы узнать, пора ли ему выйти.

Например:

import threading

class StoppableThread(threading.Thread):
    """Thread class with a stop() method. The thread itself has to check
    regularly for the stopped() condition."""

    def __init__(self,  *args, **kwargs):
        super(StoppableThread, self).__init__(*args, **kwargs)
        self._stop_event = threading.Event()

    def stop(self):
        self._stop_event.set()

    def stopped(self):
        return self._stop_event.is_set()

В этом коде вы должны вызывать stop()поток, когда вы хотите, чтобы он вышел, и ждать, пока поток завершит работу, используя join(). Поток должен регулярно проверять флаг остановки.

Однако бывают случаи, когда вам действительно нужно убить поток. Например, когда вы оборачиваете внешнюю библиотеку, которая занята для длительных вызовов, и хотите прервать ее.

Следующий код позволяет (с некоторыми ограничениями) вызвать исключение в потоке Python:

def _async_raise(tid, exctype):
    '''Raises an exception in the threads with id tid'''
    if not inspect.isclass(exctype):
        raise TypeError("Only types can be raised (not instances)")
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid),
                                                     ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # "if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"
        ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), None)
        raise SystemError("PyThreadState_SetAsyncExc failed")

class ThreadWithExc(threading.Thread):
    '''A thread class that supports raising exception in the thread from
       another thread.
    '''
    def _get_my_tid(self):
        """determines this (self's) thread id

        CAREFUL : this function is executed in the context of the caller
        thread, to get the identity of the thread represented by this
        instance.
        """
        if not self.isAlive():
            raise threading.ThreadError("the thread is not active")

        # do we have it cached?
        if hasattr(self, "_thread_id"):
            return self._thread_id

        # no, look for it in the _active dict
        for tid, tobj in threading._active.items():
            if tobj is self:
                self._thread_id = tid
                return tid

        # TODO: in python 2.6, there's a simpler way to do : self.ident

        raise AssertionError("could not determine the thread's id")

    def raiseExc(self, exctype):
        """Raises the given exception type in the context of this thread.

        If the thread is busy in a system call (time.sleep(),
        socket.accept(), ...), the exception is simply ignored.

        If you are sure that your exception should terminate the thread,
        one way to ensure that it works is:

            t = ThreadWithExc( ... )
            ...
            t.raiseExc( SomeException )
            while t.isAlive():
                time.sleep( 0.1 )
                t.raiseExc( SomeException )

        If the exception is to be caught by the thread, you need a way to
        check that your thread has caught it.

        CAREFUL : this function is executed in the context of the
        caller thread, to raise an excpetion in the context of the
        thread represented by this instance.
        """
        _async_raise( self._get_my_tid(), exctype )

(Основано на Killable Threads Томера Филиба. Цитата о возвращаемом значении, по- PyThreadState_SetAsyncExcвидимому, взята из старой версии Python .)

Как отмечено в документации, это не волшебная палочка, потому что, если поток занят вне интерпретатора Python, он не будет перехватывать прерывание.

Хорошая схема использования этого кода - заставить поток перехватить определенное исключение и выполнить очистку. Таким образом, вы можете прервать задачу и при этом выполнить надлежащую очистку.

Филипп Ф
источник
78
@ Bluebird75: Более того, я не уверен, что получаю аргумент, что потоки не должны быть внезапно прерваны, «потому что поток может содержать критический ресурс, который должен быть правильно закрыт»: это также верно для основной программы и основных программ. может быть внезапно уничтожен пользователем (например, Ctrl-C в Unix) - в этом случае они пытаются обработать эту возможность настолько хорошо, насколько это возможно. Поэтому я не вижу, что особенного в потоках, и почему они не должны обрабатываться так же, как и основные программы (а именно, что их можно внезапно уничтожить). :) Не могли бы вы уточнить это?
Эрик О Лебиго
18
@EOL: С другой стороны, если все ресурсы, которыми владеет поток, являются локальными ресурсами (открытые файлы, сокеты), Linux достаточно хорош в очистке процесса, и это не приводит к утечке. Однако у меня были случаи, когда я создавал сервер с использованием сокета, и если я выполняю жестокое прерывание с помощью Ctrl-C, я больше не могу запускать программу, поскольку она не может связать сокет. Мне нужно подождать 5 минут. Правильным решением было поймать Ctrl-C и выполнить чистое отключение сокета.
Филипп Ф.
10
@ Bluebird75: кстати. Вы можете использовать SO_REUSEADDRопцию сокета, чтобы избежать Address already in useошибки.
Месса
12
Примечание об этом ответе: по крайней мере для меня (py2.6), мне пришлось пройти Noneвместо 0для res != 1случая, и я должен был позвонить ctypes.c_long(tid)и передать , что любые ctypes функционировать , а не три раза в день непосредственно.
Walt W
21
Стоит отметить, что _stop уже занят в поточной библиотеке Python 3. Поэтому, возможно, используйте другую переменную, иначе вы получите ошибку.
скончался трижды
113

Нет официального API для этого, нет.

Вам нужно использовать API платформы для уничтожения потока, например, pthread_kill или TerminateThread. Вы можете получить доступ к такому API, например, через pythonwin или через ctypes.

Обратите внимание, что это небезопасно. Скорее всего, это приведет к необратимому мусору (из локальных переменных стековых фреймов, которые становятся мусором) и может привести к взаимным блокировкам, если у удаляемого потока есть GIL в момент его завершения.

Мартин против Лёвиса
источник
26
Это будет приводить к тупиков , если нить в вопросе держит GIL.
Матиас Урликс
96

multiprocessing.Processможет ,p.terminate()

В тех случаях, когда я хочу уничтожить поток, но не хочу использовать флаги / блокировки / сигналы / семафоры / события / что угодно, я продвигаю потоки в полномасштабные процессы. Для кода, который использует только несколько потоков, издержки не так уж и плохи.

Например, это удобно, чтобы легко завершать вспомогательные «потоки», которые выполняют блокирующий ввод / вывод.

Преобразование тривиально: в связанном коде замените все threading.Threadна multiprocessing.Processи все queue.Queueна multiprocessing.Queueи добавьте необходимые вызовы p.terminate()вашего родительского процесса, который хочет убить своего потомкаp

Смотрите документацию по Python дляmultiprocessing .

CFI
источник
Спасибо. Я заменил queue.Queue на multiprocessing.JoinableQueue и последовал этому ответу: stackoverflow.com/a/11984760/911207
Дэвид Браун
Много страниц по этому вопросу. Я думаю, что для многих это очевидное решение
геотеория
6
multiprocessingэто хорошо, но имейте в виду, что аргументы зарезервированы для нового процесса. Так что, если один из аргументов является чем-то непопулярным (например, a logging.log), его не стоит использовать multiprocessing.
Lyager
1
multiprocessingаргументы передаются новому процессу в Windows, но Linux использует разветвление для их копирования (Python 3.7, не знаю, какие другие версии). Таким образом, вы получите код, который работает в Linux, но вызывает ошибки Pickle в Windows.
nyanpasu64
multiprocessingс ведением журнала это сложное дело. Необходимо использовать QueueHandler(см. Этот учебник ). Я научился этому нелегко.
Фанчен Бао
74

Если вы пытаетесь завершить всю программу, вы можете установить поток как «демон». см. Thread.daemon

schettino72
источник
Это не имеет никакого смысла. В документации четко сказано: «это должно быть установлено до вызова start (), в противном случае вызывается RuntimeError». Таким образом, если я хочу уничтожить поток, который изначально не был демоном, как я могу это использовать?
Раффи Хачадурян
27
Раффи, я думаю, он предлагает вам установить его заранее, зная, что при выходе из основного потока вы также хотите, чтобы потоки демона выходили.
Фантабол
1
Не уверен, почему это не принятый ответ
Эрик
Разве установка потока в качестве демона не является чем-то, что вы будете делать, если хотите, чтобы поток продолжал работать, даже если ваша основная программа закрылась?
Мишель Пикколини
Вы, сэр, мой герой дня. Именно то, что я искал, и без суеты вообще добавить.
Blizz
42

Как уже упоминали другие, нормой является установка флага остановки. Для чего-то более легкого (без подклассов Thread, без глобальной переменной), лямбда-обратный вызов является опцией. (Обратите внимание на круглые скобки в if stop().)

import threading
import time

def do_work(id, stop):
    print("I am thread", id)
    while True:
        print("I am thread {} doing something".format(id))
        if stop():
            print("  Exiting loop.")
            break
    print("Thread {}, signing off".format(id))


def main():
    stop_threads = False
    workers = []
    for id in range(0,3):
        tmp = threading.Thread(target=do_work, args=(id, lambda: stop_threads))
        workers.append(tmp)
        tmp.start()
    time.sleep(3)
    print('main: done sleeping; time to stop the threads.')
    stop_threads = True
    for worker in workers:
        worker.join()
    print('Finis.')

if __name__ == '__main__':
    main()

Замена print()на pr()функцию, которая всегда сбрасывает ( sys.stdout.flush()), может улучшить точность вывода оболочки.

(Проверено только на Windows / Eclipse / Python3.3)

Джон Кумбс
источник
1
Проверено на Linux / Python 2.7, работает как шарм. Это должен быть официальный ответ, он намного проще.
Пол Кенджора
1
Проверено на Linux Ubuntu Server 17.10 / Python 3.6.3 и работает.
Маркос
1
Также проверено в 2.7. Такой хороший ответ!
Silgon
Что такое pr()функция?
Alper
35

Это основано на thread2 - убиваемые темы (рецепт Python)

Вам нужно вызвать PyThreadState_SetasyncExc (), который доступен только через ctypes.

Это было протестировано только на Python 2.7.3, но, вероятно, будет работать с другими недавними выпусками 2.x.

import ctypes

def terminate_thread(thread):
    """Terminates a python thread from another thread.

    :param thread: a threading.Thread instance
    """
    if not thread.isAlive():
        return

    exc = ctypes.py_object(SystemExit)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_long(thread.ident), exc)
    if res == 0:
        raise ValueError("nonexistent thread id")
    elif res > 1:
        # """if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"""
        ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")
Йохан Далин
источник
Я использую что-то вроде этого, чтобы дать моим нитям KeyboardInterruptвозможность очистить их. Если после этого они ВСЕГДА зависнут, то SystemExitэто уместно, или просто убейте процесс из терминала.
Древицко
Это работает, если поток в данный момент выполняется. Это не работает, если поток находится в системном вызове; исключение будет игнорироваться.
Матиас Урликс
@MatthiasUrlich, есть идеи, как определить состояние выполнения потока, чтобы можно было напечатать предупреждение или повторить попытку?
Йохан Далин
1
@JohanDahlin Вы можете немного подождать (что, если вы хотите повторить попытку, вам все равно нужно сделать), а затем выполнить тест isAlive (). В любом случае, хотя это сработает, я также не гарантирую, что это не оставит висячих ссылок. Хотя теоретически возможно обеспечить безопасное уничтожение потоков в CPython, при разумном использовании pthread_cleanup_push()/_pop()было бы много работы для правильной реализации, и это заметно замедлило бы работу интерпретатора.
Матиас Урлич
32

Вы никогда не должны принудительно убивать поток, не сотрудничая с ним.

Уничтожение потока снимает любые гарантии того, что блоки try / finally настроены так, что вы можете оставить блокировки заблокированными, открытыми файлами и т. Д.

Единственный раз, когда вы можете утверждать, что принудительное уничтожение потоков - это хорошая идея, это быстрое уничтожение программы, но не отдельных потоков.

Лассе В. Карлсен
источник
11
Почему так сложно просто рассказать потоку, пожалуйста, убейте себя, когда закончите текущий цикл ... Я не понимаю.
Мехди
2
В процессоре нет встроенного механизма для идентификации «цикла» как такового. Лучшее, на что вы можете надеяться, это использовать какой-то сигнал, который код, который в данный момент находится внутри цикла, будет проверять после его выхода. Правильный способ обработки синхронизации потоков - это совместные средства, приостановка, возобновление и уничтожение потоков - это функции, предназначенные для отладчиков и операционной системы, а не код приложения.
Лассе В. Карлсен
2
@Mehdi: если я (лично) пишу код в ветке, да, я согласен с вами. Но есть случаи, когда я использую сторонние библиотеки, и у меня нет доступа к циклу выполнения этого кода. Это один из вариантов использования запрошенной функции.
Дэн Х
@DanH Это даже хуже с сторонним кодом, так как вы не знаете, какой ущерб это может причинить. Если ваша сторонняя библиотека недостаточно надежна, чтобы ее убить, вам следует выполнить одно из следующих действий: (1) попросить автора решить проблему, (2) использовать что-то еще. Если у вас действительно нет выбора, то размещение этого кода в отдельном процессе должно быть более безопасным, поскольку некоторые ресурсы совместно используются только в рамках одного процесса.
Phil1970
25

В Python вы просто не можете уничтожить поток напрямую.

Если вам НЕ действительно нужен поток (!), То вместо использования пакета потоков вы можете использовать многопроцессорный пакет . Здесь, чтобы убить процесс, вы можете просто вызвать метод:

yourProcess.terminate()  # kill the process!

Python убьет ваш процесс (в Unix через сигнал SIGTERM, а в Windows через TerminateProcess()вызов). Обратите внимание, чтобы использовать его при использовании очереди или трубы! (это может повредить данные в очереди / конвейере)

Обратите внимание, что multiprocessing.Eventи multiprocessing.Semaphoreработают точно так же, как threading.Eventи threading.Semaphoreсоответственно. Фактически, первые - клоны последних.

Если вам ДЕЙСТВИТЕЛЬНО нужно использовать поток, нет способа уничтожить его напрямую. Однако вы можете использовать «поток демона» . На самом деле, в Python поток может быть помечен как демон :

yourThread.daemon = True  # set the Thread as a "daemon thread"

Основная программа завершит работу, когда не останется живых потоков, не являющихся демонами. Другими словами, когда ваш основной поток (который, конечно, не является потоком демона) завершит свои операции, программа завершит работу, даже если все еще работают некоторые потоки демона.

Обратите внимание, что необходимо установить поток как daemonперед вызовом start()метода!

Конечно, вы можете и должны использовать daemonдаже с multiprocessing. Здесь, когда основной процесс завершается, он пытается завершить все свои демонические дочерние процессы.

Наконец, обратите внимание, что это sys.exit()и os.kill()не варианты.

Паоло Ровелли
источник
14

Вы можете убить поток, установив трассировку в поток, который выйдет из потока. Смотрите прикрепленную ссылку для одной возможной реализации.

Убить нить в Python

Kozyarchuk
источник
Я уже видел это. Это решение основано на проверке флага self.killed
Sudden Def
1
Один из немногих ответов здесь, который на самом деле работает
Ponkadoodle
5
Две проблемы с этим решением: (а) установка трассировщика с помощью sys.settrace () замедлит работу вашего потока. Целых 10 раз медленнее, если это связано с вычислениями. (б) не повлияет на ваш поток, пока он находится в системном вызове.
Матиас Урликс
10

Если вы явно вызываете time.sleep()как часть своего потока (скажем, опрашиваете какую-то внешнюю службу), то улучшением метода Филиппа является использование тайм-аута в методе event's, wait()где бы вы ни находилисьsleep()

Например:

import threading

class KillableThread(threading.Thread):
    def __init__(self, sleep_interval=1):
        super().__init__()
        self._kill = threading.Event()
        self._interval = sleep_interval

    def run(self):
        while True:
            print("Do Something")

            # If no kill signal is set, sleep for the interval,
            # If kill signal comes in while sleeping, immediately
            #  wake up and handle
            is_killed = self._kill.wait(self._interval)
            if is_killed:
                break

        print("Killing Thread")

    def kill(self):
        self._kill.set()

Затем запустить его

t = KillableThread(sleep_interval=5)
t.start()
# Every 5 seconds it prints:
#: Do Something
t.kill()
#: Killing Thread

Преимущество использования wait()вместо sleep()проверки и регулярной проверки события состоит в том, что вы можете программировать с более длительными интервалами сна, поток останавливается почти сразу (когда вы в противном случае sleep()), и, на мой взгляд, код для обработки выхода значительно проще ,

SCB
источник
3
почему этот пост был отклонен? Что не так с этим постом? Это выглядит точно так, как мне нужно ...
JDOaktown
9

Лучше, если вы не убьете нить. Можно было бы ввести блок try в цикл потока и вызвать исключение, когда вы хотите остановить поток (например, break / return / ..., который останавливает ваш for / while / ...). Я использовал это в своем приложении, и это работает ...

Giancarlo
источник
8

Определенно возможно реализовать Thread.stopметод, как показано в следующем примере кода:

import sys
import threading
import time


class StopThread(StopIteration):
    pass

threading.SystemExit = SystemExit, StopThread


class Thread2(threading.Thread):

    def stop(self):
        self.__stop = True

    def _bootstrap(self):
        if threading._trace_hook is not None:
            raise ValueError('Cannot run thread with tracing!')
        self.__stop = False
        sys.settrace(self.__trace)
        super()._bootstrap()

    def __trace(self, frame, event, arg):
        if self.__stop:
            raise StopThread()
        return self.__trace


class Thread3(threading.Thread):

    def _bootstrap(self, stop_thread=False):
        def stop():
            nonlocal stop_thread
            stop_thread = True
        self.stop = stop

        def tracer(*_):
            if stop_thread:
                raise StopThread()
            return tracer
        sys.settrace(tracer)
        super()._bootstrap()

###############################################################################


def main():
    test1 = Thread2(target=printer)
    test1.start()
    time.sleep(1)
    test1.stop()
    test1.join()
    test2 = Thread2(target=speed_test)
    test2.start()
    time.sleep(1)
    test2.stop()
    test2.join()
    test3 = Thread3(target=speed_test)
    test3.start()
    time.sleep(1)
    test3.stop()
    test3.join()


def printer():
    while True:
        print(time.time() % 1)
        time.sleep(0.1)


def speed_test(count=0):
    try:
        while True:
            count += 1
    except StopThread:
        print('Count =', count)

if __name__ == '__main__':
    main()

Thread3Класс появляется для запуска кода примерно на 33% быстрее , чем Thread2класс.

Noctis Skytower
источник
3
Это умный способ ввести проверки для того, self.__stopчтобы быть установленным в поток. Обратите внимание, что, как и большинство других решений здесь, он фактически не прерывает блокирующий вызов, поскольку функция трассировки вызывается только при вводе новой локальной области. Также стоит отметить, что он sys.settraceдействительно предназначен для реализации отладчиков, профилей и т. Д. И поэтому считается деталью реализации CPython и не гарантированно существует в других реализациях Python.
Дано
3
@dano: Одна из самых больших проблем с Thread2классом заключается в том, что он выполняет код примерно в десять раз медленнее. Некоторые люди все еще могут найти это приемлемым.
Noctis Skytower
+1 при этом значительно замедляет выполнение кода. Я бы предложил, чтобы автор этого решения включил эту информацию в ответ.
Вишал
6
from ctypes import *
pthread = cdll.LoadLibrary("libpthread-2.15.so")
pthread.pthread_cancel(c_ulong(t.ident))

т твой Threadобъект.

Прочитайте исходный код python ( Modules/threadmodule.cи Python/thread_pthread.h), и вы увидите, что Thread.identэто pthread_tтип, так что вы можете делать все, что pthreadможете сделать в использовании python libpthread.

snyh
источник
И как вы делаете это на Windows?
iChux
12
Вы не делаете; не на Windows и не на Linux либо. Причина: рассматриваемый поток может содержать GIL, пока вы это делаете (Python освобождает GIL, когда вы вызываете C). Если это произойдет, ваша программа будет мгновенно заблокирована. Даже если это не так, наконец: блоки не будут выполняться и т. Д., Так что это очень небезопасная идея.
Матиас Урлич
6

Для обхода потока можно использовать следующий обходной путь:

kill_threads = False

def doSomething():
    global kill_threads
    while True:
        if kill_threads:
            thread.exit()
        ......
        ......

thread.start_new_thread(doSomething, ())

Это может быть использовано даже для завершения потоков, чей код написан в другом модуле, из основного потока. Мы можем объявить глобальную переменную в этом модуле и использовать ее для завершения потоков, порожденных в этом модуле.

Я обычно использую это для завершения всех потоков при выходе из программы. Это может быть не идеальный способ завершить поток (ы), но может помочь.

Амит Чахар
источник
Недурно. Просто для понимания.
Алиссаэлия
6

Я опаздываю к этой игре, но я боролся с подобным вопросом, и следующее, кажется, как решило проблему для меня идеально, так и позволило мне выполнить некоторую базовую проверку и очистку состояния потока при выходе из демонизированного подпотока:

import threading
import time
import atexit

def do_work():

  i = 0
  @atexit.register
  def goodbye():
    print ("'CLEANLY' kill sub-thread with value: %s [THREAD: %s]" %
           (i, threading.currentThread().ident))

  while True:
    print i
    i += 1
    time.sleep(1)

t = threading.Thread(target=do_work)
t.daemon = True
t.start()

def after_timeout():
  print "KILL MAIN THREAD: %s" % threading.currentThread().ident
  raise SystemExit

threading.Timer(2, after_timeout).start()

Урожайность:

0
1
KILL MAIN THREAD: 140013208254208
'CLEANLY' kill sub-thread with value: 2 [THREAD: 140013674317568]
slumtrimpet
источник
Это отличный ответ, который должен быть выше в списке
drootang
Почему повышение SystemExitв after_timeoutпотоке делает что-либо с основным потоком (который просто ожидает выхода в первом примере в этом примере)?
Дэвис Херринг
@DavisHerring Я не уверен, к чему ты клонишь. SystemExit убивает основной поток, почему вы думаете, что он будет делать что-нибудь в основном потоке? Без этого вызова программа просто будет ожидать дочернего потока. Вы также можете нажать Ctrl + C или использовать любое другое средство для уничтожения основного потока, но это пример.
трущоб
@slumtrimpet: SystemExitимеет только два специальных свойства: он не производит трассировку (когда любой поток завершается, выбрасывая один), и если основной поток завершается, бросая один, он устанавливает состояние выхода (в то же время ожидая других потоков, не являющихся демонами). выходить).
Дэвис Херринг
@DavisHerring О, я понял. Полностью с вами согласен. Я неправильно запомнил, что мы здесь сделали, и неправильно понял ваш комментарий.
трущоб
4

Одна вещь, которую я хочу добавить, это то, что если вы читаете официальную документацию в потоке lib Python , рекомендуется избегать использования «демонических» потоков, когда вы не хотите, чтобы потоки заканчивались внезапно, с флагом, упомянутым Паоло Ровелли .

Из официальной документации:

Потоки демона внезапно останавливаются при завершении работы. Их ресурсы (такие как открытые файлы, транзакции базы данных и т. Д.) Могут быть освобождены неправильно. Если вы хотите, чтобы ваши потоки изящно останавливались, сделайте их недемоническими и используйте подходящий механизм сигнализации, такой как Событие.

Я думаю, что создание демонических потоков зависит от вашего приложения, но в целом (и на мой взгляд) лучше не убивать их или делать их демоническими. В многопроцессорной обработке вы можете использовать is_alive()для проверки состояния процесса и «завершить» для завершения их (также вы избегаете проблем GIL). Но иногда вы можете найти больше проблем, когда выполняете свой код в Windows.

И всегда помните, что если у вас есть «живые потоки», интерпретатор Python будет их ждать. (Из-за этого демон может помочь вам, если не имеет значения, внезапно заканчивается).

Чема
источник
4
Я не понимаю последний абзац.
Чепанг
@Tshepang Это означает, что если в вашем приложении есть работающие не-демонические потоки, интерпретатор Python продолжит работать, пока все не- демонические потоки не будут завершены . Если вам безразлично, заканчиваются ли потоки после завершения программы, тогда их создание может быть полезным.
Том Мидделтын
3

Для этого есть библиотека, стопит . Хотя некоторые из тех же предостережений, перечисленных здесь, все еще применяются, по крайней мере, эта библиотека представляет регулярную, повторяемую технику для достижения поставленной цели.

Джейсон Р. Кумбс
источник
1

Хотя он довольно старый, для некоторых это может быть удобным решением:

Маленький модуль, который расширяет функциональность модуля потока - позволяет одному потоку вызывать исключения в контексте другого потока. Подняв SystemExit, вы можете, наконец, убить потоки Python.

import threading
import ctypes     

def _async_raise(tid, excobj):
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(excobj))
    if res == 0:
        raise ValueError("nonexistent thread id")
    elif res > 1:
        # """if it returns a number greater than one, you're in trouble, 
        # and you should call it again with exc=NULL to revert the effect"""
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)
        raise SystemError("PyThreadState_SetAsyncExc failed")

class Thread(threading.Thread):
    def raise_exc(self, excobj):
        assert self.isAlive(), "thread must be started"
        for tid, tobj in threading._active.items():
            if tobj is self:
                _async_raise(tid, excobj)
                return

        # the thread was alive when we entered the loop, but was not found 
        # in the dict, hence it must have been already terminated. should we raise
        # an exception here? silently ignore?

    def terminate(self):
        # must raise the SystemExit type, instead of a SystemExit() instance
        # due to a bug in PyThreadState_SetAsyncExc
        self.raise_exc(SystemExit)

Таким образом, он позволяет «потоку вызывать исключения в контексте другого потока» и, таким образом, завершенный поток может обрабатывать завершение без регулярной проверки флага прерывания.

Однако, согласно исходному коду, есть некоторые проблемы с этим кодом.

  • Исключение будет возникать только при выполнении байт-кода Python. Если ваш поток вызывает встроенную / встроенную функцию блокировки, исключение будет вызвано только тогда, когда выполнение вернется к коду Python.
    • Существует также проблема, если встроенная функция внутренне вызывает PyErr_Clear (), что эффективно отменит ожидающее исключение. Вы можете попытаться поднять его снова.
  • Только типы исключений могут быть сгенерированы безопасно. Экземпляры исключений могут вызывать непредвиденное поведение и поэтому ограничены.
  • Я попросил предоставить эту функцию во встроенном модуле потока, но поскольку ctypes стала стандартной библиотекой (начиная с версии 2.5), и эта
    функция вряд ли будет зависеть от реализации, ее можно не
    раскрывать.
wp78de
источник
0

Это похоже на работу с pywin32 на Windows 7

my_thread = threading.Thread()
my_thread.start()
my_thread._Thread__stop()
zzart
источник
0

Питер Хинтдженс, один из основателей проекта ØMQ, говорит, что использование ØMQ и избегание примитивов синхронизации, таких как блокировки, мьютексы, события и т. Д., Является самым безопасным и безопасным способом написания многопоточных программ:

http://zguide.zeromq.org/py:all#Multithreading-with-ZeroMQ

Это включает в себя сообщение дочернему потоку, что он должен отменить свою работу. Это можно сделать, оснастив поток ØMQ-сокетом и опросив на нем сокет, чтобы получить сообщение о том, что он должен быть отменен.

Ссылка также предоставляет пример многопоточного кода Python с ØMQ.

paulkernstock
источник
0

Предполагая, что вы хотите иметь несколько потоков одной и той же функции, это ИМХО самая простая реализация, чтобы остановить один по id:

import time
from threading import Thread

def doit(id=0):
    doit.stop=0
    print("start id:%d"%id)
    while 1:
        time.sleep(1)
        print(".")
        if doit.stop==id:
            doit.stop=0
            break
    print("end thread %d"%id)

t5=Thread(target=doit, args=(5,))
t6=Thread(target=doit, args=(6,))

t5.start() ; t6.start()
time.sleep(2)
doit.stop =5  #kill t5
time.sleep(2)
doit.stop =6  #kill t6

Хорошая вещь здесь, вы можете иметь несколько одинаковых и разных функций, и остановить их все functionname.stop

Если вы хотите иметь только один поток функции, вам не нужно запоминать идентификатор. Просто остановитесь, если doit.stop> 0.

rundekugel
источник
0

Просто чтобы развить идею @ SCB (именно это мне и было нужно) создать подкласс KillableThread с настраиваемой функцией:

from threading import Thread, Event

class KillableThread(Thread):
    def __init__(self, sleep_interval=1, target=None, name=None, args=(), kwargs={}):
        super().__init__(None, target, name, args, kwargs)
        self._kill = Event()
        self._interval = sleep_interval
        print(self._target)

    def run(self):
        while True:
            # Call custom function with arguments
            self._target(*self._args)

        # If no kill signal is set, sleep for the interval,
        # If kill signal comes in while sleeping, immediately
        #  wake up and handle
        is_killed = self._kill.wait(self._interval)
        if is_killed:
            break

    print("Killing Thread")

def kill(self):
    self._kill.set()

if __name__ == '__main__':

    def print_msg(msg):
        print(msg)

    t = KillableThread(10, print_msg, args=("hello world"))
    t.start()
    time.sleep(6)
    print("About to kill thread")
    t.kill()

Естественно, как и в случае с @SBC, поток не ожидает запуска нового цикла для остановки. В этом примере вы увидите сообщение «Killing Thread», напечатанное сразу после «About to kill thread», вместо того, чтобы ждать еще 4 секунды до завершения потока (так как мы проспали уже 6 секунд).

Второй аргумент в конструкторе KillableThread - ваша пользовательская функция (здесь print_msg). Аргумент Args - это аргументы, которые будут использоваться при вызове функции (("hello world")) здесь.

Тим Михан
источник
0

Как уже упоминалось в @ Kozyarchuk в ответ , установка трассировки работает. Поскольку этот ответ не содержал кода, вот рабочий готовый пример:

import sys, threading, time 

class TraceThread(threading.Thread): 
    def __init__(self, *args, **keywords): 
        threading.Thread.__init__(self, *args, **keywords) 
        self.killed = False
    def start(self): 
        self._run = self.run 
        self.run = self.settrace_and_run
        threading.Thread.start(self) 
    def settrace_and_run(self): 
        sys.settrace(self.globaltrace) 
        self._run()
    def globaltrace(self, frame, event, arg): 
        return self.localtrace if event == 'call' else None
    def localtrace(self, frame, event, arg): 
        if self.killed and event == 'line': 
            raise SystemExit() 
        return self.localtrace 

def f(): 
    while True: 
        print('1') 
        time.sleep(2)
        print('2') 
        time.sleep(2)
        print('3') 
        time.sleep(2)

t = TraceThread(target=f) 
t.start() 
time.sleep(2.5) 
t.killed = True

Останавливается после печати 1и 2. 3не печатается.

Basj
источник
-1

Вы можете выполнить свою команду в процессе, а затем уничтожить ее, используя идентификатор процесса. Мне нужно было синхронизировать два потока, один из которых не возвращается сам по себе.

processIds = []

def executeRecord(command):
    print(command)

    process = subprocess.Popen(command, stdout=subprocess.PIPE)
    processIds.append(process.pid)
    print(processIds[0])

    #Command that doesn't return by itself
    process.stdout.read().decode("utf-8")
    return;


def recordThread(command, timeOut):

    thread = Thread(target=executeRecord, args=(command,))
    thread.start()
    thread.join(timeOut)

    os.kill(processIds.pop(), signal.SIGINT)

    return;
user1942887
источник
-1

Запустите подпоток с помощью setDaemon (True).

def bootstrap(_filename):
    mb = ModelBootstrap(filename=_filename) # Has many Daemon threads. All get stopped automatically when main thread is stopped.

t = threading.Thread(target=bootstrap,args=('models.conf',))
t.setDaemon(False)

while True:
    t.start()
    time.sleep(10) # I am just allowing the sub-thread to run for 10 sec. You can listen on an event to stop execution.
    print('Thread stopped')
    break
Sud
источник
-2

Это плохой ответ, смотрите комментарии

Вот как это сделать:

from threading import *

...

for thread in enumerate():
    if thread.isAlive():
        try:
            thread._Thread__stop()
        except:
            print(str(thread.getName()) + ' could not be terminated'))

Дайте ему несколько секунд, после чего ваш поток должен быть остановлен. Проверьте также thread._Thread__delete()метод.

Я бы порекомендовал thread.quit()метод для удобства. Например, если у вас есть сокет в вашем потоке, я бы порекомендовал создать quit()метод в вашем классе дескриптора сокета, завершить сокет, а затем запустить thread._Thread__stop()внутри вашего quit().

DoXiD
источник
Мне пришлось использовать self._Thread__stop () внутри моего Threading.Thread объекта, чтобы остановить его. Я не понимаю, почему простой self.join (), как этот пример ( code.activestate.com/recipes/65448-thread-control-idiom ), не работает.
Хариджай,
12
Было бы полезно узнать больше о том, что «это не остановит поток».
2371
19
По сути, вызов метода _Thread__stop не имеет никакого эффекта, кроме сообщения Python о том, что поток остановлен. Это может на самом деле продолжать работать. См. Gist.github.com/2787191 для примера.
Bluehorn
35
Это совершенно неправильно. _Thread__stop()просто помечает поток как остановленный , он фактически не останавливает поток! Никогда не делай этого. Прочитайте .
dotancohen
-2

Если вам действительно нужна возможность убить подзадачу, используйте альтернативную реализацию. multiprocessingи geventоба поддерживают без разбора убивая "нить".

Поток Python не поддерживает отмену. Даже не пытайся. Скорее всего, ваш код заблокирует, повредит или утечет память, или будет иметь другие непреднамеренные "интересные" трудно отлаживаемые эффекты, которые происходят редко и недетерминированно.

Матиас Урлич
источник
2
… И да, я знаю, что оба они не являются строго «многопоточными», но они оба работают, если ваш код соответствует (или может быть сделан так, чтобы соответствовать) их модели.
Матиас Урликс