Source code for prody.proteins.localpdb

# -*- coding: utf-8 -*-
"""This module defines functions for handling local PDB folders."""

from glob import glob, iglob
from os.path import sep as pathsep
from os.path import abspath, isdir, isfile, join, split, splitext, normpath

from prody import LOGGER, SETTINGS
from prody.utilities import makePath, gunzip, relpath, copyFile, isWritable
from prody.utilities import sympath, isListLike

from . import wwpdb
from .wwpdb import checkIdentifiers, fetchPDBviaFTP, fetchPDBviaHTTP


__all__ = ['pathPDBFolder', 'pathPDBMirror',
           'fetchPDB', 'fetchPDBfromMirror',
           'iterPDBFilenames', 'findPDBFiles',
           'fetchPDBs']

def pathPDBFolder(folder=None, divided=False):
    """Return or set the local PDB folder used for storing PDB files
    downloaded from `wwPDB <http://www.wwpdb.org/>`_ servers.  Files stored
    in this folder can be accessed via :func:`.fetchPDB` from any working
    directory.  To release the current folder, pass an invalid path, e.g.
    ``folder=''``.

    If *divided* is **True**, the divided folder structure of wwPDB servers
    will be assumed when reading from and writing to the local folder.  For
    example, a structure with identifier **1XYZ** will be present as
    :file:`pdblocalfolder/yz/pdb1xyz.pdb.gz`.

    If *divided* is **False**, a plain folder structure will be expected and
    adopted when saving files.  For example, the same structure will be
    present as :file:`pdblocalfolder/1xyz.pdb.gz`.

    Finally, in either case, lower case letters will be used and compressed
    files will be stored.

    When called without *folder*, a ``(folder, divided)`` tuple is returned
    if a folder is stored in the settings and exists on disk; otherwise
    **None** is returned.  When *folder* is given, nothing is returned.

    :raises IOError: if *folder* is not a directory and there is no
        currently stored folder to release"""

    if folder is None:
        # Query mode: report what is stored in the package settings.
        stored = SETTINGS.get('pdb_local_folder')
        if not stored:
            return None
        if isdir(stored):
            return stored, SETTINGS.get('pdb_local_divided', True)
        LOGGER.warn('PDB local folder {0} is not a accessible.'
                    .format(repr(stored)))
        return None

    if isdir(folder):
        # Set mode: remember the absolute path and the folder layout.
        location = abspath(folder)
        LOGGER.info('Local PDB folder is set: {0}'.format(repr(location)))
        layout_note = ('wwPDB divided folder structure will be assumed.'
                       if divided else
                       'A plain folder structure will be assumed.')
        LOGGER.info(layout_note)
        SETTINGS['pdb_local_folder'] = location
        SETTINGS['pdb_local_divided'] = bool(divided)
        SETTINGS.save()
        return None

    # Release mode: an invalid path drops the stored folder, if there is one.
    released = SETTINGS.pop('pdb_local_folder')
    if not released:
        raise IOError('{0} is not a valid path.'.format(repr(folder)))
    LOGGER.info('PDB folder {0} is released.'
                .format(repr(released)))
    SETTINGS.pop('pdb_local_divided')
    SETTINGS.save()
    return None
# Attach pathPDBFolder to the wwpdb module as an attribute.
# NOTE(review): presumably this lets code in wwpdb look up the local PDB
# folder without importing this module (which itself imports wwpdb) -- confirm
# against wwpdb.
wwpdb.pathPDBFolder = pathPDBFolder
def pathPDBMirror(path=None, format=None):
    """Return or set the PDB mirror path to be used by :func:`.fetchPDB`.
    To release the current mirror, pass an invalid path, e.g. ``path=''``.
    If you are keeping a partial mirror, such as PDB files in
    :file:`/data/structures/divided/pdb/` folder, specify *format*, which is
    ``'pdb'`` in this case.

    When called without *path*, the stored mirror is returned: the path
    alone if no format is stored, or a ``(path, format)`` tuple otherwise.
    **None** is returned if no mirror is stored or the stored path is not a
    directory.  When *path* is given, nothing is returned.

    :raises IOError: if *path* is not a directory and there is no currently
        stored mirror to release"""

    if path is not None:
        if isdir(path):
            # Set mode: store the absolute path together with the format.
            location = abspath(path)
            LOGGER.info('Local PDB mirror path is set: {0}'
                        .format(repr(location)))
            SETTINGS['pdb_mirror_path'] = location
            SETTINGS['pdb_mirror_format'] = format
            SETTINGS.save()
            return None

        # Release mode: an invalid path drops the stored mirror, if any.
        released = SETTINGS.pop('pdb_mirror_path')
        if not released:
            raise IOError('{0} is not a valid path.'.format(repr(path)))
        LOGGER.info('PDB mirror {0} is released.'
                    .format(repr(released)))
        SETTINGS.save()
        return None

    # Query mode: report what is stored in the package settings.
    stored_path = SETTINGS.get('pdb_mirror_path')
    stored_format = SETTINGS.get('pdb_mirror_format', None)
    if not stored_path:
        return None
    if not isdir(stored_path):
        LOGGER.warning('PDB mirror path {0} is not a accessible.'
                       .format(repr(stored_path)))
        return None
    if stored_format is None:
        return stored_path
    return stored_path, stored_format
def fetchPDBfromMirror(*pdb, **kwargs):
    """Return path(s) to PDB (default), PDBML, or mmCIF file(s) for specified
    *pdb* identifier(s).  If a *folder* is specified, files will be copied
    into this folder.  If *compressed* is **False**, files will be
    decompressed.  *format* argument can be used to get `PDBML
    <http://pdbml.pdb.org/>`_ and `mmCIF <http://mmcif.pdb.org/>`_ files:
    ``format='cif'`` will fetch an mmCIF file, and ``format='xml'`` will
    fetch a PDBML file.  If PDBML header file is desired, ``noatom=True``
    argument will do the job.

    For a single identifier the path (or **None** when the file is not in
    the mirror) is returned; for several identifiers a list with one entry
    per identifier is returned.  Identifiers are validated with
    :func:`.checkIdentifiers` unless ``check=False`` is passed.

    :raises IOError: if no mirror path is set, or if the mirror was
        registered for a single format that differs from *format*
    :raises ValueError: if *format* is not ``'pdb'``, ``'xml'`` or ``'cif'``
    """

    mirror = pathPDBMirror()
    if mirror is None:
        raise IOError('no mirror path is set')

    # pathPDBMirror returns ``(path, format)`` for a partial mirror and a
    # plain path otherwise.  Check the type explicitly: tuple-unpacking a
    # two-character path string (e.g. '/a') would silently "succeed".
    if isinstance(mirror, tuple):
        mirror, mirror_format = mirror
    else:
        mirror_format = None

    file_format = str(kwargs.pop('format', 'pdb')).lower()

    if kwargs.get('check', True):
        identifiers = checkIdentifiers(*pdb)
    else:
        identifiers = list(pdb)

    if file_format == 'pdb':
        ftp_divided = 'data/structures/divided/pdb'
        ftp_pdbext = '.ent.gz'
        ftp_prefix = 'pdb'
        extension = '.pdb'
    elif file_format == 'xml':
        if bool(kwargs.pop('noatom', False)):
            ftp_divided = 'data/structures/divided/XML-noatom'
            ftp_pdbext = '-noatom.xml.gz'
            extension = '-noatom.xml'
        else:
            ftp_divided = 'data/structures/divided/XML'
            ftp_pdbext = '.xml.gz'
            extension = '.xml'
        ftp_prefix = ''
    elif file_format == 'cif':
        ftp_divided = 'data/structures/divided/mmCIF'
        ftp_pdbext = '.cif.gz'
        ftp_prefix = ''
        extension = '.cif'
    else:
        if file_format:
            raise ValueError('{0} is not a recognized format'
                             .format(repr(file_format)))
        else:
            raise ValueError('please specify a valid format')

    if mirror_format:
        if mirror_format.lower() != file_format:
            raise IOError('mirror contains only ' + mirror_format + ' files')
        # A partial mirror holds the two-letter subfolders directly.
        ftp_divided = ''
    else:
        ftp_divided = join(*ftp_divided.split('/'))

    folder = kwargs.get('folder')
    compressed = kwargs.get('compressed', True)

    filenames = []
    success = 0
    failure = 0
    for identifier in identifiers:
        if identifier is None:
            # Keep positions aligned with the input for invalid identifiers.
            filenames.append(None)
            continue

        fn = join(mirror, ftp_divided, identifier[1:3],
                  ftp_prefix + identifier + ftp_pdbext)

        if isfile(fn):
            if folder or not compressed:
                if compressed:
                    fn = copyFile(fn, join(folder or '.',
                                           identifier + extension + '.gz'))
                else:
                    fn = gunzip(fn, join(folder or '.',
                                         identifier + extension))
            filenames.append(normpath(fn))
            success += 1
        else:
            filenames.append(None)
            failure += 1

    if len(identifiers) == 1:
        fn = filenames[0]
        if success:
            LOGGER.debug('PDB file is found in the local mirror ({0}).'
                         .format(sympath(fn)))
        return fn
    else:
        LOGGER.debug('PDB files found in the local mirror ({0} found, '
                     '{1} missed).'.format(success, failure))
        return filenames
def fetchPDB(*pdb, **kwargs):
    """Return path(s) to PDB file(s) for specified *pdb* identifier(s).
    Files will be sought in user specified *folder* or current working
    directory, and then in local PDB folder and mirror, if they are
    available.  If *copy* is set **True**, files will be copied into
    *folder*.  If *compressed* is **False**, all files will be decompressed
    into *folder*.  See :func:`pathPDBFolder` and :func:`pathPDBMirror` for
    managing local resources, :func:`.fetchPDBviaFTP` and
    :func:`.fetchPDBviaHTTP` for downloading files from PDB servers.

    Files that are still missing are downloaded.  The *tp* keyword selects
    the transfer protocol: ``'http'`` or ``'ftp'`` uses only that protocol,
    any other value tries FTP first and falls back to HTTP.

    If *format* is given and is not ``'pdb'``, the call is handed over to
    :func:`.fetchPDBviaFTP` unchanged.

    For a single identifier a single path (or **None**) is returned; for
    several identifiers a list with one entry per identifier is returned.

    :raises IOError: if files need to be written and *folder* is not
        writable"""

    if len(pdb) == 1 and isinstance(pdb[0], list):
        pdb = pdb[0]

    if 'format' in kwargs and kwargs.get('format') != 'pdb':
        return fetchPDBviaFTP(*pdb, **kwargs)

    identifiers = checkIdentifiers(*pdb)
    single = len(identifiers) == 1

    folder = kwargs.get('folder', '.')
    compressed = kwargs.get('compressed')
    # ``compressed=None`` means no preference; only an explicit false value
    # asks for decompressed files.
    decompress = compressed is not None and not compressed

    # check *folder* specified by the user, usually pwd ('.')
    filedict = findPDBFiles(folder, compressed=compressed)

    filenames = []
    not_found = []  # (position in *filenames*, identifier) pairs
    exists = 0
    for i, pdb_id in enumerate(identifiers):
        if pdb_id is None:
            filenames.append(None)
        elif pdb_id in filedict:
            filenames.append(filedict[pdb_id])
            exists += 1
        else:
            filenames.append(None)
            not_found.append((i, pdb_id))

    if not not_found:
        if single:
            filenames = filenames[0]
            if exists:
                LOGGER.debug('PDB file is found in working directory ({0}).'
                             .format(sympath(filenames)))
        return filenames

    if not isWritable(folder):
        raise IOError('permission to write in {0} is denied, please '
                      'specify another folder'.format(folder))

    if decompress:
        # Compressed copies already in *folder* only need to be unpacked.
        filedict = findPDBFiles(folder, compressed=True)
        pending, not_found = not_found, []
        for i, pdb_id in pending:
            if pdb_id in filedict:
                fn = filedict[pdb_id]
                filenames[i] = gunzip(fn, splitext(fn)[0])
            else:
                not_found.append((i, pdb_id))

    if not not_found:
        return filenames[0] if single else filenames

    # Next, look in the local PDB folder (see pathPDBFolder).
    local_folder = pathPDBFolder()
    copy = kwargs.setdefault('copy', False)
    if local_folder:
        local_folder, is_divided = local_folder
        pending, not_found = not_found, []
        for i, pdb_id in pending:
            if is_divided:
                fn = join(local_folder, pdb_id[1:3],
                          'pdb' + pdb_id + '.pdb.gz')
            else:
                fn = join(local_folder, pdb_id + '.pdb.gz')
            if isfile(fn):
                if copy or decompress:
                    if compressed:
                        # The copy must keep the '.pdb.gz' suffix so that
                        # findPDBFiles recognizes it on the next call.
                        fn = copyFile(fn, join(folder, pdb_id + '.pdb.gz'))
                    else:
                        fn = gunzip(fn, join(folder, pdb_id + '.pdb'))
                filenames[i] = normpath(fn)
            else:
                not_found.append((i, pdb_id))

    if not not_found:
        if single:
            fn = filenames[0]
            # Abbreviate long paths in the log message.
            items = fn.split(pathsep)
            if len(items) > 5:
                fndisp = pathsep.join(items[:3] + ['...'] + items[-1:])
            else:
                fndisp = relpath(fn)
            LOGGER.debug('PDB file is found in the local folder ({0}).'
                         .format(fndisp))
            return fn
        else:
            return filenames

    if copy or decompress:
        kwargs['folder'] = folder

    # Then try the local mirror (see pathPDBMirror).
    downloads = [pdb_id for _, pdb_id in not_found]
    try:
        fns = fetchPDBfromMirror(*downloads, **kwargs)
    except IOError:
        # Best effort: fetchPDBfromMirror raises IOError when no usable
        # mirror is configured; fall through to downloading.
        pass
    else:
        if len(downloads) == 1:
            fns = [fns]
        pending, not_found = not_found, []
        for j, fn in enumerate(fns):
            if fn is None:
                not_found.append(pending[j])
            else:
                filenames[pending[j][0]] = fn

    if not not_found:
        return filenames[0] if single else filenames

    # Finally, download whatever is still missing.
    downloads = [pdb_id for _, pdb_id in not_found]
    fns = None

    tp = kwargs.pop('tp', None)
    if tp is not None:
        tp = tp.lower()

    if tp == 'http':
        try:
            fns = fetchPDBviaHTTP(*downloads, check=False, **kwargs)
        except Exception as err:
            LOGGER.warn('Downloading PDB files via HTTP failed '
                        '({0}).'.format(str(err)))
    elif tp == 'ftp':
        try:
            fns = fetchPDBviaFTP(*downloads, check=False, **kwargs)
        except Exception as err:
            LOGGER.warn('Downloading PDB files via FTP failed '
                        '({0}).'.format(str(err)))
    else:
        try:
            fns = fetchPDBviaFTP(*downloads, check=False, **kwargs)
        except Exception:
            fns = None

        # FTP counts as failed if it raised, returned nothing, or missed
        # at least one of the requested files.
        if fns is None or (isinstance(fns, list) and None in fns):
            LOGGER.info('Downloading PDB files via FTP failed, '
                        'trying HTTP.')
            try:
                fns = fetchPDBviaHTTP(*downloads, check=False, **kwargs)
            except Exception as err:
                LOGGER.warn('Downloading PDB files via HTTP also failed '
                            '({0}).'.format(str(err)))

    # A single download yields a bare result; normalize it to a list.
    if len(downloads) == 1 and not isinstance(fns, list):
        fns = [fns]
    if fns:
        for (i, _), fn in zip(not_found, fns):
            filenames[i] = fn

    return filenames[0] if single else filenames
def fetchPDBs(*pdb, **kwargs):
    """Wrapper function to fetch multiple files from the PDB.
    If no format is given, it tries PDB then mmCIF then EMD.

    :arg pdb: one PDB identifier or filename, or a list of them.
        If needed, PDB files are downloaded using :func:`.fetchPDB()`
        function.

    :arg format: file format to request for every entry in *pdb*; when
        omitted, the formats listed above are tried in turn for each entry.

    Remaining keyword arguments are passed on to :func:`.fetchPDB`.

    :returns: list with one result of :func:`.fetchPDB` per entry in *pdb*

    :raises ValueError: if no identifier or filename is given
    """

    if not pdb:
        raise ValueError('Please provide a PDB ID or filename')

    # A single list-like argument is treated as the collection of entries.
    if len(pdb) == 1 and isListLike(pdb[0]):
        pdb = pdb[0]

    # Take *format* out of kwargs once, before the loop, so that an explicit
    # format applies to every entry and not only to the first one.
    format = kwargs.pop('format', None)

    fnames = []
    for p in pdb:
        if format is not None:
            filename = fetchPDB(p, format=format, **kwargs)
        else:
            filename = fetchPDB(p, **kwargs)

            if filename is None:
                filename = fetchPDB(p, format='cif', **kwargs)

            if filename is None:
                filename = fetchPDB(p, format='emd', **kwargs)

        fnames.append(filename)

    return fnames
def iterPDBFilenames(path=None, sort=False, unique=True, **kwargs):
    """Yield PDB filenames in *path* specified by the user or in local PDB
    mirror (see :func:`.pathPDBMirror`).  When *unique* is **True**, only
    one of potentially identical files will be yielded (e.g.
    :file:`1mkp.pdb` and :file:`pdb1mkp.ent.gz`).  :file:`.pdb` and
    :file:`.ent` extensions, and compressed files are considered.

    :arg path: folder to search; when **None**, the PDB mirror path is used
    :arg sort: if true, filenames are yielded in sorted order
    :arg unique: if true, skip files whose identifier was already yielded
        (applies only when searching a plain folder)

    Recognized keyword arguments:

    * *mirror*: when **True**, *path* is searched using the wwPDB mirror
      layout (:file:`data/structures/divided/pdb/*/*.ent.gz`)
    * *reverse*: reverse the sort order (used only when *sort* is true)
    * *compressed*: **None** (default) matches compressed and plain files,
      a true value only :file:`.gz` files, a false value only plain files

    :raises ValueError: if *path* is **None** and no PDB mirror is set"""

    from re import compile as re_compile, IGNORECASE

    # list.sort() does not accept ``reverse=None`` on every Python 3 release,
    # so coerce the optional keyword to a real boolean.
    reverse = bool(kwargs.get('reverse', False))

    if path is None or kwargs.get('mirror') is True:
        mirror_format = None
        if path is None:
            path = pathPDBMirror()
            # A partial mirror is reported as a ``(path, format)`` tuple.
            if isinstance(path, tuple):
                path, mirror_format = path
        if path is None:
            raise ValueError('path must be specified or PDB mirror path '
                             'must be set')
        if mirror_format:
            # Partial mirrors hold the two-letter subfolders directly, as
            # assumed by fetchPDBfromMirror.
            pattern = join(path, '*/*.ent.gz')
        else:
            pattern = join(path, 'data/structures/divided/pdb/',
                           '*/*.ent.gz')
        if sort:
            pdbs = glob(pattern)
            pdbs.sort(reverse=reverse)
        else:
            pdbs = iglob(pattern)
        for fn in pdbs:
            yield fn
    else:
        compressed = kwargs.get('compressed')
        if compressed is None:
            pdbext = re_compile(r'\.(pdb|ent)(\.gz)?$', IGNORECASE)
        elif compressed:
            pdbext = re_compile(r'\.(pdb|ent)\.gz$', IGNORECASE)
        else:
            pdbext = re_compile(r'\.(pdb|ent)$', IGNORECASE)

        pdbs = [fn for fn in iglob(join(path, '*')) if pdbext.search(fn)]
        if sort:
            pdbs.sort(reverse=reverse)

        yielded = set()
        for fn in pdbs:
            if unique:
                # Strip up to two extensions ('.pdb' + '.gz') and the
                # optional 'pdb' prefix to obtain the identifier.
                pdb = splitext(splitext(split(fn)[1])[0])[0]
                if len(pdb) == 7 and pdb.startswith('pdb'):
                    pdb = pdb[3:]
                if pdb in yielded:
                    continue
                yielded.add(pdb)
            yield fn
def findPDBFiles(path, case=None, **kwargs):
    """Return a dictionary that maps PDB filenames to file paths.  If *case*
    is specified (``'u[pper]'`` or ``'l[ower]'``), dictionary keys
    (filenames) will be modified accordingly.  If a PDB filename has
    :file:`pdb` prefix, it will be trimmed, for example ``'1mkp'`` will be
    mapped to file path :file:`./pdb1mkp.pdb.gz`.  If a file is present with
    multiple extensions, only one of them will be returned.  Keyword
    arguments are passed on to :func:`.iterPDBFilenames`.

    See also :func:`.iterPDBFilenames`."""

    mode = str(case).lower()
    to_upper = mode.startswith('u')
    to_lower = not to_upper and mode.startswith('l')

    found = {}
    for filename in iterPDBFilenames(path, sort=True, reverse=True,
                                     **kwargs):
        filename = normpath(filename)
        # Drop the directory, up to two extensions and the 'pdb' prefix.
        basename = split(filename)[1]
        key = splitext(splitext(basename)[0])[0]
        if len(key) == 7 and key.startswith('pdb'):
            key = key[3:]

        if to_upper:
            key = key.upper()
        elif to_lower:
            key = key.lower()
        found[key] = filename
    return found