Каков наилучший способ повторного выполнения функции каждые х секунд?

285

Я хочу постоянно выполнять функцию в Python каждые 60 секунд навсегда (точно так же, как NSTimer в Objective C). Этот код будет работать как демон и по сути похож на вызов сценария python каждую минуту с использованием cron, но без необходимости его установки пользователем.

В этом вопросе о cron, реализованном в Python , решение, по-видимому, эффективно просто sleep () на x секунд. Мне не нужны такие расширенные функциональные возможности, так что, возможно, что-то подобное будет работать

while True:
    # Code executed here
    time.sleep(60)

Есть ли предсказуемые проблемы с этим кодом?

davidmytton
источник
84
Педантичный момент, но может быть критическим, ваш код выше кода не выполняется каждые 60 секунд, это ставит 60-секундный промежуток между выполнениями. Это происходит каждые 60 секунд, если ваш исполняемый код совсем не требует времени.
Симон
4
также time.sleep(60)может вернуться как раньше, так и позже
JFS
5
Я все еще задаюсь вопросом: есть ли предсказуемые проблемы с этим кодом?
банан
1
«Предвидимая проблема» заключается в том, что вы не можете ожидать 60 итераций в час, просто используя time.sleep (60). Поэтому, если вы добавляете один элемент за итерацию и сохраняете список заданной длины ... среднее значение этого списка не будет представлять согласованный «период» времени; поэтому такие функции, как «скользящее среднее», могут ссылаться на слишком старые точки данных, что искажает ваши показания.
litepresence
2
@Banana Да, вы можете ожидать любые проблемы, вызванные тем, что ваш скрипт выполняется ТОЛЬКО каждые 60 секунд. Например. Я начал делать что-то вроде этого, чтобы разделить видеопотоки и загрузить их, и в итоге я получил strems на 5-10 ~ дольше, потому что медиа-очередь буферизировалась, пока я обрабатывал данные внутри цикла. Это зависит от ваших данных. Если функция представляет собой некий простой сторожевой таймер, который предупреждает вас, например, когда ваш диск заполнен, у вас не должно быть никаких проблем с этим. Если вы проверяете предупреждения об атомной электростанции, вы можете получить город полностью взорван х
DGoiko

Ответы:

230

Если ваша программа еще не имеет цикла обработки событий, используйте модуль sched , который реализует планировщик событий общего назначения.

import sched, time
s = sched.scheduler(time.time, time.sleep)
def do_something(sc): 
    print("Doing stuff...")
    # do your stuff
    s.enter(60, 1, do_something, (sc,))

s.enter(60, 1, do_something, (s,))
s.run()

Если вы уже используете библиотеку цикла события , как asyncio, trio, tkinter, PyQt5, gobject, kivy, и многие другие - просто запланировать задачу с помощью методов существующей библиотеки цикла обработки событий, вместо.

nosklo
источник
16
Модуль sched предназначен для планирования функций, запускаемых через некоторое время. Как использовать его для повторения вызова функции каждые x секунд без использования time.sleep ()?
Baishampayan Ghose
2
@Baishampayan: просто запланируйте новый пробег.
nosklo
3
Затем apscheduler по адресу packages.python.org/APScheduler также должен получить упоминание об этом.
Даниэль Ф
6
примечание: эта версия может дрейфовать. Вы можете использовать, enterabs()чтобы избежать этого. Вот не дрейфующая версия для сравнения .
Jfs
8
@JavaSa: потому что «делай свое дело» не мгновенно, и time.sleepздесь могут накапливаться ошибки . «Выполнять каждые X секунд» и «Выполнять с задержкой ~ X секунд повторно» - это не одно и то же. Смотрите также этот комментарий
JFS
181

Заблокируйте ваш цикл времени на системные часы, как это:

import time
starttime = time.time()
while True:
    print "tick"
    time.sleep(60.0 - ((time.time() - starttime) % 60.0))
Дейв Роув
источник
23
+1. ваш и twistedответ - единственные ответы, которые запускают функцию каждую xсекунду. Остальные выполняют функцию с задержкой в xсекунды после каждого звонка.
JFS
13
Если вы добавите к этому некоторый код, который занял бы больше одной секунды ... Это выкинуло бы время ожидания и начало отставать. Принятый ответ в этом случае правильный ... Любой может зациклить простую команду печати и он запускается каждую секунду без промедления ...
Злой 84
5
Я предпочитаю from time import time, sleepиз-за экзистенциальных последствий;)
Уилл
14
Работает фантастически. Нет нужды вычитать ваш, starttimeесли вы начнете синхронизировать его с определенным временем: time.sleep(60 - time.time() % 60)у меня все работает нормально. Я использовал его как time.sleep(1200 - time.time() % 1200)и он дает мне логи :00 :20 :40, так, как я хотел.
TemporalWolf
2
@AntonSchigur, чтобы избежать дрейфа после нескольких итераций. Индивидуум итерация может начаться немного раньше или позже в зависимости от того sleep(), timer()точности и сколько времени требуется , чтобы выполнить тело цикла , но в среднем итераций всегда происходят на границах интервалов (даже если некоторые из них пропущены): while keep_doing_it(): sleep(interval - timer() % interval). Сравните это с тем, while keep_doing_it(): sleep(interval)где ошибки могут накапливаться после нескольких итераций.
JFS
72

Возможно, вы захотите рассмотреть Twisted - сетевую библиотеку Python, которая реализует шаблон Reactor .

from twisted.internet import task, reactor

timeout = 60.0 # Sixty seconds

def doWork():
    #do work here
    pass

l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds

reactor.run()

Хотя «while True: sleep (60)», вероятно, будет работать, Twisted, вероятно, уже реализует многие функции, которые вам в конечном итоге понадобятся (демонизация, ведение журнала или обработка исключений, как указано в bobince), и, вероятно, будет более надежным решением.

Аарон Маенпаа
источник
Отличный ответ, очень точный без дрейфа. Интересно, переводит ли это процессор в спящий режим во время ожидания выполнения задачи (иначе не занят-ожидание)?
smoothware
1
это дрейфует на миллисекундном уровне
Дерек Иден
1
Что означает "дрейф на уровне миллисекунд"?
Жан-Поль Кальдероне
Есть ли способ разорвать петлю, скажем, через 10 минут? @ Аарон Маенпаа
Альпер
67

Если вы хотите, чтобы неблокирующий способ выполнял вашу функцию периодически, вместо блокирующего бесконечного цикла, я бы использовал потоковый таймер. Таким образом, ваш код может продолжать работать и выполнять другие задачи, при этом ваша функция вызывается каждые n секунд. Я часто использую эту технику для распечатки информации о проделанной работе в длинных, ресурсоемких / сетевых задачах.

Вот код, который я разместил в похожем вопросе с элементами управления start () и stop ():

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

Использование:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

Особенности:

  • Только стандартная библиотека, без внешних зависимостей
  • start()и stop()безопасно звонить несколько раз, даже если таймер уже запущен / остановлен
  • вызываемая функция может иметь позиционные и именованные аргументы
  • Вы можете изменить в intervalлюбое время, это будет эффективно после следующего запуска. То же самое args, kwargsи даже function!
MestreLion
источник
Это решение кажется дрейфующим со временем; Мне нужна версия, которая призвана вызывать функцию каждые n секунд без дрейфа. Я выложу обновление в отдельном вопросе.
эраул
В def _run(self)Я пытаюсь обернуть вокруг моей головы , почему вы называете self.start()до того self.function(). Можете ли вы уточнить? Я бы подумал, что звонить start()первым self.is_runningвсегда будет Falseтак, тогда мы всегда будем раскручивать новый поток.
Богатое Епископо
1
Я думаю, что я дошел до сути. Решение @ MestreLion запускает функцию каждую xсекунду (т. Е. T = 0, t = 1x, t = 2x, t = 3x, ...), где в исходном коде образца постеров выполняется функция с интервалом x секунд между ними. Кроме того, это решение, на мой взгляд, содержит ошибку, если intervalоно короче времени, которое требуется functionдля выполнения. В этом случае self._timerбудет перезаписано в startфункции.
Богатый епископ
Да, @RichieEpiscopo, вызов .function()after .start()должен запускать функцию при t = 0. И я не думаю, что это будет проблемой, если functionзаймет больше времени interval, но да, в коде могут быть некоторые гоночные условия.
MestreLion
Это единственный неблокирующий способ, который я мог получить. Спасибо.
обратная косая
35

Более простой способ, которым я верю:

import time

def executeSomething():
    #code here
    time.sleep(60)

while True:
    executeSomething()

Таким образом, ваш код выполняется, затем он ждет 60 секунд, затем он выполняется снова, ждет, выполняет и т. Д ... Не нужно усложнять вещи: D

Itxaka
источник
Ключевое слово True должно быть в верхнем регистре
Шон Каин
39
На самом деле это не ответ: time sleep () может использоваться только для ожидания X секунд после каждого выполнения. Например, если выполнение вашей функции занимает 0,5 секунды, и вы используете time.sleep (1), это означает, что ваша функция выполняется каждые 1,5 секунды, а не 1. Вы должны использовать другие модули и / или потоки, чтобы убедиться, что что-то работает в течение Y раз в каждой секунде X
kommradHomer
1
@kommradHomer: ответ Дейва Роува показывает, что вы можете использовать time.sleep()запуск чего-либо каждые X секунд
jfs
2
На мой взгляд, код должен вызывать time.sleep()в while Trueцикле, как:def executeSomething(): print('10 sec left') ; while True: executeSomething(); time.sleep(10)
Леонард Лепадату
22
import time, traceback

def every(delay, task):
  next_time = time.time() + delay
  while True:
    time.sleep(max(0, next_time - time.time()))
    try:
      task()
    except Exception:
      traceback.print_exc()
      # in production code you might want to have this instead of course:
      # logger.exception("Problem while executing repetitive task.")
    # skip tasks if we are behind schedule:
    next_time += (time.time() - next_time) // delay * delay + delay

def foo():
  print("foo", time.time())

every(5, foo)

Если вы хотите сделать это без блокировки оставшегося кода, вы можете использовать его, чтобы запустить в своем собственном потоке:

import threading
threading.Thread(target=lambda: every(5, foo)).start()

Это решение сочетает в себе несколько функций, редко встречающихся в других решениях:

  • Обработка исключений: насколько это возможно на этом уровне, исключения обрабатываются должным образом, т.е. регистрируются для целей отладки без прерывания нашей программы.
  • Отсутствие цепочки: обычная цепочечная реализация (для планирования следующего события), которую вы найдете во многих ответах, является хрупкой в ​​том аспекте, что если что-то пойдет не так в механизме планирования ( threading.Timerили чем-то еще), это приведет к прекращению цепочки. Тогда дальнейших казней не будет, даже если причина проблемы уже устранена. Простой цикл и ожидание с простым sleep()гораздо более надежным по сравнению.
  • Нет дрейфа: мое решение точно отслеживает время, в которое оно должно работать. Нет смещения в зависимости от времени выполнения (как и во многих других решениях).
  • Пропуск: Мое решение будет пропускать задачи, если одно выполнение занимало слишком много времени (например, делать X каждые пять секунд, но X занимало 6 секунд). Это стандартное поведение cron (и не без причины). Многие другие решения затем просто выполняют задачу несколько раз подряд без какой-либо задержки. В большинстве случаев (например, задачи очистки) это нежелательно. Если это будет угодно, просто использовать next_time += delayвместо этого.
Альфей
источник
2
лучший ответ не дрейфовать.
Себастьян Старк
1
@PirateApp Я бы сделал это в другой теме. Вы можете сделать это в том же потоке, но затем вы запрограммируете свою собственную систему планирования, которая слишком сложна для комментариев.
Alfe
1
В Python, благодаря GIL, доступ к переменным в двух потоках совершенно безопасен. И простое чтение в двух потоках никогда не должно быть проблемой (также не в других многопоточных средах). Только запись из двух разных потоков в системе без GIL (например, в Java, C ++ и т. Д.) Требует определенной явной синхронизации.
Alfe
1
@ user50473 Без дополнительной информации я бы сначала подошел к задаче с резьбовой стороны. Один поток читает данные время от времени и затем спит, пока не пришло время снова сделать это. Решение выше может быть использовано для этого, конечно. Но я мог представить множество причин пойти другим путем. Так что удачи :)
Alfe
1
Сон можно заменить на многопоточность. Ожидание события с таймаутом будет более отзывчивым при выходе из приложения. stackoverflow.com/questions/29082268/…
themadmax
20

Вот обновление кода от MestreLion, которое позволяет избежать дрейфа с течением времени.

Класс RepeatedTimer здесь вызывает данную функцию каждые «интервальные» секунды в соответствии с запросом OP; расписание не зависит от того, сколько времени займет выполнение функции. Мне нравится это решение, так как оно не имеет внешних библиотечных зависимостей; это просто чистый питон.

import threading 
import time

class RepeatedTimer(object):
  def __init__(self, interval, function, *args, **kwargs):
    self._timer = None
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    self.is_running = False
    self.next_call = time.time()
    self.start()

  def _run(self):
    self.is_running = False
    self.start()
    self.function(*self.args, **self.kwargs)

  def start(self):
    if not self.is_running:
      self.next_call += self.interval
      self._timer = threading.Timer(self.next_call - time.time(), self._run)
      self._timer.start()
      self.is_running = True

  def stop(self):
    self._timer.cancel()
    self.is_running = False

Пример использования (скопировано из ответа MestreLion):

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!
eraoul
источник
5

Я столкнулся с подобной проблемой некоторое время назад. Может быть http://cronus.readthedocs.org может помочь?

Для v0.2 работает следующий фрагмент

import cronus.beat as beat

beat.set_rate(2) # 2 Hz
while beat.true():
    # do some time consuming work here
    beat.sleep() # total loop duration would be 0.5 sec
Анай
источник
4

Основное различие между этим и cron заключается в том, что исключение убьет демона навсегда. Возможно, вы захотите заключить в ловушку исключений и регистратор.

bobince
источник
4

Один из возможных ответов:

import time
t=time.time()

while True:
    if time.time()-t>10:
        #run your task here
        t=time.time()
SKS
источник
1
Это занято ожиданием, поэтому очень плохо.
Альфе
Хорошее решение для тех, кто ищет неблокирующий таймер.
Ноэль
3

Я закончил тем, что использовал модуль расписания . API это хорошо.

import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)
Изучение статистики на примере
источник
Мне трудно использовать этот модуль, в частности, мне нужно разблокировать основной поток, я проверил FAQ на веб-сайте документации расписания, но я не совсем понял, как это обойти. Кто-нибудь знает, где я могу найти рабочий пример, который не блокирует основной поток?
5Daydreams
1

Я использую метод Tkinter after (), который не «крадет игру» (как модуль sched, который был представлен ранее), т.е. он позволяет другим вещам работать параллельно:

import Tkinter

def do_something1():
  global n1
  n1 += 1
  if n1 == 6: # (Optional condition)
    print "* do_something1() is done *"; return
  # Do your stuff here
  # ...
  print "do_something1() "+str(n1)
  tk.after(1000, do_something1)

def do_something2(): 
  global n2
  n2 += 1
  if n2 == 6: # (Optional condition)
    print "* do_something2() is done *"; return
  # Do your stuff here
  # ...
  print "do_something2() "+str(n2)
  tk.after(500, do_something2)

tk = Tkinter.Tk(); 
n1 = 0; n2 = 0
do_something1()
do_something2()
tk.mainloop()

do_something1()и do_something2()может работать параллельно и с любым интервалом скорости. Здесь второй будет выполнен вдвое быстрее. Обратите внимание, что я использовал простой счетчик в качестве условия для завершения любой функции. Вы можете использовать любое другое условие, которое вам нравится, или нет, если вы хотите выполнить функцию до завершения программы (например, часы).

Апостолоса
источник
Будьте осторожны с вашей формулировкой: afterне позволяет вещам работать параллельно. Tkinter является однопоточным и может делать только одно за раз. Если что-то запланированное afterвыполняется, оно не работает параллельно с остальным кодом. Если оба do_something1и do_something2запланированы для запуска в то же время, они будут работать последовательно, а не параллельно.
Брайан Оукли,
@Apostolos все ваше решение делает использовать Tkinter MainLoop вместо SCHED основного цикла, поэтому он работает точно таким же образом , но позволяет TkInter интерфейсы продолжать отвечать на запросы. Если вы не используете tkinter для других целей, это ничего не меняет в отношении решения sched. Вы можете использовать две или более запланированные функции с разными интервалами в schedрешении, и оно будет работать точно так же, как у вас.
nosklo
Нет, это не работает так же. Я объяснил это. Один «блокирует» программу (т.е. останавливает поток, вы больше ничего не можете сделать - даже не запускаете другую запланированную работу, как вы предлагаете), пока она не закончится, а другая освобождает ваши руки / освобождает (то есть вы можете сделать другие вещи после того, как он начался. Вам не нужно ждать, пока он не закончится. Это огромная разница. Если бы вы попробовали метод, который я представил, вы бы увидели сами. Я попробовал ваш. Почему бы вам не попробуй мой тоже?
Апостолос
1

Вот адаптированная версия к коду от MestreLion. В дополнение к исходной функции этот код:

1) добавить first_interval, используемый для запуска таймера в определенное время (вызывающему необходимо вычислить first_interval и передать)

2) решить условие гонки в оригинальном коде. В исходном коде, если потоку управления не удалось отменить запущенный таймер («Остановите таймер и отмените выполнение действия таймера. Это будет работать только в том случае, если таймер все еще находится в стадии ожидания». Цитируется из https: // docs.python.org/2/library/threading.html ), таймер будет работать бесконечно.

class RepeatedTimer(object):
def __init__(self, first_interval, interval, func, *args, **kwargs):
    self.timer      = None
    self.first_interval = first_interval
    self.interval   = interval
    self.func   = func
    self.args       = args
    self.kwargs     = kwargs
    self.running = False
    self.is_started = False

def first_start(self):
    try:
        # no race-condition here because only control thread will call this method
        # if already started will not start again
        if not self.is_started:
            self.is_started = True
            self.timer = Timer(self.first_interval, self.run)
            self.running = True
            self.timer.start()
    except Exception as e:
        log_print(syslog.LOG_ERR, "timer first_start failed %s %s"%(e.message, traceback.format_exc()))
        raise

def run(self):
    # if not stopped start again
    if self.running:
        self.timer = Timer(self.interval, self.run)
        self.timer.start()
    self.func(*self.args, **self.kwargs)

def stop(self):
    # cancel current timer in case failed it's still OK
    # if already stopped doesn't matter to stop again
    if self.timer:
        self.timer.cancel()
    self.running = False
dproc
источник
1

Это кажется намного проще, чем принятое решение - есть ли у него недостатки, которые я не рассматриваю? Пришел сюда в поисках какой-то простой пасты и был разочарован.

import threading, time

def print_every_n_seconds(n=2):
    while True:
        print(time.ctime())
        time.sleep(n)

thread = threading.Thread(target=print_every_n_seconds, daemon=True)
thread.start()

Который асинхронно выводит.

#Tue Oct 16 17:29:40 2018
#Tue Oct 16 17:29:42 2018
#Tue Oct 16 17:29:44 2018

Это имеет дрейф в том смысле, что если выполняемая задача занимает заметное количество времени, то интервал становится равным 2 секундам + время задачи, поэтому, если вам нужно точное планирование, то это не для вас.

Обратите внимание на daemon=Trueфлаг означает, что этот поток не будет блокировать закрытие приложения. Например, имелась проблема с pytestзависанием на неопределенный срок после запуска тестов в ожидании прекращения этой игры.

Адам Хьюз
источник
Нет, он печатает только первое время и затем останавливается ...
Алекс Пока
Вы уверены - я просто скопировал и вставил в терминал. Он сразу возвращается, но распечатка продолжается в фоновом режиме для меня.
Адам Хьюз
Похоже, я что-то здесь упускаю. Я копировать / вставить код в test.py , и работать с питона test.py . С Python2.7 мне нужно удалить daemon = True, который не распознается, и я читаю несколько отпечатков. В Python3.8 он останавливается после первой печати, и после его завершения ни один процесс не активен. Удаление демона = True Я читаю несколько отпечатков ...
Алекс Пока
хм странно - я на питоне 3.6.10, но не знаю, почему это важно
Адам Хьюз
Снова: Python3.4.2 (Debian GNU / Linux 8 (jessie)), пришлось удалить daemon = True, чтобы можно было печатать несколько раз. С демоном я получаю синтаксическую ошибку. Предыдущие тесты с Python2.7 и 3.8 были на Ubuntu 19.10. Может ли быть, что демон по-разному обрабатывается в зависимости от ОС?
Алекс Пока
0

Я использую это, чтобы вызвать 60 событий в час с большинством событий, происходящих в то же самое количество секунд после целой минуты:

import math
import time
import random

TICK = 60 # one minute tick size
TICK_TIMING = 59 # execute on 59th second of the tick
TICK_MINIMUM = 30 # minimum catch up tick size when lagging

def set_timing():

    now = time.time()
    elapsed = now - info['begin']
    minutes = math.floor(elapsed/TICK)
    tick_elapsed = now - info['completion_time']
    if (info['tick']+1) > minutes:
        wait = max(0,(TICK_TIMING-(time.time() % TICK)))
        print ('standard wait: %.2f' % wait)
        time.sleep(wait)
    elif tick_elapsed < TICK_MINIMUM:
        wait = TICK_MINIMUM-tick_elapsed
        print ('minimum wait: %.2f' % wait)
        time.sleep(wait)
    else:
        print ('skip set_timing(); no wait')
    drift = ((time.time() - info['begin']) - info['tick']*TICK -
        TICK_TIMING + info['begin']%TICK)
    print ('drift: %.6f' % drift)

info['tick'] = 0
info['begin'] = time.time()
info['completion_time'] = info['begin'] - TICK

while 1:

    set_timing()

    print('hello world')

    #random real world event
    time.sleep(random.random()*TICK_MINIMUM)

    info['tick'] += 1
    info['completion_time'] = time.time()

В зависимости от реальных условий вы можете получить отметки длины:

60,60,62,58,60,60,120,30,30,60,60,60,60,60...etc.

но через 60 минут у вас будет 60 тиков; и большинство из них будет происходить с правильным смещением до минуты, которую вы предпочитаете.

В моей системе я получаю типичный дрейф <1/20 секунды, пока не возникнет необходимость в коррекции.

Преимущество этого метода - разрешение смещения часов; что может вызвать проблемы, если вы делаете такие вещи, как добавление одного элемента за тик и ожидаете 60 добавленных элементов в час. Невозможность учесть дрейф может привести к тому, что вторичные признаки, такие как скользящие средние, будут учитывать данные слишком глубоко в прошлом, что приведет к ошибочному выводу.

litepresence
источник
0

например, Показать текущее местное время

import datetime
import glib
import logger

def get_local_time():
    current_time = datetime.datetime.now().strftime("%H:%M")
    logger.info("get_local_time(): %s",current_time)
    return str(current_time)

def display_local_time():
    logger.info("Current time is: %s", get_local_time())
    return True

# call every minute
glib.timeout_add(60*1000, display_local_time)
rise.riyo
источник
0
    ''' tracking number of times it prints'''
import threading

global timeInterval
count=0
def printit():
  threading.Timer(timeInterval, printit).start()
  print( "Hello, World!")
  global count
  count=count+1
  print(count)
printit

if __name__ == "__main__":
    timeInterval= int(input('Enter Time in Seconds:'))
    printit()
raviGupta
источник
На основе пользовательского ввода он будет повторять этот метод на каждом интервале времени.
raviGupta
0

Вот еще одно решение без использования дополнительных библиотек.

def delay_until(condition_fn, interval_in_sec, timeout_in_sec):
    """Delay using a boolean callable function.

    `condition_fn` is invoked every `interval_in_sec` until `timeout_in_sec`.
    It can break early if condition is met.

    Args:
        condition_fn     - a callable boolean function
        interval_in_sec  - wait time between calling `condition_fn`
        timeout_in_sec   - maximum time to run

    Returns: None
    """
    start = last_call = time.time()
    while time.time() - start < timeout_in_sec:
        if (time.time() - last_call) > interval_in_sec:
            if condition_fn() is True:
                break
            last_call = time.time()
Дат Нгуен
источник