# -*- coding: utf-8 -*-
#_____________________________________________________________________________
#
# Copyright (c) 2012-2013, Berlin Institute of Technology
# All rights reserved.
#
# Developed by: Philipp Meier <pmeier82@gmail.com>
#
# Neural Information Processing Group (NI)
# School for Electrical Engineering and Computer Science
# Berlin Institute of Technology
# MAR 5-6, Marchstr. 23, 10587 Berlin, Germany
# http://www.ni.tu-berlin.de/
#
# Repository: https://github.com/pmeier82/BOTMpy
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal with the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimers.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimers in the documentation
# and/or other materials provided with the distribution.
# * Neither the names of Neural Information Processing Group (NI), Berlin
# Institute of Technology, nor the names of its contributors may be used to
# endorse or promote products derived from this Software without specific
# prior written permission.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# WITH THE SOFTWARE.
#_____________________________________________________________________________
#
# Acknowledgements:
# Philipp Meier <pmeier82@gmail.com>
#_____________________________________________________________________________
#
# Changelog:
# * <iso-date> <identity> :: <description>
#_____________________________________________________________________________
#
"""detector nodes for capacitative artifacts in multi-channeled data
These detectors find events and event epochs on potentially multi-channeled data
signal. Mostly, you will want to reset the internals of the detector after
processing a chunk of data. There are different kinds of detectors, the common
product of the detector is the discrete events or epochs in the data signal.
DATA_TYPE: fixed for float32 (single precision)"""
__docformat__ = 'restructuredtext'
__all__ = ['ArtifactDetectorNode', 'SpectrumArtifactDetector']
## IMPORTS
import scipy as sp
from matplotlib.mlab import specgram
from ..common import epochs_from_binvec, merge_epochs, invert_epochs, INDEX_DTYPE
from .spike_detection import ThresholdDetectorNode
## CLASSES
class ArtifactDetectorNode(ThresholdDetectorNode):
"""detects artifacts by detecting zero-crossing frequencies
For a zero-mean gaussian process the the zero-crossing rate `zcr` is
independent of its moments and approaches 0.5 as the integration window
size approaches infinity:
.. math::
s_t \\sim N(0,\\Sigma)
zcr_{wsize}(s_t) = \\frac{1}{wsize-1} \\sum_{t=1}^{wsize-1}
{{\\mathbb I}\\left\{{s_t s_{t-1} < 0}\\right\\}}
\\lim_{wsize \\rightarrow \\infty} zcr_{wsize}(s_t) = 0.5
The capacitive artifacts seen in the Munk dataset have a significantly
lower frequency, s.t. zcr decreases to 0.1 and below, for the integration
window sizes relevant to our application. Detecting epochs where the zcr
significantly deviates from the expectation, assuming a coloured Gaussian
noise process, can thus lead be used for detection of artifact epochs.
The zero crossing rate (zcr) is given by the convolution of a moving
average window (although this is configurable to use other weighting
methods) with the XOR of the signbits of X(t) and X(t+1).
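
    Example (a minimal sketch of the zcr computation on white noise; the
    names used here are illustrative and not part of the class API)::

        >>> import numpy as np
        >>> import scipy as sp
        >>> x = np.random.randn(32000)  # 1 s of white noise at 32 kHz
        >>> signs = sp.signbit(x)
        >>> zc = sp.bitwise_xor(signs[:-1], signs[1:]).astype(float)
        >>> zcr = sp.correlate(zc, sp.ones(480) / 480.0, 'same')
        >>> # zcr fluctuates around 0.5; capacitive artifacts pull it below ~0.1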
"""
## constructor
def __init__(self, wsize_ms=15.0, psize_ms=(5.0, 10.0), wfunc=sp.ones,
srate=32000.0, zcr_th=0.1, mindist_ms=10.0):
"""
:type wsize_ms: float
:param wsize_ms: window size of the integration window in `ms`. Should
be large enough to cover the low band of the artifacts and not
overlap with the lower band of spikes (spike clusters).
Default=15.0
        :type psize_ms: tuple
        :param psize_ms: window sizes of the padding windows in `ms`, given
            as (left_pad, right_pad). Will be applied to detected artifact
            epochs.
            Default=(5.0, 10.0)
:type wfunc: function
:param wfunc: function that creates the integration window. The
function has to take one parameter denoting the window size in
samples.
Default=scipy.ones
:type srate: float
:param srate: sample rate in `Hz`. Used to convert the windows sizes
from `ms` to data samples.
Default=32000.0
        :type zcr_th: float
        :param zcr_th: zcr (zero-crossing rate) threshold. Epochs of the data
            where the zcr falls below the threshold will be classified as
            artifact epochs.
            Default=0.1
:type mindist_ms: float
:param mindist_ms: minimum size for non-artifact epochs in `ms`.
Data epochs in between artifacts epochs that are smaller than this
window, are merged into the artifact epochs to reduce
segmentation.
Default=10.0
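
        Example (the `ms` to samples conversion done internally; a minimal
        arithmetic sketch)::

            >>> srate, wsize_ms = 32000.0, 15.0
            >>> int(wsize_ms * srate / 1000.0)  # integration window in samples
            480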
"""
# super
super(ArtifactDetectorNode, self).__init__()
# members
self.srate = float(srate)
self.window = wfunc(int(wsize_ms * self.srate / 1000.0))
self.window /= self.window.sum()
self.pad = (int(psize_ms[0] * self.srate / 1000.0),
int(psize_ms[1] * self.srate / 1000.0))
self.mindist = int(mindist_ms * self.srate / 1000.0)
self.zcr_th = float(zcr_th)
## privates
    def _energy_func(self, x, **kwargs):
        # XOR of the signbits of x(t) and x(t+1) marks the zero crossings;
        # a trailing row of False is appended to preserve the input shape
        x_signs = sp.signbit(x)
        return sp.vstack((sp.bitwise_xor(x_signs[:-1], x_signs[1:]),
                          [False] * x.shape[1]))
def _execute(self, x, *args, **kwargs):
# init
epochs = []
# per channel detection
for c in xrange(self.nchan):
# filter energy with window
xings = sp.correlate(self.energy[:, c], self.window, 'same')
# replace filter artifacts with the mean
mu = xings[self.window.size:-self.window.size].mean()
xings[:self.window.size] = xings[-self.window.size:] = mu
ep = epochs_from_binvec(xings < self.zcr_th)
epochs.append(ep)
        # pad and merge artifact epochs
        epochs = sp.vstack(epochs)
        if epochs.size > 0:
            epochs[:, 0] -= self.pad[0]
            epochs[:, 1] += self.pad[1]
            self.events = merge_epochs(epochs, min_dist=self.mindist)
        else:
            # no artifacts found; keep an empty (0, 2) event set
            self.events = epochs
        # return
        self.events = self.events.astype(INDEX_DTYPE)
return x
## evaluations
    def get_fragmentation(self):
"""returns the artifact fragmentation"""
if self.size is None:
raise RuntimeError('No data given!')
        # nae_len: total length of the non-artifact data; fragmentation grows
        # with the number of artifact epochs relative to the clean data
        nae_len = float(
            self.size - (self.events[:, 1] - self.events[:, 0]).sum())
        return -sp.log(nae_len / (self.size * (self.events.shape[0] + 1)))
    def get_nonartefact_epochs(self):
"""return the index set that represents the non-artifact epochs"""
if self.size is None:
raise RuntimeError('No data given!')
if self.events.size == 0:
return sp.array([[0, self.size]])
else:
return invert_epochs(self.events, end=self.size)
class SpectrumArtifactDetector(ThresholdDetectorNode):
"""detects artifacts by identifying unwanted frequency packages in the spectrum of the signal
For a zero-mean gaussian process the the zero-crossing rate `zcr` is
independent of its moments and approaches 0.5 as the integration window
size approaches infinity:
.. math::
s_t \\sim N(0,\\Sigma)
zcr_{wsize}(s_t) = \\frac{1}{wsize-1} \\sum_{t=1}^{wsize-1}
{{\\mathbb I}\\left\{{s_t s_{t-1} < 0}\\right\\}}
\\lim_{wsize \\rightarrow \\infty} zcr_{wsize}(s_t) = 0.5
The capacitive artifacts seen in the Munk dataset have a significantly
lower frequency, s.t. zcr decreases to 0.1 and below, for the integration
window sizes relevant to our application. Detecting epochs where the zcr
significantly deviates from the expectation, assuming a coloured Gaussian
noise process, can thus lead be used for detection of artifact epochs.
The zero crossing rate (zcr) is given by the convolution of a moving
average window (although this is configurable to use other weighting
methods) with the XOR of the signbits of X(t) and X(t+1).
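
    Example (a minimal sketch of the 'max_normed' statistic for the first
    spectrogram bin; the names used here are illustrative only)::

        >>> import numpy as np
        >>> from matplotlib.mlab import specgram
        >>> x = np.random.randn(32000)
        >>> psd, freqs, times = specgram(x, NFFT=512, Fs=32000.0, noverlap=256)
        >>> mask = freqs < 2000.0
        >>> stat = psd[mask, 0].max() / psd[:, 0].sum()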
"""
## constructor
    def __init__(self, wsize_ms=8.0, srate=32000.0, cutoff_hz=2000.0, nfft=512,
                 en_func='max_normed', overlap=1, max_merge_dist=6,
                 min_allowed_length=2, **kw):
        """
        :type wsize_ms: float
        :param wsize_ms: currently unused. Default=8.0
        :type srate: float
        :param srate: sample rate in `Hz`. Default=32000.0
        :type cutoff_hz: float
        :param cutoff_hz: frequencies below this cutoff are considered the
            artifact band. Default=2000.0
        :type nfft: int
        :param nfft: spectrogram window size in samples; will be rounded up
            to the next power of two. Default=512
        :type en_func: str
        :param en_func: energy function name, one of 'mean_coeff',
            'max_coeff' or 'max_normed'. Default='max_normed'
        :type overlap: int
        :param overlap: spectrogram overlap: 0 - no overlap, 1 - 50% overlap,
            2 - 75% overlap. Default=1
        :type max_merge_dist: int
        :param max_merge_dist: maximum distance (in spectrogram steps) up to
            which neighbouring artifact epochs are merged. Default=6
        :type min_allowed_length: int
        :param min_allowed_length: minimum artifact epoch length (in
            spectrogram steps); shorter epochs are discarded. Default=2
        """
        # super
        kw['ch_separate'] = True
        super(SpectrumArtifactDetector, self).__init__(**kw)
        # members
        self.srate = float(srate)
        self.wsize = None
        self.cutoff_hz = float(cutoff_hz)
        self.en_func = en_func
        self.overlap = overlap  # 0 - no overlap, 1 - 50% overlap, 2 - 75% overlap
        self.max_merge_dist = max_merge_dist
        self.min_allowed_length = min_allowed_length
        # round nfft up to the next power of two
        self.nfft = 1
        while self.nfft < nfft:
            self.nfft <<= 1
## privates
    def _threshold_func(self, x):
        # fixed threshold of 1.0 on the energy statistic
        return 1.0
    def _energy_func(self, x, **kwargs):
        rval = sp.zeros_like(x)
        ns, nc = x.shape
        # overlap and sample offset per spectrogram bin (integer samples)
        ov_samples = 0
        offset = 0
        if self.overlap == 1:
            ov_samples = self.nfft // 2
            offset = self.nfft // 4
        elif self.overlap == 2:
            ov_samples = 3 * self.nfft // 4
            offset = 3 * self.nfft // 8
        step = self.nfft - ov_samples
        for c in xrange(nc):
            # per-channel power spectrogram
            psd_arr, freqs, times = specgram(
                x[:, c], NFFT=self.nfft, Fs=self.srate, noverlap=ov_samples)
            mask = freqs < self.cutoff_hz
            for b in xrange(len(times)):
                bin_s = b * step + offset
                bin_e = bin_s + step
                # energy statistic: power below the cutoff vs. the rest
                if self.en_func == 'mean_coeff':
                    rval[bin_s:bin_e, c] = \
                        psd_arr[mask, b].mean() / psd_arr[~mask, b].mean()
                elif self.en_func == 'max_coeff':
                    rval[bin_s:bin_e, c] = \
                        psd_arr[mask, b].max() / psd_arr[~mask, b].max()
                elif self.en_func == 'max_normed':
                    rval[bin_s:bin_e, c] = \
                        psd_arr[mask, b].max() / psd_arr[:, b].sum()
                else:
                    raise ValueError(
                        'unknown energy function: %s' % self.en_func)
        return rval
    def _execute(self, x, *args, **kwargs):
        # init
        epochs = []
        self._calc_threshold()
        # spectrogram step size in samples, given the overlap setting
        if self.overlap == 0:
            step = self.nfft
        elif self.overlap == 1:
            step = self.nfft // 2
        else:
            step = self.nfft // 4
# per channel detection
for c in xrange(self.nchan):
ep = epochs_from_binvec(self.energy[:, c] > self.threshold[c])
epochs.extend(ep)
if len(epochs) == 0:
epochs = sp.zeros((0, 2))
else:
epochs = merge_epochs(epochs, min_dist=step * self.max_merge_dist + 1)
epochs = epochs[epochs[:, 1] - epochs[:, 0] >= step * self.min_allowed_length]
self.events = sp.asarray(epochs, dtype=INDEX_DTYPE)
return x
## publics
    def get_nonartefact_epochs(self):
"""return the index set that represents the non-artifact epochs"""
if self.size is None:
raise RuntimeError('No data given!')
if self.events.size == 0:
return sp.array([[0, self.size]])
else:
return invert_epochs(self.events, end=self.size)
## MAIN
if __name__ == '__main__':
pass
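    # A minimal usage sketch (an assumption, not part of the original module):
    # ThresholdDetectorNode is assumed to follow the MDP node call interface,
    # so the detector is applied by calling it on a (samples, channels)
    # float32 array. The data below is synthetic white noise, so few or no
    # artifact epochs are expected.
    import numpy as np

    data = np.random.randn(64000, 4).astype(np.float32)
    ad = ArtifactDetectorNode(srate=32000.0)
    ad(data)
    print(ad.events)
    print(ad.get_nonartefact_epochs())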