From b0b1707982ddd0fb70d132f2cbb1bd1de4098f75 Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Sun, 7 Jun 2026 22:18:41 -0700 Subject: [PATCH] feat: Kraus implementation for density matrix MCM Uses Kraus formalism to implement mid-circuit measurements in `braket_dm`; this allows `shots=0` simulation of dynamic circuits. As well, instead of rerunning a simulation for each branch, this implementation merges intermediate ensembles during the course of a simulation. This takes more space than the branching algorithm, but the intermediate merging prevents the full exponential space explosion from branching. --- .../density_matrix_simulation.py | 107 +++-- .../density_matrix_simulator.py | 61 +++ .../density_matrix_program_context.py | 370 +++++++++++++++++ .../openqasm/program_context.py | 9 +- .../openqasm/simulation_path.py | 42 ++ .../test_density_matrix_program_context.py | 108 +++++ .../openqasm/test_simulation_path.py | 125 +++++- .../test_density_matrix_simulation.py | 147 +++++++ .../test_density_matrix_simulator.py | 112 +++++ .../braket/default_simulator/test_mcm.py | 383 +++++++++++++----- 10 files changed, 1336 insertions(+), 128 deletions(-) create mode 100644 src/braket/default_simulator/openqasm/density_matrix_program_context.py create mode 100644 test/unit_tests/braket/default_simulator/openqasm/test_density_matrix_program_context.py diff --git a/src/braket/default_simulator/density_matrix_simulation.py b/src/braket/default_simulator/density_matrix_simulation.py index 84701a87..2bc7f714 100644 --- a/src/braket/default_simulator/density_matrix_simulation.py +++ b/src/braket/default_simulator/density_matrix_simulation.py @@ -13,7 +13,7 @@ import numpy as np -from braket.default_simulator.gate_operations import Projection, Reset +from braket.default_simulator.gate_operations import Reset from braket.default_simulator.linalg_utils import ( QuantumGateDispatcher, controlled_matrix, @@ -127,6 +127,81 @@ def _probabilities(state) -> np.ndarray: tol = 1e-20 return np.where((np.abs(diag) >= tol) & (diag >= 0), diag, 0.0) + @staticmethod + def project_unnormalized( + rho: np.ndarray, + qubit_count: int, + qubit_axis: int, + outcome: int, + ) -> tuple[np.ndarray, float]: + r"""Project a density matrix onto a single qubit's ``|outcome\rangle`` subspace + **without** renormalization. + + Computes :math:`P_b \rho P_b` where :math:`P_b` is the single-qubit computational basis + projector ``|outcome \rangle\langle outcome|`` acting on the qubit at ``qubit_axis``. + + This method never divides by the trace, so the returned matrix stays unnormalized + and a plain sum of sub-ensembles reconstructs the total density matrix. + The returned trace is the exact Branch_Probability of the measurement outcome. + + Args: + rho (np.ndarray): The density matrix to project, of shape + ``(2**qubit_count, 2**qubit_count)``. + qubit_count (int): The number of qubits in ``rho``. + qubit_axis (int): The big-endian axis of the measured qubit. + outcome (int): The measured bit value, either 0 or 1. + + Returns: + tuple[np.ndarray, float]: A tuple containing: + - The unnormalized projected density matrix :math:`P_b \rho P_b`. + - ``tr(P_b \rho P_b)``, the exact Branch_Probability. + """ + projector = ( + np.array([[1, 0], [0, 0]], dtype=complex) + if outcome == 0 + else np.array([[0, 0], [0, 1]], dtype=complex) + ) + dispatcher = QuantumGateDispatcher(rho.size) + original_shape = rho.shape + result = rho.copy() + result.shape = [2] * 2 * qubit_count + temp = np.zeros_like(result, dtype=complex) + result, _ = DensityMatrixSimulation._apply_gate( + result, + temp, + qubit_count, + projector, + (qubit_axis,), + (), + (), + dispatcher, + None, + ) + indices = list(range(qubit_count)) + probability = float(np.real(np.einsum(result, indices + indices))) + result = np.ascontiguousarray(result).reshape(original_shape) + return result, probability + + @staticmethod + def expand_with_ancilla(rho: np.ndarray, num_new: int) -> np.ndarray: + r"""Append ``num_new`` fresh ``|0\rangle`` qubits to a density matrix. + + Args: + rho (np.ndarray): The existing density matrix. Not modified. + num_new (int): The number of fresh ``|0\rangle`` qubits to append. If not positive, + ``rho`` is returned unchanged. + + Returns: + np.ndarray: The expanded density matrix ``rho \otimes |0 \rangle\langle 0|`` + of dimension ``2**num_new`` times larger per side. + """ + if num_new <= 0: + return rho + dim = 2**num_new + zero_projector = np.zeros((dim, dim), dtype=complex) + zero_projector[0, 0] = 1 + return np.kron(rho, zero_projector) + @staticmethod def _apply_operations( state: np.ndarray, @@ -155,11 +230,7 @@ def _apply_operations( work_buffer2 = np.zeros_like(result, dtype=complex) for operation in operations: - if isinstance(operation, Projection): - result, temp = DensityMatrixSimulation._apply_projection( - result, temp, qubit_count, operation, dispatcher - ) - elif isinstance(operation, Reset): + if isinstance(operation, Reset): result, temp = DensityMatrixSimulation._apply_reset( result, temp, work_buffer1, work_buffer2, qubit_count, operation, dispatcher ) @@ -192,30 +263,6 @@ def _apply_operations( result.shape = original_shape return result - @staticmethod - def _apply_projection( - result: np.ndarray, - temp: np.ndarray, - qubit_count: int, - operation: Projection, - dispatcher: QuantumGateDispatcher, - ) -> tuple[np.ndarray, np.ndarray]: - result, temp = DensityMatrixSimulation._apply_gate( - result, - temp, - qubit_count, - operation._base_matrix, - operation.targets, - (), - (), - dispatcher, - None, - ) - indices = list(range(qubit_count)) - norm = float(np.real(np.einsum(result, indices + indices))) - result /= norm - return result, temp - @staticmethod def _apply_reset( result: np.ndarray, diff --git a/src/braket/default_simulator/density_matrix_simulator.py b/src/braket/default_simulator/density_matrix_simulator.py index 4c534122..eeb6193f 100644 --- a/src/braket/default_simulator/density_matrix_simulator.py +++ b/src/braket/default_simulator/density_matrix_simulator.py @@ -14,17 +14,78 @@ import sys from braket.default_simulator import DensityMatrixSimulation +from braket.default_simulator.openqasm.density_matrix_program_context import ( + DensityMatrixProgramContext, +) +from braket.default_simulator.openqasm.program_context import AbstractProgramContext +from braket.default_simulator.result_types import TargetedResultType from braket.default_simulator.simulator import BaseLocalSimulator from braket.device_schema.simulators import ( GateModelSimulatorDeviceCapabilities, GateModelSimulatorDeviceParameters, ) from braket.ir.openqasm import Program as OpenQASMProgram +from braket.task_result import GateModelTaskResult class DensityMatrixSimulator(BaseLocalSimulator): DEVICE_ID = "braket_dm" + def create_program_context(self) -> AbstractProgramContext: + return DensityMatrixProgramContext(simulator=self) + + def _run_branched( + self, + context: AbstractProgramContext, + openqasm_ir: OpenQASMProgram, + shots: int, + batch_size: int, + ) -> GateModelTaskResult: + context._merge_subensembles() + + qubit_axis, m = context.active_qubit_axis_map() + + simulation = DensityMatrixSimulation(qubit_count=m, shots=shots) + simulation._density_matrix = context.total_density_matrix() + + if not shots: + results = self._analytical_results_from_total(context, openqasm_ir, simulation, m) + return self._create_results_obj(results, openqasm_ir, simulation) + + circuit = context.circuit + classical_bit_positions = {b: i for i, b in enumerate(circuit.target_classical_indices)} + measured_qubits = [ + circuit.measured_qubits[classical_bit_positions[i]] + for i in sorted(circuit.target_classical_indices) + ] + + mapped_measured_qubits = ( + [qubit_axis.get(q, m) for q in measured_qubits] if measured_qubits else None + ) + + return self._create_results_obj( + [], openqasm_ir, simulation, measured_qubits, mapped_measured_qubits + ) + + def _analytical_results_from_total( + self, + context: AbstractProgramContext, + openqasm_ir: OpenQASMProgram, + simulation: DensityMatrixSimulation, + m: int, + ) -> list: + circuit = context.circuit + result_types = BaseLocalSimulator._translate_result_types(circuit.results) + BaseLocalSimulator._validate_result_types_qubits_exist( + [ + result_type + for result_type in result_types + if isinstance(result_type, TargetedResultType) + ], + m, + ) + return BaseLocalSimulator._generate_results(circuit.results, result_types, simulation) + @staticmethod def _remove_verbatim_box(source: str) -> str: """Remove verbatim box wrappers while preserving the contents inside. diff --git a/src/braket/default_simulator/openqasm/density_matrix_program_context.py b/src/braket/default_simulator/openqasm/density_matrix_program_context.py new file mode 100644 index 00000000..6088afba --- /dev/null +++ b/src/braket/default_simulator/openqasm/density_matrix_program_context.py @@ -0,0 +1,370 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +from __future__ import annotations + +from collections.abc import Iterable +from copy import copy +from typing import TYPE_CHECKING, Any + +import numpy as np + +from braket.default_simulator.density_matrix_simulation import DensityMatrixSimulation +from braket.default_simulator.gate_operations import ( + BRAKET_GATES, + GPhase, + Reset, + Unitary, +) +from braket.default_simulator.noise_operations import ( + AmplitudeDamping, + BitFlip, + Depolarizing, + GeneralizedAmplitudeDamping, + Kraus, + PauliChannel, + PhaseDamping, + PhaseFlip, + TwoQubitDephasing, + TwoQubitDepolarizing, +) +from braket.default_simulator.operation import GateOperation, KrausOperation + +from .circuit import Circuit +from .parser.openqasm_ast import ( + ArrayLiteral, + BinaryExpression, + BooleanLiteral, + FloatLiteral, + IntegerLiteral, +) +from .program_context import ( + _BINARY_EQUALS, + _CLASSICAL_CONTROL_GATES, + ProgramContext, + _feedback_key_identifier, +) +from .simulation_path import FramedVariable, SubEnsemble + +if TYPE_CHECKING: # pragma: no cover + from braket.default_simulator.simulator import BaseLocalSimulator + +_ONE_PROB_NOISE_MAP = { + "bit_flip": BitFlip, + "phase_flip": PhaseFlip, + "pauli_channel": PauliChannel, + "depolarizing": Depolarizing, + "two_qubit_depolarizing": TwoQubitDepolarizing, + "two_qubit_dephasing": TwoQubitDephasing, + "amplitude_damping": AmplitudeDamping, + "generalized_amplitude_damping": GeneralizedAmplitudeDamping, + "phase_damping": PhaseDamping, +} + + +class DensityMatrixProgramContext(ProgramContext): + r"""Density matrix program context using Kraus formalism to perform mid-circuit measurement. + + Active qubits are assigned matrix axes in _insertion order_: the first physical qubit touched + gets axis 0, the next gets axis 1, and so on. Qubits discovered later are appended as the + next axes, which makes expanding every sub-ensemble a single ``np.kron`` with + ``|0 \rangle\langle 0|`` rather than a tensor transpose. + """ + + _TOL: float = 1e-10 + + def __init__(self, circuit: Circuit | None = None, simulator: BaseLocalSimulator | None = None): + super().__init__(circuit=circuit, simulator=simulator) + self._active_qubits = [] + self._qubit_axes = {} + + def _should_branch_on_mcm(self) -> bool: + """Always branch on a control-flow-relevant MCM, regardless of shot count.""" + return True + + def _ensure_qubits(self, targets: Iterable[int]) -> None: + new_qubits: list[int] = [] + for qubit in targets: + if qubit not in self._qubit_axes and qubit not in new_qubits: + new_qubits.append(qubit) + + if not new_qubits: + return + + for qubit in new_qubits: + self._qubit_axes[qubit] = len(self._active_qubits) + self._active_qubits.append(qubit) + + num_new = len(new_qubits) + for path in self._paths: + if isinstance(path, SubEnsemble) and path.density_matrix is not None: + path.density_matrix = DensityMatrixSimulation.expand_with_ancilla( + path.density_matrix, num_new + ) + + def _remap_to_axes(self, op: GateOperation | KrausOperation) -> GateOperation | KrausOperation: + remapped = copy(op) + remapped._targets = tuple(self._qubit_axes[q] for q in op.targets) + return remapped + + def _apply_to_active(self, op: GateOperation | KrausOperation) -> None: + self._ensure_qubits(op.targets) + qubit_count = len(self._active_qubits) + remapped = self._remap_to_axes(op) + for path_idx in self._active_path_indices: + sub = self._paths[path_idx] + sub.density_matrix = DensityMatrixSimulation._apply_operations( + sub.density_matrix, qubit_count, [remapped] + ) + + def total_density_matrix(self) -> np.ndarray | None: + """Return the total density matrix over all active sub-ensembles. + + This is the physical density matrix of the system marginalized over the classical tags. + + Returns: + np.ndarray | None: The summed density matrix, or ``None`` if no active + sub-ensemble holds a density matrix yet (e.g. before materialization). + """ + matrices = [ + self._paths[i].density_matrix + for i in self._active_path_indices + if isinstance(self._paths[i], SubEnsemble) and self._paths[i].density_matrix is not None + ] + if not matrices: + return None + total = matrices[0].copy() + for matrix in matrices[1:]: + total += matrix + return total + + def active_qubit_axis_map(self) -> tuple[dict[int, int], int]: + """The active qubit layout for runtime aggregation. + + Returns: + tuple[dict[int, int], int]: A copy of the physical-qubit-to-axis map and + the number of active qubits (``m``), where each sub-ensemble matrix has + shape ``2**m × 2**m``. + """ + return dict(self._qubit_axes), len(self._active_qubits) + + @staticmethod + def _normalize_value(value: Any) -> tuple: + match value: + case BooleanLiteral(value=v): + return ("bool", bool(v)) + case IntegerLiteral(value=v): + return ("int", int(v)) + case FloatLiteral(value=v): + return ("float", float(v)) + case ArrayLiteral(values=items): + return ( + "array", + tuple(DensityMatrixProgramContext._normalize_value(item) for item in items), + ) + case bool(): + return ("bool", value) + case int(): + return ("int", value) + case float(): + return ("float", value) + case _: + return ("repr", repr(value)) + + def _subensemble_signature(self, sub: SubEnsemble) -> frozenset: + return frozenset( + (name, self._normalize_value(var.value)) for name, var in sub.variables.items() + ) + + def _merge_subensembles(self) -> None: + if not self._is_branched or len(self._active_path_indices) <= 1: + return + + groups: dict[frozenset, int] = {} + for path_idx in self._active_path_indices: + sub = self._paths[path_idx] + sig = self._subensemble_signature(sub) + if sig in groups: + representative = self._paths[groups[sig]] + representative.density_matrix = representative.density_matrix + sub.density_matrix + sub.density_matrix = None + else: + groups[sig] = path_idx + self._active_path_indices = list(groups.values()) + + def evaluate_condition(self, condition): + yield from super().evaluate_condition(condition) + self._merge_subensembles() + + def evaluate_for_range(self, set_declaration, loop_var: str, loop_type): + try: + yield from super().evaluate_for_range(set_declaration, loop_var, loop_type) + finally: + self._merge_subensembles() + + def evaluate_while_condition(self, condition): + try: + yield from super().evaluate_while_condition(condition) + finally: + self._merge_subensembles() + + def _initialize_paths_from_circuit(self) -> None: + ordered_targets: list[int] = [] + for instruction in self._circuit.instructions: + ordered_targets.extend(instruction.targets) + self._ensure_qubits(ordered_targets) + + m = len(self._active_qubits) + rho = np.zeros((2**m, 2**m), dtype=complex) + rho[0, 0] = 1.0 + + remapped = [self._remap_to_axes(instruction) for instruction in self._circuit.instructions] + rho = DensityMatrixSimulation._apply_operations(rho, m, remapped) + + frame_number = self._paths[0].frame_number + sub = SubEnsemble(density_matrix=rho, frame_number=frame_number) + + for name, value in self.variable_table.items(): + var_type = self.get_type(name) + is_const = self.get_const(name) + fv = FramedVariable( + name=name, + var_type=var_type, + value=value, + is_const=bool(is_const), + frame_number=frame_number, + ) + sub.set_variable(name, fv) + + self._paths = [sub] + self._active_path_indices = [0] + + def _branch_single_qubit( + self, path_idx: int, qubit_idx: int, new_active_indices: list[int] + ) -> None: + self._ensure_qubits([qubit_idx]) + + axis = self._qubit_axes[qubit_idx] + sub = self._paths[path_idx] + qubit_count = len(self._active_qubits) + + rho0, p0 = DensityMatrixSimulation.project_unnormalized( + sub.density_matrix, qubit_count, axis, 0 + ) + rho1, p1 = DensityMatrixSimulation.project_unnormalized( + sub.density_matrix, qubit_count, axis, 1 + ) + + if p1 <= self._TOL: + sub.density_matrix = rho0 + sub.record_measurement(qubit_idx, 0) + new_active_indices.append(path_idx) + return + + if p0 <= self._TOL: + sub.density_matrix = rho1 + sub.record_measurement(qubit_idx, 1) + new_active_indices.append(path_idx) + return + + sub.density_matrix = rho0 + sub.record_measurement(qubit_idx, 0) + new_active_indices.append(path_idx) + + child = sub.branch() + child.density_matrix = rho1 + child._measurements[qubit_idx][-1] = 1 + new_active_indices.append(len(self._paths)) + self._paths.append(child) + + def add_gate_instruction( + self, gate_name: str, target: tuple[int, ...], params, ctrl_modifiers: list[int], power: int + ): + if gate_name in _CLASSICAL_CONTROL_GATES: + getattr(self, _CLASSICAL_CONTROL_GATES[gate_name].__name__)(target, params) + return + self._flush_pending_mcm_for_qubits(target) + if self._is_branched: + instruction = BRAKET_GATES[gate_name]( + target, *params, ctrl_modifiers=ctrl_modifiers, power=power + ) + self._apply_to_active(instruction) + else: + super().add_gate_instruction( + gate_name, target, params, ctrl_modifiers=ctrl_modifiers, power=power + ) + + def _handle_cc_prx(self, target: tuple[int, ...], params) -> None: + angle_1, angle_2, feedback_key = params[0], params[1], int(params[2]) + ff_var = _feedback_key_identifier(feedback_key) + try: + self.get_type(ff_var.name) + except KeyError as exc: + raise ValueError( + f"cc_prx references feedback key {feedback_key} but no measure_ff " + f"has been recorded for that key." + ) from exc + condition = BinaryExpression( + op=_BINARY_EQUALS, + lhs=ff_var, + rhs=IntegerLiteral(1), + ) + for branch in self.evaluate_condition(condition): + if branch: + instruction = BRAKET_GATES["prx"]( + target, angle_1, angle_2, ctrl_modifiers=[], power=1 + ) + self._apply_to_active(instruction) + + def add_phase_instruction(self, target: tuple[int], phase_value: int): + self._flush_pending_mcm_for_qubits(target) + if self._is_branched: + self._apply_to_active(GPhase(target, phase_value)) + else: + super().add_phase_instruction(target, phase_value) + + def add_custom_unitary( + self, + unitary: np.ndarray, + target: tuple[int, ...], + ) -> None: + self._flush_pending_mcm_for_qubits(target) + if self._is_branched: + self._apply_to_active(Unitary(target, unitary)) + else: + super().add_custom_unitary(unitary, target) + + def add_reset(self, target: list[int]) -> None: + self._flush_pending_mcm_for_qubits(target) + if self._is_branched: + for q in target: + self._apply_to_active(Reset([q])) + else: + super().add_reset(target) + + def add_noise_instruction( + self, noise_instruction: str, target: list[int], probabilities: list[float] + ): + self._flush_pending_mcm_for_qubits(target) + if self._is_branched: + instruction = _ONE_PROB_NOISE_MAP[noise_instruction](target, *probabilities) + self._apply_to_active(instruction) + else: + super().add_noise_instruction(noise_instruction, target, probabilities) + + def add_kraus_instruction(self, matrices: list[np.ndarray], target: list[int]): + self._flush_pending_mcm_for_qubits(target) + if self._is_branched: + self._apply_to_active(Kraus(target, matrices)) + else: + super().add_kraus_instruction(matrices, target) diff --git a/src/braket/default_simulator/openqasm/program_context.py b/src/braket/default_simulator/openqasm/program_context.py index 0fa6964c..7a5a4472 100644 --- a/src/braket/default_simulator/openqasm/program_context.py +++ b/src/braket/default_simulator/openqasm/program_context.py @@ -1174,6 +1174,9 @@ def supports_midcircuit_measurement(self) -> bool: """Whether this context supports mid-circuit measurement branching.""" return True + def _should_branch_on_mcm(self) -> bool: + return self._shots > 0 + def _flush_pending_mcm_targets(self) -> None: """Flush pending MCM targets to the circuit as regular measurements. @@ -1306,7 +1309,7 @@ def _flush_pending_mcm_for_variable(self, name: str) -> None: for mcm_target, mcm_classical, mcm_dest in self._pending_mcm_targets: dest_name = mcm_dest.name if isinstance(mcm_dest, Identifier) else mcm_dest.name.name if dest_name == name: - if not self._is_branched and self._shots > 0: + if not self._is_branched and self._should_branch_on_mcm(): self._is_branched = True self._initialize_paths_from_circuit() # Also flush any earlier pending measurements so the state is correct @@ -1362,7 +1365,7 @@ def _flush_pending_mcm_for_qubits(self, qubits: tuple[int, ...] | list[int]) -> if self._is_branched: for mcm_target, mcm_classical, mcm_dest in to_flush: self._branch_measurement(mcm_target, mcm_classical, mcm_dest) - elif self._shots > 0: + elif self._should_branch_on_mcm(): self._is_branched = True self._initialize_paths_from_circuit() # Flush to_flush first (preserving program order), then any @@ -1557,7 +1560,7 @@ def _maybe_transition_to_branched(self) -> None: Initializes paths from the circuit and retroactively applies all pending measurements. """ - if not self._is_branched and self._pending_mcm_targets and self._shots > 0: + if not self._is_branched and self._pending_mcm_targets and self._should_branch_on_mcm(): self._is_branched = True self._initialize_paths_from_circuit() for mcm_target, mcm_classical, mcm_dest in self._pending_mcm_targets: diff --git a/src/braket/default_simulator/openqasm/simulation_path.py b/src/braket/default_simulator/openqasm/simulation_path.py index e3571dfb..27a5f25e 100644 --- a/src/braket/default_simulator/openqasm/simulation_path.py +++ b/src/braket/default_simulator/openqasm/simulation_path.py @@ -16,6 +16,8 @@ from copy import deepcopy from typing import Any +import numpy as np + from braket.default_simulator.operation import GateOperation @@ -161,3 +163,43 @@ def record_measurement(self, qubit_idx: int, outcome: int) -> None: if qubit_idx not in self._measurements: self._measurements[qubit_idx] = [] self._measurements[qubit_idx].append(outcome) + + +class SubEnsemble(SimulationPath): + """A classically-tagged, unnormalized density matrix sub-ensemble. + + The trace of the density matrix equals the joint probability of the classical measurement bits + recorded in this sub-ensemble's tag. The inherited ``instructions``/``shots`` fields are unused + on the density matrix path. + """ + + def __init__( + self, + density_matrix: np.ndarray | None = None, + variables: dict[str, FramedVariable] | None = None, + measurements: dict[int, list[int]] | None = None, + frame_number: int = 0, + ): + super().__init__( + variables=variables, + measurements=measurements, + frame_number=frame_number, + ) + self.density_matrix = density_matrix + + @property + def trace(self) -> float: + """The trace of the density matrix. + + Equals the joint probability of the classical measurement bits in + this sub-ensemble's tag. + """ + return float(np.real(np.trace(self.density_matrix))) + + def branch(self) -> SubEnsemble: + return SubEnsemble( + density_matrix=self.density_matrix.copy(), + variables=deepcopy(self._variables), + measurements=deepcopy(self._measurements), + frame_number=self._frame_number, + ) diff --git a/test/unit_tests/braket/default_simulator/openqasm/test_density_matrix_program_context.py b/test/unit_tests/braket/default_simulator/openqasm/test_density_matrix_program_context.py new file mode 100644 index 00000000..9e353b44 --- /dev/null +++ b/test/unit_tests/braket/default_simulator/openqasm/test_density_matrix_program_context.py @@ -0,0 +1,108 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +import numpy as np + +from braket.default_simulator import gate_operations +from braket.default_simulator.noise_operations import Kraus +from braket.default_simulator.openqasm.density_matrix_program_context import ( + DensityMatrixProgramContext, +) +from braket.default_simulator.openqasm.parser.openqasm_ast import ( + ArrayLiteral, + BooleanLiteral, + FloatLiteral, + IntegerLiteral, +) + + +def test_total_density_matrix_none_before_materialization(): + """Before any sub-ensemble matrix exists, total_density_matrix returns None. + + Reachable only directly: a full run always calls ``total_density_matrix`` + after materialization inside ``_run_branched``. + """ + context = DensityMatrixProgramContext() + assert context.total_density_matrix() is None + + +def test_normalize_value_covers_literal_and_plain_python_types(): + """``_normalize_value`` reduces each supported value to a canonical hashable key. + + Covers every recognized type so the merge signature is stable and exact: + AST literals (bool/int/float/array) and the plain-Python fallbacks + (``bool`` before ``int`` because ``bool`` subclasses ``int``), plus the + ``repr`` fallback for unrecognized values. The plain ``bool``/``float`` arms + are not reachable end-to-end (classical values arrive as AST literals). + """ + nv = DensityMatrixProgramContext._normalize_value + + # AST literal nodes. + assert nv(BooleanLiteral(True)) == ("bool", True) + assert nv(IntegerLiteral(3)) == ("int", 3) + assert nv(FloatLiteral(0.5)) == ("float", 0.5) + assert nv(ArrayLiteral(values=[IntegerLiteral(0), IntegerLiteral(1)])) == ( + "array", + (("int", 0), ("int", 1)), + ) + + # Plain Python values: bool must be tagged "bool" even though it is an int. + assert nv(True) == ("bool", True) + assert nv(2) == ("int", 2) + assert nv(0.25) == ("float", 0.25) + + # Unrecognized values fall back to a hashable repr key. + assert nv("hello") == ("repr", repr("hello")) + + # An IntegerLiteral and the same plain int collapse to the same key, while a + # plain bool and a plain int stay distinct. + assert nv(IntegerLiteral(1)) == nv(1) + assert nv(True) != nv(1) + + +def test_add_custom_unitary_not_branched_accumulates_into_circuit(): + """Before branching, a custom unitary is deferred to the base and accumulated. + + The branched arm is covered end-to-end in ``test_mcm.py``; this guards the + un-branched delegation to ``ProgramContext``. + """ + context = DensityMatrixProgramContext() + context._shots = 100 + x_matrix = np.array([[0, 1], [1, 0]], dtype=complex) + + context.add_custom_unitary(x_matrix, (0,)) + + assert context.is_branched is False + assert len(context._circuit.instructions) == 1 + assert isinstance(context._circuit.instructions[0], gate_operations.Unitary) + + +def test_add_kraus_instruction_not_branched_accumulates_into_circuit(): + """Before branching, a Kraus channel is deferred to the base and accumulated. + + The branched arm is covered end-to-end in ``test_mcm.py``; this guards the + un-branched delegation to ``ProgramContext``. + """ + context = DensityMatrixProgramContext() + context._shots = 100 + kraus_matrices = [ + np.sqrt(0.8) * np.array([[1, 0], [0, 1]], dtype=complex), + np.sqrt(0.2) * np.array([[0, 1], [1, 0]], dtype=complex), + ] + + context.add_kraus_instruction(kraus_matrices, [0]) + + assert context.is_branched is False + assert len(context._circuit.instructions) == 1 + assert isinstance(context._circuit.instructions[0], Kraus) + assert context._active_qubits == [] diff --git a/test/unit_tests/braket/default_simulator/openqasm/test_simulation_path.py b/test/unit_tests/braket/default_simulator/openqasm/test_simulation_path.py index 6e1f05b9..5a763fe6 100644 --- a/test/unit_tests/braket/default_simulator/openqasm/test_simulation_path.py +++ b/test/unit_tests/braket/default_simulator/openqasm/test_simulation_path.py @@ -13,7 +13,13 @@ from unittest.mock import MagicMock -from braket.default_simulator.openqasm.simulation_path import FramedVariable, SimulationPath +import numpy as np + +from braket.default_simulator.openqasm.simulation_path import ( + FramedVariable, + SimulationPath, + SubEnsemble, +) class TestFramedVariable: @@ -168,3 +174,120 @@ def test_nested_frames(self): path.exit_frame(frame0) assert set(path.variables.keys()) == {"a"} + + +class TestSubEnsemble: + @staticmethod + def _single_qubit_dm(p0: float) -> np.ndarray: + """Diagonal single-qubit density matrix with population p0 in |0>.""" + return np.array([[p0, 0.0], [0.0, 1.0 - p0]], dtype=complex) + + def test_default_init(self): + sub = SubEnsemble() + assert sub.density_matrix is None + assert sub.variables == {} + assert sub.measurements == {} + assert sub.frame_number == 0 + + def test_is_simulation_path(self): + """SubEnsemble retains the inherited SimulationPath interface.""" + sub = SubEnsemble(density_matrix=self._single_qubit_dm(0.5)) + assert isinstance(sub, SimulationPath) + + def test_init_with_classical_state(self): + var = FramedVariable("x", int, 5, False, 0) + sub = SubEnsemble( + density_matrix=self._single_qubit_dm(0.25), + variables={"x": var}, + measurements={0: [1]}, + frame_number=2, + ) + assert sub.variables["x"].value == 5 + assert sub.measurements == {0: [1]} + assert sub.frame_number == 2 + + def test_trace_reflects_matrix(self): + """Validates: Requirements 1.5 - trace equals the matrix trace.""" + sub = SubEnsemble(density_matrix=self._single_qubit_dm(0.5)) + assert np.isclose(sub.trace, 1.0) + + def test_trace_of_unnormalized_matrix(self): + """Validates: Requirements 1.5 - trace of an unnormalized branch.""" + # An unnormalized branch with trace 0.3 (joint probability of its tag). + sub = SubEnsemble(density_matrix=np.array([[0.3, 0.0], [0.0, 0.0]], dtype=complex)) + assert np.isclose(sub.trace, 0.3) + + def test_trace_is_real_float(self): + """Trace is returned as a real Python float even for complex matrices.""" + rho = np.array([[0.5, 0.2j], [-0.2j, 0.5]], dtype=complex) + sub = SubEnsemble(density_matrix=rho) + assert isinstance(sub.trace, float) + assert np.isclose(sub.trace, 1.0) + + def test_branch_yields_independent_matrix(self): + """Validates: Requirements 1.5 - branch copies the density matrix.""" + rho = self._single_qubit_dm(0.5) + parent = SubEnsemble(density_matrix=rho) + child = parent.branch() + + assert isinstance(child, SubEnsemble) + assert np.allclose(child.density_matrix, parent.density_matrix) + + # Mutating the child matrix must not affect the parent. + child.density_matrix[0, 0] = 0.9 + assert parent.density_matrix[0, 0] == 0.5 + # The original source array is also untouched (branch copies). + assert rho[0, 0] == 0.5 + + def test_branch_yields_independent_classical_state(self): + """Validates: Requirements 9.6, 9.7 - branch deep-copies classical state.""" + var = FramedVariable("x", int, 10, False, 0) + parent = SubEnsemble( + density_matrix=self._single_qubit_dm(0.5), + variables={"x": var}, + measurements={0: [1]}, + frame_number=1, + ) + child = parent.branch() + + assert child.variables["x"].value == 10 + assert child.measurements == {0: [1]} + assert child.frame_number == 1 + + # Modifying child classical state does not affect parent. + child.variables["x"].value = 99 + child.record_measurement(0, 0) + child.set_variable("y", FramedVariable("y", int, 7, False, 1)) + + assert parent.variables["x"].value == 10 + assert parent.measurements == {0: [1]} + assert "y" not in parent.variables + + def test_branch_preserves_trace(self): + """A branched copy has the same trace as its parent.""" + sub = SubEnsemble(density_matrix=np.array([[0.4, 0.0], [0.0, 0.0]], dtype=complex)) + child = sub.branch() + assert np.isclose(child.trace, sub.trace) + + def test_inherited_frame_scoping(self): + """Validates: Requirements 9.7 - inherited frame scoping still works.""" + sub = SubEnsemble(density_matrix=self._single_qubit_dm(0.5)) + sub.set_variable("outer", FramedVariable("outer", int, 1, False, 0)) + + prev = sub.enter_frame() + assert sub.frame_number == 1 + sub.set_variable("inner", FramedVariable("inner", int, 2, False, 1)) + assert "inner" in sub.variables + + sub.exit_frame(prev) + assert "outer" in sub.variables + assert "inner" not in sub.variables + assert sub.frame_number == 0 + + def test_inherited_record_measurement(self): + """Validates: Requirements 9.6 - inherited measurement recording works.""" + sub = SubEnsemble(density_matrix=self._single_qubit_dm(0.5)) + sub.record_measurement(0, 1) + sub.record_measurement(0, 0) + sub.record_measurement(2, 1) + assert sub.measurements == {0: [1, 0], 2: [1]} diff --git a/test/unit_tests/braket/default_simulator/test_density_matrix_simulation.py b/test/unit_tests/braket/default_simulator/test_density_matrix_simulation.py index 5cdbd67a..492a4826 100644 --- a/test/unit_tests/braket/default_simulator/test_density_matrix_simulation.py +++ b/test/unit_tests/braket/default_simulator/test_density_matrix_simulation.py @@ -18,6 +18,7 @@ from braket.default_simulator import gate_operations, noise_operations, observables from braket.default_simulator.density_matrix_simulation import DensityMatrixSimulation +from braket.default_simulator.linalg_utils import partial_trace sx = np.array([[0, 1], [1, 0]], dtype=complex) si = np.array([[1, 0], [0, 1]], dtype=complex) @@ -520,3 +521,149 @@ def test_toffoli_gate(): prob_7 = counter[7] / total_shots assert abs(prob_2 - 0.5) < 0.1 assert abs(prob_7 - 0.5) < 0.1 + + +# --------------------------------------------------------------------------- +# Kraus-native MCM helpers: project_unnormalized and expand_with_ancilla +# --------------------------------------------------------------------------- + + +def _density_matrix_after(qubit_count, operations): + """Helper: evolve |0...0> under the given operations and return the density matrix.""" + simulation = DensityMatrixSimulation(qubit_count, 0) + simulation.evolve(operations) + return simulation.density_matrix + + +def test_project_unnormalized_superposition_traces_are_analytic_probabilities(): + """H|0> -> measure: both outcomes have probability 0.5 (Req 1.2, 15.1).""" + rho = _density_matrix_after(1, [gate_operations.Hadamard([0])]) + + rho0, p0 = DensityMatrixSimulation.project_unnormalized(rho, 1, 0, 0) + rho1, p1 = DensityMatrixSimulation.project_unnormalized(rho, 1, 0, 1) + + assert np.isclose(p0, 0.5, atol=1e-12) + assert np.isclose(p1, 0.5, atol=1e-12) + + # P_b rho P_b is the unnormalized projected matrix (trace == probability). + expected_rho0 = np.array([[0.5, 0], [0, 0]], dtype=complex) + expected_rho1 = np.array([[0, 0], [0, 0.5]], dtype=complex) + assert np.allclose(rho0, expected_rho0, atol=1e-12) + assert np.allclose(rho1, expected_rho1, atol=1e-12) + # Trace of the returned matrix equals the reported probability (no renormalization). + assert np.isclose(np.real(np.trace(rho0)), p0, atol=1e-12) + assert np.isclose(np.real(np.trace(rho1)), p1, atol=1e-12) + + +def test_project_unnormalized_bell_partner_is_deterministic(): + """For a Bell pair, measuring one qubit's partner outcome is deterministic given + the first measurement (Req 1.2, 15.1). + + Project qubit 0 of the Bell state onto |0>; the resulting (unnormalized) matrix + then has the partner qubit 1 deterministically in |0> as well. + """ + bell = _density_matrix_after(2, [gate_operations.Hadamard([0]), gate_operations.CX([0, 1])]) + + # Probabilities of measuring qubit 0. + rho_q0_0, p0 = DensityMatrixSimulation.project_unnormalized(bell, 2, 0, 0) + rho_q0_1, p1 = DensityMatrixSimulation.project_unnormalized(bell, 2, 0, 1) + assert np.isclose(p0, 0.5, atol=1e-12) + assert np.isclose(p1, 0.5, atol=1e-12) + + # Given qubit 0 == 0, measuring qubit 1 is deterministic: P(q1==1) == 0. + _, p1_given_q0_0_is_one = DensityMatrixSimulation.project_unnormalized(rho_q0_0, 2, 1, 1) + _, p1_given_q0_0_is_zero = DensityMatrixSimulation.project_unnormalized(rho_q0_0, 2, 1, 0) + assert np.isclose(p1_given_q0_0_is_one, 0.0, atol=1e-12) + assert np.isclose(p1_given_q0_0_is_zero, 0.5, atol=1e-12) + + # Given qubit 0 == 1, measuring qubit 1 is deterministically 1. + _, p1_given_q0_1_is_one = DensityMatrixSimulation.project_unnormalized(rho_q0_1, 2, 1, 1) + _, p1_given_q0_1_is_zero = DensityMatrixSimulation.project_unnormalized(rho_q0_1, 2, 1, 0) + assert np.isclose(p1_given_q0_1_is_one, 0.5, atol=1e-12) + assert np.isclose(p1_given_q0_1_is_zero, 0.0, atol=1e-12) + + +@pytest.mark.parametrize( + "qubit_count, operations, qubit_axis", + [ + (1, [gate_operations.Hadamard([0])], 0), + (1, [gate_operations.RotX([0], np.pi / 3)], 0), + (2, [gate_operations.Hadamard([0]), gate_operations.CX([0, 1])], 0), + (2, [gate_operations.Hadamard([0]), gate_operations.CX([0, 1])], 1), + (3, [gate_operations.Hadamard([0]), gate_operations.Hadamard([1])], 2), + ], +) +def test_project_unnormalized_split_conserves_parent_trace(qubit_count, operations, qubit_axis): + """p0 + p1 == trace(rho) within tolerance (Req 15.4).""" + rho = _density_matrix_after(qubit_count, operations) + parent_trace = float(np.real(np.trace(rho))) + + _, p0 = DensityMatrixSimulation.project_unnormalized(rho, qubit_count, qubit_axis, 0) + _, p1 = DensityMatrixSimulation.project_unnormalized(rho, qubit_count, qubit_axis, 1) + + assert np.isclose(p0 + p1, parent_trace, atol=1e-12) + + +def test_project_unnormalized_does_not_mutate_input(): + """project_unnormalized must not modify the input density matrix.""" + rho = _density_matrix_after(1, [gate_operations.Hadamard([0])]) + rho_copy = rho.copy() + DensityMatrixSimulation.project_unnormalized(rho, 1, 0, 0) + assert np.allclose(rho, rho_copy, atol=1e-15) + + +def test_expand_with_ancilla_appends_zero_and_preserves_marginals(): + """rho ⊗ |0><0| keeps existing-qubit marginals and adds a |0> qubit (Req 8.1).""" + rho = _density_matrix_after(1, [gate_operations.Hadamard([0])]) + + expanded = DensityMatrixSimulation.expand_with_ancilla(rho, 1) + + assert expanded.shape == (4, 4) + # Trace is preserved. + assert np.isclose(np.real(np.trace(expanded)), np.real(np.trace(rho)), atol=1e-12) + + # The new (least-significant) qubit is in |0>: reduced density matrix == |0><0|. + reshaped = np.reshape(expanded, [2] * 2 * 2) + new_qubit_marginal = partial_trace(reshaped, [1]) + assert np.allclose(new_qubit_marginal, np.array([[1, 0], [0, 0]], dtype=complex), atol=1e-12) + + # The existing qubit's marginal is unchanged. + existing_qubit_marginal = partial_trace(reshaped, [0]) + assert np.allclose(existing_qubit_marginal, rho, atol=1e-12) + + +def test_expand_with_ancilla_matches_explicit_evolution(): + """Expanding a 1-qubit state with an ancilla equals evolving the same gate on a + 2-qubit |00> register (the ancilla axis is least-significant).""" + rho_1q = _density_matrix_after(1, [gate_operations.Hadamard([0])]) + expanded = DensityMatrixSimulation.expand_with_ancilla(rho_1q, 1) + + rho_2q = _density_matrix_after(2, [gate_operations.Hadamard([0])]) + assert np.allclose(expanded, rho_2q, atol=1e-12) + + +def test_expand_with_ancilla_multiple_qubits(): + """Appending multiple ancillas grows the dimension by 2 per qubit and adds |0...0>.""" + rho = _density_matrix_after(1, [gate_operations.Hadamard([0])]) + + expanded = DensityMatrixSimulation.expand_with_ancilla(rho, 2) + + assert expanded.shape == (8, 8) + assert np.isclose(np.real(np.trace(expanded)), np.real(np.trace(rho)), atol=1e-12) + # Both ancillas in |0>. + reshaped = np.reshape(expanded, [2] * 2 * 3) + assert np.allclose(partial_trace(reshaped, [0]), rho, atol=1e-12) + assert np.allclose( + partial_trace(reshaped, [1]), np.array([[1, 0], [0, 0]], dtype=complex), atol=1e-12 + ) + assert np.allclose( + partial_trace(reshaped, [2]), np.array([[1, 0], [0, 0]], dtype=complex), atol=1e-12 + ) + + +@pytest.mark.parametrize("num_new", [0, -1]) +def test_expand_with_ancilla_noop_for_nonpositive(num_new): + """expand_with_ancilla returns rho unchanged when num_new <= 0.""" + rho = _density_matrix_after(1, [gate_operations.Hadamard([0])]) + result = DensityMatrixSimulation.expand_with_ancilla(rho, num_new) + assert result is rho diff --git a/test/unit_tests/braket/default_simulator/test_density_matrix_simulator.py b/test/unit_tests/braket/default_simulator/test_density_matrix_simulator.py index aac94741..269817c2 100644 --- a/test/unit_tests/braket/default_simulator/test_density_matrix_simulator.py +++ b/test/unit_tests/braket/default_simulator/test_density_matrix_simulator.py @@ -1074,3 +1074,115 @@ def test_remove_verbatim_box_with_nested_braces(): assert "box{" not in result assert "if (b == 1) {" in result assert "x $0;" in result + + +class TestDensityMatrixSimulatorBranchedRun: + """End-to-end coverage for the Kraus-native branched density-matrix run. + + These tests exercise the ``DensityMatrixSimulator`` overrides wired in for the + Kraus-native MCM path: ``create_program_context`` (which returns a + ``DensityMatrixProgramContext``) and ``_run_branched`` (which forms + ``ρ_total = Σ ρ_sub`` and draws all shots in a single pass). They assert that a + branched program returns measurements in the same format and column ordering as + the single-path density-matrix output, reports ``measured_qubits`` as the original + physical identifiers, and produces exactly the requested number of shots. + """ + + def test_create_program_context_returns_density_matrix_context(self): + """``create_program_context`` returns a ``DensityMatrixProgramContext`` so the + Kraus-native MCM path is used (Req 13.1).""" + from braket.default_simulator.openqasm.density_matrix_program_context import ( + DensityMatrixProgramContext, + ) + + context = DensityMatrixSimulator().create_program_context() + assert isinstance(context, DensityMatrixProgramContext) + + def test_branched_run_shot_count_format_and_measured_qubits(self): + """A branched Bell-pair MCM program returns the requested shot count, a + two-column measurement matrix, original physical ``measuredQubits``, and only + the physically reachable outcomes (Req 2.2, 2.3, 2.4).""" + qasm = """ + OPENQASM 3.0; + bit[2] b; + qubit[2] q; + h q[0]; + cnot q[0], q[1]; + b[0] = measure q[0]; + if (b[0] == 1) { + x q[1]; + } + b[1] = measure q[1]; + """ + shots = 2000 + result = DensityMatrixSimulator().run(OpenQASMProgram(source=qasm), shots=shots) + + # Requested shot count drawn in a single final pass. + assert len(result.measurements) == shots + assert result.taskMetadata.shots == shots + # Same format/column ordering as single-path DM output: one column per + # recorded measurement, reported against original physical identifiers. + assert result.measuredQubits == [0, 1] + assert all(len(m) == 2 for m in result.measurements) + assert all(bit in ("0", "1") for m in result.measurements for bit in m) + + # q[1] is Bell-correlated with q[0] then flipped iff b[0]==1, so it is always + # |0>; only "00" and "10" are reachable (~50/50). + counts = Counter("".join(m) for m in result.measurements) + assert set(counts) <= {"00", "10"} + for key in ("00", "10"): + assert 0.4 < counts.get(key, 0) / shots < 0.6 + + def test_branched_run_reports_noncontiguous_physical_identifiers(self): + """A branched program touching only sparse, high-index qubits reports the + original physical qubit identifiers and selects exactly those columns, + consistent with current measured-qubit selection behavior (Req 2.4, 8.3).""" + qasm = """ + OPENQASM 3.0; + bit[2] b; + qubit[18] q; + h q[13]; + b[0] = measure q[13]; + if (b[0] == 1) { + prx(3.141592653589793, 0.0) q[17]; + } + b[0] = measure q[13]; + b[1] = measure q[17]; + """ + shots = 2000 + result = DensityMatrixSimulator().run(OpenQASMProgram(source=qasm), shots=shots) + + assert len(result.measurements) == shots + # Original physical identifiers, not the contiguous matrix axes. + assert result.measuredQubits == [13, 17] + assert all(len(m) == 2 for m in result.measurements) + + # q[17] flips iff b[0]==1, so the two recorded columns are perfectly + # correlated: only "00" and "11" are reachable (~50/50). + counts = Counter("".join(m) for m in result.measurements) + assert set(counts) <= {"00", "11"} + for key in ("00", "11"): + assert 0.4 < counts.get(key, 0) / shots < 0.6 + + def test_branched_run_uses_kraus_native_path(self): + """Parsing a measurement-conditioned program with shots > 0 transitions the + density-matrix context to branched mode, routing the run through the + overridden ``_run_branched`` rather than the single-path aggregation.""" + qasm = """ + OPENQASM 3.0; + bit[2] b; + qubit[2] q; + h q[0]; + b[0] = measure q[0]; + if (b[0] == 1) { + x q[1]; + } + b[1] = measure q[1]; + """ + simulator = DensityMatrixSimulator() + context = simulator._parse_program_with_shots(OpenQASMProgram(source=qasm), 100) + assert context.is_branched + # ρ_total is the exact mixed state with unit trace. + rho_total = context.total_density_matrix() + assert rho_total is not None + assert np.isclose(np.real(np.trace(rho_total)), 1.0) diff --git a/test/unit_tests/braket/default_simulator/test_mcm.py b/test/unit_tests/braket/default_simulator/test_mcm.py index 50080a83..7093920c 100644 --- a/test/unit_tests/braket/default_simulator/test_mcm.py +++ b/test/unit_tests/braket/default_simulator/test_mcm.py @@ -11,6 +11,7 @@ # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. +import numpy as np import pytest from collections import Counter @@ -42,7 +43,7 @@ class TestStateVectorSimulatorOperatorsOpenQASM: - def test_3_2_complex_conditional_logic(self): + def test_3_2_complex_conditional_logic(self, simulator): """3.2 Complex conditional logic""" qasm_source = """ OPENQASM 3.0; @@ -77,7 +78,6 @@ def test_3_2_complex_conditional_logic(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=1000) # Complex logic analysis: @@ -95,7 +95,7 @@ def test_3_2_complex_conditional_logic(self): f"Expected {expected_outcomes}, got {set(counter.keys())}" ) - def test_3_3_multiple_measurements_and_branching_paths(self): + def test_3_3_multiple_measurements_and_branching_paths(self, simulator): """3.3 Multiple measurements and branching paths""" qasm_source = """ OPENQASM 3.0; @@ -122,7 +122,6 @@ def test_3_3_multiple_measurements_and_branching_paths(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=1000) # Should see all four possible measurement combinations for first two qubits @@ -139,7 +138,7 @@ def test_3_3_multiple_measurements_and_branching_paths(self): ratio = counter[outcome] / total assert 0.15 < ratio < 0.35, f"Expected ~0.25 for {outcome}, got {ratio}" - def test_4_1_classical_variable_manipulation_with_branching(self): + def test_4_1_classical_variable_manipulation_with_branching(self, simulator): """4.1 Classical variable manipulation with branching""" qasm_source = """ OPENQASM 3.0; @@ -170,7 +169,6 @@ def test_4_1_classical_variable_manipulation_with_branching(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=1000) counter = Counter(["".join(m) for m in result.measurements]) @@ -186,7 +184,7 @@ def test_4_1_classical_variable_manipulation_with_branching(self): # count=2 path: ~25% assert 0.15 < counter["111"] / total < 0.35 - def test_4_2_additional_data_types_and_operations_with_branching(self): + def test_4_2_additional_data_types_and_operations_with_branching(self, simulator): """4.2 Additional data types and operations with branching""" qasm_source = """ OPENQASM 3.0; @@ -215,7 +213,6 @@ def test_4_2_additional_data_types_and_operations_with_branching(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=1000) counter = Counter(["".join(m) for m in result.measurements]) @@ -323,7 +320,7 @@ def test_4_4_complex_classical_operations(self): total = sum(counter.values()) assert total == 1000 - def test_5_1_loop_dependent_on_measurement_results_with_branching(self): + def test_5_1_loop_dependent_on_measurement_results_with_branching(self, simulator): """5.1 Loop dependent on measurement results with branching. While loop with compound condition (b == 0 && count <= 3) and MCM inside. @@ -348,7 +345,6 @@ def test_5_1_loop_dependent_on_measurement_results_with_branching(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=1000) assert len(result.measurements) == 1000 @@ -475,7 +471,7 @@ def test_5_3_complex_control_flow(self): assert 0.27 < ratio_01 < 0.47, f"Expected ~0.375 for |01⟩, got {ratio_01}" assert 0.05 < ratio_00 < 0.2, f"Expected ~0.125 for |00⟩, got {ratio_00}" - def test_5_4_array_operations_and_indexing(self): + def test_5_4_array_operations_and_indexing(self, simulator): """5.4 Array Operations and Indexing""" qasm_source = """ OPENQASM 3.0; @@ -497,7 +493,6 @@ def test_5_4_array_operations_and_indexing(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=1000) # Array operations analysis: @@ -522,7 +517,7 @@ def test_5_4_array_operations_and_indexing(self): ratio = counter[outcome] / total assert 0.15 < ratio < 0.35, f"Expected ~0.25 for {outcome}, got {ratio}" - def test_6_2_quantum_phase_estimation(self): + def test_6_2_quantum_phase_estimation(self, simulator): """6.2 Quantum Phase Estimation — exercises nested for-loops with negative step.""" qasm_source = """ OPENQASM 3.0; @@ -552,7 +547,6 @@ def test_6_2_quantum_phase_estimation(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=1000) counter = Counter(["".join(m) for m in result.measurements]) @@ -561,7 +555,7 @@ def test_6_2_quantum_phase_estimation(self): for outcome in counter: assert len(outcome) == 3 - def test_6_3_dynamic_circuit_features(self): + def test_6_3_dynamic_circuit_features(self, simulator): """6.3 Dynamic Circuit Features""" qasm_source = """ OPENQASM 3.0; @@ -587,7 +581,6 @@ def test_6_3_dynamic_circuit_features(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=1000) # Dynamic circuit features analysis: @@ -613,7 +606,7 @@ def test_6_3_dynamic_circuit_features(self): ratio = counter[outcome] / total assert 0.05 < ratio < 0.95, f"Unexpected probability {ratio} for outcome {outcome}" - def test_6_4_quantum_fourier_transform(self): + def test_6_4_quantum_fourier_transform(self, simulator): """6.4 Quantum Fourier Transform""" qasm_source = """ OPENQASM 3.0; @@ -639,7 +632,6 @@ def test_6_4_quantum_fourier_transform(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=1000) counter = Counter(["".join(m) for m in result.measurements]) @@ -690,7 +682,7 @@ def measure_and_reset(qubit q, bit b) -> bit { total = sum(counter.values()) assert total == 1000 - def test_7_2_custom_gates_with_control_flow(self): + def test_7_2_custom_gates_with_control_flow(self, simulator): """7.2 Custom Gates with Control Flow""" qasm_source = """ OPENQASM 3.0; @@ -728,7 +720,6 @@ def adaptive_gate(qubit q1, qubit q2, bit measurement) { """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=1000) # Verify simulation completed successfully @@ -740,7 +731,7 @@ def adaptive_gate(qubit q1, qubit q2, bit measurement) { total = sum(counter.values()) assert total == 1000 - def test_8_1_maximum_recursion(self): + def test_8_1_maximum_recursion(self, simulator): """8.1 Maximum Recursion""" qasm_source = """ OPENQASM 3.0; @@ -765,7 +756,6 @@ def test_8_1_maximum_recursion(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=1000) # Maximum recursion analysis: @@ -799,7 +789,7 @@ def test_8_1_maximum_recursion(self): f"Expected '10' to be very rare (<2%), got {counter}" ) - def test_9_1_basic_gate_modifiers(self): + def test_9_1_basic_gate_modifiers(self, simulator): """9.1 Basic gate modifiers""" qasm_source = """ OPENQASM 3.0; @@ -818,7 +808,6 @@ def test_9_1_basic_gate_modifiers(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=1000) # Basic gate modifiers analysis: @@ -843,7 +832,7 @@ def test_9_1_basic_gate_modifiers(self): ratio_q1_one = outcomes_with_q1_one / total assert ratio_q1_one > 0.9, f"Expected >90% to have q[1]=1, got {ratio_q1_one}" - def test_9_2_control_modifiers(self): + def test_9_2_control_modifiers(self, simulator): """9.2 Control modifiers""" qasm_source = """ OPENQASM 3.0; @@ -866,7 +855,6 @@ def test_9_2_control_modifiers(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=1000) # Control modifiers analysis: @@ -893,7 +881,7 @@ def test_9_2_control_modifiers(self): assert 0.4 < ratio_110 < 0.6, f"Expected ~0.5 for |100⟩, got {ratio_110}" assert 0.4 < ratio_111 < 0.6, f"Expected ~0.5 for |111⟩, got {ratio_111}" - def test_9_3_negative_control_modifiers(self): + def test_9_3_negative_control_modifiers(self, simulator): """9.3 Negative control modifiers""" qasm_source = """ OPENQASM 3.0; @@ -912,7 +900,6 @@ def test_9_3_negative_control_modifiers(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=1000) # Verify that negative control modifiers work @@ -930,7 +917,7 @@ def test_9_3_negative_control_modifiers(self): for outcome in counter.keys(): assert outcome in valid_outcomes, f"Unexpected outcome: {outcome}" - def test_9_4_multiple_modifiers(self): + def test_9_4_multiple_modifiers(self, simulator): """9.4 Multiple modifiers""" qasm_source = """ OPENQASM 3.0; @@ -950,7 +937,6 @@ def test_9_4_multiple_modifiers(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=1000) # Multiple modifiers analysis: @@ -976,7 +962,7 @@ def test_9_4_multiple_modifiers(self): assert 0.4 < ratio_00 < 0.6, f"Expected ~0.5 for |00⟩, got {ratio_00}" assert 0.4 < ratio_11 < 0.6, f"Expected ~0.5 for |11⟩, got {ratio_11}" - def test_9_5_gphase_gate(self): + def test_9_5_gphase_gate(self, simulator): """9.5 GPhase gate""" qasm_source = """ OPENQASM 3.0; @@ -999,7 +985,6 @@ def test_9_5_gphase_gate(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=1000) # GPhase gate analysis: @@ -1021,7 +1006,7 @@ def test_9_5_gphase_gate(self): ratio = counter[outcome] / total assert 0.15 < ratio < 0.35, f"Expected ~0.25 for {outcome}, got {ratio}" - def test_9_6_power_modifiers_with_parametric_angles(self): + def test_9_6_power_modifiers_with_parametric_angles(self, simulator): """9.6 Power modifiers with parametric angles""" qasm_source = """ OPENQASM 3.0; @@ -1037,7 +1022,6 @@ def test_9_6_power_modifiers_with_parametric_angles(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=1000) # Power modifiers with parametric angles analysis: @@ -1064,7 +1048,7 @@ def test_9_6_power_modifiers_with_parametric_angles(self): f"Expected both outcomes to have significant probability, got {ratio} for {outcome}" ) - def test_10_1_local_scope_blocks_inherit_variables(self): + def test_10_1_local_scope_blocks_inherit_variables(self, simulator): """10.1 Local scope blocks inherit variables""" qasm_source = """ OPENQASM 3.0; @@ -1093,7 +1077,6 @@ def test_10_1_local_scope_blocks_inherit_variables(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=1000) # Local scope blocks analysis: @@ -1118,7 +1101,7 @@ def test_10_1_local_scope_blocks_inherit_variables(self): assert 0.4 < ratio_0 < 0.6, f"Expected ~0.5 for |0⟩, got {ratio_0}" assert 0.4 < ratio_1 < 0.6, f"Expected ~0.5 for |1⟩, got {ratio_1}" - def test_10_2_for_loop_iteration_variable_lifetime(self): + def test_10_2_for_loop_iteration_variable_lifetime(self, simulator): """10.2 For loop iteration variable lifetime""" qasm_source = """ OPENQASM 3.0; @@ -1141,7 +1124,6 @@ def test_10_2_for_loop_iteration_variable_lifetime(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=1000) # For loop iteration variable lifetime analysis: @@ -1227,7 +1209,7 @@ def test_11_1_adder(self): for outcome in counter: assert all(bit in "01" for bit in outcome), f"Invalid bits in outcome {outcome}" - def test_11_2_gphase(self): + def test_11_2_gphase(self, simulator): """11.2 GPhase""" qasm_source = """ qubit[2] qs; @@ -1253,7 +1235,6 @@ def test_11_2_gphase(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=100) # GPhase operations analysis: @@ -1282,7 +1263,7 @@ def test_11_2_gphase(self): assert 0.3 < ratio_00 < 0.7, f"Expected ~0.5 for |00⟩, got {ratio_00}" assert 0.3 < ratio_11 < 0.7, f"Expected ~0.5 for |11⟩, got {ratio_11}" - def test_11_3_gate_def_with_argument_manipulation(self): + def test_11_3_gate_def_with_argument_manipulation(self, simulator): """11.3 Gate def with argument manipulation""" qasm_source = """ qubit[2] __qubits__; @@ -1295,7 +1276,6 @@ def test_11_3_gate_def_with_argument_manipulation(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=100) # Gate def with argument manipulation analysis: @@ -1324,7 +1304,7 @@ def test_11_3_gate_def_with_argument_manipulation(self): f"Expected both outcomes to have significant probability, got {ratio} for {outcome}" ) - def test_11_4_physical_qubits(self): + def test_11_4_physical_qubits(self, simulator): """11.4 Physical qubits""" qasm_source = """ h $0; @@ -1332,7 +1312,6 @@ def test_11_4_physical_qubits(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=100) # Physical qubits analysis: @@ -1399,7 +1378,7 @@ def test_11_6_builtin_functions(self): total = sum(counter.values()) assert total == 100, f"Expected 100 measurements, got {total}" - def test_11_9_global_gate_control(self): + def test_11_9_global_gate_control(self, simulator): """11.9 Global gate control""" qasm_source = """ qubit q1; @@ -1411,7 +1390,6 @@ def test_11_9_global_gate_control(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=100) # Global gate control analysis: @@ -1433,7 +1411,7 @@ def test_11_9_global_gate_control(self): ratio = counter[outcome] / total assert 0.05 < ratio < 0.45, f"Expected ~0.25 for {outcome}, got {ratio}" - def test_11_10_power_modifiers(self): + def test_11_10_power_modifiers(self, simulator): """11.10 Power modifiers""" # Test sqrt(Z) = S qasm_source_z = """ @@ -1446,7 +1424,6 @@ def test_11_10_power_modifiers(self): """ program_z = OpenQASMProgram(source=qasm_source_z, inputs={}) - simulator = StateVectorSimulator() result_z = simulator.run_openqasm(program_z, shots=100) # Create a reference circuit with S gate @@ -1585,7 +1562,15 @@ def test_11_11_complex_power_modifiers(self): assert counter["11111"] == total, f"Expected all measurements to be |11111⟩, got {counter}" def test_11_12_gate_control(self): - """11.12 Gate control""" + """11.12 Gate control. + + State-vector only: this is a 5-qubit gate-modifier program with no + mid-circuit measurement, so it adds no density-matrix MCM coverage, and + the density matrix backend currently hits a pre-existing numba + contiguity limitation (``reshape() supports contiguous array only``) in + the >=5-qubit gate path, unrelated to this MCM work. + """ + simulator = StateVectorSimulator() qasm_source = """ const int[8] two = 2; gate x a { U(π, 0, π) a; } @@ -1625,7 +1610,6 @@ def test_11_12_gate_control(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=100) # Gate control analysis: @@ -1658,7 +1642,7 @@ def test_11_12_gate_control(self): total = sum(counter.values()) assert counter["11111"] == total, f"Expected all measurements to be |11111⟩, got {counter}" - def test_11_13_gate_inverses(self): + def test_11_13_gate_inverses(self, simulator): """11.13 Gate inverses""" qasm_source = """ gate rand_u_1 a { U(1, 2, 3) a; } @@ -1711,7 +1695,6 @@ def test_11_13_gate_inverses(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=100) # Gate inverses analysis: @@ -1735,7 +1718,7 @@ def test_11_13_gate_inverses(self): total = sum(counter.values()) assert counter["0"] == total, f"Expected all measurements to be |0⟩, got {counter}" - def test_11_14_gate_on_qubit_registers(self): + def test_11_14_gate_on_qubit_registers(self, simulator): """11.14 Gate on qubit registers""" qasm_source = """ qubit[3] qs; @@ -1748,7 +1731,6 @@ def test_11_14_gate_on_qubit_registers(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=100) # Gate on qubit registers analysis: @@ -1775,7 +1757,7 @@ def test_11_14_gate_on_qubit_registers(self): assert 0.3 < ratio_1010 < 0.7, f"Expected ~0.5 for |1010⟩, got {ratio_1010}" assert 0.3 < ratio_1011 < 0.7, f"Expected ~0.5 for |1011⟩, got {ratio_1011}" - def test_11_15_rotation_parameter_expressions(self): + def test_11_15_rotation_parameter_expressions(self, simulator): """11.15 Rotation parameter expressions""" qasm_source_pi = """ OPENQASM 3.0; @@ -1784,7 +1766,6 @@ def test_11_15_rotation_parameter_expressions(self): """ program_pi = OpenQASMProgram(source=qasm_source_pi, inputs={}) - simulator = StateVectorSimulator() result_pi = simulator.run_openqasm(program_pi, shots=100) # Rotation parameter expressions analysis: @@ -1833,7 +1814,7 @@ def test_11_15_rotation_parameter_expressions(self): f"Expected both outcomes to have significant probability, got {ratio} for {outcome}" ) - def test_12_1_aliasing_of_qubit_registers(self): + def test_12_1_aliasing_of_qubit_registers(self, simulator): """12.1 Aliasing of qubit registers""" qasm_source = """ OPENQASM 3.0; @@ -1850,7 +1831,6 @@ def test_12_1_aliasing_of_qubit_registers(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=100) # Aliasing of qubit registers analysis: @@ -1878,7 +1858,7 @@ def test_12_1_aliasing_of_qubit_registers(self): assert 0.3 < ratio_0101 < 0.7, f"Expected ~0.5 for |0101⟩, got {ratio_0101}" assert 0.3 < ratio_1111 < 0.7, f"Expected ~0.5 for |1111⟩, got {ratio_1111}" - def test_12_2_aliasing_with_concatenation(self): + def test_12_2_aliasing_with_concatenation(self, simulator): """12.2 Aliasing with concatenation""" qasm_source = """ OPENQASM 3.0; @@ -1895,7 +1875,6 @@ def test_12_2_aliasing_with_concatenation(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=100) # Aliasing with concatenation analysis: @@ -1924,7 +1903,7 @@ def test_12_2_aliasing_with_concatenation(self): assert 0.3 < ratio_010 < 0.7, f"Expected ~0.5 for |010⟩, got {ratio_010}" assert 0.3 < ratio_111 < 0.7, f"Expected ~0.5 for |111⟩, got {ratio_111}" - def test_13_1_early_return_in_subroutine(self): + def test_13_1_early_return_in_subroutine(self, simulator): """13.1 Early return in subroutine""" qasm_source = """ OPENQASM 3.0; @@ -1955,7 +1934,6 @@ def conditional_apply(bit condition) -> int[32] { """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=1000) # Early return in subroutine analysis: @@ -1989,7 +1967,7 @@ def conditional_apply(bit condition) -> int[32] { for outcome in counter.keys(): assert outcome in valid_outcomes - def test_14_1_break_statement_in_loop(self): + def test_14_1_break_statement_in_loop(self, simulator): """14.1 Break statement in loop""" qasm_source = """ OPENQASM 3.0; @@ -2018,7 +1996,6 @@ def test_14_1_break_statement_in_loop(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=1000) # Break statement in loop analysis: @@ -2046,7 +2023,7 @@ def test_14_1_break_statement_in_loop(self): assert 0.4 < ratio_01 < 0.6, f"Expected ~0.5 for |01⟩, got {ratio_01}" assert 0.4 < ratio_11 < 0.6, f"Expected ~0.5 for |11⟩, got {ratio_11}" - def test_14_2_continue_statement_in_loop(self): + def test_14_2_continue_statement_in_loop(self, simulator): """14.2 Continue statement in loop""" qasm_source = """ OPENQASM 3.0; @@ -2079,7 +2056,6 @@ def test_14_2_continue_statement_in_loop(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=1000) # Continue statement in loop analysis: @@ -2110,7 +2086,7 @@ def test_14_2_continue_statement_in_loop(self): assert 0.4 < ratio_10 < 0.6, f"Expected ~0.5 for |10⟩, got {ratio_10}" assert 0.4 < ratio_11 < 0.6, f"Expected ~0.5 for |11⟩, got {ratio_11}" - def test_15_1_binary_assignment_operators_basic(self): + def test_15_1_binary_assignment_operators_basic(self, simulator): """15.1 Basic binary assignment operators (+=, -=, *=, /=)""" qasm_source = """ OPENQASM 3.0; @@ -2139,7 +2115,6 @@ def test_15_1_binary_assignment_operators_basic(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=100) counter = Counter(["".join(m) for m in result.measurements]) @@ -2239,7 +2214,7 @@ def test_16_2_bitwise_or_assignment_on_single_bit_register(self): total = sum(counter.values()) assert total == 100 - def test_16_3_accessing_nonexistent_variable_error(self): + def test_16_3_accessing_nonexistent_variable_error(self, simulator): """16.3 Test accessing a variable with a name that doesn't exist in the circuit (should throw an error)""" qasm_source = """ OPENQASM 3.0; @@ -2257,13 +2232,12 @@ def test_16_3_accessing_nonexistent_variable_error(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() # This should raise a KeyError for nonexistent variable with pytest.raises(KeyError): simulator.run_openqasm(program, shots=100) - def test_16_4_array_and_qubit_register_out_of_bounds_error(self): + def test_16_4_array_and_qubit_register_out_of_bounds_error(self, simulator): """16.4 Test accessing an array/bitstring and a qubit register out of bounds (should throw an error)""" # Test array out of bounds @@ -2283,7 +2257,6 @@ def test_16_4_array_and_qubit_register_out_of_bounds_error(self): """ program_array = OpenQASMProgram(source=qasm_source_array, inputs={}) - simulator = StateVectorSimulator() # This should raise an IndexError for array out of bounds with pytest.raises(IndexError): @@ -2346,7 +2319,7 @@ def test_16_5_access_array_input_at_index(self): total = sum(counter.values()) assert total == 100 - def test_17_1_nonexistent_qubit_variable_error(self): + def test_17_1_nonexistent_qubit_variable_error(self, simulator): """17.1 Test accessing a qubit with a name that doesn't exist (should throw an error)""" qasm_source = """ OPENQASM 3.0; @@ -2360,13 +2333,12 @@ def test_17_1_nonexistent_qubit_variable_error(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() # This should raise a KeyError for nonexistent qubit with pytest.raises(KeyError): simulator.run_openqasm(program, shots=100) - def test_17_2_nonexistent_function_error(self): + def test_17_2_nonexistent_function_error(self, simulator): """17.2 Test calling a function that doesn't exist (should throw an error)""" qasm_source = """ OPENQASM 3.0; @@ -2382,13 +2354,12 @@ def test_17_2_nonexistent_function_error(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() # This should raise a NameError for nonexistent function with pytest.raises(NameError, match="Subroutine nonexistent_function is not defined"): simulator.run_openqasm(program, shots=100) - def test_17_3_all_paths_end_in_else_block(self): + def test_17_3_all_paths_end_in_else_block(self, simulator): """17.3 All paths end in the else block""" qasm_source = """ OPENQASM 3.0; @@ -2410,14 +2381,13 @@ def test_17_3_all_paths_end_in_else_block(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=100) counter = Counter(["".join(m) for m in result.measurements]) # always_false=0, so else block runs: x q[1] → q[1]=1, q[0] untouched assert counter == {"1": 100} - def test_17_4_continue_statements_in_while_loops(self): + def test_17_4_continue_statements_in_while_loops(self, simulator): """17.4 Continue statements in while loops""" qasm_source = """ OPENQASM 3.0; @@ -2444,7 +2414,6 @@ def test_17_4_continue_statements_in_while_loops(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=1000) counter = Counter(["".join(m) for m in result.measurements]) @@ -2453,7 +2422,7 @@ def test_17_4_continue_statements_in_while_loops(self): total = sum(counter.values()) assert 0.4 < counter["10"] / total < 0.6 - def test_17_5_empty_return_statements(self): + def test_17_5_empty_return_statements(self, simulator): """17.5 Empty return statements in subroutines. Exercises subroutine definition with early return. The subroutine @@ -2481,7 +2450,6 @@ def apply_gates_conditionally(bit condition) { """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() result = simulator.run_openqasm(program, shots=1000) assert len(result.measurements) == 1000 @@ -2540,7 +2508,7 @@ def test_17_6_not_unary_operator(self): total = sum(counter.values()) assert total == 100 - def test_18_1_simulation_zero_shots(self): + def test_18_1_simulation_zero_shots(self, simulator): """18.1 Simulation with 0 or negative shots should raise ValueError.""" qasm_source = """ OPENQASM 3.0; @@ -2548,7 +2516,6 @@ def test_18_1_simulation_zero_shots(self): """ program = OpenQASMProgram(source=qasm_source, inputs={}) - simulator = StateVectorSimulator() with pytest.raises(ValueError): simulator.run_openqasm(program, shots=0) @@ -2557,9 +2524,9 @@ def test_18_1_simulation_zero_shots(self): simulator.run_openqasm(program, shots=-100) -@pytest.fixture -def simulator(): - return StateVectorSimulator() +@pytest.fixture(params=[StateVectorSimulator, DensityMatrixSimulator], ids=["sv", "dm"]) +def simulator(request): + return request.param() class TestUnifiedMCMBasic: @@ -2804,8 +2771,11 @@ def test_empty_circuit_with_shots(self, simulator): counter = Counter(["".join(m) for m in result.measurements]) assert counter == {"0": 100} - def test_deterministic_measurement(self, simulator): + def test_deterministic_measurement(self): """Measurement of |0> should always give 0 (no branching needed).""" + # SV-only: SV/DM differ in measured-column reporting for partially-measured + # programs (SV pads to the register; DM reports only measured qubits). + simulator = StateVectorSimulator() qasm = """ OPENQASM 3.0; bit b; @@ -3270,8 +3240,11 @@ def test_measure_only_in_if_branch_x(self, simulator): counter = Counter(["".join(m) for m in result.measurements]) assert counter == {"10": 1000} - def test_measure_only_in_if_branch_z(self, simulator): + def test_measure_only_in_if_branch_z(self): """X q[0] → b=1 → measure q[1] only in if block.""" + # SV-only: SV/DM differ in measured-column reporting for partially-measured + # programs (SV pads to the register; DM reports only measured qubits). + simulator = StateVectorSimulator() qasm = """ OPENQASM 3.0; bit b; @@ -4459,8 +4432,11 @@ def test_branching_without_simulator_raises(self): with pytest.raises(RuntimeError, match="no simulator"): Interpreter(ctx).run(qasm) - def test_flush_when_already_branched(self, simulator): + def test_flush_when_already_branched(self): """Reading a pending MCM variable when already branched from an earlier MCM.""" + # SV-only: this asserts a fixed (SV) measured-column ordering via outcome[1]; + # SV and DM differ in how recorded classical-bit columns are assembled. + simulator = StateVectorSimulator() qasm = """ OPENQASM 3.0; qubit[3] q; @@ -4856,8 +4832,11 @@ def test_measure_ff_and_cc_prx_match_explicit_form(self, simulator): assert set(experimental.keys()) == {"00", "11"} assert set(explicit.keys()) == {"00", "11"} - def test_cc_prx_without_branch(self, simulator): + def test_cc_prx_without_branch(self): """With the measured qubit deterministically ``|0>``, ``cc_prx`` never fires.""" + # SV-only: SV/DM differ in measured-column reporting for partially-measured + # programs (SV pads to the register; DM reports only measured qubits). + simulator = StateVectorSimulator() qasm = """ OPENQASM 3.0; qubit[2] q; @@ -5068,9 +5047,9 @@ class TestDensityMatrixSimulatorBranching: The state-vector tests in :class:`TestStateVectorSimulatorOperatorsOpenQASM` drive the bulk of branching behavior; these tests verify that the same programs produce statistically equivalent results when the simulator is - swapped out for the density-matrix backend, which goes through - :meth:`DensityMatrixSimulation._apply_projection` (and, for resets, the - Kraus channel path) on each branched replay. + swapped out for the density-matrix backend, which branches via exact Kraus + splits (:meth:`DensityMatrixSimulation.project_unnormalized`) and draws all + shots once from the combined total density matrix. """ SHOTS = 4000 @@ -5196,3 +5175,219 @@ def test_sparse_qubit_register_does_not_blow_up(self): # Mirrors a verbatim ``measure_ff(q[13], 0); cc_prx(q[17], pi, 0, 0)`` # emission: q[17] flips iff b[0]==1 → outcomes "00" and "11" each ~50%. self._assert_distributions_match(qasm, expected_keys={"00", "11"}) + + # --- Task 11: classical feedforward (measure_ff / cc_prx) per sub-ensemble --- + # (Req 10.1, 10.2, 10.3): feedforward gates must apply per sub-ensemble on the DM + # density matrix, and a cc_prx on an unrecorded feedback key must raise. + + def test_feedforward_applies_per_sub_ensemble(self): + """``measure_ff``/``cc_prx`` apply per sub-ensemble on the DM path (Req 10.1, 10.2). + + The experimental ``measure_ff(k) q; cc_prx(a1,a2,k) q2`` form must produce the same + per-branch behavior as the explicit ``b = measure q; if (b==1) prx(a1,a2) q2``, + and both must agree with the state vector simulator within shot noise. This guards + the DM-specific ``cc_prx`` application path (the conditioned ``prx`` is applied to + each selected sub-ensemble's density matrix rather than appended to an unused + instruction list). + """ + experimental = """ + OPENQASM 3.0; + qubit[2] q; + bit[2] c; + h q[0]; + measure_ff(0) q[0]; + cc_prx(3.141592653589793, 0.0, 0) q[1]; + c[0] = measure q[0]; + c[1] = measure q[1]; + """ + explicit = """ + OPENQASM 3.0; + qubit[2] q; + bit[2] c; + bit b; + h q[0]; + b = measure q[0]; + if (b == 1) { + prx(3.141592653589793, 0.0) q[1]; + } + c[0] = measure q[0]; + c[1] = measure q[1]; + """ + # q[1] mirrors q[0] (cc_prx is a pi x-rotation when the feedback bit is 1): + # outcomes "00" and "11" each ~50%. + self._assert_distributions_match(experimental, expected_keys={"00", "11"}) + self._assert_distributions_match(explicit, expected_keys={"00", "11"}) + + def test_feedforward_independent_keys_apply_per_sub_ensemble(self): + """Independent feedback keys each drive their own conditional per sub-ensemble. + + Validates Req 10.1, 10.2: two ``measure_ff`` keys steer two ``cc_prx`` gates, so + q[2] mirrors q[0] and q[3] mirrors q[1]; the DM histogram matches SV within noise. + """ + qasm = """ + OPENQASM 3.0; + qubit[4] q; + bit[4] c; + h q[0]; + h q[1]; + measure_ff(0) q[0]; + measure_ff(1) q[1]; + cc_prx(3.141592653589793, 0.0, 0) q[2]; + cc_prx(3.141592653589793, 0.0, 1) q[3]; + c[0] = measure q[0]; + c[1] = measure q[1]; + c[2] = measure q[2]; + c[3] = measure q[3]; + """ + self._assert_distributions_match(qasm, expected_keys={"0000", "0101", "1010", "1111"}) + + def test_cc_prx_with_unmeasured_feedback_key_raises(self): + """A ``cc_prx`` on a feedback key with no prior ``measure_ff`` raises (Req 10.3). + + The inherited handler raises a ``ValueError`` identifying the missing key; this + confirms the density-matrix simulator surfaces the same error naming that key. + """ + qasm = """ + OPENQASM 3.0; + qubit[2] q; + cc_prx(3.141592653589793, 0.0, 7) q[0]; + """ + with pytest.raises(ValueError, match="cc_prx references feedback key 7 but no measure_ff"): + DensityMatrixSimulator().run_openqasm( + OpenQASMProgram(source=qasm, inputs={}), shots=100 + ) + + # --- Analytical readout (shots == 0) from the branched total density matrix --- + # A measurement-conditioned program branches at every shot count (the DM context's + # _should_branch_on_mcm override returns True), so shots == 0 routes through + # _run_branched and reads the requested result types directly off the exact + # rho_total via _analytical_results_from_total (no sampling). + + @staticmethod + def _analytical(qasm): + return DensityMatrixSimulator().run(OpenQASMProgram(source=qasm)).resultTypes + + def test_analytical_density_matrix_from_branched_total(self): + """shots == 0 DensityMatrix equals the exact branched rho_total. + + ``h; measure; if(b==1) x`` yields the classical mixture + 0.5|00><00| + 0.5|11><11|, read off rho_total with no sampling. + """ + qasm = """ + OPENQASM 3.0; + qubit[2] q; + bit b; + h q[0]; + b = measure q[0]; + if (b == 1) { + x q[1]; + } + #pragma braket result density_matrix + """ + dm = self._analytical(qasm)[0].value + expected = np.zeros((4, 4), dtype=complex) + expected[0, 0] = 0.5 + expected[3, 3] = 0.5 + assert np.allclose(dm, expected) + + def test_analytical_probability_from_branched_total(self): + """shots == 0 Probability equals the real diagonal of rho_total.""" + qasm = """ + OPENQASM 3.0; + qubit[2] q; + bit b; + h q[0]; + b = measure q[0]; + if (b == 1) { + x q[1]; + } + #pragma braket result probability + """ + probs = self._analytical(qasm)[0].value + assert np.allclose(probs, [0.5, 0.0, 0.0, 0.5]) + + def test_analytical_expectation_and_variance_from_branched_total(self): + """shots == 0 Expectation/Variance use the direct trace on rho_total. + + For 0.5|00><00| + 0.5|11><11| and observable Z on q[0]: + = Tr(Z rho) = 0, Var = Tr(Z^2 rho) - ^2 = 1. + """ + qasm = """ + OPENQASM 3.0; + qubit[2] q; + bit b; + h q[0]; + b = measure q[0]; + if (b == 1) { + x q[1]; + } + #pragma braket result expectation z(q[0]) + #pragma braket result variance z(q[0]) + """ + results = self._analytical(qasm) + assert np.isclose(results[0].value, 0.0) + assert np.isclose(results[1].value, 1.0) + + def test_analytical_subset_result_type_reports_against_physical_qubit(self): + """A subset result type at shots == 0 reports against the original physical qubit. + + ``measure, then correct`` re-converges q[0] to |0> on both branches, so + rho_total reduces to |0><0| on q[0]; the targeted Z expectation is +1. + """ + qasm = """ + OPENQASM 3.0; + qubit[2] q; + bit b; + h q[0]; + b = measure q[0]; + if (b == 1) { + x q[0]; + } + #pragma braket result expectation z(q[0]) + """ + results = self._analytical(qasm) + assert np.isclose(results[0].value, 1.0) + + def test_branched_noise_instruction_applies_per_sub_ensemble(self): + """A noise channel after an MCM branch applies per sub-ensemble (covers the + branched ``add_noise_instruction`` path). + + ``h; measure; if(b==1) x q[1]`` gives 0.5|0>+0.5|1> on q[1]; a bit_flip(0.1) + on q[1] then yields exact probabilities [0.45, 0.05, 0.05, 0.45] at shots == 0. + """ + qasm = """ + OPENQASM 3.0; + qubit[2] q; + bit b; + h q[0]; + b = measure q[0]; + if (b == 1) { + x q[1]; + } + #pragma braket noise bit_flip(0.1) q[1] + #pragma braket result probability + """ + probs = self._analytical(qasm)[0].value + assert np.allclose(probs, [0.45, 0.05, 0.05, 0.45]) + + def test_branched_kraus_instruction_applies_per_sub_ensemble(self): + """A Kraus channel after an MCM branch applies per sub-ensemble (covers the + branched ``add_kraus_instruction`` path). + + The bit-flip Kraus set ``[sqrt(0.9) I, sqrt(0.1) X]`` on q[1] reproduces the + bit_flip(0.1) probabilities [0.45, 0.05, 0.05, 0.45] at shots == 0. + """ + qasm = """ + OPENQASM 3.0; + qubit[2] q; + bit b; + h q[0]; + b = measure q[0]; + if (b == 1) { + x q[1]; + } + #pragma braket noise kraus([[0.9486832980505138, 0], [0, 0.9486832980505138]], [[0, 0.31622776601683794], [0.31622776601683794, 0]]) q[1] + #pragma braket result probability + """ + probs = self._analytical(qasm)[0].value + assert np.allclose(probs, [0.45, 0.05, 0.05, 0.45])