# -*- coding: utf-8 -*-
#_____________________________________________________________________________
#
# Copyright (c) 2012 Berlin Institute of Technology
# All rights reserved.
#
# Developed by: Philipp Meier <pmeier82@gmail.com>
# Neural Information Processing Group (NI)
# School for Electrical Engineering and Computer Science
# Berlin Institute of Technology
# MAR 5-6, Marchstr. 23, 10587 Berlin, Germany
# http://www.ni.tu-berlin.de/
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal with the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimers.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimers in the documentation
# and/or other materials provided with the distribution.
# * Neither the names of Neural Information Processing Group (NI), Berlin
# Institute of Technology, nor the names of its contributors may be used to
# endorse or promote products derived from this Software without specific
# prior written permission.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# WITH THE SOFTWARE.
#_____________________________________________________________________________
#
# Acknowledgements:
# Philipp Meier <pmeier82@gmail.com>
#_____________________________________________________________________________
#
"""detector nodes for capacitative artifacts in multichanneled data
These detectors find events and event epochs on potentially multichanneled data
signal. Mostly, you will want to reset the internals of the detector after
processing a chunk of data. There are different kinds of detectors, the common
product of the detector is the discrete events or epochs in the data signal.
DATA_TYPE: fixed for float32 (single precision)"""
__docformat__ = 'restructuredtext'
__all__ = ['ArtifactDetectorNode', 'SpectrumArtifactDetector']
##--- IMPORTS
import scipy as sp
from ..common import epochs_from_binvec, merge_epochs, invert_epochs, INDEX_DTYPE
from .spike_detection import ThresholdDetectorNode
##--- CLASSES
[docs]class ArtifactDetectorNode(ThresholdDetectorNode):
"""detects artifacts by detecting zero-crossing frequencies
For a zero-mean gaussian process the the zero-crossing rate `zcr` is
independent of its moments and approaches 0.5 as the integration window
size approaches infinity:
.. math::
s_t \\sim N(0,\\Sigma)
zcr_{wsize}(s_t) = \\frac{1}{wsize-1} \\sum_{t=1}^{wsize-1}
{{\\mathbb I}\\left\{{s_t s_{t-1} < 0}\\right\\}}
\\lim_{wsize \\rightarrow \\infty} zcr_{wsize}(s_t) = 0.5
The capacitive artifacts seen in the Munk dataset have a significantly
lower frequency, s.t. zcr decreases to 0.1 and below, for the integration
window sizes relevant to our application. Detecting epochs where the zcr
significantly deviates from the expectation, assuming a coloured Gaussian
noise process, can thus lead be used for detection of artifact epochs.
The zero crossing rate (zcr) is given by the convolution of a moving
average window (although this is configurable to use other weighting
methods) with the XOR of the signbits of X(t) and X(t+1).
"""
## constructor
def __init__(self, wsize_ms=15.0, psize_ms=(5.0, 10.0), wfunc=sp.ones,
srate=32000.0, zcr_th=0.1, mindist_ms=10.0):
"""
:type wsize_ms: float
:param wsize_ms: window size of the integration window in `ms`. Should
be large enough to cover the low band of the artifacts and not
overlap with the lower band of spikes (spike clusters).
Default=15.0
:type psize_ms: tuple
:param psize_ms: window size of the padding windows in `ms`. Will be
applied to detected artifact epochs. (left_pad, right_pad)
Default=5.0
:type wfunc: function
:param wfunc: function that creates the integration window. The
function has to take one parameter denoting the window size in
samples.
Default=scipy.ones
:type srate: float
:param srate: sample rate in `Hz`. Used to convert the windows sizes
from `ms` to data samples.
Default=32000.0
:type zcr_th: float
:param zrc_th: zrc (zero crossing rate) threshold, epochs of the data
where the zrc falls below the threshold will be classified as
artifact epochs.
Default=0.11
:type mindist_ms: float
:param mindist_ms: minimum size for non-artifact epochs in `ms`.
Data epochs in between artifacts epochs that are smaller than this
window, are merged into the artifact epochs to reduce
segmentation.
Default=10.0
"""
# super
super(ArtifactDetectorNode, self).__init__()
# members
self.srate = float(srate)
self.window = wfunc(int(wsize_ms * self.srate / 1000.0))
self.window /= self.window.sum()
self.pad = (int(psize_ms[0] * self.srate / 1000.0),
int(psize_ms[1] * self.srate / 1000.0))
self.mindist = int(mindist_ms * self.srate / 1000.0)
self.zcr_th = float(zcr_th)
## privates
def _energy_func(self, x, **kwargs):
x_signs = sp.signbit(x)
return sp.vstack((sp.bitwise_xor(x_signs[:-1], x_signs[1:]), [False] * x.shape[1]))
def _execute(self, x, *args, **kwargs):
# init
epochs = []
# per channel detection
for c in xrange(self.nchan):
# filter energy with window
xings = sp.correlate(self.energy[:, c], self.window, 'same')
# replace filter artifacts with the mean
mu = xings[self.window.size:-self.window.size].mean()
xings[:self.window.size] = xings[-self.window.size:] = mu
ep = epochs_from_binvec(xings < self.zcr_th)
epochs.append(ep)
# pad and merge artifact epochs
epochs = sp.vstack(epochs)
if epochs.size > 0:
epochs[:, 0] -= self.pad[0]
epochs[:, 1] += self.pad[1]
self.events = merge_epochs(epochs, min_dist=self.mindist)
# return
self.events = self.events.astype(INDEX_DTYPE)
return x
## evaluations
[docs] def get_fragmentation(self):
"""returns the artifact fragmentation"""
if self.size is None:
raise RuntimeError('No data given!')
nae_len = float(
self.size - (self.events[:, 1] - self.events[:, 0]).sum())
return - sp.log(nae_len / (self.size * (self.events.shape[0] + 1)))
[docs] def get_nonartefact_epochs(self):
"""return the index set that represents the non-artifact epochs"""
if self.size is None:
raise RuntimeError('No data given!')
if self.events.size == 0:
return sp.array([[0, self.size]])
else:
return invert_epochs(self.events, end=self.size)
# TODO: matplotlib.mlab.specgram is deprecated! please reiterate !!
[docs]class SpectrumArtifactDetector(ThresholdDetectorNode):
"""detects artifacts by identifying unwanted frequency packages in the spectrum of the signal
For a zero-mean gaussian process the the zero-crossing rate `zcr` is
independent of its moments and approaches 0.5 as the integration window
size approaches infinity:
.. math::
s_t \\sim N(0,\\Sigma)
zcr_{wsize}(s_t) = \\frac{1}{wsize-1} \\sum_{t=1}^{wsize-1}
{{\\mathbb I}\\left\{{s_t s_{t-1} < 0}\\right\\}}
\\lim_{wsize \\rightarrow \\infty} zcr_{wsize}(s_t) = 0.5
The capacitive artifacts seen in the Munk dataset have a significantly
lower frequency, s.t. zcr decreases to 0.1 and below, for the integration
window sizes relevant to our application. Detecting epochs where the zcr
significantly deviates from the expectation, assuming a coloured Gaussian
noise process, can thus lead be used for detection of artifact epochs.
The zero crossing rate (zcr) is given by the convolution of a moving
average window (although this is configurable to use other weighting
methods) with the XOR of the signbits of X(t) and X(t+1).
"""
## constructor
def __init__(self, wsize_ms=8.0, srate=32000.0, cutoff_hz=2000.0, nfft=512,
en_func='max_normed', overlap=1, max_merge_dist = 6,
min_allowed_length = 2, **kw):
"""lala"""
# super
kw['ch_separate'] = True
super(SpectrumArtifactDetector, self).__init__(**kw)
# members
self.srate = float(srate)
self.wsize = None
self.cutoff_hz = float(cutoff_hz)
self.nfft = 1
self.en_func = en_func
self.overlap = overlap # 0- No overlap, 1 - 50% overlap, 2 - 75% overlap
self.max_merge_dist = max_merge_dist
self.min_allowed_length = min_allowed_length
while self.nfft < nfft:
self.nfft <<= 1
## privates
def _threshold_func(self, x):
return 1.0
# TODO: matplotlib.mlab.specgram is deprecated! please reiterate !!
def _energy_func(self, x, **kwargs):
# from matplotlib.mlab import specgram
rval = sp.zeros_like(x)
# ns, nc = x.shape
# ov_samples = 0
# offset = 0
# if self.overlap == 1:
# ov_samples = self.nfft * 0.5
# offset = self.nfft / 4
# elif self.overlap == 2:
# ov_samples = self.nfft * 0.75
# offset = self.nfft * 0.375
# step = self.nfft - ov_samples
#
# for c in xrange(nc):
# psd_arr, freqs, times = specgram(x[:, c], NFFT=self.nfft, Fs=self.srate, noverlap=ov_samples)
# mask = freqs < self.cutoff_hz
# for b in xrange(len(times)):
# bin_s = b * step + offset
# bin_e = bin_s + step
#
# if self.en_func == 'mean_coeff':
# rval[bin_s:bin_e, c] = psd_arr[mask == True, b].mean() / psd_arr[mask == False, b].mean()
# elif self.en_func == 'max_coeff':
# rval[bin_s:bin_e, c] = psd_arr[mask == True, b].max() / psd_arr[mask == False, b].max()
# elif self.en_func == 'max_normed':
# rval[bin_s:bin_e, c] = psd_arr[mask == True, b].max() / psd_arr[:, b].sum(axis = 0)
# else:
# raise RuntimeError('Energy function does not exist!')
return rval
def _execute(self, x, *args, **kwargs):
# init
epochs = []
self._calc_threshold()
if self.overlap == 0:
step = self.nfft
elif self.overlap == 1:
step = self.nfft / 2
else:
step = self.nfft / 4
# per channel detection
for c in xrange(self.nchan):
ep = epochs_from_binvec(self.energy[:, c] > self.threshold[c])
epochs.extend(ep)
if len(epochs) == 0:
epochs = sp.zeros((0, 2))
else:
epochs = merge_epochs(epochs, min_dist=step * self.max_merge_dist + 1)
epochs = epochs[epochs[:, 1] - epochs[:, 0] >= step * self.min_allowed_length]
self.events = sp.asarray(epochs, dtype=INDEX_DTYPE)
return x
## publics
[docs] def get_nonartefact_epochs(self):
"""return the index set that represents the non-artifact epochs"""
if self.size is None:
raise RuntimeError('No data given!')
if self.events.size == 0:
return sp.array([[0, self.size]])
else:
return invert_epochs(self.events, end=self.size)
##--- MAIN
if __name__ == '__main__':
pass