Source code for prody.trajectory.dcdfile

# -*- coding: utf-8 -*-
"""This module defines classes for handling trajectory files in `DCD format`_.

.. _DCD format: http://www.ks.uiuc.edu/Research/namd/2.6/ug/node13.html"""

import os
from time import time
from struct import calcsize, unpack, pack
from os.path import getsize
import datetime

import numpy as np
from numpy import float32, fromstring

from prody.atomic import Atomic
from prody.ensemble import Ensemble
from prody.utilities import checkCoords
from prody import LOGGER, PY2K

from .frame import Frame
from .trajbase import TrajBase
from .trajfile import TrajFile
from .trajectory import Trajectory

if PY2K:
    range = xrange

__all__ = ['DCDFile', 'parseDCD', 'writeDCD']

now = datetime.datetime.now

DEBUG = False

PISQUARE = np.pi ** 2
RECSCALE32BIT = 1
RECSCALE64BIT = 2

[docs]class DCDFile(TrajFile): """A class for reading and writing DCD files. DCD header and first frame is parsed at instantiation. Coordinates from the first frame is set as the reference coordinate set. This class has been tested for 32-bit DCD files. 32-bit floating-point coordinate array can be casted automatically to a specified type, such as 64-bit float, using *astype* keyword argument, i.e. ``astype=float``, using :meth:`ndarray.astype` method.""" def __init__(self, filename, mode='rb', **kwargs): TrajFile.__init__(self, filename, mode) self._astype = kwargs.get('astype', None) if not self._mode.startswith('w'): self._parseHeader() __init__.__doc__ = TrajFile.__init__.__doc__ def _parseHeader(self): """Read the header information from a dcd file. Input: fd - a file struct opened for binary reading. Output: 0 on success, negative error code on failure. Side effects: *natoms set to number of atoms per frame *nsets set to number of frames in dcd file *istart set to starting timestep of dcd file *nsavc set to timesteps between dcd saves *delta set to value of trajectory timestep *nfixed set to number of fixed atoms *freeind may be set to heap-allocated space *reverse set to one if reverse-endian, zero if not. *charmm set to internal code for handling charmm data. """ dcd = self._file endian = b'' #'=' # native endian rec_scale = RECSCALE32BIT charmm = None dcdcordmagic = unpack(endian + b'i', b'CORD')[0] # Check magic number in file header and determine byte order bits = dcd.read(calcsize('ii')) temp = unpack(endian + b'ii', bits) if temp[0] + temp[1] == 84: LOGGER.info('Detected CHARMM -i8 64-bit DCD file of native ' 'endianness.') rec_scale = RECSCALE64BIT elif temp[0] == 84 and temp[1] == dcdcordmagic: pass #LOGGER.info('Detected standard 32-bit DCD file of native ' # 'endianness.') else: if unpack(b'>ii', bits) == temp: endian = '>' else: endian = '<' temp = unpack(endian + b'ii', bits) if temp[0] + temp[1] == 84: rec_scale = RECSCALE64BIT LOGGER.info('Detected CHARMM -i8 64-bit DCD file of opposite ' 'endianness.') else: endian = '' temp = unpack(endian + b'ii', bits) if temp[0] == 84 and temp[1] == dcdcordmagic: LOGGER.info('Detected standard 32-bit DCD file of ' 'opposite endianness.') else: raise IOError('Unrecognized DCD header or unsupported ' 'DCD format.') # check for magic string, in case of long record markers if rec_scale == RECSCALE64BIT: raise IOError('CHARMM 64-bit DCD files are not yet supported.'); temp = unpack(b'I', dcd.read(calcsize('I'))) if temp[0] != dcdcordmagic: raise IOError('Failed to find CORD magic in CHARMM -i8 64-bit ' 'DCD file.'); # Buffer the entire header for random access bits = dcd.read(80) # CHARMm-genereate DCD files set the last integer in the # header, which is unused by X-PLOR, to its version number. # Checking if this is nonzero tells us this is a CHARMm file # and to look for other CHARMm flags. temp = unpack(endian + b'i'*20 , bits) if temp[-1] != 0: charmm = True if charmm: #LOGGER.info('CHARMM format DCD file (also NAMD 2.1 and later).') temp = unpack(endian + b'i'*9 + b'f' + b'i'*10 , bits) else: LOGGER.info('X-PLOR format DCD file (also NAMD 2.0 and earlier) ' 'is not supported.') return None # Store the number of sets of coordinates (NSET) self._n_csets = temp[0] # Store ISTART, the starting timestep self._first_ts = temp[1] # Store NSAVC, the number of timesteps between dcd saves self._framefreq = temp[2] # Store NAMNF, the number of fixed atoms self._n_fixed = temp[8] if self._n_fixed > 0: raise IOError('DCD files with fixed atoms is not yet supported.') # Read in the timestep, DELTA # Note: DELTA is stored as double with X-PLOR but as float with CHARMm self._timestep = temp[9] self._unitcell = temp[10] == 1 # Get the end size of the first block if unpack(endian + b'i', dcd.read(rec_scale * calcsize('i')))[0] != 84: raise IOError('Unrecognized DCD format.') # Read in the size of the next block temp = unpack(endian + b'i', dcd.read(rec_scale * calcsize('i'))) if (temp[0] - 4) % 80 != 0: raise IOError('Unrecognized DCD format.') noremarks = temp[0] == 84 # Read NTITLE, the number of 80 character title strings there are temp = unpack(endian + b'i', dcd.read(rec_scale * calcsize('i'))) self._dcdtitle = dcd.read(80) if not noremarks: self._remarks = dcd.read(80) # Get the ending size for this block temp = unpack(endian + b'i', dcd.read(rec_scale * calcsize('i'))) if (temp[0] - 4) % 80 != 0: raise IOError('Unrecognized DCD format.') # Read in an integer '4' if unpack(endian + b'i', dcd.read(rec_scale * calcsize('i')))[0] != 4: raise IOError('Unrecognized DCD format.') # Read in the number of atoms self._n_atoms = unpack(endian + b'i', dcd.read(rec_scale*calcsize('i')))[0] # Read in an integer '4' if unpack(endian + b'i', dcd.read(rec_scale * calcsize('i')))[0] != 4: raise IOError('Bad DCD format.') self._is64bit = rec_scale == RECSCALE64BIT self._endian = endian self._n_floats = (self._n_atoms + 2) * 3 if self._is64bit: if self._unitcell: self._bytes_per_frame = 56 + self._n_floats * 8 else: self._bytes_per_frame = self._n_floats * 8 LOGGER.warning('Reading of 64 bit DCD files has not been tested. ' 'Please report any problems that you may find.') self._dtype = np.float64 self._itemsize = 8 else: if self._unitcell: self._bytes_per_frame = 56 + self._n_floats * 4 else: self._bytes_per_frame = self._n_floats * 4 self._dtype = np.float32 self._itemsize = 4 self._first_byte = self._file.tell() n_csets = (getsize(self._filename) - self._first_byte ) // self._bytes_per_frame if n_csets != self._n_csets: LOGGER.warning('DCD header claims {0} frames, file size ' 'indicates there are actually {1} frames.' .format(self._n_csets, n_csets)) self._n_csets = n_csets self._coords = self.nextCoordset() self._file.seek(self._first_byte) self._nfi = 0
[docs] def hasUnitcell(self): return self._unitcell
hasUnitcell.__doc__ = TrajBase.hasUnitcell.__doc__
[docs] def getRemarks(self): """Returns remarks parsed from DCD file.""" return self._remarks
def __next__(self): if self._closed: raise ValueError('I/O operation on closed file') nfi = self._nfi if nfi < self._n_csets: unitcell = self._nextUnitcell() coords = self._nextCoordset() if self._ag is None: frame = Frame(self, nfi, coords, unitcell) else: frame = self._frame Frame.__init__(frame, self, nfi, None, unitcell) return frame __next__.__doc__ = TrajBase.__next__.__doc__ next = __next__
[docs] def nextCoordset(self): """Returns next coordinate set.""" if self._closed: raise ValueError('I/O operation on closed file') if self._nfi < self._n_csets: #Skip extended system coordinates (unit cell data) if self._unitcell: self._file.seek(56, 1) if self._indices is None: return self._nextCoordset() else: return self._nextCoordset()[self._indices]
def _nextCoordset(self): n_floats = self._n_floats n_atoms = self._n_atoms xyz = fromstring(self._file.read(self._itemsize * n_floats), self._dtype) if len(xyz) != n_floats: return None xyz = xyz.reshape((3, n_atoms+2)).T[1:-1,:] xyz = xyz.reshape((n_atoms, 3)) if self._ag is not None: self._ag._setCoords(xyz, self._title + ' frame ' + str(self._nfi), overwrite=True) self._nfi += 1 if self._astype is not None and self._astype != xyz.dtype: xyz= xyz.astype(self._astype) return xyz nextCoordset.__doc__ = TrajBase.nextCoordset.__doc__ def _nextUnitcell(self): if self._unitcell: self._file.read(4) unitcell = fromstring(self._file.read(48), dtype=np.float64) unitcell = unitcell[[0,2,5,1,3,4]] if np.all(abs(unitcell[3:]) <= 1): # This file was generated by CHARMM, or by NAMD > 2.5, with the angle */ # cosines of the periodic cell angles written to the DCD file. */ # This formulation improves rounding behavior for orthogonal cells */ # so that the angles end up at precisely 90 degrees, unlike acos(). */ unitcell[3:] = 90. - np.arcsin(unitcell[3:]) * 90 / PISQUARE self._file.read(4) return unitcell
[docs] def getCoordsets(self, indices=None): """Returns coordinate sets at given *indices*. *indices* may be an integer, a list of integers or **None**. **None** returns all coordinate sets.""" if self._closed: raise ValueError('I/O operation on closed file') if (self._indices is None and (indices is None or indices == slice(None))): nfi = self._nfi self.reset() n_floats = self._n_floats + self._unitcell * 14 n_atoms = self._n_atoms n_csets = self._n_csets data = self._file.read(self._itemsize * n_floats * n_csets) data = fromstring(data, self._dtype) if len(data) > n_floats * n_csets: n_csets = len(data)/n_floats data = data[:n_csets] LOGGER.warning('DCD is corrupt, {0} out of {1} frames ' 'were parsed.'.format(n_csets, self._n_csets)) data = data.reshape((n_csets, n_floats)) if self._unitcell: data = data[:, 14:] data = data.reshape((n_csets, 3, n_atoms+2)) data = data[:, :, 1:-1] data = data.transpose(0, 2, 1) self.goto(nfi) if self._astype is not None and self._astype != data.dtype: data = data.astype(self._astype) return data else: return TrajFile.getCoordsets(self, indices)
getCoordsets.__doc__ = TrajBase.getCoordsets.__doc__
[docs] def write(self, coords, unitcell=None, **kwargs): """Write *coords* to a file open in 'a' or 'w' mode. *coords* may be a NUmpy array or a ProDy object that stores or points to coordinate data. Note that all coordinate sets of ProDy object will be written. Number of atoms will be determined from the file or based on the size of the first coordinate set written. If *unitcell* is provided for the first coordinate set, it will be expected for the following coordinate sets as well. If *coords* is an :class:`~.Atomic` or :class:`~.Ensemble` all coordinate sets will be written. Following keywords are used when writing the first coordinate set: :arg timestep: timestep used for integration, default is 1 :arg firsttimestep: number of the first timestep, default is 0 :arg framefreq: number of timesteps between frames, default is 1""" if self._closed: raise ValueError('I/O operation on closed file') if self._mode == 'r': raise IOError('File not open for writing') try: coords = coords._getCoordsets() except AttributeError: try: coords = coords._getCoords() except AttributeError: checkCoords(coords, csets=True, dtype=None) else: if unitcell is None: try: coords = coords.getUnitcell() except AttributeError: pass if coords.dtype != float32: coords = coords.astype(float32) n_atoms = coords.shape[-2] if self._n_atoms == 0: self._n_atoms = n_atoms elif self._n_atoms != n_atoms: raise ValueError('coords does not have correct number of atoms') if coords.ndim == 2: coords = [coords] dcd = self._file pack_i_4N = pack('i', self._n_atoms * 4) if self._n_csets == 0: if unitcell is None: self._unitcell = False else: self._unitcell = True timestep = float(kwargs.get('timestep', 1.0)) first_ts = int(kwargs.get('firsttimestep', 0)) framefreq = int(kwargs.get('framefreq', 1)) n_fixed = 0 pack_i_0 = pack(b'i', 0) pack_ix4_0x4 = pack(b'i'*4, 0, 0, 0, 0) pack_i_1 = pack(b'i', 1) pack_i_2 = pack(b'i', 2) pack_i_4 = pack(b'i', 4) pack_i_84 = pack(b'i', 84) pack_i_164 = pack(b'i', 164) dcd.write(pack_i_84) dcd.write(b'CORD') dcd.write(pack_i_0) # 0 Number of frames in file, none written yet dcd.write(pack(b'i', first_ts)) # 1 Starting timestep dcd.write(pack(b'i', framefreq)) # 2 Timesteps between frames dcd.write(pack_i_0) # 3 Number of timesteps in simulation dcd.write(pack_i_0) # 4 NAMD writes NSTEP or ISTART - NSAVC here? dcd.write(pack_ix4_0x4) # 5, 6, 7, 8 dcd.write(pack('f', timestep)) # 9 timestep dcd.write(pack('i', int(self._unitcell))) # 10 with unitcell dcd.write(pack_ix4_0x4) # 11, 12, 13, 14 dcd.write(pack_ix4_0x4) # 15, 16, 17, 18 dcd.write(pack('i', 24)) # 19 Pretend to be CHARMM version 24 dcd.write(pack_i_84) dcd.write(pack_i_164) dcd.write(pack_i_2) dcd.write(b'Created by ProDy'.ljust(80)) temp = now().strftime('%d %B, %Y at %H:%M') try: temp = bytes(temp, encoding='utf-8') except TypeError: pass dcd.write((b'REMARKS Created ' + temp).ljust(80)) dcd.write(pack_i_164) dcd.write(pack_i_4) dcd.write(pack(b'i', n_atoms)) dcd.write(pack_i_4) self._first_byte = dcd.tell() if self._unitcell: if unitcell is None: raise TypeError('unitcell data is expected') else: uc = unitcell uc[3:] = np.sin((PISQUARE/90) * (90-uc[3:])) uc = uc[[0,3,1,4,5,2]] pack_i_48 = pack('i', 48) dcd.seek(0, 2) for xyz in coords: if self._unitcell: dcd.write(pack_i_48) uc.tofile(dcd) dcd.write(pack_i_48) xyz = xyz.T dcd.write(pack_i_4N) xyz[0].tofile(dcd) dcd.write(pack_i_4N) dcd.write(pack_i_4N) xyz[1].tofile(dcd) dcd.write(pack_i_4N) dcd.write(pack_i_4N) xyz[2].tofile(dcd) dcd.write(pack_i_4N) self._n_csets += 1 dcd.seek(8, 0) dcd.write(pack('i', self._n_csets)) dcd.seek(0, 2) self._nfi = self._n_csets
[docs] def flush(self): """Flush the internal output buffer.""" if self._mode != 'r': self._file.flush() os.fsync(self._file.fileno())
[docs]def parseDCD(filename, start=None, stop=None, step=None, astype=None): """Parse CHARMM format DCD files (also NAMD 2.1 and later). Returns an :class:`Ensemble` instance. Conformations in the ensemble will be ordered as they appear in the trajectory file. Use :class:`DCDFile` class for parsing coordinates of a subset of atoms. :arg filename: DCD filename :type filename: str :arg start: index of first frame to read :type start: int :arg stop: index of the frame that stops reading :type stop: int :arg step: steps between reading frames, default is 1 meaning every frame :type step: int :arg astype: cast coordinate array to specified type :type astype: type""" dcd = DCDFile(filename, astype=astype) time_ = time() n_frames = dcd.numFrames() LOGGER.info('DCD file contains {0} coordinate sets for {1} atoms.' .format(n_frames, dcd.numAtoms())) ensemble = dcd[slice(start,stop,step)] dcd.close() time_ = time() - time_ or 0.01 dcd_size = 1.0 * dcd.numFrames() * dcd._bytes_per_frame / (1024*1024) LOGGER.info('DCD file was parsed in {0:.2f} seconds.'.format(time_)) LOGGER.info('{0:.2f} MB parsed at input rate {1:.2f} MB/s.' .format(dcd_size, dcd_size/time_)) LOGGER.info('{0} coordinate sets parsed at input rate {1} frame/s.' .format(n_frames, int(n_frames/time_))) return ensemble
[docs]def writeDCD(filename, trajectory, start=None, stop=None, step=None, align=False): """Write 32-bit CHARMM format DCD file (also NAMD 2.1 and later). *trajectory* can be an :class:`Trajectory`, :class:`DCDFile`, or :class:`Ensemble` instance. *filename* is returned upon successful output of file.""" if not filename.lower().endswith('.dcd'): filename += '.dcd' if not isinstance(trajectory, (TrajBase, Ensemble, Atomic)): raise TypeError('{0} is not a valid type for trajectory' .format(type(trajectory))) irange = list(range(*slice(start, stop, step) .indices(trajectory.numCoordsets()))) n_csets = len(irange) if n_csets == 0: raise ValueError('trajectory does not have any coordinate sets, or ' 'no coordinate sets are selected') if isinstance(trajectory, Atomic): isEnsemble = False isAtomic = True n_atoms = trajectory.numAtoms() else: isEnsemble = True isAtomic = False n_atoms = trajectory.numSelected() if n_atoms == 0: raise ValueError('no atoms are selected in the trajectory') if isinstance(trajectory, TrajBase): isTrajectory = True unitcell = trajectory.hasUnitcell() nfi = trajectory.nextIndex() trajectory.reset() pack_i_48 = pack('i', 48) if isinstance(trajectory, Trajectory): timestep = trajectory.getTimestep()[0] first_ts = trajectory.getFirstTimestep()[0] framefreq = trajectory.getFrameFreq()[0] n_fixed = trajectory.numFixed()[0] else: timestep = trajectory.getTimestep() first_ts = trajectory.getFirstTimestep() framefreq = trajectory.getFrameFreq() n_fixed = trajectory.numFixed() else: isTrajectory = False unitcell = False if isinstance(trajectory, Ensemble): frame = trajectory[0] else: frame = trajectory acsi = trajectory.getACSIndex() timestep = 1 first_ts = 0 framefreq = 1 n_fixed = 0 dcd = DCDFile(filename, mode='w') LOGGER.progress('Writing DCD', len(irange), '_prody_writeDCD') prev = -1 uc = None time_ = time() for j, i in enumerate(irange): diff = i - prev prev = i if isTrajectory: if diff > 1: trajectory.skip(diff-1) frame = next(trajectory) if frame is None: break if unitcell: uc = frame._getUnitcell() uc[3:] = np.sin((PISQUARE/90) * (90-uc[3:])) uc = uc[[0,3,1,4,5,2]] elif isEnsemble: frame._index = i else: frame.setACSIndex(i) if align: frame.superpose() if j == 0: dcd.write(frame._getCoords(), uc, timestep=timestep, firsttimestep=first_ts, framefreq=framefreq) else: dcd.write(frame._getCoords(), uc) LOGGER.update(i, label='_prody_writeDCD') if isAtomic: trajectory.setACSIndex(acsi) j += 1 LOGGER.finish() dcd.close() time_ = time() - time_ or 0.01 dcd_size = 1.0 * (56 + (n_atoms * 3 + 6) * 4 ) * n_csets / (1024*1024) LOGGER.info('DCD file was written in {0:.2f} seconds.'.format(time_)) LOGGER.info('{0:.2f} MB written at input rate {1:.2f} MB/s.' .format(dcd_size, dcd_size/time_)) LOGGER.info('{0} coordinate sets written at output rate {1} frame/s.' .format(n_csets, int(n_csets/time_))) if j != n_csets: LOGGER.warn('Warning: {0} frames expected, {1} written.' .format(n_csets, j)) if isTrajectory: trajectory.goto(nfi) return filename