# Source code for synkit.Graph.Matcher.multi_turbo_iso

import networkx as nx
from collections import defaultdict
from typing import Any, Dict, List, Set, Tuple, Union
from synkit.Graph.Matcher.turbo_iso import TurboISO

######################################################################
# MultiTurboISO – one global index for **N host graphs × M patterns**
######################################################################


class MultiTurboISO:
    """Accelerated sub-graph search across a batch of host graphs.

    Builds a single global signature bucket over all hosts and keeps one
    TurboISO matcher per host. For each query graph, hosts are first pruned
    by a node-signature + degree filter, and TurboISO's search is then run
    only on the surviving hosts.

    :param hosts: List of host graphs to index. The list is stored by
        reference, not copied.
    :type hosts: List[nx.Graph]
    :param node_label: Node attribute(s) used for signature matching.
    :type node_label: str or list[str]
    :param edge_label: Edge attribute(s) to match; pass None to ignore edges.
    :type edge_label: str or list[str] or None
    :param distance_threshold: Forwarded unchanged to every per-host TurboISO
        (described upstream as: skip distance filtering if the candidate pool
        is smaller than this value).
    :type distance_threshold: int
    """

    # --------------------------------------------------------------- init
    def __init__(
        self,
        hosts: List[nx.Graph],
        node_label: Union[str, List[str]] = "label",
        edge_label: Union[str, List[str], None] = None,
        distance_threshold: int = 5000,
    ) -> None:
        # normalise selectors: always keep attribute names as lists ---------
        if isinstance(node_label, str):
            self._node_attr: List[str] = [node_label]
        else:
            self._node_attr = list(node_label)

        if edge_label is None:
            self._edge_attr: List[str] = []
        elif isinstance(edge_label, str):
            self._edge_attr = [edge_label]
        else:
            self._edge_attr = list(edge_label)

        self._dthr = distance_threshold

        # store hosts & create one TurboISO per host -------------------------
        self._hosts: List[nx.Graph] = hosts
        self._matchers: List[TurboISO] = [
            TurboISO(
                h,
                node_label=self._node_attr,
                edge_label=self._edge_attr,
                distance_threshold=distance_threshold,
            )
            for h in hosts
        ]

        # global bucket: signature -> {(host_idx, node)} ---------------------
        # Signatures are taken from each matcher's ``_sig`` mapping.
        # NOTE(review): ``_init_candidates`` looks these up with
        # ``self._node_sig``; this presumes TurboISO builds its signatures in
        # the same format -- confirm against TurboISO.
        self._bucket: Dict[str, Set[Tuple[int, Any]]] = defaultdict(set)
        for idx, matcher in enumerate(self._matchers):
            for node, sig in matcher._sig.items():
                self._bucket[sig].add((idx, node))

        # degree maps for quick filtering ------------------------------------
        self._deg: List[Dict[Any, int]] = [dict(h.degree()) for h in hosts]

    # --------------------------------------------------------- repr / help
    def __repr__(self) -> str:
        return (
            f"<MultiTurboISO hosts={len(self._hosts)} "
            f"node_label={self._node_attr} edge_label={self._edge_attr} "
            f"dthr={self._dthr}>"
        )

    def __help__(self) -> str:  # for interactive use
        """Return the class docstring (empty string if docstrings are stripped)."""
        return self.__doc__ or ""

    # ----------------------------------------------------------- properties
    @property
    def hosts(self) -> List[nx.Graph]:
        """Return the list of host graphs."""
        return self._hosts

    @property
    def num_hosts(self) -> int:
        """Number of host graphs indexed."""
        return len(self._hosts)

    @property
    def node_label(self) -> List[str]:
        """Node-attribute selector(s), returned as a fresh list."""
        return list(self._node_attr)

    @property
    def edge_label(self) -> List[str]:
        """Edge-attribute selector(s), returned as a fresh list.

        An empty list means 'ignore edge attributes'.
        """
        return list(self._edge_attr)

    # -------------------------------------------------------------- helpers
    def _node_sig(self, v: Any, G: nx.Graph) -> str:
        """Build the signature string of node *v* in graph *G*.

        The values of the selected node attributes are stringified and joined
        with ``"|"``; a missing attribute contributes ``"#"``.
        """
        attrs = G.nodes[v]
        return "|".join(str(attrs.get(a, "#")) for a in self._node_attr)

    def _init_candidates(self, Q: nx.Graph) -> Dict[int, Dict[Any, Set[Any]]]:
        """Return per-host candidate sets after signature + degree filter.

        :param Q: Query / pattern graph.
        :returns: ``{host_idx: {query_node: {host_nodes}}}``. Only hosts in
            which *every* query node keeps at least one candidate are present.
            A query without nodes yields an empty mapping.
        """
        cand: Dict[int, Dict[Any, Set[Any]]] = defaultdict(dict)

        # (1) signature filter -----------------------------------------------
        for q in Q.nodes:
            sig = self._node_sig(q, Q)
            # ``.get`` avoids inserting empty buckets into the defaultdict.
            for hidx, v in self._bucket.get(sig, ()):
                cand[hidx].setdefault(q, set()).add(v)

        # (2) degree filter + host pruning -----------------------------------
        qdeg = {q: Q.degree(q) for q in Q.nodes}
        n_query = len(Q)
        for hidx in list(cand):  # snapshot of keys: entries are deleted below
            cmap = cand[hidx]
            if len(cmap) < n_query:  # some query node has no signature match
                del cand[hidx]
                continue
            dmap = self._deg[hidx]
            for q, needed in qdeg.items():
                # a host node must have at least the query node's degree
                survivors = {v for v in cmap[q] if dmap[v] >= needed}
                if not survivors:  # prune host if any q has no candidates
                    del cand[hidx]
                    break
                cmap[q] = survivors
        return cand

    # -------------------------------------------------------------- search
    def search_one(
        self,
        Q: nx.Graph,
        *,
        prune: bool = False,
    ) -> Dict[int, Union[bool, List[Dict[Any, Any]]]]:
        """Match a single pattern graph *Q* against every host.

        :param Q: Query / pattern graph.
        :type Q: nx.Graph
        :param prune: Forwarded to ``TurboISO.search``. If True, the result
            per host is just a boolean ('found?'); otherwise it is the full
            list of node-mapping dicts.
        :type prune: bool
        :returns: ``{host_idx: result}``. Hosts eliminated by the signature /
            degree pre-filter do not appear in the dictionary.
        :rtype: dict
        """
        host_cands = self._init_candidates(Q)
        out: Dict[int, Union[bool, List[Dict[Any, Any]]]] = {}
        for hidx, cands in host_cands.items():
            matcher: TurboISO = self._matchers[hidx]
            original = matcher._init_candidates
            # Feed the pre-filtered candidates to the matcher by temporarily
            # replacing its candidate initialiser. The candidates are bound as
            # a default argument so the stub does not depend on the loop
            # variable.
            matcher._init_candidates = (  # type: ignore
                lambda _Q, _cands=cands: _cands
            )
            try:
                out[hidx] = matcher.search(Q, prune=prune)
            finally:
                # Always undo the patch: the matcher is shared across calls
                # and must not keep stale candidates if the search raises.
                matcher._init_candidates = original  # type: ignore
        return out

    def search_many(
        self,
        patterns: List[nx.Graph],
        *,
        prune: bool = False,
    ) -> List[Dict[int, Union[bool, List[Dict[Any, Any]]]]]:
        """Match a list of pattern graphs.

        :param patterns: Pattern graphs to search for.
        :param prune: Forwarded to :meth:`search_one` for every pattern.
        :returns: One :meth:`search_one` result dictionary per pattern, in the
            same order as the input list.
        """
        return [self.search_one(p, prune=prune) for p in patterns]