Source code for FEV_KEGG.Graph.Models

from math import ceil

import networkx.algorithms.components
import networkx.algorithms.operators.all
import networkx.algorithms.shortest_paths
from typing import Set, Tuple, Generator, Iterable, List, Dict

from FEV_KEGG.Graph import Elements
from FEV_KEGG.Graph.Implementations.NetworkX import NetworkX, MultiDiGraph, MultiGraph
from networkx.exception import NetworkXNoPath, NodeNotFound




[docs]class MutablePath(): def __init__(self, node1: Elements.Element, edge1: 'Elements.Element or Iterable[Elements.Element]', node2: Elements.Element): """ Mutable directed path of nodes connected by multiple edges. The path can be elongated after creation. Cycles or loops are not forbidden, you should stay aware of them. Parameters ---------- node1 : Element First node of the path. edge1 : Element or Iterable[Element] First edge key of the path, connecting `node1` and `node2`. May be an Iterable, if multiple edges connect the two nodes. node2 : Element Second node of the path. Attributes ---------- self.nodes : List[Element] List of nodes in order of appearance in the path. self.edges : List[Element or FrozenSet[Element]] List of multi-edges in order of appearance in the path. A multi-edge may be just a single edge, or a set of parallel edges. self.edgesExpanded : Set[Element] Set of all edges in the path in arbitrary order. This is calculated by expanding all multi-edges in `self.edges` into a single set. self.path : List[Element or FrozenSet[Element]] List of nodes and multi-edges, alternating in order of appearance in the path. A multi-edge may be just a single edge, or a set of parallel edges. Raises ------ ValueError If `edge1` is an empty :class:`Iterable`. TypeError If you try to use a mutable path in a set. Warnings -------- A path must be made immutable to be able to use it in a set! See :class:`Path`. Otherwise, a TypeError will be raised at some point. """ self.path = [node1] self.elongate(edge1, node2) @property def nodes(self): return self.path[::2] @property def edges(self): return self.path[1::2] @property def edgesExpanded(self): edgesExpanded = set() for edgeEntry in self.edges: if isinstance(edgeEntry, Elements.Element): # single element edgesExpanded.add(edgeEntry) else: # iterable of elements edgesExpanded.update(edgeEntry) return edgesExpanded
[docs] def elongate(self, nextEdge: 'Elements.Element or Iterable[Elements.Element]', nextNode: Elements.Element): """ Elongates the path by another (multiple) edge and node. Parameters ---------- nextEdge : Element or Iterable[Element] Next edge key of the path, connecting the last node with `nextNode`. May be an :class:`Iterable`, if multiple edges connect the two nodes. nextNode : Element Next node of the path. Raises ------ ValueError If `nextEdge` is an empty :class:`Iterable`. """ if not isinstance(nextEdge, Elements.Element): if len(nextEdge) == 0: # error on wrong data raise ValueError("List of edges is empty.") elif len(nextEdge) == 1: # unpack Iterable with single entry nextEdge = nextEdge.pop() else: # put into set nextEdge = frozenset(nextEdge) self.path.append(nextEdge) self.path.append(nextNode)
def __len__(self): return len(self.nodes) def __eq__(self, other): if isinstance(self, other.__class__): return self.path == other.path return False def __ne__(self, other): return not self == other def __str__(self): return self.path.__str__() def __repr__(self): return self.__str__()
[docs]class Path():#TODO: omptimise memory usage, especially when there are tens of thousands of paths in memory def __init__(self, mutablePath: MutablePath): """ Directed path of nodes connected by multiple edges, immutable. The path can not be changed after creation. Cycles or loops are not forbidden, you should stay aware of them. Parameters ---------- mutablePath : MutablePath The path to be made immutable. Attributes ---------- self.nodes : Tuple[Element] List of nodes in order of appearance in the path. self.edges : Tuple[Element or FrozenSet[Element]] List of multi-edges in order of appearance in the path. A multi-edge may be just a single edge, or a set of parallel edges. self.edgesExpanded : FrozenSet[Element] Set of all edges in the path in arbitrary order. This is calculated by expanding all multi-edges in `self.edges` into a single set. self.path : Tuple[Element or FrozenSet[Element]] List of nodes and multi-edges, alternating in order of appearance in the path. A multi-edge may be just a single edge, or a set of parallel edges. Raises ------ ValueError If `edge1` is an empty :class:`Iterable`. TypeError If you try to use a mutable path in a set. Warnings -------- A path must be made immutable to be able to use it in a set! See :class:`Path`. Otherwise, a TypeError will be raised at some point. """ # finalise mutable path self.path = tuple(mutablePath.path) @property def nodes(self): return self.path[::2] @property def edges(self): return self.path[1::2] @property def edgesExpanded(self): edgesExpanded = set() for edgeEntry in self.edges: if isinstance(edgeEntry, Elements.Element): # single element edgesExpanded.add(edgeEntry) else: # iterable of elements edgesExpanded.update(edgeEntry) return frozenset(edgesExpanded)
[docs] def toHtml(self, short = False): html = '' arrow = "->" openTd = "<td>" closeTd = "</td>" newLine = "<br/>" for index, step in enumerate(self.path): html += openTd if isinstance(step, Iterable): for element in step: html += element.toHtml(short = short, noTd = True) + newLine else: html += step.toHtml(short = short, noTd = True) html += closeTd if index < len(self.path) - 1: html += openTd html += arrow html += closeTd return html
def __len__(self): return len(self.nodes) def __eq__(self, other): if isinstance(self, other.__class__): return self.path == other.path return False def __ne__(self, other): return not self == other def __str__(self): return self.path.__str__() def __repr__(self): return self.__str__() def __hash__(self): return self.path.__hash__()
[docs]class MarkedPath(Path): def __init__(self, path: Path, specialKeys: Set[Elements.Element] = None, specialNodes: Set[Elements.Element] = None): """ Immutable linear :class:`Path`, including markings for special edges/nodes. Parameters ---------- path : Path The path to be marked. specialKeys : Set[Element], optional Special edge keys. specialNodes : Set[Element], optional Special nodes. Attributes ---------- self.specialKeys : Set[Element] Set of edge keys of the `path` which have been marked as special. *None* if `specialKeys` == *None*, empty if no special edges found. Special edges with parallel edges are also listed here. self.parallelSpecialKeys : Dict[Element, int] If there are multiple edges between two nodes, and one of them is marked as special, this special edge is one of x edges between those nodes. This dictionary contains the special edge pointing to its x. Special edges without parallel edges are not listed here. *None* if `specialKeys` == *None*, empty if no special edges with parallel edges found. self.specialNodes : Set[Element] Set of nodes of the `path` which have been marked as special. *None* if `specialNodes` == *None*, empty if no special nodes found. Raises ------ ValueError If both `specialKeys` and `specialNodes` are *None*. """ if specialKeys is None and specialNodes is None: raise ValueError("Both, special edges and special nodes are None.") # copy from Path self.path = path.path # new attributes self.specialKeys = None self.parallelSpecialKeys = None self.specialNodes = None if specialKeys is not None: self.specialKeys = set() self.parallelSpecialKeys = dict() # search path for special edges for edge in path.edges: # iterate (multi-)edges of path if not isinstance(edge, Elements.Element): # multi-edge for key in edge: if key in specialKeys: self.specialKeys.add(key) self.parallelSpecialKeys[key] = len(edge) else: # single edge if edge in specialKeys: self.specialKeys.add(edge) if specialNodes is not None: self.specialNodes = set() # search path for special nodes for node in path.nodes: if node in specialNodes: self.specialNodes.add(node) @property def hasSpecialKey(self) -> bool: """ Whether this marked path has a special edge. Returns ------- bool """ return self.specialKeys is not None and len(self.specialKeys) > 0 @property def hasSpecialNode(self) -> bool: """ Whether this marked path has a special node. Returns ------- bool """ return self.specialNodes is not None and len(self.specialNodes) > 0
[docs]class CommonGraphApi(object): # choose a lib as implementation of graphs implementationLib = NetworkX """ Implementation library of a general graph. This implementation should be able to differentiate between directed and undirected graphs, see :attr:`DirectedMultiGraph.implementationGraph` and :attr:`UndirectedMultiGraph.implementationGraph` """ def __init__(self, underlyingRawGraph = None): """ Represents any type of graph. The library to implement graphs is chosen here. Parameters ---------- underlyingRawGraph : :attr:`implementationLib`, optional If not *None*, copies `underlyingRawGraph` and stores it for this object. Attributes ---------- self.underlyingRawGraph : :mod:`FEV_KEGG.Graph.Implementations` The actual graph containing the data. This is dependant on the implementation chosen in :attr:`implementationLib`. self.name : str Custom name of the graph. This is often set, but not necessary in any calculations. self.nodeCounts : Dict[Element, int], optional Number of precursor graphs which contained certain :class:`Element` nodes still in this graph. *None* by default. self.edgeCounts : Dict[Tuple[Element, Element, Element], int], optional Number of precursor graphs which contained a certain edge (a Tuple of three :class:`Element`) still in this graph. *None* by default. self.edgeElementCounts : Dict[Element, int], optional Number of precursor graphs which contained certain :class:`Element` edge keys still in this graph. *None* by default. """ if underlyingRawGraph != None: self.underlyingRawGraph = underlyingRawGraph.copy() self.nodeCounts = None self.edgeCounts = None self.edgeElementCounts = None @property def name(self): """ Custom name of the graph. This is often set in calcuations, but not used for any calculations. Returns ------- str Custom name of the graph. """ return self.underlyingRawGraph.name @name.setter def name(self, name: str): self.underlyingRawGraph.name = name
[docs] @classmethod def composeAll(cls, graphs: Iterable['CommonGraphApi'], name: str = None, pathwaySet = None) -> 'CommonGraphApi': """ Simple UNION of node and edge lists. A node is defined by its hash(). An edge is defined by Tuple[node1, node2, hash(edge key)], while the order of node1 and node2 encodes the direction, if the graph is directed. This is similar to :func:`union`, but aims at a special use case. You will most likely want to use :func:`union`. Parameters ---------- graphs : Iterable[CommonGraphApi] Iterable of graphs to be composed. name : str, optional Name of the new graph. pathwaySet : Set[KGML_pathway.Pathway], optional Set of pathways this graph was derived from. Especially useful for e.g. :func:`FEV_KEGG.Graph.SubstanceGraphs.Conversion.SubstanceReactionGraph2SubstanceGeneGraph`. Returns ------- CommonGraphApi Composition of all `graphs` by simple union operation. Includes `pathwaySet`, if given. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. """ # for all implementations allUnderlyingGraphs = [] for abstractGraph in graphs: underlyingGraph = abstractGraph.underlyingRawGraph allUnderlyingGraphs.append(underlyingGraph) # NetworkX was chosen as graph implementation if cls.implementationLib == NetworkX: newUnderlyingGraph = networkx.algorithms.operators.all.compose_all(allUnderlyingGraphs) newGraph = cls() newGraph.underlyingRawGraph = newUnderlyingGraph if name is not None: newGraph.name = name if pathwaySet is not None: # some graph had a set of pathways it was derived from, apply it to the new graph newGraph.pathwaySet = pathwaySet return newGraph # unknown implementation else: raise NotImplementedError
[docs] def getNodes(self) -> Set[Elements.Element]: """ Get all nodes. Returns ------- Set[Element] A set-like object of all nodes. Even though the type list does not enforce it, this should never return duplicates. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. """ # NetworkX was chosen as graph implementation if self.__class__.implementationLib == NetworkX: return self.underlyingRawGraph.nodes # unknown implementation else: raise NotImplementedError
[docs] def getEdges(self, fromNode: Elements.Element = None, toNode: Elements.Element = None) -> Set[Tuple]: """ Get all edges, optionally directly between two nodes. Parameters ---------- fromNode : Element The node where the edges start. toNode : Element The node where the edges end. Returns ------- Set[Tuple[Element, Element, Element]] A set-like object of all edges, defined by Tuples of (node1, node2, edge key). This is **not** a copy, but the original internal list. Do **not** change while iterating! Make a copy instead: copy = list(getEdges()) Only returns outgoing edges, so that no edge is reported twice. If `fromNode` and `toNode` are specified, returns only edges directly between these nodes. If there are none, returns an empty set. Does *not* report whole paths, only single edges! Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. """ # NetworkX was chosen as graph implementation if self.__class__.implementationGraph == MultiDiGraph: if fromNode is not None and toNode is not None: try: keys = self.underlyingRawGraph[fromNode][toNode] edges = set() for key in keys: edges.add((fromNode, toNode, key)) return edges except (KeyError, NetworkXNoPath, NodeNotFound): return set() else: return self.underlyingRawGraph.edges(keys=True) # unknown implementation else: raise NotImplementedError
[docs] def getEdgesFromKey(self, key: Elements.Element) -> List[Tuple[Elements.Element, Elements.Element, Elements.Element]]: """ Get all edges with a certain key element. Parameters ---------- key : Element The key element of the edges to be returned. Returns ------- Set[Tuple[Element, Element, Element]] A set of all edges, defined by Tuples of (node1, node2, edge key), where edge key == `key`. This is a copy of the original internal list. Only returns outgoing edges, so that no edge is reported twice. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. """ allEdges = self.getEdges() matchingEdges = [] for edge in allEdges: _, _, element = edge if element == key: matchingEdges.append( edge ) return matchingEdges
[docs] def getEdgesForKey(self) -> Dict[Elements.Element, List[Tuple[Elements.Element, Elements.Element, Elements.Element]]]: """ Get all edges, sorted by their key element. Returns ------- Dict[Element, List[Tuple[Element, Element, Element]]] A dict of all key elements pointing to a list of their edge tuples. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. """ edges = self.getEdges() edgesByKey = dict() for edge in edges: _, _, key = edge edgeList = edgesByKey.get(key, []) edgeList.append(edge) edgesByKey[key] = edgeList return edgesByKey
[docs] def getEdgeKeys(self) -> Set[Elements.Element]: """ Get edge key elements of all edges. Returns ------- Set[Element] Set of all edge's key elements, extracted from edge tuples of (node1, node2, edge key). Element objects which are the edge key of multiple edges are only returned once in the set. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. """ edges = self.getEdges() elementSet = set() for edge in edges: _, _, element = edge elementSet.add(element) return elementSet
[docs] def addNode(self, node: Elements.Element): """ Add a node to the graph. Parameters ---------- node : Element Node to add to the graph, if not already present. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationGraph`. """ # NetworkX.MultiDiGraph was chosen as graph implementation if self.__class__.implementationGraph == MultiDiGraph: self.underlyingRawGraph.add_node(node) # unknown implementation else: raise NotImplementedError
[docs] def addNodes(self, nodes: Iterable[Elements.Element]): """ Add nodes to the graph. Parameters ---------- nodes : Iterable[Element] Iterable of elements to be added as nodes. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. """ # NetworkX was chosen as graph implementation if self.__class__.implementationLib == NetworkX: self.underlyingRawGraph.add_nodes_from(nodes) # unknown implementation else: raise NotImplementedError
[docs] def addEdge(self, node1: Elements.Element, node2: Elements.Element, key: Elements.Element, isReversible: bool = False): """ Add an edge to the graph. Parameters ---------- node1 : Element Node from which the newly created edge starts. Is added to the graph, if not already present. node2 : Element Node at which the newly created edge ends. Is added to the graph, if not already present. key : Element Edge key element annotating the newly created edge. isReversible : bool, optional If *True*, both directions are added, swapping `node1` and `node2`. If the graph is undirected, this option is ignored. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationGraph`. """ # NetworkX.MultiDiGraph was chosen as graph implementation if self.__class__.implementationGraph == MultiDiGraph: self.underlyingRawGraph.add_edge(node1, node2, key) # automatically creates node, if not already present if isReversible == True: self.underlyingRawGraph.add_edge(node2, node1, key) # also add reverse direction # unknown implementation else: raise NotImplementedError
[docs] def addEdges(self, edges: Iterable[Tuple[Elements.Element, Elements.Element, Elements.Element]]): """ Add edges to the graph. Parameters ---------- edges : Iterable[Tuple[Element, Element, Element]] Iterable of edge tuples, defined as (node1, node2, edge key). If the nodes do not already exist, they are silently added. If the graph is directed, the order of node1 and node2 counts as direction. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. """ # NetworkX was chosen as graph implementation if self.__class__.implementationLib == NetworkX: self.underlyingRawGraph.add_edges_from(edges) # unknown implementation else: raise NotImplementedError
[docs] def removeEdge(self, node1: Elements.Element, node2: Elements.Element, key:Elements.Element, bothDirections: bool = False): """ Remove an edge from the graph. You may want to :func:`removeIsolatedNodes` afterwards, to remove nodes that now have no edge. Parameters ---------- node1 : Element Node from which the edge to be removed starts. Is not removed itself. node2 : Element Node at which the edge to be removed ends. Is not removed itself. key : Element Edge key element annotating the edge to be removed. bothDirections : bool, optional If *True*, both directions are removed, swapping `node1` and `node2`. If the graph is undirected, this option is ignored. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationGraph`. """ # NetworkX.MultiDiGraph was chosen as graph implementation if self.__class__.implementationGraph == MultiDiGraph: self.underlyingRawGraph.remove_edge(node1, node2, key) if bothDirections == True: self.underlyingRawGraph.remove_edge(node2, node1, key) # also add reverse direction # unknown implementation else: raise NotImplementedError
[docs] def removeEdges(self, edges: Iterable[Tuple[Elements.Element, Elements.Element, Elements.Element]]): """ Remove certain `edges`. Parameters ---------- edges : Iterable[Tuple[Element, Element, Element]] Iterable of edge tuples for edges to be removed, defined as (node1, node2, edge key). If an edge to be removed does not exist, the next edge will be tried, without any error message. If the graph is directed, the order of node1 and node2 counts as direction. The edge of opposing direction is not removed. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. """ # NetworkX was chosen as graph implementation if self.__class__.implementationLib == NetworkX: self.underlyingRawGraph.remove_edges_from(edges) # unknown implementation else: raise NotImplementedError
[docs] def removeEdgesByElements(self, elements: Iterable[Elements.Element]): """ Removes all edges associated with each of the :class:`Element` in `elements`. Parameters ---------- elements : Iterable[Element] Iterable of edge keys. Every edge keyed with an edge key equal (by __eq__) to any of these `elements` is removed. Direction of the graph does not affect removal. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. """ # NetworkX.MultiDiGraph was chosen as graph implementation if self.__class__.implementationGraph == MultiDiGraph: allEdges = self.getEdges() edgesToBeRemoved = [] for edgeTuple in allEdges: _, _, element = edgeTuple if element in elements: edgesToBeRemoved.append(edgeTuple) self.removeEdges(edgesToBeRemoved) # unknown implementation else: raise NotImplementedError
[docs] def getIsolatedNodes(self) -> Iterable[Elements.Element]: """ Get all nodes without any edge to another node. Returns ------- Iterable[Element] Iterable of nodes without any edge to another node. Even though the type does not enforce it, this should never return duplicates. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. """ # NetworkX was chosen as graph implementation if self.__class__.implementationLib == NetworkX: return list(networkx.algorithms.isolate.isolates(self.underlyingRawGraph)) # unknown implementation else: raise NotImplementedError
[docs] def removeNodes(self, nodes: Iterable[Elements.Element]): """ Remove all `nodes`. Parameters ---------- nodes : Iterable[Element] Iterable of elements representing nodes to be removed. Any edges involving these nodes are removed as well! Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. """ # NetworkX was chosen as graph implementation if self.__class__.implementationLib == NetworkX: self.underlyingRawGraph.remove_nodes_from(nodes) # unknown implementation else: raise NotImplementedError
[docs] def removeIsolatedNodes(self): """ Remove all nodes without any edge to another node. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. """ self.removeNodes(self.getIsolatedNodes()) return self
[docs] def removeSmallComponents(self, upToNumberOfNodes: int): """ Remove every isolated component of the graph with a total count of nodes <= `upToNumberOfNodes`. For a directed graph, this considers weakly connected components, too. This means that there do **not** have to be edges in **both** directions to be counted as a component. Even an edge in only one direction counts as connecting a component. Parameters ---------- upToNumberOfNodes : int Maximum number of nodes a component has to connect to be completely removed. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. """ components = self.getComponents() nodesToRemove = [] for componentNodes in components: if len( componentNodes ) <= upToNumberOfNodes: # component too small nodesToRemove.extend(componentNodes) # remove this component completely self.removeNodes(nodesToRemove) return self
[docs] def getComponents(self) -> Generator[Set, None, None]: """ Get all isolated components. For a directed graph, this considers weakly connected components, too. This means that there do **not** have to be edges in **both** directions to be counted as a component. Even an edge in only one direction counts as connecting a component. Returns ------- Generator[Set[Element]] Generator of any isolated component of the graph. Each represented by a set of their nodes, each represented by an Element. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. """ # NetworkX was chosen as graph implementation if self.__class__.implementationLib == NetworkX: if isinstance(self, UndirectedMultiGraph): # undirected graph return networkx.algorithms.components.connected_components(self.underlyingRawGraph) elif isinstance(self, DirectedMultiGraph): # directed graph return networkx.algorithms.components.weakly_connected_components(self.underlyingRawGraph) # unknown implementation else: raise NotImplementedError
[docs] def getLargestComponentNodes(self) -> Set[Elements.Element]: """ Get nodes of the largest component. Returns ------- Set[Element] Set of all nodes, represented by an Element, of the largest component. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. """ return max(self.getComponents(), key=len)
[docs] def getLargestComponent(self) -> 'CommonGraphApi': """ Get the largest component. Returns ------- CommonGraphApi Copy of this graph, reduced to the largest component. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. """ return self.getSubgraph( self.getLargestComponentNodes() )
[docs] def getSubgraph(self, byNodes:Iterable[Elements.Element] = None, byEdges:Iterable[Tuple[Elements.Element, Elements.Element, Elements.Element]] = None) -> 'CommonGraphApi': """ Get sub-graph defined by nodes or edges. If both are passed, only nodes are used. If nothing is passed, *None* is returned. Parameters ---------- byNodes : Iterable[Element], optional Iterable of nodes defining the sub-graph. All edges between these nodes are conserved. byEdges : Iterable[Tuple[Element, Element, Element]], optional Iterable of edges defining the sub-graph, each defined as (node1, node2, edge key). All nodes involved with these edges, i.e. all node1's and node2's are preserved. Returns ------- CommonGraphApi Copy of the sub-graph specified by either nodes or edges. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. """ # NetworkX was chosen as graph implementation if self.__class__.implementationLib == NetworkX: if byNodes is not None: return self.copy(self.underlyingRawGraph.subgraph(byNodes)) elif byEdges is not None: return self.copy(self.underlyingRawGraph.edge_subgraph(byEdges)) else: return None # unknown implementation else: raise NotImplementedError
[docs] def copy(self, underlyingRawGraph = None) -> 'CommonGraphApi': """ Shallow copy of the whole graph. However, some attributes are explicitly copied (although each attribute might in itself be shallowly copied): - .underlyingRawGraph - .name - .nodeCounts - .edgeCounts - .edgeElementCounts Parameters ---------- underlyingRawGraph : :mod:`FEV_KEGG.Graph.Implementations`, optional If given, does not copy the underlying raw graph, but uses this one. Returns ------- CommonGraphApi Shallow copy of the whole graph. """ copy = self.__class__(underlyingRawGraph = underlyingRawGraph) if underlyingRawGraph is None: copy.underlyingRawGraph = self.underlyingRawGraph.copy() if self.nodeCounts is not None: copy.nodeCounts = self.nodeCounts.copy() if self.edgeCounts is not None: copy.edgeCounts = self.edgeCounts.copy() if self.edgeElementCounts is not None: copy.edgeElementCounts = self.edgeElementCounts.copy() return copy
[docs] def difference(self, subtrahend: 'Graph to be subtracted', subtractNodes = False, updateName = False) -> 'CommonGraphApi': """ Difference between this graph and `subtrahend` graph, i.e. ``self - subtrahend``. You may want to :func:`removeIsolatedNodes` afterwards, to remove nodes that now have no edge. Parameters ---------- subtrahend : CommonGraphApi The graph to be subtracted. subtractNodes : bool, optional If *True*, also remove all nodes present in `subtrahend` from this graph. WARNING: This may remove edges that only exist in this graph, because they are removed with their associated node! updateName : bool, optional If *True*, update this graph's name. Returns ------- CommonGraphApi A copy of this graph, containing all nodes which are present in this graph and all edges present in this graph, but which are **not** present in `subtrahend`. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. """ copy = self.copy() subtrahendEdges = subtrahend.getEdges() copy.removeEdges(subtrahendEdges) if (subtractNodes == True): subtrahendNodes = subtrahend.getNodes() copy.removeNodes(subtrahendNodes) # update name if updateName: copy.name = 'Difference ( [' + self.name + '], [' + subtrahend.name + '] )' return copy
[docs] def intersection(self, withGraph: 'Graph to be intersected with, allows list of graphs', addCount = False, updateName = False) -> 'CommonGraphApi': """ Intersection of this graph and the graph(s) in `withGraph`. You may want to :func:`removeIsolatedNodes` afterwards, to remove nodes that now have no edge. Parameters ---------- withGraph : CommonGraphApi or Iterable[CommonGraphApi] The graph(s) this graph is to be intersected with. addCount : bool, optional If *True*, the returned graph contains extra dicts: - graph.nodeCounts[node] = number of graphs which contained this node - graph.edgeCounts[(node, node, element)] = number of graphs which contained this edge - graph.edgeElementCounts[element] = number of graphs which contained this element updateName : bool, optional If *True*, update this graph's name. Returns ------- CommonGraphApi A copy of this graph, containing nodes and edges present in both this graph and the other graph(s). Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. """ # check if a list of graphs was passed if not isinstance(withGraph, Iterable): withGraph = [withGraph] nodesA = self.getNodes() edgesA = self.getEdges() if updateName: newGraphNameList = [self.name] if addCount is True: # count nodes and edges per graph nodesCount = dict.fromkeys(nodesA, 1) edgesCount = dict.fromkeys(edgesA, 1) elementSet = set() for edge in edgesA: _, _, element = edge elementSet.add(element) edgeElementsCount = dict.fromkeys(elementSet, 1) nodesIntersection = set(nodesA) edgesIntersection = set(edgesA) for graph in withGraph: # intersect set of nodes nodesB = graph.getNodes() if addCount is True: for node in nodesB: nodesCount[node] = nodesCount.get(node, 0) + 1 nodesIntersection = nodesIntersection.intersection(nodesB) # intersect set of edges edgesB = graph.getEdges() if addCount is True: elementSet = set() for edge in edgesB: edgesCount[edge] = edgesCount.get(edge, 0) + 1 _, _, element = edge elementSet.add(element) for element in elementSet: edgeElementsCount[element] = edgeElementsCount.get(element, 0) + 1 edgesIntersection = edgesIntersection.intersection(edgesB) if updateName: newGraphNameList.append(graph.name) # add intersected nodes and edges to new graph newGraph = self.__class__() newGraph.addNodes(nodesIntersection) newGraph.addEdges(edgesIntersection) # update name if updateName: newGraph.name = 'Intersection ( [' + '], ['.join(newGraphNameList) + '] )' # if requested, add node counts if addCount is True: newGraph.nodeCounts = nodesCount newGraph.edgeCounts = edgesCount newGraph.edgeElementCounts = edgeElementsCount return newGraph
[docs] def majorityIntersection(self, withGraph: 'Graph to be majority-intersected with, allows list of graphs', majorityPercentage = 51, addCount = False, updateName = False) -> 'CommonGraphApi': """ Majority-Intersection of this graph and the graph(s) in `withGraph`. You may want to :func:`removeIsolatedNodes` afterwards, to remove nodes that now have no edge. Parameters ---------- withGraph : CommonGraphApi or Iterable[CommonGraphApi] The graph(s) this graph is to be intersected with. majorityPercentage : float, optional The majority percentage means 'at least x%' and is rounded up. For example 90% of 11 organisms (including the organism this method is called on) would be ceiling(9,9) = 10 organisms. If the rounded majority total effectively equated to 100% of all graphs, regular :func:`intersection` is called instead. If only one graph is passed in `withGraph` AND the rounded majority total effectively equates 1, regular :func:`union` is called instead. addCount : bool, optional If *True*, the returned graph contains extra dicts: - graph.nodeCounts[node] = number of graphs which contained this node - graph.edgeCounts[(node, node, element)] = number of graphs which contained this edge - graph.edgeElementCounts[element] = number of graphs which contained this element updateName : bool, optional If *True*, update this graph's name. Returns ------- CommonGraphApi A copy of this graph, containing all nodes and edges present in the majority of this graph and the other graph(s). Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. """ # check if majorityPercentage is sane if majorityPercentage <= 0 or majorityPercentage > 100: raise ValueError('Majority percentage is not a sane value (0 < percentage <= 100): ' + str(majorityPercentage)) # check if a list of graphs was passed # calculate total number of graphs needed for majority if isinstance(withGraph, Iterable): withGraphLength = len( withGraph ) majorityTotal = ceil((majorityPercentage/100) * (withGraphLength + 1)) if majorityTotal >= (withGraphLength + 1): # effectively 100% majority needed, >= because of float rounding error return self.intersection(withGraph) else: if majorityPercentage <= 50: # effectively 'or', thus union return self.union(withGraph, addCount) else: # effectively 100% majority needed, thus intersection return self.intersection(withGraph) # multiple graphs passed, single graphs are completely handled above if updateName: newGraphNameList = [self.name] nodesA = self.getNodes() edgesA = self.getEdges() # count nodes and edges per graph nodesCount = dict.fromkeys(nodesA, 1) edgesCount = dict.fromkeys(edgesA, 1) if addCount is True: elementSet = set() for edge in edgesA: _, _, element = edge elementSet.add(element) edgeElementsCount = dict.fromkeys(elementSet, 1) for graph in withGraph: nodesB = graph.getNodes() for node in nodesB: nodesCount[node] = nodesCount.get(node, 0) + 1 edgesB = graph.getEdges() if addCount is True: elementSet = set() for edge in edgesB: edgesCount[edge] = edgesCount.get(edge, 0) + 1 if addCount is True: _, _, element = edge elementSet.add(element) if addCount is True: for element in elementSet: edgeElementsCount[element] = edgeElementsCount.get(element, 0) + 1 if updateName: newGraphNameList.append(graph.name) # remove nodes and edges with count < majorityTotal for item in list( nodesCount.items() ): node, count = item if count < majorityTotal: # count not high enough del nodesCount[node] for item in list( edgesCount.items() ): edge, count = item if count < majorityTotal: # count not high enough del edgesCount[edge] # add majority-intersected nodes and edges to new graph newGraph = self.__class__() newGraph.addNodes(nodesCount.keys()) newGraph.addEdges(edgesCount.keys()) # update name if updateName: newGraph.name = 'Majority-Intersection ' + str(majorityPercentage) + '% ( [' + '], ['.join(newGraphNameList) + '] )' # if requested, add node counts if addCount is True: newGraph.nodeCounts = nodesCount newGraph.edgeCounts = edgesCount newGraph.edgeElementCounts = edgeElementsCount return newGraph
[docs] def union(self, withGraph: 'Graph to be unified with, allows list of graphs', addCount = False, updateName = False) -> 'CommonGraphApi': """ Union of this graph and the graph(s) in `withGraph`. Parameters ---------- withGraph : CommonGraphApi or Iterable[CommonGraphApi] The graph(s) this graph is to be unified with. addCount : bool, optional If *True*, the returned graph contains extra dicts: - graph.nodeCounts[node] = number of graphs which contained this node - graph.edgeCounts[(node, node, element)] = number of graphs which contained this edge - graph.edgeElementCounts[element] = number of graphs which contained this element updateName : bool, optional If *True*, update this graph's name. Returns ------- CommonGraphApi A copy of this graph, containing all nodes and all edges present in any of this graph or the other graph(s). Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. """ # check if a list of graphs was passed if not isinstance(withGraph, Iterable): withGraph = [withGraph] nodesA = self.getNodes() edgesA = self.getEdges() if updateName: newGraphNameList = [self.name] if addCount is True: # count nodes and edges per graph nodesCount = dict.fromkeys(nodesA, 1) edgesCount = dict.fromkeys(edgesA, 1) elementSet = set() for edge in edgesA: _, _, element = edge elementSet.add(element) edgeElementsCount = dict.fromkeys(elementSet, 1) for graph in withGraph: # unify set of nodes nodesB = graph.getNodes() for node in nodesB: nodesCount[node] = nodesCount.get(node, 0) + 1 # unify set of edges edgesB = graph.getEdges() elementSet = set() for edge in edgesB: edgesCount[edge] = edgesCount.get(edge, 0) + 1 _, _, element = edge elementSet.add(element) for element in elementSet: edgeElementsCount[element] = edgeElementsCount.get(element, 0) + 1 if updateName: newGraphNameList.append(graph.name) nodesUnion = nodesCount.keys() edgesUnion = edgesCount.keys() else: nodesUnion = set(nodesA) edgesUnion = set(edgesA) for graph in withGraph: # unify set of nodes nodesB = graph.getNodes() nodesUnion = nodesUnion.union(nodesB) # unify set of edges edgesB = graph.getEdges() edgesUnion = edgesUnion.union(edgesB) if updateName: newGraphNameList.append(graph.name) # add unified nodes and edges to new graph newGraph = self.__class__() newGraph.addNodes(nodesUnion) newGraph.addEdges(edgesUnion) # update name if updateName: newGraph.name = 'Union ( [' + '], ['.join(newGraphNameList) + '] )' # if requested, add node counts if addCount is True: newGraph.nodeCounts = nodesCount newGraph.edgeCounts = edgesCount newGraph.edgeElementCounts = edgeElementsCount return newGraph
[docs] def __eq__(self, other: object): """ Determine equality of two graphs. Parameters ---------- other : object The object to compare this graph with. Returns ------- bool Both graphs are considered equal, if they have identical memory addresses OR (the same class AND the same number of nodes and edges AND they are ismorphic). WARNING: isomorphism check is NP-hard! Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. """ if self is other: # identical object -> True return True if isinstance(self, other.__class__): # same class # NetworkX was chosen as graph implementation if self.__class__.implementationLib == NetworkX: if self.underlyingRawGraph.order() == other.underlyingRawGraph.order() and self.underlyingRawGraph.size() == other.underlyingRawGraph.size(): # same number of nodes and edges (weight 1) if len( set(self.underlyingRawGraph.nodes).symmetric_difference( other.underlyingRawGraph.nodes ) ) == 0: # same node set if len( set(self.underlyingRawGraph.edges).symmetric_difference( other.underlyingRawGraph.edges ) ) == 0: # same edge set return True # -> True # unknown implementation else: raise NotImplementedError return False # everything else -> False
[docs] def __ne__(self, other: object): """ Determine non-equality of two graphs. This simply negates :func:`__eq__`. Parameters ---------- other : object The object to compare this graph with. Returns ------- bool Both graphs are considered inequal, if they do not have identical memory addresses NOR (the same class AND the same number of nodes and edges AND they are ismorphic). WARNING: isomorphism check is NP-hard! Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. """ return not self == other
[docs] def replaceNode(self, oldNode: Elements.Element, newNode: Elements.Element): """ Replaces a certain node, if present, with another node. Silently ignores non-existing nodes. Parameters ---------- oldNode : Element Node to be replaced. newNode : Element Node used to replace the `oldNode`. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. """ if oldNode.__class__ is not newNode.__class__: raise TypeError('classes of nodes do not match') # NetworkX was chosen as graph implementation if self.__class__.implementationLib == NetworkX: networkx.relabel_nodes(self.underlyingRawGraph, {oldNode : newNode}, copy = False) # unknown implementation else: raise NotImplementedError
[docs] def replaceNodes(self, oldToNew: Dict[Elements.Element, Elements.Element]): """ Replaces certain nodes, if present, with another node each. Silently ignores non-existing nodes. Parameters ---------- oldToNew : Dict[Element, Element] Node to be replaced pointing to the node to replace it. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. """ # NetworkX was chosen as graph implementation if self.__class__.implementationLib == NetworkX: networkx.relabel_nodes(self.underlyingRawGraph, oldToNew, copy = False) # unknown implementation else: raise NotImplementedError
[docs] def replaceEdgeElement(self, edge: Tuple[Elements.Element, Elements.Element, Elements.Element], newElement: Elements.Element, bothDirections: bool = False): """ Replaces a certain edge key element, if the edge is present, with another element. Silently ignores non-existing edge, especially never adds the new edge. Treats both directions independently. Parameters ---------- edge : Tuple[Element, Element, Element] Tuple representing the edge which key element is to be replaced. newElement : Element The element to replace the edge's former key element. bothDirections : bool, optional If *True*, automatically replace the edge key element of both directions. If the other direction does not exist, nothing happens. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. """ node1, node2, oldElement = edge if oldElement.__class__ is not newElement.__class__: raise TypeError('classes of edge elements do not match') # NetworkX was chosen as graph implementation if self.__class__.implementationLib == NetworkX: if self.underlyingRawGraph.has_edge(node1, node2, oldElement): self.removeEdge(node1, node2, oldElement, bothDirections = False) self.addEdge(node1, node2, newElement, isReversible = False) if bothDirections == True: if self.underlyingRawGraph.has_edge(node2, node1, oldElement): self.removeEdge(node2, node1, oldElement, bothDirections = False) self.addEdge(node2, node1, newElement, isReversible = False) # unknown implementation else: raise NotImplementedError
[docs] def getShortestPaths(self, fromNode: Elements.Element, toNode: Elements.Element) -> Set[Path]: """ Get all shortest paths between two nodes. Parameters ---------- fromNode : Element Where to start searching for shortest paths. If *None*, searches for **all** shortest paths ending in `toNode`, which are obviously all of length 1. toNode : Element Where to stop searching. If *None*, searches for **all** shortest paths starting in `fromNode`, which are obviously all of length 1. Returns ------- Set[Path] Set of all shortest paths between `fromNode` and `toNode`. If `fromNode` or `toNode` does not exist, or there is no path, returns an empty list. Note ---- If one of the two nodes is *None*, determine if there is any path starting/ending in the one node given. If this is the case, the shortest paths are obviously **always of length 1**. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. ValueError If both `fromNode` and `toNode` are given as *None*. """ # NetworkX was chosen as graph implementation if self.__class__.implementationLib == NetworkX: try: if fromNode is None and toNode is None: raise ValueError("Only one node may be None.") elif toNode is None: # search from source to all nodeLists = [] # is there any successor? for toNode in self.underlyingRawGraph.successors(fromNode): nodeLists.append([fromNode, toNode]) elif fromNode is None: # search from all to target nodeLists = [] # is there any predecessor? for fromNode in self.underlyingRawGraph.predecessors(toNode): nodeLists.append([fromNode, toNode]) # elif toNode is None: # search from source to all # # # get lengths of shortest paths from fromNode -> any other node # shortestPathLengthsDict = networkx.algorithms.shortest_paths.shortest_path_length(self.underlyingRawGraph, source = fromNode, target = None) # shortestPathLengthsDict.pop(fromNode, None) # # # populate initial values # nodeLists = [] # if len(shortestPathLengthsDict.items()) > 0: # firstEntry = shortestPathLengthsDict.popitem() # firstTarget, firstLength = firstEntry # shortestPathLength = firstLength # shortestPathTargets = [firstTarget] # # # find target nodes with the shortest path length # for target, length in shortestPathLengthsDict.items(): # # if length > 0 and length < shortestPathLength: # not empty (0) AND new shortest path length # shortestPathLength = length # shortestPathTargets = [target] # # elif length == shortestPathLength: # additional target of the same path length # shortestPathTargets.append(target) # # else: # longer than shortest path length, skip # pass # # # for each target node with the shortest path length, find ALL paths from fromNode to these targets. Each target may have several paths of the same shortest length! # for target in shortestPathTargets: # nodeListPart = networkx.algorithms.shortest_paths.all_shortest_paths(self.underlyingRawGraph, source = fromNode, target = target) # nodeLists.extend(nodeListPart) # # # elif fromNode is None: # search from all to target # # # get lengths of shortest paths from any node -> toNode # shortestPathLengthsDict = networkx.algorithms.shortest_paths.shortest_path_length(self.underlyingRawGraph, source = None, target = toNode) # shortestPathLengthsDict.pop(toNode, None) # # # populate initial values # nodeLists = [] # if len(shortestPathLengthsDict.items()) > 0: # firstEntry = shortestPathLengthsDict.popitem() # firstSource, firstLength = firstEntry # shortestPathLength = firstLength # shortestPathSources = [firstSource] # # # find source nodes with the shortest path length # for source, length in shortestPathLengthsDict.items(): # # if length > 0 and length < shortestPathLength: # not empty (0) AND new shortest path length # shortestPathLength = length # shortestPathSources = [source] # # elif length == shortestPathLength: # additional source of the same path length # shortestPathSources.append(source) # # else: # longer than shortest path length, skip # pass # # # for each source node with the shortest path length, find ALL paths from these source to toNode. Each source may have several paths of the same shortest length! # for source in shortestPathSources: # nodeListPart = networkx.algorithms.shortest_paths.all_shortest_paths(self.underlyingRawGraph, source = source, target = toNode) # nodeLists.extend(nodeListPart) else: # search from source to target nodeLists = networkx.algorithms.shortest_paths.all_shortest_paths(self.underlyingRawGraph, source = fromNode, target = toNode) result = set() for nodeList in nodeLists: lastNode = nodeList.pop(0) # remove fromNode path = None for node in nodeList: # iterate over next nodes edges = self.getEdges(fromNode = lastNode, toNode = node) if len(edges) > 0: # there are edges, continue path keys = {edge[2] for edge in edges} if path is None: # first round path = MutablePath(lastNode, keys, node) else: # subsequent rounds path.elongate(keys, node) lastNode = node # continue with next node else: # no edges found, this must not happen raise Exception("No edges found, even though there are supposed to be some. Bewitched, bothered, and bewildered.") if path is not None: result.add(Path(path)) return result except (NetworkXNoPath, NodeNotFound): return set() # unknown implementation else: raise NotImplementedError
[docs] def getPaths(self, fromNode: Elements.Element, toNode: Elements.Element) -> Set[Path]: """ Get all simple paths between two nodes. Simple paths are loop-free. If one of the two nodes is *None*, searches for all paths starting/ending in the the one node given. Parameters ---------- fromNode : Element Where to start searching for paths. If *None*, searches for **all** paths ending in `toNode`. toNode : Element Where to stop searching. If *None*, searches for **all** paths starting in `fromNode`. Returns ------- Set[Path] Set of all paths between `fromNode` and `toNode`. If `fromNode` or `toNode` does not exist, or there is no path, returns an empty list. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationLib`. ValueError If both `fromNode` and `toNode` are given as *None*. """ # NetworkX was chosen as graph implementation if self.__class__.implementationLib == NetworkX: try: if fromNode is None and toNode is None: raise ValueError("Only one node may be None.") elif toNode is None: # search from source to all nodeLists = [] for target in self.underlyingRawGraph.nodes: nodeListPart = networkx.algorithms.simple_paths.all_simple_paths(self.underlyingRawGraph, source = fromNode, target = target) nodeLists.extend(nodeListPart) elif fromNode is None: # search from all to target nodeLists = [] for source in self.underlyingRawGraph.nodes: nodeListPart = networkx.algorithms.simple_paths.all_simple_paths(self.underlyingRawGraph, source = source, target = toNode) nodeLists.extend(nodeListPart) else: # search from source to target nodeLists = networkx.algorithms.simple_paths.all_simple_paths(self.underlyingRawGraph, source = fromNode, target = toNode) result = set() for nodeList in nodeLists: nodeList.pop(0) # remove fromNode lastNode = fromNode path = None for node in nodeList: # iterate over next nodes edges = self.getEdges(fromNode = lastNode, toNode = node) if len(edges) > 0: # there are edges, continue path keys = {edge[2] for edge in edges} if path is None: # first round path = MutablePath(lastNode, keys, node) else: # subsequent rounds path.elongate(keys, node) lastNode = node # continue with next node else: # no edges found, this must not happen raise Exception("No edges found, even though there are supposed to be some. Bewitched, bothered, and bewildered.") if path is not None: result.add(Path(path)) return result except (NetworkXNoPath, NodeNotFound): return set() # unknown implementation else: raise NotImplementedError
[docs]class DirectedMultiGraph(CommonGraphApi): implementationGraph = MultiDiGraph """ Implementation library's class for a directed graph. """ def __init__(self, underlyingRawGraph: 'implementationGraph' = None): """ Represents a directed multigraph. Parameters ---------- underlyingRawGraph : :attr:`implementationGraph`, optional If not *None*, copies `underlyingRawGraph` and stores it for this object. Attributes ---------- self.underlyingRawGraph : :mod:`FEV_KEGG.Graph.Implementations` The actual graph containing the data. This is dependant on the implementation chosen in :attr:`implementationLib`. self.name : str Custom name of the graph. This is often set, but not necessary in any calculations. self.nodeCounts : Dict[Element, int], optional Number of precursor graphs which contained certain :class:`Element` nodes still in this graph. *None* by default. self.edgeCounts : Dict[Tuple[Element, Element, Element], int], optional Number of precursor graphs which contained a certain edge (a Tuple of three :class:`Element`) still in this graph. *None* by default. self.edgeElementCounts : Dict[Element, int], optional Number of precursor graphs which contained certain :class:`Element` edge keys still in this graph. *None* by default. """ CommonGraphApi.__init__(self, underlyingRawGraph) if underlyingRawGraph == None: self.underlyingRawGraph = self.__class__.implementationGraph()
[docs] def getUnidirectionalEdges(self) -> Set[Tuple[Elements.Element, Elements.Element, Elements.Element]]: """ Get all edges which are unidirectional only. Returns ------- Set[Tuple[Element, Element, Element]] Set of all edge tuples (node1, node2, edge key) that have only one direction, i.e. there is no other edge tuple in reverse direction (node2, node1, edge key). Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationGraph`. """ undirectedGraphKeeping = self.toUndirectedGraph(True) undirectedGraph = self.toUndirectedGraph(False) # NetworkX was chosen as graph implementation if self.__class__.implementationLib == NetworkX: differenceGraph = networkx.algorithms.operators.difference(undirectedGraphKeeping.underlyingRawGraph, undirectedGraph.underlyingRawGraph) edgeList = differenceGraph.edges(keys=True) return edgeList # unknown implementation else: raise NotImplementedError
[docs] def getUnidirectionalEdgesElements(self) -> Set[Elements.Element]: """ Get the edge key elements of all edges which are unidirectional only. Returns ------- Set[Element] Set of all edge key elements of edges returned by :func:`getUnidirectionalEdges`. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationGraph`. """ unidirectionalEdges = self.getUnidirectionalEdges() elementSet = set() for edge in unidirectionalEdges: _, _, element = edge elementSet.add(element) return elementSet
[docs] def toUndirectedGraph(self, keepUnidirectionalEdges = False) -> 'UndirectedMultiGraph': """ Create undirected graph from this directed graph. Parameters ---------- keepUnidirectionalEdges : bool, optional If *True*, treat unidirectional edges in `directedMultiGraph` as undirected edges, thus keep them for this graph. If *False* (default), undirected edges are only created if there are edges for both directions in `directedMultiGraph`. Returns ------- UndirectedMultiGraph Undirected graph converted from this directed graph. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationGraph`. """ return UndirectedMultiGraph.fromDirectedMultiGraph(self, keepUnidirectionalEdges)
[docs]class UndirectedMultiGraph(CommonGraphApi): implementationGraph = MultiGraph """ Implementation library's class for an undirected graph. """ def __init__(self, underlyingRawGraph: 'implementationGraph' = None): """ Represents an undirected multi graph. Parameters ---------- underlyingRawGraph : :attr:`implementationGraph`, optional If not *None*, copies `underlyingRawGraph` and stores it for this object. Attributes ---------- self.underlyingRawGraph : :mod:`FEV_KEGG.Graph.Implementations` The actual graph containing the data. This is dependant on the implementation chosen in :attr:`implementationLib`. self.name : str Custom name of the graph. This is often set, but not necessary in any calculations. self.nodeCounts : Dict[Element, int], optional Number of precursor graphs which contained certain :class:`Element` nodes still in this graph. *None* by default. self.edgeCounts : Dict[Tuple[Element, Element, Element], int], optional Number of precursor graphs which contained a certain edge (a Tuple of three :class:`Element`) still in this graph. *None* by default. self.edgeElementCounts : Dict[Element, int], optional Number of precursor graphs which contained certain :class:`Element` edge keys still in this graph. *None* by default. """ CommonGraphApi.__init__(self, underlyingRawGraph) if underlyingRawGraph == None: self.underlyingRawGraph = self.__class__.implementationGraph()
[docs] @classmethod def fromDirectedMultiGraph(cls, directedMultiGraph: DirectedMultiGraph, keepUnidirectionalEdges = False): """ Create undirected graph from a directed graph. Parameters ---------- directedMultiGraph : DirectedMultiGraph Directed graph to use for conversion. keepUnidirectionalEdges : bool, optional If *True*, treat unidirectional edges in `directedMultiGraph` as undirected edges, thus keep them for this graph. If *False* (default), undirected edges are only created if there are edges for both directions in `directedMultiGraph`. Returns ------- UndirectedMultiGraph Undirected graph converted from `directedMultiGraph`. Raises ------ NotImplementedError If this function has not been adapted to the chosen graph implementation, yet. See :attr:`implementationGraph`. """ instance = cls() # if the graph has a pathway set it was derived from, copy it, too if hasattr(directedMultiGraph, 'pathwaySet'): instance.pathwaySet = directedMultiGraph.pathwaySet.copy() # NetworkX was chosen as graph implementation if directedMultiGraph.__class__.implementationGraph == MultiDiGraph: instance.underlyingRawGraph = directedMultiGraph.underlyingRawGraph.to_undirected(reciprocal = not keepUnidirectionalEdges) # unknown implementation else: raise NotImplementedError return instance