Source code for prody.proteins.blastpdb

# -*- coding: utf-8 -*-
"""This module defines functions for blast searching the Protein Data Bank."""

import os.path
import numpy as np
from prody import LOGGER, PY3K
from prody.utilities import dictElement, openURL, which, pystr
from prody.sequence import parseMSA, MSA, Sequence

import platform, os, re, sys, time, urllib

if PY3K:
    import urllib.parse as urllib
    import urllib.request as urllib2
else:
    import urllib
    import urllib2

from prody.atomic import Atomic
from prody.proteins.pdbfile import parsePDB

__all__ = ['PDBBlastRecord', 'blastPDB',]

[docs]def blastPDB(sequence, filename=None, **kwargs): """Returns a :class:`PDBBlastRecord` instance that contains results from blast searching *sequence* against the PDB using NCBI blastp. :arg sequence: an object with an associated sequence string or a sequence string itself :type sequence: :class:`Atomic`, :class:`Sequence`, or str :arg filename: a *filename* to save the results in XML format :type filename: str *hitlist_size* (default is ``250``) and *expect* (default is ``1e-10``) search parameters can be adjusted by the user. *sleep* keyword argument (default is ``2`` seconds) determines how long to wait to reconnect for results. Sleep time is multiplied by 1.5 when results are not ready. *timeout* (default is 120 s) determines when to give up waiting for the results. """ if sequence == 'runexample': sequence = ('ASFPVEILPFLYLGCAKDSTNLDVLEEFGIKYILNVTPNLPNLFENAGEFKYKQIPI' 'SDHWSQNLSQFFPEAISFIDEARGKNCGVLVHSLAGISRSVTVTVAYLMQKLNLSMN' 'DAYDIVKMKKSNISPNFNFMGQLLDFERTL') elif isinstance(sequence, Atomic): sequence = sequence.calpha.getSequence() elif isinstance(sequence, Sequence): sequence = str(sequence) elif isinstance(sequence, str): if len(sequence) in [4, 5, 6]: ag = parsePDB(sequence) sequence = ag.calpha.getSequence() sequence = ''.join(sequence.split()) else: raise TypeError('sequence must be Atomic, Sequence, or str not {0}' .format(type(sequence))) return PDBBlastRecord(filename, sequence, **kwargs)
[docs]class PDBBlastRecord(object): """A class to store results from blast searches.""" def __init__(self, xml=None, sequence=None, **kwargs): """Instantiate a PDBBlastRecord object instance. :arg xml: blast search results in XML format or an XML file that contains the results :type xml: str :arg sequence: query sequence :type sequence: str """ if sequence: try: sequence = ''.join(sequence.split()) isalpha = sequence.isalpha() except AttributeError: raise TypeError('sequence must be a string') else: if not isalpha: raise ValueError('not a valid protein sequence') self._sequence = sequence self._xml = xml self._hits = None self.isSuccess = False self._timeout = kwargs.get('timeout', 120) self.isSuccess = self.fetch(**kwargs)
[docs] def fetch(self, xml=None, sequence=None, **kwargs): """Get Blast record from url or file. :arg sequence: an object with an associated sequence string or a sequence string itself :type sequence: :class:`Atomic`, :class:`Sequence`, or str :arg xml: blast search results in XML format or an XML file that contains the results or a filename for saving the results or None :type xml: str :arg timeout: amount of time until the query times out in seconds default value is 120 :type timeout: int """ if self.isSuccess: LOGGER.warn("The record already exists so not further search is performed") return True if sequence is None: sequence = self._sequence if xml is None: xml = self._xml import xml.etree.cElementTree as ET have_xml = False filename = None if xml is not None: if len(xml) < 100: # xml likely contains a filename if os.path.isfile(xml): # read the contents try: xml = ET.parse(xml) root = xml.getroot() have_xml = True except: raise ValueError('could not parse xml from xml file') else: # xml contains a filename for writing filename = xml else: try: if isinstance(xml, list): root = ET.fromstringlist(xml) elif isinstance(xml, str): root = ET.fromstring(xml) except: raise ValueError('xml is not a filename and does not look like' ' a valid XML string') else: have_xml = True if have_xml is False: # we still need to run a blast headers = {'User-agent': 'ProDy'} query = [('DATABASE', 'pdb'), ('ENTREZ_QUERY', '(none)'), ('PROGRAM', 'blastp'),] expect = float(kwargs.pop('expect', 10e-10)) if expect <= 0: raise ValueError('expect must be a positive number') query.append(('EXPECT', expect)) hitlist_size = int(kwargs.pop('hitlist_size', 250)) if hitlist_size <= 0: raise ValueError('expect must be a positive integer') query.append(('HITLIST_SIZE', hitlist_size)) query.append(('QUERY', sequence)) query.append(('CMD', 'Put')) sleep = float(kwargs.pop('sleep', 2)) timeout = float(kwargs.pop('timeout', self._timeout)) self._timeout = timeout try: import urllib.parse urlencode = lambda data: bytes(urllib.parse.urlencode(data), 'utf-8') except ImportError: from urllib import urlencode url = 'https://blast.ncbi.nlm.nih.gov/Blast.cgi' data = urlencode(query) LOGGER.timeit('_prody_blast') LOGGER.info('Blast searching NCBI PDB database for "{0}..."' .format(sequence[:5])) handle = openURL(url, data=data, headers=headers) html = handle.read() index = html.find(b'RID =') if index == -1: raise Exception('NCBI did not return expected response.') else: last = html.find(b'\n', index) rid = html[index + len('RID ='):last].strip() query = [('ALIGNMENTS', 500), ('DESCRIPTIONS', 500), ('FORMAT_TYPE', 'XML'), ('RID', rid), ('CMD', 'Get')] data = urlencode(query) while True: LOGGER.sleep(int(sleep), 'to reconnect to NCBI for search results.') LOGGER.write('Connecting to NCBI for search results...') handle = openURL(url, data=data, headers=headers) results = handle.read() index = results.find(b'Status=') LOGGER.clear() if index < 0: break last = results.index(b'\n', index) status = results[index+len('Status='):last].strip() if status.upper() == b'READY': break sleep = int(sleep * 1.5) if LOGGER.timing('_prody_blast') > timeout: LOGGER.warn('Blast search time out.') return False LOGGER.clear() LOGGER.report('Blast search completed in %.1fs.', '_prody_blast') root = ET.XML(results) try: ext_xml = filename.lower().endswith('.xml') except AttributeError: pass else: if not ext_xml: filename += '.xml' out = open(filename, 'w') if PY3K: out.write(results.decode()) else: out.write(results) out.close() LOGGER.info('Results are saved as {0}.'.format(repr(filename))) root = dictElement(root, 'BlastOutput_') if root['db'] != 'pdb': raise ValueError('blast search database in xml must be "pdb"') if root['program'] != 'blastp': raise ValueError('blast search program in xml must be "blastp"') self._param = dictElement(root['param'][0], 'Parameters_') query_len = int(root['query-len']) if sequence and len(sequence) != query_len: raise ValueError('query-len and the length of the sequence do not ' 'match, xml data may not be for given sequence') hits = [] for iteration in root['iterations']: for hit in dictElement(iteration, 'Iteration_')['hits']: hit = dictElement(hit, 'Hit_') data = dictElement(hit['hsps'][0], 'Hsp_') for key in ['align-len', 'gaps', 'hit-frame', 'hit-from', 'hit-to', 'identity', 'positive', 'query-frame', 'query-from', 'query-to']: data[key] = int(data[key]) data['query-len'] = query_len for key in ['evalue', 'bit-score', 'score']: data[key] = float(data[key]) p_identity = 100.0 * data['identity'] / (data['query-to'] - data['query-from'] + 1) data['percent_identity'] = p_identity p_overlap = (100.0 * (data['align-len'] - data['gaps']) / query_len) data['percent_coverage'] = p_overlap for item in (hit['id'] + hit['def']).split('>gi'): head, title = item.split(None, 1) head = head.split('|') pdb_id = head[-2].lower() chain_id = head[-1][:1] pdbch = dict(data) pdbch['pdb_id'] = pdb_id pdbch['chain_id'] = chain_id pdbch['title'] = (head[-1][1:] + title).strip() hits.append((p_identity, p_overlap, pdbch)) hits.sort(key=lambda hit: hit[0], reverse=True) self._hits = hits return True
[docs] def getSequence(self): """Returns the query sequence that was used in the search.""" return self._sequence
[docs] def getParameters(self): """Returns parameters used in blast search.""" return dict(self._param)
[docs] def getHits(self, percent_identity=0., percent_overlap=0., chain=False): """Returns a dictionary in which PDB identifiers are mapped to structure and alignment information. :arg percent_identity: PDB hits with percent sequence identity equal to or higher than this value will be returned, default is ``0.`` :type percent_identity: float :arg percent_overlap: PDB hits with percent coverage of the query sequence equivalent or better will be returned, default is ``0.`` :type percent_overlap: float :arg chain: if chain is **True**, individual chains in a PDB file will be considered as separate hits , default is **False** :type chain: bool""" assert isinstance(percent_identity, (float, int)), \ 'percent_identity must be a float or an integer' assert isinstance(percent_overlap, (float, int)), \ 'percent_overlap must be a float or an integer' assert isinstance(chain, bool), 'chain must be a boolean' hits = {} if self._hits is None: raise ValueError('There are no hits yet. Please fetch again.') for p_identity, p_overlap, hit in self._hits: if p_identity < percent_identity: break if p_overlap < percent_overlap: continue if chain: key = (hit['pdb_id'], hit['chain_id']) else: key = hit['pdb_id'] if not key in hits: hits[key] = hit return hits
[docs] def getBest(self): """Returns a dictionary containing structure and alignment information for the hit with highest sequence identity.""" return dict(self._hits[0][2])
[docs] def writeSequences(self, filename, **kwargs): """ Returns a plot that contains a dendrogram of the sequence similarities among the sequences in given hit list. :arg hits: A dictionary that contains hits that are obtained from a blast record object. :type hits: dict Arguments of getHits can be parsed as kwargs. """ if not filename.lower().endswith('.fasta'): filename += '.fasta' with open(filename,'w') as f_out: hits = self.getHits(**kwargs) for z in hits: # get keys f_out.write(">" + str(z) + "\n") f_out.write(hits[z]['hseq']) f_out.write("\n") return filename