Source code for synkit.Graph.Matcher.sing

from __future__ import annotations

from typing import Any, Dict, List, Optional, Set, Union

import networkx as nx
from networkx.algorithms.isomorphism import GraphMatcher


[docs] class SING: """Subgraph search In Non-homogeneous Graphs (SING). A lightweight Python implementation of the path-based *filter-and-refine* strategy introduced by Di Natale et al. (SING: Subgraph search In Non-homogeneous Graphs, BMC Bioinformatics, 2010) for subgraph search in large, possibly heterogeneous graphs. The index is built once over a single *data graph* and can then be queried with multiple *pattern graphs* via :meth:`search`. Notes ----- - This implementation focuses on the **path-feature** variant of SING, where features are simple paths (with optional node/edge labels) up to a maximum length. - Multi-graphs are not supported. - If the underlying data graph is modified after construction, call :meth:`reindex` to rebuild the feature index. Example ------- A minimal example on an undirected, unlabeled graph: .. code-block:: python import networkx as nx from sing import SING # Data graph: 0-1-2-3 G = nx.path_graph(4) # Query: 0-1-2 Q = nx.path_graph(3) index = SING(G, max_path_length=2, node_att=[], edge_att=None) matches = index.search(Q) # matches contains four embeddings: # {0: 0, 1: 1, 2: 2} # {0: 1, 1: 2, 2: 3} # {0: 2, 1: 1, 2: 0} # {0: 3, 1: 2, 2: 1} """ # ------------------------------------------------------------------ # Construction & Indexing # ------------------------------------------------------------------ def __init__( self, graph: nx.Graph, max_path_length: int = 3, node_att: Union[str, List[str], None] = None, edge_att: Union[str, List[str], None] = "order", ) -> None: """ Initialize a SING index over a data graph. :param graph: The data graph (directed or undirected; multi-graphs are not supported). :type graph: nx.Graph :param max_path_length: Maximum number of edges considered when enumerating path features (``>= 0``). A value of ``0`` keeps only single-node features. :type max_path_length: int, optional :param node_att: Node attribute name(s) whose values are concatenated to form the node label used in path features. If ``None``, defaults to ``["element", "charge"]``, which is convenient for chemical graphs. :type node_att: str | list[str] | None, optional :param edge_att: Edge attribute name(s) to include in path features. If ``None``, edge attributes are ignored. Defaults to ``"order"``, matching common chemical-graph conventions. :type edge_att: str | list[str] | None, optional """ self.graph: nx.Graph = graph self.max_path_length: int = int(max_path_length) # Normalise attribute selections -------------------------------- if node_att is None: node_att = ["element", "charge"] self.node_att: List[str] = ( [node_att] if isinstance(node_att, str) else list(node_att) ) if edge_att is None: self.edge_att: List[str] = [] else: self.edge_att = [edge_att] if isinstance(edge_att, str) else list(edge_att) # Inverted index: feature signature -> set[data-node] self.feature_index: Dict[str, Set[Any]] = {} # Per-vertex feature sets self.vertex_features: Dict[Any, Set[str]] = {} # Cached signatures for the data graph (for efficiency) self._node_sig_data: Dict[Any, str] = {} self._edge_sig_data: Dict[tuple[Any, Any], str] = {} # Build caches + index once up-front self._init_data_signatures() self._build_index() # ------------------------------------------------------------------ # Internal helpers: signatures & indexing # ------------------------------------------------------------------ def _node_signature(self, v: Any, G: nx.Graph) -> str: """ Return a string signature for node ``v`` in graph ``G`` based on :attr:`node_att`. :param v: Node identifier. :type v: Any :param G: Graph containing the node. :type G: nx.Graph :returns: Concatenated attribute values (``"|"``-separated) or an empty string if :attr:`node_att` is empty. :rtype: str """ if not self.node_att: return "" vals = [str(G.nodes[v].get(a, "#")) for a in self.node_att] return "|".join(vals) def _edge_signature(self, u: Any, v: Any, G: nx.Graph) -> str: """ Return a string signature for edge ``(u, v)`` in graph ``G`` based on :attr:`edge_att`. If no edge attributes were requested, returns an empty string. :param u: Source node identifier. :type u: Any :param v: Target node identifier. :type v: Any :param G: Graph containing the edge. :type G: nx.Graph :returns: Concatenated attribute values (``"|"``-separated), or an empty string when :attr:`edge_att` is empty. :rtype: str """ if not self.edge_att: return "" vals = [str(G[u][v].get(a, "#")) for a in self.edge_att] return "|".join(vals) def _init_data_signatures(self) -> None: """ Precompute node and edge signatures for the data graph. This avoids repeatedly looking up attributes and constructing strings during index building and search. """ G = self.graph self._node_sig_data = {v: self._node_signature(v, G) for v in G.nodes} self._edge_sig_data = {} if self.edge_att: # For undirected graphs, cache both (u, v) and (v, u) if G.is_directed(): for u, v in G.edges: self._edge_sig_data[(u, v)] = self._edge_signature(u, v, G) else: for u, v in G.edges: sig = self._edge_signature(u, v, G) self._edge_sig_data[(u, v)] = sig self._edge_sig_data[(v, u)] = sig # ------------------------------------------------------------------ # Feature extraction (paths) # ------------------------------------------------------------------ def _extract_path_features( self, node: Any, G: nx.Graph, is_query: bool = False ) -> Set[str]: """ Enumerate all simple paths starting at ``node`` up to :attr:`max_path_length` edges (inclusive), represented as label sequences. Works for both data and query graphs. :param node: Starting node in ``G``. :type node: Any :param G: Graph in which to enumerate paths (data or query graph). :type G: nx.Graph :param is_query: Flag indicating whether ``G`` is the query graph. Currently unused but kept for future extensions (e.g., query-specific feature tweaks). :type is_query: bool, optional :returns: Set of string-encoded path features. :rtype: set[str] """ features: Set[str] = set() max_len = self.max_path_length # Use cached signatures when possible (data graph) if G is self.graph: node_sig_cache = self._node_sig_data edge_sig_cache = self._edge_sig_data def get_node_sig(x: Any) -> str: return node_sig_cache[x] def get_edge_sig(a: Any, b: Any) -> str: return edge_sig_cache.get((a, b), "") else: def get_node_sig(x: Any) -> str: return self._node_signature(x, G) def get_edge_sig(a: Any, b: Any) -> str: return self._edge_signature(a, b, G) def dfs(current: Any, depth: int, visited: Set[Any], path_parts: List[str]): # Record current path (including the starting node at depth 0) features.add("-".join(path_parts)) if depth == max_len: return for nbr in G.neighbors(current): if nbr in visited: continue edge_sig = get_edge_sig(current, nbr) # Extend path in-place (append & pop for efficiency) if edge_sig: path_parts.append(edge_sig) path_parts.append(get_node_sig(nbr)) visited.add(nbr) dfs(nbr, depth + 1, visited, path_parts) visited.remove(nbr) # Backtrack path_parts.pop() if edge_sig: path_parts.pop() start_sig = get_node_sig(node) dfs(node, 0, {node}, [start_sig]) return features # ------------------------------------------------------------------ # Index construction # ------------------------------------------------------------------ def _build_index(self) -> None: """ Build the feature index over the current data graph. Called automatically during initialization and by :meth:`reindex`. """ self.feature_index.clear() self.vertex_features.clear() for v in self.graph.nodes: feats = self._extract_path_features(v, self.graph) self.vertex_features[v] = feats for f in feats: self.feature_index.setdefault(f, set()).add(v) # ------------------------------------------------------------------ # Public API: reindexing & candidate generation # ------------------------------------------------------------------
[docs] def reindex(self, graph: Optional[nx.Graph] = None) -> None: """ Rebuild the index, optionally replacing the underlying data graph. :param graph: New data graph. If ``None``, the existing :attr:`graph` is re-indexed. :type graph: nx.Graph | None, optional """ if graph is not None: self.graph = graph self._init_data_signatures() self._build_index()
def _candidate_vertices(self, query_graph: nx.Graph) -> Dict[Any, Set[Any]]: """ Return per-query-vertex candidate sets using posting-list intersections. :param query_graph: Query (pattern) graph. :type query_graph: nx.Graph :returns: Mapping from query-node -> set of candidate data-nodes. :rtype: dict[Any, set[Any]] """ cand: Dict[Any, Set[Any]] = {} for qv in query_graph.nodes: q_feats = self._extract_path_features(qv, query_graph, is_query=True) if not q_feats: # Fallback: no features → all data vertices are candidates cand[qv] = set(self.graph.nodes) continue # Initialise with posting list of *one* feature, then intersect. iterator = iter(q_feats) first_f = next(iterator) cset = set(self.feature_index.get(first_f, [])) for f in iterator: cset &= self.feature_index.get(f, set()) if not cset: break # early quit cand[qv] = cset return cand # ------------------------------------------------------------------ # Deduplication by query automorphisms (optional) # ------------------------------------------------------------------ def _deduplicate_by_query_automorphisms( self, mappings: List[Dict[Any, Any]], query_graph: nx.Graph ) -> List[Dict[Any, Any]]: """ Deduplicate embeddings up to automorphisms of the query graph. Two mappings ``M`` and ``M'`` are considered equivalent if there exists an automorphism :math:`\\sigma` of the query such that: .. math:: M' = M \\circ \\sigma That is, they differ only by a symmetry of the *pattern*. :param mappings: List of node mappings (query-node -> data-node). :type mappings: list[dict[Any, Any]] :param query_graph: The query graph whose automorphisms are used for deduplication. :type query_graph: nx.Graph :returns: Reduced list with one representative per equivalence class. :rtype: list[dict[Any, Any]] """ if not mappings: return [] gm = GraphMatcher(query_graph, query_graph) autos = list(gm.isomorphisms_iter()) # If only identity, nothing to dedup if len(autos) <= 1: return mappings # Fix a deterministic ordering of query nodes q_nodes = list(query_graph.nodes()) seen_signatures: Set[tuple] = set() unique: List[Dict[Any, Any]] = [] for m in mappings: variants: List[tuple] = [] for sigma in autos: # M' = M ∘ sigma, so M'(p) = M(sigma[p]) try: variant = tuple(m[sigma[p]] for p in q_nodes) except KeyError: # Should not happen for full mappings, but be defensive continue variants.append(variant) if not variants: # Fallback: direct tuple if something went wrong above signature = tuple(m[p] for p in q_nodes) else: signature = min(variants) if signature not in seen_signatures: seen_signatures.add(signature) unique.append(m) return unique # ------------------------------------------------------------------ # Refinement: backtracking with candidate sets # ------------------------------------------------------------------
[docs] def search( self, query_graph: nx.Graph, prune: bool = False, dedup_autos: bool = False, ) -> Union[List[Dict[Any, Any]], bool]: """ Find subgraph isomorphisms from ``query_graph`` into the data graph. This method performs a path-feature-based **filter** to obtain candidate vertices, followed by a VF2-style **refinement** via backtracking with neighbourhood and label consistency checks. :param query_graph: Pattern graph to match against :attr:`graph`. :type query_graph: nx.Graph :param prune: If ``True``, stop after finding the first mapping and return a boolean indicating existence of at least one embedding. If ``False`` (default), return the full list of mappings. :type prune: bool, optional :param dedup_autos: If ``True``, collapse symmetric embeddings that differ only by automorphisms of the query graph, returning one representative per equivalence class. Has no effect when ``prune=True``. :type dedup_autos: bool, optional :returns: Either ``True``/``False`` (when ``prune=True``) or a list of injective node mappings ``[{q_node: data_node, ...}, ...]``. :rtype: list[dict[Any, Any]] | bool Example ------- .. code-block:: python import networkx as nx from sing import SING G = nx.cycle_graph(4) Q = nx.path_graph(3) index = SING(G, max_path_length=2, node_att=[], edge_att=None) all_mappings = index.search(Q) # all embeddings unique_mappings = index.search(Q, dedup_autos=True) # collapse symmetries """ cand = self._candidate_vertices(query_graph) mapping: Dict[Any, Any] = {} used: Set[Any] = set() results: List[Dict[Any, Any]] = [] # Order query vertices by fewest candidates (fail-fast heuristic) order = sorted(query_graph.nodes, key=lambda n: len(cand[n])) def backtrack(i: int) -> bool: if i == len(order): results.append(mapping.copy()) return prune # signal to stop if pruning qv = order[i] for dv in cand[qv]: if dv in used: continue # Neighbourhood + edge-label consistency valid = True for nbr in query_graph.neighbors(qv): if nbr in mapping: dn = mapping[nbr] if not self.graph.has_edge(dv, dn): valid = False break if self.edge_att: if self._edge_signature( qv, nbr, query_graph ) != self._edge_signature(dv, dn, self.graph): valid = False break if not valid: continue # Node-label consistency if self.node_att and self._node_signature( qv, query_graph ) != self._node_signature(dv, self.graph): continue mapping[qv] = dv used.add(dv) if backtrack(i + 1): return True used.remove(dv) del mapping[qv] return False backtrack(0) if prune: return len(results) > 0 if dedup_autos: results = self._deduplicate_by_query_automorphisms(results, query_graph) return results
# ------------------------------------------------------------------ # Dunder methods # ------------------------------------------------------------------ def __len__(self) -> int: """ Return the number of vertices in the data graph. :returns: Number of data vertices. :rtype: int """ return self.graph.number_of_nodes() def __repr__(self) -> str: """ Return a concise string representation of the index. :returns: Summary string including graph size and configuration. :rtype: str """ return ( f"<SING | |V|={self.graph.number_of_nodes()} " f"|E|={self.graph.number_of_edges()} " f"max_path_length={self.max_path_length} " f"node_att={self.node_att} edge_att={self.edge_att}>" )