diff --git a/CHANGELOG.md b/CHANGELOG.md index c14f980b..7d521ba5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -166,6 +166,7 @@ arbitrarily applied mapper pass. - `CNOT2CZDecomposer` decomposer pass - `RoutingChecker` routing pass - Restore SGMQ notation for barrier groups in cQASMv1 Exporter +- `CircuitAnalyzer` analyzer pass for computing structural circuit metrics (size, interaction graph, gate dependency graph, density) ### Changed diff --git a/opensquirrel/circuit.py b/opensquirrel/circuit.py index a3290825..e6ba9412 100644 --- a/opensquirrel/circuit.py +++ b/opensquirrel/circuit.py @@ -13,6 +13,7 @@ if TYPE_CHECKING: from opensquirrel.ir.ir import IR from opensquirrel.ir.unitary import Gate + from opensquirrel.passes.analyzer.general_analyzer import Analyzer from opensquirrel.passes.decomposer.general_decomposer import Decomposer from opensquirrel.passes.exporter.general_exporter import Exporter from opensquirrel.passes.mapper.general_mapper import Mapper @@ -148,6 +149,18 @@ def asm_filter(self, backend_name: str) -> None: or (isinstance(statement, AsmDeclaration) and backend_name in str(statement.backend_name)) ] + def analyze(self, analyzer: Analyzer) -> dict[str, Any]: + """Analyzes the circuit using the specified analyzer. + + Args: + analyzer (Analyzer): The analyzer to apply. + + Returns: + dict[str, Any]: The metrics computed by the analyzer. + + """ + return analyzer.analyze(self) + def decompose(self, decomposer: Decomposer) -> None: """Decomposes the circuit using to the specified decomposer. diff --git a/opensquirrel/passes/analyzer/__init__.py b/opensquirrel/passes/analyzer/__init__.py new file mode 100644 index 00000000..9ca028de --- /dev/null +++ b/opensquirrel/passes/analyzer/__init__.py @@ -0,0 +1,5 @@ +from opensquirrel.passes.analyzer.circuit_analyzer import CircuitAnalyzer + +__all__ = [ + "CircuitAnalyzer", +] diff --git a/opensquirrel/passes/analyzer/circuit_analyzer.py b/opensquirrel/passes/analyzer/circuit_analyzer.py new file mode 100644 index 00000000..7cd1c721 --- /dev/null +++ b/opensquirrel/passes/analyzer/circuit_analyzer.py @@ -0,0 +1,287 @@ +from __future__ import annotations + +import math +from typing import TYPE_CHECKING, Any + +import networkx as nx + +from opensquirrel.ir.two_qubit_gate import TwoQubitGate +from opensquirrel.ir.unitary import Gate +from opensquirrel.passes.analyzer.general_analyzer import Analyzer + +if TYPE_CHECKING: + from opensquirrel.circuit import Circuit + + +class CircuitAnalyzer(Analyzer): + """Computes structural metrics describing a quantum circuit. + + The metrics are grouped into four categories: + + * Size: number of qubits, gates, two-qubit gates, two-qubit gate percentage, depth. + * Interaction graph (IG): metrics derived from the qubit interaction graph, + where nodes are qubits and edges are two-qubit gates. + * Gate dependency graph (GDG): metrics derived from the directed acyclic graph + of gate-to-gate dependencies on shared qubits. + * Density: parallelisation-related metrics (density score, idling score). + + The metric set follows the structural circuit profiling approach proposed + in Bandic et al., "Profiling quantum circuits for their efficient execution + on single- and multi-core architectures" (Quantum Sci. Technol. 10, 015060, 2025). + + """ + + def analyze(self, circuit: Circuit) -> dict[str, Any]: + """Run the analyzer on the given circuit and return all metrics. + + Args: + circuit (Circuit): The circuit to analyze. + + Returns: + dict[str, Any]: A flat dictionary mapping metric name to its value. + + """ + metrics: dict[str, Any] = {} + metrics.update(self._size_metrics(circuit)) + metrics.update(self._interaction_graph_metrics(circuit)) + metrics.update(self._gate_dependency_graph_metrics(circuit)) + metrics.update(self._density_metrics(circuit)) + return metrics + + # ------------------------------------------------------------------ # + # Size metrics # + # ------------------------------------------------------------------ # + @staticmethod + def _size_metrics(circuit: Circuit) -> dict[str, Any]: + n_qubits = circuit.qubit_register_size + gate_statements = [s for s in circuit.ir.statements if isinstance(s, Gate)] + n_gates = len(gate_statements) + n_two_qubit_gates = sum(1 for s in gate_statements if isinstance(s, TwoQubitGate)) + two_qubit_pct = round(n_two_qubit_gates / n_gates, 4) if n_gates > 0 else 0.0 + depth = CircuitAnalyzer._compute_depth(circuit, gate_statements) + + return { + "n_qubits": n_qubits, + "n_gates": n_gates, + "n_two_qubit_gates": n_two_qubit_gates, + "two_qubit_pct": two_qubit_pct, + "depth": depth, + } + + @staticmethod + def _compute_depth(circuit: Circuit, gate_statements: list[Gate]) -> int: + """ASAP-style circuit depth (longest dependency chain).""" + n_qubits = circuit.qubit_register_size + if n_qubits == 0 or not gate_statements: + return 0 + + layer = [0] * n_qubits + for gate in gate_statements: + qubit_indices = list(gate.qubit_indices) + if not qubit_indices: + continue + new_layer = max(layer[i] for i in qubit_indices) + 1 + for i in qubit_indices: + layer[i] = new_layer + return max(layer) + + # ------------------------------------------------------------------ # + # Interaction graph metrics # + # ------------------------------------------------------------------ # + @staticmethod + def _interaction_graph_metrics(circuit: Circuit) -> dict[str, Any]: + empty: dict[str, Any] = { + "ig_avg_shortest_path": 0.0, + "ig_std_adjacency": 0.0, + "ig_diameter": 0, + "ig_central_dominance": 0.0, + "ig_avg_degree": 0.0, + "ig_n_maximal_cliques": 0, + "ig_clustering_coefficient": 0.0, + } + + weighted_edges = circuit.interaction_graph + if not weighted_edges: + return empty + + graph = nx.Graph() + graph.add_nodes_from(range(circuit.qubit_register_size)) + for (i, j), weight in weighted_edges.items(): + graph.add_edge(i, j, weight=weight) + + # avg_shortest_path: computed on the largest connected component. + try: + largest_cc_nodes = max(nx.connected_components(graph), key=len) + largest_cc = graph.subgraph(largest_cc_nodes) + avg_shortest_path = ( + round(nx.average_shortest_path_length(largest_cc), 4) if largest_cc.number_of_nodes() > 1 else 0.0 + ) + except (nx.NetworkXError, ValueError): + avg_shortest_path = 0.0 + + # std of adjacency matrix entries + adjacency_matrix = nx.to_numpy_array(graph) + std_adjacency = round(float(adjacency_matrix.std()), 4) + + # diameter: undefined for disconnected graphs so we use 0 in that case. + try: + diameter = nx.diameter(graph) if nx.is_connected(graph) else 0 + except nx.NetworkXError: + diameter = 0 + + # central point of dominance: max betweenness across nodes. + betweenness = nx.betweenness_centrality(graph) + central_dominance = round(max(betweenness.values()), 4) if betweenness else 0.0 + + # average degree + degrees = [d for _, d in graph.degree()] + avg_degree = round(sum(degrees) / len(degrees), 4) if degrees else 0.0 + + # number of maximal cliques + try: + n_maximal_cliques = sum(1 for _ in nx.find_cliques(graph)) + except nx.NetworkXError: + n_maximal_cliques = 0 + + # average clustering coefficient + clustering_coefficient = round(nx.average_clustering(graph), 4) + + return { + "ig_avg_shortest_path": avg_shortest_path, + "ig_std_adjacency": std_adjacency, + "ig_diameter": diameter, + "ig_central_dominance": central_dominance, + "ig_avg_degree": avg_degree, + "ig_n_maximal_cliques": n_maximal_cliques, + "ig_clustering_coefficient": clustering_coefficient, + } + + # ------------------------------------------------------------------ # + # Gate dependency graph metrics # + # ------------------------------------------------------------------ # + @staticmethod + def _gate_dependency_graph_metrics(circuit: Circuit) -> dict[str, Any]: + gate_statements = [s for s in circuit.ir.statements if isinstance(s, Gate)] + n_gates = len(gate_statements) + if n_gates == 0: + return { + "gdg_critical_path_length": 0, + "gdg_path_length_mean": 0.0, + "gdg_path_length_std": 0.0, + "gdg_pct_gates_in_critical_path": 0.0, + } + + gdg = CircuitAnalyzer._build_gate_dependency_graph(gate_statements) + critical_path_length = CircuitAnalyzer._safe_critical_path_length(gdg) + longest_to, longest_from = CircuitAnalyzer._compute_longest_paths(gdg) + mean_length, std_length = CircuitAnalyzer._path_length_stats(longest_to) + pct_gates_in_cp = CircuitAnalyzer._critical_path_membership_fraction( + gdg, longest_to, longest_from, critical_path_length, n_gates + ) + + return { + "gdg_critical_path_length": critical_path_length, + "gdg_path_length_mean": round(mean_length, 4), + "gdg_path_length_std": round(std_length, 4), + "gdg_pct_gates_in_critical_path": pct_gates_in_cp, + } + + @staticmethod + def _build_gate_dependency_graph(gate_statements: list[Gate]) -> nx.DiGraph: + """Build a DAG where edge (i, j) means gate i must run before gate j.""" + gdg: nx.DiGraph = nx.DiGraph() + last_gate_on_qubit: dict[int, int] = {} + for index, gate in enumerate(gate_statements): + gdg.add_node(index) + for qubit_index in gate.qubit_indices: + if qubit_index in last_gate_on_qubit: + gdg.add_edge(last_gate_on_qubit[qubit_index], index) + last_gate_on_qubit[qubit_index] = index + return gdg + + @staticmethod + def _safe_critical_path_length(gdg: nx.DiGraph) -> int: + try: + return nx.dag_longest_path_length(gdg) + except nx.NetworkXError: + return 0 + + @staticmethod + def _compute_longest_paths(gdg: nx.DiGraph) -> tuple[dict[int, int], dict[int, int]]: + """Return (longest_to, longest_from) for every node in the DAG. + + longest_to[n] = length of the longest path ending at n + longest_from[n] = length of the longest path starting at n + """ + topo_order = list(nx.topological_sort(gdg)) + longest_to: dict[int, int] = dict.fromkeys(gdg.nodes, 0) + for node in topo_order: + for successor in gdg.successors(node): + if longest_to[node] + 1 > longest_to[successor]: + longest_to[successor] = longest_to[node] + 1 + + longest_from: dict[int, int] = dict.fromkeys(gdg.nodes, 0) + for node in reversed(topo_order): + for successor in gdg.successors(node): + if longest_from[successor] + 1 > longest_from[node]: + longest_from[node] = longest_from[successor] + 1 + + return longest_to, longest_from + + @staticmethod + def _path_length_stats(longest_to: dict[int, int]) -> tuple[float, float]: + path_lengths = list(longest_to.values()) + if not path_lengths: + return 0.0, 0.0 + mean_length = sum(path_lengths) / len(path_lengths) + variance = sum((x - mean_length) ** 2 for x in path_lengths) / len(path_lengths) + return mean_length, math.sqrt(variance) + + @staticmethod + def _critical_path_membership_fraction( + gdg: nx.DiGraph, + longest_to: dict[int, int], + longest_from: dict[int, int], + critical_path_length: int, + n_gates: int, + ) -> float: + """Fraction of gates that lie on some critical path. + + A node lies on a critical path iff the longest path through it + (longest_to[n] + longest_from[n]) equals the overall critical path length. + """ + n_in_cp = sum(1 for node in gdg.nodes if longest_to[node] + longest_from[node] == critical_path_length) + return round(n_in_cp / n_gates, 4) + + # ------------------------------------------------------------------ # + # Density metrics # + # ------------------------------------------------------------------ # + @staticmethod + def _density_metrics(circuit: Circuit) -> dict[str, Any]: + n_qubits = circuit.qubit_register_size + gate_statements = [s for s in circuit.ir.statements if isinstance(s, Gate)] + n_gates = len(gate_statements) + n_two_qubit_gates = sum(1 for s in gate_statements if isinstance(s, TwoQubitGate)) + n_one_qubit_gates = n_gates - n_two_qubit_gates + depth = CircuitAnalyzer._compute_depth(circuit, gate_statements) + + # Density score: parallelisation level of the circuit (0..1). + # Formula from Bandic et al. 2025, eq. (1). + if n_qubits > 1 and depth > 1: + density_score = (2 * n_two_qubit_gates + n_one_qubit_gates) / ((depth - 1) * (n_qubits - 1)) + density_score = round(min(density_score, 1.0), 4) + else: + density_score = 0.0 + + # Idling score: average qubit idling fraction (0..1). + if n_qubits > 0 and depth > 0: + qubit_active_layers: dict[int, int] = dict.fromkeys(range(n_qubits), 0) + for gate in gate_statements: + for qubit_index in gate.qubit_indices: + qubit_active_layers[qubit_index] = qubit_active_layers.get(qubit_index, 0) + 1 + total_idle = sum(depth - active for active in qubit_active_layers.values()) + idling_score = round(max(0.0, min(total_idle / (n_qubits * depth), 1.0)), 4) + else: + idling_score = 0.0 + + return {"density_score": density_score, "idling_score": idling_score} diff --git a/opensquirrel/passes/analyzer/general_analyzer.py b/opensquirrel/passes/analyzer/general_analyzer.py new file mode 100644 index 00000000..6b12b624 --- /dev/null +++ b/opensquirrel/passes/analyzer/general_analyzer.py @@ -0,0 +1,14 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from opensquirrel.circuit import Circuit + + +class Analyzer(ABC): + def __init__(self, **kwargs: Any) -> None: ... + + @abstractmethod + def analyze(self, circuit: Circuit) -> dict[str, Any]: ... diff --git a/tests/passes/analyzer/__init__.py b/tests/passes/analyzer/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/passes/analyzer/test_circuit_analyzer.py b/tests/passes/analyzer/test_circuit_analyzer.py new file mode 100644 index 00000000..84240573 --- /dev/null +++ b/tests/passes/analyzer/test_circuit_analyzer.py @@ -0,0 +1,214 @@ +# Tests for CircuitAnalyzer pass + +import pytest + +from opensquirrel import CircuitBuilder +from opensquirrel.circuit import Circuit +from opensquirrel.passes.analyzer import CircuitAnalyzer + + +@pytest.fixture +def analyzer() -> CircuitAnalyzer: + return CircuitAnalyzer() + + +# --------------------------------------------------------------------- # +# Sample circuits # +# --------------------------------------------------------------------- # +@pytest.fixture +def empty_circuit() -> Circuit: + """A circuit with qubits but no gates.""" + builder = CircuitBuilder(3) + return builder.to_circuit() + + +@pytest.fixture +def single_qubit_circuit() -> Circuit: + """A 1-qubit circuit with only single-qubit gates.""" + builder = CircuitBuilder(1) + builder.H(0) + builder.X(0) + builder.H(0) + return builder.to_circuit() + + +@pytest.fixture +def ghz_circuit() -> Circuit: + """A linear 3-qubit GHZ-like circuit. 3 gates, depth 3, IG is a path graph.""" + builder = CircuitBuilder(3) + builder.H(0) + builder.CNOT(0, 1) + builder.CNOT(1, 2) + return builder.to_circuit() + + +@pytest.fixture +def parallel_circuit() -> Circuit: + """4 qubits with two independent CNOTs.""" + builder = CircuitBuilder(4) + builder.CNOT(0, 1) + builder.CNOT(2, 3) + return builder.to_circuit() + + +@pytest.fixture +def sequential_circuit() -> Circuit: + """4 qubits, fully sequential CNOTs across them — depth 3, fully on critical path.""" + builder = CircuitBuilder(4) + builder.CNOT(0, 1) + builder.CNOT(1, 2) + builder.CNOT(2, 3) + return builder.to_circuit() + + +# --------------------------------------------------------------------- # +# Smoke / shape # +# --------------------------------------------------------------------- # +def test_returns_dict_with_expected_keys(analyzer: CircuitAnalyzer, ghz_circuit: Circuit) -> None: + result = analyzer.analyze(ghz_circuit) + + expected_keys = { + # Size + "n_qubits", + "n_gates", + "n_two_qubit_gates", + "two_qubit_pct", + "depth", + # Interaction graph + "ig_avg_shortest_path", + "ig_std_adjacency", + "ig_diameter", + "ig_central_dominance", + "ig_avg_degree", + "ig_n_maximal_cliques", + "ig_clustering_coefficient", + # Gate dependency graph + "gdg_critical_path_length", + "gdg_path_length_mean", + "gdg_path_length_std", + "gdg_pct_gates_in_critical_path", + # Density + "density_score", + "idling_score", + } + assert set(result.keys()) == expected_keys + + +def test_circuit_analyze_method_returns_dict(ghz_circuit: Circuit) -> None: + """The Circuit.analyze() method should propagate the analyzer's dict output.""" + result = ghz_circuit.analyze(analyzer=CircuitAnalyzer()) + assert isinstance(result, dict) + assert result["n_qubits"] == 3 + + +# --------------------------------------------------------------------- # +# Size metrics # +# --------------------------------------------------------------------- # +def test_size_metrics_on_empty_circuit(analyzer: CircuitAnalyzer, empty_circuit: Circuit) -> None: + result = analyzer.analyze(empty_circuit) + assert result["n_qubits"] == 3 + assert result["n_gates"] == 0 + assert result["n_two_qubit_gates"] == 0 + assert result["two_qubit_pct"] == pytest.approx(0.0, abs=1e-9) + assert result["depth"] == 0 + + +def test_size_metrics_on_ghz(analyzer: CircuitAnalyzer, ghz_circuit: Circuit) -> None: + result = analyzer.analyze(ghz_circuit) + assert result["n_qubits"] == 3 + assert result["n_gates"] == 3 + assert result["n_two_qubit_gates"] == 2 + assert result["two_qubit_pct"] == pytest.approx(2 / 3, abs=1e-3) + assert result["depth"] == 3 + + +def test_depth_with_parallel_gates(analyzer: CircuitAnalyzer, parallel_circuit: Circuit) -> None: + """Two independent gates can run in the same time-step, so depth is 1.""" + result = analyzer.analyze(parallel_circuit) + assert result["depth"] == 1 + assert result["n_gates"] == 2 + + +def test_depth_with_sequential_gates(analyzer: CircuitAnalyzer, sequential_circuit: Circuit) -> None: + """Gates that share qubits must serialise, so depth equals gate count.""" + result = analyzer.analyze(sequential_circuit) + assert result["depth"] == 3 + assert result["n_gates"] == 3 + + +# --------------------------------------------------------------------- # +# Interaction graph metrics # +# --------------------------------------------------------------------- # +def test_interaction_graph_empty_when_no_two_qubit_gates( + analyzer: CircuitAnalyzer, single_qubit_circuit: Circuit +) -> None: + result = analyzer.analyze(single_qubit_circuit) + assert result["ig_diameter"] == 0 + assert result["ig_avg_degree"] == pytest.approx(0.0, abs=1e-9) + assert result["ig_n_maximal_cliques"] == 0 + assert result["ig_clustering_coefficient"] == pytest.approx(0.0, abs=1e-9) + + +def test_interaction_graph_metrics_on_ghz(analyzer: CircuitAnalyzer, ghz_circuit: Circuit) -> None: + """GHZ-like has IG = path q0-q1-q2: 3 nodes, 2 edges, diameter 2.""" + result = analyzer.analyze(ghz_circuit) + assert result["ig_diameter"] == 2 + # avg degree of a 3-node path = (1 + 2 + 1) / 3 ~= 1.333 + assert result["ig_avg_degree"] == pytest.approx(4 / 3, abs=1e-3) + # 2 maximal cliques (each edge is a maximal clique) + assert result["ig_n_maximal_cliques"] == 2 + + +# --------------------------------------------------------------------- # +# Gate dependency graph metrics # +# --------------------------------------------------------------------- # +def test_critical_path_on_sequential_circuit(analyzer: CircuitAnalyzer, sequential_circuit: Circuit) -> None: + """Three sequential CNOTs form a chain. Critical path length = 2 (3 nodes, 2 edges).""" + result = analyzer.analyze(sequential_circuit) + assert result["gdg_critical_path_length"] == 2 + # All 3 gates lie on the unique critical path. + assert result["gdg_pct_gates_in_critical_path"] == pytest.approx(1.0, abs=1e-9) + + +def test_critical_path_on_parallel_circuit(analyzer: CircuitAnalyzer, parallel_circuit: Circuit) -> None: + """Two independent gates means no edges in the GDG, so critical path length is 0.""" + result = analyzer.analyze(parallel_circuit) + assert result["gdg_critical_path_length"] == 0 + + +def test_critical_path_empty_on_empty_circuit(analyzer: CircuitAnalyzer, empty_circuit: Circuit) -> None: + result = analyzer.analyze(empty_circuit) + assert result["gdg_critical_path_length"] == 0 + assert result["gdg_path_length_mean"] == pytest.approx(0.0, abs=1e-9) + assert result["gdg_path_length_std"] == pytest.approx(0.0, abs=1e-9) + assert result["gdg_pct_gates_in_critical_path"] == pytest.approx(0.0, abs=1e-9) + + +# --------------------------------------------------------------------- # +# Density metrics # +# --------------------------------------------------------------------- # +def test_density_metrics_in_unit_range(analyzer: CircuitAnalyzer, ghz_circuit: Circuit) -> None: + result = analyzer.analyze(ghz_circuit) + assert 0.0 <= result["density_score"] <= 1.0 + assert 0.0 <= result["idling_score"] <= 1.0 + + +def test_density_zero_on_empty_circuit(analyzer: CircuitAnalyzer, empty_circuit: Circuit) -> None: + result = analyzer.analyze(empty_circuit) + assert result["density_score"] == pytest.approx(0.0, abs=1e-9) + assert result["idling_score"] == pytest.approx(0.0, abs=1e-9) + + +def test_idling_score_high_when_one_qubit_unused() -> None: + """If one qubit is never touched, idling score should reflect that. + + With a 2-qubit circuit, depth 2, qubit 0 fully active and qubit 1 fully idle: + idling = ((depth - q0_active) + (depth - q1_active)) / (n_qubits * depth) + = ((2 - 2) + (2 - 0)) / (2 * 2) = 0.5 + """ + builder = CircuitBuilder(2) + builder.H(0) + builder.X(0) + circuit = builder.to_circuit() + result = CircuitAnalyzer().analyze(circuit) + assert result["idling_score"] == pytest.approx(0.5, abs=1e-3)