# -*- coding: utf-8 -*-
"""This module defines a base class for format specific trajectory classes."""
from os.path import isfile, abspath, split, splitext
from numbers import Integral
import numpy as np
from prody import LOGGER
from prody.utilities import relpath
from .trajbase import TrajBase
__all__ = ['TrajFile']
[docs]class TrajFile(TrajBase):
"""A base class for trajectory file classes:
* :class:`.DCDFile`"""
def __init__(self, filename, mode='r'):
"""Open *filename* for reading (default, ``mode="r"``), writing
(``mode="w"``), or appending (``mode="r+"`` or ``mode="a"``).
Binary mode option will be appended automatically."""
if not isinstance(filename, str):
raise TypeError("filename argument must be a string")
if not isinstance(mode, str):
TypeError('mode argument must be string')
if not mode in ('r', 'w', 'a', 'r+'):
ValueError("mode string must begin with one of 'r', 'w', 'r+', or "
"'a'")
name = splitext(split(filename)[1])[0]
TrajBase.__init__(self, name)
self._file = None
if mode == 'r' and not isfile(filename):
raise IOError("[Errno 2] No such file or directory: '{0}'"
.format(filename))
self._filename = filename
if mode in ('a', 'r+'):
self._file = open(filename, 'r+b')
self._file.seek(0)
mode = 'a'
else:
if not mode.endswith('b'):
mode += 'b'
self._file = open(filename, mode)
self._mode = mode
self._bytes_per_frame = None
self._first_byte = None
self._dtype = np.float32
self._timestep = 1
self._first_ts = 0
self._framefreq = 1
self._n_fixed = 0
def __del__(self):
if self._file is not None:
self._file.close()
def __repr__(self):
if self._closed:
return ('<{0}: {1} (closed)>').format(
self.__class__.__name__, self._title)
link = ''
if self._ag is not None:
link = 'linked to ' + str(self._ag) + '; '
if self._mode.startswith('r'):
next = 'next {0} of {1} frames; '.format(self._nfi,
self._n_csets)
else:
next = '{0} frames written; '.format(self._n_csets)
if self._indices is None:
atoms = '{0} atoms'.format(self._n_atoms)
else:
atoms = 'selected {0} of {1} atoms'.format(
self.numSelected(), self._n_atoms)
return '<{0}: {1} ({2}{3}{4})>'.format(
self.__class__.__name__, self._title, link, next, atoms)
[docs] def getFilename(self, absolute=False):
"""Returns relative path to the current file. For absolute path,
pass ``absolute=True`` argument."""
if absolute:
return abspath(self._filename)
return relpath(self._filename)
[docs] def getFrame(self, index):
"""Returns frame at given *index*."""
if self._closed:
raise ValueError('I/O operation on closed file')
if not isinstance(index, Integral):
raise IndexError('index must be an integer')
if not 0 <= index < self._n_csets:
raise IndexError('index must be greater or equal to 0 and less '
'than number of frames')
nfi = self._nfi
if index > nfi:
self.skip(index - nfi)
elif index < nfi:
self.reset()
if index > 0:
self.skip(index)
return next(self)
getFrame.__doc__ = TrajBase.getFrame.__doc__
[docs] def getCoordsets(self, indices=None):
if self._closed:
raise ValueError('I/O operation on closed file')
if indices is None:
indices = np.arange(self._n_csets)
elif isinstance(indices, int):
indices = np.array([indices])
elif isinstance(indices, slice):
indices = np.arange(*indices.indices(self._n_csets))
indices.sort()
elif isinstance(indices, (list, np.ndarray)):
indices = np.unique(indices)
else:
raise TypeError('indices must be an integer or a list of integers')
nfi = self._nfi
self.reset()
n_atoms = self.numSelected()
coords = np.zeros((len(indices), n_atoms, 3), self._dtype)
prev = 0
next = self.nextCoordset
for i, index in enumerate(indices):
diff = index - prev
if diff > 1:
self.skip(diff-1)
xyz = next()
if xyz is None:
LOGGER.warning('Expected {0} frames, but parsed {1}.'
.format(len(indices), i))
self.goto(nfi)
return coords[:i]
coords[i] = xyz
prev = index
self.goto(nfi)
return coords
getCoordsets.__doc__ = TrajBase.getCoordsets.__doc__
[docs] def skip(self, n):
if self._closed:
raise ValueError('I/O operation on closed file')
if not isinstance(n, Integral):
raise ValueError('n must be an integer')
if n > 0:
left = self._n_csets - self._nfi
if n > left:
n = left
self._file.seek(n * self._bytes_per_frame, 1)
self._nfi += n
skip.__doc__ = TrajBase.skip.__doc__
[docs] def goto(self, n):
if self._closed:
raise ValueError('I/O operation on closed file')
if not isinstance(n, int):
raise ValueError('n must be an integer')
n_csets = self._n_csets
if n == 0:
self.reset()
else:
if n < 0:
n = n_csets + n
if n < 0:
n = 0
elif n > n_csets:
n = n_csets
self._file.seek(self._first_byte + n * self._bytes_per_frame)
self._nfi = n
goto.__doc__ = TrajBase.goto.__doc__
[docs] def reset(self):
if self._closed:
raise ValueError('I/O operation on closed file')
self._file.seek(self._first_byte)
self._nfi = 0
reset.__doc__ = TrajBase.reset.__doc__
[docs] def close(self):
self._file.close()
self._nfi = 0
self._closed = True
close.__doc__ = TrajBase.close.__doc__
[docs] def getTimestep(self):
"""Returns timestep size."""
return self._timestep
[docs] def getFirstTimestep(self):
"""Returns first timestep value."""
return self._first_ts
[docs] def getFrameFreq(self):
"""Returns timesteps between frames."""
return self._framefreq
[docs] def numFixed(self):
"""Returns number of fixed atoms."""
return self._n_fixed