# -*- coding: utf-8 -*-
"""This module defines functions for accessing wwPDB servers."""
from os import getcwd
from glob import glob
from os.path import sep as pathsep
from os.path import isdir, isfile, join, split, splitext, normpath
from prody import LOGGER, SETTINGS
from prody.utilities import makePath, gunzip, relpath, copyFile, openURL
from prody.utilities import sympath, checkIdentifiers
# Names exported by ``from <this module> import *``.
__all__ = [
    'wwPDBServer',
    'fetchPDBviaFTP',
    'fetchPDBviaHTTP',
    'WWPDB_FTP_SERVERS',
]
# Each server record is a 3-tuple: (display name, FTP host, root path on host).
_WWPDB_RCSB = ('RCSB PDB (USA)', 'ftp.wwpdb.org', '/pub/')
_WWPDB_PDBe = ('PDBe (Europe)', 'ftp.ebi.ac.uk', '/pub/databases/rcsb/')
_WWPDB_PDBj = ('PDBj (Japan)', 'pdb.protein.osaka-u.ac.jp', '/pub/')

# Lower-case aliases accepted by :func:`wwPDBServer`, mapped to the server
# record they select.  Several aliases share one record.
WWPDB_FTP_SERVERS = dict(
    rcsb=_WWPDB_RCSB,
    usa=_WWPDB_RCSB,
    us=_WWPDB_RCSB,
    pdbe=_WWPDB_PDBe,
    euro=_WWPDB_PDBe,
    europe=_WWPDB_PDBe,
    eu=_WWPDB_PDBe,
    pdbj=_WWPDB_PDBj,
    japan=_WWPDB_PDBj,
    jp=_WWPDB_PDBj,
)
# Alternative RCSB location (archive layout), kept for reference:
#   'https://files.rcsb.org/pub/pdb/data/structures/all/pdb/pdb%s.ent.gz'
#   formatted with the lower-cased identifier.
def _URL_US(pdb):
    """Return the RCSB download URL for *pdb* (identifier is upper-cased)."""
    identifier = pdb.upper()
    return 'https://files.rcsb.org/download/%s.pdb.gz' % identifier


def _URL_EU(pdb):
    """Return the PDBe download URL for *pdb* (identifier is lower-cased)."""
    identifier = pdb.lower()
    return 'http://www.ebi.ac.uk/pdbe-srv/view/files/%s.ent.gz' % identifier


def _URL_JP(pdb):
    """Return the PDBj download URL for *pdb* (identifier is lower-cased)."""
    identifier = pdb.lower()
    return 'http://www.pdbj.org/pdb_all/pdb%s.ent.gz' % identifier


# Same alias set as the FTP server table; each value is a callable that
# turns an identifier into the URL of its gzipped PDB file.
WWPDB_HTTP_URL = dict(
    rcsb=_URL_US,
    usa=_URL_US,
    us=_URL_US,
    pdbe=_URL_EU,
    euro=_URL_EU,
    europe=_URL_EU,
    eu=_URL_EU,
    pdbj=_URL_JP,
    japan=_URL_JP,
    jp=_URL_JP,
)
def wwPDBServer(*key):
    """Set/get `wwPDB`_ FTP/HTTP server location used for downloading PDB
    structures.

    Called without arguments, return the stored server key (**None** when
    nothing has been stored).  Called with a single key, store it in the
    settings, save them, and log the selected server.  Use one of the
    following keywords for setting a server:

    +---------------------------+-----------------------------+
    | wwPDB FTP server          | *Key* (case insensitive)    |
    +===========================+=============================+
    | RCSB PDB (USA) (default)  | RCSB, USA, US               |
    +---------------------------+-----------------------------+
    | PDBe (Europe)             | PDBe, Europe, Euro, EU      |
    +---------------------------+-----------------------------+
    | PDBj (Japan)              | PDBj, Japan, Jp             |
    +---------------------------+-----------------------------+

    :raises TypeError: if more than one key is given, or the key is not a
        string
    :raises ValueError: if the key is not a known server identifier

    .. _wwPDB: http://www.wwpdb.org/"""

    count = len(key)

    # Getter form: no argument supplied.
    if count == 0:
        return SETTINGS.get('wwpdb', None)

    if count != 1:
        raise TypeError('one wwPDB server identifier is expected, {0} given'
                        .format(count))

    # Setter form: normalise the key; anything without ``lower`` is rejected.
    try:
        server = key[0].lower()
    except AttributeError:
        raise TypeError('key must be a string')

    if server not in WWPDB_FTP_SERVERS:
        raise ValueError('{0} is not a valid wwPDB server identifier'
                         .format(repr(server)))

    SETTINGS['wwpdb'] = server
    SETTINGS.save()
    LOGGER.info('wwPDB server is set to {}.'
                .format(WWPDB_FTP_SERVERS[server][0]))
def fetchPDBviaFTP(*pdb, **kwargs):
    """Retrieve PDB (default), PDBML, mmCIF, or EMD file(s) for specified *pdb*
    identifier(s) over FTP and return path(s).

    For ``format='pdb'``, downloaded files are stored in the local PDB
    folder, if one is set using :meth:`.pathPDBFolder`, and copied into
    *folder*, if specified by the user.  In every other case files are saved
    directly into *folder*, which defaults to the current working directory.
    If *compressed* is **False**, decompressed files will be written into
    *folder*.

    *format* keyword argument can be used to retrieve
    `PDBML <http://pdbml.pdb.org/>`_, `mmCIF <http://mmcif.pdb.org/>`_
    and `EMD <ftp://ftp.wwpdb.org/pub/emdb/doc/Map-format/current/EMDB_map_format.pdf>`_
    files: ``format='cif'`` will fetch an mmCIF file, ``format='emd'`` (or
    ``format='map'``) will fetch an EMD file, and ``format='xml'`` will fetch
    a PDBML file.  If PDBML header file is desired, ``noatom=True`` argument
    will do the job.

    The server is the one selected with :func:`wwPDBServer` (``'us'`` when
    none is stored).

    :arg pdb: one or more identifiers

    :keyword format: ``'pdb'`` (default), ``'cif'``, ``'xml'``, ``'emd'``
        or ``'map'``, case insensitive
    :keyword noatom: with ``format='xml'``, fetch the ``-noatom`` file,
        default is **False**
    :keyword folder: destination folder
    :keyword compressed: keep files gzipped, default is **True**
    :keyword check: pass identifiers through :func:`.checkIdentifiers`
        first, default is **True**; identifiers that are **None** are not
        downloaded and yield **None** in the output

    :returns: a single path (or **None** on failure) when one identifier is
        given, otherwise a list with one entry per identifier

    :raises ValueError: if *format* is not recognized"""

    # Kept out of a local called ``format`` so the builtin is not shadowed.
    fmt = str(kwargs.pop('format', 'pdb')).lower()

    if kwargs.get('check', True):
        identifiers = checkIdentifiers(*pdb, format=fmt)
    else:
        identifiers = list(pdb)

    output_folder = kwargs.pop('folder', None)
    compressed = bool(kwargs.pop('compressed', True))
    noatom = bool(kwargs.pop('noatom', False))

    # 'emd' and 'map' are synonyms and must be treated alike everywhere,
    # including when navigating the server's directory tree below.
    is_emd = fmt in ('emd', 'map')

    if fmt == 'pdb':
        ftp_divided = 'pdb/data/structures/divided/pdb'
        ftp_pdbext = '.ent.gz'
        ftp_prefix = 'pdb'
        extension = '.pdb'
    elif fmt == 'xml':
        if noatom:
            ftp_divided = 'pdb/data/structures/divided/XML-noatom'
            ftp_pdbext = '-noatom.xml.gz'
            extension = '-noatom.xml'
        else:
            ftp_divided = 'pdb/data/structures/divided/XML'
            ftp_pdbext = '.xml.gz'
            extension = '.xml'
        ftp_prefix = ''
    elif fmt == 'cif':
        ftp_divided = 'pdb/data/structures/divided/mmCIF'
        ftp_pdbext = '.cif.gz'
        ftp_prefix = ''
        extension = '.cif'
    elif is_emd:
        ftp_divided = 'emdb/structures'
        ftp_pdbext = '.map.gz'
        ftp_prefix = 'emd_'
        extension = '.map'
    else:
        raise ValueError(repr(fmt) + ' is not valid format')

    # ``get_path`` gives the file the raw download is written to;
    # ``finalize`` post-processes that file and returns the path to report.
    local_folder = pathPDBFolder()
    if fmt == 'pdb' and local_folder:
        local_folder, is_divided = local_folder
        if is_divided:
            get_path = lambda ident: join(
                makePath(join(local_folder, ident[1:3])),
                'pdb' + ident + '.pdb.gz')
        else:
            get_path = lambda ident: join(local_folder, ident + '.pdb.gz')
        if output_folder is None:
            finalize = lambda filename, ident: filename
        elif compressed:
            finalize = lambda filename, ident: copyFile(
                filename, join(output_folder, ident + extension + '.gz'))
        else:
            finalize = lambda filename, ident: gunzip(
                filename, join(output_folder, ident + extension))
    else:
        if output_folder is None:
            output_folder = getcwd()
        if compressed:
            get_path = lambda ident: join(output_folder,
                                          ident + extension + '.gz')
            finalize = lambda filename, ident: filename
        else:
            # Gzipped data is written under the final name and then
            # decompressed in place.
            get_path = lambda ident: join(output_folder, ident + extension)
            finalize = lambda filename, ident: gunzip(get_path(ident),
                                                      get_path(ident))

    ftp_name, ftp_host, ftp_path = WWPDB_FTP_SERVERS[wwPDBServer() or 'us']
    LOGGER.debug('Connecting wwPDB FTP server {0}.'.format(ftp_name))

    from ftplib import FTP
    try:
        ftp = FTP(ftp_host)
    except Exception as error:
        raise type(error)('FTP connection problem, potential reason: '
                          'no internet connectivity')

    success = 0
    failure = 0
    filenames = []
    try:
        ftp.login('')
        for ident in identifiers:
            if ident is None:
                filenames.append(None)
                continue

            data = []
            ftp_fn = ftp_prefix + ident + ftp_pdbext
            try:
                # ``ftp_path`` is absolute, so this resets the working
                # directory for every identifier.
                ftp.cwd(ftp_path)
                ftp.cwd(ftp_divided)
                if is_emd:
                    ftp.cwd('EMD-{0}/map'.format(ident))
                else:
                    ftp.cwd(ident[1:3])
                ftp.retrbinary('RETR ' + ftp_fn, data.append)
            except Exception as error:
                # Listing the directory tells a missing file apart from a
                # refused transfer; the listing itself may fail and must
                # not abort the remaining downloads.
                try:
                    listed = ftp_fn in ftp.nlst()
                except Exception:
                    listed = None
                if listed is None:
                    LOGGER.warn('{0} download failed ({1}).'
                                .format(ident, str(error)))
                elif listed:
                    LOGGER.warn('{0} download failed ({1}). It is '
                                'possible that you do not have rights to '
                                'download .gz files in the current network.'
                                .format(ident, str(error)))
                else:
                    LOGGER.info('{0} download failed. {1} does not exist '
                                'on {2}.'.format(ftp_fn, ident, ftp_host))
                failure += 1
                filenames.append(None)
                continue

            if not data:
                LOGGER.warn('{0} download failed, reason unknown.'
                            .format(ident))
                failure += 1
                filenames.append(None)
                continue

            filename = get_path(ident)
            with open(filename, 'wb') as pdbfile:
                pdbfile.writelines(data)
            filename = normpath(relpath(finalize(filename, ident)))
            LOGGER.debug('{0} downloaded ({1})'
                         .format(ident, sympath(filename)))
            success += 1
            filenames.append(filename)
    finally:
        # Always release the connection; fall back to a hard close when the
        # polite QUIT cannot be delivered.
        try:
            ftp.quit()
        except Exception:
            ftp.close()

    LOGGER.debug('PDB download via FTP completed ({0} downloaded, '
                 '{1} failed).'.format(success, failure))
    if len(identifiers) == 1:
        return filenames[0]
    else:
        return filenames
def fetchPDBviaHTTP(*pdb, **kwargs):
    """Retrieve PDB file(s) for specified *pdb* identifier(s) over HTTP and
    return path(s).

    Downloaded files will be stored in local PDB folder, if one is set using
    :meth:`.pathPDBFolder`, and copied into *folder*, if specified by the
    user.  If no local PDB folder is set, files are saved directly into
    *folder*, which defaults to the current working directory.  If
    *compressed* is **False**, decompressed files will be written into
    *folder*.

    The server is the one selected with :func:`wwPDBServer` (``'us'`` when
    none is stored).

    :arg pdb: one or more PDB identifiers

    :keyword folder: destination folder
    :keyword compressed: keep files gzipped, default is **True**
    :keyword check: pass identifiers through :func:`.checkIdentifiers`
        first, default is **True**; identifiers that are **None** are not
        downloaded and yield **None** in the output

    :returns: a single path (or **None** on failure) when one identifier is
        given, otherwise a list with one entry per identifier"""

    if kwargs.get('check', True):
        identifiers = checkIdentifiers(*pdb)
    else:
        identifiers = list(pdb)

    output_folder = kwargs.pop('folder', None)
    compressed = bool(kwargs.pop('compressed', True))

    extension = '.pdb'

    # ``get_path`` gives the file the raw download is written to;
    # ``finalize`` post-processes that file and returns the path to report.
    local_folder = pathPDBFolder()
    if local_folder:
        local_folder, is_divided = local_folder
        if is_divided:
            get_path = lambda ident: join(
                makePath(join(local_folder, ident[1:3])),
                'pdb' + ident + '.pdb.gz')
        else:
            get_path = lambda ident: join(local_folder, ident + '.pdb.gz')
        if output_folder is None:
            finalize = lambda filename, ident: filename
        elif compressed:
            finalize = lambda filename, ident: copyFile(
                filename, join(output_folder, ident + extension + '.gz'))
        else:
            finalize = lambda filename, ident: gunzip(
                filename, join(output_folder, ident + extension))
    else:
        if output_folder is None:
            output_folder = getcwd()
        if compressed:
            get_path = lambda ident: join(output_folder,
                                          ident + extension + '.gz')
            finalize = lambda filename, ident: filename
        else:
            # Gzipped data is written under the final name and then
            # decompressed in place.
            get_path = lambda ident: join(output_folder, ident + extension)
            finalize = lambda filename, ident: gunzip(get_path(ident),
                                                      get_path(ident))

    getURL = WWPDB_HTTP_URL[wwPDBServer() or 'us']

    success = 0
    failure = 0
    filenames = []
    for ident in identifiers:
        if ident is None:
            filenames.append(None)
            continue

        try:
            handle = openURL(getURL(ident))
        except Exception as err:
            LOGGER.warn('{0} download failed ({1}).'.format(ident, str(err)))
            failure += 1
            filenames.append(None)
            continue

        # Release the connection as soon as the payload has been read, even
        # when reading fails.
        try:
            data = handle.read()
        finally:
            handle.close()

        if not data:
            LOGGER.warn('{0} download failed, reason unknown.'
                        .format(ident))
            failure += 1
            filenames.append(None)
            continue

        filename = get_path(ident)
        with open(filename, 'wb') as pdbfile:
            pdbfile.write(data)
        filename = normpath(relpath(finalize(filename, ident)))
        LOGGER.debug('{0} downloaded ({1})'
                     .format(ident, sympath(filename)))
        success += 1
        filenames.append(filename)

    LOGGER.debug('PDB download via HTTP completed ({0} downloaded, '
                 '{1} failed).'.format(success, failure))
    if len(identifiers) == 1:
        return filenames[0]
    else:
        return filenames
if __name__ == '__main__':
    # Manual smoke test: download a few entries in every supported flavour,
    # then list and delete whatever landed in the current directory.
    # 'nano' is included alongside two regular-looking identifiers.
    # NOTE(review): the cleanup is assumed to run once, after both passes of
    # the download loop -- confirm against the original layout.
    pdbids = ['1mkp', '1zz2', 'nano']

    # Extra keyword arguments for each FTP request, in call order.
    ftp_variants = (
        {},
        {'format': 'cif'},
        {'format': 'xml'},
        {'format': 'xml', 'noatom': 1},
    )

    for gzip in (False, True):
        for extra in ftp_variants:
            fetchPDBviaFTP(*pdbids, compressed=gzip, folder='.', **extra)
        fetchPDBviaHTTP(*pdbids, compressed=gzip, folder='.')

    from glob import glob
    from os import remove

    for pdb in pdbids:
        fns = glob(pdb + '.*')
        print((pdb, '>', ', '.join(fns)))
        for fn in fns:
            remove(fn)