# -*- coding: utf-8 -*-
"""This module defines MSA analysis functions."""
from numbers import Integral
from numpy import all, zeros, dtype, array, char, cumsum, ceil, reshape
from numpy import where, sort, concatenate, vstack, isscalar, chararray
from Bio import AlignIO
from Bio import pairwise2
from prody import LOGGER, PY3K
from prody.atomic import Atomic
from prody.utilities import toChararray, pystr, splitSeqLabel
from .sequence import Sequence
import sys
__all__ = ['MSA', 'refineMSA', 'mergeMSA', 'specMergeMSA',]
[docs]class MSA(object):
"""Store and manipulate multiple sequence alignments."""
def __init__(self, msa, title='Unknown', labels=None, **kwargs):
"""*msa* must be a 2D Numpy character array. *labels* is a list of
sequence labels (or titles). *mapping* should map label or part of
label to sequence index in *msa* array. If *mapping* is not given,
one will be build from *labels*."""
self._aligned = aligned = kwargs.get('aligned', True)
msa = toChararray(msa, aligned)
numseq = msa.shape[0]
if labels and len(labels) != numseq:
raise ValueError('len(labels) must be equal to number of '
'sequences')
if labels is None:
labels = [str(i+1) for i in range(numseq)]
if PY3K:
for i, label in enumerate(labels):
if not isinstance(label, str):
labels[i] = label.decode()
self._labels = labels
mapping = kwargs.get('mapping')
self._map(mapping)
self._msa = msa
self._title = str(title) or 'Unknown'
self._split = bool(kwargs.get('split', True))
def _map(self, mapping=None):
labels = self._labels
if mapping is not None:
try:
mapping['isdict']
except KeyError:
pass
except Exception:
raise TypeError('mapping must be a dictionary')
for key, value in mapping.items():
values = [value] if isscalar(value) else value
for i in values:
if not key in labels[i]:
labels[i] = key
self._mapping = mapping = {}
for index, label in enumerate(labels):
label = splitSeqLabel(label)[0]
try:
value = mapping[label]
except KeyError:
mapping[label] = index
else:
try:
value.append(index)
except AttributeError:
mapping[label] = [value, index]
return mapping
def __str__(self):
return 'MSA ' + self._title
def __repr__(self):
if self._aligned:
return ('<MSA: {0} ({1} sequences, {2} residues)>'
).format(self._title, self.numSequences(),
self.numResidues())
else:
return ('<MSA: {0} ({1} sequences, not aligned)>'
).format(self._title, self.numSequences())
def __len__(self):
return len(self._msa)
def __add__(self, other):
"""Concatenate two MSAs."""
A = self.getArray()
B = other.getArray()
try:
AB = vstack((A, B))
except:
raise ValueError('failed to add {0} and {1} together: shapes do not match'
.format(repr(self), repr(other)))
aligned = self._aligned or other._aligned
labels = list(self._labels)
labels.extend(other._labels)
split = self._split or other._split
msa = MSA(AB, title='%s + %s'%(self.getTitle(), other.getTitle()),
aligned=aligned, labels=labels, split=split)
return msa
def __getitem__(self, index):
if isinstance(index, Integral):
return Sequence(self, index)
if isinstance(index, str):
try:
rows = self._mapping[index]
except KeyError:
raise KeyError('label {0} is not mapped to a sequence'
.format(index))
else:
msa = self._msa[rows]
if msa.ndim == 1:
return Sequence(self, rows)
else:
if msa.base is not None:
msa = msa.copy()
labels = self._labels
return MSA(msa, title='{0}[{1}]'.format(self._title,
index), labels=[labels[i] for i in rows])
elif isinstance(index, tuple):
if len(index) == 1:
return self[index[0]]
elif len(index) == 2:
rows, cols = index
else:
raise IndexError('invalid index: ' + str(index))
else:
rows, cols = index, None
# handle list of labels
if isinstance(rows, list):
rows = self.getIndex(rows) or rows
elif isinstance(rows, int):
return Sequence(self._msa[rows, cols].tostring(),
self._labels[rows])
elif isinstance(rows, str):
try:
rows = self._mapping[rows]
except KeyError:
raise KeyError('label {0} is not mapped to a sequence'
.format(index))
else:
if isinstance(rows, int):
return Sequence(self._msa[rows, cols].tostring(),
self._labels[rows])
if cols is None:
msa = self._msa[rows]
else:
try:
msa = self._msa[rows, cols]
except TypeError:
raise IndexError('invalid index: ' + str(index))
try:
lbls = list(array(self._labels)[rows])
except TypeError:
labels = self._labels
lbls = [labels[i] for i in rows]
finally:
if not isinstance(lbls, list):
lbls = [lbls]
if msa.ndim == 0:
msa = msa.reshape((1, 1))
elif msa.ndim == 1:
msa = msa.reshape((1, len(msa)))
if msa.base is not None:
msa = msa.copy()
return MSA(msa=msa, title=self._title, labels=lbls,
aligned=self._aligned)
def __iter__(self):
for i in range(len(self._msa)):
yield Sequence(self, i)
def __contains__(self, key):
try:
return key in self._mapping
except Exception:
pass
return False
def __eq__(self, other):
try:
other = other._getArray()
except AttributeError:
return False
try:
return all(other == self._msa)
except Exception:
pass
return False
def _getSplit(self):
return self._split
def _setSplit(self, split):
self._split = bool(split)
split = property(_getSplit, _setSplit,
doc='Return split label when iterating or indexing.')
[docs] def extend(self, other):
"""Adds *other* to this MSA."""
A = self.getArray()
if isinstance(other, MSA):
B = other.getArray()
otherlabels = other._labels
elif isinstance(other, Sequence):
B = other.getArray()
B = reshape(B, (1, B.shape[0]))
otherlabels = other.getLabel(full=True)
else:
try:
B = toChararray(other, aligned=False)
numA, numB = A.shape[0], B.shape[0]
otherlabels = [str(i+1) for i in range(numA, numA+numB)]
except:
raise ValueError('failed to add {1} to {0}'
.format(repr(self), repr(other)))
try:
AB = vstack((A, B))
except:
raise ValueError('failed to add {1} to {0}: shapes do not match'
.format(repr(self), repr(other)))
labels = list(self._labels)
labels.extend(otherlabels)
self._msa = AB
self._labels = labels
self._map()
[docs] def isAligned(self):
"""Returns **True** if MSA is aligned."""
return self._aligned
[docs] def numSequences(self):
"""Returns number of sequences."""
return self._msa.shape[0]
[docs] def numResidues(self):
"""Returns number of residues (or columns in the MSA), if MSA is
aligned."""
if self._aligned:
return self._msa.shape[1]
[docs] def numIndexed(self):
"""Returns number of sequences that are indexed using the identifier
part or all of their labels. The return value should be equal to
number of sequences."""
count = len(self._mapping)
if len(self._msa) == count:
return count
else:
count = len(self._mapping)
for val in self._mapping.values():
try:
count += len(val) - 1
except TypeError:
pass
return count
[docs] def getTitle(self):
"""Returns title of the instance."""
return self._title
[docs] def setTitle(self, title):
"""Set title of the instance."""
self._title = str(title)
[docs] def getLabel(self, index, full=False):
"""Returns label of the sequence at given *index*. Residue numbers will
be removed from the sequence label, unless *full* is **True**."""
index = self._mapping.get(index, index)
if full:
return self._labels[index]
else:
return splitSeqLabel(self._labels[index])[0]
[docs] def getResnums(self, index):
"""Returns starting and ending residue numbers (:term:`resnum`) for the
sequence at given *index*."""
index = self._mapping.get(index, index)
return splitSeqLabel(self._labels[index])[1:]
[docs] def getArray(self):
"""Returns a copy of the MSA character array."""
return self._msa.copy()
def _getArray(self):
"""Returns MSA character array."""
return self._msa
[docs] def getIndex(self, label):
"""Returns index of the sequence that *label* maps onto. If *label*
maps onto multiple sequences or *label* is a list of labels, a list
of indices is returned. If an index for a label is not found,
return **None**."""
try:
index = self._mapping[label]
except KeyError:
try:
return [v for k, v in self._mapping.items() if label in k][0]
except:
return None
except TypeError:
mapping = self._mapping
indices = []
append, extend = indices.append, indices.extend
for key in label:
try:
index = mapping[key]
except KeyError:
return None
try:
extend(index)
except TypeError:
append(index)
return indices
else:
try:
return list(index)
except TypeError:
return index
[docs] def iterLabels(self, full=False):
"""Yield sequence labels. By default the part of the label used for
indexing sequences is yielded."""
if full:
for label in self._labels:
yield label
else:
for label in self._labels:
yield splitSeqLabel(label)[0]
[docs] def getLabels(self, full=False):
"""Returns all labels"""
labels = [label for label in self.iterLabels(full=full)]
return labels
[docs] def countLabel(self, label):
"""Returns the number of sequences that *label* maps onto."""
try:
return len(self._mapping[label])
except KeyError:
return 0
except TypeError:
return 1
[docs]def refineMSA(msa, index=None, label=None, rowocc=None, seqid=None, colocc=None, **kwargs):
"""Refine *msa* by removing sequences (rows) and residues (columns) that
contain gaps.
:arg msa: multiple sequence alignment
:type msa: :class:`.MSA`
:arg index: remove columns that are gaps in the sequence with that index
:type index: int
:arg label: remove columns that are gaps in the sequence matching label,
``msa.getIndex(label)`` must return a sequence index, a PDB identifier
is also acceptable
:type label: str
:arg rowocc: row occupancy, sequences with less occupancy will be
removed after *label* refinement is applied
:type rowocc: float
:arg seqid: keep unique sequences at specified sequence identity level,
unique sequences are identified using :func:`.uniqueSequences`
:type seqid: float
:arg colocc: column occupancy, residue positions with less occupancy
will be removed after other refinements are applied
:type colocc: float
:arg keep: keep columns corresponding to residues not resolved in the PDB
structure, default is **False**, applies when *label* is a PDB
identifier
:arg type: bool
For Pfam MSA data, *label* is UniProt entry name for the protein. You may
also use PDB structure and chain identifiers, e.g. ``'1p38'`` or
``'1p38A'``, for *label* argument and UniProt entry names will be parsed
using :func:`.parsePDBHeader` function (see also :class:`.Polymer` and
:class:`.DBRef`).
The order of refinements are applied in the order of arguments. If *label*
and *unique* is specified, sequence matching *label* will
be kept in the refined :class:`.MSA` although it may be similar to some
other sequence."""
# if msa is a char array, it will be refined but label won't work
try:
ndim, dtype_ = msa.ndim, msa.dtype
except AttributeError:
try:
arr = msa._getArray()
except AttributeError:
raise TypeError('msa must be a character array or an MSA instance')
ndim, dtype_ = arr.ndim, arr.dtype
else:
arr, msa = msa, None
if dtype('|S1') != dtype_:
raise ValueError('msa must be a character array or an MSA instance')
if ndim != 2:
raise ValueError('msa must be a 2D array or an MSA instance')
title = []
cols = None
if index is not None:
before = arr.shape[1]
LOGGER.timeit('_refine')
cols = char.isalpha(arr[index]).nonzero()[0]
arr = arr.take(cols, 1)
title.append('index=' + str(index))
LOGGER.report('Index refinement reduced number of columns from {0} to '
'{1} in %.2fs.'.format(before, arr.shape[1]), '_refine')
if label is not None:
if index is not None:
LOGGER.info('An index was provided so the label will be ignored.')
else:
before = arr.shape[1]
LOGGER.timeit('_refine')
try:
upper, lower = label.upper(), label.lower()
except AttributeError:
raise TypeError('label must be a string')
if msa is None:
raise TypeError('msa must be an MSA instance, '
'label cannot be used')
index = msa.getIndex(label)
if index is None:
index = msa.getIndex(upper)
if index is None:
index = msa.getIndex(lower)
chain = None
if index is None and (len(label) == 4 or len(label) == 5):
from prody import parsePDB
try:
structure, header = parsePDB(label[:4], header=True)
except Exception as err:
raise IOError('failed to parse header for {0} ({1})'
.format(label[:4], str(err)))
chid = label[4:].upper()
for poly in header['polymers']:
if chid and poly.chid != chid:
continue
for dbref in poly.dbrefs:
if index is None:
index = msa.getIndex(dbref.idcode)
if index is not None:
LOGGER.info('{0} idcode {1} for {2}{3} '
'is found in chain {4}.'.format(
dbref.database, dbref.idcode,
label[:4], poly.chid, str(msa)))
break
if index is None:
index = msa.getIndex(dbref.accession)
if index is not None:
LOGGER.info('{0} accession {1} for {2}{3} '
'is found in chain {4}.'.format(
dbref.database, dbref.accession,
label[:4], poly.chid, str(msa)))
break
if index is not None:
chain = structure[poly.chid]
resnums = chain.ca.getResnums()
if index is None:
raise ValueError('label is not in msa, or msa is not indexed')
try:
len(index)
except TypeError:
pass
else:
raise ValueError('label {0} maps onto multiple sequences, '
'so cannot be used for refinement'.format(label))
title.append('label=' + label)
cols = char.isalpha(arr[index]).nonzero()[0]
arr = arr.take(cols, 1)
LOGGER.report('Label refinement reduced number of columns from {0} to '
'{1} in %.2fs.'.format(before, arr.shape[1]), '_refine')
if chain is not None and not kwargs.get('keep', False):
before = arr.shape[1]
LOGGER.timeit('_refine')
from Bio import pairwise2
from prody.utilities import MATCH_SCORE, MISMATCH_SCORE
from prody.utilities import GAP_PENALTY, GAP_EXT_PENALTY, ALIGNMENT_METHOD
chseq = chain.getSequence()
algn = pairwise2.align.localms(pystr(arr[index].tostring().upper()), pystr(chseq),
MATCH_SCORE, MISMATCH_SCORE,
GAP_PENALTY, GAP_EXT_PENALTY,
one_alignment_only=1)
torf = []
for s, c in zip(*algn[0][:2]):
if s == '-':
continue
elif c != '-':
torf.append(True)
else:
torf.append(False)
torf = array(torf)
tsum = torf.sum()
assert tsum <= before, 'problem in mapping sequence to structure'
if tsum < before:
arr = arr.take(torf.nonzero()[0], 1)
resnums = resnums.take(torf.nonzero()[0]-torf.nonzero()[0][0]+1)
LOGGER.report('Structure refinement reduced number of '
'columns from {0} to {1} in %.2fs.'
.format(before, arr.shape[1]), '_refine')
else:
LOGGER.debug('All residues in the sequence are contained in '
'PDB structure {0}.'.format(label))
labels = msa._labels
labels[index] = splitSeqLabel(labels[index])[0] + '/' + str(resnums[0]) + '-' + str(resnums[-1])
from .analysis import calcMSAOccupancy, uniqueSequences
rows = None
if rowocc is not None:
before = arr.shape[0]
LOGGER.timeit('_refine')
try:
rowocc = float(rowocc)
except Exception as err:
raise TypeError('rowocc must be a float ({0})'.format(str(err)))
assert 0. <= rowocc <= 1., 'rowocc must be between 0 and 1'
rows = calcMSAOccupancy(arr, 'row') >= rowocc
if index is not None:
index = rows[:index].sum()
rows = (rows).nonzero()[0]
arr = arr[rows]
title.append('rowocc>=' + str(rowocc))
LOGGER.report('Row occupancy refinement reduced number of rows from '
'{0} to {1} in %.2fs.'.format(before, arr.shape[0]),
'_refine')
if seqid is not None:
before = arr.shape[0]
LOGGER.timeit('_refine')
unique = uniqueSequences(arr, seqid)
if index is not None:
unique[index] = True
unique = unique.nonzero()[0]
arr = arr[unique]
title.append('seqid>=' + str(seqid))
if rows is not None:
rows = rows[unique]
else:
rows = unique
LOGGER.report('Sequence identity refinement reduced number of rows '
'from {0} to {1} in %.2fs.'.format(before, arr.shape[0]),
'_refine')
if colocc is not None:
before = arr.shape[1]
LOGGER.timeit('_refine')
try:
colocc = float(colocc)
except Exception as err:
raise TypeError('colocc must be a float ({0})'.format(str(err)))
assert 0. <= colocc <= 1., 'colocc must be between 0 and 1'
cols = (calcMSAOccupancy(arr, 'col') >= colocc).nonzero()[0]
arr = arr.take(cols, 1)
title.append('colocc>=' + str(colocc))
LOGGER.report('Column occupancy refinement reduced number of columns '
'from {0} to {1} in %.2fs.'.format(before, arr.shape[1]),
'_refine')
if not title:
raise ValueError('label, index, seqid, rowocc, colocc all cannot be None')
# depending on slicing of rows, arr may not have it's own memory
if arr.base is not None:
arr = arr.copy()
if msa is None:
return arr
else:
if rows is None:
from copy import copy
labels = copy(msa._labels)
else:
labels = msa._labels
labels = [labels[i] for i in rows]
return MSA(arr, title=msa.getTitle() + ' refined ({0})'
.format(', '.join(title)), labels=labels)
[docs]def mergeMSA(*msa, **kwargs):
"""Returns an :class:`.MSA` obtained from merging parts of the sequences
of proteins present in multiple *msa* instances. Sequences are matched
based on protein identifiers found in the sequence labels. Order of
sequences in the merged MSA will follow the order of sequences in the
first *msa* instance. Note that protein identifiers that map to multiple
sequences will be excluded."""
if len(msa) <= 1:
raise ValueError('more than one msa instances are needed')
try:
arrs = [m._getArray() for m in msa]
sets = []
for m in msa:
aset = set([])
count = m.countLabel
for label in m.iterLabels():
if count(label) == 1:
aset.add(label)
sets.append(aset)
except AttributeError:
raise TypeError('all msa arguments must be MSA instances')
sets = iter(sets)
common = next(sets)
for aset in sets:
common = common.intersection(aset)
if not common:
return None
lens = [m.numResidues() for m in msa]
rngs = [0]
rngs.extend(cumsum(lens))
rngs = [(start, end) for start, end in zip(rngs[:-1], rngs[1:])]
idx_arr_rng = list(zip([m.getIndex for m in msa], arrs, rngs))
merger = zeros((len(common), sum(lens)), '|S1')
labels = []
for index, label in enumerate(common):
if label not in common:
continue
for idx, arr, (start, end) in idx_arr_rng:
merger[index, start:end] = arr[idx(label)]
labels.append(label)
merger = MSA(merger, labels=labels,
title=' + '.join([m.getTitle() for m in msa]))
return merger
[docs]def specMergeMSA(*msa, **kwargs):
"""Returns an :class:`.MSA` obtained from merging parts of the sequences
of proteins present in multiple *msa* instances. Sequences are matched
based on species section of protein identifiers found in the sequence labels.
Order of sequences in the merged MSA will follow the order of sequences in the
first *msa* instance. Note that protein identifiers that map to multiple
sequences will be excluded."""
if len(msa) <= 1:
raise ValueError('more than one msa instances are needed')
lbl={}
try:
arrs = [m._getArray() for m in msa]
sets = []
labells = []
for m in msa:
aset = set([])
labell = {}
count = m.countLabel
for label in m.iterLabels():
lbl[label]=label.rsplit('_')[1]
if count(label) == 1 and lbl[label] not in aset:
aset.add(lbl[label])
labell[lbl[label]]=label
sets.append(aset)
labells.append(labell)
except AttributeError:
raise TypeError('all msa arguments must be MSA instances')
sets = iter(sets)
common = next(sets)
for aset in sets:
common = common.intersection(aset)
if not common:
return None
lens = [m.numResidues() for m in msa]
rngs = [0]
rngs.extend(cumsum(lens))
rngs = [(start, end) for start, end in zip(rngs[:-1], rngs[1:])]
idx_arr_rng = list(zip([m.getIndex for m in msa], arrs, rngs))
merger = zeros((len(common), sum(lens)), '|S1')
labels = []
for index, lbl in enumerate(common):
for idx, arr, (start, end) in idx_arr_rng:
merger[index, 0:start]=list(str(msa[0][msa[0].getIndex(labells[0][lbl])]))
merger[index, start:end]=list(str(msa[1][msa[1].getIndex(labells[1][lbl])]))
label = labells[0][lbl]
labels.append(label)
merger = MSA(merger, labels=labels,
title=' + '.join([m.getTitle() for m in msa]))
return merger