# -*- coding: utf-8 -*-
#_____________________________________________________________________________
#
# Copyright (c) 2012-2013, Berlin Institute of Technology
# All rights reserved.
#
# Developed by: Philipp Meier <pmeier82@gmail.com>
#
# Neural Information Processing Group (NI)
# School for Electrical Engineering and Computer Science
# Berlin Institute of Technology
# MAR 5-6, Marchstr. 23, 10587 Berlin, Germany
# http://www.ni.tu-berlin.de/
#
# Repository: https://github.com/pmeier82/BOTMpy
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal with the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimers.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimers in the documentation
# and/or other materials provided with the distribution.
# * Neither the names of Neural Information Processing Group (NI), Berlin
# Institute of Technology, nor the names of its contributors may be used to
# endorse or promote products derived from this Software without specific
# prior written permission.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# WITH THE SOFTWARE.
#_____________________________________________________________________________
#
# Acknowledgements:
# Philipp Meier <pmeier82@gmail.com>
#_____________________________________________________________________________
#
# Changelog:
# * <iso-date> <identity> :: <description>
#_____________________________________________________________________________
#
"""initialization - clustering of spikes in feature space"""
__docformat__ = 'restructuredtext'
__all__ = ['ClusteringNode', 'HomoscedasticClusteringNode']
## IMPORTS
import scipy as sp
from sklearn.mixture import DPGMM, GMM, VBGMM
import sklearn.cluster
from sklearn.metrics import euclidean_distances
from .base_nodes import ResetNode
## CLASSES
class FilterbankParameters(object):
    """collection of initial parameters for a filterbank

    Holds the template set, the length of the templates in samples, the
    number of channels and the noise covariance estimate.
    """
    def __init__(self):
        self.tf = None  # template length in samples
        self.nc = None  # number of channels
        self.templates = None  # template set
        self.covest = None  # noise covariance estimate
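# (illustrative sketch only, not original API docs: a FilterbankParameters
#  instance is a plain container, e.g.
#      p = FilterbankParameters()
#      p.tf, p.nc = 32, 4  # 32 samples per template on 4 channels
# )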
class ClusteringNode(ResetNode):
"""interface for clustering algorithms"""
## constructor
def __init__(self, dtype=None):
"""
:type dtype: dtype resolvable
:param dtype: will be passed to :py:class:`mdp.Node`
"""
# super
super(ClusteringNode, self).__init__(dtype=dtype)
# members
self.labels = None
self._labels = None
self.parameters = None
self._parameters = None
## mdp.Node stuff
    def is_invertible(self):
        return True
    def is_trainable(self):
return False
def _reset(self):
self.labels = None
self._labels = None
self.parameters = None
self._parameters = None
class HomoscedasticClusteringNode(ClusteringNode):
    """clustering with model order selection to learn a mixture model

    Assuming the data are prewhitened spikes, possibly in some condensed
    representation (e.g. PCA), the problem is to find the correct number
    of components and their corresponding means. The covariance matrix of
    all components is assumed to be identical, as the variation in the
    data is produced by an additive noise process. Further, the covariance
    matrix can be assumed to be the identity matrix (or a scaled version
    thereof, due to estimation errors, thus a spherical covariance).

    To keep the clustering fast, all necessary pre-processing (e.g.
    alignment, resampling, noise whitening) is assumed to have been
    applied already.

    So we have to find the number of components and their means in a
    homoscedastic clustering problem. The goodness of fit is evaluated
    with a likelihood-based criterion that is penalised for an increasing
    number of model parameters, to prevent overfitting (cf. BIC).
    Minimising this criterion yields the most plausible model.
    """
## constructor
def __init__(
self, clus_type='kmeans', crange=range(1, 16), repeats=4,
sigma_factor=4.0, max_iter=None, conv_thresh=None, alpha=None,
cvtype='tied', gof_type='bic', dtype=None, debug=False):
"""
        :type clus_type: str
        :param clus_type: clustering algorithm to use. Must be one of:
            'kmeans', 'gmm', 'dpgmm', 'meanshift'
            Default='kmeans'
        :type crange: list
        :param crange: list of cluster counts to test
            Default=range(1,16)
:type repeats: int
:param repeats: repeat this many times per cluster count
Default=4
:type sigma_factor: float
:param sigma_factor: variance factor for the spherical covariance
Default=4.0
:type max_iter: int
:param max_iter: upper bound for the iterations per run
Default=None
:type conv_thresh: float
:param conv_thresh: convergence threshold.
Default=None
:type alpha: float
:param alpha: alpha value for the variational inference based gmm
algorithms
Default=None
:type cvtype: str
:param cvtype: covariance type, one of {'spherical', 'diag', 'tied',
'full'}
Default='tied'
:type dtype: dtype resolvable
:param dtype: dtype for internal calculations
Default=None
:type gof_type: str
:param gof_type: goodness of fit criterion to use, one of {'aic', 'bic'}
Default='bic'
:type debug: bool
:param debug: if True, announce progress to stdout.
Default=False
"""
# super
super(HomoscedasticClusteringNode, self).__init__(dtype=dtype)
# members
self._ll = None
self._gof = None
self._winner = None
self.clus_type = str(clus_type)
self.gof_type = str(gof_type)
allowed_types = ['kmeans', 'gmm', 'dpgmm', 'meanshift']
if self.clus_type not in allowed_types:
raise ValueError(
'clus_type must be one of: %s!' % str(allowed_types))
self.cvtype = str(cvtype)
self.crange = list(crange)
self.repeats = int(repeats)
self.sigma_factor = float(sigma_factor)
self.debug = bool(debug)
self.clus_kwargs = {}
if max_iter is not None and clus_type in ['kmeans', 'gmm', 'vbgmm',
'dpgmm']:
self.clus_kwargs.update(max_iter=max_iter)
if conv_thresh is not None and clus_type in ['kmeans', 'gmm', 'dpgmm']:
self.clus_kwargs.update(conv_thresh=conv_thresh)
if alpha is not None and clus_type in ['vbgmm', 'dpgmm']:
self.clus_kwargs.update(alpha=alpha)
def _reset(self):
super(HomoscedasticClusteringNode, self)._reset()
self._gof = None
self._winner = None
## spectral clustering
@staticmethod
    def gauss_heat_kernel(x, delta=1.0):
        """gaussian heat kernel: exp(-x ** 2 / (2 * delta ** 2))"""
        return sp.exp(-x ** 2 / (2. * delta ** 2))
def _fit_spectral(self, x):
        # FIXME: still broken: no likelihood/goodness of fit is evaluated
        # here, so model selection in _execute cannot pick a winner
D = euclidean_distances(x, x)
A = HomoscedasticClusteringNode.gauss_heat_kernel(D)
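        # spectral clustering operates on the pairwise affinity matrix A
        # rather than on the raw samples x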
# clustering
for c in xrange(len(self.crange)):
k = self.crange[c]
for r in xrange(self.repeats):
# init
if self.debug is True:
print '\t[%s][c:%d][r:%d]' % (
self.clus_type, self.crange[c], r + 1),
idx = c * self.repeats + r
# evaluate model
                # n_clusters/affinity assume a post-0.13 sklearn interface
                model = sklearn.cluster.SpectralClustering(
                    n_clusters=k, affinity='precomputed')
                model.fit(A)
self._labels[idx] = model.labels_
means = sp.zeros((k, x.shape[1]))
for i in xrange(k):
means[i] = x[model.labels_ == i].mean(0)
self._parameters[idx] = means
def _fit_mean_shift(self, x):
for c in xrange(len(self.crange)):
quant = 0.015 * (c + 1)
for r in xrange(self.repeats):
bandwidth = sklearn.cluster.estimate_bandwidth(
x, quantile=quant, random_state=r)
idx = c * self.repeats + r
model = sklearn.cluster.MeanShift(
bandwidth=bandwidth, bin_seeding=True)
model.fit(x)
self._labels[idx] = model.labels_
self._parameters[idx] = model.cluster_centers_
# build equivalent gmm
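                # (mean shift yields no likelihood of its own; wrapping the
                # found centers in a GMM makes AIC/BIC computable)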
k = model.cluster_centers_.shape[0]
model_gmm = GMM(n_components=k, covariance_type=self.cvtype,
init_params='c', n_iter=0)
model_gmm.means_ = model.cluster_centers_
                weights = sp.array(
                    [(model.labels_ == i).sum() for i in xrange(k)],
                    dtype=float)
                model_gmm.weights_ = weights / weights.sum()
                model_gmm.fit(x)
# evaluate goodness of fit
self._ll[idx] = model_gmm.score(x).sum()
if self.gof_type == 'aic':
self._gof[idx] = model_gmm.aic(x)
if self.gof_type == 'bic':
self._gof[idx] = model_gmm.bic(x)
                if self.debug is True:
                    print quant, k, self._gof[idx]
def _fit_kmeans(self, x):
# clustering
for c in xrange(len(self.crange)):
k = self.crange[c]
for r in xrange(self.repeats):
# info
if self.debug is True:
print '\t[%s][c:%d][r:%d]' % (
self.clus_type, self.crange[c], r + 1),
idx = c * self.repeats + r
# fit kmeans model
model_kwargs = {}
if 'max_iter' in self.clus_kwargs:
model_kwargs.update(max_iter=self.clus_kwargs['max_iter'])
model = sklearn.cluster.KMeans(
n_clusters=k, init='k-means++', **model_kwargs)
self._labels[idx] = model.fit_predict(x)
self._parameters[idx] = model.cluster_centers_
# build equivalent gmm
                model_gmm = GMM(n_components=k, covariance_type=self.cvtype)
                model_gmm.means_ = model.cluster_centers_
                # covariance shape must match the covariance type
                # (cf. _fit_gmm below)
                model_gmm.covars_ = {'spherical': sp.ones((k, self.input_dim)),
                                     'diag': sp.ones((k, self.input_dim)),
                                     'tied': sp.eye(self.input_dim),
                                     'full': sp.array(
                                         [sp.eye(self.input_dim)] * k),
                                     }[self.cvtype] * self.sigma_factor
                weights = sp.array([(self._labels[idx] == i).sum()
                                    for i in xrange(k)], dtype=float)
                model_gmm.weights_ = weights / weights.sum()
# evaluate goodness of fit
self._ll[idx] = model_gmm.score(x).sum()
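                # standard definitions: AIC = 2 * p - 2 * ln(L) and
                # BIC = p * ln(n) - 2 * ln(L), with p free parameters,
                # n samples and maximised likelihood L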
if self.gof_type == 'aic':
self._gof[idx] = model_gmm.aic(x)
if self.gof_type == 'bic':
self._gof[idx] = model_gmm.bic(x)
# debug info
if self.debug is True:
print self._gof[idx], model.inertia_
## gmm (vanilla em)
def _fit_gmm(self, x):
# clustering
for c in xrange(len(self.crange)):
k = self.crange[c]
for r in xrange(self.repeats):
# info
if self.debug is True:
print '\t[%s][c:%d][r:%d]' % (self.clus_type, k, r + 1),
idx = c * self.repeats + r
# fit and evaluate model
model_kwargs = {}
if 'conv_thresh' in self.clus_kwargs:
model_kwargs.update(thresh=self.clus_kwargs['conv_thresh'])
if 'max_iter' in self.clus_kwargs:
model_kwargs.update(n_iter=self.clus_kwargs['max_iter'])
model = GMM(
n_components=k,
covariance_type=self.cvtype,
params='wmc',
init_params='wmc',
**model_kwargs)
model.covars_ = {'spherical': sp.ones((k, self.input_dim)),
'diag': sp.ones((k, self.input_dim)),
'tied': sp.eye(self.input_dim),
'full': sp.array([sp.eye(self.input_dim)] * k),
}[self.cvtype] * self.sigma_factor
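                # (prior covariance shaped per covariance type: (k, d) for
                # spherical/diag, (d, d) for tied, (k, d, d) for full)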
model.fit(x)
self._labels[idx] = model.predict(x)
self._parameters[idx] = model.means_
# evaluate goodness of fit
self._ll[idx] = model.score(x).sum()
if self.gof_type == 'aic':
self._gof[idx] = model.aic(x)
if self.gof_type == 'bic':
self._gof[idx] = model.bic(x)
# debug
if self.debug is True:
print self._gof[idx], model.converged_
## gmm (variational inference bias)
# FIXME: broken due to sklearn interface change
def _fit_vbgmm(self, x):
# clustering
for c in xrange(len(self.crange)):
k = self.crange[c]
for r in xrange(self.repeats):
# info
if self.debug is True:
print '\t[%s][c:%d][r:%d]' % (
self.clus_type, self.crange[c], r + 1),
idx = c * self.repeats + r
# fit and evaluate model
model_kwargs = {}
if 'alpha' in self.clus_kwargs:
model_kwargs.update(alpha=self.clus_kwargs['alpha'])
if 'conv_thresh' in self.clus_kwargs:
model_kwargs.update(thresh=self.clus_kwargs['conv_thresh'])
model = VBGMM(n_components=k, covariance_type=self.cvtype,
**model_kwargs)
model.n_features = self.input_dim
fit_kwargs = {}
if 'max_iter' in self.clus_kwargs:
fit_kwargs.update(n_iter=self.clus_kwargs['max_iter'])
model.fit(x, params='wmc', init_params='wmc', **fit_kwargs)
self._labels[idx] = model.predict(x)
                self._parameters[idx] = model.means_
                self._ll[idx] = model.score(x).sum()
                # evaluate goodness of fit (mirrors the dpgmm branch)
                if self.gof_type == 'aic':
                    self._gof[idx] = model.aic(x)
                if self.gof_type == 'bic':
                    self._gof[idx] = model.bic(x)
# debug
if self.debug is True:
print self._gof[idx], model.converged_
## gmm (Dirichlet process fitting)
# FIXME: broken due to sklearn interface changes
def _fit_dpgmm(self, x):
# clustering
k = max(self.crange)
for r in xrange(self.repeats):
# info
if self.debug is True:
print '\t[%s][c:%d][r:%d]' % (self.clus_type, k, r + 1),
# fit and evaluate model
model_kwargs = {}
if 'alpha' in self.clus_kwargs:
model_kwargs.update(alpha=self.clus_kwargs['alpha'])
if 'conv_thresh' in self.clus_kwargs:
model_kwargs.update(thresh=self.clus_kwargs['conv_thresh'])
if 'max_iter' in self.clus_kwargs:
model_kwargs.update(n_iter=self.clus_kwargs['max_iter'])
model = DPGMM(n_components=k, covariance_type=self.cvtype,
**model_kwargs)
model.fit(x)
self._labels[r] = model.predict(x)
self._parameters[r] = model.means_
self._ll[r] = model.score(x).sum()
# evaluate goodness of fit for this run
if self.gof_type == 'aic':
self._gof[r] = model.aic(x)
if self.gof_type == 'bic':
self._gof[r] = model.bic(x)
# debug
if self.debug is True:
print self._gof[r], model.n_components, model.weights_.shape[0]
## mdp.node interface
def _execute(self, x, *args, **kwargs):
"""run the clustering on a set of observations"""
# init
self._labels = sp.zeros((len(self.crange) * self.repeats,
x.shape[0]), dtype=int) - 1
self._gof = sp.zeros(len(self.crange) * self.repeats,
dtype=self.dtype)
self._ll = sp.zeros(len(self.crange) * self.repeats,
dtype=self.dtype)
self._parameters = [None] * len(self.crange) * self.repeats
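        # one slot per (cluster count, repeat) combination,
        # linear index: idx = c * self.repeats + r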
# clustering
fit_func = {'kmeans': self._fit_kmeans,
'gmm': self._fit_gmm,
#'vbgmm': self._fit_vbgmm,
'dpgmm': self._fit_dpgmm,
'spectral': self._fit_spectral,
'meanshift': self._fit_mean_shift}[self.clus_type]
fit_func(x)
self._winner = sp.nanargmin(self._gof)
self.parameters = self._parameters[self._winner]
self.labels = self._labels[self._winner]
## plot interface
    def plot(self, data, views=2, show=False):
"""plot clustering"""
# get plotting tools
try:
from spikeplot import plt, cluster
except ImportError:
return None
# init
views = min(views, int(data.shape[1] / 2))
fig = plt.figure()
fig.suptitle('clustering [%s]' % self.clus_type)
ax = [fig.add_subplot(2, views, v + 1) for v in xrange(views)]
axg = fig.add_subplot(212)
ncmp = int(self.labels.max() + 1)
cdata = dict(zip(xrange(ncmp),
[data[self.labels == c] for c in xrange(ncmp)]))
# plot clustering
for v in xrange(views):
cluster(
cdata,
data_dim=(2 * v, 2 * v + 1),
plot_handle=ax[v],
plot_mean=sp.sqrt(self.sigma_factor),
xlabel='PC %d' % int(2 * v),
ylabel='PC %d' % int(2 * v + 1),
show=False)
# plot gof
axg.plot(self._gof, ls='steps')
for i in xrange(1, len(self.crange)):
axg.axvline(i * self.repeats - 0.5, c='y', ls='--')
axg.axvspan(self._winner - 0.5, self._winner + 0.5, fc='gray',
alpha=0.2)
labels = []
for k in self.crange:
labels += ['%d' % k]
labels += ['.'] * (self.repeats - 1)
axg.set_xticks(sp.arange(len(labels)))
axg.set_xticklabels(labels)
axg.set_xlabel('cluster count and repeats')
axg.set_ylabel(str(self.gof_type).upper())
axg.set_xlim(-1, len(labels))
# show?
if show is True:
plt.show()
return True
## MAIN
if __name__ == '__main__':
pass
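    # A minimal self-test sketch (an illustrative addition, not original
    # module code): cluster three well separated synthetic gaussian blobs
    # and report the selected model order.
    sp.random.seed(42)
    data = sp.vstack([sp.random.randn(100, 4) + ofs
                      for ofs in [-10.0, 0.0, 10.0]]).astype(sp.float64)
    clus = HomoscedasticClusteringNode(
        clus_type='kmeans', crange=range(1, 6), repeats=2, debug=True)
    clus(data)
    print 'selected model order: %d' % (int(clus.labels.max()) + 1)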