# -*- coding: utf-8 -*-
#_____________________________________________________________________________
#
# Copyright (c) 2012-2013, Berlin Institute of Technology
# All rights reserved.
#
# Developed by: Philipp Meier <pmeier82@gmail.com>
#
# Neural Information Processing Group (NI)
# School for Electrical Engineering and Computer Science
# Berlin Institute of Technology
# MAR 5-6, Marchstr. 23, 10587 Berlin, Germany
# http://www.ni.tu-berlin.de/
#
# Repository: https://github.com/pmeier82/BOTMpy
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal with the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimers.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimers in the documentation
# and/or other materials provided with the distribution.
# * Neither the names of Neural Information Processing Group (NI), Berlin
# Institute of Technology, nor the names of its contributors may be used to
# endorse or promote products derived from this Software without specific
# prior written permission.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# WITH THE SOFTWARE.
#_____________________________________________________________________________
#
# Acknowledgements:
# Philipp Meier <pmeier82@gmail.com>
#_____________________________________________________________________________
#
# Changelog:
# * <iso-date> <identity> :: <description>
#_____________________________________________________________________________
#
"""general utility functions"""
__docformat__ = 'restructuredtext'
__all__ = [
'sortrows', 'vec2ten', 'ten2vec', 'mcvec_to_conc', 'mcvec_from_conc',
'xcorr', 'shifted_matrix_sub', 'dict_sort_ndarrays',
'dict_list_to_ndarray', 'get_idx']
## IMPORTS
import numpy as np
import scipy as sp
from scipy import linalg as sp_la
## FUNCTIONS
## general array operations
def sortrows(data):
    """sort matrix by rows (lexicographic, leftmost column most significant)

    :type data: ndarray
    :param data: 2d ndarray that should be sorted by its rows
    :returns: ndarray - data sorted by its rows.
    """
    # view each row as a single structured scalar so that sorting along
    # axis 0 compares complete rows lexicographically
    return np.sort(
        data.view([('', data.dtype)] * data.shape[1]), axis=0
    ).view(data.dtype)
def vec2ten(data, nchan=4):
    """converts from templates/spikes that are concatenated across the
    channels to tensors that have an extra dim for the channels

    :type data: ndarray
    :param data: input array [templates][vars * channels]
    :type nchan: int
    :param nchan: count of channels
        Default=4
    :returns: ndarray - data converted to tensor [templates][vars][channels]
    :raises ValueError: if the vector length is not a multiple of nchan
    """
    if data.ndim == 1:
        data = np.atleast_2d(data)
    n, dim = data.shape
    if dim % nchan != 0:
        # NOTE: the original format string had three placeholders for two
        # arguments and raised a TypeError instead of this ValueError
        raise ValueError(
            'dim %% nchan != 0 !! dim=%s, nchan=%s' % (dim, nchan))
    # floor division: samples per channel (py3-safe)
    tf = dim // nchan
    rval = np.zeros((n, tf, nchan), data.dtype)
    for i in range(n):
        for c in range(nchan):
            rval[i, :, c] = data[i, c * tf:(c + 1) * tf]
    return rval
def ten2vec(data):
    """converts from templates/spikes that are not concatenated across the
    channels to vectors.

    :type data: ndarray
    :param data: input array [templates][vars][channels]
    :returns: ndarray - data converted to concatenated vectors
        [templates][channels * vars]
    """
    # init
    n, tf, nchan = data.shape
    rval = np.zeros((n, nchan * tf), data.dtype)
    # transform: channel c occupies the contiguous span [c*tf, (c+1)*tf)
    for i in range(n):
        for c in range(nchan):
            rval[i, c * tf:(c + 1) * tf] = data[i, :, c]
    # return
    return rval
def mcvec_to_conc(x):
    """returns the concatenated vector for a multi-channeled vector

    :type x: ndarray
    :param x: multi-channeled vector in matrix form (samples in the rows,
        channels in the columns)
    :returns: ndarray - multi-channeled vector in channel concatenated form
    """
    # transposing first makes each channel (column) contiguous in the output
    return x.T.flatten()
def mcvec_from_conc(x, nc=4):
    """returns the multi-channeled vector from a concatenated representation

    :type x: ndarray
    :param x: multi-channeled vector in channel concatenated form
    :type nc: int
    :param nc: channel count
        Default=4
    :returns: ndarray - multi-channeled vector in matrix form
    :raises ValueError: if x.size is not a multiple of nc
    """
    # NOTE: the original check compared the integer quotient with itself
    # (nsamples != round(x.size / nc)) and could never fire under py2;
    # an explicit modulo test catches a mismatching channel count
    if x.size % nc != 0:
        raise ValueError('nc does not match the vector size!')
    nsamples = x.size // nc
    return x.reshape(nc, nsamples).T
def xcorr(a, b=None, lag=None, normalise=False, unbiased=False):
    """cross-correlation for one-dimensional input signals of equal size

    If :b: is not given the auto-correlation of :a: will be computed.

    :type a: ndarray
    :param a: one-dimensional time series
    :type b: ndarray
    :param b: one-dimensional time series, if None :a: will be taken instead
        Default=None
    :type lag: int
    :param lag: lag up to which the cross correlation will be calculated. If
        None all possible lags (2*a.size-1) will be computed.
        Default=None
    :type normalise: bool
    :param normalise: if True, normalise
        Default=False
    :type unbiased: bool
    :param unbiased: if True and :normalise: is True, use a.size-|tau| to
        normalize instead of a.size
        Default=False
    :returns: ndarray - cross-correlate of :a: and :b: up to lags :lag:
    """
    # checks
    a = np.asarray(a)
    b = a if b is None else np.asarray(b)
    if not (a.ndim == b.ndim == 1):
        raise ValueError('a.ndim != b.ndim != 1')
    if a.size != b.size:
        raise ValueError('a.size != b.size')
    if a.size < 2:
        raise ValueError('a.size < 2')
    if lag is None:
        lag = int(a.size - 1)
    if lag > a.size - 1:
        raise ValueError('lag > vector size - 1')

    # init
    T = a.size
    lag_range = range(int(-lag), int(lag) + 1)
    rval = np.empty(len(lag_range), dtype=a.dtype)

    # calc: dot product of the overlapping parts of a and b at shift tau
    for tau in lag_range:
        rval[lag + tau] = np.dot(
            a[max(0, +tau):min(T, T + tau)],
            b[max(0, -tau):min(T, T - tau)])

    # normalise
    if normalise is True:
        denom = np.full(len(lag_range), T, dtype=float)
        if unbiased is True:
            denom -= np.absolute(lag_range)
        # true division into a fresh array, so integer inputs work too
        # (in-place /= on an int array raises under py3 true division)
        rval = rval / denom

    # return
    return rval
def xcorrv(a, b=None, lag=None, dtype=None):
    """vectorial cross correlation by taking the expectation over an outer
    product

    :type a: ndarray
    :param a: 2d array, observations (time) in the rows, variables in the
        columns
    :type b: ndarray
    :param b: like :a:; if None, :a: is used instead (auto-correlation)
        Default=None
    :type lag: int
    :param lag: lag up to which the cross correlation will be calculated;
        if None, a.shape[0] - 1 is used
        Default=None
    :type dtype: dtype
    :param dtype: dtype of the result; if None, a.dtype is used
        Default=None
    :returns: ndarray - (a.shape[1], b.shape[1], 2*lag+1) cross-correlation
        tensor, mean of the outer product over the overlapping samples
    """
    # checks
    a = np.asarray(a)
    # BUGFIX: `sp.asarray(b or a)` raised "truth value of an array is
    # ambiguous" whenever b was an actual ndarray; test against None instead
    b = a if b is None else np.asarray(b)
    if not (a.ndim == b.ndim):
        raise ValueError('a.ndim != b.ndim')
    if lag is None:
        lag = int(a.shape[0] - 1)
    if lag > a.shape[0] - 1:
        raise ValueError('lag > vector len - 1')

    # init
    lag_range = range(int(-lag), int(lag) + 1)
    rval = np.empty((a.shape[1], b.shape[1], len(lag_range)),
                    dtype=dtype or a.dtype)

    # calc: outer product of the overlapping samples, averaged over time
    for tau in lag_range:
        prod = a.T[:, None, max(0, +tau):min(len(a), len(a) + tau)] * \
               b.T[None, :, max(0, -tau):min(len(b), len(b) - tau)].conj()
        rval[..., lag + tau] = prod.mean(axis=-1)

    # return
    return rval
## filtering and related processing
def shifted_matrix_sub(data, sub, tau, pad_val=0.0):
    """Subtracts the multi-channeled vector :sub: from :data: at a certain
    offset. data and sub can, due to the offset, be only partly overlapping.

    NOTE(review): in the code, time samples are the rows and channels the
    columns (shapes are [samples, channels]) — the original docstring's
    "rows are channels" did not match the shape checks below.

    REM: from matlab

    :type data: ndarray
    :param data: data array to apply the subtractor to
    :type sub: ndarray
    :param sub: subtractor array
    :type tau: int
    :param tau: offset of :sub: w.r.t. start of :data:
    :type pad_val: float
    :param pad_val: value to use for the padding
        Default=0.0
    :return: ndarray - data minus sub at offset, len(data)
    """
    ns_data, nc_data = data.shape
    ns_sub, nc_sub = sub.shape
    if nc_data != nc_sub:
        raise ValueError('nc_data and nc_sub must agree!')
    tau = int(tau)
    # build a full-length subtractor filled with pad_val, then place the
    # overlapping part of :sub: at offset :tau: (clipped at both ends)
    data_sub = np.empty_like(data)
    data_sub[:] = pad_val
    data_sub[max(0, tau):tau + ns_sub] = sub[max(0, -tau):ns_data - tau]
    return data - data_sub
## data structure utilities
def dict_list_to_ndarray(in_dict):
    """converts all lists in a dictionary to ndarray, works recursively

    Conversion happens in place; the (mutated) dict is also returned.

    :type in_dict: dict
    :param in_dict: dict to work on
    :returns: dict - all lists are converted to ndarray
    """
    for k in in_dict:
        if isinstance(in_dict[k], list):
            in_dict[k] = np.asarray(in_dict[k])
        elif isinstance(in_dict[k], dict):
            # recurse into nested dicts
            in_dict[k] = dict_list_to_ndarray(in_dict[k])
    return in_dict
def dict_sort_ndarrays(in_dict):
    """sort all arrays in a dictionary, works recursively

    Arrays are replaced by sorted copies (sorted along the last axis);
    the (mutated) dict is also returned.

    :type in_dict: dict
    :param in_dict: dict to work on
    :returns: dict - all ndarray values sorted
    """
    for k in in_dict:
        if isinstance(in_dict[k], np.ndarray):
            in_dict[k] = np.sort(in_dict[k])
        elif isinstance(in_dict[k], dict):
            # recurse into nested dicts
            in_dict[k] = dict_sort_ndarrays(in_dict[k])
    return in_dict
## index calculations
def get_idx(idxset, append=False):
    """yields the first free index in a positive integer index set

    :type idxset: iterable
    :param idxset: iterable of non-negative integer indices
    :type append: bool
    :param append: if True, returns max(:idxset:)+1,
        else find the first free index in :idxset:
    :returns: int - the first free index (0 for an empty or invalid set)
    """
    try:
        idxset = list(idxset)
        if not idxset:
            return 0
        idxmax = max(idxset) + 1
        if append is True:
            return idxmax
        # first gap in [0, idxmax); for a contiguous set there is no gap,
        # so the first free index is idxmax itself (the original
        # argmin-over-membership lookup wrongly returned 0 in that case)
        taken = set(idxset)
        for i in range(int(idxmax)):
            if i not in taken:
                return i
        return idxmax
    except (TypeError, ValueError):
        # invalid index set: keep the original best-effort fallback of 0,
        # but no longer swallow arbitrary exceptions with a bare except
        return 0
## MAIN
if __name__ == '__main__':
    # no command-line behaviour; this module is import-only
    pass