Source code for FEV_KEGG.KEGG.Database

import re
import time
import urllib
from socket import timeout

from FEV_KEGG.lib.Biopython.KEGG.KGML import KGML_pathway, KGML_parser
import jsonpickle
from FEV_KEGG.KEGG.DataTypes import Gene, Substance, EcEnzyme
import tqdm
from typing import Set, List, Dict, Iterable, Tuple

from FEV_KEGG.Graph.Elements import GeneID, SubstanceID, EcNumber
from FEV_KEGG.KEGG import File, Download, SSDB
import FEV_KEGG.settings as settings
from FEV_KEGG.Util import Parallelism
import concurrent.futures
from FEV_KEGG.Util.Util import chunks
import math
from urllib.error import URLError


[docs]class NoKnownPathwaysError(ValueError): """ Raised if an organism has no known pathways and is therefore rather useless. """
[docs]class ImpossiblyOrthologousError(ValueError): """ Raised if trying to find orthologs in an organism using a GeneID from the very same organism. """
[docs]class GeneDoesNotExistError(ValueError): """ Raised if trying to download a certain gene that does not exist. """
[docs]def getPathwayDescriptions(organismAbbreviation: 'eco') -> Set[str]: """ Get full pathway descriptions for an organism. Downloads the data from KEGG, if not already present on disk. Parameters ---------- organismAbbreviation : str The organism for which to retrieve all known pathways. Returns ------- Set[str] Set of pathway description lines for given organism. Raises ------ NoKnownPathwaysError If the organism has no known pathways. URLError If connection to KEGG fails. """ fileName = 'organism/' + organismAbbreviation + '/pathway/list' debugOutput = 'Getting pathway list for ' + organismAbbreviation + ' from ' if File.doesFileExist(fileName): if settings.verbosity >= 3: print(debugOutput + 'disk.') pathwayList = File.readSetFromFileAtOnce(fileName) else: if settings.verbosity >= 3: print(debugOutput + 'download.') try: pathwayList = Download.downloadPathwayList(organismAbbreviation) except urllib.error.HTTPError as exception: if isinstance(exception, urllib.error.HTTPError) and exception.code == 404: # organism has no known pathways raise NoKnownPathwaysError('The organism \'' + organismAbbreviation + '\' has no known pathways.') else: raise File.writeToFile(pathwayList, fileName) return pathwayList
[docs]def getPathway(organismAbbreviation: 'eco', pathwayName: '00260') -> KGML_pathway.Pathway: """ Get certain pathway object of an organism. Downloads the data from KEGG, if not already present on disk. Parameters ---------- organismAbbreviation : str The organism for which to retrieve the pathway. pathwayName : str The code of the pathway, e.g. '00260'. Returns ------- KGML_pathway.Pathway Pathway object. *None* if pathway does not exist. Raises ------ URLError If connection to KEGG fails. """ fileName = 'organism/' + organismAbbreviation + '/pathway/' + pathwayName debugOutput = 'Getting pathway ' + pathwayName + ' for ' + organismAbbreviation + ' from ' if File.doesFileExist(fileName): if settings.verbosity >= 3: print(debugOutput + 'disk.') fileHandle = File.getFileHandleRead(fileName) pathway = KGML_parser.read(fileHandle) else: if settings.verbosity >= 3: print(debugOutput + 'download.') try: # certain pathways might not exist as KGML (HTTP error 404), ignore these and return None pathwayXml = Download.downloadPathway(organismAbbreviation, pathwayName) except urllib.error.HTTPError as exception: if isinstance(exception, urllib.error.HTTPError) and exception.code == 404: return None else: raise File.writeToFile(pathwayXml, fileName) pathway = KGML_parser.read(pathwayXml) return pathway
[docs]def getPathwayBulk(organismAbbreviation: 'eco', pathwayNames: Iterable[str]) -> Dict[str, KGML_pathway.Pathway]: """ Get multiple pathway objects of an organism. Downloads the data from KEGG in bulk, if not already present on disk. This is done in parallel in a thread pool, see :attr:`FEV_KEGG.settings.downloadThreads`. Parameters ---------- organismAbbreviation : str The organism for which to retrieve the pathway. pathwayNames : Iterable[str] The codes of the pathways, e.g. ['00260', '00530']. Returns ------- Dict[str, KGML_pathway.Pathway] Pathway objects, keyed by their respective pathway name. A pathway object is *None* if the pathway does not exist. Raises ------ URLError If connection to KEGG fails. """ # split list into pathways on disk and pathways not downloaded yet pathwaysOnDisk = [] pathwaysToDownload = [] for pathwayName in pathwayNames: fileName = 'organism/' + organismAbbreviation + '/pathway/' + pathwayName debugOutput = 'Getting pathway ' + pathwayName + ' from ' if File.doesFileExist(fileName): if settings.verbosity >= 3: print(debugOutput + 'disk.') pathwaysOnDisk.append(pathwayName) else: if settings.verbosity >= 3: print(debugOutput + 'download.') pathwaysToDownload.append(pathwayName) pathways = dict() # get pathways from disk for pathwayName in pathwaysOnDisk: fileName = 'organism/' + organismAbbreviation + '/pathway/' + pathwayName fileHandle = File.getFileHandleRead(fileName) pathway = KGML_parser.read(fileHandle) pathways[pathwayName] = pathway # download pathways in bulk if len( pathwaysToDownload ) > 0: tqdmPosition = Parallelism.getTqdmPosition() threadPool = concurrent.futures.ThreadPoolExecutor(Parallelism.getNumberOfThreadsDownload()) futures = [] iterator = None try: # query KEGG in parallel for pathwayToDownload in pathwaysToDownload: futures.append( threadPool.submit(_downloadPathway, pathwayToDownload, organismAbbreviation) ) iterator = concurrent.futures.as_completed(futures) if settings.verbosity >= 1: if settings.verbosity >= 2: print( 'Downloading ' + str(len(pathwaysToDownload)) + ' pathways...' ) iterator = tqdm.tqdm(iterator, total = len(pathwaysToDownload), unit = ' pathways', position = tqdmPosition) for future in iterator: result_part = future.result() if result_part is not None: pathway = KGML_parser.read(result_part) pathwayName = re.sub('[^0-9]', '', pathway.name) pathways[pathwayName] = pathway fileName = 'organism/' + organismAbbreviation + '/pathway/' + pathwayName File.writeToFile(result_part, fileName) threadPool.shutdown(wait = False) except KeyboardInterrupt: # only raised in main thread (once in each process!) Parallelism.keyboardInterruptHandler(threadPool=threadPool, threadPoolFutures=futures, terminateProcess=True) raise except BaseException: if Parallelism.isMainThread(): Parallelism.keyboardInterruptHandler(threadPool=threadPool, threadPoolFutures=futures, silent=True) raise finally: if iterator is not None: iterator.close() return pathways
def _downloadPathway(pathwayName, organismAbbreviation): if Parallelism.getShallCancelThreads() is True: raise concurrent.futures.CancelledError() else: try: # certain pathways might not exist as KGML (HTTP error 404), ignore these and return None pathwayXml = Download.downloadPathway(organismAbbreviation, pathwayName) except urllib.error.HTTPError as exception: if isinstance(exception, urllib.error.HTTPError) and exception.code == 404: return None else: raise return pathwayXml
[docs]def getGene(geneIdString: 'eco:b0004') -> Gene: """ Get certain gene. Downloads the data from KEGG, if not already present on disk. Parameters ---------- geneIdString : str Unique ID of the gene to be downloaded, represented as a string, including organism abbreviation and gene name, e.g. 'eco:b0004'. Returns ------- Gene Gene object. Raises ------ HTTPError If gene does not exist. URLError If connection to KEGG fails. """ organismAbbreviation, geneString = geneIdString.split(':') fileName = 'organism/' + organismAbbreviation + '/gene/' + geneString debugOutput = 'Getting gene ' + geneIdString + ' from ' if File.doesFileExist(fileName): if settings.verbosity >= 3: print(debugOutput + 'disk.') fileContent = File.readStringFromFileAtOnce(fileName) gene = Gene(fileContent) else: if settings.verbosity >= 3: print(debugOutput + 'download.') try: geneText = Download.downloadGene(geneIdString) except urllib.error.HTTPError as exception: if exception.code == 404: raise GeneDoesNotExistError else: raise File.writeToFile(geneText, fileName) gene = Gene(geneText) return gene
[docs]def getGeneBulk(geneIDs: Iterable[GeneID]) -> Dict[GeneID, Gene]: """ Get multiple certain genes. Downloads the data from KEGG in bulk, if not already present on disk. This is done in parallel in a thread pool, see :attr:`FEV_KEGG.settings.downloadThreads`. Parameters ---------- geneIDs : Iterable[GeneID] Unique IDs of the genes to be downloaded, represented as :class:`FEV_KEGG.Graph.Elements.GeneID` objects. Returns ------- Dict[GeneID, Gene] Each found Gene object, keyed by the GeneID used to search it. Raises ------ IOError If result is too small. Possibly because none of the genes of a download-chunk existed. URLError If connection to KEGG fails. """ # split list into genes on disk and genes not downloaded yet genesOnDisk = [] genesToDownload = [] for geneID in geneIDs: organismAbbreviation = geneID.organismAbbreviation geneString = geneID.geneName fileName = 'organism/' + organismAbbreviation + '/gene/' + geneString debugOutput = 'Getting gene ' + str( geneID ) + ' from ' if File.doesFileExist(fileName): if settings.verbosity >= 3: print(debugOutput + 'disk.') genesOnDisk.append(geneID) else: if settings.verbosity >= 3: print(debugOutput + 'download.') genesToDownload.append(geneID) # get genes from disk geneEntries = dict() for geneID in genesOnDisk: organismAbbreviation = geneID.organismAbbreviation geneString = geneID.geneName fileName = 'organism/' + organismAbbreviation + '/gene/' + geneString fileContent = File.readStringFromFileAtOnce(fileName) gene = Gene(fileContent) geneEntries[geneID] = gene # download genes in bulk if len( genesToDownload ) > 0: geneTextBulk = Download.downloadGeneBulk([x.__str__() for x in genesToDownload]) geneTexts = re.split('///\n', geneTextBulk)[:-1] for geneText in geneTexts: geneText += '///' gene = Gene(geneText) organismAbbreviation = gene.organismAbbreviation geneString = gene.number geneEntries[GeneID(organismAbbreviation + ':' + geneString)] = gene fileName = 'organism/' + organismAbbreviation + '/gene/' + geneString File.writeToFile(geneText, fileName) return geneEntries
[docs]def getPathwayGeneIDs(organismAbbreviation: 'eco', pathwayName: '00260') -> Set[str]: """ Get all gene ID strings in an organism's pathway, if previously saved. Parameters ---------- organismAbbreviation : str The organism for which to retrieve the pathway. pathwayName : str The code of the pathway, e.g. '00260'. Returns ------- Set[str] Gene ID strings from a pathway, or *None*, if not previously saved on disk. Note ---- This requires you to previously call :func:`setPathwayGeneIDs`! """ fileName = 'organism/' + organismAbbreviation + '/pathway/' + pathwayName + '_geneID_list' debugOutput = 'Getting gene ID list for pathway ' + organismAbbreviation + pathwayName + ' from ' if File.doesFileExist(fileName): if settings.verbosity >= 3: print(debugOutput + 'disk.') return File.readSetFromFileAtOnce(fileName) else: if settings.verbosity >= 3: print(debugOutput + 'calculation.') return None
[docs]def setPathwayGeneIDs(organismAbbreviation: 'eco', pathwayName: '00260', geneIDs: Set[str]): """ Save all gene ID strings in an organism's pathway. Parameters ---------- organismAbbreviation : str The organism for which to retrieve the pathway. pathwayName : str The code of the pathway, e.g. '00260'. geneIDs : Set[str] Gene ID strings of the specified organism-specific pathway. """ fileName = 'organism/' + organismAbbreviation + '/pathway/' + pathwayName + '_geneID_list' geneIDListString = '\n'.join(geneIDs) File.writeToFile(geneIDListString, fileName)
[docs]def getOrganismList() -> List[str]: """ Get list of all organisms known to KEGG. Returns ------- List[str] All organism descriptions known to KEGG. Raises ------ URLError If connection to KEGG fails. Returns the list of all known organisms from KEGG. """ fileName = 'organism_list' debugOutput = 'Getting organism list from ' if File.doesFileExist(fileName): if settings.verbosity >= 3: print(debugOutput + 'disk.') organismList = File.readListFromFileAtOnce(fileName) else: if settings.verbosity >= 3: print(debugOutput + 'download.') organismList = Download.downloadOrganismList() File.writeToFile(organismList, fileName) organismList = organismList.splitlines() return organismList
[docs]def getEnzymeEcNumbers(enzymeAbbreviation: 'MiaB') -> List[str]: """ Get EC numbers of an enzyme for the enzyme's abbreviation. Also works for everything else in the description of an enzyme, not just the abbreviation. Parameters ---------- enzymeAbbreviation : str Part of the enzymes description string. Returns ------- List[str] or None All EC numbers, as strings, for a given enzyme, identified by its abbreviation, from KEGG. Or *None* if no EC numbers could be found. Raises ------ URLError If connection to KEGG fails. """ fileName = 'enzymes/' + enzymeAbbreviation debugOutput = 'Getting enzyme EC number list for' + enzymeAbbreviation + ' from ' if File.doesFileExist(fileName): if settings.verbosity >= 3: print(debugOutput + 'disk.') ecNumbers = File.readListFromFileAtOnce(fileName) else: if settings.verbosity >= 3: print(debugOutput + 'download.') ecNumbers = Download.downloadEnzymeEcNumbers(enzymeAbbreviation) File.writeToFile(ecNumbers, fileName) ecNumbers = ecNumbers.splitlines() if len(ecNumbers) == 0 or (len(ecNumbers) == 1 and len(ecNumbers[0]) <= 2): return None return ecNumbers
[docs]def doesOrganismExist(organismAbbreviation: 'eco') -> bool: """ Check whether an organism exists. Parameters ---------- organismAbbreviation : str The abbreviation of the organism to check. Returns ------- bool *True*, if something was downloaded, and thus the organism exists. *False*, if the download was empty (400 Bad Request), because this organism does not exist. Raises ------ URLError If connection to KEGG fails. """ try: info = getOrganismInfo(organismAbbreviation, checkExpiration = False) except ValueError: info = None return True if info is not None else False
def _doesOrganismExistTuple(organismAbbreviation: 'eco') -> Tuple[str, bool]: if Parallelism.getShallCancelThreads() is True: raise concurrent.futures.CancelledError() else: return (organismAbbreviation, doesOrganismExist(organismAbbreviation))
[docs]def doesOrganismExistBulk(organismAbbreviations: List[str]) -> List[str]: """ Check whether multiple organisms exist. This is done in parallel in a thread pool, see :attr:`FEV_KEGG.settings.downloadThreads`. Parameters ---------- organismAbbreviations : List[str] The abbreviations of the organisms to check. Returns ------- List[str] List of organism abbreviations, taken from `organismAbbreviations` for which :func:`doesOrganismExist` would return *True*. Raises ------ URLError If connection to KEGG fails. """ tqdmPosition = Parallelism.getTqdmPosition() threadPool = concurrent.futures.ThreadPoolExecutor(Parallelism.getNumberOfThreadsDownload()) futures = [] iterator = None try: for organismAbbreviation in organismAbbreviations: futures.append( threadPool.submit(_doesOrganismExistTuple, organismAbbreviation) ) iterator = concurrent.futures.as_completed(futures) if settings.verbosity >= 1: if settings.verbosity >= 2: print( 'Checking existance of ' + str(len(organismAbbreviations)) + ' organisms...' ) iterator = tqdm.tqdm(iterator, total = len(organismAbbreviations), unit = ' organisms', position = tqdmPosition) existingOrganisms = [] for future in iterator: doesExistTuple = future.result() organismAbbreviation, doesExist = doesExistTuple if doesExist is True: existingOrganisms.append(organismAbbreviation) threadPool.shutdown(wait = False) return existingOrganisms except KeyboardInterrupt: # only raised in main thread (once in each process!) Parallelism.keyboardInterruptHandler(threadPool=threadPool, threadPoolFutures=futures, terminateProcess=True) raise except BaseException: if Parallelism.isMainThread(): Parallelism.keyboardInterruptHandler(threadPool=threadPool, threadPoolFutures=futures, silent=True) raise finally: if iterator is not None: iterator.close()
[docs]def getOrganismInfo(organismAbbreviation: 'eco', checkExpiration = False) -> str: """ Get organism info. Parameters ---------- organismAbbreviation : str The abbreviation of the organism. checkExpiration : bool, optional If *True*, check whether the last download of the organism info is older than :attr:`FEV_KEGG.settings.organismInfoExpiration`. If yes, download it again. This can be useful when relying upon a current database size for calculating E-values for a :class:`FEV_KEGG.KEGG.SSDB.Match`. Returns ------- str Raw organism info. Raises ------ ValueError If organism with `organismAbbreviation` does not exist. URLError If connection to KEGG fails. """ fileName = 'organism/' + organismAbbreviation + '/' + 'info' debugOutput = 'Getting organism info for ' + organismAbbreviation + ' from ' shallDownload = False if File.doesFileExist(fileName): if settings.verbosity >= 3: print(debugOutput + 'disk.') organismInfo = File.readStringFromFileAtOnce(fileName) # remove timestamp splitFirstLine = organismInfo.split(sep='\n', maxsplit=1) firstLine = splitFirstLine[0] organismInfo = splitFirstLine[1] if checkExpiration: # check if timestamp is too old currentTimestamp = int(time.time()) lastTimestamp = int(firstLine) if abs(currentTimestamp - lastTimestamp) > settings.organismInfoExpiration: # organism info expired shallDownload = True if settings.verbosity >= 2: print('Organism info expired. Getting from download again.') else: if settings.verbosity >= 3: print(debugOutput + 'download.') shallDownload = True if shallDownload: # download and save organism info organismInfo = Download.downloadOrganismInfo(organismAbbreviation) if organismInfo is None: raise ValueError('Organism with abbreviation ' + organismAbbreviation + ' does not exist!') # add timestamp currentTimestamp = int(time.time()) organismInfoWriting = str(currentTimestamp) + '\n' + organismInfo File.writeToFile(organismInfoWriting, fileName) return organismInfo
[docs]def getOrthologsOnlyGeneID(geneID: GeneID, comparisonOrganism: 'Organism or str', eValue: float = settings.defaultEvalue) -> Set[GeneID]: """ Get orthologs for a gene in a certain organism, without metadata. Parameters ---------- geneID : GeneID Gene to use for searching orthologs. comparisonOrganism : Organism or str Organism to check for orthologs. May be an Organism object or an organism abbreviation string. eValue : float, optional Statistical expectation value (E-value), below which a sequence alignment is considered significant. Returns ------- Set[GeneID] Set of orthologous genes, using `geneID` to search the genome of `comparisonOrganism`. Only matches with an E-value smaller or equal to `eValue` are returned. Matches are downloaded from KEGG SSDB. Raises ------ ImpossiblyOrthologousError If `geneID` is from `comparisonOrganism`. ValueError If any organism does not exist. URLError If connection to KEGG fails. """ if isinstance(comparisonOrganism, str): organismAbbreviation = comparisonOrganism else: organismAbbreviation = comparisonOrganism.nameAbbreviation return _filterHomologsBySignificance( _getHomologs(geneID, organismAbbreviation), eValue, onlyGeneID = True)
[docs]def getOrthologs(geneID: GeneID, comparisonOrganism: 'Organism or str', eValue: float = settings.defaultEvalue) -> SSDB.Matching: """ Get orthologs for a gene in a certain organism, including metadata. Parameters ---------- geneID : GeneID Gene to use for searching orthologs. comparisonOrganism : Organism or str Organism to check for orthologs. May be an Organism object or an organism abbreviation string. eValue : float, optional Statistical expectation value (E-value), below which a sequence alignment is considered significant. Returns ------- SSDB.Matching A matching of orthologs for gene `geneID`, searching the genome of `comparisonOrganism`. Only matches with an E-value smaller or equal to `eValue` are returned. Matches are downloaded from KEGG SSDB. Raises ------ ImpossiblyOrthologousError If `geneID` is from `comparisonOrganism`. ValueError If any organism does not exist. URLError If connection to KEGG fails. """ if isinstance(comparisonOrganism, str): organismAbbreviation = comparisonOrganism else: organismAbbreviation = comparisonOrganism.nameAbbreviation return _filterHomologsBySignificance( _getHomologs(geneID, organismAbbreviation), eValue, onlyGeneID = False)
[docs]def getParalogsOnlyGeneID(geneID: GeneID, eValue: float = settings.defaultEvalue) -> Set[GeneID]: """ Get paralogs for a gene, without metadata. Parameters ---------- geneID : GeneID Gene to use for searching paralogs. eValue : float, optional Statistical expectation value (E-value), below which a sequence alignment is considered significant. Returns ------- Set[GeneID] Set of paralogous genes, using `geneID` to search the genome of the same organism. Only matches with an E-value smaller or equal to `eValue` are returned. Matches are downloaded from KEGG SSDB. Raises ------ ValueError If any organism does not exist. URLError If connection to KEGG fails. """ return _filterHomologsBySignificance( _getHomologs(geneID, comparisonOrganismString = None), eValue, onlyGeneID = True)
[docs]def getParalogs(geneID: GeneID, eValue: float = settings.defaultEvalue) -> SSDB.Matching: """ Get paralogs for a gene, including metadata. Parameters ---------- geneID : GeneID Gene to use for searching paralogs. eValue : float, optional Statistical expectation value (E-value), below which a sequence alignment is considered significant. Returns ------- SSDB.Matching A matching of paralogous genes, using `geneID` to search the genome of the same organism. Only matches with an E-value smaller or equal to `eValue` are returned. Matches are downloaded from KEGG SSDB. Raises ------ ValueError If any organism does not exist. URLError If connection to KEGG fails. """ return _filterHomologsBySignificance( _getHomologs(geneID, comparisonOrganismString = None), eValue, onlyGeneID = False)
def _getHomologs(geneID: GeneID, comparisonOrganismString = None) -> SSDB.Matching: if comparisonOrganismString is None: # looking for paralogs fileName = 'organism/' + geneID.organismAbbreviation + '/paralogs/' + geneID.geneName debugOutput = 'Getting paralogs for ' + geneID.geneIDString + ' from ' else: # looking for orthologs if geneID.organismAbbreviation == comparisonOrganismString: raise ImpossiblyOrthologousError('GeneID is from the same Organism I ought to search in. This can never be an ortholog!') fileName = 'organism/' + geneID.organismAbbreviation + '/orthologs/' + geneID.geneName + '/' + comparisonOrganismString debugOutput = 'Getting orthologs for ' + geneID.geneIDString + ' in ' + comparisonOrganismString + ' from ' if File.doesFileExist(fileName): if settings.verbosity >= 3: print(debugOutput + 'disk.') else: if settings.verbosity >= 3: print(debugOutput + 'download.') # looking for paralog or ortholog? if comparisonOrganismString is None: # looking for paralogs preMatches = Download.downloadParalogs(geneID) databaseOrganism = geneID.organismAbbreviation else: # looking for orthologs preMatches = Download.downloadOrthologs(geneID, comparisonOrganismString) databaseOrganism = comparisonOrganismString # get length of query sequence queryGene = getGene(geneID.geneIDString) searchSequenceLength = queryGene.aaseqLength # add size of database organismInfo = getOrganismInfo(databaseOrganism, checkExpiration = True) organismGeneEntries = int( re.split('([0-9,]+) entries', organismInfo)[1].replace(',', '') ) # add lengths of result sequences matches = [] for preMatch in preMatches: # length matchedGene = getGene(preMatch.foundGeneIdString) sequenceLength = matchedGene.aaseqLength matches.append( SSDB.Match.fromPreMatch(preMatch, sequenceLength)) timestamp = int( time.time() ) # create Matching matching = SSDB.Matching(geneID, searchSequenceLength, databaseOrganism, organismGeneEntries, matches, timestamp) # save to file jsonpickle.set_encoder_options('simplejson', indent=4) File.writeToFile(jsonpickle.encode(matching), fileName) fileContent = File.readStringFromFileAtOnce(fileName) matching = jsonpickle.decode(fileContent) return matching def _filterHomologsBySignificance(matching: SSDB.Matching, eValue, onlyGeneID = False): return _filterHomologsBySignificanceBulk({'0': matching}, eValue, onlyGeneID).get('0')
[docs]def getOrthologsBulk(geneIDs: Iterable[GeneID], comparisonOrganism: 'Iterable[Organism] or Iterable[str] or Organism or str', eValue: float = settings.defaultEvalue, ignoreImpossiblyOrthologous = False) -> Dict[GeneID, List[SSDB.Matching]]: """ Get orthologs for genes in a certain organism in bulk, including metadata. This is done in parallel in a thread pool, see :attr:`FEV_KEGG.settings.downloadThreads`. Parameters ---------- geneIDs : Iterable[GeneID] Genes to use for searching orthologs. comparisonOrganism : Iterable[Organism] or Iterable[str] or Organism or str Organism(s) to check for orthologs. May be an organism abbreviation string. eValue : float, optional Statistical expectation value (E-value), below which a sequence alignment is considered significant. ignoreImpossiblyOrthologous : bool, optional If *True*, ignore if a searched gene is from any `comparisonOrganism`. Simply do not search for this particular gene in its own organism, but in all others from `comparisonOrganism`. Returns ------- Dict[GeneID, List[SSDB.Matching]] A dictionary of a list of matchings of orthologous genes, using each gene ID from `geneIDs`, searching the genome of each `comparisonOrganism`, keyed by the used gene ID. Only matches with an E-value smaller or equal to `eValue` are returned. Matches are downloaded from KEGG SSDB. Raises ------ ImpossiblyOrthologousError If any gene ID in `geneIDs` is from `comparisonOrganism`. Unless `ignoreImpossiblyOrthologous` == *True*. ValueError If any organism does not exist. URLError If connection to KEGG fails. """ if isinstance(comparisonOrganism, str): organismAbbreviation = [comparisonOrganism] elif isinstance(comparisonOrganism, Iterable): organismAbbreviation = [x if isinstance(x, str) else x.nameAbbreviation for x in comparisonOrganism] else: organismAbbreviation = [comparisonOrganism.nameAbbreviation] # improve performance by first checking IF there is an ortholog in some organisms organismsWithOrthologsDict = hasOrthologsBulk(geneIDs, organismAbbreviation, eValue) # sort several geneIDs with the same set of orthologous organisms together. Improves parallel downloading speed. geneIdSetForOrganismSet = dict() for geneID, orthologousAbbreviations in organismsWithOrthologsDict.items(): organismSet = frozenset(orthologousAbbreviations) currentGeneIdSet = geneIdSetForOrganismSet.get(organismSet, None) if currentGeneIdSet is None: currentGeneIdSet = set() geneIdSetForOrganismSet[organismSet] = currentGeneIdSet currentGeneIdSet.add(geneID) # parallelise with process pool futures = [] futuresGenerator = None resultFutures = None dictList = [] try: # submit work to process pool for orthologousAbbreviations, geneIDs in geneIdSetForOrganismSet.items(): if Parallelism.processPool is None: raise TypeError("Process pool does not exist. Did you forget to FEV_KEGG.startProcessPool()?") futures.append( Parallelism.processPool.submit( _getOrthologsBulkWorker, geneIDs, orthologousAbbreviations, ignoreImpossiblyOrthologous) ) futuresGenerator = concurrent.futures.as_completed( futures ) # add progress bar if settings.verbosity >= 1: if settings.verbosity >= 2: print( 'Fetching matchings from ' + str(len(geneIdSetForOrganismSet)) + ' GeneID-Organism associations ...' ) futuresGenerator = tqdm.tqdm(futuresGenerator, total = len(geneIdSetForOrganismSet), unit = ' matching dicts', position = 0) # when any work item in process pool finishes for future in futuresGenerator: try: orthologousMatchingsDictSingle = future.result() except KeyboardInterrupt: raise except concurrent.futures.CancelledError: Parallelism.printBelowProgress( "Future cancelled. Continuing anyway..." ) continue except concurrent.futures.TimeoutError: Parallelism.printBelowProgress( "Future timed out. Continuing anyway..." ) continue except Exception: # any non-exiting error Parallelism.printBelowProgress( "Future raised error, see stack trace above. Halting by KeyboardInterrupt..." ) raise KeyboardInterrupt() dictList.append(orthologousMatchingsDictSingle) except KeyboardInterrupt: # only raised in main thread (once in each process!) Parallelism.keyboardInterruptHandler(processPoolFutures=futures, terminateProcess=True) raise except BaseException: if Parallelism.isMainThread(): Parallelism.keyboardInterruptHandler(processPoolFutures=futures, silent=True) raise finally: if futuresGenerator is not None: futuresGenerator.close() if resultFutures is not None: resultFutures.close() Parallelism.printBelowProgress(None) # merge orthologousMatchingsDicts after parallelisation orthologousMatchingsDict = dict() for orgsDict in dictList: orthologousMatchingsDict.update( orgsDict ) # filter matchings by statistical significance result = dict() for geneID, matchingList in orthologousMatchingsDict.items(): for matching in matchingList: tmpDict = dict() tmpDict[geneID] = matching filteredMatching = _filterHomologsBySignificanceBulk(tmpDict, eValue, onlyGeneID = False) if len(filteredMatching[geneID].matches) > 0: # some match survived the E-value filter resultMatchingList = result.get(geneID) if resultMatchingList is None: result[geneID] = [] result[geneID].append(filteredMatching[geneID]) return result
def _getOrthologsBulkWorker(geneIDs, orthologousAbbreviations, ignoreImpossiblyOrthologous): # get orthologs in bulk orthologousMatchingsDict = dict() singleOrthologousMatchingDict = _getHomologsBulk(geneIDs, orthologousAbbreviations, ignoreImpossiblyOrthologous = ignoreImpossiblyOrthologous) for geneID in geneIDs: orthologousMatchingsDict[geneID] = singleOrthologousMatchingDict[geneID] return orthologousMatchingsDict
[docs]def getParalogsBulk(geneIDs: Iterable[GeneID], eValue: float = settings.defaultEvalue) -> Dict[GeneID, SSDB.Matching]: """ Get paralogs for genes in bulk, including metadata. This is done in parallel in a thread pool, see :attr:`FEV_KEGG.settings.downloadThreads`. Parameters ---------- geneIDs : Iterable[GeneID] Genes to use for searching paralogs. eValue : float, optional Statistical expectation value (E-value), below which a sequence alignment is considered significant. Returns ------- Dict[GeneID, SSDB.Matching] A dictionary of matchings of paralogous genes, using each gene ID from `geneIDs` to search the genome of the same organism, keyed by the used gene ID. Only matches with an E-value smaller or equal to `eValue` are returned. Matches are downloaded from KEGG SSDB. Raises ------ ValueError If any organism does not exist. URLError If connection to KEGG fails. """ return _filterHomologsBySignificanceBulk( _getHomologsBulk(geneIDs, comparisonOrganismString = None), eValue, onlyGeneID = False)
def _getHomologsBulk(geneIDs: Iterable[GeneID], comparisonOrganismString = None, ignoreImpossiblyOrthologous = False): # -> Dict[GeneID, List[SSDB.Matching]] if comparisonOrganismString is None: isParalog = True else: isParalog = False # turn a single comparison organism into a list if not isinstance(comparisonOrganismString, Iterable): comparisonOrganismString = [comparisonOrganismString] # split list into matchings on disk and matchings not downloaded yet matchingsOnDisk = [] matchingsToDownload = [] for geneID in geneIDs: for comparisonOrganism in comparisonOrganismString: if isParalog: # looking for paralogs fileName = 'organism/' + geneID.organismAbbreviation + '/paralogs/' + geneID.geneName debugOutput = 'Getting paralogs for ' + geneID.geneIDString + ' from ' else: # looking for orthologs if geneID.organismAbbreviation == comparisonOrganism: if ignoreImpossiblyOrthologous: continue else: raise ImpossiblyOrthologousError('GeneID is from the same Organism I ought to search in. This can never be an ortholog!') fileName = 'organism/' + geneID.organismAbbreviation + '/orthologs/' + geneID.geneName + '/' + comparisonOrganism debugOutput = 'Getting orthologs for ' + geneID.geneIDString + ' in ' + comparisonOrganism + ' from ' if File.doesFileExist(fileName): if settings.verbosity >= 3: print(debugOutput + 'disk.') matchingsOnDisk.append((geneID, comparisonOrganism)) else: if settings.verbosity >= 3: print(debugOutput + 'download.') matchingsToDownload.append((geneID, comparisonOrganism)) matchings = dict() tqdmPosition = Parallelism.getTqdmPosition() # download matchings in bulk if len( matchingsToDownload ) > 0: threadPool = concurrent.futures.ThreadPoolExecutor(Parallelism.getNumberOfThreadsDownload(isSSDB = True)) futures = [] iterator = None try: # query KEGG SSDB in parallel for geneID, comparisonOrganism in matchingsToDownload: futures.append( threadPool.submit(_getHomologsBulkHelper, geneID, comparisonOrganism) ) iterator = concurrent.futures.as_completed(futures) if settings.verbosity >= 1: if settings.verbosity >= 2: print( 'Downloading ' + str(len(matchingsToDownload)) + ' matchings...' ) iterator = tqdm.tqdm(iterator, total = len(matchingsToDownload), unit = ' matchings', position = tqdmPosition) for future in iterator: try: matching = future.result() except URLError as error: if isinstance(error.reason, timeout): print('Download timed out eventually, maybe you need to increase the timeout limit under settings.downloadTimeout') raise if matching is None: continue if isParalog: matchings[matching.queryGeneID] = matching else: matchingList = matchings.get(matching.queryGeneID) if matchingList is None: matchings[matching.queryGeneID] = [] matchings[matching.queryGeneID].append(matching) threadPool.shutdown(wait = False) except KeyboardInterrupt: # only raised in main thread (once in each process!) Parallelism.keyboardInterruptHandler(threadPool=threadPool, threadPoolFutures=futures, terminateProcess=True) raise except BaseException: if Parallelism.isMainThread(): Parallelism.keyboardInterruptHandler(threadPool=threadPool, threadPoolFutures=futures, silent=True) raise finally: if iterator is not None: iterator.close() # get matchings from disk if len( matchingsOnDisk ) > 0: iterator = matchingsOnDisk if settings.verbosity >= 1: if settings.verbosity >= 2: print( 'Disk-loading ' + str(len(matchingsOnDisk)) + ' matchings...' ) iterator = tqdm.tqdm(iterator, total = len(matchingsOnDisk), unit = ' matchings', position = tqdmPosition) for geneID, comparisonOrganism in iterator: if isParalog: # looking for paralogs fileName = 'organism/' + geneID.organismAbbreviation + '/paralogs/' + geneID.geneName else: # looking for orthologs fileName = 'organism/' + geneID.organismAbbreviation + '/orthologs/' + geneID.geneName + '/' + comparisonOrganism fileContent = File.readStringFromFileAtOnce(fileName) matching = jsonpickle.decode(fileContent, classes=SSDB.Matching) if isParalog: # looking for paralogs matchings[geneID] = matching else: # looking for orthologs matchingList = matchings.get(geneID) if matchingList is None: matchings[geneID] = [] matchings[geneID].append(matching) return matchings def _getHomologsBulkHelper(geneID, comparisonOrganismString) -> SSDB.Matching: if Parallelism.getShallCancelThreads() is True: raise concurrent.futures.CancelledError() homologData = _downloadHomolog(geneID, comparisonOrganismString) if homologData is None: return None searchSequenceLength, preMatches = homologData if comparisonOrganismString is None: # looking for paralogs fileName = 'organism/' + geneID.organismAbbreviation + '/paralogs/' + geneID.geneName databaseOrganism = geneID.organismAbbreviation else: # looking for orthologs fileName = 'organism/' + geneID.organismAbbreviation + '/orthologs/' + geneID.geneName + '/' + comparisonOrganismString databaseOrganism = comparisonOrganismString if Parallelism.getShallCancelThreads() is True: raise concurrent.futures.CancelledError() # add size of database organismInfo = getOrganismInfo(databaseOrganism, checkExpiration = True) organismGeneEntries = _extractGeneEntries(organismInfo) # add lengths of result sequences matches = [] for preMatch in preMatches: if Parallelism.getShallCancelThreads() is True: raise concurrent.futures.CancelledError() # length try: matchedGene = getGene(preMatch.foundGeneIdString) sequenceLength = matchedGene.aaseqLength matches.append( SSDB.Match.fromPreMatch(preMatch, sequenceLength)) except GeneDoesNotExistError: print( 'Gene does not exist: ' + str(preMatch.foundGeneIdString) + ', ignoring pre-match. Maybe you want to check whether the organism is erroneous?' ) continue # if pre-match gene does not exist, ignore pre-match except Exception: print( 'Unknown exception during creation of an SSDB match for ' + str(geneID) + ', ignoring this match.' ) continue timestamp = int( time.time() ) # create Matching matching = SSDB.Matching(geneID, searchSequenceLength, databaseOrganism, organismGeneEntries, matches, timestamp) # save to file jsonpickle.set_encoder_options('simplejson', indent=4) File.writeToFile(jsonpickle.encode(matching), fileName) return matching def _extractGeneEntries(organismInfo): return int( re.split('([0-9,]+) entries', organismInfo)[1].replace(',', '') ) def _downloadHomolog(geneID, comparisonOrganismString) -> Tuple[int, List[SSDB.PreMatch]]: try: if comparisonOrganismString is None: #paralog return Download.downloadParalogs(geneID) else: #ortholog return Download.downloadOrthologs(geneID, comparisonOrganismString) except urllib.error.HTTPError as exception: if isinstance(exception, urllib.error.HTTPError) and exception.code == 404: return None else: raise
[docs]def _filterHomologsBySignificanceBulk(matchings: Dict[GeneID, SSDB.Matching], eValue, onlyGeneID = False): """ Filter sequence alignments by statistical significance. Parameters ---------- matchings : Dict[GeneID, SSDB.Matching] Dictionary of a homolog matching, including homologous gene IDs and statistical data, keyed by the gene ID used to search for homologs. eValue : float Statistical expectation value (E-value), below which a sequence alignment is considered significant. onlyGeneID : bool, optional If *True*, return only the set of homologous gene IDs, not the whole matching including statistical data. Returns ------- Dict[GeneID, SSDB.Matching] or Dict[GeneID, Set[GeneID]] `matchings` reduced to the significant sequence alignments, with an E-value below `eValue`. If `onlyGeneID` == *True*, `matchings` is further reduced to only contain the homologous gene IDs, not the complete matching. """ result = dict() for geneID, matching in matchings.items(): if onlyGeneID is True: # filter non-significant genes geneIDs = set() for match in matching.matches: if match.eValue <= eValue: geneIDs.add( match.foundGeneID ) resultPart = geneIDs else: # filter non-significant genes validMatches = [] for match in matching.matches: if match.eValue <= eValue: validMatches.append(match) matching.matches = validMatches resultPart = matching result[geneID] = resultPart return result
[docs]def hasOrthologsBulk(geneIDs: Iterable[GeneID], comparisonOrganisms: 'Iterable[Organism] or Iterable[str]', eValue: float = settings.defaultEvalue) -> Dict[GeneID, List[str]]: """ Find out whether orthologs for genes exist in certain organisms in bulk. This is done in parallel in a thread pool, see :attr:`FEV_KEGG.settings.downloadThreads`. If orthologs exist in a certain organism, you can use :func:`getOrthologsBulk` in a seconds step, to get all orthologs in that organism. Filtering the amount of possibly orthologous organisms with this function before using the aforementioned function is much faster in total. But, using this function with only a single `comparisonOrganisms` is not. If you want to find only the best matches in every organism, use :func:`getOrthologOverviewsBulk` instead. Parameters ---------- geneIDs : Iterable[GeneID] Genes to use for searching orthologs. comparisonOrganisms : Iterable[Organism] or Iterable[str] Organisms to check for orthologs. May be an organism abbreviation string. eValue : float, optional Statistical expectation value (E-value), below which a sequence alignment is considered significant. Returns ------- Dict[GeneID, List[str]] A dictionary of a list of organisms which have at least one orthologous gene, using each gene ID from `geneIDs`, searching the genome of each `comparisonOrganisms`, keyed by the used gene ID. Only organisms with matches with an E-value smaller or equal to `eValue` are returned. Matching overviews are downloaded from KEGG SSDB. Raises ------ ValueError If any organism does not exist. URLError If connection to KEGG fails. """ if isinstance(comparisonOrganisms, Iterable) and not isinstance(comparisonOrganisms, str): organismAbbreviations = [x if isinstance(x, str) else x.nameAbbreviation for x in comparisonOrganisms] else: raise ValueError("'comparisonOrganism' must be an Iterable.") # parallelise with process pool futures = [] futuresGenerator = None resultFutures = None dictList = [] try: # split geneIDs into chunks geneIdChunks = chunks(geneIDs, math.ceil(len(geneIDs)/settings.processes)) # submit work to process pool for geneIdChunk in geneIdChunks: if Parallelism.processPool is None: raise TypeError("Process pool does not exist. Did you forget to FEV_KEGG.startProcessPool()?") futures.append( Parallelism.processPool.submit( _hasOrthologsBulkWorker, geneIdChunk, organismAbbreviations, eValue) ) futuresGenerator = concurrent.futures.as_completed( futures ) # add progress bar # if settings.verbosity >= 1: # if settings.verbosity >= 2: # print( 'Fetching '' GeneID chunks from ' + str(len(organisms)) + ' organisms...' ) # futuresGenerator = tqdm.tqdm(futuresGenerator, total = settings.processes, unit = ' chunks', position = 0) # when any work item in process pool finishes for future in futuresGenerator: try: orgsDict = future.result() except KeyboardInterrupt: raise except concurrent.futures.CancelledError: Parallelism.printBelowProgress( "Future cancelled. Continuing anyway..." ) continue except concurrent.futures.TimeoutError: Parallelism.printBelowProgress( "Future timed out. Continuing anyway..." ) continue except Exception: # any non-exiting error Parallelism.printBelowProgress( "Future raised error, see stack trace above. Halting by KeyboardInterrupt..." ) raise KeyboardInterrupt() dictList.append(orgsDict) except KeyboardInterrupt: # only raised in main thread (once in each process!) Parallelism.keyboardInterruptHandler(processPoolFutures=futures, terminateProcess=True) raise except BaseException: if Parallelism.isMainThread(): Parallelism.keyboardInterruptHandler(processPoolFutures=futures, silent=True) raise finally: if futuresGenerator is not None: futuresGenerator.close() if resultFutures is not None: resultFutures.close() Parallelism.printBelowProgress(None) # merge result dict after parallelisation result = dict() for orgsDict in dictList: result.update( orgsDict ) return result
def _hasOrthologsBulkWorker(geneIDs: Iterable[GeneID], organismAbbreviations: Iterable[str], eValue: float = settings.defaultEvalue) -> Dict[GeneID, List[str]]: matchingOverviewsDict = getOrthologOverviewsBulk(geneIDs) result = dict() for geneID, matchingOverview in matchingOverviewsDict.items(): # filter matches by relevant organisms relevantMatches = matchingOverview.getTransientMatches(organismAbbreviations) # filter matches by statistical significance significantMatches = [] for match in relevantMatches: if match.eValue <= eValue: significantMatches.append(match) if len(significantMatches) > 0: # some match survived bothfilterings result[geneID] = [x.foundGeneID.organismAbbreviation for x in significantMatches] return result
[docs]def getOrthologOverviewsBulk(geneIDs: Iterable[GeneID]) -> Dict[GeneID, SSDB.MatchingOverview]: """ Get best orthologous matches for genes in all organisms in bulk. This is done in parallel in a thread pool, see :attr:`FEV_KEGG.settings.downloadThreads`. If orthologs exist in a certain organism, you can use :func:`getOrthologsBulk` in a seconds step, to get all orthologs in that organism, not only the best match. Filtering the amount of possibly orthologous organisms with this function before using the aforementioned function is much faster in total. But, using this function with only a single organism in mind is not. Parameters ---------- geneIDs : Iterable[GeneID] Genes to use for searching orthologs. Returns ------- Dict[GeneID, MatchingOverview] A dictionary of a matching overview, containing the best :class:`FEV_KEGG.KEGG.SSDB.Match` for each possibly orthologous organism, using each gene ID from `geneIDs`, searching the genome of all organisms, keyed by the used gene ID. Matching overviews are downloaded from KEGG SSDB. Raises ------ ValueError If any organism does not exist. URLError If connection to KEGG fails. """ # split list into overviews on disk and overviews not downloaded yet overviewsOnDisk = [] overviewsToDownload = [] for geneID in geneIDs: fileName = 'organism/' + geneID.organismAbbreviation + '/orthologs/' + geneID.geneName + '/overview' debugOutput = 'Getting ortholog overview for ' + geneID.geneIDString + ' from ' if File.doesFileExist(fileName): if settings.verbosity >= 3: print(debugOutput + 'disk.') overviewsOnDisk.append(geneID) else: if settings.verbosity >= 3: print(debugOutput + 'download.') overviewsToDownload.append(geneID) overviews = dict() # download overviews in bulk if len( overviewsToDownload ) > 0: tqdmPosition = Parallelism.getTqdmPosition() threadPool = concurrent.futures.ThreadPoolExecutor(Parallelism.getNumberOfThreadsDownload(isSSDB = True)) futures = [] iterator = None try: # query KEGG SSDB in parallel for geneID in overviewsToDownload: futures.append( threadPool.submit(_getOrthologOverviewsBulkHelper, geneID) ) iterator = concurrent.futures.as_completed(futures) if settings.verbosity >= 1: if settings.verbosity >= 2: print( 'Downloading ' + str(len(overviewsToDownload)) + ' overviews...' ) iterator = tqdm.tqdm(iterator, total = len(overviewsToDownload), unit = ' overviews', position = tqdmPosition) for future in iterator: overview = future.result() if overview is None: continue overviews[overview.queryGeneID] = overview threadPool.shutdown(wait = False) except KeyboardInterrupt: # only raised in main thread (once in each process!) Parallelism.keyboardInterruptHandler(threadPool=threadPool, threadPoolFutures=futures, terminateProcess=True) raise except BaseException: if Parallelism.isMainThread(): Parallelism.keyboardInterruptHandler(threadPool=threadPool, threadPoolFutures=futures, silent=True) raise finally: if iterator is not None: iterator.close() # get overviews from disk try: tqdmPosition = Parallelism.getTqdmPosition() iterator = overviewsOnDisk if settings.verbosity >= 1: if settings.verbosity >= 2: print( 'Disk-loading ' + str(len(overviewsOnDisk)) + ' overviews...' ) iterator = tqdm.tqdm(iterator, total = len(overviewsOnDisk), unit = ' overviews', position = tqdmPosition) for geneID in iterator: fileName = 'organism/' + geneID.organismAbbreviation + '/orthologs/' + geneID.geneName + '/overview' fileContent = File.readStringFromFileAtOnce(fileName) overview = jsonpickle.decode(fileContent, classes=SSDB.MatchingOverview) overviews[geneID] = overview finally: if not isinstance(iterator, list): iterator.close() return overviews
def _getOrthologOverviewsBulkHelper(geneID: GeneID) -> SSDB.MatchingOverview: if Parallelism.getShallCancelThreads() is True: raise concurrent.futures.CancelledError() downloadData = _downloadOrthologOverview(geneID) if downloadData is None: return None searchSequenceLength, bestMatches = downloadData fileName = 'organism/' + geneID.organismAbbreviation + '/orthologs/' + geneID.geneName + '/overview' if Parallelism.getShallCancelThreads() is True: raise concurrent.futures.CancelledError() timestamp = int( time.time() ) # create Matching matchingOverview = SSDB.MatchingOverview(geneID, searchSequenceLength, bestMatches, timestamp) # save to file jsonpickle.set_encoder_options('simplejson', indent=4) File.writeToFile(jsonpickle.encode(matchingOverview), fileName) return matchingOverview def _getOrthologOverviewsBulkHelperDisk(geneID: GeneID) -> SSDB.MatchingOverview: fileName = 'organism/' + geneID.organismAbbreviation + '/orthologs/' + geneID.geneName + '/overview' fileContent = File.readStringFromFileAtOnce(fileName) overview = jsonpickle.decode(fileContent, classes=SSDB.MatchingOverview) return overview def _downloadOrthologOverview(geneID: GeneID) -> Tuple[int, List[SSDB.Match]]: try: return Download.downloadOrthologOverview(geneID) except urllib.error.HTTPError as exception: if isinstance(exception, urllib.error.HTTPError) and exception.code == 404: return None else: raise
[docs]def getTaxonomyNCBI() -> List[str]: """ Get NCBI taxonomy from KEGG BRITE. Returns ------- List[str] Taxonomy of organisms in KEGG, in special text format, following the NCBI scheme, line by line. Raises ------ URLError If connection to KEGG fails. """ fileName = 'taxonomy/NCBI_raw' debugOutput = 'Getting NCBI taxonomy from ' return _getTaxonomy(fileName, debugOutput, True)
[docs]def getTaxonomyKEGG() -> List[str]: """ Get KEGG taxonomy from KEGG BRITE. Returns ------- List[str] Taxonomy of organisms in KEGG, in special text format, following KEGG's own scheme, line by line. Raises ------ URLError If connection to KEGG fails. """ fileName = 'taxonomy/KEGG_raw' debugOutput = 'Getting KEGG taxonomy from ' return _getTaxonomy(fileName, debugOutput, False)
def _getTaxonomy(fileName, debugOutput, isNCBI) -> List[str]: if File.doesFileExist(fileName): if settings.verbosity >= 3: print(debugOutput + 'disk.') else: if settings.verbosity >= 3: print(debugOutput + 'download.') if isNCBI: organismList = Download.downloadTaxonomyNCBI() else: organismList = Download.downloadTaxonomyKEGG() File.writeToFile(organismList, fileName) fileContent = File.readListFromFileAtOnce(fileName) return fileContent
[docs]def getSubstanceBulk(substances: Iterable[SubstanceID]) -> Dict[str, Substance]: """ Get multiple substance descriptions. Downloads the data from KEGG in bulk, if not already present on disk. This is done in parallel in a thread pool, see :attr:`FEV_KEGG.settings.downloadThreads`. Parameters ---------- substances : Iterable[SubstanceID] Substances to be downloaded. Returns ------- Dict[str, Substance] Each found substance, keyed by the unique ID of the substance used to search it. Raises ------ IOError If result is too small. Possibly because none of the genes of a download-chunk existed. URLError If connection to KEGG fails. """ # split list into substances on disk and substances not downloaded yet substancesOnDisk = [] substancesToDownload = [] for substance in substances: fileName = 'substance/' + substance.uniqueID debugOutput = 'Getting substance ' + str( substance.uniqueID ) + ' from ' if File.doesFileExist(fileName): if settings.verbosity >= 3: print(debugOutput + 'disk.') substancesOnDisk.append(substance) else: if settings.verbosity >= 3: print(debugOutput + 'download.') substancesToDownload.append(substance) # get substances from disk substancesDict = dict() for substance in substancesOnDisk: fileName = 'substance/' + substance.uniqueID fileContent = File.readStringFromFileAtOnce(fileName) substancesDict[substance.uniqueID] = Substance(fileContent) # download substances in bulk if len( substancesToDownload ) > 0: tqdmPosition = Parallelism.getTqdmPosition() threadPool = concurrent.futures.ThreadPoolExecutor(Parallelism.getNumberOfThreadsDownload()) futures = [] iterator = None try: # query KEGG in parallel for substance in substancesToDownload: futures.append( threadPool.submit(_downloadSubstance, substance) ) iterator = concurrent.futures.as_completed(futures) if settings.verbosity >= 1: if settings.verbosity >= 2: print( 'Downloading ' + str(len(substancesToDownload)) + ' substances...' ) iterator = tqdm.tqdm(iterator, total = len(substancesToDownload), unit = ' substances', position = tqdmPosition) for future in iterator: result_part = future.result() if result_part is not None: substance = Substance(result_part) substancesDict[substance.uniqueID] = substance fileName = 'substance/' + substance.uniqueID File.writeToFile(result_part, fileName) threadPool.shutdown(wait = False) except KeyboardInterrupt: # only raised in main thread (once in each process!) Parallelism.keyboardInterruptHandler(threadPool=threadPool, threadPoolFutures=futures, terminateProcess=True) raise except BaseException: if Parallelism.isMainThread(): Parallelism.keyboardInterruptHandler(threadPool=threadPool, threadPoolFutures=futures, silent=True) raise finally: if iterator is not None: iterator.close() return substancesDict
def _downloadSubstance(substance): if Parallelism.getShallCancelThreads() is True: raise concurrent.futures.CancelledError() else: try: substance = Download.downloadSubstance(substance.uniqueID) except urllib.error.HTTPError as exception: if isinstance(exception, urllib.error.HTTPError) and exception.code == 404: return None else: raise return substance
[docs]def getEcEnzymeBulk(ecNumbers: Iterable[EcNumber]) -> Dict[str, EcEnzyme]: """ Get multiple enzyme descriptions, defined by its EC number. Downloads the data from KEGG in bulk, if not already present on disk. This is done in parallel in a thread pool, see :attr:`FEV_KEGG.settings.downloadThreads`. Parameters ---------- ecNumbers : Iterable[EcNumber] Enzymes to be downloaded. Returns ------- Dict[str, EcEnzyme] Each found enzyme, keyed by the unique ID of the EC number used to search it. Raises ------ IOError If result is too small. Possibly because none of the genes of a download-chunk existed. URLError If connection to KEGG fails. """ # split list into ecEnzymes on disk and ecEnzymes not downloaded yet ecEnzymesOnDisk = [] ecEnzymesToDownload = [] for ecNumber in ecNumbers: # skip EC numbers with wildcards, because they are obviously not in the database if ecNumber.hasWildcard(): continue fileName = 'EC_number/' + ecNumber.uniqueID debugOutput = 'Getting EcEnzyme ' + str( ecNumber.uniqueID ) + ' from ' if File.doesFileExist(fileName): if settings.verbosity >= 3: print(debugOutput + 'disk.') ecEnzymesOnDisk.append(ecNumber) else: if settings.verbosity >= 3: print(debugOutput + 'download.') ecEnzymesToDownload.append(ecNumber) # get EcEnzymes from disk ecEnzymesDict = dict() for ecEnzyme in ecEnzymesOnDisk: fileName = 'EC_number/' + ecEnzyme.uniqueID fileContent = File.readStringFromFileAtOnce(fileName) ecEnzymesDict[ecEnzyme.uniqueID] = EcEnzyme(fileContent) # download EcEnzymes in bulk if len( ecEnzymesToDownload ) > 0: tqdmPosition = Parallelism.getTqdmPosition() threadPool = concurrent.futures.ThreadPoolExecutor(Parallelism.getNumberOfThreadsDownload()) futures = [] iterator = None try: # query KEGG in parallel for ecNumber in ecEnzymesToDownload: futures.append( threadPool.submit(_downloadEcEnzyme, ecNumber) ) iterator = concurrent.futures.as_completed(futures) if settings.verbosity >= 1: if settings.verbosity >= 2: print( 'Downloading ' + str(len(ecEnzymesToDownload)) + ' EcEnzymes...' ) iterator = tqdm.tqdm(iterator, total = len(ecEnzymesToDownload), unit = ' EcEnzymes', position = tqdmPosition) for future in iterator: result_part = future.result() if result_part is not None: ecEnzyme = EcEnzyme(result_part) ecEnzymesDict[ecEnzyme.uniqueID] = ecEnzyme fileName = 'EC_number/' + ecEnzyme.uniqueID File.writeToFile(result_part, fileName) threadPool.shutdown(wait = False) except KeyboardInterrupt: # only raised in main thread (once in each process!) Parallelism.keyboardInterruptHandler(threadPool=threadPool, threadPoolFutures=futures, terminateProcess=True) raise except BaseException: if Parallelism.isMainThread(): Parallelism.keyboardInterruptHandler(threadPool=threadPool, threadPoolFutures=futures, silent=True) raise finally: if iterator is not None: iterator.close() return ecEnzymesDict
def _downloadEcEnzyme(ecNumber: EcNumber): if Parallelism.getShallCancelThreads() is True: raise concurrent.futures.CancelledError() else: try: ecEnzyme = Download.downloadEcEnzyme(ecNumber.uniqueID) except urllib.error.HTTPError as exception: if isinstance(exception, urllib.error.HTTPError) and exception.code == 404: return None else: raise return ecEnzyme