# -*- coding: utf-8 -*-
#_____________________________________________________________________________
#
# Copyright (c) 2012 Berlin Institute of Technology
# All rights reserved.
#
# Developed by: Philipp Meier <pmeier82@gmail.com>
# Neural Information Processing Group (NI)
# School for Electrical Engineering and Computer Science
# Berlin Institute of Technology
# MAR 5-6, Marchstr. 23, 10587 Berlin, Germany
# http://www.ni.tu-berlin.de/
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal with the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimers.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimers in the documentation
# and/or other materials provided with the distribution.
# * Neither the names of Neural Information Processing Group (NI), Berlin
# Institute of Technology, nor the names of its contributors may be used to
# endorse or promote products derived from this Software without specific
# prior written permission.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# WITH THE SOFTWARE.
#_____________________________________________________________________________
#
# Acknowledgements:
# Philipp Meier <pmeier82@gmail.com>
#_____________________________________________________________________________
#
"""amplitude histogram to track time-variant waveforms"""
__docformat__ = 'restructuredtext'
__all__ = ['AmplitudeHistogram']
##---IMPORTS
import scipy as sp
from .ringbuffer import MxRingBuffer
##---CLASSES
[docs]class AmplitudeHistogram(object):
"""amplitude histogram calculator"""
## constructor
def __init__(self, ampl_range=(-2.0, 2.0, .1), ampl_noise=(-0.2, 0.2),
nchan=1, bin_size=32000, bin_hist=3000, ):
"""
:type ampl_range: tuple(float,float, float)
:param ampl_range: tuple of (min value, max value, step value) for the
amplitude binning.
Default=(-2.0,2.0)
:type ampl_noise: tuple(float,float)
:param ampl_noise: a tuple of min and max values for the noise band
of the histogram. values from the noise ban will be
omitted/considered equal zero for the purpose of visualisation
and statistics.
Default=(-0.2,0.2)
:type nchan: int
:param nchan: number of channels in the data, histograms will be
tracked independently for each of the channels.
Default=1
:type bin_size: int
:param bin_size: size of one histogram-bin in data samples, only used
if single-bin histograms are calculated from the data.
Default=32000
:type bin_hist: int
:param bin_hist: length of the history, once 'bin_hist' bins have been
the accumulated, the oldest window will be forgotten (ringbuffer).
Default=1000
"""
# parameters
self._ampl_range = AmplitudeHistogram.range(*ampl_range)
self._ampl_noise = None
if isinstance(ampl_noise, tuple):
if len(ampl_noise) == 2:
self._ampl_noise = (self._ampl_range > ampl_noise[0]) *\
(self._ampl_range < ampl_noise[1])
self._ampl_noise = self._ampl_noise[::-1]
else:
raise ValueError('ampl_noise must be a tuple of length 2!')
if nchan < 1:
raise ValueError('nchan must be a positive integer!')
self._nchan = int(nchan)
if bin_size < 1:
raise ValueError('bin_size must be a positive integer!')
self._bin_size = int(bin_size)
if bin_hist < 1:
raise ValueError('bin_hist must be a positive integer!')
self._bin_hist = int(bin_hist)
self._hist_data = MxRingBuffer(
capacity=self._bin_hist,
dimension=(self._nchan, self._ampl_range.size - 1),
dtype=int)
self._cur_bin = sp.zeros((self._nchan, self._ampl_range.size - 1))
self._cur_bin_smpl = 0
self._cache_good = False
self._cache = None
[docs] def force_new_bin(self):
""" force a new bin and finalize the current bin"""
self._hist_data.append(self._cur_bin)
self._cur_bin_smpl = 0
self._cur_bin[:] = 0
[docs] def append_bin(self, bin):
"""append an AmplHistBin instance
:type bin: ndarray like
:param bin: the amplHistBin to append
"""
# checks
bin_ = sp.asanyarray(bin)
if bin_.shape != self._cur_bin.shape:
raise ValueError('shape does not match! expected %s, got %s' %
(self._cur_bin.shape, bin_.shape))
if bin_.sum() == 0:
print '!!appending zero bin!!'
# append bin
self._hist_data.append(bin_)
[docs] def append_data_all(self, data, force=False):
"""append bin(s) calculated from a strip of data
with this method a histogram of the amplitude distribution of the
passed data is generated as one observation and appended to the
current amplitude histogram.
:type data: ndarray
:param data: the data to generate the bin(s) to append from
:type force: bool
:param force: if True, immediately start a new bin before calculation
"""
# check data
data_ = sp.asanyarray(data)
if data.ndim < 2:
data_ = sp.atleast_2d(data_)
if data_.shape[0] < data_.shape[1]:
data_ = data_.T
nsmpl, nchan = data_.shape
if nchan != self._nchan:
raise ValueError('data has channel count %s, expected %s' %
(nchan, self._nchan))
# generate bin set
bin_set = [0]
if self._cur_bin_smpl != 0:
bin_set.append(self._bin_size - self._cur_bin_smpl)
while bin_set[-1] < nsmpl:
bin_set.append(bin_set[-1] + self._bin_size)
if bin_set[-1] > nsmpl:
bin_set[-1] = nsmpl
# process bins
idx = 1
while idx < len(bin_set):
data_bin = data_[bin_set[idx - 1]:bin_set[idx], :]
for c in xrange(self._nchan):
self._cur_bin[c] += sp.histogram(data_bin[:, c],
bins=self._ampl_range)[0]
self._cur_bin_smpl += data_bin.shape[0]
if self._cur_bin_smpl == self._bin_size:
self.append_bin(self._cur_bin)
self._cur_bin[:] = 0
self._cur_bin_smpl = 0
idx += 1
[docs] def append_data_peaks(self, data, force=False):
"""append bin(s) calculated from a strip of data
with this method the data is first queried for peaks. this should
reduce the noise/smoothness of the histogram as observed from the
amplitude distribution of the pure signal.
:type data: ndarray
:param data: the data to generate the bin(s) to append from
:type force: bool
:param force: if True, immediately start a new bin before calculation
"""
# check data
data_ = sp.asanyarray(data)
if data.ndim < 2:
data_ = sp.atleast_2d(data_)
if data_.shape[0] < data_.shape[1]:
data_ = data_.T
nsmpl, nchan = data_.shape
if nchan != self._nchan:
raise ValueError('data has channel count %s, expected %s' %
(nchan, self._nchan))
# generate bin set
bin_set = [0]
if self._cur_bin_smpl != 0:
bin_set.append(self._bin_size - self._cur_bin_smpl)
while bin_set[-1] < nsmpl:
bin_set.append(bin_set[-1] + self._bin_size)
if bin_set[-1] > nsmpl:
bin_set[-1] = nsmpl
# process bins
idx = 1
while idx < len(bin_set):
data_bin = data_[bin_set[idx - 1]:bin_set[idx], :]
for c in xrange(self._nchan):
self._cur_bin[c] += sp.histogram(data_bin[:, c],
bins=self._ampl_range)[0]
self._cur_bin_smpl += data_bin.shape[0]
if self._cur_bin_smpl == self._bin_size:
self.append_bin(self._cur_bin)
self._cur_bin[:] = 0
self._cur_bin_smpl = 0
idx += 1
[docs] def get_channel_hist(self, channel=0, omit_noise=True):
"""yield the current amplitude histogram for the requested channel
of the data
:type channel : int
:param cahnnel: channel of the data
:type omit_noise: bool
:param omit_noise: if True, omit value in the noise band
"""
rval = self._hist_data[:][:, channel, :].T[::-1]
if omit_noise is True:
rval[self._ampl_noise, :] = 0
return rval
[docs] def get_hist(self, omit_noise=True):
"""yield tuple of amplitude histograms for all channels of the data
:type omit_noise: bool
:param omit_noise: if True, omit value in the noise band
"""
return tuple([self.get_channel_hist(c, omit_noise)
for c in xrange(self._nchan)])
[docs] def plot_ampl_hist(self):
"""plot the amplitude histogram"""
raise DeprecationWarning('NSIM DEPENDANCY !!!')
# TODO: fix after nsim is good again
from PyQt4 import QtCore, QtGui
from nsim.gui.plotting import MatShow
CMAP_PARAMS = (
QtCore.Qt.white,
QtCore.Qt.red,
(0.0001, QtCore.Qt.blue),
(0.25, QtCore.Qt.cyan),
(0.5, QtCore.Qt.green),
(0.75, QtCore.Qt.yellow) )
class AmplHistWidget(QtGui.QWidget):
def __init__(self, parent=None, **kwargs):
"""
:Parameters:
parent : QWidget
Qt parent.
"""
# super for the plotting
super(AmplHistWidget, self).__init__(parent)
# channels
self.nchan = kwargs.get('nchan', 4)
# setup gui components
self.lo_ampl_ms = QtGui.QVBoxLayout(self)
self.ampl_ms = []
for i in xrange(self.nchan):
self.ampl_ms.append(MatShow(parent=self,
cmap=CMAP_PARAMS))
self.lo_ampl_ms.addWidget(self.ampl_ms[i])
def update_data(self, ah_data):
"""update the amplitude histogram
:Parameters:
ah_data : tuple
tuple of single channel amplitude histograms
"""
for c in xrange(self.nchan):
self.ampl_ms[c].set_data(sp.log(ah_data[c] + 1.0))
# start QT loop
app = QtGui.QApplication([])
ahw = AmplHistWidget(nchan=4)
ahw.update_data(self.get_hist())
ahw.show()
return app.exec_()
## static methods
@staticmethod
[docs] def range(min_val, max_val, bin_size):
"""range function for the binning of the histogram"""
# checks
if min_val > max_val:
min_val, max_val = max_val, min_val
# calculate start and stop bins
min_bin = int(sp.floor_divide(min_val, bin_size))
max_bin = int(sp.floor_divide(max_val, bin_size) + 1)
# return
return sp.arange(min_bin, max_bin) * bin_size
def main1():
from tables import openFile
# from matplotlib import pyplot as P
arc = openFile('/home/ff/amplhist.h5')
ampl = AmplitudeHistogram(
ampl_range=(-.5, .2, .01),
ampl_noise=(-.05, .05),
nchan=4,
bin_size=32000,
bin_hist=1000)
for i in xrange(len(arc.listNodes('/'))):
print 'Processing: Node %s ..' % i,
ampl.append_data(arc.getNode('/%d' % i, 'data').read())
print 'done.'
arc.close()
del arc
print 'done. Computing AmplHist...'
print
print 'plotting event loop..',
ampl.plot_ampl_hist()
print 'done!'
return ampl
def main2():
from spikedb import MunkSession
EXP = 'L011'
TNR = 7
DB = MunkSession()
ampl = AmplitudeHistogram(ampl_range=(-100.0, 100.0, 1.0),
ampl_noise=(-8.0, 8.0),
nchan=4,
bin_size=300000,
bin_hist=2500)
id_exp = DB.get_exp_id(EXP)
id_blks = [DB.get_block_id(EXP, '%s' % b) for b in
['a']]#, 'b', 'c', 'd', 'e']]
id_tet = DB.get_tetrode_id(id_exp, TNR)
tlist = []
for id_blk in id_blks:
tlist.extend(DB.get_trial_range(id_blk))
for tid in tlist:
print 'Processing: %s ..' % DB.get_fname_for_id(tid),
ampl.append_data_all(DB.get_tetrode_data(tid, id_tet))
ampl.force_new_bin()
print 'done.'
print 'done. Computing AmplHist...'
DB.close()
print
print 'plotting event loop..',
ampl.plot_ampl_hist()
print 'done!'
return ampl
if __name__ == '__main__':
main2()