Source code for prody.chromatin.hic

from numbers import Integral

from numpy import ma
import numpy as np
from scipy.sparse import coo_matrix
from scipy.stats import mode
from prody.chromatin.norm import VCnorm, SQRTVCnorm, Filenorm
from prody.chromatin.functions import div0, showDomains, _getEigvecs

from prody import PY2K
from prody.dynamics import GNM, MaskedGNM
from prody.dynamics.functions import writeArray
from prody.dynamics.mode import Mode
from prody.dynamics.modeset import ModeSet

from prody.utilities import openFile, importLA, showMatrix, isURL, fixArraySize, makeSymmetric

__all__ = ['HiC', 'parseHiC', 'parseHiCStream', 'parseHiCBinary', 'saveHiC', 'loadHiC', 'writeMap']

[docs]class HiC(object): """This class is used to store and preprocess Hi-C contact map. A :class:`.GNM` instance for analyzing the contact map can be also created by using this class. """ def __init__(self, title='Unknown', map=None, bin=None): self._title = title self._map = None self.mask = False self._labels = 0 self.masked = True self.bin = bin self.map = map @property def map(self): if self.masked: return self.getTrimedMap() else: return self._map @map.setter def map(self, value): if value is None: self._map = None else: self._map = np.asarray(value) self._map = makeSymmetric(self._map) self._maskUnmappedRegions() self._labels = np.zeros(len(self._map), dtype=int) def __repr__(self): mask = self.mask if np.isscalar(mask): return '<HiC: {0} ({1} loci)>'.format(self._title, len(self._map)) else: return '<HiC: {0} ({1} mapped loci; {2} in total)>'.format(self._title, np.count_nonzero(mask), len(self._map)) def __str__(self): return 'HiC ' + self._title def __getitem__(self, index): if isinstance(index, Integral): return self.map.flatten()[index] else: i, j = index return self.map[i,j] def __len__(self): mask = self.mask if np.isscalar(mask): return len(self._map) else: return np.count_nonzero(mask) def numAtoms(self): return len(self.map)
[docs] def getTitle(self): """Returns title of the instance.""" return self._title
[docs] def setTitle(self, title): """Sets title of the instance.""" self._title = str(title)
[docs] def getCompleteMap(self): """Obtains the complete contact map with unmapped regions.""" return self._map
[docs] def getTrimedMap(self): """Obtains the contact map without unmapped regions.""" if self._map is None: return None if np.isscalar(self.mask): return self._map M = ma.array(self._map) M.mask = np.diag(~self.mask) return ma.compress_rowcols(M)
def align(self, array, axis=None): if not isinstance(array, np.ndarray): array = np.array(array) ret = array = array.copy() if np.isscalar(self.mask): return ret mask = self.mask.copy() l_full = self.getCompleteMap().shape[0] l_trim = self.getTrimedMap().shape[0] if len(array.shape) == 0: raise ValueError('array cannot be empty') elif len(array.shape) == 1: l = array.shape[0] if l == l_trim: N = len(mask) ret = np.zeros(N, dtype=array.dtype) ret[mask] = array elif l == l_full: ret = array[mask] else: raise ValueError('The length of array (%d) does not ' 'match that of either the full (%d) ' 'or trimed (%d).' %(l, l_full, l_trim)) elif len(array.shape) == 2: s = array.shape if axis is None: if s[0] != s[1]: raise ValueError('The array must be a square matrix ' 'if axis is set to None.') if s[0] == l_trim: N = len(mask) whole_mat = np.zeros((N,N), dtype=array.dtype) mask = np.outer(mask, mask) whole_mat[mask] = array.flatten() ret = whole_mat elif s[0] == l_full: M = ma.array(array) M.mask = np.diag(mask) ret = ma.compress_rowcols(M) else: raise ValueError('The size of array (%d) does not ' 'match that of either the full (%d) ' 'or trimed (%d).' %(s[0], l_full, l_trim)) else: new_shape = list(s) otheraxis = 0 if axis!=0 else 1 if s[axis] == l_trim: N = len(mask) new_shape[axis] = N whole_mat = np.zeros(new_shape) mask = np.expand_dims(mask, axis=otheraxis) mask = mask.repeat(s[otheraxis], axis=otheraxis) whole_mat[mask] = array.flatten() ret = whole_mat elif s[axis] == l_full: mask = np.expand_dims(mask, axis=otheraxis) mask = mask.repeat(s[otheraxis]) ret = self._map[mask] else: raise ValueError('The size of array (%d) does not ' 'match that of either the full (%d) ' 'or trimed (%d).' %(s[0], l_full, l_trim)) return ret
[docs] def getKirchhoff(self): """Builds a Kirchhoff matrix based on the contact map.""" if self._map is None: return None else: M = self.map I = np.eye(M.shape[0], dtype=bool) A = M.copy() A[I] = 0. D = np.diag(np.sum(A, axis=0)) K = D - A return K
def _maskUnmappedRegions(self, diag=False): """Finds and masks unmapped regions in the contact map.""" M = self._map if M is None: return if diag: # Obtain the diagonal values, need to make sure d is an array # instead of a matrix, otherwise diag() later will not work as # intended. d = np.array(np.diag(M)) else: d = np.array(M.sum(0)) # mask if a diagonal value is zero mask_zero = np.array(d==0) # mask if a diagonal value is NAN mask_nan = np.isnan(d) # combine two masks mask = np.logical_or(mask_nan, mask_zero) self.mask = ~mask return self.mask
[docs] def calcGNM(self, n_modes=None, **kwargs): """Calculates GNM on the current Hi-C map. By default, ``n_modes`` is set to **None** and ``zeros`` to **True**.""" if 'zeros' not in kwargs: kwargs['zeros'] = True if self.masked: gnm = MaskedGNM(self._title, self.mask) else: gnm = GNM(self._title) gnm.setKirchhoff(self.getKirchhoff()) gnm.calcModes(n_modes=n_modes, **kwargs) return gnm
[docs] def normalize(self, method=VCnorm, **kwargs): """Applies chosen normalization on the current Hi-C map.""" M = self._map N = method(M, **kwargs) self.map = N return N
[docs] def setDomains(self, labels, **kwargs): """Uses spectral clustering to identify structural domains on the chromosome. :arg labels: domain labels :type labels: :class:`~numpy.ndarray`, list :arg method: Label assignment algorithm used after Laplacian embedding. :type method: func """ wastrimmed = self.masked self.masked = True if len(labels) == self.numAtoms(): full_length = self.numAtoms() if full_length != len(labels): _labels = np.empty(full_length) _labels.fill(np.nan) _labels[self.mask] = labels currlbl = labels[0] for i in range(len(_labels)): l = _labels[i] if np.isnan(l): _labels[i] = currlbl elif currlbl != l: currlbl = l labels = _labels else: self.masked = False if len(labels) != self.numAtoms(): raise ValueError('The length of the labels should match either the length ' 'of masked or complete Hi-C map. Turn off "masked" if ' 'you intended to set the labels to the full map.') self.masked = wastrimmed self._labels = labels return self.getDomains()
[docs] def getDomains(self): """Returns an 1D :class:`numpy.ndarray` whose length is the number of loci. Each element is an index denotes to which domain the locus belongs.""" lbl = self._labels mask = self.mask if self.masked: lbl = lbl[mask] return lbl
[docs] def getDomainList(self): """Returns a list of domain separations. The list has two columns: the first is for the domain starts and the second is for the domain ends.""" indicators = np.diff(self.getDomains()) indicators = np.append(1., indicators) indicators[-1] = 1 sites = np.where(indicators != 0)[0] starts = sites[:-1] ends = sites[1:] domains = np.array([starts, ends]).T return domains
[docs] def view(self, spec='p', **kwargs): """Visualization of the Hi-C map and domains (if present). The function makes use of :func:`.showMatrix`. :arg spec: a string specifies how to preprocess the matrix. Blank for no preprocessing, 'p' for showing only data from *p*-th to *100-p*-th percentile. '_' is to suppress creating a new figure and paint to the current one instead. The letter specifications can be applied sequentially, e.g. 'p_'. :type spec: str :arg p: specifies the percentile threshold. :type p: double """ dm_kwargs = {} keys = list(kwargs.keys()) for k in keys: if k.startswith('dm_'): dm_kwargs[k[3:]] = kwargs.pop(k) elif k.startswith('domain_'): dm_kwargs[k[7:]] = kwargs.pop(k) M = self.map if 'p' in spec: p = kwargs.pop('p', 5) lp = kwargs.pop('lp', p) hp = kwargs.pop('hp', 100-p) vmin = np.percentile(M, lp) vmax = np.percentile(M, hp) else: vmin = vmax = None if not 'vmin' in kwargs: kwargs['vmin'] = vmin if not 'vmax' in kwargs: kwargs['vmax'] = vmax im = showMatrix(M, **kwargs) domains = self.getDomainList() if len(domains) > 1: showDomains(domains, **dm_kwargs) return im
def copy(self): new = type(self)() new.__dict__.update(self.__dict__) return new __copy__ = copy
[docs]def parseHiC(filename, **kwargs): """Returns an :class:`.HiC` from a Hi-C data file. This function extends :func:`.parseHiCStream`. :arg filename: the filename to the Hi-C data file. :type filename: str """ import os, struct title = kwargs.get('title') if title is None: title = os.path.basename(filename) else: title = kwargs.pop('title') if isURL(filename): M, res = parseHiCBinary(filename, title=title, **kwargs) else: with open(filename,'rb') as req: magic_number = struct.unpack('<3s',req.read(3))[0] if magic_number == b"HIC": M, res = parseHiCBinary(filename, title=title, **kwargs) else: with open(filename, 'r') as filestream: M, res = parseHiCStream(filestream, title=title, **kwargs) hic = HiC(title=title, map=M, bin=res) return hic
def _sparse2dense(I, J, values, bin=None): I = np.asarray(I, dtype=int) J = np.asarray(J, dtype=int) values = np.asarray(values, dtype=float) # determine the bin size by the most frequent interval if bin is None: loci = np.unique(np.sort(I)) bins = np.diff(loci) bin = mode(bins)[0][0] # convert coordinate from basepair to locus index bin = int(bin) I = I // bin J = J // bin # make sure that the matrix is square # if np.max(I) != np.max(J): # b = np.max(np.append(I, J)) # I = np.append(I, b) # J = np.append(J, b) # values = np.append(values, 0.) # Convert to sparse matrix format, then full matrix format # and finally array type. Matrix format is avoided because # diag() won't work as intended for Matrix instances. M = np.array(coo_matrix((values, (I, J))).todense()) return M, bin
[docs]def parseHiCStream(stream, **kwargs): """Returns an :class:`.HiC` from a stream of Hi-C data lines. :arg stream: Anything that implements the method ``read``, ``seek`` (e.g. :class:`file`, buffer, stdin) """ issparse = kwargs.get('sparse', None) import csv dialect = csv.Sniffer().sniff(stream.read(1024)) stream.seek(0) reader = csv.reader(stream, dialect) D = list() for row in reader: d = list() for element in row: d.append(np.double(element)) D.append(d) D = np.array(D) res = kwargs.get('bin', None) if res is not None: res = int(res) size = D.shape if len(D.shape) <= 1: raise ValueError("cannot parse the file: input file only contains one column.") if issparse is None: issparse = size[1] == 3 if not issparse: M = D else: try: I, J, values = D.T[:3] except ValueError: raise ValueError('the sparse matrix format should have three columns') M, res = _sparse2dense(I, J, values, bin=res) return M, res
def parseHiCBinary(filename, **kwargs): chrloc = kwargs.get('chrom', None) if chrloc is None: raise ValueError('chrom needs to be specified when parsing .hic format') chrloc1 = kwargs.get('chrom1', chrloc) chrloc2 = kwargs.get('chrom2', chrloc) norm = kwargs.get('norm', 'NONE') unit = kwargs.get('unit', 'BP') res = kwargs.get('binsize', None) res = kwargs.get('bin', res) if res is None: raise ValueError('bin needs to be specified when parsing .hic format') res = int(res) from .straw import straw result = straw(norm, filename, chrloc1, chrloc2, unit, res) M, res = _sparse2dense(*result, bin=res) return M, res
[docs]def writeMap(filename, map, bin=None, format='%f'): """Writes *map* to the file designated by *filename*. :arg filename: the file to be written. :type filename: str :arg map: a Hi-C contact map. :type map: :class:`numpy.ndarray` :arg bin: bin size of the *map*. If bin is `None`, *map* will be written in full matrix format. :type bin: int :arg format: output format for map elements. :type format: str """ assert isinstance(map, np.ndarray), 'map must be a numpy.ndarray.' if bin is None: return writeArray(filename, map, format=format) else: L = int(map.size - np.diag(map).size)//2 + np.diag(map).size spmat = np.zeros((L, 3)) m,n = map.shape l = 0 for i in range(m): for j in range(i,n): spmat[l, 0] = i * bin spmat[l, 1] = j * bin spmat[l, 2] = map[i, j] l += 1 fmt = ['%d', '%d', format] return writeArray(filename, spmat, format=fmt)
[docs]def saveHiC(hic, filename=None, map=True, **kwargs): """Saves *HiC* model data as :file:`filename.hic.npz`. If *map* is **True**, Hi-C contact map will not be saved and it can be loaded from raw data file later. If *filename* is **None**, name of the Hi-C instance will be used as the filename, after ``" "`` (white spaces) in the name are replaced with ``"_"`` (underscores). Upon successful completion of saving, filename is returned. This function makes use of :func:`numpy.savez` function.""" assert isinstance(hic, HiC), 'hic must be a HiC instance.' if filename is None: filename = hic.getTitle().replace(' ', '_') if filename.endswith('.hic'): filename += '.npz' elif not filename.endswith('.hic.npz'): filename += '.hic.npz' attr_dict = hic.__dict__.copy() if not map: attr_dict.pop('_map') ostream = openFile(filename, 'wb', **kwargs) np.savez_compressed(ostream, **attr_dict) ostream.close() return filename
[docs]def loadHiC(filename): """Returns HiC instance after loading it from file (*filename*). This function makes use of :func:`numpy.load` function. See also :func:`saveHiC`.""" attr_dict = np.load(filename) hic = HiC() keys = attr_dict.keys() for k in keys: val = attr_dict[k] if len(val.shape) == 0: val = np.asscalar(val) setattr(hic, k, val) return hic
def saveHiC_h5(hic, filename=None, **kwargs): """Saves *HiC* model data as :file:`filename.hic.npz`. If *filename* is **None**, name of the Hi-C instance will be used as the filename, after ``" "`` (white spaces) in the name are replaced with ``"_"`` (underscores). Upon successful completion of saving, filename is returned. This function makes use of :func:`numpy.savez` function.""" try: import h5py except: raise ImportError('h5py needs to be installed for using this function') assert isinstance(hic, HiC), 'hic must be a HiC instance.' if filename is None: filename = hic.getTitle().replace(' ', '_') if filename.endswith('.hic'): filename += '.hic' elif not filename.endswith('.hic.h5'): filename += '.hic.h5' attr_dict = hic.__dict__.copy() with h5py.File(filename, 'w') as f: for key in attr_dict: value = attr_dict[key] compression = None if np.isscalar(value) else 'gzip' f.create_dataset(key, data=value, compression=compression) return filename def loadHiC_h5(filename): """Returns HiC instance after loading it from file (*filename*). This function makes use of :func:`numpy.load` function. See also :func:`saveHiC`.""" try: import h5py except: raise ImportError('h5py needs to be installed for using this function') hic = HiC() with h5py.File(filename, 'r') as f: for key in f.keys(): try: value = f[key][:] except: value = f[key][()] setattr(hic, key, value) return hic