from enum import Enum
import re
import anytree
from anytree.node.node import Node
from typing import Iterable, List
from FEV_KEGG.KEGG import Database
from FEV_KEGG.KEGG.File import cache
from FEV_KEGG import settings
class TaxonType(Enum):
    """
    Type of a taxon, i.e. the role a node plays inside a taxonomy tree.
    """

    #: Root taxon, i.e. '/'.
    ROOT = 0

    #: Organism taxon, i.e. a leaf with a unique sequenced genome.
    ORGANISM = 1

    #: Species taxon, e.g. 'Escherichia Coli'.
    SPECIES = 2

    #: Other taxon, i.e. any other taxonomic rank in between.
    OTHER = 3
class Taxonomy(object):

    def __init__(self, rawLines, isNCBI):
        """
        Generic taxonomy of organisms in KEGG.

        Parameters
        ----------
        rawLines : List[str]
            List of lines making up the raw data of a known taxonomy, either NCBI or KEGG.
        isNCBI : bool
            If *True*, `rawLines` is parsed as NCBI taxonomy. If *False*, `rawLines` is parsed as KEGG taxonomy.

        Attributes
        ----------
        self.indexOnAbbreviation : Dict[str, :class:`anytree.node.node.Node`]
            Index to find a :class:`anytree.node.node.Node` for an organism abbreviation, with `.type` == :attr:`TaxonType.ORGANISM`.
        self.tree : :class:`anytree.node.node.Node`
            The root node of the taxonomy, with `.type` == :attr:`TaxonType.ROOT`.
        """
        # The index has to exist before parsing, because _parse() fills it as a side effect.
        self.indexOnAbbreviation = dict()
        self.tree = self._parse(rawLines, isNCBI)

    def getOrganismNodeByAbbreviation(self, abbreviation: str) -> Node:
        """
        Get node for an organism by its abbreviation.

        Parameters
        ----------
        abbreviation : str
            Abbreviation of the organism in KEGG, e.g. 'eco'.

        Returns
        -------
        :class:`anytree.node.node.Node`
            Node of `.type` == :attr:`TaxonType.ORGANISM` with `.abbreviation` == `abbreviation`. *None* if none can be found.
        """
        return self.indexOnAbbreviation.get(abbreviation)

    def getOrganismNodesByName(self, name: str, oneOrganismPerSpecies = settings.defaultOneOrganismPerSpecies) -> List[Node]:
        """
        Get nodes for organisms by a part of their name.

        Parameters
        ----------
        name : str
            Part of the name of the desired organism taxons, e.g. 'Escherichia'. This does **not** search parts of the path! The name may be abbreviated, i.e. 'Escherichia' will match 'Escherichia Coli K-12 MG1655'.
        oneOrganismPerSpecies : bool, optional
            If *True*, species nodes are searched by `name` and only the first descendant of each species node is returned.

        Returns
        -------
        List[:class:`anytree.node.node.Node`]
            List of organism nodes containing `name` in their name attribute. *None* if none found.
        """
        if oneOrganismPerSpecies is not True:
            return self.searchNodesByName(name, TaxonType.ORGANISM)

        speciesNodes = self.searchNodesByName(name, TaxonType.SPECIES)
        if speciesNodes is None:
            # searchNodesByName() signals "nothing found" with None, which must not be iterated.
            return None

        organismNodes = Taxonomy._firstOrganismPerSpecies(speciesNodes)
        return organismNodes if organismNodes else None

    def getOrganismNodesByPath(self, path: str, exceptPaths: Iterable[str] = None, oneOrganismPerSpecies = settings.defaultOneOrganismPerSpecies) -> List[Node]:
        """
        Get nodes for organisms by a part of their `path`.

        Parameters
        ----------
        path : str
            Part of the path of the desired organism taxons, e.g. 'Gammaproteobacteria/Enterobacterales'. The parts of the path specified here have to match the wording of the path nodes exactly, i.e. 'Enterobac' will **not** match 'Enterobacterales'.
        exceptPaths : Iterable[str] or str
            Paths which match any of these will not be returned, e.g. 'Gammaproteobacteria/unclassified'. Accepts iterables of exceptions or a single string exception.
        oneOrganismPerSpecies : bool, optional
            If *True*, species nodes are searched below `path` and only the first descendant of each species node is returned.

        Returns
        -------
        List[:class:`anytree.node.node.Node`]
            List of organism nodes found below `path`. Empty list if none found.
        """
        if oneOrganismPerSpecies is True:
            speciesNodes = self.searchNodesByPath(path, TaxonType.SPECIES, exceptPaths)
            return Taxonomy._firstOrganismPerSpecies(speciesNodes)
        return self.searchNodesByPath(path, TaxonType.ORGANISM, exceptPaths)

    @staticmethod
    def _firstOrganismPerSpecies(speciesNodes: Iterable[Node]) -> List[Node]:
        """
        Pick the first descendant of every species node, skipping species nodes without descendants.

        Parameters
        ----------
        speciesNodes : Iterable[:class:`anytree.node.node.Node`]
            Species nodes to pick one representative from, each.

        Returns
        -------
        List[:class:`anytree.node.node.Node`]
            First entry of `.descendants` of each species node, in the order of `speciesNodes`. The type of the picked node is not checked.
        """
        representatives = []
        for speciesNode in speciesNodes:
            descendants = speciesNode.descendants
            if descendants:
                representatives.append(descendants[0])
        return representatives

    def getOrganismAbbreviations(self, nodes: Iterable[Node]) -> List[str]:
        """
        Get abbreviations of organisms for organism taxon `nodes`.

        Parameters
        ----------
        nodes : List[:class:`anytree.node.node.Node`]
            List of organism taxon nodes. These nodes are **not** traversed to find child nodes! Nodes of any other type than :attr:`TaxonType.ORGANISM` are ignored.

        Returns
        -------
        List[str]
            List of organism abbreviations from the `nodes` passed. *None* if `nodes` is *None* or if no :attr:`TaxonType.ORGANISM` node was passed.
        """
        if nodes is None:
            return None
        abbreviations = [node.abbreviation for node in nodes if node.type == TaxonType.ORGANISM]
        return abbreviations if abbreviations else None

    def getOrganismAbbreviationsByPath(self, path: str, exceptPaths: Iterable[str] = None, oneOrganismPerSpecies = settings.defaultOneOrganismPerSpecies) -> List[str]:
        """
        Get abbreviations of organisms by a part of their `path`.

        Parameters
        ----------
        path : str
            Part of the path of the desired organism taxons, e.g. 'Gammaproteobacteria/Enterobacterales'. The parts of the path specified here have to match the wording of the path nodes exactly, i.e. 'Enterobac' will **not** match 'Enterobacterales'.
        exceptPaths : Iterable[str] or str
            Paths which match any of these will not be returned, e.g. 'Gammaproteobacteria/unclassified'. Accepts iterables of exceptions or a single string exception.
        oneOrganismPerSpecies : bool, optional
            If *True*, return only the first organism node of each species node.

        Returns
        -------
        List[str]
            List of organism abbreviations from the organism taxon nodes found at the end of `path`. *None* if no `path` leading to an :attr:`TaxonType.ORGANISM` node was passed.
        """
        organismNodes = self.getOrganismNodesByPath(path, exceptPaths, oneOrganismPerSpecies=oneOrganismPerSpecies)
        return self.getOrganismAbbreviations(organismNodes)

    def getOrganismAbbreviationsByName(self, name: str, oneOrganismPerSpecies = settings.defaultOneOrganismPerSpecies) -> List[str]:
        """
        Get abbreviations of organisms by a part of their `name`.

        Parameters
        ----------
        name : str
            Part of the name of the desired organism taxons, e.g. 'Escherichia'. The name may be abbreviated, i.e. 'Escherichia' will match 'Escherichia Coli K-12 MG1655'.
        oneOrganismPerSpecies : bool, optional
            If *True*, return only the first organism node of each species node.

        Returns
        -------
        List[str]
            List of organism abbreviations containing `name` in their name attribute. *None* if none found.
        """
        organismNodes = self.getOrganismNodesByName(name, oneOrganismPerSpecies=oneOrganismPerSpecies)
        return self.getOrganismAbbreviations(organismNodes)

    def searchNodesByName(self, name: str, taxonType: TaxonType = None) -> List[Node]:
        """
        Search taxons of a certain type by their name.

        Parameters
        ----------
        name : str
            Name of the taxon to be found, e.g. 'Escherichia'. The name may be abbreviated, i.e. 'Escherichia' will match 'Escherichia Coli K-12 MG1655'.
        taxonType : TaxonType, optional
            Type of the taxons to be searched. Taxons of any other type are ignored. If *None*, all taxon types are searched.

        Returns
        -------
        List[:class:`anytree.node.node.Node`]
            All Nodes containing `name` in their name attribute. *None* if none can be found.
            Only taxons of :class:`TaxonType` `taxonType` are returned. If *None*, all taxon types are considered.
        """
        def matches(node):
            return (taxonType is None or taxonType is node.type) and name in node.name

        found = anytree.search.findall(self.tree, filter_ = matches)
        return list(found) if found else None

    def searchNodesByPath(self, path: str, taxonType: TaxonType = None, exceptPaths: Iterable[str] = None) -> List[Node]:
        """
        Search taxons of a certain type by their path, allowing exceptions.

        Parameters
        ----------
        path : str
            Part of the path of the desired organism taxons, e.g. 'Gammaproteobacteria/Enterobacterales'. The parts of the path specified here have to match the wording of the path nodes exactly, i.e. 'Enterobac' will **not** match 'Enterobacterales'. A leading slash anchors the path at the root node.
        taxonType : TaxonType, optional
            Type of the taxons to be searched. Taxons of any other type are ignored. If *None*, all taxon types are searched.
        exceptPaths : Iterable[str] or str
            Paths which match any of these will not be returned, e.g. 'Gammaproteobacteria/unclassified Bacteria'. Accepts iterables of exceptions or a single string exception.

        Returns
        -------
        List[:class:`anytree.node.node.Node`]
            Nodes found at or below `path`, each node at most once. Empty list if none can be found.
            If `taxonType` is *None*, the nodes matching `path` themselves plus all their descendants are returned.
            If `taxonType` is given, only descendants of the nodes matching `path` which have this type are returned.
            Each path element has to be delimited by a slash ('/').
            Each path element has to match the name of the intermediate taxon exactly, i.e. 'Enterobac' will **not** match 'Enterobacterales'.
        """
        pathElements = path.split('/')
        if path.startswith("/"):
            # a leading slash yields an empty first element, which stands for the root node
            pathElements[0] = "root"
        targetName = pathElements.pop()
        # closest ancestor first, because the path is verified upwards, starting at each candidate
        ancestorNames = list(reversed(pathElements))

        candidates = anytree.search.findall(self.tree, filter_ = lambda node: node.name == targetName)

        # keep only candidates whose chain of ancestors spells out the supplied path
        matchingNodes = []
        for candidate in candidates:
            ancestor = candidate.parent
            for expectedName in ancestorNames:
                # running out of ancestors means the supplied path is deeper than the tree here
                if ancestor is None or ancestor.name != expectedName:
                    break
                ancestor = ancestor.parent
            else:
                matchingNodes.append(candidate)

        # collect the nodes below the surviving nodes, filtered by TaxonType
        if taxonType is None:
            # copy, so that extending the result never touches the list being iterated
            collectedNodes = list(matchingNodes)
            for node in matchingNodes:
                collectedNodes.extend(node.descendants)
        else:
            collectedNodes = [descendant for node in matchingNodes for descendant in node.descendants if descendant.type == taxonType]

        # drop repetitions, which occur when one matching node lies below another, keeping the order
        validNodes = []
        seenIds = set()
        for node in collectedNodes:
            if id(node) not in seenIds:
                seenIds.add(id(node))
                validNodes.append(node)

        if exceptPaths is None:
            return validNodes

        if isinstance(exceptPaths, str) or not isinstance(exceptPaths, Iterable):
            exceptions = [exceptPaths]
        else:
            # materialise, because the exceptions are checked once per node
            exceptions = list(exceptPaths)

        # filter excepted paths
        filteredNodes = []
        for node in validNodes:
            nodePath = Taxonomy.nodePath2String(node)
            if not any(exception in nodePath for exception in exceptions):
                filteredNodes.append(node)
        return filteredNodes

    @staticmethod
    def nodePath2String(node: Node) -> str:
        """
        Express the path of a node as a string.

        Parameters
        ----------
        node : :class:`anytree.node.node.Node`
            Node whose path is to be expressed as a string.

        Returns
        -------
        str
            Full path of `node`, expressed as string. Each taxon level is delimited by a slash ('/'), including a leading slash.
        """
        return '/'.join([''] + [str(x.name) for x in node.path])

    def _parse(self, raw, isNCBI) -> Node:
        """
        Parse the raw lines of a taxonomy into a tree, filling `self.indexOnAbbreviation` on the way.

        Parameters
        ----------
        raw : Iterable[str]
            Raw lines. Only lines starting with an uppercase letter A-Z are parsed; the letter encodes the depth in the tree ('A' == 1).
        isNCBI : bool
            If *True*, organisms are only recognised directly below a species node, and entries below a species node which do not start with an organism abbreviation are skipped.

        Returns
        -------
        :class:`anytree.node.node.Node`
            Root node of the parsed tree, with `.type` == :attr:`TaxonType.ROOT`.
        """
        # raw strings, because '\[' and '\d' are invalid escape sequences in ordinary string literals
        speciesRegex = re.compile(r' \[TAX:[\d]+\]$')
        organismRegex = re.compile(r'^([a-z]{3,4}) ')

        root = Node('root', type = TaxonType.ROOT)
        # Stack of (level, node) from the root down to the most recently created node.
        # The root sits at level 0 and is never popped, because parsed levels start at 1.
        ancestors = [(0, root)]

        for line in raw:
            # filter empty line
            if not line:
                continue

            levelCharacter = line[0]
            # filter comments etc. Everything but lines starting with a letter in [A-Z].
            if not 'A' <= levelCharacter <= 'Z':
                continue
            levelNumber = ord(levelCharacter) - 64
            entry = line[1:].strip()

            # find parent Node: the closest previously created node on a shallower level
            while ancestors[-1][0] >= levelNumber:
                ancestors.pop()
            parent = ancestors[-1][1]

            # is this a species?
            isSpecies = False
            if parent.type is TaxonType.OTHER:
                speciesMatch = speciesRegex.search(entry)
                if speciesMatch is not None:
                    isSpecies = True
                    species = entry[:speciesMatch.start()]

            # is this an organism?
            isOrganism = False
            if isNCBI is False or parent.type == TaxonType.SPECIES:
                organismMatch = organismRegex.match(entry)
                if organismMatch is not None:
                    isOrganism = True
                    abbreviation = organismMatch.group(1)
                    # NOTE(review): only a single space is consumed after the abbreviation; if the raw data
                    # uses more than one, the name keeps the remaining leading whitespace - confirm whether intended.
                    name = entry[organismMatch.end():]
                elif isNCBI is True: # for NCBI only, this is an incomplete organism (no abbreviation, only Taxon number)
                    continue

            # create new Node
            if isOrganism is True:
                newNode = Node(name, parent, type = TaxonType.ORGANISM, abbreviation = abbreviation)
                self.indexOnAbbreviation[abbreviation] = newNode
            elif isSpecies is True:
                newNode = Node(species, parent, type = TaxonType.SPECIES)
            else:
                newNode = Node(entry, parent, type = TaxonType.OTHER)

            # remember the new node as potential parent of the following lines
            ancestors.append((levelNumber, newNode))

        return root
class NCBI(Taxonomy):
    """
    The taxonomy of organisms in KEGG, following the NCBI scheme: `<http://www.kegg.jp/kegg-bin/get_htext?br08610.keg>`_
    """

    @staticmethod
    @cache(folder_path = 'taxonomy/', file_name = 'NCBI_parsed')
    def getTaxonomy() -> Taxonomy:
        """
        Downloads and parses raw taxonomy from KEGG into an anytree object.

        Returns
        -------
        Taxonomy
            NCBI taxonomy. Note that a plain :class:`Taxonomy` object is returned, parsed with `isNCBI` = *True*.

        Raises
        ------
        URLError
            If connection to KEGG fails.
        """
        raw = Database.getTaxonomyNCBI()
        return Taxonomy(raw, isNCBI = True)
class KEGG(Taxonomy):
    """
    The taxonomy of organisms in KEGG, following KEGG's own scheme: `<http://www.kegg.jp/kegg-bin/get_htext?br08601.keg>`_
    """

    @staticmethod
    @cache(folder_path = 'taxonomy/', file_name = 'KEGG_parsed')
    def getTaxonomy() -> Taxonomy:
        """
        Downloads and parses raw taxonomy from KEGG into an anytree object.

        Returns
        -------
        Taxonomy
            KEGG taxonomy. Note that a plain :class:`Taxonomy` object is returned, parsed with `isNCBI` = *False*.

        Raises
        ------
        URLError
            If connection to KEGG fails.
        """
        raw = Database.getTaxonomyKEGG()
        return Taxonomy(raw, isNCBI = False)