from __future__ import annotations
from collections import defaultdict, deque
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Mapping, Optional, Set, Tuple
import itertools
import json
import networkx as nx
from ..Petrinet import Multiset, PetriNet, Place, TransitionId
from ._adapter import tokenize_syncrn_incidence
[docs]
@dataclass
class RealizabilityConfig:
"""
Configuration for bounded realizability search.
:param max_states:
Maximum number of BFS states explored during exact realizability
search.
:type max_states: int
:param max_depth:
Maximum firing-sequence depth explored during exact realizability
search.
:type max_depth: int
Example
-------
.. code-block:: python
cfg = RealizabilityConfig(
max_states=200_000,
max_depth=5_000,
)
"""
max_states: int = 100_000
max_depth: int = 10_000
[docs]
@dataclass
class RealizabilitySummary:
"""
Small serializable summary of the active realizability instance.
:param n_species:
Number of species currently loaded in the instance.
:type n_species: int
:param n_reactions:
Number of reactions currently loaded in the instance.
:type n_reactions: int
:param active_flow:
Active reaction flow restricted to reactions with positive requested
firing count.
:type active_flow: Dict[str, int]
:param initial_marking:
Initial marking supplied by the user before Petri-net augmentation.
:type initial_marking: Dict[str, int]
:param goal_exact:
Exact target marking constraints currently imposed on the auxiliary
Petri-net places.
:type goal_exact: Dict[str, int]
:param goal_atleast:
Lower-bound target marking constraints currently imposed on the
auxiliary Petri-net places.
:type goal_atleast: Dict[str, int]
Example
-------
.. code-block:: python
summary = pr.summary()
print(summary.n_species)
print(summary.active_flow)
"""
n_species: int
n_reactions: int
active_flow: Dict[str, int] = field(default_factory=dict)
initial_marking: Dict[str, int] = field(default_factory=dict)
goal_exact: Dict[str, int] = field(default_factory=dict)
goal_atleast: Dict[str, int] = field(default_factory=dict)
[docs]
class PathwayRealizability:
"""
Exact flow-realizability utilities for SynCRN-like inputs.
A pathway flow is realizable if there exists an ordering of reaction
firings such that:
- each selected reaction is fired exactly the requested number of times,
- the execution starts from the supplied initial marking,
- every firing respects stoichiometric token consumption and production.
Unspecified reactions default to flow 0.
The class supports:
- loading directly from tokenized hypergraph-style inputs,
- loading from a SynCRN-like object,
- exact bounded BFS realizability testing,
- a sufficient acyclicity test via a König-style construction,
- scaled realizability,
- borrow realizability.
:param config:
Optional configuration for bounded realizability search. If ``None``,
a default :class:`RealizabilityConfig` is used.
:type config: Optional[RealizabilityConfig]
Example
-------
.. code-block:: python
pr = PathwayRealizability().load_syncrn_and_flow(
syn,
flow={"12": 1, "13": 1},
species="label",
reaction="id",
)
pr.build_petri_net_from_flow()
ok, cert = pr.is_realizable()
print(ok, cert)
"""
def __init__(self, config: Optional[RealizabilityConfig] = None) -> None:
"""
Initialize an empty pathway realizability instance.
:param config:
Optional configuration for bounded realizability search.
:type config: Optional[RealizabilityConfig]
:returns:
None.
:rtype: None
"""
self.vertices: Set[str] = set()
self.edges: Dict[str, Tuple[Multiset, Multiset]] = {}
self.flow: Dict[str, int] = {}
self.species_token_to_id: Dict[str, str] = {}
self.reaction_token_to_id: Dict[str, str] = {}
self.species_id_to_token: Dict[str, str] = {}
self.reaction_id_to_token: Dict[str, str] = {}
self._provided_initial_marking: Dict[str, int] = {}
self._petri: Optional[PetriNet] = None
self._initial_marking: Optional[Dict[Place, int]] = None
self._goal_exact: Dict[Place, int] = {}
self._goal_atleast: Dict[Place, int] = {}
self._certificate: Optional[List[TransitionId]] = None
self._config = config or RealizabilityConfig()
# ------------------------------------------------------------------
# loading
# ------------------------------------------------------------------
@staticmethod
def _clean_multiset(data: Optional[Mapping[str, int]]) -> Dict[str, int]:
"""
Normalize a multiset mapping by dropping zero entries.
Negative multiplicities are rejected.
:param data:
Input multiset or ``None``.
:type data: Optional[Mapping[str, int]]
:returns:
Cleaned multiset containing only strictly positive entries.
:rtype: Dict[str, int]
:raises ValueError:
If a negative multiplicity is encountered.
"""
if data is None:
return {}
out: Dict[str, int] = {}
for k, v in data.items():
n = int(v)
if n < 0:
raise ValueError(f"Negative multiplicity not allowed: {k!r} -> {n}")
if n > 0:
out[str(k)] = n
return out
[docs]
def load_hypergraph_and_flow(
self,
vertices: Iterable[str],
edges: Mapping[str, Tuple[Mapping[str, int], Mapping[str, int]]],
flow: Mapping[str, int],
*,
initial_marking: Optional[Mapping[str, int]] = None,
) -> "PathwayRealizability":
"""
Load tokenized hypergraph data and a requested pathway flow.
The ``edges`` mapping is interpreted as
``reaction_id -> (tail_multiset, head_multiset)``. Reactions not
present in ``flow`` default to 0. Negative flow values are excluded
here and validated again later during Petri-net construction.
:param vertices:
Iterable of species tokens.
:type vertices: Iterable[str]
:param edges:
Mapping from reaction token to ``(tail, head)`` multisets.
:type edges: Mapping[str, Tuple[Mapping[str, int], Mapping[str, int]]]
:param flow:
Requested reaction firing counts.
:type flow: Mapping[str, int]
:param initial_marking:
Optional initial species marking.
:type initial_marking: Optional[Mapping[str, int]]
:returns:
The current instance.
:rtype: PathwayRealizability
Example
-------
.. code-block:: python
pr = PathwayRealizability().load_hypergraph_and_flow(
vertices=["A", "B", "C"],
edges={
"r1": ({"A": 1}, {"B": 1}),
"r2": ({"B": 1}, {"C": 1}),
},
flow={"r1": 1, "r2": 1},
initial_marking={"A": 1},
)
"""
self.vertices = {str(v) for v in vertices}
self.edges = {
str(eid): (
{str(s): int(v) for s, v in tail.items() if int(v) > 0},
{str(s): int(v) for s, v in head.items() if int(v) > 0},
)
for eid, (tail, head) in edges.items()
}
self.flow = {
str(eid): int(flow.get(eid, 0))
for eid in self.edges
if int(flow.get(eid, 0)) >= 0
}
self.species_token_to_id = {v: v for v in self.vertices}
self.reaction_token_to_id = {eid: eid for eid in self.edges}
self.species_id_to_token = {v: v for v in self.vertices}
self.reaction_id_to_token = {eid: eid for eid in self.edges}
self._provided_initial_marking = self._clean_multiset(initial_marking)
self._petri = None
self._initial_marking = None
self._goal_exact = {}
self._goal_atleast = {}
self._certificate = None
return self
def _resolve_flow_tokens(
self,
incidence_reaction_order: List[str],
reaction_token_to_id: Mapping[str, str],
reaction_labels: Mapping[str, str],
reaction_source_node_ids: Mapping[str, object],
provided: Mapping[str, int],
) -> Dict[str, int]:
"""
Resolve user-supplied flow keys onto reaction tokens.
Keys may match reaction token, internal id, label, or source node id.
Missing reactions default to 0.
:param incidence_reaction_order:
Canonical reaction-id order from the incidence view.
:type incidence_reaction_order: List[str]
:param reaction_token_to_id:
Mapping from reaction token to internal reaction id.
:type reaction_token_to_id: Mapping[str, str]
:param reaction_labels:
Mapping from reaction id to human-readable label.
:type reaction_labels: Mapping[str, str]
:param reaction_source_node_ids:
Mapping from reaction id to source node id.
:type reaction_source_node_ids: Mapping[str, object]
:param provided:
User-provided flow mapping.
:type provided: Mapping[str, int]
:returns:
Flow map keyed by reaction token.
:rtype: Dict[str, int]
"""
flow_map: Dict[str, int] = {}
id_to_token = {rid: tok for tok, rid in reaction_token_to_id.items()}
for rid in incidence_reaction_order:
rtoken = id_to_token[rid]
label = reaction_labels.get(rid, rid)
source = str(reaction_source_node_ids.get(rid, rid))
if rtoken in provided:
flow_map[rtoken] = int(provided[rtoken])
elif rid in provided:
flow_map[rtoken] = int(provided[rid])
elif label in provided:
flow_map[rtoken] = int(provided[label])
elif source in provided:
flow_map[rtoken] = int(provided[source])
else:
flow_map[rtoken] = 0
return flow_map
def _resolve_species_marking_tokens(
self,
incidence_species_order: List[str],
species_token_to_id: Mapping[str, str],
species_labels: Mapping[str, str],
species_source_node_ids: Mapping[str, object],
provided: Mapping[str, int],
) -> Dict[str, int]:
"""
Resolve user-supplied initial marking keys onto species tokens.
Keys may match species token, internal id, label, or source node id.
Missing species default to 0 and are dropped from the returned mapping.
:param incidence_species_order:
Canonical species-id order from the incidence view.
:type incidence_species_order: List[str]
:param species_token_to_id:
Mapping from species token to internal species id.
:type species_token_to_id: Mapping[str, str]
:param species_labels:
Mapping from species id to human-readable label.
:type species_labels: Mapping[str, str]
:param species_source_node_ids:
Mapping from species id to source node id.
:type species_source_node_ids: Mapping[str, object]
:param provided:
User-provided initial marking.
:type provided: Mapping[str, int]
:returns:
Initial marking keyed by species token.
:rtype: Dict[str, int]
"""
marking_map: Dict[str, int] = {}
id_to_token = {sid: tok for tok, sid in species_token_to_id.items()}
for sid in incidence_species_order:
stok = id_to_token[sid]
label = species_labels.get(sid, sid)
source = str(species_source_node_ids.get(sid, sid))
if stok in provided:
marking_map[stok] = int(provided[stok])
elif sid in provided:
marking_map[stok] = int(provided[sid])
elif label in provided:
marking_map[stok] = int(provided[label])
elif source in provided:
marking_map[stok] = int(provided[source])
else:
marking_map[stok] = 0
return {k: v for k, v in marking_map.items() if int(v) > 0}
[docs]
def load_syncrn_and_flow(
self,
crn: object,
flow: Optional[Mapping[str, int]] = None,
*,
initial_marking: Optional[Mapping[str, int]] = None,
species: str = "label",
reaction: str = "id",
) -> "PathwayRealizability":
"""
Load a SynCRN-like object together with a requested pathway flow.
The SynCRN object is tokenized via :func:`tokenize_syncrn_incidence`.
User-supplied flow and marking keys are resolved flexibly against token,
internal id, label, and source node id.
:param crn:
SynCRN-like object.
:type crn: object
:param flow:
Requested reaction firing counts.
:type flow: Optional[Mapping[str, int]]
:param initial_marking:
Optional initial species marking.
:type initial_marking: Optional[Mapping[str, int]]
:param species:
Species tokenization mode.
:type species: str
:param reaction:
Reaction tokenization mode.
:type reaction: str
:returns:
The current instance.
:rtype: PathwayRealizability
Example
-------
.. code-block:: python
pr = PathwayRealizability().load_syncrn_and_flow(
syn,
flow={"12": 1, "13": 1},
initial_marking={"A": 2},
species="label",
reaction="id",
)
"""
vertices, edges, species_token_to_id, reaction_token_to_id, incidence = (
tokenize_syncrn_incidence(crn, species=species, reaction=reaction)
)
provided_flow = (
{} if flow is None else {str(k): int(v) for k, v in flow.items()}
)
flow_map = self._resolve_flow_tokens(
incidence.reaction_order,
reaction_token_to_id,
incidence.reaction_labels,
incidence.reaction_source_node_ids,
provided_flow,
)
provided_marking = (
{}
if initial_marking is None
else {str(k): int(v) for k, v in initial_marking.items()}
)
marking_map = self._resolve_species_marking_tokens(
incidence.species_order,
species_token_to_id,
incidence.species_labels,
incidence.species_source_node_ids,
provided_marking,
)
self.load_hypergraph_and_flow(
vertices,
edges,
flow_map,
initial_marking=marking_map,
)
self.species_token_to_id = species_token_to_id
self.reaction_token_to_id = reaction_token_to_id
self.species_id_to_token = {
sid: tok for tok, sid in species_token_to_id.items()
}
self.reaction_id_to_token = {
rid: tok for tok, rid in reaction_token_to_id.items()
}
return self
# ------------------------------------------------------------------
# Petri net construction
# ------------------------------------------------------------------
[docs]
def set_initial_marking(self, marking: Mapping[str, int]) -> "PathwayRealizability":
"""
Replace the user-provided initial marking.
This resets any previously built Petri-net instance, goals, and cached
firing certificate.
:param marking:
New initial marking.
:type marking: Mapping[str, int]
:returns:
The current instance.
:rtype: PathwayRealizability
"""
self._provided_initial_marking = self._clean_multiset(marking)
self._petri = None
self._initial_marking = None
self._goal_exact = {}
self._goal_atleast = {}
self._certificate = None
return self
[docs]
def build_petri_net_from_flow(self) -> "PathwayRealizability":
"""
Build an augmented Petri net encoding the requested pathway flow.
For every active reaction ``eid`` with requested flow ``f > 0``, two
auxiliary places are created:
- ``__ext__{eid}``: supply place initialized with ``f`` tokens,
- ``__target__{eid}``: target place expected to accumulate ``f`` tokens.
The reaction transition consumes one token from the supply place on each
firing and produces one token in the target place on each firing. This
enforces exact realization of the requested number of firings.
:returns:
The current instance.
:rtype: PathwayRealizability
:raises RuntimeError:
If no pathway data has been loaded.
:raises ValueError:
If a negative flow value is encountered.
Example
-------
.. code-block:: python
pr = PathwayRealizability().load_syncrn_and_flow(
syn,
flow={"12": 1, "13": 1},
)
pr.build_petri_net_from_flow()
print(pr.initial_marking)
print(pr.goal_exact)
"""
if not self.edges:
raise RuntimeError(
"No pathway data loaded; call load_syncrn_and_flow() or load_hypergraph_and_flow()."
)
net = PetriNet()
m0: Dict[Place, int] = defaultdict(int)
goal_exact: Dict[Place, int] = {}
for v in sorted(self.vertices):
net.add_place(v, label=v, source_node_id=v)
m0[v] = int(self._provided_initial_marking.get(v, 0))
for eid in sorted(self.edges):
fval = int(self.flow.get(eid, 0))
if fval < 0:
raise ValueError(f"Flow must be non-negative, got {eid!r} -> {fval}")
if fval == 0:
continue
tail, head = self.edges[eid]
pre = {v: int(w) for v, w in tail.items() if int(w) > 0}
post = {v: int(w) for v, w in head.items() if int(w) > 0}
ext = f"__ext__{eid}"
tgt = f"__target__{eid}"
net.add_place(ext, label=ext)
net.add_place(tgt, label=tgt)
pre_with_supply = dict(pre)
pre_with_supply[ext] = 1
post_with_target = dict(post)
post_with_target[tgt] = 1
net.add_transition(
eid,
pre_with_supply,
post_with_target,
label=eid,
source_reaction_id=self.reaction_token_to_id.get(eid, eid),
)
m0[ext] = fval
goal_exact[ext] = 0
goal_exact[tgt] = fval
m0[tgt] = 0
self._petri = net
self._initial_marking = dict(m0)
self._goal_exact = goal_exact
self._goal_atleast = {}
self._certificate = None
return self
# ------------------------------------------------------------------
# properties
# ------------------------------------------------------------------
@property
def petri(self) -> PetriNet:
"""
Return the active Petri net.
:returns:
Augmented Petri net for realizability checking.
:rtype: PetriNet
:raises RuntimeError:
If the Petri net has not yet been built.
"""
if self._petri is None:
raise RuntimeError("Petri net not built; call build_petri_net_from_flow().")
return self._petri
@property
def initial_marking(self) -> Dict[Place, int]:
"""
Return the active initial marking of the augmented Petri net.
:returns:
Initial marking.
:rtype: Dict[Place, int]
:raises RuntimeError:
If the initial marking has not yet been initialized.
"""
if self._initial_marking is None:
raise RuntimeError("Initial marking missing; build Petri net first.")
return self._initial_marking
@property
def goal_exact(self) -> Dict[Place, int]:
"""
Return exact target marking constraints.
:returns:
Exact goal marking constraints.
:rtype: Dict[Place, int]
"""
return dict(self._goal_exact)
@property
def goal_atleast(self) -> Dict[Place, int]:
"""
Return lower-bound target marking constraints.
:returns:
Lower-bound goal marking constraints.
:rtype: Dict[Place, int]
"""
return dict(self._goal_atleast)
@property
def certificate(self) -> Optional[List[TransitionId]]:
"""
Return the most recently found firing certificate.
:returns:
Realizing firing sequence, or ``None`` if none is cached.
:rtype: Optional[List[TransitionId]]
"""
return self._certificate
[docs]
def summary(self) -> RealizabilitySummary:
"""
Return a compact summary of the current realizability instance.
:returns:
Serializable instance summary.
:rtype: RealizabilitySummary
Example
-------
.. code-block:: python
info = pr.summary()
print(info.active_flow)
"""
return RealizabilitySummary(
n_species=len(self.vertices),
n_reactions=len(self.edges),
active_flow={k: v for k, v in self.flow.items() if int(v) > 0},
initial_marking=dict(self._provided_initial_marking),
goal_exact=dict(self._goal_exact),
goal_atleast=dict(self._goal_atleast),
)
# ------------------------------------------------------------------
# goal helpers
# ------------------------------------------------------------------
def _goal_reached(self, marking: Mapping[str, int]) -> bool:
"""
Test whether a marking satisfies the active goal constraints.
Exact constraints in ``_goal_exact`` must match exactly, while lower
bounds in ``_goal_atleast`` must be met or exceeded.
:param marking:
Candidate marking.
:type marking: Mapping[str, int]
:returns:
``True`` if the marking satisfies all active goals.
:rtype: bool
"""
for p, val in self._goal_exact.items():
if int(marking.get(p, 0)) != int(val):
return False
for p, val in self._goal_atleast.items():
if int(marking.get(p, 0)) < int(val):
return False
return True
# ------------------------------------------------------------------
# bounded BFS
# ------------------------------------------------------------------
[docs]
def is_realizable(
self,
max_states: Optional[int] = None,
max_depth: Optional[int] = None,
) -> Tuple[bool, Optional[List[TransitionId]]]:
"""
Test exact realizability by bounded breadth-first search.
The search explores reachable Petri-net markings while recording firing
sequences. A certificate is returned when a goal-satisfying marking is
found.
:param max_states:
Optional override for the maximum number of explored states.
:type max_states: Optional[int]
:param max_depth:
Optional override for the maximum explored firing depth.
:type max_depth: Optional[int]
:returns:
Pair ``(is_realizable, certificate)`` where ``certificate`` is a
realizing transition sequence if one is found.
:rtype: Tuple[bool, Optional[List[TransitionId]]]
Example
-------
.. code-block:: python
ok, cert = pr.is_realizable(max_states=50000, max_depth=2000)
print(ok)
print(cert)
"""
net = self.petri
m0 = dict(self.initial_marking)
max_states = max_states if max_states is not None else self._config.max_states
max_depth = max_depth if max_depth is not None else self._config.max_depth
if self._goal_reached(m0):
self._certificate = []
return True, []
start = net.marking_to_tuple(m0)
q: deque[Tuple[Tuple[int, ...], List[TransitionId]]] = deque([(start, [])])
visited = {start}
states = 0
while q:
mtuple, seq = q.popleft()
states += 1
if states > max_states:
break
if len(seq) > max_depth:
continue
marking = net.tuple_to_marking(mtuple)
for tid in net.transition_order:
if not net.enabled(marking, tid):
continue
new_mark = net.fire(marking, tid)
if self._goal_reached(new_mark):
seq2 = seq + [tid]
self._certificate = seq2
return True, seq2
new_tuple = net.marking_to_tuple(new_mark)
if new_tuple not in visited:
visited.add(new_tuple)
q.append((new_tuple, seq + [tid]))
self._certificate = None
return False, None
# ------------------------------------------------------------------
# König sufficient test
# ------------------------------------------------------------------
[docs]
def is_realizable_via_konig(self) -> bool:
"""
Apply a König-style sufficient acyclicity test.
A directed bipartite dependency graph is built using:
- species vertices ``("v", species)``
- active-reaction vertices ``("e", reaction)``
with edges:
- species -> reaction for reactant incidence,
- reaction -> species for product incidence.
If this dependency graph is acyclic, the active flow is certified
realizable by this sufficient test.
:returns:
``True`` if the sufficient acyclicity condition holds, else
``False``.
:rtype: bool
Example
-------
.. code-block:: python
if pr.is_realizable_via_konig():
print("Guaranteed realizable by sufficient test")
"""
active_edges = {
eid: self.edges[eid] for eid, fval in self.flow.items() if int(fval) > 0
}
vertices: Set[str] = set()
for tail, head in active_edges.values():
vertices.update(tail.keys())
vertices.update(head.keys())
g = nx.DiGraph()
for v in vertices:
g.add_node(("v", v))
for eid in active_edges:
g.add_node(("e", eid))
for eid, (tail, head) in active_edges.items():
for v in tail:
g.add_edge(("v", v), ("e", eid))
for v in head:
g.add_edge(("e", eid), ("v", v))
return nx.is_directed_acyclic_graph(g)
# ------------------------------------------------------------------
# scaled / borrow realizability
# ------------------------------------------------------------------
[docs]
def is_scaled_realizable(self, k_max: int = 4) -> Tuple[bool, Optional[int]]:
"""
Test whether a scaled version of the requested flow is realizable.
For each integer ``k`` from 1 to ``k_max``, the requested flow is scaled
to ``k * flow`` and tested for exact realizability.
:param k_max:
Maximum scale factor to try.
:type k_max: int
:returns:
Pair ``(success, k)`` where ``k`` is the first successful scale
factor, or ``None`` if no tested scale succeeds.
:rtype: Tuple[bool, Optional[int]]
Example
-------
.. code-block:: python
ok, k = pr.is_scaled_realizable(k_max=5)
print(ok, k)
"""
saved_flow = dict(self.flow)
for k in range(1, k_max + 1):
self.flow = {eid: k * int(v) for eid, v in saved_flow.items()}
self.build_petri_net_from_flow()
ok, _ = self.is_realizable()
if ok:
self.flow = saved_flow
self.build_petri_net_from_flow()
return True, k
self.flow = saved_flow
self.build_petri_net_from_flow()
return False, None
[docs]
def is_borrow_realizable(
self,
max_borrow_each: int = 2,
) -> Tuple[bool, Optional[Mapping[str, int]]]:
"""
Test realizability when temporary borrowed initial tokens are allowed.
Every combination of per-species borrowed tokens from 0 up to
``max_borrow_each`` is tried. For a candidate borrow multiset, the
borrowed tokens are added to the initial marking and then imposed again
as lower-bound final goals in ``goal_atleast`` so that the borrowed
material must be returned.
:param max_borrow_each:
Maximum borrowed amount tested independently for each species.
:type max_borrow_each: int
:returns:
Pair ``(success, borrow)`` where ``borrow`` is the first successful
borrow multiset, or ``None`` if no tested borrow succeeds.
:rtype: Tuple[bool, Optional[Mapping[str, int]]]
Example
-------
.. code-block:: python
ok, borrow = pr.is_borrow_realizable(max_borrow_each=1)
print(ok, borrow)
"""
saved_initial = dict(self._provided_initial_marking)
species = sorted(self.vertices)
for comb in itertools.product(range(max_borrow_each + 1), repeat=len(species)):
borrow = {s: comb[i] for i, s in enumerate(species) if comb[i] > 0}
self._provided_initial_marking = dict(saved_initial)
for s, val in borrow.items():
self._provided_initial_marking[s] = (
self._provided_initial_marking.get(s, 0) + val
)
self.build_petri_net_from_flow()
self._goal_atleast = dict(borrow)
ok, _ = self.is_realizable()
if ok:
self._provided_initial_marking = saved_initial
self.build_petri_net_from_flow()
return True, borrow
self._provided_initial_marking = saved_initial
self.build_petri_net_from_flow()
return False, None
# ------------------------------------------------------------------
# export
# ------------------------------------------------------------------
[docs]
def export_pnml(self, fn: str) -> "PathwayRealizability":
"""
Export the current augmented Petri net to a JSON-based PNML-like file.
The exported structure contains places, transitions, initial marking,
and active goal constraints. Despite the method name, the file written
here is JSON rather than XML PNML.
:param fn:
Output filename.
:type fn: str
:returns:
The current instance.
:rtype: PathwayRealizability
Example
-------
.. code-block:: python
pr.export_pnml("pathway_realizability.json")
"""
self.build_petri_net_from_flow()
net = self.petri
data = {
"places": net.place_order,
"transitions": {
tid: {"pre": t.pre, "post": t.post}
for tid, t in net.transitions.items()
},
"initial_marking": self.initial_marking,
"goal_exact": self._goal_exact,
"goal_atleast": self._goal_atleast,
}
with open(fn, "w", encoding="utf-8") as fh:
json.dump(data, fh, indent=2)
return self
def __repr__(self) -> str: # pragma: no cover - simple repr
"""
Return a compact representation.
:returns:
Readable summary string.
:rtype: str
"""
return (
f"<PathwayRealizability species={len(self.vertices)} "
f"reactions={len(self.edges)}>"
)
[docs]
def run_realizability_from_syncrn(
crn: object,
flow: Optional[Mapping[str, int]] = None,
*,
initial_marking: Optional[Mapping[str, int]] = None,
species: str = "label",
reaction: str = "id",
verbose: bool = True,
) -> Tuple[PathwayRealizability, Dict[str, object]]:
"""
Run König-style and exact BFS realizability tests on a SynCRN-like object.
This convenience wrapper:
1. loads the SynCRN object and requested flow,
2. builds the augmented Petri net,
3. applies the sufficient König-style acyclicity test,
4. runs exact bounded BFS realizability search,
5. optionally prints a human-readable summary.
:param crn:
SynCRN-like object.
:type crn: object
:param flow:
Optional requested reaction firing counts.
:type flow: Optional[Mapping[str, int]]
:param initial_marking:
Optional initial marking.
:type initial_marking: Optional[Mapping[str, int]]
:param species:
Species tokenization mode.
:type species: str
:param reaction:
Reaction tokenization mode.
:type reaction: str
:param verbose:
Whether to print a textual summary.
:type verbose: bool
:returns:
Pair ``(pathway_realizability_instance, info_dict)``.
:rtype: Tuple[PathwayRealizability, Dict[str, object]]
Example
-------
.. code-block:: python
pr, info = run_realizability_from_syncrn(
syn,
flow={"12": 1, "13": 1},
initial_marking={"A": 2},
verbose=True,
)
print(info["konig"])
print(info["bfs"])
print(info["certificate"])
"""
pr = PathwayRealizability().load_syncrn_and_flow(
crn,
flow=flow,
initial_marking=initial_marking,
species=species,
reaction=reaction,
)
pr.build_petri_net_from_flow()
konig_ok = pr.is_realizable_via_konig()
bfs_ok, cert = pr.is_realizable()
if verbose:
print("Active flow:")
for eid, val in sorted(pr.flow.items()):
if int(val) > 0:
print(f" {eid}: {val}")
print("Initial marking:", dict(sorted(pr._provided_initial_marking.items())))
print("König sufficient test:", konig_ok)
print("BFS realizable:", bfs_ok)
print("Firing certificate:", cert)
info: Dict[str, object] = {
"konig": konig_ok,
"bfs": bfs_ok,
"certificate": cert,
"summary": pr.summary(),
}
return pr, info