# -*- coding: utf-8 -*-
#_____________________________________________________________________________
#
# Copyright (c) 2012-2013, Berlin Institute of Technology
# All rights reserved.
#
# Developed by:	Philipp Meier <pmeier82@gmail.com>
#
#               Neural Information Processing Group (NI)
#               School for Electrical Engineering and Computer Science
#               Berlin Institute of Technology
#               MAR 5-6, Marchstr. 23, 10587 Berlin, Germany
#               http://www.ni.tu-berlin.de/
#
# Repository:   https://github.com/pmeier82/BOTMpy
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal with the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# * Redistributions of source code must retain the above copyright notice,
#   this list of conditions and the following disclaimers.
# * Redistributions in binary form must reproduce the above copyright notice,
#   this list of conditions and the following disclaimers in the documentation
#   and/or other materials provided with the distribution.
# * Neither the names of Neural Information Processing Group (NI), Berlin
#   Institute of Technology, nor the names of its contributors may be used to
#   endorse or promote products derived from this Software without specific
#   prior written permission.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# WITH THE SOFTWARE.
#_____________________________________________________________________________
#
# Acknowledgements:
#   Philipp Meier <pmeier82@gmail.com>
#_____________________________________________________________________________
#
# Changelog:
#   * <iso-date> <identity> :: <description>
#_____________________________________________________________________________
#


"""covariance estimator for timeseries data"""
__docformat__ = 'restructuredtext'
__all__ = ['BaseTimeSeriesCovarianceEstimator', 'TimeSeriesCovE',
           'XcorrStore', 'build_idx_set', 'build_block_toeplitz_from_xcorrs']

##  IMPORTS

import scipy as sp
from scipy import linalg as sp_la
from scipy import random as sp_rd
from collections import deque
from .funcs_general import xcorr
from .matrix_ops import (compute_coloured_loading, compute_diagonal_loading,
                         compute_matrix_cond)
from .util import INDEX_DTYPE

##  CLASSES

class BaseTimeSeriesCovarianceEstimator(object):
    """covariance estimator base class"""

    ## constructor

    def __init__(self, weight=0.05, cond=50, dtype=None):
        """
        :type weight: float
        :param weight: from [0.0, 1.0]. new observations will be weighted
            and contribute to the update with the factor weight
            (exponential model).
            Default=0.05
        :type cond: float
        :param cond: condition number to assert if the loaded matrix is
            requested.
            Default=50
        :type dtype: dtype resolvable
        :param dtype: anything that can be used as a numpy.dtype object
            Default=float32
        """

        # members
        self.dtype = sp.dtype(dtype or sp.float32)

        # privates
        self._weight = weight
        self._cond = float(cond)
        self._is_initialised = False
        self._n_upd = 0
        self._n_upd_smpl = 0

    ## getter and setter methods
    def get_cmx(self, **kwargs):
        if self._is_initialised is False:
            raise RuntimeError('Estimator has not been initialised!')
        return self._get_cmx(**kwargs)

    def _get_cmx(self, **kwargs):
        raise NotImplementedError

    def get_icmx(self, **kwargs):
        if self._is_initialised is False:
            raise RuntimeError('Estimator has not been initialised!')
        return self._get_icmx(**kwargs)

    def _get_icmx(self, **kwargs):
        raise NotImplementedError

    def get_svd(self, **kwargs):
        if self._is_initialised is False:
            raise RuntimeError('Estimator has not been initialised!')
        return self._get_svd(**kwargs)

    def _get_svd(self, **kwargs):
        raise NotImplementedError

    def get_whitening_op(self, **kwargs):
        if self._is_initialised is False:
            raise RuntimeError('Estimator has not been initialised!')
        return self._get_whitening_op(**kwargs)

    def _get_whitening_op(self, **kwargs):
        raise NotImplementedError
    def get_cond(self, **kwargs):
        # NOTE: `is_initialised` is a method, so testing the bound method
        # object itself (as the original did) is always truthy; test the
        # flag instead.
        if self._is_initialised is False:
            raise RuntimeError('Estimator has not been initialised!')
        sv = self._get_svd(**kwargs)[1]
        return compute_matrix_cond(sv)

    def is_cond_ok(self, **kwargs):
        cond = self.get_cond(**kwargs)
        return cond <= self._cond

    def get_cmx_loaded(self, **kwargs):
        if self._is_initialised is False:
            raise RuntimeError('Estimator has not been initialised!')
        kind = kwargs.get('kind', 'diagonal')
        if kind not in ['diagonal', 'coloured']:
            raise ValueError(
                'kind must be one of \'diagonal\' or \'coloured\'!')
        cmx = self._get_cmx(**kwargs)
        svd = self._get_svd(**kwargs)
        return {
            'coloured': compute_coloured_loading,
            'diagonal': compute_diagonal_loading,
        }[kind](cmx, svd, self._cond)

    def get_icmx_loaded(self, **kwargs):
        if self._is_initialised is False:
            raise RuntimeError('Estimator has not been initialised!')
        lcmx = self.get_cmx_loaded(**kwargs)
        return sp_la.inv(lcmx)
    def is_initialised(self):
        return self._is_initialised

    ## public methods

    def reset(self):
        """reset the internal buffers to None"""

        self._is_initialised = False
        self._n_upd = 0
        self._n_upd_smpl = 0
        self._reset()

    def update(self, data, **kwargs):
        """update covariance matrix with epochs of :data:

        :type data: ndarray
        :param data: data vector [samples, channels]
        :exception ValueError: `data` is not a ndarray with `ndim == 2`
        """

        # checks
        data = sp.asarray(data, dtype=self.dtype)
        if data.ndim != 2:
            raise ValueError('data is not of rank 2')

        # relay
        n_smpl = self._update(data, **kwargs)
        if self._is_initialised is False:
            self._is_initialised = n_smpl > 0
        if n_smpl > 0:
            self._n_upd += 1
            self._n_upd_smpl += n_smpl

    ## private methods

    def _update(self, data, **kwargs):
        # should return the number of samples that went into building the
        # new observation
        raise NotImplementedError

    def _reset(self):
        pass

    ## special methods

    def __str__(self, additional=''):
        return '%s(init=%s%s)' % (self.__class__.__name__,
                                  self._is_initialised, additional)
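# A minimal numeric sketch of the exponential update model used by the
# subclasses below (values are illustrative only): for an existing estimate
# `old` and a new observation `new`, the updated estimate is
# (1 - weight) * old + weight * new, e.g. with the default weight=0.05:
#
#   >>> old, new = 1.0, 3.0
#   >>> (1 - 0.05) * old + 0.05 * new
#   1.1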
class TimeSeriesCovE(BaseTimeSeriesCovarianceEstimator):
    """covariance estimator for timeseries data

    Given strips of (multi-channeled) data, this covariance estimator is
    able to estimate the time-lagged (block-)covariance-matrix, which has a
    (block-)toeplitz structure. Estimates are built by taking (cross- and)
    auto-correlations of the (multi-channeled) data pieces, for all lags in
    the defined range. From this data a toeplitz matrix is built (for each
    combination of channels).
    """

    ## constructor

    def __init__(self, tf_max=100, nc=4, weight=0.05, cond=50,
                 with_default_chan_set=True, dtype=None):
        """see BaseTimeSeriesCovarianceEstimator

        :type tf_max: int
        :param tf_max: the maximum lag for the cross-correlation functions
            internally stored. the estimator will be able to provide
            covariance matrices for lags in [1..tf_max].
            Default=100
        :type nc: int
        :param nc: channel count of expected data. data that is fed to
            update the estimator is checked for this channel count. also
            determines the size of the internal storage for the correlation
            functions.
            Default=4
        :type with_default_chan_set: bool
        :param with_default_chan_set: if True, add the default channel set
        """

        # checks
        if tf_max <= 0:
            raise ValueError('tf_max <= 0')
        if nc <= 0:
            raise ValueError('nc <= 0')

        # super
        super(TimeSeriesCovE, self).__init__(weight=weight, cond=cond,
                                             dtype=dtype)

        # members
        self._tf_max = int(tf_max)
        self._nc = int(nc)
        self._store = XcorrStore(self._tf_max, self._nc)
        self._buf_cmx = {}
        self._buf_icmx = {}
        self._buf_svd = {}
        self._buf_whi = {}
        self._chan_set = []

        # init
        if with_default_chan_set is True:
            self.new_chan_set(tuple(range(self._nc)))

    ## getter and setter - base

    def _process_keywords(self, kwargs):
        """return parameters from keywords

        :type kwargs: dict
        :param kwargs: keyword parameter dict
        :rtype: tuple
        :returns: tf, chan_set
        """

        try:
            tf = int(kwargs.get('tf'))
        except (TypeError, ValueError):
            tf = int(self._tf_max)
        try:
            cs = tuple(kwargs.get('chan_set'))
        except TypeError:
            cs = tuple(range(self._nc))
        return tf, cs

    def _get_cmx(self, **kwargs):
        """yield the current estimate

        :type chan_set: tuple
        :keyword chan_set: channel ids forming a valid channel set
        :type tf: int
        :keyword tf: max lags in samples
        :returns: ndarray - (block-) toeplitz covariance matrix
        """

        tf, chan_set = self._process_keywords(kwargs)
        buf_key = (tf, chan_set)
        if buf_key not in self._buf_cmx:
            self._buf_cmx[buf_key] = build_block_toeplitz_from_xcorrs(
                tf, chan_set, self._store, dtype=self.dtype)
        return self._buf_cmx[buf_key]

    def _get_icmx(self, **kwargs):
        """yield the inverse of the current estimate

        :type chan_set: tuple
        :keyword chan_set: channel ids forming a valid channel set
        :type tf: int
        :keyword tf: max lags in samples
        :returns: ndarray - inverse (block-) toeplitz covariance matrix
        """

        tf, chan_set = self._process_keywords(kwargs)
        buf_key = (tf, chan_set)
        if buf_key not in self._buf_icmx:
            svd = self._get_svd(**kwargs)
            self._buf_icmx[buf_key] = sp.dot(
                sp.dot(svd[0], sp.diag(1. / svd[1])), svd[2])
        return self._buf_icmx[buf_key]

    def _get_svd(self, **kwargs):
        """yield the singular value decomposition of the current estimate

        :type chan_set: tuple
        :keyword chan_set: channel ids forming a valid channel set
        :type tf: int
        :keyword tf: max lags in samples
        :returns: tuple - U, s, Vh as returned by :scipy.linalg.svd:
        """

        tf, chan_set = self._process_keywords(kwargs)
        buf_key = (tf, chan_set)
        if buf_key not in self._buf_svd:
            cmx = self._get_cmx(**kwargs)
            self._buf_svd[buf_key] = sp_la.svd(cmx)
        return self._buf_svd[buf_key]

    def _get_whitening_op(self, **kwargs):
        """yield the whitening operator with respect to the current estimate

        for observations from the vector space this matrix operates on:
        if C = Q.T * Q, then Q^-1 is the whitening operator, calculated via
        the SVD.

        :type chan_set: tuple
        :keyword chan_set: channel ids forming a valid channel set
        :type tf: int
        :keyword tf: max lags in samples
        :returns: ndarray - whitening operator matrix
        """

        tf, chan_set = self._process_keywords(kwargs)
        buf_key = (tf, chan_set)
        if buf_key not in self._buf_whi:
            svd = self._get_svd(**kwargs)
            self._buf_whi[buf_key] = sp.dot(
                sp.dot(svd[0], sp.diag(sp.sqrt(1. / svd[1]))), svd[2])
        return self._buf_whi[buf_key]

    # getter and setter - own
    def get_tf_max(self):
        return self._tf_max

    def set_tf_max(self, value):
        if value < 1:
            raise ValueError('tf_max < 1')
        self._tf_max = int(value)
        self.reset()
        # TODO: reset tf_max for self._store

    tf_max = property(get_tf_max, set_tf_max)

    def get_nc(self):
        return self._nc

    nc = property(get_nc)

    def get_chan_set(self):
        return self._chan_set

    def new_chan_set(self, cs_new):
        if max(cs_new) >= self._nc:
            raise ValueError(
                'new channel set is incompatible with channel count!')
        # compare the sorted tuple, since that is what gets stored
        cs_new = tuple(sorted(cs_new))
        if cs_new in self._chan_set:
            raise ValueError('channel set already included!')
        self._chan_set.append(cs_new)

    def rm_chan_set(self, cs_rm):
        if tuple(cs_rm) in self._chan_set:
            self._chan_set.remove(tuple(cs_rm))
        else:
            raise ValueError('channel set not included!')

    ## special methods
    def __str__(self):
        return super(TimeSeriesCovE, self).__str__(
            ',tf=%s,nc=%s' % (self._tf_max, self._nc))

    ## implementation base

    def _update(self, data, **kwargs):
        """updates the estimator with new data

        :type data: ndarray
        :param data: the data to operate on [samples, channels]
        :type epochs: ndarray
        :keyword epochs: epochs delimiting the data to take the estimate over
        :type min_len: int
        :keyword min_len: minimum length of epochs in samples
        :rtype: int
        :returns: number of samples that went into the estimate
        """

        # kwargs
        epochs = kwargs.get('epochs', None)
        min_len = kwargs.get('min_len', 3 * self._tf_max)

        # check data
        if data.shape[1] != self._nc:
            raise ValueError('channel count (columns) must be %d' % self._nc)
        if data.shape[0] < min_len:
            raise ValueError(
                'must give at least %d samples of data' % min_len)

        # check epochs
        if epochs is None:
            epochs = sp.array([[0, data.shape[0]]], dtype=INDEX_DTYPE)
        else:
            epochs = sp.asarray(epochs)
            len_epoch = epochs[:, 1] - epochs[:, 0]
            if not any(len_epoch >= min_len):
                raise ValueError('no epochs with len >= min_len!')
            # keep only epochs that are long enough; the original filtered
            # with a strict > here, inconsistent with the >= check above
            epochs = epochs[len_epoch >= min_len]
        n_epoch = epochs.shape[0]
        len_epoch = epochs[:, 1] - epochs[:, 0]
        if n_epoch == 0:
            return 0
        self._clear_buf()

        # calculate cross-correlation functions for new observation
        processed = {}
        for cs in self._chan_set:
            chan_set = build_idx_set(cs)
            for m, n in chan_set:
                if (m, n) in processed:
                    continue
                processed[m, n] = []
                for e in xrange(n_epoch):
                    xc = xcorr(data[epochs[e, 0]:epochs[e, 1], m],
                               data[epochs[e, 0]:epochs[e, 1], n],
                               lag=self._tf_max - 1,
                               normalise=True)
                    processed[m, n].append(xc * len_epoch[e])
        for k in processed.keys():
            processed[k] = sp.sum(processed[k], axis=0) / len_epoch.sum()
            if k in self._store:
                # exponential update: blend the old estimate and the new
                # observation with factor self._weight
                self._store[k] *= 1.0 - self._weight
                self._store[k] += self._weight * processed[k]
            else:
                self._store[k] = processed[k]

        # return
        return len_epoch.sum()

    def _clear_buf(self):
        self._buf_cmx.clear()
        self._buf_icmx.clear()
        self._buf_svd.clear()
        self._buf_whi.clear()

    def _reset(self):
        self._store.reset()
        self._clear_buf()
        # setting to default chan_set
        self._chan_set = [tuple(range(self._nc))]

    @staticmethod
    def white_noise_init(tf_max, nc, std=1.0, dtype=None):
        """init as white noise

        :type tf_max: int
        :param tf_max: max length of template in samples
        :type nc: int
        :param nc: number of channels
        :type std: float
        :param std: standard deviation of the white noise process
            Default=1.0
        :type dtype: dtype resolvable
        :param dtype: dtype for the xcorr vectors
            Default=float32
        :rtype: TimeSeriesCovE
        :return: TimeSeriesCovE with white noise init
        """

        chan_set = tuple(range(nc))
        rval = TimeSeriesCovE(tf_max=tf_max, nc=nc)
        for m, n in build_idx_set(chan_set):
            xc = sp.zeros(2 * tf_max - 1, dtype=dtype or sp.float32)
            if m == n:
                xc[tf_max - 1] = float(std * std)
            rval._store[m, n] = xc
        rval._is_initialised = True
        return rval
    @staticmethod
    def cmx_init(cmx, nc, tf_max=None, dtype=None):
        """init from given covariance matrix

        :type cmx: ndarray
        :param cmx: covariance matrix in concatenated representation
        :type nc: int
        :param nc: number of channels
        :type tf_max: int
        :param tf_max: max length of template in samples, if None derive it
            from the passed matrix.
            Default=None
        :rtype: TimeSeriesCovE
        :return: TimeSeriesCovE with the passed matrix set
        """

        # init and check
        chan_set = tuple(range(nc))
        tf = cmx.shape[0] // nc
        if tf_max is None:
            tf_max = tf
        elif int(tf_max) != tf:
            # the lag extraction below assumes the matrix covers exactly
            # tf_max lags per channel pair
            raise ValueError('tf_max does not match the passed matrix!')
        rval = TimeSeriesCovE(tf_max=tf_max, nc=nc)

        # build xcorrs from matrix
        for m, n in build_idx_set(chan_set):
            xc = sp.zeros(2 * tf_max - 1, dtype=dtype or sp.float32)
            mx = cmx[m * tf:(m + 1) * tf, n * tf:(n + 1) * tf]
            xc[:tf] = mx[:, 0][::-1]  # zero and negative lags
            xc[tf:] = mx[0, 1:]  # positive lags
            rval._store[m, n] = xc
        rval._is_initialised = True
        return rval
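# Usage sketch for TimeSeriesCovE (illustrative only; test data drawn from
# scipy's random module, which is imported above as sp_rd):
#
#   >>> est = TimeSeriesCovE(tf_max=10, nc=2)
#   >>> est.update(sp_rd.randn(1000, 2))
#   >>> C = est.get_cmx(tf=10)  # (tf * nc, tf * nc) block toeplitz matrix
#   >>> C.shape
#   (20, 20)
#   >>> W = est.get_whitening_op(tf=10)
#   >>> wn = TimeSeriesCovE.white_noise_init(10, 2)  # analytic alternative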
class TimeSeriesCovE2(TimeSeriesCovE):
    """additional representations of the underlying xcorr container"""

    def __init__(self, *args, **kwargs):
        # super
        super(TimeSeriesCovE2, self).__init__(*args, **kwargs)

        # members
        self._sample_vars = None
        self._sample_mem = None
        self._sample_coef = None

    def get_cov_ten(self, **kwargs):
        tf, chan_set = self._process_keywords(kwargs)
        return build_cov_tensor_from_xcorrs(tf, chan_set, self._store,
                                            dtype=self.dtype,
                                            both=kwargs.get('both', True))

    def _clear_buf(self):
        super(TimeSeriesCovE2, self)._clear_buf()
        self._sample_vars = None
        self._sample_mem = None
        self._sample_coef = None

    def sample(self, n=1):
        """sample from the AR model corresponding to the current estimate"""

        # need to fit?
        if self._sample_vars is None:
            print 'fitting VAR model..',
            #self._sample_vars = LWR(self.get_cov_ten(tf=min(12,
            # self._tf_max), both=False))
            self._sample_vars = LWR(self.get_cov_ten(both=False))
            self._sample_coef = sp.hstack(
                [self._sample_vars[0][..., i]
                 for i in xrange(self._sample_vars[0].shape[2])])
            mem_size = self._sample_vars[0].shape[2] * self._nc
            self._sample_mem = deque([0] * self._tf_max * self._nc,
                                     maxlen=mem_size)
            # burn-in to forget the all-zero initial memory
            self._sample(5000)
            print 'done!'
        return self._sample(n)

    def _sample(self, n=1):
        """sampling"""

        rval = sp_rd.multivariate_normal(sp.zeros(self._nc),
                                         self._sample_vars[1], n)
        # NOTE: the original looped over the model order here; looping over
        # the n requested samples matches the AR recursion.
        for k in xrange(n):
            rval[k] += sp.dot(self._sample_coef, self._sample_mem)
            self._sample_mem.extendleft(rval[k, ::-1])
        return rval
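# Usage sketch for the tensor representation (illustrative only): with
# both=False the tensor holds the forward lags per channel pair.
#
#   >>> est2 = TimeSeriesCovE2(tf_max=10, nc=2)
#   >>> est2.update(sp_rd.randn(1000, 2))
#   >>> T = est2.get_cov_ten(tf=5, both=False)
#   >>> T.shape
#   (2, 2, 5)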
class XcorrStore(object):
    """storage for cross-correlations"""

    def __init__(self, tf=100, nc=4):
        """
        :type tf: int
        :param tf: length of the channel xcorrs in samples
            Default=100
        :type nc: int
        :param nc: channel count for the storage
            Default=4
        """

        # checks
        if tf <= 0:
            raise ValueError('need tf > 0')
        if nc <= 0:
            raise ValueError('need nc > 0')

        # members
        self._tf = int(tf)
        self._nc = int(nc)
        self._store = {}

    def __getitem__(self, key):
        self._check_index(key)
        return self._store[key]

    def __setitem__(self, key, value):
        self._check_index(key)
        self._check_value(value)
        self._store[key] = value

    def __contains__(self, key):
        return key in self._store

    def __iter__(self):
        return self._store.__iter__()

    def _check_index(self, key):
        if not isinstance(key, tuple):
            raise IndexError('needs 2dim index!')
        if len(key) != 2:
            raise IndexError('needs 2dim index!')
        if key[1] < key[0]:
            raise KeyError('x-index must be >= y-index!')
        # NOTE: the original chained comparison (0 > key[0] >= self._nc)
        # could never be true; check both bounds for both indices instead.
        if not (0 <= key[0] < self._nc and 0 <= key[1] < self._nc):
            raise IndexError('index out of bounds! nc = %d' % self._nc)

    def _check_value(self, value):
        if not isinstance(value, sp.ndarray):
            raise TypeError('value needs to be ndarray')
        if value.ndim != 1:
            raise ValueError('value needs to be ndim==1')
        if value.size != (self._tf * 2) - 1:
            raise ValueError(
                'value needs to be size==%d' % int(self._tf * 2 - 1))

    def reset(self):
        self._store.clear()
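# Usage sketch (illustrative only): keys are (m, n) channel index pairs with
# m <= n, values are xcorr vectors of length 2 * tf - 1.
#
#   >>> store = XcorrStore(tf=3, nc=2)
#   >>> store[0, 1] = sp.zeros(5)
#   >>> (0, 1) in store
#   True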
def build_idx_set(ids):
    """builds the block index set for an upper triangular matrix

    :type ids: iterable
    :param ids: set of channel ids
    """

    return [(ids[i], ids[j])
            for i in xrange(len(ids))
            for j in xrange(i, len(ids))]
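# Example: for channel ids (0, 1, 2) the upper triangular index set is
#
#   >>> build_idx_set((0, 1, 2))
#   [(0, 0), (0, 1), (0, 2), (1, 1), (1, 2), (2, 2)]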
def build_block_toeplitz_from_xcorrs(tf, chan_set, xcorrs, dtype=None):
    """builds a block toeplitz matrix from a set of channel xcorrs

    :type tf: int
    :param tf: desired lag in samples
    :type chan_set: list
    :param chan_set: list of channel ids to build the channel set from. the
        block covariance matrix will be built so that blocks are ordered
        from lower to higher channel id.
    :type xcorrs: XcorrStore
    :param xcorrs: XcorrStore object holding the xcorrs for various channel
        combinations
    :type dtype: dtype derivable
    :param dtype: will be passed to the constructor for the matrix returned.
        Default=None
    """

    # init and checks
    assert tf <= xcorrs._tf
    chan_set = sorted(chan_set)
    nc = len(chan_set)
    assert all(sp.diff(chan_set) >= 1)
    assert max(chan_set) < xcorrs._nc
    assert all([key in xcorrs for key in build_idx_set(chan_set)]), \
        'no data for requested channels'
    rval = sp.empty((tf * nc, tf * nc), dtype=dtype)

    # build blocks and insert into the output matrix
    for i in xrange(nc):
        m = chan_set[i]
        for j in xrange(i, nc):
            n = chan_set[j]
            xc = xcorrs[m, n]
            sample0 = xc.size // 2
            r = xc[sample0:sample0 + tf]
            c = xc[sample0 + 1 - tf:sample0 + 1][::-1]
            block_ij = sp_la.toeplitz(c, r)
            rval[i * tf:(i + 1) * tf, j * tf:(j + 1) * tf] = block_ij
            if i != j:
                rval[j * tf:(j + 1) * tf, i * tf:(i + 1) * tf] = block_ij.T

    # return
    return rval
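# Usage sketch (illustrative only): a store filled with unit-variance white
# noise xcorrs yields the identity as its block toeplitz matrix.
#
#   >>> xcs = XcorrStore(tf=3, nc=2)
#   >>> for m, n in build_idx_set((0, 1)):
#   ...     xc = sp.zeros(5)
#   ...     if m == n:
#   ...         xc[2] = 1.0  # unit variance at zero lag
#   ...     xcs[m, n] = xc
#   >>> C = build_block_toeplitz_from_xcorrs(3, (0, 1), xcs)
#   >>> sp.allclose(C, sp.eye(6))
#   True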
def build_cov_tensor_from_xcorrs(tf, chan_set, xcorrs, dtype=None,
                                 both=False):
    """builds a covariance tensor from a set of channel xcorrs

    The tensor will hold the forward (positive lags) covariances for all
    auto-/cross-correlations in the chan_set

    :type tf: int
    :param tf: desired lag in samples
    :type chan_set: list
    :param chan_set: list of channel ids to build the channel set from. the
        covariance tensor will be built so that the chan_set is indexed
        natively.
    :type xcorrs: XcorrStore
    :param xcorrs: XcorrStore object holding the xcorrs for various channel
        combinations
    :type dtype: dtype derivable
    :param dtype: will be passed to the constructor for the matrix returned.
        Default=None
    """

    # init and checks
    assert tf <= xcorrs._tf
    chan_set = sorted(chan_set)
    nc = len(chan_set)
    assert all(sp.diff(chan_set) >= 1)
    assert max(chan_set) < xcorrs._nc
    assert all([key in xcorrs for key in build_idx_set(chan_set)]), \
        'no data for requested channels'
    xc_len = tf + both * (tf - 1)
    rval = sp.empty((nc, nc, xc_len), dtype=dtype)

    # write single xcorrs
    for i in xrange(nc):
        m = chan_set[i]
        for j in xrange(i, nc):
            n = chan_set[j]
            xc = xcorrs[m, n]
            sample0 = xc.size // 2
            if both is True:
                rval[i, j, :] = xc[sample0 - tf + 1:sample0 + tf]
            else:
                rval[i, j, :] = xc[sample0:][:tf]
            if i != j:
                if both is True:
                    rval[j, i, :] = xc[::-1][sample0 - tf + 1:sample0 + tf]
                else:
                    rval[j, i, :] = xc[::-1][:tf]

    # return
    return rval


def LWR(R):
    """multivariate Levinson-Durbin recursion (Whittle, Wiggins and
    Robinson), aka LWR

    We assume all quantities to be well behaved: real, positive
    semi-definite, yadda yadda.

    Reference: "Covariance characterization by partial autocorrelation
    matrices", Morf, Vieira and Kailath, The Annals of Statistics 1978,
    Vol. 6, No. 3, 643-648

    :type R: ndarray
    :param R: xcorr sequence (nc, nc, N+1), should cover one more lag than
        the desired model order
    :rtype: ndarray, ndarray
    :return: ar coefficient sequence (nc, nc, N); driving process
        covariance estimate (nc, nc)
    """

    # R is (nc, nc, N+1)
    N = R.shape[2] - 1
    nc = R.shape[0]

    # coefficients
    A = sp.zeros((nc, nc, N))  # forward (ar)
    B = sp.zeros((nc, nc, N))  # backward (lp)
    # prediction error covariances; explicit copies, so the in-place updates
    # below do not alias R[..., 0] (the original bound both names to the
    # same array)
    err_e = R[..., 0].copy()  # forward prediction error covariance
    err_r = R[..., 0].copy()  # backward prediction error covariance
    # intermediate update term
    Delta = sp.empty((nc, nc))

    # iterate
    for n in xrange(N):
        # (9)
        # \Delta_{n+1} = R_{n+1} + \sum_{i=1}^{n} A_{n,i} R_{n+1-i}
        Delta[:] = R[..., n + 1]
        for i in xrange(1, n + 1):
            Delta += sp.dot(A[..., i - 1], R[..., n + 1 - i])

        # intermediates for (7), (8), (10), (11); note the right-sided
        # inverses: solve(X.T, Y.T).T computes Y X^{-1}, whereas the
        # original solve(X, Y) computed X^{-1} Y
        # \Delta_{n+1} (\sigma_{n}^{r})^{-1}
        delta_err_r_inv = sp_la.solve(err_r.T, Delta.T).T
        # \Delta_{n+1}^{T} (\sigma_{n}^{\epsilon})^{-1}
        deltaT_err_e_inv = sp_la.solve(err_e.T, Delta).T
        AA = A.copy()

        # (7)
        # A_{n+1} = [I,A_{n,1},..,A_{n,n},0] -
        #     \Delta_{n+1}(\sigma_{n}^{r})^{-1} [0,B_{n,n},..,B_{n,1},I]
        for i in xrange(1, n + 1):
            A[..., i - 1] -= sp.dot(delta_err_r_inv, B[..., n - i])
        A[..., n] = -delta_err_r_inv

        # (8)
        # B_{n+1} = [0,B_{n,n},..,B_{n,1},I] -
        #     \Delta_{n+1}^{T}(\sigma_{n}^{\epsilon})^{-1}
        #     [I,A_{n,1},..,A_{n,n},0]
        for i in xrange(1, n + 1):
            B[..., i - 1] -= sp.dot(deltaT_err_e_inv, AA[..., n - i])
        B[..., n] = -deltaT_err_e_inv

        # (10)
        # \sigma_{n+1}^{\epsilon} = \sigma_{n}^{\epsilon} -
        #     \Delta_{n+1}(\sigma_{n}^{r})^{-1}\Delta_{n+1}^{T}
        err_e -= sp.dot(delta_err_r_inv, Delta.T)

        # (11)
        # \sigma_{n+1}^{r} = \sigma_{n}^{r} -
        #     \Delta_{n+1}^{T}(\sigma_{n}^{\epsilon})^{-1}\Delta_{n+1}
        err_r -= sp.dot(deltaT_err_e_inv, Delta)

    # return
    return A, err_e

##  MAIN

if __name__ == '__main__':
    pass
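# Usage sketch for LWR (illustrative only; `est2` is assumed to be an
# already updated TimeSeriesCovE2 instance): fit a VAR model of order 2
# from the first 3 lags of the covariance tensor.
#
#   >>> R = est2.get_cov_ten(tf=3, both=False)  # shape (nc, nc, 3)
#   >>> A, sigma = LWR(R)  # A: (nc, nc, 2), sigma: (nc, nc)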
