# -*- coding: utf-8 -*-
#_____________________________________________________________________________
#
# Copyright (c) 2012-2013, Berlin Institute of Technology
# All rights reserved.
#
# Developed by: Philipp Meier <pmeier82@gmail.com>
#
# Neural Information Processing Group (NI)
# School for Electrical Engineering and Computer Science
# Berlin Institute of Technology
# MAR 5-6, Marchstr. 23, 10587 Berlin, Germany
# http://www.ni.tu-berlin.de/
#
# Repository: https://github.com/pmeier82/BOTMpy
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal with the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimers.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimers in the documentation
# and/or other materials provided with the distribution.
# * Neither the names of Neural Information Processing Group (NI), Berlin
# Institute of Technology, nor the names of its contributors may be used to
# endorse or promote products derived from this Software without specific
# prior written permission.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# WITH THE SOFTWARE.
#_____________________________________________________________________________
#
# Acknowledgements:
# Philipp Meier <pmeier82@gmail.com>
#_____________________________________________________________________________
#
# Changelog:
# * <iso-date> <identity> :: <description>
#_____________________________________________________________________________
#
"""matrix based ringbuffer implementation"""
# markup format used by the docstrings in this module
__docformat__ = 'restructuredtext'
# names exported by ``from <module> import *``: only the ringbuffer class
__all__ = ['MxRingBuffer']
## IMPORTS
import numpy as np
import scipy as sp
## CLASSES
class MxRingBuffer(object):
    """Ringbuffer implementation based on a pre-allocated ndarray.

    Ringbuffer behavior is achieved by cycling through the buffer forward,
    wrapping around to the start upon reaching capacity. Items are stored
    as rows of one ndarray of shape ``(capacity,) + dimension``; once the
    capacity is reached every new item overwrites the oldest one.

    Indexing, iteration, :meth:`tolist` and :meth:`mean` all work on the
    logical order of the items (oldest first, newest last), not on the
    physical row order of the backing array.
    """

    ## constructor

    def __init__(self, capacity=64, dimension=1, dtype=None):
        """
        :type capacity: int
        :param capacity: capacity of the ringbuffer (rows)
            Default=64
        :type dimension: tuple or int
        :param dimension: dimensionality of the items to store in the
            ringbuffer. If int, this will be converted internally to (int,)
            Default=1
        :type dtype: dtype resolvable
        :param dtype: dtype of single entries
            Default=float32
        :raises ValueError: if `capacity` is smaller than 1 or `dimension`
            is neither a tuple nor an int
        """

        # checks
        if capacity < 1:
            raise ValueError('capacity < 1')
        if isinstance(dimension, int):
            dimension = (dimension,)
        elif not isinstance(dimension, tuple):
            raise ValueError('dimension has to be tuple or int')

        # members
        self._capacity = int(capacity)
        self._dimension = dimension
        # explicit None test: non-structured numpy dtype objects have
        # length 0 and may evaluate as false, so `dtype or float32` could
        # silently discard a dtype instance passed by the caller
        self._dtype = np.dtype(np.float32 if dtype is None else dtype)
        self._data = np.empty((self._capacity,) + self._dimension,
                              dtype=self._dtype)
        # row that receives the next item
        self._next = 0
        # True once every row holds a valid item
        self._full = False

    ## internals

    def _idx_retrieve(self):
        """return the row indices of the stored items, oldest first

        :returns: ndarray - integer indices into the backing array
        """

        if self._full:
            # the oldest item sits at `_next`, the row overwritten next
            start = self._next
            return np.arange(start, start + self._capacity) % self._capacity
        return np.arange(self._next)

    ## properties

    def get_dimension(self):
        """return the shape of a single item as a tuple"""

        return self._dimension

    dimension = property(get_dimension)

    def get_is_full(self):
        """return True if the capacity has been reached"""

        return self._full

    is_full = property(get_is_full)

    def get_capacity(self):
        """return the maximum number of items the ringbuffer can hold"""

        return self._capacity

    def set_capacity(self, value):
        """resize the ringbuffer, keeping the most recent items

        :type value: int
        :param value: new capacity (rows)
        :raises ValueError: if `value` is not an int or smaller than 1
        """

        if not isinstance(value, int):
            raise ValueError('takes integer as argument')
        if value < 1:
            raise ValueError('capacity < 1')

        # slicing goes through index-array lookup and therefore yields a
        # copy that is independent of the backing array replaced below
        keep = min(len(self), value)
        survivors = self[len(self) - keep:]

        self._capacity = value
        self._data = np.zeros((self._capacity,) + self._dimension,
                              dtype=self._dtype)
        self.clear()
        self.extend(survivors)

    capacity = property(get_capacity, set_capacity)

    ## methods interface

    def append(self, datum):
        """append one datum at the end of the buffer, overwriting the oldest
        datum in the buffer if the capacity has been reached.

        :type datum: ndarray
        :param datum: ndarray of shape :self.dimension:
        :raises ValueError: if the shape of `datum` is not :self.dimension:
        """

        # checks
        datum = np.asarray(datum)
        if datum.shape != self._dimension:
            raise ValueError('datum has wrong dimension! expected %s was %s' %
                             (self._dimension, datum.shape))

        # append; the ellipsis also covers scalar items (dimension == ())
        self._data[self._next, ...] = datum

        # index and capacity status bookkeeping
        self._next += 1
        if self._next == self._capacity:
            self._next = 0
            self._full = True

    def extend(self, iterable):
        """append iterable at the end of the buffer using multiple append's

        :type iterable: iterable
        :param iterable: iterable of objects to be stored in the ringbuffer
        """

        for item in iterable:
            self.append(item)

    def tolist(self):
        """return the buffer as a list

        Only the items actually stored are returned, oldest first.

        :returns: list - the buffer as a python list
        """

        return self[:].tolist()

    def clear(self):
        """clears the data and resets internals"""

        self._next = 0
        self._full = False
        self._data[...] = 0

    def flush(self):
        """return the buffer as a list and clear the RingBuffer

        Convenience method. This returns self.tolist() and calls self.clear()
        afterwards.

        :returns: list - the buffer as a python list of the objects stored
        """

        try:
            return self.tolist()
        finally:
            self.clear()

    def mean(self, last=None):
        """yields the mean over the :last: entries

        :type last: int
        :param last: number entries from the back of the ringbuffer to
            include for mean calculation. If None, use all contents. A value
            of 0 also selects all contents, as the slice ``[-0:]`` is not
            empty.
            Default=None
        :returns: ndarray(self.dimension) - mean over the last entries,
            or the appropriate zero element if the ringbuffer is empty.
        """

        # checks
        size = len(self)
        if size == 0:
            return np.zeros(self._dimension, dtype=self._dtype)
        if last is None or last > size:
            last = size

        # return
        rows = self._idx_retrieve()[-last:]
        return np.mean(self._data[rows, ...], axis=0)

    def fill(self, datum):
        """fill all slots of the ringbuffer with the same datum.

        The ringbuffer is full afterwards.

        :type datum: ndarray
        :param datum: ndarray of shape :self.dimension:
        :raises ValueError: if the shape of `datum` is not :self.dimension:
        """

        # checks
        datum = np.asarray(datum)
        if datum.shape != self._dimension:
            raise ValueError('datum has wrong dimension! expected %s was %s' %
                             (self._dimension, datum.shape))

        # broadcast the datum over all rows
        self._data[...] = datum

        # index and capacity status bookkeeping
        self._next = 0
        self._full = True

    ## special methods

    def __str__(self):
        return 'MxRingbuffer{items:%s - cap:%s@%s}' % (len(self),
                                                       self._capacity,
                                                       str(self._dimension))

    def __len__(self):
        # number of valid items, not the number of allocated rows
        if self._full:
            return self._capacity
        return self._next

    def __getitem__(self, k):
        # `k` addresses the logical order; translate to backing-array rows
        try:
            return self._data[self._idx_retrieve()[k], ...]
        except IndexError:
            raise IndexError('ringbuffer index out of range')

    def __iter__(self):
        return iter(self._data[self._idx_retrieve(), ...])
## MAIN
if __name__ == '__main__':
    # nothing to do when run as a script; this module only provides the
    # MxRingBuffer class for import
    pass