Я хочу запускать функцию каждые 0,5 секунды и иметь возможность запускать, останавливать и сбрасывать таймер. Я не слишком хорошо разбираюсь в том, как работают потоки Python, и у меня возникают трудности с таймером Python.
Однако я продолжаю получать, RuntimeError: threads can only be started once
когда выполняю threading.timer.start()
дважды. Есть ли обходной путь для этого? Я пробовала подавать заявку threading.timer.cancel()
перед каждым стартом.
Псевдокод:
t=threading.timer(0.5,function)
while True:
t.cancel()
t.start()
python
python-3.x
python-2.7
user1431282
источник
источник
threading.Event
иwait
вместоsleep
. Затем, чтобы его разбудить, просто установите событие. Вам даже не нужноself.stopped
then, потому что вы просто проверяете флаг события.event.wait
это просто тайм-аут и действует как сон, но если вы хотите остановить (или иным образом прервать поток), вы устанавливаете событие потока, и он немедленно просыпается.thread.start()
дает мнеthreads can only be started once
Из эквивалента setInterval в python :
import threading def setInterval(interval): def decorator(function): def wrapper(*args, **kwargs): stopped = threading.Event() def loop(): # executed in another thread while not stopped.wait(interval): # until stopped function(*args, **kwargs) t = threading.Thread(target=loop) t.daemon = True # stop if the program exits t.start() return stopped return wrapper return decorator
Применение:
@setInterval(.5) def function(): "..." stop = function() # start timer, the first call is in .5 seconds stop.set() # stop the loop stop = function() # start new timer # ... stop.set()
Или вот тот же функционал, но как отдельная функция вместо декоратора :
cancel_future_calls = call_repeatedly(60, print, "Hello, World") # ... cancel_future_calls()
Вот как это сделать без использования потоков .
источник
@setInterval(1)
.stop = repeat(every=second, call=your_function); ...; stop()
.stop = call_repeatedly(interval, your_function); ...; stop()
реализацияИспользование потоков таймера-
from threading import Timer,Thread,Event class perpetualTimer(): def __init__(self,t,hFunction): self.t=t self.hFunction = hFunction self.thread = Timer(self.t,self.handle_function) def handle_function(self): self.hFunction() self.thread = Timer(self.t,self.handle_function) self.thread.start() def start(self): self.thread.start() def cancel(self): self.thread.cancel() def printer(): print 'ipsem lorem' t = perpetualTimer(5,printer) t.start()
это можно остановить
t.cancel()
источник
cancel
методе. Когда это вызывается, поток либо 1) не работает, либо 2) работает. В 1) мы ждем запуска функции, поэтому отмена будет работать нормально. в 2) мы сейчас работаем, поэтому отмена не повлияет на текущее выполнение. кроме того, текущее выполнение перенастраивается, поэтому оно не повлияет на будущее.Немного улучшив ответ Ханса То , мы можем просто создать подкласс функции Timer. Следующее становится нашим полным кодом "таймера повтора", и его можно использовать как замену для threading.Timer со всеми теми же аргументами:
from threading import Timer class RepeatTimer(Timer): def run(self): while not self.finished.wait(self.interval): self.function(*self.args, **self.kwargs)
Пример использования:
def dummyfn(msg="foo"): print(msg) timer = RepeatTimer(1, dummyfn) timer.start() time.sleep(5) timer.cancel()
производит следующий вывод:
а также
timer = RepeatTimer(1, dummyfn, args=("bar",)) timer.start() time.sleep(5) timer.cancel()
производит
источник
RuntimeError: threads can only be started once
.threading.py
модуль.В интересах предоставления правильного ответа с использованием таймера в соответствии с запросом OP я улучшу ответ swapnil jariwala :
from threading import Timer class InfiniteTimer(): """A Timer class that does not stop, unless you want it to.""" def __init__(self, seconds, target): self._should_continue = False self.is_running = False self.seconds = seconds self.target = target self.thread = None def _handle_target(self): self.is_running = True self.target() self.is_running = False self._start_timer() def _start_timer(self): if self._should_continue: # Code could have been running when cancel was called. self.thread = Timer(self.seconds, self._handle_target) self.thread.start() def start(self): if not self._should_continue and not self.is_running: self._should_continue = True self._start_timer() else: print("Timer already started or running, please wait if you're restarting.") def cancel(self): if self.thread is not None: self._should_continue = False # Just in case thread is running and cancel fails. self.thread.cancel() else: print("Timer never started or failed to initialize.") def tick(): print('ipsem lorem') # Example Usage t = InfiniteTimer(0.5, tick) t.start()
источник
Я изменил код в коде swapnil-jariwala, чтобы сделать маленькие консольные часы.
from threading import Timer, Thread, Event from datetime import datetime class PT(): def __init__(self, t, hFunction): self.t = t self.hFunction = hFunction self.thread = Timer(self.t, self.handle_function) def handle_function(self): self.hFunction() self.thread = Timer(self.t, self.handle_function) self.thread.start() def start(self): self.thread.start() def printer(): tempo = datetime.today() h,m,s = tempo.hour, tempo.minute, tempo.second print(f"{h}:{m}:{s}") t = PT(1, printer) t.start()
>>> 11:39:11 11:39:12 11:39:13 11:39:14 11:39:15 11:39:16 ...
Таймер с графическим интерфейсом tkinter
Этот код помещает таймер часов в маленькое окно с tkinter
from threading import Timer, Thread, Event from datetime import datetime import tkinter as tk app = tk.Tk() lab = tk.Label(app, text="Timer will start in a sec") lab.pack() class perpetualTimer(): def __init__(self, t, hFunction): self.t = t self.hFunction = hFunction self.thread = Timer(self.t, self.handle_function) def handle_function(self): self.hFunction() self.thread = Timer(self.t, self.handle_function) self.thread.start() def start(self): self.thread.start() def cancel(self): self.thread.cancel() def printer(): tempo = datetime.today() clock = "{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second) try: lab['text'] = clock except RuntimeError: exit() t = perpetualTimer(1, printer) t.start() app.mainloop()
Пример карточной игры (вроде)
from threading import Timer, Thread, Event from datetime import datetime class perpetualTimer(): def __init__(self, t, hFunction): self.t = t self.hFunction = hFunction self.thread = Timer(self.t, self.handle_function) def handle_function(self): self.hFunction() self.thread = Timer(self.t, self.handle_function) self.thread.start() def start(self): self.thread.start() def cancel(self): self.thread.cancel() x = datetime.today() start = x.second def printer(): global questions, counter, start x = datetime.today() tempo = x.second if tempo - 3 > start: show_ans() #print("\n{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second), end="") print() print("-" + questions[counter]) counter += 1 if counter == len(answers): counter = 0 def show_ans(): global answers, c2 print("It is {}".format(answers[c2])) c2 += 1 if c2 == len(answers): c2 = 0 questions = ["What is the capital of Italy?", "What is the capital of France?", "What is the capital of England?", "What is the capital of Spain?"] answers = "Rome", "Paris", "London", "Madrid" counter = 0 c2 = 0 print("Get ready to answer") t = perpetualTimer(3, printer) t.start()
выход:
Get ready to answer >>> -What is the capital of Italy? It is Rome -What is the capital of France? It is Paris -What is the capital of England? ...
источник
Мне пришлось сделать это для проекта. В итоге я начал отдельный поток для функции
**** сердцебиение - моя функция, рабочий - один из моих аргументов ****
внутри моей функции сердцебиения:
def heartbeat(worker): while True: time.sleep(5) #all of my code
Поэтому, когда я запускаю поток, функция будет постоянно ждать 5 секунд, запускать весь мой код и делать это бесконечно. Если вы хотите убить процесс, просто убейте поток.
источник
Я реализовал класс, который работает как таймер.
Я оставляю ссылку здесь на случай, если она кому-нибудь понадобится: https://github.com/ivanhalencp/python/tree/master/xTimer
источник
from threading import Timer def TaskManager(): #do stuff t = Timer( 1, TaskManager ) t.start() TaskManager()
Вот небольшой пример, он поможет лучше понять, как он работает. функция taskManager () в конце создает для себя отложенный вызов функции.
Попробуйте изменить переменную "dalay", и вы увидите разницу
from threading import Timer, _sleep # ------------------------------------------ DATA = [] dalay = 0.25 # sec counter = 0 allow_run = True FIFO = True def taskManager(): global counter, DATA, delay, allow_run counter += 1 if len(DATA) > 0: if FIFO: print("["+str(counter)+"] new data: ["+str(DATA.pop(0))+"]") else: print("["+str(counter)+"] new data: ["+str(DATA.pop())+"]") else: print("["+str(counter)+"] no data") if allow_run: #delayed method/function call to it self t = Timer( dalay, taskManager ) t.start() else: print(" END task-manager: disabled") # ------------------------------------------ def main(): DATA.append("data from main(): 0") _sleep(2) DATA.append("data from main(): 1") _sleep(2) # ------------------------------------------ print(" START task-manager:") taskManager() _sleep(2) DATA.append("first data") _sleep(2) DATA.append("second data") print(" START main():") main() print(" END main():") _sleep(2) DATA.append("last data") allow_run = False
источник
Мне нравится ответ right2clicky, особенно в том, что он не требует, чтобы поток прерывался и создавался новый каждый раз, когда тикает таймер. Кроме того, легко переопределить создание класса с функцией обратного вызова таймера, которая периодически вызывается. Это мой обычный вариант использования:
class MyClass(RepeatTimer): def __init__(self, period): super().__init__(period, self.on_timer) def on_timer(self): print("Tick") if __name__ == "__main__": mc = MyClass(1) mc.start() time.sleep(5) mc.cancel()
источник
Это альтернативная реализация, использующая функцию вместо класса. Вдохновленный @Andrew Wilkins выше.
Поскольку ожидание более точно, чем сон (учитывается время выполнения функции):
import threading PING_ON = threading.Event() def ping(): while not PING_ON.wait(1): print("my thread %s" % str(threading.current_thread().ident)) t = threading.Thread(target=ping) t.start() sleep(5) PING_ON.set()
источник
Я придумал другое решение с классом SingleTon. Подскажите пожалуйста, есть ли здесь утечка памяти.
import time,threading class Singleton: __instance = None sleepTime = 1 executeThread = False def __init__(self): if Singleton.__instance != None: raise Exception("This class is a singleton!") else: Singleton.__instance = self @staticmethod def getInstance(): if Singleton.__instance == None: Singleton() return Singleton.__instance def startThread(self): self.executeThread = True self.threadNew = threading.Thread(target=self.foo_target) self.threadNew.start() print('doing other things...') def stopThread(self): print("Killing Thread ") self.executeThread = False self.threadNew.join() print(self.threadNew) def foo(self): print("Hello in " + str(self.sleepTime) + " seconds") def foo_target(self): while self.executeThread: self.foo() print(self.threadNew) time.sleep(self.sleepTime) if not self.executeThread: break sClass = Singleton() sClass.startThread() time.sleep(5) sClass.getInstance().stopThread() sClass.getInstance().sleepTime = 2 sClass.startThread()
источник
В дополнение к приведенным выше отличным ответам с использованием потоков, если вам нужно использовать свой основной поток или предпочитаете асинхронный подход - я обернул короткий класс вокруг класса таймера aio_timers (чтобы включить повторение)
import asyncio from aio_timers import Timer class RepeatingAsyncTimer(): def __init__(self, interval, cb, *args, **kwargs): self.interval = interval self.cb = cb self.args = args self.kwargs = kwargs self.aio_timer = None self.start_timer() def start_timer(self): self.aio_timer = Timer(delay=self.interval, callback=self.cb_wrapper, callback_args=self.args, callback_kwargs=self.kwargs ) def cb_wrapper(self, *args, **kwargs): self.cb(*args, **kwargs) self.start_timer() from time import time def cb(timer_name): print(timer_name, time()) print(f'clock starts at: {time()}') timer_1 = RepeatingAsyncTimer(interval=5, cb=cb, timer_name='timer_1') timer_2 = RepeatingAsyncTimer(interval=10, cb=cb, timer_name='timer_2')
часы начинаются с: 16024388 40 .9690785
timer_ 1 16024388 45 .980087
таймер_ 2 16024388 50 .9806316
таймер_ 1 16024388 50 .9808934
таймер_ 1 16024388 55 .9863033
таймер_ 2 16024388 60 .9868324
таймер_ 1 16024388 60 .9876585
источник