эффективный кольцевой буфер?

109

Я хочу создать эффективный кольцевой буфер в python (с целью получения средних значений целочисленных значений в буфере).

Является ли это эффективным способом использования списка для сбора значений?

def add_to_buffer( self, num ):
    self.mylist.pop( 0 )
    self.mylist.append( num )

Что было бы эффективнее (и почему)?

jedierikb
источник
Это неэффективный способ реализации кольцевого буфера, поскольку pop (0) - это операция O (n) в списке. pop (0) удаляет первый элемент в списке, и все элементы должны быть сдвинуты влево. Вместо этого используйте collections.deque с атрибутом maxlen. deque имеет операцию O (1) для добавления и извлечения.
Влад Безден

Ответы:

205

Я бы использовал collections.dequeс maxlenаргументом

>>> import collections
>>> d = collections.deque(maxlen=10)
>>> d
deque([], maxlen=10)
>>> for i in xrange(20):
...     d.append(i)
... 
>>> d
deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10)

Существует рецепт в документации для , dequeкоторый похож на то , что вы хотите. Мое утверждение, что это самый эффективный, полностью основывается на том факте, что он реализован на C невероятно опытной командой, которая имеет обыкновение создавать первоклассный код.

ааронастерлинг
источник
7
+1 Да это хорошие батарейки в комплекте. Операции для кольцевого буфера - O (1), и, как вы говорите, дополнительные накладные расходы находятся в C, поэтому все равно должны быть довольно быстрыми
Джон Ла Рой
7
Мне не нравится это решение, потому что документы не гарантируют произвольный доступ O (1), если maxlenон определен. O (n) понятно, когда dequeможет расти до бесконечности, но если maxlenзадан, индексирование элемента должно быть постоянным по времени.
lvella
1
Я предполагаю, что он реализован как связанный список, а не как массив.
e-satis
1
Кажется примерно правильным, если время в моем ответе ниже правильное.
djvg
13

выскакивание из заголовка списка вызывает копирование всего списка, поэтому неэффективно

Вместо этого вы должны использовать список / массив фиксированного размера и индекс, который перемещается через буфер при добавлении / удалении элементов.

Джон Ла Рой
источник
4
Согласен. Независимо от того, насколько элегантно или неэлегантно он может выглядеть или на каком языке используется. На самом деле, чем меньше вы беспокоитесь о сборщике мусора (или диспетчере кучи, механизмах подкачки / сопоставления или о том, что делает настоящую магию памяти), тем лучше.
@RocketSurgeon Это не волшебство, это просто массив, первый элемент которого удален. Итак, для массива размера n это означает n-1 операций копирования. Здесь не задействован сборщик мусора или подобное устройство.
Christian
3
Я согласен. Сделать это также намного проще, чем некоторые думают. Просто используйте постоянно увеличивающийся счетчик и используйте оператор по модулю (% arrayylen) при доступе к элементу.
Андре Блюм
idem, вы можете проверить мой пост выше, вот как я это сделал
MoonCactus
10

Основываясь на ответе MoonCactus , вот circularlistкласс. Разница с его версией в том, что здесь c[0]всегда будет отображаться элемент с самым старым добавлением, элемент c[-1]с последним добавлением, c[-2]предпоследний ... Это более естественно для приложений.

c = circularlist(4)
c.append(1); print c, c[0], c[-1]    #[1]              1, 1
c.append(2); print c, c[0], c[-1]    #[1, 2]           1, 2
c.append(3); print c, c[0], c[-1]    #[1, 2, 3]        1, 3
c.append(8); print c, c[0], c[-1]    #[1, 2, 3, 8]     1, 8
c.append(10); print c, c[0], c[-1]   #[10, 2, 3, 8]    2, 10
c.append(11); print c, c[0], c[-1]   #[10, 11, 3, 8]   3, 11

Класс:

class circularlist(object):
    def __init__(self, size, data = []):
        """Initialization"""
        self.index = 0
        self.size = size
        self._data = list(data)[-size:]

    def append(self, value):
        """Append an element"""
        if len(self._data) == self.size:
            self._data[self.index] = value
        else:
            self._data.append(value)
        self.index = (self.index + 1) % self.size

    def __getitem__(self, key):
        """Get element by index, relative to the current index"""
        if len(self._data) == self.size:
            return(self._data[(key + self.index) % self.size])
        else:
            return(self._data[key])

    def __repr__(self):
        """Return string representation"""
        return self._data.__repr__() + ' (' + str(len(self._data))+' items)'

[Отредактировано]: добавлен необязательный dataпараметр, разрешающий инициализацию из существующих списков, например:

circularlist(4, [1, 2, 3, 4, 5])      #  [2, 3, 4, 5] (4 items)
circularlist(4, set([1, 2, 3, 4, 5])) #  [2, 3, 4, 5] (4 items)
circularlist(4, (1, 2, 3, 4, 5))      #  [2, 3, 4, 5] (4 items)
Basj
источник
Хорошее дополнение. Списки Python уже допускают отрицательные индексы, но (-1), например, не вернет ожидаемое значение после заполнения кольцевого буфера, так как «последнее» добавление к списку заканчивается внутри списка.
MoonCactus
1
Он действительно работает @MoonCactus, см. 6 примеров, которые я привел поверх ответа; в последних вы можете видеть, что c[-1]это всегда правильный элемент. __getitem__делает это правильно.
Basj
о да, я имею в виду мой провал, а не ваш, извините: D Я сделаю свой комментарий более понятным! - о, я не могу, комментарий слишком старый.
MoonCactus
красивое простое решение. Я добавил необязательный аргумент, чтобы разрешить инициализацию списка из существующих данных, так это более питонно.
Orwellophile
9

Deque Python работает медленно. Вместо этого вы также можете использовать numpy.roll. Как вы поворачиваете числа в массиве numpy формы (n,) или (n, 1)?

В этом тесте deque составляет 448 мс. Numpy.roll составляет 29 мс http://scimusing.wordpress.com/2013/10/25/ring-buffers-in-pythonnumpy/

Орвар Корвар
источник
1
Но numpy.rollвозвращает копию массива, верно?
djvg
3
Этот ответ вводит в заблуждение - deque Python кажется довольно быстрым, но преобразование из массивов numpy и в них значительно замедляет его в тестах, на которые вы ссылаетесь.
xitrium
7

хорошо с использованием класса deque, но для требований вопроса (в среднем) это мое решение:

>>> from collections import deque
>>> class CircularBuffer(deque):
...     def __init__(self, size=0):
...             super(CircularBuffer, self).__init__(maxlen=size)
...     @property
...     def average(self):  # TODO: Make type check for integer or floats
...             return sum(self)/len(self)
...
>>>
>>> cb = CircularBuffer(size=10)
>>> for i in range(20):
...     cb.append(i)
...     print "@%s, Average: %s" % (cb, cb.average)
...
@deque([0], maxlen=10), Average: 0
@deque([0, 1], maxlen=10), Average: 0
@deque([0, 1, 2], maxlen=10), Average: 1
@deque([0, 1, 2, 3], maxlen=10), Average: 1
@deque([0, 1, 2, 3, 4], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5, 6], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8], maxlen=10), Average: 4
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], maxlen=10), Average: 4
@deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], maxlen=10), Average: 5
@deque([2, 3, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10), Average: 6
@deque([3, 4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10), Average: 7
@deque([4, 5, 6, 7, 8, 9, 10, 11, 12, 13], maxlen=10), Average: 8
@deque([5, 6, 7, 8, 9, 10, 11, 12, 13, 14], maxlen=10), Average: 9
@deque([6, 7, 8, 9, 10, 11, 12, 13, 14, 15], maxlen=10), Average: 10
@deque([7, 8, 9, 10, 11, 12, 13, 14, 15, 16], maxlen=10), Average: 11
@deque([8, 9, 10, 11, 12, 13, 14, 15, 16, 17], maxlen=10), Average: 12
@deque([9, 10, 11, 12, 13, 14, 15, 16, 17, 18], maxlen=10), Average: 13
@deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10), Average: 14
SmartElectron
источник
Я получаю TypeError: 'numpy.float64' object is not callableпри попытке вызвать averageметод
scls
Да ... на самом деле я предполагаю, что deque внутренне использует массивы numpy (после удаления @property он работает нормально)
scls
17
Я гарантирую, что deque не использует внутренние массивы numpy. collectionsявляется частью стандартной библиотеки, numpyне является. Зависимости от сторонних библиотек сделали бы стандартную библиотеку ужасной.
6

Хотя здесь уже есть множество отличных ответов, я не смог найти прямого сравнения таймингов для упомянутых вариантов. Поэтому, пожалуйста, найдите мою скромную попытку сравнения ниже.

Только для целей тестирования класс может переключаться между listбуфером на collections.dequeоснове, буфером на Numpy.rollоснове и буфером на основе.

Обратите внимание, что updateдля простоты метод добавляет за раз только одно значение.

import numpy
import timeit
import collections


class CircularBuffer(object):
    buffer_methods = ('list', 'deque', 'roll')

    def __init__(self, buffer_size, buffer_method):
        self.content = None
        self.size = buffer_size
        self.method = buffer_method

    def update(self, scalar):
        if self.method == self.buffer_methods[0]:
            # Use list
            try:
                self.content.append(scalar)
                self.content.pop(0)
            except AttributeError:
                self.content = [0.] * self.size
        elif self.method == self.buffer_methods[1]:
            # Use collections.deque
            try:
                self.content.append(scalar)
            except AttributeError:
                self.content = collections.deque([0.] * self.size,
                                                 maxlen=self.size)
        elif self.method == self.buffer_methods[2]:
            # Use Numpy.roll
            try:
                self.content = numpy.roll(self.content, -1)
                self.content[-1] = scalar
            except IndexError:
                self.content = numpy.zeros(self.size, dtype=float)

# Testing and Timing
circular_buffer_size = 100
circular_buffers = [CircularBuffer(buffer_size=circular_buffer_size,
                                   buffer_method=method)
                    for method in CircularBuffer.buffer_methods]
timeit_iterations = 1e4
timeit_setup = 'from __main__ import circular_buffers'
timeit_results = []
for i, cb in enumerate(circular_buffers):
    # We add a convenient number of convenient values (see equality test below)
    code = '[circular_buffers[{}].update(float(j)) for j in range({})]'.format(
        i, circular_buffer_size)
    # Testing
    eval(code)
    buffer_content = [item for item in cb.content]
    assert buffer_content == range(circular_buffer_size)
    # Timing
    timeit_results.append(
        timeit.timeit(code, setup=timeit_setup, number=int(timeit_iterations)))
    print '{}: total {:.2f}s ({:.2f}ms per iteration)'.format(
        cb.method, timeit_results[-1],
        timeit_results[-1] / timeit_iterations * 1e3)

В моей системе это дает:

list:  total 1.06s (0.11ms per iteration)
deque: total 0.87s (0.09ms per iteration)
roll:  total 6.27s (0.63ms per iteration)
djvg
источник
4

Как насчет решения из Поваренной книги Python , включая реклассификацию экземпляра кольцевого буфера, когда он становится заполненным?

class RingBuffer:
    """ class that implements a not-yet-full buffer """
    def __init__(self,size_max):
        self.max = size_max
        self.data = []

    class __Full:
        """ class that implements a full buffer """
        def append(self, x):
            """ Append an element overwriting the oldest one. """
            self.data[self.cur] = x
            self.cur = (self.cur+1) % self.max
        def get(self):
            """ return list of elements in correct order """
            return self.data[self.cur:]+self.data[:self.cur]

    def append(self,x):
        """append an element at the end of the buffer"""
        self.data.append(x)
        if len(self.data) == self.max:
            self.cur = 0
            # Permanently change self's class from non-full to full
            self.__class__ = self.__Full

    def get(self):
        """ Return a list of elements from the oldest to the newest. """
        return self.data

# sample usage
if __name__=='__main__':
    x=RingBuffer(5)
    x.append(1); x.append(2); x.append(3); x.append(4)
    print(x.__class__, x.get())
    x.append(5)
    print(x.__class__, x.get())
    x.append(6)
    print(x.data, x.get())
    x.append(7); x.append(8); x.append(9); x.append(10)
    print(x.data, x.get())

Примечательный выбор дизайна в реализации заключается в том, что, поскольку эти объекты претерпевают необратимый переход состояния в какой-то момент своего времени жизни - от неполного буфера до полного буфера (и в этот момент поведение изменяется), - я смоделировал это путем изменения self.__class__. Это работает даже в Python 2.2, если оба класса имеют одинаковые слоты (например, он отлично работает для двух классических классов, таких как RingBuffer и__Full в этом рецепте).

Изменение класса экземпляра может быть странным для многих языков, но это питоническая альтернатива другим способам представления случайных, массивных, необратимых и дискретных изменений состояния, которые сильно влияют на поведение, как в этом рецепте. Хорошо, что Python поддерживает его для всех типов классов.

Предоставлено: Себастьян Кейм.

d8aninja
источник
Я сделал несколько тестов скорости этого vs deque. Это примерно в 7 раз медленнее, чем deque.
PolyMesh 08
@PolyMesh круто, дайте знать автору!
d8aninja 08
1
какой в ​​этом смысл? Это старый опубликованный документ. Цель моего комментария - сообщить другим, что этот ответ устарел, и вместо этого использовать deque.
PolyMesh 08
@PolyMesh, когда он опубликовал, наверное, все еще медленнее; инструкции для связи с автором находятся во введении к книге. Я просто рассказываю об одной возможной альтернативе. Кроме того, «Если бы только скорость была лучшим показателем; увы, это могло бы быть только хорошим показателем».
d8aninja 08
3

Вы также можете увидеть этот довольно старый рецепт Python .

Вот моя собственная версия с массивом NumPy:

#!/usr/bin/env python

import numpy as np

class RingBuffer(object):
    def __init__(self, size_max, default_value=0.0, dtype=float):
        """initialization"""
        self.size_max = size_max

        self._data = np.empty(size_max, dtype=dtype)
        self._data.fill(default_value)

        self.size = 0

    def append(self, value):
        """append an element"""
        self._data = np.roll(self._data, 1)
        self._data[0] = value 

        self.size += 1

        if self.size == self.size_max:
            self.__class__  = RingBufferFull

    def get_all(self):
        """return a list of elements from the oldest to the newest"""
        return(self._data)

    def get_partial(self):
        return(self.get_all()[0:self.size])

    def __getitem__(self, key):
        """get element"""
        return(self._data[key])

    def __repr__(self):
        """return string representation"""
        s = self._data.__repr__()
        s = s + '\t' + str(self.size)
        s = s + '\t' + self.get_all()[::-1].__repr__()
        s = s + '\t' + self.get_partial()[::-1].__repr__()
        return(s)

class RingBufferFull(RingBuffer):
    def append(self, value):
        """append an element when buffer is full"""
        self._data = np.roll(self._data, 1)
        self._data[0] = value
scls
источник
4
+1 за использование numpy, но -1 за отсутствие кольцевого буфера. Как вы это реализовали, вы переносите все данные каждый раз, когда добавляете один элемент, это требует O(n)времени. Чтобы реализовать правильный кольцевой буфер , у вас должны быть как индекс, так и переменная размера, и вам нужно правильно обрабатывать случай, когда данные «оборачиваются» вокруг конца буфера. При извлечении данных вам может потребоваться объединить две секции в начале и в конце буфера.
Бас Суинкелс
2

Это не требует какой-либо библиотеки. Он увеличивает список, а затем цикл по индексу.

Размер места очень мал (без библиотеки), и он работает по крайней мере в два раза быстрее, чем удаление из очереди. Это действительно хорошо для вычисления скользящих средних, но имейте в виду, что элементы не отсортированы по возрасту, как указано выше.

class CircularBuffer(object):
    def __init__(self, size):
        """initialization"""
        self.index= 0
        self.size= size
        self._data = []

    def record(self, value):
        """append an element"""
        if len(self._data) == self.size:
            self._data[self.index]= value
        else:
            self._data.append(value)
        self.index= (self.index + 1) % self.size

    def __getitem__(self, key):
        """get element by index like a regular array"""
        return(self._data[key])

    def __repr__(self):
        """return string representation"""
        return self._data.__repr__() + ' (' + str(len(self._data))+' items)'

    def get_all(self):
        """return a list of all the elements"""
        return(self._data)

Чтобы получить среднее значение, например:

q= CircularBuffer(1000000);
for i in range(40000):
    q.record(i);
print "capacity=", q.size
print "stored=", len(q.get_all())
print "average=", sum(q.get_all()) / len(q.get_all())

Результаты в:

capacity= 1000000
stored= 40000
average= 19999

real 0m0.024s
user 0m0.020s
sys  0m0.000s

Это примерно 1/3 времени эквивалента с удалением из очереди.

ЛунаКактус
источник
1
Не следует ли ваш __getitem__быть немного более мощным: self._data[(key + self._index + 1) % self._size]?
Mateen Ulhaq
Зачем вам сдвиг на +1? Теперь да, см. Вариант Басжа ниже для идеи
MoonCactus
1

У меня была эта проблема до последовательного программирования. В то время, чуть больше года назад, я тоже не мог найти никаких эффективных реализаций, поэтому в итоге я написал его как расширение C, и оно также доступно на pypi по лицензии MIT. Он супер простой, обрабатывает только буферы 8-битных подписанных символов, но имеет гибкую длину, поэтому вы можете использовать Struct или что-то поверх него, если вам нужно что-то другое, кроме символов. Теперь с помощью поиска в Google я вижу, что в наши дни есть несколько вариантов, так что вы, возможно, захотите посмотреть и на них.

сирларк
источник
1

Вы ответ неверный. Главный круговой буфер имеет два принципа ( https://en.wikipedia.org/wiki/Circular_buffer )

  1. Устанавливается длина буфера;
  2. Первым прибыл, первым обслужен;
  3. Когда вы добавляете или удаляете элемент, другие элементы не должны перемещать свое положение.

ваш код ниже:

def add_to_buffer( self, num ):
    self.mylist.pop( 0 )
    self.mylist.append( num )

Давайте рассмотрим ситуацию, когда список заполнен, используя ваш код:

self.mylist = [1, 2, 3, 4, 5]

теперь мы добавляем 6, список меняется на

self.mylist = [2, 3, 4, 5, 6]

элементы, ожидающие 1 в списке, изменили свое положение

ваш код - это очередь, а не круговой буфер.

Ответ Басжа я считаю наиболее действенным.

Кстати, круговой буфер может улучшить производительность операции добавления элемента.

Джонни Вонг
источник
1

Из Github:

class CircularBuffer:

    def __init__(self, size):
        """Store buffer in given storage."""
        self.buffer = [None]*size
        self.low = 0
        self.high = 0
        self.size = size
        self.count = 0

    def isEmpty(self):
        """Determines if buffer is empty."""
        return self.count == 0

    def isFull(self):
        """Determines if buffer is full."""
        return self.count == self.size

    def __len__(self):
        """Returns number of elements in buffer."""
        return self.count

    def add(self, value):
        """Adds value to buffer, overwrite as needed."""
        if self.isFull():
            self.low = (self.low+1) % self.size
        else:
            self.count += 1
        self.buffer[self.high] = value
        self.high = (self.high + 1) % self.size

    def remove(self):
        """Removes oldest value from non-empty buffer."""
        if self.count == 0:
            raise Exception ("Circular Buffer is empty");
        value = self.buffer[self.low]
        self.low = (self.low + 1) % self.size
        self.count -= 1
        return value

    def __iter__(self):
        """Return elements in the circular buffer in order using iterator."""
        idx = self.low
        num = self.count
        while num > 0:
            yield self.buffer[idx]
            idx = (idx + 1) % self.size
            num -= 1

    def __repr__(self):
        """String representation of circular buffer."""
        if self.isEmpty():
            return 'cb:[]'

        return 'cb:[' + ','.join(map(str,self)) + ']'

https://github.com/heineman/python-data-structures/blob/master/2.%20Ubiquitous%20Lists/circBuffer.py

Иджаз Ахмад Хан
источник
0

Первоначальный вопрос был: « эффективный » кольцевой буфер. В соответствии с запрошенной эффективностью ответ от aaronasterling кажется окончательно правильным. Использование специального класса, запрограммированного на Python, и сравнение обработки времени с помощью collections.deque показывает ускорение в 5,2 раза с deque! Вот очень простой код для проверки:

class cb:
    def __init__(self, size):
        self.b = [0]*size
        self.i = 0
        self.sz = size
    def append(self, v):
        self.b[self.i] = v
        self.i = (self.i + 1) % self.sz

b = cb(1000)
for i in range(10000):
    b.append(i)
# called 200 times, this lasts 1.097 second on my laptop

from collections import deque
b = deque( [], 1000 )
for i in range(10000):
    b.append(i)
# called 200 times, this lasts 0.211 second on my laptop

Чтобы преобразовать двухстороннюю очередь в список, просто используйте:

my_list = [v for v in my_deque]

Затем вы получите O (1) произвольный доступ к элементам двухсторонней очереди. Конечно, это полезно только в том случае, если вам нужно сделать много случайных обращений к двухсторонней очереди после ее однократной установки.

Шмук
источник
0

Это применяет тот же принцип к некоторым буферам, предназначенным для хранения самых последних текстовых сообщений.

import time
import datetime
import sys, getopt

class textbffr(object):
    def __init__(self, size_max):
        #initialization
        self.posn_max = size_max-1
        self._data = [""]*(size_max)
        self.posn = self.posn_max

    def append(self, value):
        #append an element
        if self.posn == self.posn_max:
            self.posn = 0
            self._data[self.posn] = value   
        else:
            self.posn += 1
            self._data[self.posn] = value

    def __getitem__(self, key):
        #return stored element
        if (key + self.posn+1) > self.posn_max:
            return(self._data[key - (self.posn_max-self.posn)])
        else:
            return(self._data[key + self.posn+1])


def print_bffr(bffr,bffer_max): 
    for ind in range(0,bffer_max):
        stored = bffr[ind]
        if stored != "":
            print(stored)
    print ( '\n' )

def make_time_text(time_value):
    return(str(time_value.month).zfill(2) + str(time_value.day).zfill(2)
      + str(time_value.hour).zfill(2) +  str(time_value.minute).zfill(2)
      + str(time_value.second).zfill(2))


def main(argv):
    #Set things up 
    starttime = datetime.datetime.now()
    log_max = 5
    status_max = 7
    log_bffr = textbffr(log_max)
    status_bffr = textbffr(status_max)
    scan_count = 1

    #Main Loop
    # every 10 secounds write a line with the time and the scan count.
    while True: 

        time_text = make_time_text(datetime.datetime.now())
        #create next messages and store in buffers
        status_bffr.append(str(scan_count).zfill(6) + " :  Status is just fine at : " + time_text)
        log_bffr.append(str(scan_count).zfill(6) + " : " + time_text + " : Logging Text ")

        #print whole buffers so far
        print_bffr(log_bffr,log_max)
        print_bffr(status_bffr,status_max)

        time.sleep(2)
        scan_count += 1 

if __name__ == '__main__':
    main(sys.argv[1:])  
Дэвид Торренс
источник
0

Вы можете проверить этот круговой буфер на основе массива numpy предопределенного размера. Идея состоит в том, что вы создаете буфер (выделяете память для массива numpy), а затем добавляете к нему. Ввод данных и извлечение происходит очень быстро. Я создал этот модуль для той же цели, что и вам. В моем случае у меня есть устройство, которое генерирует целочисленные данные. Я читаю данные и помещаю их в кольцевой буфер для дальнейшего анализа и обработки.

Валентин
источник