Как запускать асинхронные задачи в приложениях Python GObject Introspection

16

Я пишу приложение Python + GObject, которое должно при запуске читать нетривиальный объем данных с диска. Данные считываются синхронно, и для завершения операции чтения требуется около 10 секунд, в течение которых загрузка пользовательского интерфейса задерживается.

Я хотел бы выполнить задачу асинхронно и получить уведомление, когда она будет готова, без блокировки пользовательского интерфейса, более или менее, например:

def take_ages():
    read_a_huge_file_from_disk()

def on_finished_long_task():
    print "Finished!"

run_long_task(task=take_ages, callback=on_finished_long_task)
load_the_UI_without_blocking_on_long_task()

В прошлом я использовал GTask для такого рода вещей, но я обеспокоен тем, что его код не затрагивался в течение 3 лет, не говоря уже о переносе в GObject Introspection. Самое главное, он больше не доступен в Ubuntu 12.04. Поэтому я ищу простой способ выполнения задач асинхронно, либо стандартным способом Python, либо стандартным способом GObject / GTK +.

Редактировать: вот код с примером того, что я пытаюсь сделать. Я попытался, python-deferкак предложено в комментариях, но мне не удалось запустить длинную задачу асинхронно и позволить загрузке пользовательского интерфейса, не дожидаясь ее завершения. Просмотрите код теста .

Существует ли простой и широко используемый способ запуска асинхронных задач и получения уведомлений по их окончании?

Дэвид Планелла
источник
Это не красивый пример, но я уверен, что это то, что вы ищете: raw.github.com/gist/1132418/…
RobotHumans
Круто, я думаю, что ваша async_callфункция может быть то, что мне нужно Не могли бы вы немного рассказать об этом и добавить ответ, чтобы я мог принять его и поставить вам кредит после проверки? Благодарность!
Дэвид Планелла
1
Отличный вопрос, очень полезный! ;-)
Rafał Cieślak

Ответы:

15

Ваша проблема является очень распространенной, поэтому существует множество решений (сараи, очереди с многопроцессорной обработкой или многопоточностью, рабочие пулы, ...)

Поскольку это так часто встречается, существует также встроенное решение для Python (в 3.2, но здесь есть обратная связь: http://pypi.python.org/pypi/futures ), называемое concurrent.futures. «Фьючерсы» доступны на многих языках, поэтому python называет их одинаковыми. Вот типичные вызовы (и вот ваш полный пример , однако, часть db заменена на sleep, см. Ниже, почему).

from concurrent import futures
executor = futures.ProcessPoolExecutor(max_workers=1)
#executor = futures.ThreadPoolExecutor(max_workers=1)
future = executor.submit(slow_load)
future.add_done_callback(self.on_complete)

Теперь к вашей проблеме, которая намного сложнее, чем предлагает ваш простой пример. В общем, у вас есть потоки или процессы, чтобы решить эту проблему, но вот почему ваш пример такой сложный:

  1. Большинство реализаций Python имеют GIL, что делает потоки не полностью использующими многоядерные. Итак: не используйте темы с питоном!
  2. Объекты, которые вы хотите вернуть slow_loadиз БД, нельзя перехватывать, что означает, что их нельзя просто передавать между процессами. Итак: нет многопроцессорной обработки с результатами программного центра!
  3. Библиотека, которую вы вызываете (softwarecenter.db), не является потокобезопасной (кажется, включает gtk или аналогичную), поэтому вызов этих методов в потоке приводит к странному поведению (в моем тесте все, от «все работает», до «дампа ядра», до простого выход без результатов). Итак: нет темы с софтцентром.
  4. Каждый асинхронный обратный вызов в gtk не должен делать ничего, кроме планирования обратного вызова, который будет вызываться в главном цикле glib. Итак: нет print, никаких изменений состояния GTK, кроме добавления обратного вызова!
  5. Gtk и другие не работают с потоками из коробки. Это необходимо сделать threads_init, и если вы вызываете метод gtk или аналогичный, вы должны защитить этот метод (в более ранних версиях это было gtk.gdk.threads_enter(), gtk.gdk.threads_leave()см., Например, gstreamer: http://pygstdocs.berlios.de/pygst-tutorial/playbin. HTML ).

Я могу дать вам следующее предложение:

  1. Перепишите ваш результат, slow_loadчтобы получить результаты, которые можно использовать для поиска, и использовать фьючерсы с процессами.
  2. Переключитесь с softwarecenter на python-apt или аналогичный (вам, вероятно, это не нравится). Но поскольку вы работаете в Canonical, вы можете напрямую попросить разработчиков программного центра добавить документацию к своему программному обеспечению (например, заявив, что она не является поточно-ориентированной) и, что еще лучше, сделать программный центр безопасным для потоков.

Как примечание: решения, предоставленные другими ( Gio.io_scheduler_push_job, async_call) , работают с, time.sleepно не с softwarecenter.db. Это потому, что все сводится к потокам или процессам и потокам, которые не работают с gtk и softwarecenter.

xubuntix
источник
Благодарность! Я собираюсь принять ваш ответ, так как он очень подробно указывает мне, почему это невозможно. К сожалению, я не могу использовать программное обеспечение, которое не упаковано для Ubuntu 12.04, в моем приложении (оно для Quantal, хотя launchpad.net/ubuntu/+source/python-concurrent.futures ), поэтому я думаю, что застрял в неспособности выполнить мою задачу асинхронно. Что касается записки для общения с разработчиками Центра программного обеспечения, я нахожусь в том же положении, что и любой волонтер, чтобы вносить изменения в код и документацию или разговаривать с ними :-)
Дэвид Планелла
GIL высвобождается во время ввода-вывода, поэтому прекрасно использовать потоки. Хотя это и не обязательно, если используется асинхронный ввод-вывод.
JFS
10

Вот еще один вариант использования планировщика ввода-вывода GIO (я никогда раньше не использовал его в Python, но приведенный ниже пример работает нормально).

from gi.repository import GLib, Gio, GObject
import time

def slow_stuff(job, cancellable, user_data):
    print "Slow!"
    for i in xrange(5):
        print "doing slow stuff..."
        time.sleep(0.5)
    print "finished doing slow stuff!"
    return False # job completed

def main():
    GObject.threads_init()
    print "Starting..."
    Gio.io_scheduler_push_job(slow_stuff, None, GLib.PRIORITY_DEFAULT, None)
    print "It's running async..."
    GLib.idle_add(ui_stuff)
    GLib.MainLoop().run()

def ui_stuff():
    print "This is the UI doing stuff..."
    time.sleep(1)
    return True

if __name__ == '__main__':
    main()
Зигфрид Геваттер
источник
Смотрите также GIO.io_scheduler_job_send_to_mainloop (), если вы хотите запустить что-то в главном потоке после завершения slow_stuff.
Зигфрид Геваттер
Спасибо Зигфриду за ответ и пример. К сожалению, похоже, что с моей текущей задачей у меня нет шансов использовать API Gio, чтобы он работал асинхронно.
Дэвид Планелла
Это было действительно полезно, но, насколько я могу судить, Gio.io_scheduler_job_send_to_mainloop не существует в Python :(
sil
2

Вы также можете использовать GLib.idle_add (callback), чтобы вызвать долгосрочную задачу, когда GLib Mainloop завершит все события с более высоким приоритетом (что, я считаю, включает в себя создание пользовательского интерфейса).

mhall119
источник
Спасибо майк Да, это определенно поможет запустить задачу, когда пользовательский интерфейс будет готов. Но, с другой стороны, я понимаю, что при callbackвызове это делается синхронно, блокируя пользовательский интерфейс, верно?
Дэвид Планелла
Idle_add не совсем так работает. Создание блокирующих вызовов в idle_add все еще является плохой вещью, и это предотвратит обновления пользовательского интерфейса. И даже асинхронный API все еще может блокировать, где единственный способ избежать блокировки пользовательского интерфейса и других задач - это делать это в фоновом потоке.
Добей
В идеале вы бы разбили свою медленную задачу на куски, так что вы можете выполнить ее немного в режиме обратного вызова в режиме ожидания, вернуть (и позволить другим вещам, таким как обратные вызовы пользовательского интерфейса), продолжить выполнять дополнительную работу после повторного вызова вашего обратного вызова и т. Д. на.
Зигфрид Геваттер
Суть idle_addв том, что возвращаемое значение обратного вызова имеет значение. Если это правда, он будет вызван снова.
Flimm
2

Используйте интроспектированный GioAPI для чтения файла с его асинхронными методами, и при выполнении первоначального вызова, сделать это как тайм - аут с , GLib.timeout_add_seconds(3, call_the_gio_stuff)где call_the_gio_stuffесть функция , которая возвращает False.

Здесь необходимо добавить время ожидания (хотя может потребоваться иное количество секунд), поскольку асинхронные вызовы Gio асинхронны, но не неблокированы, а это означает, что при чтении большого файла или большого объема данных на жестком диске Количество файлов, может привести к блокировке пользовательского интерфейса, так как пользовательский интерфейс и ввод-вывод все еще находятся в одном (основном) потоке.

Если вы хотите написать свои собственные функции, которые должны быть асинхронными, и интегрироваться с основным циклом, используя API-интерфейсы файлового ввода / вывода Python, вам придется написать код как GObject или передать обратные вызовы или использовать их, python-deferчтобы помочь вам. сделай это. Но лучше использовать Gio здесь, так как он может принести вам много полезных функций, особенно если вы делаете открытия / сохранения файлов в UX.

Добей
источник
Спасибо @dobey. На самом деле я не читаю файл с диска напрямую, наверное, я должен был сделать это более понятным в оригинальном посте. Долгосрочная задача, которую я выполняю, - это чтение базы данных Центра программного обеспечения в соответствии с ответом на askubuntu.com/questions/139032/… , поэтому я не уверен, что смогу использовать GioAPI. Что мне было интересно, так это то, существует ли способ асинхронно запускать какую-либо общую долгосрочную задачу точно так же, как это делал GTask.
Дэвид Планелла
Я точно не знаю, что такое GTask, но если вы имеете в виду gtask.sourceforge.net, то я не думаю, что вам следует это использовать. Если это что-то еще, то я не знаю, что это. Но, похоже, вам придется пойти по второму пути, который я упомянул, и реализовать некоторый асинхронный API для обертывания этого кода или просто сделать все это в потоке.
Добей
В этом вопросе есть ссылка. GTask является (был): chergert.github.com/gtask
Дэвид Планелла
1
Ах, это выглядит очень похоже на API, предоставляемый python-defer (и twisted deferred API). Возможно, вам стоит взглянуть на использование python-defer?
Добей
1
Вам все еще нужно отложить вызов до тех пор, пока не произойдут события основного приоритета, например, с помощью GLib.idle_add (). Как это: pastebin.ubuntu.com/1011660
Добей
1

Я думаю, стоит отметить, что это запутанный способ сделать то, что предложил @mhall.

По сути, вы должны запустить это, а затем запустить эту функцию async_call.

Если вы хотите увидеть, как это работает, вы можете поиграть с таймером сна и продолжать нажимать кнопку. По сути, это то же самое, что и ответ @ mhall, за исключением того, что есть пример кода.

Исходя из этого, это не моя работа.

import threading
import time
from gi.repository import Gtk, GObject



# calls f on another thread
def async_call(f, on_done):
    if not on_done:
        on_done = lambda r, e: None

    def do_call():
        result = None
        error = None

        try:
            result = f()
        except Exception, err:
            error = err

        GObject.idle_add(lambda: on_done(result, error))
    thread = threading.Thread(target = do_call)
    thread.start()

class SlowLoad(Gtk.Window):

    def __init__(self):
        Gtk.Window.__init__(self, title="Hello World")
        GObject.threads_init()        

        self.connect("delete-event", Gtk.main_quit)

        self.button = Gtk.Button(label="Click Here")
        self.button.connect("clicked", self.on_button_clicked)
        self.add(self.button)

        self.file_contents = 'Slow load pending'

        async_call(self.slow_load, self.slow_complete)

    def on_button_clicked(self, widget):
        print self.file_contents

    def slow_complete(self, results, errors):
        '''
        '''
        self.file_contents = results
        self.button.set_label(self.file_contents)
        self.button.show_all()

    def slow_load(self):
        '''
        '''
        time.sleep(5)
        self.file_contents = "Slow load in progress..."
        time.sleep(5)
        return 'Slow load complete'



if __name__ == '__main__':
    win = SlowLoad()
    win.show_all()
    #time.sleep(10)
    Gtk.main()

Дополнительное примечание: вы должны позволить другому потоку завершить работу, прежде чем он завершится должным образом, или проверить файл.lock в вашем дочернем потоке.

Изменить по адресу комментарий:
Изначально я забыл GObject.threads_init(). Очевидно, что когда кнопка сработала, она инициализировала потоки для меня. Это замаскировало ошибку для меня.

Обычно поток создает окно в памяти, немедленно запускает другой поток, когда поток завершает обновление кнопки. Я добавил дополнительный спящий режим еще до того, как позвонил в Gtk.main, чтобы убедиться, что полное обновление МОЖЕТ выполняться до того, как окно будет нарисовано. Я также прокомментировал это, чтобы убедиться, что запуск потока вообще не мешает рисованию окна.

RobotHumans
источник
1
Благодарю. Я не уверен, что смогу следовать этому. Во-первых, я ожидал, slow_loadчто он будет выполнен вскоре после запуска пользовательского интерфейса, но, кажется, он никогда не вызывается, если только кнопка не нажата, что немного смущает меня, так как я думал, что цель кнопки - просто обеспечить визуальную индикацию. состояния задачи.
Дэвид Планелла
Извините, я пропустил одну строчку. Это сделал это. Я забыл сказать GObject, чтобы подготовиться к темам.
RobotHumans
Но вы вызываете основной цикл из потока, что может вызвать проблемы, хотя они могут быть нелегко раскрыты в вашем тривиальном примере, который не выполняет никакой реальной работы.
Добей
Допустимый момент, но я не думаю, что тривиальный пример заслуживает отправки уведомления через DBus (что, я думаю, должно делать нетривиальное приложение)
RobotHumans
Хм, работа async_callв этом примере работает для меня, но это порождает хаос, когда я портирую его в свое приложение и добавляю реальную slow_loadфункцию, которая у меня есть.
Дэвид Планелла