Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/source/modifier.rst
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ Pattern Manipulation

.. automethod:: extract_gflow

.. automethod:: extract_pauli_flow

.. automethod:: extract_opengraph

.. automethod:: extract_measurement_commands
Expand Down
235 changes: 235 additions & 0 deletions graphix/flow/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
from typing import TYPE_CHECKING, Generic, TypeVar

import networkx as nx
import numpy as np

# `override` introduced in Python 3.12, `assert_never` introduced in Python 3.11
from typing_extensions import assert_never, override

from graphix._linalg import MatGF2, solve_f2_linear_system
from graphix.circ_ext.extraction import (
CliffordMap,
ExtractionResult,
Expand Down Expand Up @@ -266,6 +268,46 @@ def to_gflow(self: XZCorrections[_PM_co]) -> GFlow[_PM_co]:
gf.check_well_formed() # Raises a `FlowError` if the partial order and the correction function are not compatible.
return gf

def to_pauli_flow(self) -> PauliFlow[_AM_co]:
r"""Extract a Pauli flow from XZ-corrections.

This method does not invoke the Pauli-flow extraction routine on the underlying open graph.
Instead, it reconstructs a correction function compatible with the XZ-corrections and the
intrinsic partial order.

Returns
-------
PauliFlow[_AM_co]

Raises
------
FlowError
If no Pauli-flow correction function is compatible with the XZ-corrections and partial order.

Notes
-----
XZ-corrections only record corrections applied to nodes in the future of a measurement. For
Pauli flow, a correcting set can also contain nodes in the measurement's past or present when
their measurement labels allow it. The missing part of each correcting set is therefore obtained
by solving the Pauli-flow parity constraints over GF(2).
"""
self.check_well_formed()
if not self.og.measurements:
return PauliFlow(self.og, {}, self.partial_order_layers)

future: set[int] = set(self.partial_order_layers[0])
correction_function: dict[int, AbstractSet[int]] = {}

for layer in self.partial_order_layers[1:]:
for measured_node in layer:
correction_function[measured_node] = _xz_corrections_to_pauli_correction_set(
self, measured_node, future
)

future |= set(layer)

return PauliFlow(self.og, correction_function, self.partial_order_layers)

def to_bloch(self: XZCorrections[Measurement]) -> XZCorrections[BlochMeasurement]:
"""Return the XZ-corrections where all measurements in the open graph are converted to Bloch.

Expand Down Expand Up @@ -1341,6 +1383,199 @@ def _corrections_to_partial_order_layers(
return generations[::-1]


def _xz_corrections_to_pauli_correction_set(
xz_corrections: XZCorrections[_AM_co], measured_node: int, future: AbstractSet[int]
) -> frozenset[int]:
fixed_correction_nodes = set(xz_corrections.x_corrections.get(measured_node, set()))
expected_odd_future = set(xz_corrections.z_corrections.get(measured_node, set()))
input_nodes = set(xz_corrections.og.input_nodes)
if fixed_correction_nodes & input_nodes:
raise FlowGenericError(FlowGenericErrorReason.NoPauliFlow)

nonfuture_measured_nodes = set(xz_corrections.og.measurements) - set(future)
measured_label = xz_corrections.og.measurements[measured_node].to_plane_or_axis()
variable_nodes = [
node
for node in sorted(nonfuture_measured_nodes - input_nodes)
if xz_corrections.og.measurements[node].to_plane_or_axis() in {Axis.X, Axis.Y}
and (node != measured_node or measured_label in {Axis.X, Axis.Y})
]
variable_index = {node: index for index, node in enumerate(variable_nodes)}
equations: list[tuple[set[int], int]] = []

if measured_node not in variable_index:
if measured_label in {Plane.XZ, Plane.YZ, Axis.Z}:
if measured_node in input_nodes:
raise FlowGenericError(FlowGenericErrorReason.NoPauliFlow)
fixed_correction_nodes.add(measured_node)
elif measured_label == Plane.XY:
fixed_correction_nodes.discard(measured_node)

for node in future:
_append_odd_neighborhood_equation(
equations,
xz_corrections.og.graph,
node,
variable_index,
fixed_correction_nodes,
int(node in expected_odd_future),
)

for node in nonfuture_measured_nodes - {measured_node}:
meas = xz_corrections.og.measurements[node].to_plane_or_axis()
if meas not in {Axis.X, Axis.Y}:
fixed_correction_nodes.discard(node)
if meas not in {Axis.Y, Axis.Z}:
_append_odd_neighborhood_equation(
equations, xz_corrections.og.graph, node, variable_index, fixed_correction_nodes, 0
)
if meas == Axis.Y:
_append_closed_odd_neighborhood_equation(
equations, xz_corrections.og.graph, node, variable_index, fixed_correction_nodes, 0
)

_append_pauli_flow_self_condition(
equations,
xz_corrections.og.graph,
measured_node,
xz_corrections.og.measurements[measured_node].to_plane_or_axis(),
variable_index,
fixed_correction_nodes,
)

solution = _solve_f2_equations(equations, len(variable_nodes))
if solution is None:
raise FlowGenericError(FlowGenericErrorReason.NoPauliFlow)

correction_set = set(fixed_correction_nodes)
correction_set.update(node for node, index in variable_index.items() if solution[index])
return frozenset(correction_set)


def _append_pauli_flow_self_condition(
equations: list[tuple[set[int], int]],
graph: nx.Graph[int],
node: int,
meas: Plane | Axis,
variable_index: Mapping[int, int],
fixed_correction_nodes: AbstractSet[int],
) -> None:
match meas:
case Plane.XY:
_append_membership_equation(equations, node, variable_index, fixed_correction_nodes, 0)
_append_odd_neighborhood_equation(equations, graph, node, variable_index, fixed_correction_nodes, 1)
case Plane.XZ:
_append_membership_equation(equations, node, variable_index, fixed_correction_nodes, 1)
_append_odd_neighborhood_equation(equations, graph, node, variable_index, fixed_correction_nodes, 1)
case Plane.YZ:
_append_membership_equation(equations, node, variable_index, fixed_correction_nodes, 1)
_append_odd_neighborhood_equation(equations, graph, node, variable_index, fixed_correction_nodes, 0)
case Axis.X:
_append_odd_neighborhood_equation(equations, graph, node, variable_index, fixed_correction_nodes, 1)
case Axis.Z:
_append_membership_equation(equations, node, variable_index, fixed_correction_nodes, 1)
case Axis.Y:
_append_closed_odd_neighborhood_equation(equations, graph, node, variable_index, fixed_correction_nodes, 1)
case _:
assert_never(meas)


def _append_membership_equation(
equations: list[tuple[set[int], int]],
node: int,
variable_index: Mapping[int, int],
fixed_correction_nodes: AbstractSet[int],
value: int,
) -> None:
coefficients, fixed_value = _membership_terms(node, variable_index, fixed_correction_nodes)
equations.append((coefficients, value ^ fixed_value))


def _append_odd_neighborhood_equation(
equations: list[tuple[set[int], int]],
graph: nx.Graph[int],
node: int,
variable_index: Mapping[int, int],
fixed_correction_nodes: AbstractSet[int],
value: int,
) -> None:
coefficients, fixed_value = _odd_neighborhood_terms(graph, node, variable_index, fixed_correction_nodes)
equations.append((coefficients, value ^ fixed_value))


def _append_closed_odd_neighborhood_equation(
equations: list[tuple[set[int], int]],
graph: nx.Graph[int],
node: int,
variable_index: Mapping[int, int],
fixed_correction_nodes: AbstractSet[int],
value: int,
) -> None:
coefficients, fixed_value = _membership_terms(node, variable_index, fixed_correction_nodes)
odd_coefficients, odd_fixed_value = _odd_neighborhood_terms(graph, node, variable_index, fixed_correction_nodes)

for coefficient in odd_coefficients:
_toggle_coefficient(coefficients, coefficient)

equations.append((coefficients, value ^ fixed_value ^ odd_fixed_value))


def _membership_terms(
node: int, variable_index: Mapping[int, int], fixed_correction_nodes: AbstractSet[int]
) -> tuple[set[int], int]:
if node in variable_index:
return {variable_index[node]}, 0
return set(), int(node in fixed_correction_nodes)


def _odd_neighborhood_terms(
graph: nx.Graph[int],
node: int,
variable_index: Mapping[int, int],
fixed_correction_nodes: AbstractSet[int],
) -> tuple[set[int], int]:
coefficients: set[int] = set()
fixed_value = 0

for neighbor in graph.neighbors(node):
if neighbor in variable_index:
_toggle_coefficient(coefficients, variable_index[neighbor])
elif neighbor in fixed_correction_nodes:
fixed_value ^= 1

return coefficients, fixed_value


def _toggle_coefficient(coefficients: set[int], coefficient: int) -> None:
if coefficient in coefficients:
coefficients.remove(coefficient)
else:
coefficients.add(coefficient)


def _solve_f2_equations(equations: Sequence[tuple[set[int], int]], n_variables: int) -> list[int] | None:
if n_variables == 0:
return [] if all(not coefficients and value == 0 for coefficients, value in equations) else None
if not equations:
return [0] * n_variables

augmented = np.zeros((len(equations), n_variables + 1), dtype=np.uint8).view(MatGF2)
for row, (equation_coefficients, value) in enumerate(equations):
for coefficient in equation_coefficients:
augmented[row, coefficient] = 1
augmented[row, n_variables] = value

reduced = augmented.gauss_elimination(ncols=n_variables, copy=False)
matrix = MatGF2(reduced[:, :n_variables])
constants = MatGF2(reduced[:, n_variables])

if np.any((~matrix.any(axis=1)) & constants.astype(bool)):
return None

solution = solve_f2_linear_system(matrix, constants)
return [int(bit) for bit in solution]


def _check_correction_function_domain(
og: OpenGraph[_AM_co], correction_function: Mapping[int, AbstractSet[int]]
) -> bool:
Expand Down
5 changes: 5 additions & 0 deletions graphix/flow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ class FlowGenericErrorReason(Enum):
XYPlane = enum.auto()
"A causal flow is defined on an open graphs with non-XY measurements."

NoPauliFlow = enum.auto()
"""No Pauli flow is compatible with the requested XZ-corrections."""


class XZCorrectionsOrderErrorReason(Enum):
"""Describe the reason of an `XZCorrectionsOrderError` exception."""
Expand Down Expand Up @@ -224,6 +227,8 @@ def __str__(self) -> str:
return "The image of the correction function must be a subset of non-input nodes (prepared qubits) of the open graph."
case FlowGenericErrorReason.XYPlane:
return "Causal flow is only defined on open graphs with XY measurements."
case FlowGenericErrorReason.NoPauliFlow:
return "No Pauli flow is compatible with the requested XZ-corrections."
case _:
assert_never(self.reason)

Expand Down
25 changes: 25 additions & 0 deletions graphix/pattern.py
Original file line number Diff line number Diff line change
Expand Up @@ -991,6 +991,31 @@ def extract_gflow(self) -> GFlow[BlochMeasurement]:
"""
return self.extract_xzcorrections().downcast_bloch().to_gflow()

def extract_pauli_flow(self) -> PauliFlow[Measurement]:
r"""Extract the Pauli flow structure from the current measurement pattern.

This method does not call the flow-extraction routine on the underlying open graph, but constructs
the Pauli flow from the pattern corrections instead.

Returns
-------
PauliFlow[Measurement]
The Pauli flow associated with the current pattern.

Raises
------
FlowError
If the pattern is empty or if the extracted structure does not satisfy
the well-formedness conditions required for a valid Pauli flow.
ValueError
If `N` commands in the pattern do not represent a :math:`|+\rangle` state or if the pattern corrections form closed loops.

Notes
-----
The notes provided in :func:`self.extract_causal_flow` apply here as well.
"""
return self.extract_xzcorrections().to_pauli_flow()

def extract_xzcorrections(self) -> XZCorrections[Measurement]:
"""Extract the XZ-corrections from the current measurement pattern.

Expand Down
Loading
Loading