From 913c74c12c4d73306b3b2e38a3f01c1813a31685 Mon Sep 17 00:00:00 2001 From: Travis Crew Date: Thu, 11 Jun 2026 19:49:14 -0500 Subject: [PATCH 1/3] Add Pauli flow extraction from corrections --- docs/source/modifier.rst | 2 + graphix/flow/core.py | 226 +++++++++++++++++++++++++++++++++++++ graphix/flow/exceptions.py | 5 + graphix/pattern.py | 25 ++++ tests/test_flow_core.py | 16 +++ tests/test_pattern.py | 31 +++++ 6 files changed, 305 insertions(+) diff --git a/docs/source/modifier.rst b/docs/source/modifier.rst index 4fe4f5727..f1e87e09f 100644 --- a/docs/source/modifier.rst +++ b/docs/source/modifier.rst @@ -56,6 +56,8 @@ Pattern Manipulation .. automethod:: extract_gflow + .. automethod:: extract_pauli_flow + .. automethod:: extract_opengraph .. automethod:: extract_measurement_commands diff --git a/graphix/flow/core.py b/graphix/flow/core.py index fa674c3fd..383a12ed3 100644 --- a/graphix/flow/core.py +++ b/graphix/flow/core.py @@ -266,6 +266,49 @@ def to_gflow(self: XZCorrections[_PM_co]) -> GFlow[_PM_co]: gf.check_well_formed() # Raises a `FlowError` if the partial order and the correction function are not compatible. return gf + def to_pauli_flow(self) -> PauliFlow[_AM_co]: + r"""Extract a Pauli flow from XZ-corrections. + + This method does not invoke the Pauli-flow extraction routine on the underlying open graph. + Instead, it reconstructs a correction function compatible with the XZ-corrections and the + intrinsic partial order, then verifies the result using :meth:`PauliFlow.check_well_formed`. + + Returns + ------- + PauliFlow[_AM_co] + + Raises + ------ + FlowError + If no Pauli-flow correction function is compatible with the XZ-corrections and partial order. + + Notes + ----- + XZ-corrections only record corrections applied to nodes in the future of a measurement. For + Pauli flow, a correcting set can also contain nodes in the measurement's past or present when + their measurement labels allow it. The missing part of each correcting set is therefore obtained + by solving the Pauli-flow parity constraints over GF(2). + """ + self.check_well_formed() + + if len(self.partial_order_layers) == 0: + raise PartialOrderError(PartialOrderErrorReason.Empty) + + future: set[int] = set(self.partial_order_layers[0]) + correction_function: dict[int, AbstractSet[int]] = {} + + for layer in self.partial_order_layers[1:]: + for measured_node in layer: + correction_function[measured_node] = _xz_corrections_to_pauli_correction_set( + self, measured_node, future + ) + + future |= set(layer) + + pf = PauliFlow(self.og, correction_function, self.partial_order_layers) + pf.check_well_formed() + return pf + def to_bloch(self: XZCorrections[Measurement]) -> XZCorrections[BlochMeasurement]: """Return the XZ-corrections where all measurements in the open graph are converted to Bloch. @@ -1341,6 +1384,189 @@ def _corrections_to_partial_order_layers( return generations[::-1] +def _xz_corrections_to_pauli_correction_set( + xz_corrections: XZCorrections[_AM_co], measured_node: int, future: AbstractSet[int] +) -> frozenset[int]: + fixed_correction_nodes = set(xz_corrections.x_corrections.get(measured_node, set())) + expected_odd_future = set(xz_corrections.z_corrections.get(measured_node, set())) + candidate_nodes = sorted(set(xz_corrections.og.graph.nodes) - set(xz_corrections.og.input_nodes) - set(future)) + variable_index = {node: index for index, node in enumerate(candidate_nodes)} + equations: list[tuple[set[int], int]] = [] + + for node in future: + _append_odd_neighborhood_equation( + equations, + xz_corrections.og.graph, + node, + variable_index, + fixed_correction_nodes, + int(node in expected_odd_future), + ) + + past_and_present_nodes = set(xz_corrections.og.measurements) - set(future) + + for node in past_and_present_nodes - {measured_node}: + meas = xz_corrections.og.measurements[node].to_plane_or_axis() + if meas not in {Axis.X, Axis.Y}: + _append_membership_equation(equations, node, variable_index, fixed_correction_nodes, 0) + if meas not in {Axis.Y, Axis.Z}: + _append_odd_neighborhood_equation( + equations, xz_corrections.og.graph, node, variable_index, fixed_correction_nodes, 0 + ) + if meas == Axis.Y: + _append_closed_odd_neighborhood_equation( + equations, xz_corrections.og.graph, node, variable_index, fixed_correction_nodes, 0 + ) + + _append_pauli_flow_self_condition( + equations, + xz_corrections.og.graph, + measured_node, + xz_corrections.og.measurements[measured_node].to_plane_or_axis(), + variable_index, + fixed_correction_nodes, + ) + + solution = _solve_binary_linear_system(equations, len(candidate_nodes)) + if solution is None: + raise FlowGenericError(FlowGenericErrorReason.NoCorrectionFunction) + + correction_set = set(fixed_correction_nodes) + correction_set.update(node for node, index in variable_index.items() if solution[index]) + return frozenset(correction_set) + + +def _append_pauli_flow_self_condition( + equations: list[tuple[set[int], int]], + graph: nx.Graph[int], + node: int, + meas: Plane | Axis, + variable_index: Mapping[int, int], + fixed_correction_nodes: AbstractSet[int], +) -> None: + match meas: + case Plane.XY: + _append_membership_equation(equations, node, variable_index, fixed_correction_nodes, 0) + _append_odd_neighborhood_equation(equations, graph, node, variable_index, fixed_correction_nodes, 1) + case Plane.XZ: + _append_membership_equation(equations, node, variable_index, fixed_correction_nodes, 1) + _append_odd_neighborhood_equation(equations, graph, node, variable_index, fixed_correction_nodes, 1) + case Plane.YZ: + _append_membership_equation(equations, node, variable_index, fixed_correction_nodes, 1) + _append_odd_neighborhood_equation(equations, graph, node, variable_index, fixed_correction_nodes, 0) + case Axis.X: + _append_odd_neighborhood_equation(equations, graph, node, variable_index, fixed_correction_nodes, 1) + case Axis.Z: + _append_membership_equation(equations, node, variable_index, fixed_correction_nodes, 1) + case Axis.Y: + _append_closed_odd_neighborhood_equation(equations, graph, node, variable_index, fixed_correction_nodes, 1) + case _: + assert_never(meas) + + +def _append_membership_equation( + equations: list[tuple[set[int], int]], + node: int, + variable_index: Mapping[int, int], + fixed_correction_nodes: AbstractSet[int], + value: int, +) -> None: + coefficients, fixed_value = _membership_terms(node, variable_index, fixed_correction_nodes) + equations.append((coefficients, value ^ fixed_value)) + + +def _append_odd_neighborhood_equation( + equations: list[tuple[set[int], int]], + graph: nx.Graph[int], + node: int, + variable_index: Mapping[int, int], + fixed_correction_nodes: AbstractSet[int], + value: int, +) -> None: + coefficients, fixed_value = _odd_neighborhood_terms(graph, node, variable_index, fixed_correction_nodes) + equations.append((coefficients, value ^ fixed_value)) + + +def _append_closed_odd_neighborhood_equation( + equations: list[tuple[set[int], int]], + graph: nx.Graph[int], + node: int, + variable_index: Mapping[int, int], + fixed_correction_nodes: AbstractSet[int], + value: int, +) -> None: + coefficients, fixed_value = _membership_terms(node, variable_index, fixed_correction_nodes) + odd_coefficients, odd_fixed_value = _odd_neighborhood_terms(graph, node, variable_index, fixed_correction_nodes) + + for coefficient in odd_coefficients: + _toggle_coefficient(coefficients, coefficient) + + equations.append((coefficients, value ^ fixed_value ^ odd_fixed_value)) + + +def _membership_terms( + node: int, variable_index: Mapping[int, int], fixed_correction_nodes: AbstractSet[int] +) -> tuple[set[int], int]: + if node in variable_index: + return {variable_index[node]}, 0 + return set(), int(node in fixed_correction_nodes) + + +def _odd_neighborhood_terms( + graph: nx.Graph[int], + node: int, + variable_index: Mapping[int, int], + fixed_correction_nodes: AbstractSet[int], +) -> tuple[set[int], int]: + coefficients: set[int] = set() + fixed_value = 0 + + for neighbor in graph.neighbors(node): + if neighbor in variable_index: + _toggle_coefficient(coefficients, variable_index[neighbor]) + elif neighbor in fixed_correction_nodes: + fixed_value ^= 1 + + return coefficients, fixed_value + + +def _toggle_coefficient(coefficients: set[int], coefficient: int) -> None: + if coefficient in coefficients: + coefficients.remove(coefficient) + else: + coefficients.add(coefficient) + + +def _solve_binary_linear_system(equations: Sequence[tuple[set[int], int]], n_variables: int) -> list[int] | None: + matrix = [[sum(1 << coefficient for coefficient in coefficients), value] for coefficients, value in equations] + pivot_columns: list[int] = [] + pivot_row = 0 + + for column in range(n_variables): + pivot = next((row for row in range(pivot_row, len(matrix)) if matrix[row][0] & (1 << column)), None) + if pivot is None: + continue + + matrix[pivot_row], matrix[pivot] = matrix[pivot], matrix[pivot_row] + + for row, _ in enumerate(matrix): + if row != pivot_row and matrix[row][0] & (1 << column): + matrix[row][0] ^= matrix[pivot_row][0] + matrix[row][1] ^= matrix[pivot_row][1] + + pivot_columns.append(column) + pivot_row += 1 + + if any(mask == 0 and value for mask, value in matrix): + return None + + solution = [0] * n_variables + for row, column in enumerate(pivot_columns): + solution[column] = matrix[row][1] + + return solution + + def _check_correction_function_domain( og: OpenGraph[_AM_co], correction_function: Mapping[int, AbstractSet[int]] ) -> bool: diff --git a/graphix/flow/exceptions.py b/graphix/flow/exceptions.py index 0c76e8f05..8537673cc 100644 --- a/graphix/flow/exceptions.py +++ b/graphix/flow/exceptions.py @@ -88,6 +88,9 @@ class FlowGenericErrorReason(Enum): XYPlane = enum.auto() "A causal flow is defined on an open graphs with non-XY measurements." + NoCorrectionFunction = enum.auto() + """No correction function satisfies the requested flow constraints.""" + class XZCorrectionsOrderErrorReason(Enum): """Describe the reason of an `XZCorrectionsOrderError` exception.""" @@ -224,6 +227,8 @@ def __str__(self) -> str: return "The image of the correction function must be a subset of non-input nodes (prepared qubits) of the open graph." case FlowGenericErrorReason.XYPlane: return "Causal flow is only defined on open graphs with XY measurements." + case FlowGenericErrorReason.NoCorrectionFunction: + return "No correction function satisfies the requested flow constraints." case _: assert_never(self.reason) diff --git a/graphix/pattern.py b/graphix/pattern.py index bcc18035b..5d6af43b1 100644 --- a/graphix/pattern.py +++ b/graphix/pattern.py @@ -991,6 +991,31 @@ def extract_gflow(self) -> GFlow[BlochMeasurement]: """ return self.extract_xzcorrections().downcast_bloch().to_gflow() + def extract_pauli_flow(self) -> PauliFlow[Measurement]: + r"""Extract the Pauli flow structure from the current measurement pattern. + + This method does not call the flow-extraction routine on the underlying open graph, but constructs + the Pauli flow from the pattern corrections instead. + + Returns + ------- + PauliFlow[Measurement] + The Pauli flow associated with the current pattern. + + Raises + ------ + FlowError + If the pattern is empty or if the extracted structure does not satisfy + the well-formedness conditions required for a valid Pauli flow. + ValueError + If `N` commands in the pattern do not represent a :math:`|+\rangle` state or if the pattern corrections form closed loops. + + Notes + ----- + The notes provided in :func:`self.extract_causal_flow` apply here as well. + """ + return self.extract_xzcorrections().to_pauli_flow() + def extract_xzcorrections(self) -> XZCorrections[Measurement]: """Extract the XZ-corrections from the current measurement pattern. diff --git a/tests/test_flow_core.py b/tests/test_flow_core.py index 8e722bd93..1dbddb9d1 100644 --- a/tests/test_flow_core.py +++ b/tests/test_flow_core.py @@ -384,6 +384,10 @@ def prepare_test_xzcorrections() -> list[XZCorrectionsTestCase]: return test_cases +def prepare_test_pauli_flow_xzcorrections() -> list[XZCorrectionsTestCase]: + return [test_case for test_case in prepare_test_xzcorrections() if isinstance(test_case.flow, PauliFlow)] + + class TestFlowPatternConversion: """Bundle for unit tests of the flow to XZ-corrections to pattern methods. @@ -399,6 +403,18 @@ def test_flow_to_corrections(self, test_case: XZCorrectionsTestCase) -> None: assert corrections.z_corrections == test_case.z_corr assert corrections.x_corrections == test_case.x_corr + @pytest.mark.parametrize("test_case", prepare_test_pauli_flow_xzcorrections()) + def test_corrections_to_pauli_flow(self, test_case: XZCorrectionsTestCase) -> None: + corrections = test_case.flow.to_corrections() + flow = corrections.to_pauli_flow() + + flow.check_well_formed() + extracted_corrections = flow.to_corrections() + + assert extracted_corrections.x_corrections == corrections.x_corrections + assert extracted_corrections.z_corrections == corrections.z_corrections + assert flow.partial_order_layers == corrections.partial_order_layers + @pytest.mark.parametrize("test_case", prepare_test_xzcorrections()) def test_corrections_to_pattern(self, test_case: XZCorrectionsTestCase, fx_rng: Generator) -> None: if test_case.pattern is not None: diff --git a/tests/test_pattern.py b/tests/test_pattern.py index 6a87c4016..c97de9f5e 100644 --- a/tests/test_pattern.py +++ b/tests/test_pattern.py @@ -972,6 +972,37 @@ def test_extract_gflow(self, fx_rng: Generator, test_case: PatternFlowTestCase) with pytest.raises(FlowError): test_case.pattern.extract_gflow() + def test_extract_pauli_flow(self) -> None: + pattern = Pattern( + input_nodes=[0], + cmds=[ + N(1), + N(2), + N(3), + E((0, 1)), + E((1, 2)), + E((2, 3)), + M(0, Measurement.X), + X(3, {0}), + M(1, Measurement.X), + Z(3, {1}), + M(2, Measurement.X), + X(3, {2}), + ], + output_nodes=[3], + ) + + corrections = pattern.extract_xzcorrections() + flow = pattern.extract_pauli_flow() + + flow.check_well_formed() + extracted_corrections = flow.to_corrections() + + assert extracted_corrections.x_corrections == corrections.x_corrections + assert extracted_corrections.z_corrections == corrections.z_corrections + assert flow.partial_order_layers == corrections.partial_order_layers + assert flow.correction_function == {0: {1, 3}, 1: {2}, 2: {3}} + # From open graph def test_extract_cflow_og(self, fx_rng: Generator) -> None: alpha = 2 * np.pi * fx_rng.random() From 555bb90c1146a4a4f6a157fc888ca31f05fcf35f Mon Sep 17 00:00:00 2001 From: Travis Crew <171891419+CrewRiz@users.noreply.github.com> Date: Sat, 13 Jun 2026 23:08:36 -0500 Subject: [PATCH 2/3] Refine Pauli-flow reconstruction --- graphix/flow/core.py | 85 +++++++++++++++++++++----------------- graphix/flow/exceptions.py | 8 ++-- tests/test_flow_core.py | 39 +++++++++++++++++ 3 files changed, 90 insertions(+), 42 deletions(-) diff --git a/graphix/flow/core.py b/graphix/flow/core.py index 383a12ed3..1a967f881 100644 --- a/graphix/flow/core.py +++ b/graphix/flow/core.py @@ -11,10 +11,12 @@ from typing import TYPE_CHECKING, Generic, TypeVar import networkx as nx +import numpy as np # `override` introduced in Python 3.12, `assert_never` introduced in Python 3.11 from typing_extensions import assert_never, override +from graphix._linalg import MatGF2, solve_f2_linear_system from graphix.circ_ext.extraction import ( CliffordMap, ExtractionResult, @@ -271,7 +273,7 @@ def to_pauli_flow(self) -> PauliFlow[_AM_co]: This method does not invoke the Pauli-flow extraction routine on the underlying open graph. Instead, it reconstructs a correction function compatible with the XZ-corrections and the - intrinsic partial order, then verifies the result using :meth:`PauliFlow.check_well_formed`. + intrinsic partial order. Returns ------- @@ -290,9 +292,8 @@ def to_pauli_flow(self) -> PauliFlow[_AM_co]: by solving the Pauli-flow parity constraints over GF(2). """ self.check_well_formed() - - if len(self.partial_order_layers) == 0: - raise PartialOrderError(PartialOrderErrorReason.Empty) + if not self.og.measurements: + return PauliFlow(self.og, {}, self.partial_order_layers) future: set[int] = set(self.partial_order_layers[0]) correction_function: dict[int, AbstractSet[int]] = {} @@ -305,9 +306,7 @@ def to_pauli_flow(self) -> PauliFlow[_AM_co]: future |= set(layer) - pf = PauliFlow(self.og, correction_function, self.partial_order_layers) - pf.check_well_formed() - return pf + return PauliFlow(self.og, correction_function, self.partial_order_layers) def to_bloch(self: XZCorrections[Measurement]) -> XZCorrections[BlochMeasurement]: """Return the XZ-corrections where all measurements in the open graph are converted to Bloch. @@ -1389,10 +1388,29 @@ def _xz_corrections_to_pauli_correction_set( ) -> frozenset[int]: fixed_correction_nodes = set(xz_corrections.x_corrections.get(measured_node, set())) expected_odd_future = set(xz_corrections.z_corrections.get(measured_node, set())) - candidate_nodes = sorted(set(xz_corrections.og.graph.nodes) - set(xz_corrections.og.input_nodes) - set(future)) - variable_index = {node: index for index, node in enumerate(candidate_nodes)} + input_nodes = set(xz_corrections.og.input_nodes) + if fixed_correction_nodes & input_nodes: + raise FlowGenericError(FlowGenericErrorReason.NoPauliFlow) + + nonfuture_measured_nodes = set(xz_corrections.og.measurements) - set(future) + measured_label = xz_corrections.og.measurements[measured_node].to_plane_or_axis() + variable_nodes = [ + node + for node in sorted(nonfuture_measured_nodes - input_nodes) + if xz_corrections.og.measurements[node].to_plane_or_axis() in {Axis.X, Axis.Y} + and (node != measured_node or measured_label in {Axis.X, Axis.Y}) + ] + variable_index = {node: index for index, node in enumerate(variable_nodes)} equations: list[tuple[set[int], int]] = [] + if measured_node not in variable_index: + if measured_label in {Plane.XZ, Plane.YZ, Axis.Z}: + if measured_node in input_nodes: + raise FlowGenericError(FlowGenericErrorReason.NoPauliFlow) + fixed_correction_nodes.add(measured_node) + elif measured_label == Plane.XY: + fixed_correction_nodes.discard(measured_node) + for node in future: _append_odd_neighborhood_equation( equations, @@ -1403,12 +1421,10 @@ def _xz_corrections_to_pauli_correction_set( int(node in expected_odd_future), ) - past_and_present_nodes = set(xz_corrections.og.measurements) - set(future) - - for node in past_and_present_nodes - {measured_node}: + for node in nonfuture_measured_nodes - {measured_node}: meas = xz_corrections.og.measurements[node].to_plane_or_axis() if meas not in {Axis.X, Axis.Y}: - _append_membership_equation(equations, node, variable_index, fixed_correction_nodes, 0) + fixed_correction_nodes.discard(node) if meas not in {Axis.Y, Axis.Z}: _append_odd_neighborhood_equation( equations, xz_corrections.og.graph, node, variable_index, fixed_correction_nodes, 0 @@ -1427,9 +1443,9 @@ def _xz_corrections_to_pauli_correction_set( fixed_correction_nodes, ) - solution = _solve_binary_linear_system(equations, len(candidate_nodes)) + solution = _solve_f2_equations(equations, len(variable_nodes)) if solution is None: - raise FlowGenericError(FlowGenericErrorReason.NoCorrectionFunction) + raise FlowGenericError(FlowGenericErrorReason.NoPauliFlow) correction_set = set(fixed_correction_nodes) correction_set.update(node for node, index in variable_index.items() if solution[index]) @@ -1537,34 +1553,27 @@ def _toggle_coefficient(coefficients: set[int], coefficient: int) -> None: coefficients.add(coefficient) -def _solve_binary_linear_system(equations: Sequence[tuple[set[int], int]], n_variables: int) -> list[int] | None: - matrix = [[sum(1 << coefficient for coefficient in coefficients), value] for coefficients, value in equations] - pivot_columns: list[int] = [] - pivot_row = 0 - - for column in range(n_variables): - pivot = next((row for row in range(pivot_row, len(matrix)) if matrix[row][0] & (1 << column)), None) - if pivot is None: - continue +def _solve_f2_equations(equations: Sequence[tuple[set[int], int]], n_variables: int) -> list[int] | None: + if n_variables == 0: + return [] if all(not coefficients and value == 0 for coefficients, value in equations) else None + if not equations: + return [0] * n_variables - matrix[pivot_row], matrix[pivot] = matrix[pivot], matrix[pivot_row] + augmented = np.zeros((len(equations), n_variables + 1), dtype=np.uint8).view(MatGF2) + for row, (equation_coefficients, value) in enumerate(equations): + for coefficient in equation_coefficients: + augmented[row, coefficient] = 1 + augmented[row, n_variables] = value - for row, _ in enumerate(matrix): - if row != pivot_row and matrix[row][0] & (1 << column): - matrix[row][0] ^= matrix[pivot_row][0] - matrix[row][1] ^= matrix[pivot_row][1] + reduced = augmented.gauss_elimination(ncols=n_variables, copy=False) + matrix = MatGF2(reduced[:, :n_variables]) + constants = MatGF2(reduced[:, n_variables]) - pivot_columns.append(column) - pivot_row += 1 - - if any(mask == 0 and value for mask, value in matrix): + if np.any((~matrix.any(axis=1)) & constants.astype(bool)): return None - solution = [0] * n_variables - for row, column in enumerate(pivot_columns): - solution[column] = matrix[row][1] - - return solution + solution = solve_f2_linear_system(matrix, constants) + return [int(bit) for bit in solution] def _check_correction_function_domain( diff --git a/graphix/flow/exceptions.py b/graphix/flow/exceptions.py index 8537673cc..90380de8e 100644 --- a/graphix/flow/exceptions.py +++ b/graphix/flow/exceptions.py @@ -88,8 +88,8 @@ class FlowGenericErrorReason(Enum): XYPlane = enum.auto() "A causal flow is defined on an open graphs with non-XY measurements." - NoCorrectionFunction = enum.auto() - """No correction function satisfies the requested flow constraints.""" + NoPauliFlow = enum.auto() + """No Pauli flow is compatible with the requested XZ-corrections.""" class XZCorrectionsOrderErrorReason(Enum): @@ -227,8 +227,8 @@ def __str__(self) -> str: return "The image of the correction function must be a subset of non-input nodes (prepared qubits) of the open graph." case FlowGenericErrorReason.XYPlane: return "Causal flow is only defined on open graphs with XY measurements." - case FlowGenericErrorReason.NoCorrectionFunction: - return "No correction function satisfies the requested flow constraints." + case FlowGenericErrorReason.NoPauliFlow: + return "No Pauli flow is compatible with the requested XZ-corrections." case _: assert_never(self.reason) diff --git a/tests/test_flow_core.py b/tests/test_flow_core.py index 1dbddb9d1..d2ba4e24f 100644 --- a/tests/test_flow_core.py +++ b/tests/test_flow_core.py @@ -415,6 +415,45 @@ def test_corrections_to_pauli_flow(self, test_case: XZCorrectionsTestCase) -> No assert extracted_corrections.z_corrections == corrections.z_corrections assert flow.partial_order_layers == corrections.partial_order_layers + def test_corrections_to_pauli_flow_empty_pattern(self) -> None: + flow = Pattern().extract_xzcorrections().to_pauli_flow() + + assert flow.correction_function == {} + + @pytest.mark.parametrize( + ("og", "x_corrections"), + [ + ( + OpenGraph( + graph=nx.Graph([(0, 1)]), + input_nodes=[0], + output_nodes=[1], + measurements={0: Measurement.Z}, + ), + {0: {1}}, + ), + ( + OpenGraph( + graph=nx.Graph([(0, 1)]), + input_nodes=[], + output_nodes=[1], + measurements={0: Measurement.X}, + ), + {}, + ), + ], + ) + def test_corrections_to_pauli_flow_infeasible( + self, og: OpenGraph[Measurement], x_corrections: dict[int, set[int]] + ) -> None: + corrections = XZCorrections.from_measured_nodes_mapping(og=og, x_corrections=x_corrections) + + with pytest.raises(FlowGenericError) as exc_info: + corrections.to_pauli_flow() + + assert exc_info.value.reason == FlowGenericErrorReason.NoPauliFlow + assert str(exc_info.value) == "No Pauli flow is compatible with the requested XZ-corrections." + @pytest.mark.parametrize("test_case", prepare_test_xzcorrections()) def test_corrections_to_pattern(self, test_case: XZCorrectionsTestCase, fx_rng: Generator) -> None: if test_case.pattern is not None: From 6671969006862c3d64ac4ece087b76766a7d198b Mon Sep 17 00:00:00 2001 From: Travis Crew Date: Sun, 14 Jun 2026 00:28:32 -0500 Subject: [PATCH 3/3] Cover Pauli-flow conversion edge cases --- tests/test_flow_core.py | 46 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/tests/test_flow_core.py b/tests/test_flow_core.py index d2ba4e24f..5663f3283 100644 --- a/tests/test_flow_core.py +++ b/tests/test_flow_core.py @@ -12,6 +12,7 @@ GFlow, PauliFlow, XZCorrections, + _solve_f2_equations, ) from graphix.flow.exceptions import ( FlowError, @@ -441,6 +442,15 @@ def test_corrections_to_pauli_flow_empty_pattern(self) -> None: ), {}, ), + ( + OpenGraph( + graph=nx.Graph([(0, 1), (1, 2)]), + input_nodes=[1], + output_nodes=[2], + measurements={0: Measurement.X, 1: Measurement.X}, + ), + {0: {1}}, + ), ], ) def test_corrections_to_pauli_flow_infeasible( @@ -454,6 +464,42 @@ def test_corrections_to_pauli_flow_infeasible( assert exc_info.value.reason == FlowGenericErrorReason.NoPauliFlow assert str(exc_info.value) == "No Pauli flow is compatible with the requested XZ-corrections." + def test_no_pauli_flow_error_message(self) -> None: + error = FlowGenericError(FlowGenericErrorReason.NoPauliFlow) + + assert str(error) == "No Pauli flow is compatible with the requested XZ-corrections." + + @pytest.mark.parametrize( + ("measurement", "correction_set"), + [ + (Measurement.XY(0.1), {1}), + (Measurement.XZ(0.1), {0, 1}), + (Measurement.YZ(0.1), {0}), + (Measurement.X, {1}), + (Measurement.Y, {0}), + (Measurement.Z, {0}), + ], + ) + def test_corrections_to_pauli_flow_self_conditions( + self, measurement: Measurement, correction_set: set[int] + ) -> None: + flow = PauliFlow( + og=OpenGraph(graph=nx.Graph([(0, 1)]), input_nodes=[], output_nodes=[1], measurements={0: measurement}), + correction_function={0: correction_set}, + partial_order_layers=[{1}, {0}], + ) + + corrections = flow.to_corrections() + extracted_flow = corrections.to_pauli_flow() + + extracted_flow.check_well_formed() + assert extracted_flow.correction_function == flow.correction_function + + def test_solve_f2_equations_degenerate_cases(self) -> None: + assert _solve_f2_equations([], 3) == [0, 0, 0] + assert _solve_f2_equations([], 0) == [] + assert _solve_f2_equations([(set(), 1)], 0) is None + @pytest.mark.parametrize("test_case", prepare_test_xzcorrections()) def test_corrections_to_pattern(self, test_case: XZCorrectionsTestCase, fx_rng: Generator) -> None: if test_case.pattern is not None: