Python threading.timer - повторять функцию каждые n секунд

96

Я хочу запускать функцию каждые 0,5 секунды и иметь возможность запускать, останавливать и сбрасывать таймер. Я не слишком хорошо разбираюсь в том, как работают потоки Python, и у меня возникают трудности с таймером Python.

Однако я продолжаю получать, RuntimeError: threads can only be started onceкогда выполняю threading.timer.start()дважды. Есть ли обходной путь для этого? Я пробовала подавать заявку threading.timer.cancel()перед каждым стартом.

Псевдокод:

t=threading.timer(0.5,function)
while True:
    t.cancel()
    t.start()
user1431282
источник

Ответы:

114

Лучший способ - запустить поток таймера один раз. Внутри потока таймера вы должны написать следующее

class MyThread(Thread):
    def __init__(self, event):
        Thread.__init__(self)
        self.stopped = event

    def run(self):
        while not self.stopped.wait(0.5):
            print("my thread")
            # call a function

В коде, запустившем таймер, вы можете затем setостановить событие, чтобы остановить таймер.

stopFlag = Event()
thread = MyThread(stopFlag)
thread.start()
# this will stop the timer
stopFlag.set()
Ганс Тогда
источник
4
Затем он закончит сон и потом остановится. Невозможно принудительно приостановить поток в python. Это дизайнерское решение, принятое разработчиками python. Однако чистый результат будет таким же. Ваш поток все еще будет работать (спать) на короткое время, но он не будет выполнять вашу функцию.
Hans Then
13
Ну, на самом деле, если вы хотите иметь возможность немедленно остановить поток таймера, просто используйте threading.Eventи waitвместо sleep. Затем, чтобы его разбудить, просто установите событие. Вам даже не нужно self.stoppedthen, потому что вы просто проверяете флаг события.
nneonneo
3
Событие будет использоваться строго для прерывания потока таймера. Обычно event.waitэто просто тайм-аут и действует как сон, но если вы хотите остановить (или иным образом прервать поток), вы устанавливаете событие потока, и он немедленно просыпается.
nneonneo
2
Я обновил свой ответ, чтобы использовать event.wait (). Спасибо за предложения.
Hans Then
1
просто вопрос, как после этого перезапустить ветку? звонок thread.start()дает мнеthreads can only be started once
Motassem MK
33

Из эквивалента setInterval в python :

import threading

def setInterval(interval):
    def decorator(function):
        def wrapper(*args, **kwargs):
            stopped = threading.Event()

            def loop(): # executed in another thread
                while not stopped.wait(interval): # until stopped
                    function(*args, **kwargs)

            t = threading.Thread(target=loop)
            t.daemon = True # stop if the program exits
            t.start()
            return stopped
        return wrapper
    return decorator

Применение:

@setInterval(.5)
def function():
    "..."

stop = function() # start timer, the first call is in .5 seconds
stop.set() # stop the loop
stop = function() # start new timer
# ...
stop.set() 

Или вот тот же функционал, но как отдельная функция вместо декоратора :

cancel_future_calls = call_repeatedly(60, print, "Hello, World")
# ...
cancel_future_calls() 

Вот как это сделать без использования потоков .

jfs
источник
как бы вы изменили интервал при использовании декоратора? скажем, я хочу изменить 0,5 секунды во время выполнения на 1 секунду или что-то еще?
lightxx
@lightxx: просто используйте @setInterval(1).
jfs
хм. так что либо я немного медлительный, либо вы меня неправильно поняли. Я имел ввиду во время выполнения. Я знаю, что могу изменить декоратор в исходном коде в любое время. что, например, у меня было три функции, каждая из которых украшена @setInterval (n). теперь во время выполнения я хочу изменить интервал функции 2, но оставить только функции 1 и 3.
lightxx
@lightxx: вы можете использовать другой интерфейс, например stop = repeat(every=second, call=your_function); ...; stop().
jfs
31

Использование потоков таймера-

from threading import Timer,Thread,Event


class perpetualTimer():

   def __init__(self,t,hFunction):
      self.t=t
      self.hFunction = hFunction
      self.thread = Timer(self.t,self.handle_function)

   def handle_function(self):
      self.hFunction()
      self.thread = Timer(self.t,self.handle_function)
      self.thread.start()

   def start(self):
      self.thread.start()

   def cancel(self):
      self.thread.cancel()

def printer():
    print 'ipsem lorem'

t = perpetualTimer(5,printer)
t.start()

это можно остановить t.cancel()

Swapnil Jariwala
источник
3
Я считаю, что этот код содержит ошибку в cancelметоде. Когда это вызывается, поток либо 1) не работает, либо 2) работает. В 1) мы ждем запуска функции, поэтому отмена будет работать нормально. в 2) мы сейчас работаем, поэтому отмена не повлияет на текущее выполнение. кроме того, текущее выполнение перенастраивается, поэтому оно не повлияет на будущее.
Rich Episcopo
1
Этот код создает новый поток каждый раз, когда таймер заканчивается. Это колоссальная трата по сравнению с принятым ответом.
Adrian W
Этого решения следует избегать по указанной выше причине: оно создает новый поток каждый раз
Пинчия,
19

Немного улучшив ответ Ханса То , мы можем просто создать подкласс функции Timer. Следующее становится нашим полным кодом "таймера повтора", и его можно использовать как замену для threading.Timer со всеми теми же аргументами:

from threading import Timer

class RepeatTimer(Timer):
    def run(self):
        while not self.finished.wait(self.interval):
            self.function(*self.args, **self.kwargs)

Пример использования:

def dummyfn(msg="foo"):
    print(msg)

timer = RepeatTimer(1, dummyfn)
timer.start()
time.sleep(5)
timer.cancel()

производит следующий вывод:

foo
foo
foo
foo

а также

timer = RepeatTimer(1, dummyfn, args=("bar",))
timer.start()
time.sleep(5)
timer.cancel()

производит

bar
bar
bar
bar
right2clicky
источник
Позволит ли мне этот подход запустить / отменить / запустить / отменить поток таймера?
Пол Кнопф
1
Нет. Хотя этот подход позволяет вам делать все, что вы делали бы с обычным таймером, вы не можете сделать это с обычным таймером. С начала / отмены связана с основной нитью, если вы пытаетесь .start () поток , который ранее был .cancel () 'эд , то вы получите исключение, RuntimeError: threads can only be started once.
right2clicky 05
1
Действительно шикарное решение! Странно, что они не просто включили класс, который это делает.
Роджер Даль
это решение очень впечатляет, но я изо всех сил пытался понять, как оно было разработано, просто прочитав документацию по интерфейсу таймера потоковой передачи Python3 . Похоже, что ответ основан на знании реализации, входя в сам threading.pyмодуль.
Adam.at.Epsilon
15

В интересах предоставления правильного ответа с использованием таймера в соответствии с запросом OP я улучшу ответ swapnil jariwala :

from threading import Timer


class InfiniteTimer():
    """A Timer class that does not stop, unless you want it to."""

    def __init__(self, seconds, target):
        self._should_continue = False
        self.is_running = False
        self.seconds = seconds
        self.target = target
        self.thread = None

    def _handle_target(self):
        self.is_running = True
        self.target()
        self.is_running = False
        self._start_timer()

    def _start_timer(self):
        if self._should_continue: # Code could have been running when cancel was called.
            self.thread = Timer(self.seconds, self._handle_target)
            self.thread.start()

    def start(self):
        if not self._should_continue and not self.is_running:
            self._should_continue = True
            self._start_timer()
        else:
            print("Timer already started or running, please wait if you're restarting.")

    def cancel(self):
        if self.thread is not None:
            self._should_continue = False # Just in case thread is running and cancel fails.
            self.thread.cancel()
        else:
            print("Timer never started or failed to initialize.")


def tick():
    print('ipsem lorem')

# Example Usage
t = InfiniteTimer(0.5, tick)
t.start()
Билл Шумахер
источник
3

Я изменил код в коде swapnil-jariwala, чтобы сделать маленькие консольные часы.

from threading import Timer, Thread, Event
from datetime import datetime

class PT():

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        self.hFunction()
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

def printer():
    tempo = datetime.today()
    h,m,s = tempo.hour, tempo.minute, tempo.second
    print(f"{h}:{m}:{s}")


t = PT(1, printer)
t.start()

ВЫХОД

>>> 11:39:11
11:39:12
11:39:13
11:39:14
11:39:15
11:39:16
...

Таймер с графическим интерфейсом tkinter

Этот код помещает таймер часов в маленькое окно с tkinter

from threading import Timer, Thread, Event
from datetime import datetime
import tkinter as tk

app = tk.Tk()
lab = tk.Label(app, text="Timer will start in a sec")
lab.pack()


class perpetualTimer():

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        self.hFunction()
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

    def cancel(self):
        self.thread.cancel()


def printer():
    tempo = datetime.today()
    clock = "{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second)
    try:
        lab['text'] = clock
    except RuntimeError:
        exit()


t = perpetualTimer(1, printer)
t.start()
app.mainloop()

Пример карточной игры (вроде)

from threading import Timer, Thread, Event
from datetime import datetime


class perpetualTimer():

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        self.hFunction()
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

    def cancel(self):
        self.thread.cancel()


x = datetime.today()
start = x.second


def printer():
    global questions, counter, start
    x = datetime.today()
    tempo = x.second
    if tempo - 3 > start:
        show_ans()
    #print("\n{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second), end="")
    print()
    print("-" + questions[counter])
    counter += 1
    if counter == len(answers):
        counter = 0


def show_ans():
    global answers, c2
    print("It is {}".format(answers[c2]))
    c2 += 1
    if c2 == len(answers):
        c2 = 0


questions = ["What is the capital of Italy?",
             "What is the capital of France?",
             "What is the capital of England?",
             "What is the capital of Spain?"]

answers = "Rome", "Paris", "London", "Madrid"

counter = 0
c2 = 0
print("Get ready to answer")
t = perpetualTimer(3, printer)
t.start()

выход:

Get ready to answer
>>> 
-What is the capital of Italy?
It is Rome

-What is the capital of France?
It is Paris

-What is the capital of England?
...
Джованни Г. ПЙ
источник
если hFunction блокируется, разве это не добавит некоторой задержки к последующему запуску? Может быть, вы могли бы поменять местами строки так, чтобы handle_function сначала запускал таймер, а затем вызывал hFunction?
Усы
2

Мне пришлось сделать это для проекта. В итоге я начал отдельный поток для функции

t = threading.Thread(target =heartbeat, args=(worker,))
t.start()

**** сердцебиение - моя функция, рабочий - один из моих аргументов ****

внутри моей функции сердцебиения:

def heartbeat(worker):

    while True:
        time.sleep(5)
        #all of my code

Поэтому, когда я запускаю поток, функция будет постоянно ждать 5 секунд, запускать весь мой код и делать это бесконечно. Если вы хотите убить процесс, просто убейте поток.

Эндрю Уилкинс
источник
1

Я реализовал класс, который работает как таймер.

Я оставляю ссылку здесь на случай, если она кому-нибудь понадобится: https://github.com/ivanhalencp/python/tree/master/xTimer

Иван Коппес
источник
3
Хотя теоретически это может дать ответ на вопрос, было бы предпочтительнее включить сюда основные части ответа и предоставить ссылку для справки.
Карл Рихтер
1
from threading import Timer
def TaskManager():
    #do stuff
    t = Timer( 1, TaskManager )
    t.start()

TaskManager()

Вот небольшой пример, он поможет лучше понять, как он работает. функция taskManager () в конце создает для себя отложенный вызов функции.

Попробуйте изменить переменную "dalay", и вы увидите разницу

from threading import Timer, _sleep

# ------------------------------------------
DATA = []
dalay = 0.25 # sec
counter = 0
allow_run = True
FIFO = True

def taskManager():

    global counter, DATA, delay, allow_run
    counter += 1

    if len(DATA) > 0:
        if FIFO:
            print("["+str(counter)+"] new data: ["+str(DATA.pop(0))+"]")
        else:
            print("["+str(counter)+"] new data: ["+str(DATA.pop())+"]")

    else:
        print("["+str(counter)+"] no data")

    if allow_run:
        #delayed method/function call to it self
        t = Timer( dalay, taskManager )
        t.start()

    else:
        print(" END task-manager: disabled")

# ------------------------------------------
def main():

    DATA.append("data from main(): 0")
    _sleep(2)
    DATA.append("data from main(): 1")
    _sleep(2)


# ------------------------------------------
print(" START task-manager:")
taskManager()

_sleep(2)
DATA.append("first data")

_sleep(2)
DATA.append("second data")

print(" START main():")
main()
print(" END main():")

_sleep(2)
DATA.append("last data")

allow_run = False
ch3ll0v3k
источник
1
не могли бы вы рассказать немного подробнее, почему это работает?
minocha
ваш пример был немного запутанным, первый кодовый блок был всем, что вам нужно было сказать.
Партак
1

Мне нравится ответ right2clicky, особенно в том, что он не требует, чтобы поток прерывался и создавался новый каждый раз, когда тикает таймер. Кроме того, легко переопределить создание класса с функцией обратного вызова таймера, которая периодически вызывается. Это мой обычный вариант использования:

class MyClass(RepeatTimer):
    def __init__(self, period):
        super().__init__(period, self.on_timer)

    def on_timer(self):
        print("Tick")


if __name__ == "__main__":
    mc = MyClass(1)
    mc.start()
    time.sleep(5)
    mc.cancel()
Фрэнк П.
источник
1

Это альтернативная реализация, использующая функцию вместо класса. Вдохновленный @Andrew Wilkins выше.

Поскольку ожидание более точно, чем сон (учитывается время выполнения функции):

import threading

PING_ON = threading.Event()

def ping():
  while not PING_ON.wait(1):
    print("my thread %s" % str(threading.current_thread().ident))

t = threading.Thread(target=ping)
t.start()

sleep(5)
PING_ON.set()
Пол Кенджора
источник
1

Я придумал другое решение с классом SingleTon. Подскажите пожалуйста, есть ли здесь утечка памяти.

import time,threading

class Singleton:
  __instance = None
  sleepTime = 1
  executeThread = False

  def __init__(self):
     if Singleton.__instance != None:
        raise Exception("This class is a singleton!")
     else:
        Singleton.__instance = self

  @staticmethod
  def getInstance():
     if Singleton.__instance == None:
        Singleton()
     return Singleton.__instance


  def startThread(self):
     self.executeThread = True
     self.threadNew = threading.Thread(target=self.foo_target)
     self.threadNew.start()
     print('doing other things...')


  def stopThread(self):
     print("Killing Thread ")
     self.executeThread = False
     self.threadNew.join()
     print(self.threadNew)


  def foo(self):
     print("Hello in " + str(self.sleepTime) + " seconds")


  def foo_target(self):
     while self.executeThread:
        self.foo()
        print(self.threadNew)
        time.sleep(self.sleepTime)

        if not self.executeThread:
           break


sClass = Singleton()
sClass.startThread()
time.sleep(5)
sClass.getInstance().stopThread()

sClass.getInstance().sleepTime = 2
sClass.startThread()
Юнус
источник
0

В дополнение к приведенным выше отличным ответам с использованием потоков, если вам нужно использовать свой основной поток или предпочитаете асинхронный подход - я обернул короткий класс вокруг класса таймера aio_timers (чтобы включить повторение)

import asyncio
from aio_timers import Timer

class RepeatingAsyncTimer():
    def __init__(self, interval, cb, *args, **kwargs):
        self.interval = interval
        self.cb = cb
        self.args = args
        self.kwargs = kwargs
        self.aio_timer = None
        self.start_timer()
    
    def start_timer(self):
        self.aio_timer = Timer(delay=self.interval, 
                               callback=self.cb_wrapper, 
                               callback_args=self.args, 
                               callback_kwargs=self.kwargs
                              )
    
    def cb_wrapper(self, *args, **kwargs):
        self.cb(*args, **kwargs)
        self.start_timer()


from time import time
def cb(timer_name):
    print(timer_name, time())

print(f'clock starts at: {time()}')
timer_1 = RepeatingAsyncTimer(interval=5, cb=cb, timer_name='timer_1')
timer_2 = RepeatingAsyncTimer(interval=10, cb=cb, timer_name='timer_2')

часы начинаются с: 16024388 40 .9690785

timer_ 1 16024388 45 .980087

таймер_ 2 16024388 50 .9806316

таймер_ 1 16024388 50 .9808934

таймер_ 1 16024388 55 .9863033

таймер_ 2 16024388 60 .9868324

таймер_ 1 16024388 60 .9876585

морк
источник