Source code for prody.sequence.analysis

# -*- coding: utf-8 -*-
"""This module defines MSA analysis functions."""

__author__ = 'Anindita Dutta, Ahmet Bakan, Wenzhi Mao'

from numbers import Integral
import os

from numpy import dtype, zeros, empty, ones, where, ceil, shape, eye
from numpy import indices, tril_indices, array, ndarray, isscalar, unique

from prody import LOGGER
from prody.utilities import which, MATCH_SCORE, MISMATCH_SCORE
from prody.utilities import GAP_PENALTY, GAP_EXT_PENALTY, ALIGNMENT_METHOD

from prody.sequence.msa import MSA, refineMSA
from prody.sequence.msafile import parseMSA, writeMSA
from prody.sequence.sequence import Sequence
from prody.atomic import Atomic
from prody.measure import calcDistance

from Bio import pairwise2
import sys

__all__ = ['calcShannonEntropy', 'buildMutinfoMatrix', 'calcMSAOccupancy',
           'applyMutinfoCorr', 'applyMutinfoNorm', 'calcRankorder', 'filterRankedPairs',
           'buildSeqidMatrix', 'uniqueSequences', 'buildOMESMatrix',
           'buildSCAMatrix', 'buildDirectInfoMatrix', 'calcMeff', 
           'buildPCMatrix', 'buildMSA', 'showAlignment', 'alignTwoSequencesWithBiopython', 
           'alignSequenceToMSA', 'calcPercentIdentities', 'alignSequencesByChain',
           'trimAtomsUsingMSA']


doc_turbo = """

    By default, *turbo* mode, which uses memory as large as the MSA array
    itself but runs four to five times faster, will be used.  If memory
    allocation fails, the implementation will fall back to slower and
    memory efficient mode."""

[docs]def calcPercentIdentities(msa): percent_ids = [] aas = ['A','C','D','E','F','G','H','I','J','K','L', \ 'M','N','P','Q','R','S','T','V','W','Y','-'] for i in range(len(msa)): col_list = list(msa.getArray()[:,i]) max_count = 0 for aa in aas: if col_list.count(aa) > max_count: max_count = col_list.count(aa) percent_ids.append(float(max_count)/float(len(col_list))*100) return percent_ids
def getMSA(msa): """Returns MSA character array.""" try: msa = msa._getArray() except AttributeError: pass try: dtype_, ndim, shape = msa.dtype, msa.ndim, msa.shape except AttributeError: raise TypeError('msa must be an MSA instance or a 2D character array') if dtype_ != dtype('|S1') or ndim != 2: raise TypeError('msa must be an MSA instance or a 2D character array') return msa
[docs]def calcShannonEntropy(msa, ambiguity=True, omitgaps=True, **kwargs): """Returns Shannon entropy array calculated for *msa*, which may be an :class:`.MSA` instance or a 2D Numpy character array. Implementation is case insensitive and handles ambiguous amino acids as follows: * **B** (Asx) count is allocated to *D* (Asp) and *N* (Asn) * **Z** (Glx) count is allocated to *E* (Glu) and *Q* (Gln) * **J** (Xle) count is allocated to *I* (Ile) and *L* (Leu) * **X** (Xaa) count is allocated to the twenty standard amino acids Selenocysteine (**U**, Sec) and pyrrolysine (**O**, Pyl) are considered as distinct amino acids. When *ambiguity* is set **False**, all alphabet characters as considered as distinct types. All non-alphabet characters are considered as gaps, and they are handled in two ways: * non-existent, the probability of observing amino acids in a given column is adjusted, by default * as a distinct character with its own probability, when *omitgaps* is **False**""" msa = getMSA(msa) length = msa.shape[1] entropy = empty(length, float) from .msatools import msaentropy return msaentropy(msa, entropy, ambiguity=bool(ambiguity), omitgaps=bool(omitgaps))
[docs]def buildMutinfoMatrix(msa, ambiguity=True, turbo=True, **kwargs): """Returns mutual information matrix calculated for *msa*, which may be an :class:`.MSA` instance or a 2D Numpy character array. Implementation is case insensitive and handles ambiguous amino acids as follows: * **B** (Asx) count is allocated to *D* (Asp) and *N* (Asn) * **Z** (Glx) count is allocated to *E* (Glu) and *Q* (Gln) * **J** (Xle) count is allocated to *I* (Ile) and *L* (Leu) * **X** (Xaa) count is allocated to the twenty standard amino acids * Joint probability of observing a pair of ambiguous amino acids is allocated to all potential combinations, e.g. probability of **XX** is allocated to 400 combinations of standard amino acids, similarly probability of **XB** is allocated to 40 combinations of *D* and *N* with the standard amino acids. Selenocysteine (**U**, Sec) and pyrrolysine (**O**, Pyl) are considered as distinct amino acids. When *ambiguity* is set **False**, all alphabet characters as considered as distinct types. All non-alphabet characters are considered as gaps. Mutual information matrix can be normalized or corrected using :func:`applyMINormalization` and :func:`applyMICorrection` methods, respectively. Normalization by joint entropy can performed using this function with *norm* option set **True**.""" msa = getMSA(msa) from .msatools import msamutinfo LOGGER.timeit('_mutinfo') length = msa.shape[1] mutinfo = empty((length, length), float) mutinfo = msamutinfo(msa, mutinfo, ambiguity=bool(ambiguity), turbo=bool(turbo), norm=bool(kwargs.get('norm', False)), debug=bool(kwargs.get('debug', False))) LOGGER.report('Mutual information matrix was calculated in %.2fs.', '_mutinfo') return mutinfo
buildMutinfoMatrix.__doc__ += doc_turbo
[docs]def calcMSAOccupancy(msa, occ='res', count=False): """Returns occupancy array calculated for residue positions (default, ``'res'`` or ``'col'`` for *occ*) or sequences (``'seq'`` or ``'row'`` for *occ*) of *msa*, which may be an :class:`.MSA` instance or a 2D NumPy character array. By default, occupancy [0-1] will be calculated. If *count* is **True**, count of non-gap characters will be returned. Implementation is case insensitive.""" from .msatools import msaocc msa = getMSA(msa) try: dim = occ.startswith('res') or occ.startswith('col') except AttributeError: raise TypeError('occ must be a string') occ = zeros(msa.shape[int(dim)], float) return msaocc(msa, occ, dim, count=bool(count))
[docs]def applyMutinfoNorm(mutinfo, entropy, norm='sument'): """Apply one of the normalizations discussed in [MLC05]_ to *mutinfo* matrix. *norm* can be one of the following: * ``'sument'``: :math:`H(X) + H(Y)`, sum of entropy of columns * ``'minent'``: :math:`min\{H(X), H(Y)\}`, minimum entropy * ``'maxent'``: :math:`max\{H(X), H(Y)\}`, maximum entropy * ``'mincon'``: :math:`min\{H(X|Y), H(Y|X)\}`, minimum conditional entropy * ``'maxcon'``: :math:`max\{H(X|Y), H(Y|X)\}`, maximum conditional entropy where :math:`H(X)` is the entropy of a column, and :math:`H(X|Y) = H(X) - MI(X, Y)`. Normalization with joint entropy, i.e. :math:`H(X, Y)`, can be done using :func:`.buildMutinfoMatrix` *norm* argument. .. [MLC05] Martin LC, Gloor GB, Dunn SD, Wahl LM. Using information theory to search for co-evolving residues in proteins. *Bioinformatics* **2005** 21(22):4116-4124.""" try: ndim, shape = mutinfo.ndim, mutinfo.shape except AttributeError: raise TypeError('mutinfo must be a 2D square array') if ndim != 2 or shape[0] != shape[1]: raise ValueError('mutinfo must be a 2D square array') try: ndim, shapent = entropy.ndim, entropy.shape except AttributeError: raise TypeError('entropy must be a numpy array') if ndim != 1: raise ValueError('entropy must be a 1D array') if shapent[0] != shape[0]: raise ValueError('shape of mutinfo and entropy does not match') try: sw = norm.startswith except AttributeError: raise TypeError('norm must be a string') if sw('sument'): norm = lambda i_val, j_val, val: i_val + j_val elif sw('minent'): norm = lambda i_val, j_val, val: min(i_val, j_val) elif sw('maxent'): norm = lambda i_val, j_val, val: max(i_val, j_val) elif sw('mincon'): norm = lambda i_val, j_val, val: min(i_val - val, j_val - val) elif sw('maxcon'): norm = lambda i_val, j_val, val: max(i_val - val, j_val - val) elif sw('joint'): raise ValueError('for joint entropy normalization, use ' 'buildMutinfoMatrix function') else: raise ValueError('norm={0} is not a valid normalization type' .format(norm)) mi = mutinfo.copy() for i, i_val in enumerate(entropy): for j, j_val in enumerate(entropy): val = mi[i, j] div = norm(i_val, j_val, val) if div == 0: mi[i, j] = 0 else: mi[i, j] /= div return mi
[docs]def applyMutinfoCorr(mutinfo, corr='prod'): """Returns a copy of *mutinfo* array after average product correction (default) or average sum correction is applied. See [DSD08]_ for details. .. [DSD08] Dunn SD, Wahl LM, Gloor GB. Mutual information without the influence of phylogeny or entropy dramatically improves residue contact prediction. *Bioinformatics* **2008** 24(3):333-340.""" try: ndim, shape = mutinfo.ndim, mutinfo.shape except AttributeError: raise TypeError('mutinfo must be a 2D square array') if ndim != 2 or shape[0] != shape[1]: raise ValueError('mutinfo must be a 2D square array') try: sw = corr.startswith except AttributeError: raise TypeError('correction must be a string') avg_mipos = mutinfo.sum(1) / (shape[0] - 1) avg_mi = avg_mipos.mean() mi = mutinfo.copy() if sw('prod') or sw('apc'): for i, i_avg in enumerate(avg_mipos): for j, j_avg in enumerate(avg_mipos): mi[i, j] -= (i_avg * j_avg) / avg_mi elif sw('sum') or sw('asc'): for i, i_avg in enumerate(avg_mipos): for j, j_avg in enumerate(avg_mipos): mi[i, j] -= i_avg + j_avg - avg_mi else: raise ValueError('correction must be prod or sum, not ' + corr) return mi
[docs]def filterRankedPairs(pdb, indices, msa_indices, rank_row, rank_col, zscore_sort, \ num_of_pairs=20, seqDistance=5, resi_range=None, \ pdbDistance=8, chain1='A', chain2='A'): ''' indices and msa_indices are lists output from alignSequenceToMSA rank_row, rank_col and zscore_sort are the outputs from calcRankorder :arg num_of_pairs: The number of pairs to be output, if no value is given then all pairs are output. Default is 20 :type num_of_pairs: int :arg seqDistance: Remove pairs that are closer than this in the reference sequence Default is 5 :type seqDistance: int :arg pdbDistance: Remove pairs with Calpha atoms further apart than this in the PDB Default is 8 :type pdbDistance: int :arg chain1: The chain used for the residue specified by rank_row when measuring distances :type chain1: str :arg chain2: The chain used for the residue specified by rank_col when measuring distances :type chain2: str ''' if isscalar(indices): raise TypeError('Please provide a valid indices list') if isscalar(msa_indices): raise TypeError('Please provide valid msa_indices, which should be a list') if isscalar(rank_row): raise TypeError('Please provide ranked row from calcRankorder') if isscalar(rank_col): raise ValueError('Please provide ranked col from calcRankorder') if isscalar(zscore_sort): raise ValueError('Please provide sorted Z scores from calcRankorder') if num_of_pairs is None: num_of_pairs = len(rank_row) pairList = [] i = -1 j = 0 while j < num_of_pairs: i += 1 row_idx = indices[where(msa_indices == rank_row[i])[0][0]] col_idx = indices[where(msa_indices == rank_col[i])[0][0]] if not isinstance(row_idx, Integral) or not isinstance(col_idx, Integral): continue if row_idx - col_idx < seqDistance: continue distance = calcDistance(pdb.select('chain %s and resid %s' % (chain1, \ row_idx)).copy(), \ pdb.select('chain %s and resid %s' % (chain2, \ row_idx)).copy()) if distance > pdbDistance: continue if resi_range is not None: if not row_idx in resi_range and not col_idx in resi_range: continue pairList.append('%3d:\t%3d\t%3d\t%5.1f\t%5.1f\n'%(i, row_idx, col_idx, zscore_sort[i], distance)) j += 1 return pairList
[docs]def buildSeqidMatrix(msa, turbo=True): """Returns sequence identity matrix for *msa*.""" msa = getMSA(msa) LOGGER.timeit('_seqid') from .seqtools import msaeye dim = msa.shape[0] seqid = msaeye(msa, ones((dim, dim), float), turbo=bool(turbo)) LOGGER.report('Sequence identity matrix was calculated in %.2fs.', '_seqid') return seqid
buildSeqidMatrix.__doc__ += doc_turbo
[docs]def uniqueSequences(msa, seqid=0.98, turbo=True): """Returns a boolean array marking unique sequences in *msa*. A sequence sharing sequence identity of *seqid* or more with another sequence coming before itself in *msa* will have a **True** value in the array.""" msa = getMSA(msa) from .seqtools import msaeye if not (0 < seqid <= 1): raise ValueError('seqid must satisfy 0 < seqid <= 1') return msaeye(msa, zeros(msa.shape[0], bool), unique=seqid, turbo=bool(turbo))
uniqueSequences.__doc__ += doc_turbo
[docs]def calcRankorder(matrix, zscore=False, **kwargs): """Returns indices of elements and corresponding values sorted in descending order, if *descend* is **True** (default). Can apply a zscore normalization; by default along *axis* - 0 such that each column has ``mean=0`` and ``std=1``. If *zcore* analysis is used, return value contains the zscores. If matrix is symmetric only lower triangle indices will be returned, with diagonal elements if *diag* is **True** (default).""" try: ndim, shape = matrix.ndim, matrix.shape except AttributeError: raise TypeError('matrix must be a 2D array') if ndim != 2: raise ValueError('matrix must be a 2D array') threshold = kwargs.get('thredhold', 0.0001) try: symm = abs((matrix.transpose() - matrix).max()) < threshold except: symm = False if zscore: axis = int(bool(kwargs.get('axis', 0))) matrix = (matrix - matrix.mean(axis)) / matrix.std(axis) LOGGER.info('Zscore normalization has been applied.') descend = kwargs.get('descend', True) if not symm: if descend: sorted_index = matrix.argsort(axis=None)[::-1] else: sorted_index = matrix.argsort(axis=None) row = indices(shape)[0].flatten()[sorted_index] column = indices(shape)[1].flatten()[sorted_index] else: LOGGER.info('Matrix is symmetric, only lower triangle indices ' 'will be returned.') if kwargs.get('diag', True): k = 0 else: k = -1 ind_row, ind_column = tril_indices(shape[0], k=k) matrix_lt = matrix[ind_row, ind_column] if descend: sorted_index = matrix_lt.argsort(axis=None)[::-1] else: sorted_index = matrix_lt.argsort(axis=None) row = ind_row[sorted_index] column = ind_column[sorted_index] return (row, column, matrix[row, column])
[docs]def buildOMESMatrix(msa, ambiguity=True, turbo=True, **kwargs): """Returns OMES (Observed Minus Expected Squared) covariance matrix calculated for *msa*, which may be an :class:`.MSA` instance or a 2D NumPy character array. OMES is defined as:: (N_OBS - N_EX)^2 (f_i,j - f_i * f_j)^2 OMES_(i,j) = sum(------------------) = N * sum(-----------------------) N_EX f_i * f_j Implementation is case insensitive and handles ambiguous amino acids as follows: * **B** (Asx) count is allocated to *D* (Asp) and *N* (Asn) * **Z** (Glx) count is allocated to *E* (Glu) and *Q* (Gln) * **J** (Xle) count is allocated to *I* (Ile) and *L* (Leu) * **X** (Xaa) count is allocated to the twenty standard amino acids * Joint probability of observing a pair of ambiguous amino acids is allocated to all potential combinations, e.g. probability of **XX** is allocated to 400 combinations of standard amino acids, similarly probability of **XB** is allocated to 40 combinations of *D* and *N* with the standard amino acids. Selenocysteine (**U**, Sec) and pyrrolysine (**O**, Pyl) are considered as distinct amino acids. When *ambiguity* is set **False**, all alphabet characters as considered as distinct types. All non-alphabet characters are considered as gaps.""" msa = getMSA(msa) from .msatools import msaomes LOGGER.timeit('_omes') length = msa.shape[1] omes = empty((length, length), float) omes = msaomes(msa, omes, ambiguity=bool(ambiguity), turbo=bool(turbo), debug=bool(kwargs.get('debug', False))) LOGGER.report('OMES matrix was calculated in %.2fs.', '_omes') return omes
buildOMESMatrix.__doc__ += doc_turbo
[docs]def buildSCAMatrix(msa, turbo=True, **kwargs): """Returns SCA matrix calculated for *msa*, which may be an :class:`.MSA` instance or a 2D Numpy character array. Implementation is case insensitive and handles ambiguous amino acids as follows: * **B** (Asx) count is allocated to *D* (Asp) and *N* (Asn) * **Z** (Glx) count is allocated to *E* (Glu) and *Q* (Gln) * **J** (Xle) count is allocated to *I* (Ile) and *L* (Leu) * **X** (Xaa) count is allocated to the twenty standard amino acids * Joint probability of observing a pair of ambiguous amino acids is allocated to all potential combinations, e.g. probability of **XX** is allocated to 400 combinations of standard amino acids, similarly probability of **XB** is allocated to 40 combinations of *D* and *N* with the standard amino acids. Selenocysteine (**U**, Sec) and pyrrolysine (**O**, Pyl) are considered as distinct amino acids. When *ambiguity* is set **False**, all alphabet characters as considered as distinct types. All non-alphabet characters are considered as gaps.""" msa = getMSA(msa) if msa.shape[0]<100: LOGGER.warning('SCA performs the best with higher number of sequences, and ' 'minimal number of sequences is recommended as 100.') from .msatools import msasca LOGGER.timeit('_sca') length = msa.shape[1] sca = zeros((length, length), float) sca = msasca(msa, sca, turbo=bool(turbo)) LOGGER.report('SCA matrix was calculated in %.2fs.', '_sca') return sca
buildSCAMatrix.__doc__ += doc_turbo
[docs]def buildPCMatrix(msa, turbo=False, **kwargs): """Returns PC matrix calculated for *msa*, which may be an :class:`.MSA` instance or a 2D Numpy character array. Implementation is case insensitive and handles ambiguous amino acids as follows: * **B** (Asx) count is allocated to *D* (Asp) and *N* (Asn) * **Z** (Glx) count is allocated to *E* (Glu) and *Q* (Gln) * **J** (Xle) count is allocated to *I* (Ile) and *L* (Leu) * **X** (Xaa) count is allocated to the twenty standard amino acids * Joint probability of observing a pair of ambiguous amino acids is allocated to all potential combinations, e.g. probability of **XX** is allocated to 400 combinations of standard amino acids, similarly probability of **XB** is allocated to 40 combinations of *D* and *N* with the standard amino acids. Selenocysteine (**U**, Sec) and pyrrolysine (**O**, Pyl) are considered as distinct amino acids. When *ambiguity* is set **False**, all alphabet characters as considered as distinct types. All non-alphabet characters are considered as gaps. """ msa = getMSA(msa) from .msatools import msapsicov LOGGER.timeit('_psicov') length = msa.shape[1] pc = zeros((length, length), float) pc = msapsicov(msa, pc, turbo=bool(turbo)) LOGGER.report('PC matrix was calculated in %.2fs.', '_psicov') return pc
[docs]def buildDirectInfoMatrix(msa, seqid=.8, pseudo_weight=.5, refine=False, **kwargs): """Returns direct information matrix calculated for *msa*, which may be an :class:`.MSA` instance or a 2D Numpy character array. Sequences sharing sequence identity of *seqid* or more with another sequence are regarded as similar sequences for calculating their weights using :func:`.calcMeff`. *pseudo_weight* are the weight for pseudo count probability. Sequences are not refined by default. When *refine* is set **True**, the MSA will be refined by the first sequence and the shape of direct information matrix will be smaller. """ msa = getMSA(msa) from .msatools import msadipretest, msadirectinfo1, msadirectinfo2 from numpy import matrix LOGGER.timeit('_di') if msa.shape[0]<250: LOGGER.warning('DI performs the best with higher number of sequences, and ' 'minimal number of sequences is recommended as 250.') refine = 1 if refine else 0 # msadipretest get some parameter from msa to set matrix size length, q = msadipretest(msa, refine=refine) c = matrix.dot(matrix(zeros((length*q, 1), float)), matrix(zeros((1, length*q), float))) prob = zeros((length, q+1), float) # msadirectinfo1 return c to be inversed and prob to be used meff, n, length, c, prob = msadirectinfo1(msa, c, prob, theta=1.-seqid, pseudocount_weight=pseudo_weight, refine=refine, q=q+1) c = c.I di = zeros((length, length), float) # get final DI di = msadirectinfo2(n, length, c, prob, di, q+1) del prob, c LOGGER.report('DI matrix was calculated in %.2fs.', '_di') return di
[docs]def calcMeff(msa, seqid=.8, refine=False, weight=False, **kwargs): """Returns the Meff for *msa*, which may be an :class:`.MSA` instance or a 2D Numpy character array. Since similar sequences in an *msa* decreases the diversity of *msa*, *Meff* gives a weight for sequences in the *msa*. For example: One sequence in MSA has 5 other similar sequences in this MSA(itself included). The weight of this sequence is defined as 1/5=0.2. Meff is the sum of all sequence weights. In another word, Meff can be understood as the effective number of independent sequences. Sequences sharing sequence identity of *seqid* or more with another sequence are regarded as similar sequences to calculate Meff. Sequences are not refined by default. When *refine* is set **True**, the MSA will be refined by the first sequence. The weight for each sequence are returned when *weight* is **True**.""" msa = getMSA(msa) from .msatools import msameff LOGGER.timeit('_meff') refine = 1 if refine else 0 weight = 0 if weight else 1 # A Mark for return weighted array. if (not weight): w = zeros((msa.shape[0]), float) meff = msameff(msa, theta=1.-seqid, meff_only=weight, refine=refine, w=w) else: meff = msameff(msa, theta=1.-seqid, meff_only=weight, refine=refine) LOGGER.report('Meff was calculated in %.2fs.', '_meff') return meff
[docs]def alignSequencesByChain(PDBs, **kwargs): """ Runs :func:`buildMSA` for each chain and optionally joins the results. Returns either a single :class:`MSA` or a dictionary containing an :class:`MSA` for each chain. :arg PDBs: a list of :class:`AtomGroup` objects :type PDBs: list :arg join_chains: whether to join chain alignments default is True :type join_chains: bool :arg join_char: a character for joining chain alignments default is '/' as used by PIR format alignments :type join_char: str """ if isscalar(PDBs): raise TypeError('PDBs should be array-like') if not PDBs: raise ValueError('PDBs should not be empty') pdbs = [] chains = [] for i, pdb in enumerate(PDBs): if isinstance(pdb, Atomic): pdbs.append(pdb) else: raise TypeError('each entry in PDBs must be a :class:`Atomic` instance') chains.append([]) for chain in list(pdbs[i].getHierView()): chains[i].append(chain) if i != 0 and len(chains[i]) != len(chains[0]): raise ValueError('all pdbs should have the same number of chains') labels = [] for pdb in pdbs: chids = '' for chain in list(pdb.getHierView()): chids += chain.getChid() labels.append(pdb.getTitle() + '_' + chids) chains = array(chains) chain_alignments = [] alignments = {} for j in range(len(chains[0])): prefix = 'chain_' + chains[0, j].getChid() msa = buildMSA(chains[:, j], title=prefix, labels=labels) msa = refineMSA(msa, colocc=1e-9) # remove gap-only cols chain_alignments.append(msa) alignments[labels[0].split('_')[1][j]] = msa join_chains = kwargs.get('join_chains', True) join_char = kwargs.get('join_char', '/') if len(chains[0]) == 1: join_chains = False if join_chains: joined_msaarr = [] for i, chain_alignment in enumerate(chain_alignments): pdb_seqs = [] for j, sequence in enumerate(chain_alignment): pdb_seqs.append(sequence) joined_msaarr.append(join_char.join(pdb_seqs)) result = MSA(joined_msaarr, title='joined_chains', labels=[label.split('_')[0] for label in labels]) else: result = alignments if len(result) == 1: result = result[list(result.keys())[0]] return result
[docs]def buildMSA(sequences, title='Unknown', labels=None, **kwargs): """ Aligns sequences with clustalw or clustalw2 and returns the resulting MSA. :arg sequences: a file, MSA object or a list or array containing sequences as Atomic objects with :func:`getSequence` or Sequence objects or strings. If strings are used then labels must be provided using ``labels`` :type sequences: :class:`Atomic`, :class:`.MSA`, :class:`~numpy.ndarray`, str :arg title: the title for the MSA and it will be used as the prefix for output files. :type title: str :arg labels: a list of labels to go with the sequences :type labels: list :arg align: whether to align the sequences default True :type align: bool :arg method: alignment method, one of either 'global' (biopython.pairwise2.align.globalms), 'local' (biopython.pairwise2.align.localms), clustalw(2), or another software in your path. Default is 'clustalw' :type align: str """ align = kwargs.get('align', True) method = kwargs.pop('method', 'clustalw') # 1. check if sequences are in a fasta file and if not make one if isinstance(sequences, str): filename = sequences elif not isinstance(sequences, MSA): try: max_len = 0 for sequence in sequences: if isinstance(sequence, Atomic): if len(sequence.ca.copy()) > max_len: max_len = len(sequence.ca.copy()) elif isinstance(sequence, MSA): if len(sequence[0]) > max_len: max_len = len(sequence[0]) else: if len(sequence) > max_len: max_len = len(sequence) msa = [] fetched_labels = [] for i, sequence in enumerate(sequences): if isinstance(sequence, Atomic): strseq = sequence.ca.getSequence() label = sequence.getTitle() elif isinstance(sequence, Sequence): strseq = str(sequence) label = sequence.getLabel() elif isinstance(sequence, MSA): strseq = str(sequence[0]) label = sequence.getLabel(0) LOGGER.warn('Only the first sequence in the MSA at entry {0} is used.' .format(i)) elif isinstance(sequence, str): strseq = sequence label = str(i + 1) else: raise TypeError('sequences should be a list of strings, ' 'Atomic, or Sequence instances') strseq = strseq + '-'*(max_len - len(strseq)) msa.append(array(list(strseq))) fetched_labels.append(label) sequences = array(msa) except: raise TypeError('sequences should be iterable') # "if a list" is a pythonic way to check if a list is empty or not (or none) if not labels and fetched_labels: labels = fetched_labels label = [label.replace(' ','_') for label in labels] # labels checkers are removed because they will be properly handled in MSA class initialization msa = MSA(msa=sequences, title=title, labels=labels) if align and 'clustal' in method: filename = writeMSA(title + '.fasta', msa) if align: # 2. find and run alignment method if method in ['biopython', 'local', 'global']: if len(sequences) == 2: msa, _, _ = alignTwoSequencesWithBiopython(sequences[0], sequences[1], **kwargs) else: raise ValueError("Provide only two sequences or another method. \ Biopython pairwise alignment can only be used \ to build an MSA with two sequences.") elif 'clustalw' in method: clustalw = which('clustalw') if clustalw is None: if which('clustalw2') is not None: clustalw = which('clustalw2') else: raise EnvironmentError("The executable for clustalw was not found, \ install clustalw or add it to the path.") os.system('"%s" %s -OUTORDER=INPUT'%(clustalw, filename)) # 3. parse and return the new MSA msa = parseMSA(title + '.aln') else: alignTool = which(method) if alignTool is None: raise EnvironmentError("The executable for {0} was not found, \ install it or add it to the path.".format(alignTool)) os.system('"%s" %s -OUTORDER=INPUT'%(clustalw, filename)) # 3. parse and return the new MSA msa = parseMSA(title + '.aln') return msa
[docs]def showAlignment(alignment, **kwargs): """ Prints out an alignment as sets of short rows with labels. :arg alignment: any object with aligned sequences :type alignment: :class: `.MSA`, list :arg row_size: the size of each row default 60 :type row_size: int :arg indices: a set of indices for some or all sequences that will be shown above the relevant sequences :type indices: :class:`~numpy.ndarray`, list :arg index_start: how far along the alignment to start putting indices default 0 :type index_start: int :arg index_stop: how far along the alignment to stop putting indices default the point when the shortest sequence stops :type index_stop: int :arg labels: a list of labels :type labels: list """ row_size = kwargs.get('row_size', 60) labels = kwargs.get('labels', None) if labels is not None: if isscalar(labels): raise TypeError('labels should be array-like') for label in labels: if not isinstance(label, str): raise TypeError('each label should be a string') if len(labels) < len(alignment): raise ValueError('there should be a label for every sequence shown') else: labels = [] for i, sequence in enumerate(alignment): if hasattr(sequence, 'getLabel'): labels.append(sequence.getLabel()) else: labels.append(str(i+1)) indices = kwargs.get('indices', None) index_start = kwargs.get('index_start', 0) index_stop = kwargs.get('index_stop', 0) if index_stop == 0 and indices is not None: locs = [] maxes = [] for index in indices: int_index = [] for i in index: if i == '': int_index.append(0) else: int_index.append(int(i)) int_index = array(int_index) maxes.append(max(int_index)) locs.append(where(int_index == max(int_index))[0][0]) index_stop = locs[where(maxes == min(maxes))[0][0]] for i in range(int(ceil(len(alignment[0])/float(row_size)))): for j in range(len(alignment)): if indices is not None: sys.stdout.write('\n' + ' '*15 + '\t') for k in range(row_size*i+10,row_size*(i+1)+10,10): try: if k > index_start + 10 and k < index_stop + 10: sys.stdout.write('{:10d}'.format(int(indices[j][k-1]))) elif k < index_stop: sys.stdout.write(' '*(k-index_start)) else: sys.stdout.write(' '*10) except: sys.stdout.write(' '*10) sys.stdout.write('\n') sys.stdout.write(labels[j][:15] + \ ' ' * (15-len(labels[j][:15])) + \ '\t' + str(alignment[j])[60*i:60*(i+1)] + '\n') sys.stdout.write('\n') return
[docs]def alignSequenceToMSA(seq, msa, **kwargs): """ Align a sequence from a PDB or Sequence to a sequence from an MSA and create two sets of indices. The sequence from the MSA (*seq*), the alignment and the two sets of indices are returned. The first set (*indices*) maps the residue numbers in the PDB to the reference sequence. The second set (*msa_indices*) indexes the reference sequence in the msa and is used for retrieving values from the first indices. :arg seq: an object with an associated sequence string or a sequence string itself :type seq: :class:`.Atomic`, :class:`.Sequence`, str :arg msa: a multiple sequence alignment :type msa: :class:`.MSA` :arg label: a label for a sequence in msa or a PDB ID ``msa.getIndex(label)`` must return a sequence index :type label: str :arg chain: which chain from pdb to use for alignment, default is **None**, which does no selection on *seq*. This value will be ignored if seq is not an :class:`.Atomic` object. :type chain: str Parameters for Biopython ``pairwise2`` alignments can be provided as keyword arguments. Default values are originally from ``proteins.compare`` module, but now found in ``utilities.seqtools``. :arg match: a positive integer, used to reward finding a match :type match: int :arg mismatch: a negative integer, used to penalise finding a mismatch :type mismatch: int :arg gap_opening: a negative integer, used to penalise opening a gap :type gap_opening: int :arg gap_extension: a negative integer, used to penalise extending a gap :type gap_extension: int :arg method: method for pairwise2 alignment. Possible values are ``"local"`` and ``"global"`` :type method: str """ label = kwargs.get('label', None) chain = kwargs.get('chain', None) match = kwargs.get('match', MATCH_SCORE) mismatch = kwargs.get('mismatch', MISMATCH_SCORE) gap_opening = kwargs.get('gap_opening', GAP_PENALTY) gap_extension = kwargs.get('gap_extension', GAP_EXT_PENALTY) method = kwargs.get('method', ALIGNMENT_METHOD) if isinstance(seq, Atomic): if isinstance(chain, str): ag = seq.select('chain {0}'.format(chain)) elif chain is None: ag = seq chids = ag.getChids() if len(unique(chids)) > 1: LOGGER.warn('%s consists of multiple chains. Please consider selecting one chain'%(seq.getTitle())) else: raise TypeError('chain should be a string or None') if ag is None: raise ValueError('seq may be None or chain ID may be invalid') sequence = ag.select('ca').getSequence() elif isinstance(seq, Sequence): sequence = str(seq) ag = None elif isinstance(seq, str): sequence = seq ag = None else: raise TypeError('seq must be an atomic class, sequence class, or str not {0}' .format(type(seq))) if not isinstance(msa, MSA): raise TypeError('msa must be an MSA instance') if label is None: if ag: label = ag.getTitle().split('_')[0] elif isinstance(seq, Sequence): label = seq.getLabel() else: raise ValueError('A label cannot be extracted from seq so please provide one.') index = msa.getIndex(label) if index is None and (len(label) == 4 or len(label) == 5): from prody import parsePDB try: structure, header = parsePDB(label[:4], header=True) except Exception as err: raise IOError('failed to parse header for {0} ({1})' .format(label[:4], str(err))) chid = chain for poly in header['polymers']: if chid and poly.chid != chid: continue for dbref in poly.dbrefs: if index is None: index = msa.getIndex(dbref.idcode) if index is not None: LOGGER.info('{0} idcode {1} for {2}{3} ' 'is found in {4}.'.format( dbref.database, dbref.idcode, label[:4], poly.chid, str(msa))) label = dbref.idcode break if index is None: index = msa.getIndex(dbref.accession) if index is not None: LOGGER.info('{0} accession {1} for {2}{3} ' 'is found in {4}.'.format( dbref.database, dbref.accession, label[:4], poly.chid, str(msa))) label = dbref.accession break if index is not None: chain = structure[poly.chid] if index is None: raise ValueError('label is not in msa, or msa is not indexed') try: len(index) except TypeError: pass else: raise ValueError('label {0} maps onto multiple sequences, ' 'so cannot be used for refinement'.format(label)) if isinstance(index, int): refMsaSeq = str(msa[index]).upper().replace('-','.') else: raise TypeError('The output from querying that label against msa is not a single sequence.') if method == 'local': alignment = pairwise2.align.localms(sequence, str(refMsaSeq), match, mismatch, gap_opening, gap_extension, one_alignment_only=1) elif method == 'global': alignment = pairwise2.align.globalms(sequence, str(refMsaSeq), match, mismatch, gap_opening, gap_extension, one_alignment_only=1) else: raise ValueError('method should be local or global') seq_indices = [0] msa_indices = [0] for i in range(len(alignment[0][0])): if alignment[0][0][i] != '-': seq_indices.append(seq_indices[i]+1) else: seq_indices.append(seq_indices[i]) if alignment[0][1][i] != '-': msa_indices.append(msa_indices[i]+1) else: msa_indices.append(msa_indices[i]) seq_indices.pop(0) # The first element was extra for initialisation msa_indices.pop(0) # The first element was extra for initialisation seq_indices = array(seq_indices) msa_indices = array(msa_indices) if ag: seq_indices += ag.getResnums()[0] - 1 alignment = MSA(msa=array([array(list(alignment[0][0])), \ array(list(alignment[0][1]))]), \ labels=[ag.getTitle(), label]) return alignment, seq_indices, msa_indices
[docs]def alignTwoSequencesWithBiopython(seq1, seq2, **kwargs): """Easily align two sequences with Biopython's globalms or localms. Returns an MSA and indices for use with :func:`.showAlignment`. Alignment parameters can be provided as keyword arguments. Default values are as originally set in the proteins.compare module, but now found in utilities.seqtools. :arg match: a positive integer, used to reward finding a match :type match: int :arg mismatch: a negative integer, used to penalise finding a mismatch :type mismatch: int :arg gap_opening: a negative integer, used to penalise opening a gap :type gap_opening: int :arg gap_extension: a negative integer, used to penalise extending a gap :type gap_extension: int :arg method: method for pairwise2 alignment. Possible values are 'local' and 'global' :type method: str """ match = kwargs.get('match', MATCH_SCORE) mismatch = kwargs.get('mismatch', MISMATCH_SCORE) gap_opening = kwargs.get('gap_opening', GAP_PENALTY) gap_extension = kwargs.get('gap_extension', GAP_EXT_PENALTY) method = kwargs.get('method', ALIGNMENT_METHOD) if method == 'local': alignment = pairwise2.align.localms(seq1, seq2, match, mismatch, gap_opening, gap_extension) elif method == 'global': alignment = pairwise2.align.globalms(seq1, seq2, match, mismatch, gap_opening, gap_extension) else: raise ValueError('method should be local or global') seq_indices = [0] msa_indices = [0] for i in range(len(alignment[0][0])): if alignment[0][0][i] != '-': seq_indices.append(seq_indices[i]+1) else: seq_indices.append(seq_indices[i]) if alignment[0][1][i] != '-': msa_indices.append(msa_indices[i]+1) else: msa_indices.append(msa_indices[i]) seq_indices = array(seq_indices) msa_indices = array(msa_indices) alignment = MSA(msa=array([array(list(alignment[0][0])), \ array(list(alignment[0][1]))])) return alignment, seq_indices, msa_indices
[docs]def trimAtomsUsingMSA(atoms, msa, **kwargs): """This function uses :func:`.alignSequenceToMSA` and has the same kwargs. :arg atoms: an atomic structure for trimming :type atoms: :class:`.Atomic` :arg msa: a multiple sequence alignment :type msa: :class:`.MSA` """ aln, idx_1, idx_2 = alignSequenceToMSA(atoms, msa, **kwargs) u, i = unique(idx_2, return_index=True) resnums_str = ' '.join([str(x) for x in idx_1[i]]) chain = kwargs.get('chain', 'A') return atoms.select('chain {0} and resnum {1}'.format(chain, resnums_str))