Source code for synkit.Graph.Wildcard.its_merge

from __future__ import annotations

from typing import Any, Dict, Optional, Tuple, Union, Set
import logging

import networkx as nx

GraphType = Union[nx.Graph, nx.DiGraph, nx.MultiGraph, nx.MultiDiGraph]

__all__ = ["ITSMerge", "fuse_its_graphs"]


[docs] class ITSMerge: """ Merge two ITS graphs given a node mapping between them. This class encapsulates the logic of fusing two ITS graphs (e.g. from wildcard pattern matching) in an object-oriented way. The result is a **fused** graph that: * starts as a copy of the **host** graph, * merges ITS typing (``typesGH``) on mapped node pairs, * adds leftover (non-mapped, non-wildcard) pattern nodes and edges, and * optionally removes all wildcard nodes in the final fused graph. Orientation ----------- The graph whose nodes appear as **values** of the mapping is treated as the **host**; the other graph is the **pattern**. If the mapping is given in the opposite direction (host → pattern), the class detects this and automatically inverts the mapping. ITS merging semantics --------------------- * For mapped node pairs (p → h), the ``typesGH`` attribute is merged: the hydrogen count entries (index 2 in each inner tuple) are set to the **maximum** of host vs pattern. * Leftover pattern nodes: - If ``element == wildcard_element``, they are **ignored**. - Otherwise they are added as new nodes with new IDs and edges are created according to the pattern topology. * If :paramref:`remove_wildcards` is ``True`` (default), **all wildcard nodes** (``element == wildcard_element``) are removed from the fused graph; their incident edges disappear. If ``False``, wildcard nodes are kept. Examples -------- Simple usage with integer-labeled graphs: .. code-block:: python import networkx as nx from synkit.Graph.ITS.its_merge import ITSMerge G1 = nx.Graph() G2 = nx.Graph() # Example: two ITS graphs with 'typesGH' and 'element' attributes G1.add_node(1, element="C", typesGH=(("C", False, 2, 0, ["O"]), ("C", False, 2, 0, ["O"]))) G2.add_node(10, element="C", typesGH=(("C", False, 1, 0, ["O"]), ("C", False, 1, 0, ["O"]))) mapping = {1: 10} # pattern node → host node merger = ITSMerge(G1, G2, mapping, remove_wildcards=True).merge() F = merger.fused_graph print("Fused nodes:", F.nodes(data=True)) :param G1: First input ITS graph. :type G1: GraphType :param G2: Second input ITS graph. :type G2: GraphType :param mapping: Node mapping between the graphs. Must be a bijection either from pattern → host or host → pattern; the class detects orientation automatically. :type mapping: dict[Any, Any] :param types_key: Node attribute key holding the ITS typing tuple, e.g. ``(('C', False, 3, 0, ['O']), ('C', False, 3, 0, ['O']))``. :type types_key: str :param element_key: Node attribute key for element / atom type. :type element_key: str :param wildcard_element: Value of :paramref:`element_key` that denotes wildcard nodes. :type wildcard_element: str :param remove_wildcards: If ``True``, remove wildcard nodes in the final fused graph. If ``False``, wildcard nodes are kept. :type remove_wildcards: bool :param logger: Optional logger for debug output. :type logger: logging.Logger | None """ def __init__( self, G1: GraphType, G2: GraphType, mapping: Dict[Any, Any], *, types_key: str = "typesGH", element_key: str = "element", wildcard_element: str = "*", remove_wildcards: bool = True, logger: Optional[logging.Logger] = None, ) -> None: self._G1 = G1 self._G2 = G2 self._mapping_input: Dict[Any, Any] = dict(mapping) self._types_key = types_key self._element_key = element_key self._wildcard_element = wildcard_element self._remove_wildcards_flag = remove_wildcards self._logger = logger or logging.getLogger(__name__) self._host: GraphType self._pattern: GraphType self._pat_is_G1: bool self._pat_to_host: Dict[Any, Any] self._fused: Optional[GraphType] = None self._orient_graphs_and_mapping() # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------
[docs] def merge(self) -> "ITSMerge": """ Execute the ITS fusion process. The method: 1. Starts from a copy of the host graph. 2. Merges ``typesGH`` attributes on mapped node pairs. 3. Adds leftover non-wildcard pattern nodes. 4. Adds pattern edges between mapped/added nodes. 5. Optionally removes wildcard nodes from the fused graph. :returns: Self, with :pyattr:`fused_graph` updated. :rtype: ITSMerge """ fused = self._host.copy() self._merge_anchor_nodes(fused) leftover_map = self._add_leftover_pattern_nodes(fused) self._add_pattern_edges(fused, leftover_map) if self._remove_wildcards_flag: self._remove_wildcards(fused) self._fused = fused return self
# ------------------------------------------------------------------ # Properties # ------------------------------------------------------------------ @property def fused_graph(self) -> GraphType: """ Fused ITS graph. The graph is in the **host's node ID space**, plus any extra IDs for leftover pattern nodes. Wildcard nodes may have been removed, depending on :paramref:`remove_wildcards`. :returns: Fused ITS graph. :rtype: GraphType :raises RuntimeError: If :meth:`merge` has not been called yet. """ if self._fused is None: raise RuntimeError( "ITSMerge.merge() must be called before accessing fused_graph." ) return self._fused @property def host_graph(self) -> GraphType: """ Graph that was treated as the **host** for merging. :returns: Host graph. :rtype: GraphType """ return self._host @property def pattern_graph(self) -> GraphType: """ Graph that was treated as the **pattern** for merging. :returns: Pattern graph. :rtype: GraphType """ return self._pattern @property def pattern_to_host(self) -> Dict[Any, Any]: """ Mapping from pattern node IDs to host node IDs. Orientation is resolved automatically during construction. :returns: Pattern → host node mapping. :rtype: dict[Any, Any] """ return dict(self._pat_to_host) def __repr__(self) -> str: """ Short textual representation for debugging. :returns: Summary string with node/edge counts when available. :rtype: str """ fused_info = "unmerged" if self._fused is not None: fused_info = ( f"fused_nodes={self._fused.number_of_nodes()}, " f"fused_edges={self._fused.number_of_edges()}" ) return ( f"<ITSMerge host_nodes={self._host.number_of_nodes()} " f"pattern_nodes={self._pattern.number_of_nodes()} " f"remove_wildcards={self._remove_wildcards_flag} " f"{fused_info}>" ) # ------------------------------------------------------------------ # Internal: orientation & mapping # ------------------------------------------------------------------ def _orient_graphs_and_mapping(self) -> None: """ Decide which graph is host and which is pattern, and orient mapping. Orientation rules ----------------- 1. If mapping keys belong to G1 and values to G2, then pattern = G1, host = G2 and pat_to_host = mapping. 2. If mapping keys belong to G2 and values to G1, then the provided mapping appears to be host→pattern. Invert it so that pattern = G1, host = G2 and pat_to_host = inverted mapping. 3. Otherwise, try the inverse mapping similarly. 4. If none of these combinations works, raise :class:`ValueError`. """ mapping = self._mapping_input def all_in(G: GraphType, nodes: Set[Any]) -> bool: return all(n in G for n in nodes) keys = set(mapping.keys()) vals = set(mapping.values()) # Case A: mapping is already pattern(G1) -> host(G2) if all_in(self._G1, keys) and all_in(self._G2, vals): self._pattern, self._host = self._G1, self._G2 self._pat_is_G1 = True self._pat_to_host = dict(mapping) return # Case B: mapping keys in G2 and values in G1 -> mapping likely host->pattern # invert it to get pattern(G1)->host(G2) if all_in(self._G2, keys) and all_in(self._G1, vals): inv = {v: k for k, v in mapping.items()} if all_in(self._G1, set(inv.keys())) and all_in( self._G2, set(inv.values()) ): self._pattern, self._host = self._G1, self._G2 self._pat_is_G1 = True self._pat_to_host = inv return # Case C: try inverse mapping explicitly (in case mapping was provided in # unexpected orientation) inv = {v: k for k, v in mapping.items()} inv_keys = set(inv.keys()) inv_vals = set(inv.values()) if all_in(self._G1, inv_keys) and all_in(self._G2, inv_vals): # inv is pattern(G1) -> host(G2) self._pattern, self._host = self._G1, self._G2 self._pat_is_G1 = True self._pat_to_host = dict(inv) return if all_in(self._G2, inv_keys) and all_in(self._G1, inv_vals): # inv is pattern(G2) -> host(G1) -> invert it to pattern(G1)->host(G2) inv2 = {v: k for k, v in inv.items()} if all_in(self._G1, set(inv2.keys())) and all_in( self._G2, set(inv2.values()) ): self._pattern, self._host = self._G1, self._G2 self._pat_is_G1 = True self._pat_to_host = dict(inv2) return # Fallback: if we reach here, we cannot reliably orient the mapping if all_in(self._G2, keys) and all_in(self._G1, vals): # keep original mapping as pattern=G2, host=G1 (fallback) self._pattern, self._host = self._G2, self._G1 self._pat_is_G1 = False self._pat_to_host = dict(mapping) return raise ValueError( "ITSMerge: cannot orient mapping; keys/values do not consistently " "belong to G1/G2." ) # ------------------------------------------------------------------ # Internal: ITS types merging # ------------------------------------------------------------------ def _merge_types_gh( self, host_data: Dict[str, Any], pat_data: Dict[str, Any], ) -> Optional[Tuple[Tuple[Any, ...], Tuple[Any, ...]]]: """ Merge host vs pattern ``typesGH``, keeping max hydrogen counts. Only index 2 (H count) of each side is altered; remaining entries are taken from the host when present. :param host_data: Host node attribute dictionary. :type host_data: dict[str, Any] :param pat_data: Pattern node attribute dictionary. :type pat_data: dict[str, Any] :returns: Merged ``typesGH`` tuple (left, right) or ``None`` if both are missing. :rtype: tuple[tuple[Any, ...], tuple[Any, ...]] | None """ t_host = host_data.get(self._types_key) t_pat = pat_data.get(self._types_key) if t_host is None and t_pat is None: return None if t_host is None: t_host = t_pat if t_host is None: return None try: left_h = list(t_host[0]) right_h = list(t_host[1]) except Exception: # pragma: no cover - defensive self._logger.debug( "ITSMerge: host typesGH not a 2-tuple, leaving as-is: %r", t_host ) return t_host if t_pat is not None: try: left_p = t_pat[0] right_p = t_pat[1] if len(left_h) > 2 and len(left_p) > 2: left_h[2] = max(left_h[2], left_p[2]) if len(right_h) > 2 and len(right_p) > 2: right_h[2] = max(right_h[2], right_p[2]) except Exception: # pragma: no cover - defensive self._logger.debug( "ITSMerge: pattern typesGH not mergeable, keeping host: %r", t_pat, ) return (tuple(left_h), tuple(right_h)) def _merge_anchor_nodes(self, fused: GraphType) -> None: """ Merge ITS typing for mapped pattern→host node pairs. :param fused: Graph being constructed (copy of host). :type fused: GraphType """ for p_node, h_node in self._pat_to_host.items(): if h_node not in fused or p_node not in self._pattern: continue host_attrs = fused.nodes[h_node] pat_attrs = self._pattern.nodes[p_node] merged_t = self._merge_types_gh(host_attrs, pat_attrs) if merged_t is not None: host_attrs[self._types_key] = merged_t # ------------------------------------------------------------------ # Internal: leftover nodes & edges # ------------------------------------------------------------------ def _next_int_id(self, fused: GraphType) -> Optional[int]: """ Determine starting integer ID for new nodes, if applicable. :param fused: Graph being constructed. :type fused: GraphType :returns: Next integer ID or ``None`` if node IDs are not all ints. :rtype: int | None """ if not fused.nodes: return 0 if all(isinstance(n, int) for n in fused.nodes): return max(fused.nodes) + 1 return None def _add_leftover_pattern_nodes(self, fused: GraphType) -> Dict[Any, Any]: """ Add leftover non-wildcard pattern nodes to the fused graph. :param fused: Graph being constructed. :type fused: GraphType :returns: Mapping from pattern node IDs to new fused node IDs. :rtype: dict[Any, Any] """ mapped_p_nodes = set(self._pat_to_host.keys()) leftover_map: Dict[Any, Any] = {} next_int_id = self._next_int_id(fused) tag = "p1" if self._pat_is_G1 else "p2" for p_node, p_data in self._pattern.nodes(data=True): if p_node in mapped_p_nodes: continue if p_data.get(self._element_key) == self._wildcard_element: continue if next_int_id is not None: f_node = next_int_id next_int_id += 1 else: f_node = (tag, p_node) leftover_map[p_node] = f_node fused.add_node(f_node, **p_data) return leftover_map def _add_pattern_edges( self, fused: GraphType, leftover_map: Dict[Any, Any], ) -> None: """ Add edges from the pattern graph into the fused graph. Edges are added between: * mapped pattern nodes, via their host IDs, and * leftover pattern nodes that were newly added. :param fused: Graph being constructed. :type fused: GraphType :param leftover_map: Mapping from pattern node IDs to new fused node IDs. :type leftover_map: dict[Any, Any] """ def map_p_to_f(n: Any) -> Optional[Any]: if n in self._pat_to_host: return self._pat_to_host[n] return leftover_map.get(n) is_multi = isinstance(fused, (nx.MultiGraph, nx.MultiDiGraph)) for u, v, data in self._pattern.edges(data=True): fu = map_p_to_f(u) fv = map_p_to_f(v) if fu is None or fv is None: continue if is_multi: fused.add_edge(fu, fv, **data) else: if fused.has_edge(fu, fv): continue fused.add_edge(fu, fv, **data) # ------------------------------------------------------------------ # Internal: wildcard removal # ------------------------------------------------------------------ def _remove_wildcards(self, fused: GraphType) -> None: """ Remove wildcard nodes (and incident edges) from the fused graph. :param fused: Graph being constructed. :type fused: GraphType """ wc_nodes = [ n for n, d in fused.nodes(data=True) if d.get(self._element_key) == self._wildcard_element ] if wc_nodes: self._logger.debug( "ITSMerge: removing %d wildcard nodes from fused graph", len(wc_nodes), ) fused.remove_nodes_from(wc_nodes)
# ---------------------------------------------------------------------- # Functional wrapper for backwards compatibility # ----------------------------------------------------------------------
[docs] def fuse_its_graphs( G1: GraphType, G2: GraphType, mapping: Dict[Any, Any], *, types_key: str = "typesGH", element_key: str = "element", wildcard_element: str = "*", remove_wildcards: bool = True, logger: Optional[logging.Logger] = None, ) -> GraphType: """ Functional wrapper around :class:`ITSMerge`. :param G1: First input ITS graph. :type G1: GraphType :param G2: Second input ITS graph. :type G2: GraphType :param mapping: Node mapping between the graphs. :type mapping: dict[Any, Any] :param types_key: Node attribute key holding ITS typing information. :type types_key: str :param element_key: Node attribute key for element / atom type. :type element_key: str :param wildcard_element: Value of :paramref:`element_key` that denotes wildcard nodes. :type wildcard_element: str :param remove_wildcards: If ``True``, remove wildcard nodes from the fused graph. If ``False``, keep them. :type remove_wildcards: bool :param logger: Optional logger for debug output. :type logger: logging.Logger | None :returns: Fused ITS graph. :rtype: GraphType """ merger = ITSMerge( G1, G2, mapping, types_key=types_key, element_key=element_key, wildcard_element=wildcard_element, remove_wildcards=remove_wildcards, logger=logger, ).merge() return merger.fused_graph