Source code for botmpy.common.datafile.xpd

# -*- coding: utf-8 -*-
#_____________________________________________________________________________
#
# Copyright (c) 2012 Berlin Institute of Technology
# All rights reserved.
#
# Developed by:	Philipp Meier <pmeier82@gmail.com>
#               Neural Information Processing Group (NI)
#               School for Electrical Engineering and Computer Science
#               Berlin Institute of Technology
#               MAR 5-6, Marchstr. 23, 10587 Berlin, Germany
#               http://www.ni.tu-berlin.de/
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal with the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# * Redistributions of source code must retain the above copyright notice,
#   this list of conditions and the following disclaimers.
# * Redistributions in binary form must reproduce the above copyright notice,
#   this list of conditions and the following disclaimers in the documentation
#   and/or other materials provided with the distribution.
# * Neither the names of Neural Information Processing Group (NI), Berlin
#   Institute of Technology, nor the names of its contributors may be used to
#   endorse or promote products derived from this Software without specific
#   prior written permission.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# WITH THE SOFTWARE.
#_____________________________________________________________________________
#
# Acknowledgements:
#   Philipp Meier <pmeier82@gmail.com>
#_____________________________________________________________________________
#


"""datafile implementation for xpd file format"""
__docformat__ = 'restructuredtext'
__all__ = ['XpdFile', '_XPD_TH', '_XPD_CH']

##---IMPORTS

import scipy as sp
from struct import Struct
from .datafile import DataFile, DataFileError

##---CONSTANTS

_CONV_I = Struct('I')
_CONV_H = Struct('H')
_CONV_B = Struct('B')
_CONV_d = Struct('d')
_CONV_s = Struct('s')
_CONV_7s = Struct('7s')
_CONV_255s = Struct('255s')

##--- CLASSES

class _XPD_TH(object):
    """XPD trial header struct

        ===== =============================
        Size  XPD Trial Header Member
        ===== =============================
        H     header size
        I     data size
        7s    name of generating program
        B     version identifier = 86 / 'V'
        H     NM build no.
        B     high ver. = 0
        B     low ver. = 12
        I     trial no.
        B     stimulus code
        B     error byte
        4H    time stamp
        255s  comment
        255s  additional comment
        Total 542 bytes
        ===== =============================
    """

    SIZE = 542

    def __init__(self, fp):
        """
        :type fp: file
        :param fp: open file at seek(header_start)
        """

        # read buffer
        buf = fp.read(self.SIZE)

        # extract trial header information
        self.header_size = _CONV_H.unpack(buf[0:2])[0]
        self.data_size = _CONV_I.unpack(buf[2:6])[0]
        self.name = _CONV_7s.unpack(buf[6:13])[0]
        self.version = _CONV_s.unpack(buf[13:14])[0]
        self.nm_build_no = _CONV_H.unpack(buf[14:16])[0]
        self.high_ver = _CONV_B.unpack(buf[16:17])[0]
        self.low_ver = _CONV_B.unpack(buf[17:18])[0]
        self.trial_no = _CONV_I.unpack(buf[18:22])[0]
        self.stimulus = _CONV_B.unpack(buf[22:23])[0]
        self.error = _CONV_B.unpack(buf[23:24])[0]
        self.timestamp = (_CONV_H.unpack(buf[24:26])[0],
                          _CONV_H.unpack(buf[26:28])[0],
                          _CONV_H.unpack(buf[28:30])[0],
                          _CONV_H.unpack(buf[30:32])[0])
        self.comment = _CONV_255s.unpack(buf[32:287])[0]
        self.comment = self.comment[:self.comment.find('\x00')]
        self.add_comment = _CONV_255s.unpack(buf[287:542])[0]
        self.add_comment = self.add_comment[:self.add_comment.find('\x00')]

    def __str__(self):
        rval = self.__repr__()
        rval += '\nheader_size\t%d\n' % self.header_size
        rval += 'data_size\t%d\n' % self.data_size
        rval += 'name\t\t%s\n' % self.name
        rval += 'version\t\t%s\n' % self.version
        rval += 'nm_build_no\t%d\n' % self.nm_build_no
        rval += 'high_ver\t%d\n' % self.high_ver
        rval += 'low_ver\t\t%d\n' % self.low_ver
        rval += 'trial_no\t%d\n' % self.trial_no
        rval += 'stimulus\t%d\n' % self.stimulus
        rval += 'error\t%d\n' % self.error
        rval += 'timestamp\t%s\n' % str(self.timestamp)
        rval += 'comment\t\t%s\n' % self.comment
        rval += 'add_comment\t%s\n' % self.add_comment
        return rval


class _XPD_CH(object):
    """XPD channel header struct

        ===== =====================
        Size  XPD Channel Header
        ===== =====================
        I     channel no.
        d     sample rate in kHz
        d     offset-x of channel
        I     data length in samples
        Total 24 bytes
        ===== =====================
    """

    SIZE = 24

    def __init__(self, fp):
        """
        :type fp: file
        :param fp: open file at seek(header_start)
        """

        # read buffer
        buf = fp.read(self.SIZE)

        # extract channel header information
        self.channel_no = _CONV_I.unpack(buf[0:4])[0]
        self.sample_rate = _CONV_d.unpack(buf[4:12])[0]
        self.x_offset = _CONV_d.unpack(buf[12:20])[0]
        self.n_sample = _CONV_I.unpack(buf[20:24])[0]
        self.data_offset = fp.tell()

    def __str__(self):
        rval = self.__repr__()
        rval += '\nchannel_no\t%d\n' % self.channel_no
        rval += 'sample_rate\t%f\n' % self.sample_rate
        rval += 'x_offset\t%f\n' % self.x_offset
        rval += 'n_sample\t%d\n' % self.n_sample
        rval += 'data_offset\t%d' % self.data_offset
        return rval


[docs]class XpdFile(DataFile): """XPD file from - Matthias Munk Group @ MPI Tübingen""" ## constructor def __init__(self, filename=None, dtype=None, cache=False): # members self.trial_header = None self.n_achan = None self.n_dchan = None self.n_echan = None self.max_achan = None self.achan_header = None self.dchan_header = None self.echan_header = None self.cache = cache self._cache = None # super super(XpdFile, self).__init__(filename=filename, dtype=dtype) def __del__(self): super(XpdFile, self).__del__() self._cache = None ## implementation def _initialize_file(self, filename, **kwargs): # open file self.fp = open(filename, 'rb') # trial header if _CONV_H.unpack(self.fp.read(2))[0] != 120: self.fp.close() raise DataFileError('unexpected input while reading trial ' 'header for file %s' % self.fp.name) self.trial_header = _XPD_TH(self.fp) # analog channel headers if _CONV_H.unpack(self.fp.read(2))[0] != 123: self.fp.close() raise DataFileError('unexpected input while reading analog ' 'channel header for file %s' % self.fp.name) self.n_achan = _CONV_I.unpack(self.fp.read(4))[0] self.achan_header = {} self.max_achan = -1 for _ in xrange(self.n_achan): ach = _XPD_CH(self.fp) if ach.channel_no > self.max_achan: self.max_achan = ach.channel_no self.achan_header[ach.channel_no] = ach self.fp.seek(ach.n_sample * 2, 1) # init the cache for the analog channels if self.cache is True: self._cache = {} # digital channel headers if _CONV_H.unpack(self.fp.read(2))[0] != 121: self.fp.close() raise DataFileError('unexpected input while reading digital ' 'channel header for file %s' % self.fp.name) self.n_dchan = _CONV_I.unpack(self.fp.read(4))[0] self.dchan_header = {} for _ in xrange(self.n_dchan): dch = _XPD_CH(self.fp) self.dchan_header[dch.channel_no] = dch self.fp.seek(dch.n_sample * 4, 1) # event channel headers if _CONV_H.unpack(self.fp.read(2))[0] != 122: self.fp.close() raise DataFileError('unexpected input while reading event ' 'channel header for file %s' % self.fp.name) self.n_echan = _CONV_I.unpack(self.fp.read(4))[0] self.echan_header = {} for _ in xrange(self.n_echan): ech = _XPD_CH(self.fp) self.echan_header[ech.channel_no] = ech self.fp.seek(ech.n_sample * 4, 1) self.fp.seek(0) def _close(self): self.fp.close() def _closed(self): return self.fp.closed def _filename(self): return self.fp.name def _get_data(self, **kwargs): """returns a numpy array of the data with samples on the rows and channels on the columns. channels may be selected via the channels parameter. get data for one tetrode (default all channels) as ndarray :type item: int :keyword item: tetrode id. starts at 1!! Default = 1 :type chans: list :keyword chans: Channel list. Default = [0,1,2,3] """ # keywords item = kwargs.get('item', 1) chans = kwargs.get('chans', [0, 1, 2, 3]) # init my_chans = [item + chans[i] * 16 for i in xrange(len(chans))] nsample = 0 for i in xrange(4): if my_chans[i] not in self.achan_header: continue else: if self.achan_header[my_chans[i]].n_sample > nsample: nsample = self.achan_header[my_chans[i]].n_sample if nsample == 0: raise IndexError('no data for tetrode %s' % item) # collect data rval = sp.zeros((nsample, len(chans)), dtype=self.dtype) for i in xrange(len(my_chans)): load_item = None # check cache if self.cache is True: if my_chans[i] in self._cache: item = self._cache[my_chans[i]] if not isinstance(item, sp.ndarray): load_item = None else: load_item = item # load from file if load_item is None: try: load_item = self._get_achan(my_chans[i]) except IndexError: load_item = sp.zeros(nsample, dtype=self.dtype) if self.cache is True: self._cache[my_chans[i]] = load_item # unfortunately the channel shapes are not always consistent # across the tetrode. but we preallocate space such that the # largest item may fit in the buffer. rval[:load_item.size, i] = load_item # return stuff if not rval.any(): raise IndexError('no data for tetrode %s' % item) return rval ## private helpers def _get_achan(self, idx): """yields an analog channel as ndarray :type idx: int :param idx: channel id """ # checks if idx not in self.achan_header: raise IndexError('no data for this channel: %s' % idx) # get data self.fp.seek(self.achan_header[idx].data_offset) byte_data = self.fp.read(self.achan_header[idx].n_sample * 2) # return if len(byte_data) == 0: return sp.array([], dtype=sp.int16) return sp.frombuffer(byte_data, dtype=sp.int16) def _get_echan(self, idx): """yields an event channel as ndarray :type idx: int :param idx: channel id. """ # checks if idx not in self.echan_header: raise IndexError('no data for this channel: %s' % idx) # get data self.fp.seek(self.echan_header[idx].data_offset, 0) byte_data = self.fp.read(self.echan_header[idx].n_sample * 4) # return if len(byte_data) == 0: return sp.array([], dtype=sp.int32) return sp.frombuffer(byte_data, dtype=sp.int32)
[docs] def get_available_tetrodes(self): """yields the set of available tetrodes""" return [k for k in self.achan_header.keys() if 1 <= k <= 16]
##--- MAIN if __name__ == '__main__': arc = XpdFile('/home/phil/Data/Munk/Louis/L011/L0111001.xpd') X = arc.get_data(item=1) print X del arc, X