Система событий в Python

197

Какую систему событий для Python вы используете? Я уже в курсе о пидиспетчере , но мне было интересно, что еще можно найти, или обычно используется?

Меня не интересуют менеджеры событий, которые являются частью больших фреймворков, я бы предпочел использовать небольшое решение, которое я могу легко расширить.

Иосип
источник

Ответы:

183

Пакеты PyPI

По состоянию на июнь 2020 года эти пакеты, связанные с событиями, доступны в PyPI и упорядочены по дате последнего выпуска.

Есть еще кое-что

Из множества библиотек можно выбирать, используя совершенно разную терминологию (события, сигналы, обработчики, диспетчеризация методов, перехватчики, ...).

Я пытаюсь держать обзор вышеупомянутых пакетов, плюс методы, упомянутые в ответах здесь.

Сначала немного терминологии ...

Образец наблюдателя

Самым основным стилем системы событий является «пакет методов обработки», который представляет собой простую реализацию шаблона Observer .

По сути, методы-обработчики (callables) хранятся в массиве и каждый из них вызывается, когда событие «срабатывает».

Опубликовать-Подписка

Недостаток систем событий Observer заключается в том, что вы можете зарегистрировать обработчики только для реального объекта Event (или списка обработчиков). Таким образом, на момент регистрации событие уже должно существовать.

Вот почему существует второй стиль систем событий: шаблон публикации-подписки . Здесь обработчики регистрируются не в объекте события (или списке обработчиков), а в центральном диспетчере. Также уведомители общаются только с диспетчером. Что слушать или что публиковать, определяется «сигналом», который является не чем иным, как именем (строкой).

Образец медиатора

Может также представлять интерес: модель Посредника .

Крючки

Система «крючка» обычно используется в контексте плагинов приложения. Приложение содержит фиксированные точки интеграции (хуки), и каждый плагин может подключаться к этому хуку и выполнять определенные действия.

Другие «события»

Примечание: threading.Event не является «системой событий» в вышеприведенном смысле. Это система синхронизации потоков, в которой один поток ожидает, пока другой поток не «сигнализирует» объект Event.

Библиотеки сетевых сообщений также часто используют термин «события»; иногда они похожи по концепции; иногда нет. Конечно, они могут пересекать границы потоков, процессов и компьютеров. См. Например, pyzmq , pymq , Twisted , Tornado , gevent , eventlet .

Слабые ссылки

В Python хранение ссылки на метод или объект гарантирует, что он не будет удален сборщиком мусора. Это может быть желательно, но также может привести к утечкам памяти: связанные обработчики никогда не очищаются.

Некоторые системы событий используют слабые ссылки вместо обычных для решения этой проблемы.

Несколько слов о различных библиотеках

Системы событий в стиле наблюдателя:

  • zope.event показывает, как это работает (см . ответ Леннарта ). Примечание: этот пример даже не поддерживает аргументы обработчика.
  • Реализация LongPoke 'callable list' показывает, что такая система событий может быть реализована очень минималистично с помощью подклассов list.
  • Вариант Felk EventHook также обеспечивает подписи вызывающих и вызывающих абонентов.
  • EventHook от spassig (шаблон событий Майкла Фурда ) является простой реализацией.
  • Класс Josip's Valued Lessons Event в основном тот же, но использует setвместо a listдля хранения сумки и реализует, __call__которые являются разумными дополнениями.
  • PyNotify похожа по своей концепции, а также предоставляет дополнительные понятия о переменных и условиях («событие изменения переменной»). Домашняя страница не работает.
  • Axel - это, по сути, мешок с обработчиками, с большим количеством функций, связанных с многопоточностью, обработкой ошибок, ...
  • Для python-dispatch требуются классы с четным исходным кодом pydispatch.Dispatcher.
  • buslane основан на классах, поддерживает одно- или многократные обработчики и облегчает обширные подсказки типов.
  • Pithikos ' Observer / Event - это легкий дизайн.

Библиотеки публикации-подписки:

  • Blinker имеет несколько отличных функций, таких как автоматическое отключение и фильтрация по отправителю.
  • PyPubSub является стабильным пакетом и обещает «расширенные функции, которые облегчают отладку и поддержку тем и сообщений».
  • pymitter является портом Python Node.js EventEmitter2 и предлагает пространства имен, подстановочные знаки и TTL.
  • PyDispatcher, кажется, подчеркивает гибкость в отношении публикации «многие ко многим» и т. Д. Поддерживает слабые ссылки.
  • Луи является переработанным PyDispatcher и должен работать «в самых разных контекстах».
  • pypydispatcher основан на (как вы уже догадались ...) PyDispatcher, а также работает в PyPy.
  • django.dispatch - это переписанный PyDispatcher «с более ограниченным интерфейсом, но более высокой производительностью».
  • pyeventdispatcher основан на диспетчере событий PHP-платформы Symfony.
  • диспетчер был извлечен из django.dispatch, но становится довольно старым.
  • EventManger Кристиана Гарсии - очень короткая реализация.

Другие:

  • pluggy содержит систему хуков, которая используется pytestплагинами.
  • RxPy3 реализует шаблон Observable и позволяет объединять события, повторять попытки и т. Д.
  • Сигналы и слоты Qt доступны в PyQt или PySide2 . Они работают как обратный вызов, когда используются в одном потоке, или как события (используя цикл событий) между двумя разными потоками. Сигналы и слоты имеют ограничение, что они работают только в объектах классов, которые являются производными QObject.
florisla
источник
2
Есть также Луи, который основан на PyDispatcher: pypi.python.org/pypi/Louie/1.1
the979kid
@ the979kid louie, похоже, плохо поддерживается, страница pypi ссылается на 404-е на GitHub: 11craft.github.io/louie ; github.com/gldnspud/louie . Должно быть github.com/11craft/louie .
Флорисла
2
Слабые слушатели событий - обычная необходимость. В противном случае использование в реальном мире становится трудным. Обратите внимание, какие решения поддерживают, что может быть полезно.
KXR
Pypubsub 4 имеет множество ко многим и имеет мощные средства отладки для сообщений, а также несколько способов ограничения полезных нагрузок сообщений, чтобы вы знали раньше, когда отправляете недопустимые или отсутствующие данные. PyPubSub 4 поддерживает Python 3 (а PyPubSub 3.x поддерживает Python 2).
Оливер
Недавно я опубликовал библиотеку под названием pymq github.com/thrau/pymq, которая может подойти для этого списка.
thrau
100

Я делал это так:

class Event(list):
    """Event subscription.

    A list of callable objects. Calling an instance of this will cause a
    call to each item in the list in ascending order by index.

    Example Usage:
    >>> def f(x):
    ...     print 'f(%s)' % x
    >>> def g(x):
    ...     print 'g(%s)' % x
    >>> e = Event()
    >>> e()
    >>> e.append(f)
    >>> e(123)
    f(123)
    >>> e.remove(f)
    >>> e()
    >>> e += (f, g)
    >>> e(10)
    f(10)
    g(10)
    >>> del e[0]
    >>> e(2)
    g(2)

    """
    def __call__(self, *args, **kwargs):
        for f in self:
            f(*args, **kwargs)

    def __repr__(self):
        return "Event(%s)" % list.__repr__(self)

Однако, как и со всем остальным, что я видел, для этого не существует автоматически сгенерированного pydoc и подписей, которые действительно отстой.

L̳o̳̳n̳̳g̳̳p̳o̳̳k̳̳e̳̳
источник
3
Я нахожу этот стиль довольно интригующим. Это сладко голые. Мне нравится тот факт, что он позволяет манипулировать событиями и их подписчиками как автономными операциями. Я посмотрю, как это будет в реальном проекте.
Руди Латте
2
Очень красивый минималистичный стиль! супер!
akaRem
2
Я не могу высказать это достаточно, это действительно просто и легко.
2
большое одолжение, может кто-нибудь объяснить это, как мне было 10 лет? Этот класс наследуется основным классом? Я не вижу инициализации, поэтому super () не будет использоваться. Это не щелкает для меня по какой-то причине.
omgimdrunk
1
@omgimdrunk Простой обработчик событий будет запускать одну или несколько вызываемых функций при каждом событии. Класс для «управления» этим для вас потребует как минимум следующие методы - add & fire. В этом классе вам нужно будет поддерживать список обработчиков, которые будут выполнены. Давайте поместим это в переменную экземпляра, _bag_of_handlersкоторая является списком. Метод добавления класса будет просто self._bag_of_handlers.append(some_callable). Метод огня класса будет проходить через _bag_of_handlers`, передавая предоставленные аргументы и kwargs обработчикам и выполняя каждый из них по очереди.
Гейб
69

Мы используем EventHook, как это было предложено Майклом Фурдом в его паттерне событий. :

Просто добавьте EventHooks в ваши классы с помощью:

class MyBroadcaster()
    def __init__():
        self.onChange = EventHook()

theBroadcaster = MyBroadcaster()

# add a listener to the event
theBroadcaster.onChange += myFunction

# remove listener from the event
theBroadcaster.onChange -= myFunction

# fire event
theBroadcaster.onChange.fire()

Мы добавили функциональность для удаления всех слушателей из объекта в класс Michaels и получили следующее:

class EventHook(object):

    def __init__(self):
        self.__handlers = []

    def __iadd__(self, handler):
        self.__handlers.append(handler)
        return self

    def __isub__(self, handler):
        self.__handlers.remove(handler)
        return self

    def fire(self, *args, **keywargs):
        for handler in self.__handlers:
            handler(*args, **keywargs)

    def clearObjectHandlers(self, inObject):
        for theHandler in self.__handlers:
            if theHandler.im_self == inObject:
                self -= theHandler
spassig
источник
Недостатком использования этого является то, что вам необходимо сначала добавить событие, прежде чем зарегистрироваться в качестве подписчика. Если только издатели добавляют свои события (не обязательно, просто хорошая практика), то вы должны инициализировать издателей перед подписчиками, что является болью в больших проектах
Джонатан
6
последний метод содержит ошибки, потому что обработчики self .__ изменяются во время итераций. Исправлено: `self .__ handlers = [h для h в self .__ handlers if h.im_self! = Obj]`
Саймон Бергот
1
@Simon прав, но вносит ошибку, потому что у нас могут быть несвязанные функции в обработчиках self .__. Исправлено:self.__handlers = [h for h in self._handlers if getattr(h, 'im_self', False) != obj]
Эрик Маркос
20

Я использую zope.event . Это самые голые кости, которые вы можете себе представить. :-) На самом деле, вот полный исходный код:

subscribers = []

def notify(event):
    for subscriber in subscribers:
        subscriber(event)

Обратите внимание, что вы не можете отправлять сообщения между процессами, например. Это не система обмена сообщениями, просто система событий, ни больше, ни меньше.

Леннарт Регебро
источник
17
pypi.python.org/pypi/zope.event ... чтобы спасти бедного
гугла с небольшим отрывом
Я все еще хотел бы иметь возможность отправлять сообщения. Я бы использовал систему событий в приложении, построенном на Tkinter. Я не использую его систему событий, потому что она не поддерживает сообщения.
Иосип
Вы можете отправить все, что вы хотите с zope.event. Но я хочу сказать, что это неправильная система обмена сообщениями, поскольку вы не можете отправлять события / сообщения другим процессам или другим компьютерам. Вы, вероятно, должны быть более конкретными с вашими требованиями.
Леннарт Регебро
15

Я нашел этот небольшой сценарий на Ценных уроках . Кажется, у меня просто правильное соотношение простоты и мощности. Питер Тэтчер является автором следующего кода (лицензирование не упоминается).

class Event:
    def __init__(self):
        self.handlers = set()

    def handle(self, handler):
        self.handlers.add(handler)
        return self

    def unhandle(self, handler):
        try:
            self.handlers.remove(handler)
        except:
            raise ValueError("Handler is not handling this event, so cannot unhandle it.")
        return self

    def fire(self, *args, **kargs):
        for handler in self.handlers:
            handler(*args, **kargs)

    def getHandlerCount(self):
        return len(self.handlers)

    __iadd__ = handle
    __isub__ = unhandle
    __call__ = fire
    __len__  = getHandlerCount

class MockFileWatcher:
    def __init__(self):
        self.fileChanged = Event()

    def watchFiles(self):
        source_path = "foo"
        self.fileChanged(source_path)

def log_file_change(source_path):
    print "%r changed." % (source_path,)

def log_file_change2(source_path):
    print "%r changed!" % (source_path,)

watcher              = MockFileWatcher()
watcher.fileChanged += log_file_change2
watcher.fileChanged += log_file_change
watcher.fileChanged -= log_file_change2
watcher.watchFiles()
Иосип
источник
1
Использование set () вместо списка удобно, чтобы избежать двойной регистрации обработчиков. Одним из следствий этого является то, что обработчики не вызываются в том порядке, в котором они были зарегистрированы. Не обязательно плохо, хотя ...
florisla
1
@florisla может поменяться на OrderedSet, если захочешь.
Robino
9

Вот минимальный дизайн, который должен работать нормально. Что вам нужно сделать, это просто наследовать Observerв классе, а затем использовать observe(event_name, callback_fn)для прослушивания определенного события. Всякий раз, когда это конкретное событие вызывается где-либо в коде (т. Е. Event('USB connected')), Срабатывает соответствующий обратный вызов.

class Observer():
    _observers = []
    def __init__(self):
        self._observers.append(self)
        self._observed_events = []
    def observe(self, event_name, callback_fn):
        self._observed_events.append({'event_name' : event_name, 'callback_fn' : callback_fn})


class Event():
    def __init__(self, event_name, *callback_args):
        for observer in Observer._observers:
            for observable in observer._observed_events:
                if observable['event_name'] == event_name:
                    observable['callback_fn'](*callback_args)

Пример:

class Room(Observer):
    def __init__(self):
        print("Room is ready.")
        Observer.__init__(self) # DON'T FORGET THIS
    def someone_arrived(self, who):
        print(who + " has arrived!")

# Observe for specific event
room = Room()
room.observe('someone arrived',  room.someone_arrived)

# Fire some events
Event('someone left',    'John')
Event('someone arrived', 'Lenard') # will output "Lenard has arrived!"
Event('someone Farted',  'Lenard')
Pithikos
источник
Мне нравится ваш дизайн, он минималистичен и прост для понимания. и это было бы легко, если бы не пришлось импортировать некоторые модули.
Атреагаурав
8

Я создал EventManagerкласс (код в конце). Синтаксис следующий:

#Create an event with no listeners assigned to it
EventManager.addEvent( eventName = [] )

#Create an event with listeners assigned to it
EventManager.addEvent( eventName = [fun1, fun2,...] )

#Create any number event with listeners assigned to them
EventManager.addEvent( eventName1 = [e1fun1, e1fun2,...], eventName2 = [e2fun1, e2fun2,...], ... )

#Add or remove listener to an existing event
EventManager.eventName += extra_fun
EventManager.eventName -= removed_fun

#Delete an event
del EventManager.eventName

#Fire the event
EventManager.eventName()

Вот пример:

def hello(name):
    print "Hello {}".format(name)
    
def greetings(name):
    print "Greetings {}".format(name)

EventManager.addEvent( salute = [greetings] )
EventManager.salute += hello

print "\nInitial salute"
EventManager.salute('Oscar')

print "\nNow remove greetings"
EventManager.salute -= greetings
EventManager.salute('Oscar')

Вывод:

Первоначальный салют
Привет Оскар
Привет Оскар

Теперь снимаю привет
привет оскар

Код EventManger:

class EventManager:
    
    class Event:
        def __init__(self,functions):
            if type(functions) is not list:
                raise ValueError("functions parameter has to be a list")
            self.functions = functions
            
        def __iadd__(self,func):
            self.functions.append(func)
            return self
            
        def __isub__(self,func):
            self.functions.remove(func)
            return self
            
        def __call__(self,*args,**kvargs):
            for func in self.functions : func(*args,**kvargs)
            
    @classmethod
    def addEvent(cls,**kvargs):
        """
        addEvent( event1 = [f1,f2,...], event2 = [g1,g2,...], ... )
        creates events using **kvargs to create any number of events. Each event recieves a list of functions,
        where every function in the list recieves the same parameters.
        
        Example:
        
        def hello(): print "Hello ",
        def world(): print "World"
        
        EventManager.addEvent( salute = [hello] )
        EventManager.salute += world
        
        EventManager.salute()
        
        Output:
        Hello World
        """
        for key in kvargs.keys():
            if type(kvargs[key]) is not list:
                raise ValueError("value has to be a list")
            else:
                kvargs[key] = cls.Event(kvargs[key])
        
        cls.__dict__.update(kvargs)
Кристиан Гарсия
источник
8

Вы можете взглянуть на pymitter ( pypi ). Это небольшой однофайловый подход (~ 250 loc), "обеспечивающий пространства имен, подстановочные знаки и TTL".

Вот основной пример:

from pymitter import EventEmitter

ee = EventEmitter()

# decorator usage
@ee.on("myevent")
def handler1(arg):
   print "handler1 called with", arg

# callback usage
def handler2(arg):
    print "handler2 called with", arg
ee.on("myotherevent", handler2)

# emit
ee.emit("myevent", "foo")
# -> "handler1 called with foo"

ee.emit("myotherevent", "bar")
# -> "handler2 called with bar"
Dalailirium
источник
6

Я сделал вариант минималистичного подхода Longpoke, который также обеспечивает подписи как для вызывающих, так и для вызывающих абонентов:

class EventHook(object):
    '''
    A simple implementation of the Observer-Pattern.
    The user can specify an event signature upon inizializazion,
    defined by kwargs in the form of argumentname=class (e.g. id=int).
    The arguments' types are not checked in this implementation though.
    Callables with a fitting signature can be added with += or removed with -=.
    All listeners can be notified by calling the EventHook class with fitting
    arguments.

    >>> event = EventHook(id=int, data=dict)
    >>> event += lambda id, data: print("%d %s" % (id, data))
    >>> event(id=5, data={"foo": "bar"})
    5 {'foo': 'bar'}

    >>> event = EventHook(id=int)
    >>> event += lambda wrong_name: None
    Traceback (most recent call last):
        ...
    ValueError: Listener must have these arguments: (id=int)

    >>> event = EventHook(id=int)
    >>> event += lambda id: None
    >>> event(wrong_name=0)
    Traceback (most recent call last):
        ...
    ValueError: This EventHook must be called with these arguments: (id=int)
    '''
    def __init__(self, **signature):
        self._signature = signature
        self._argnames = set(signature.keys())
        self._handlers = []

    def _kwargs_str(self):
        return ", ".join(k+"="+v.__name__ for k, v in self._signature.items())

    def __iadd__(self, handler):
        params = inspect.signature(handler).parameters
        valid = True
        argnames = set(n for n in params.keys())
        if argnames != self._argnames:
            valid = False
        for p in params.values():
            if p.kind == p.VAR_KEYWORD:
                valid = True
                break
            if p.kind not in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY):
                valid = False
                break
        if not valid:
            raise ValueError("Listener must have these arguments: (%s)"
                             % self._kwargs_str())
        self._handlers.append(handler)
        return self

    def __isub__(self, handler):
        self._handlers.remove(handler)
        return self

    def __call__(self, *args, **kwargs):
        if args or set(kwargs.keys()) != self._argnames:
            raise ValueError("This EventHook must be called with these " +
                             "keyword arguments: (%s)" % self._kwargs_str())
        for handler in self._handlers[:]:
            handler(**kwargs)

    def __repr__(self):
        return "EventHook(%s)" % self._kwargs_str()
Фельк
источник
3

Если я делаю код в pyQt, я использую парадигму сокетов / сигналов QT, то же самое для django

Если я делаю асинхронный ввод-вывод, используйте родной модуль выбора

Если я использую синтаксический анализатор SAX Python, я использую API событий, предоставляемый SAX. Так что, похоже, я жертва базового API :-)

Может быть, вы должны спросить себя, что вы ожидаете от рамки событий / модуля. Лично я предпочитаю использовать парадигму Socket / Signal от QT. больше информации об этом можно найти здесь

SashaN
источник
2

Вот еще один модуль для рассмотрения. Это кажется приемлемым выбором для более требовательных приложений.

Py-notify - это пакет Python, предоставляющий инструменты для реализации шаблона программирования Observer. Эти инструменты включают сигналы, условия и переменные.

Сигналы - это списки обработчиков, которые вызываются при излучении сигнала. Условия в основном являются булевыми переменными в сочетании с сигналом, который излучается при изменении состояния условия. Они могут быть объединены с использованием стандартных логических операторов (не, и т. Д.) В составные условия. Переменные, в отличие от условий, могут содержать любой объект Python, не только логические, но они не могут быть объединены.

Иосип
источник
1
Домашняя страница не работает, возможно, больше не поддерживается?
Дэвид Паркс
1

Если вы хотите сделать более сложные вещи, такие как объединение событий или повторение, вы можете использовать шаблон Observable и зрелую библиотеку, которая реализует это. https://github.com/ReactiveX/RxPY . Observables очень распространены в Javascript и Java, и их очень удобно использовать для некоторых асинхронных задач.

from rx import Observable, Observer


def push_five_strings(observer):
        observer.on_next("Alpha")
        observer.on_next("Beta")
        observer.on_next("Gamma")
        observer.on_next("Delta")
        observer.on_next("Epsilon")
        observer.on_completed()


class PrintObserver(Observer):

    def on_next(self, value):
        print("Received {0}".format(value))

    def on_completed(self):
        print("Done!")

    def on_error(self, error):
        print("Error Occurred: {0}".format(error))

source = Observable.create(push_five_strings)

source.subscribe(PrintObserver())

ВЫХОД :

Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon
Done!
Дэвид Дехган
источник
1

Если вам нужна шина событий, которая работает через границы процессов или сетей, вы можете попробовать PyMQ . В настоящее время он поддерживает pub / sub, очереди сообщений и синхронный RPC. Версия по умолчанию работает поверх бэкэнда Redis, поэтому вам нужен работающий сервер Redis. Существует также внутренняя память для тестирования. Вы также можете написать свой собственный бэкэнд.

import pymq

# common code
class MyEvent:
    pass

# subscribe code
@pymq.subscriber
def on_event(event: MyEvent):
    print('event received')

# publisher code
pymq.publish(MyEvent())

# you can also customize channels
pymq.subscribe(on_event, channel='my_channel')
pymq.publish(MyEvent(), channel='my_channel')

Для инициализации системы:

from pymq.provider.redis import RedisConfig

# starts a new thread with a Redis event loop
pymq.init(RedisConfig())

# main application control loop

pymq.shutdown()

Отказ от ответственности: я являюсь автором этой библиотеки

thrau
источник
0

Вы можете попробовать buslaneмодуль.

Эта библиотека облегчает реализацию системы, основанной на сообщениях. Он поддерживает команды (один обработчик) и события (0 или несколько обработчиков). Buslane использует аннотации типа Python для правильной регистрации обработчика.

Простой пример:

from dataclasses import dataclass

from buslane.commands import Command, CommandHandler, CommandBus


@dataclass(frozen=True)
class RegisterUserCommand(Command):
    email: str
    password: str


class RegisterUserCommandHandler(CommandHandler[RegisterUserCommand]):

    def handle(self, command: RegisterUserCommand) -> None:
        assert command == RegisterUserCommand(
            email='john@lennon.com',
            password='secret',
        )


command_bus = CommandBus()
command_bus.register(handler=RegisterUserCommandHandler())
command_bus.execute(command=RegisterUserCommand(
    email='john@lennon.com',
    password='secret',
))

Чтобы установить buslane, просто используйте pip:

$ pip install buslane
Конрад Холас
источник
0

Некоторое время назад я написал библиотеку, которая может быть полезна для вас. Это позволяет вам иметь локальных и глобальных слушателей, несколько разных способов их регистрации, приоритет выполнения и так далее.

from pyeventdispatcher import register

register("foo.bar", lambda event: print("second"))
register("foo.bar", lambda event: print("first "), -100)

dispatch(Event("foo.bar", {"id": 1}))
# first second

Посмотрите pyeventdispatcher

Даниэль Анкута
источник