diff --git a/.github/workflows/python_detailed.yml b/.github/workflows/python_detailed.yml index db2b8bd..8c16a94 100644 --- a/.github/workflows/python_detailed.yml +++ b/.github/workflows/python_detailed.yml @@ -15,6 +15,7 @@ env: BUILD_TYPE: Debug LD_LIBRARY_PATH: /usr/local/lib WIN_LIBOQS_INSTALL_PATH: C:\liboqs + VERSION: 0.14.0 jobs: build: @@ -42,8 +43,11 @@ jobs: - name: Install liboqs POSIX if: matrix.os != 'windows-latest' run: | - git clone --branch main --single-branch --depth 1 https://github.com/open-quantum-safe/liboqs - cmake -S liboqs -B liboqs/build -DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}} -DBUILD_SHARED_LIBS=ON -DOQS_BUILD_ONLY_LIB=ON + git clone --branch ${{env.VERSION}} --single-branch --depth 1 https://github.com/open-quantum-safe/liboqs + cmake -S liboqs -B liboqs/build -DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}} -DBUILD_SHARED_LIBS=ON \ + -DOQS_BUILD_ONLY_LIB=ON -DOQS_BUILD_ONLY_LIB=ON -DOQS_ENABLE_SIG_STFL_LMS=ON \ + -DOQS_ENABLE_SIG_STFL_XMSS=ON -DOQS_HAZARDOUS_EXPERIMENTAL_ENABLE_SIG_STFL_KEY_SIG_GEN=ON \ + -DOQS_ALLOW_STFL_KEY_AND_SIG_GEN=ON cmake --build liboqs/build --parallel 4 sudo cmake --build liboqs/build --target install @@ -56,6 +60,8 @@ jobs: uv run examples/sig.py echo uv run examples/rand.py + echo + uv run examples/stfl_sig.py - name: Run unit tests POSIX if: matrix.os != 'windows-latest' @@ -66,8 +72,8 @@ jobs: if: matrix.os == 'windows-latest' shell: cmd run: | - git clone --branch main --single-branch --depth 1 https://github.com/open-quantum-safe/liboqs - cmake -S liboqs -B liboqs\build -DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}} -DCMAKE_INSTALL_PREFIX=${{env.WIN_LIBOQS_INSTALL_PATH}} -DBUILD_SHARED_LIBS=ON -DOQS_BUILD_ONLY_LIB=ON + git clone --branch ${{env.VERSION}} --single-branch --depth 1 https://github.com/open-quantum-safe/liboqs + cmake -S liboqs -B liboqs\build -DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}} -DCMAKE_INSTALL_PREFIX=${{env.WIN_LIBOQS_INSTALL_PATH}} -DBUILD_SHARED_LIBS=ON -DOQS_BUILD_ONLY_LIB=ON -DOQS_BUILD_ONLY_LIB=ON -DOQS_ENABLE_SIG_STFL_LMS=ON -DOQS_ENABLE_SIG_STFL_XMSS=ON -DOQS_HAZARDOUS_EXPERIMENTAL_ENABLE_SIG_STFL_KEY_SIG_GEN=ON ‑DOQS_ALLOW_STFL_KEY_AND_SIG_GEN=ON cmake --build liboqs\build --parallel 4 cmake --build liboqs\build --target install @@ -82,6 +88,8 @@ jobs: uv run examples/sig.py echo. uv run examples/rand.py + echo. + uv run examples/stfl_sig.py - name: Run unit tests Windows shell: cmd diff --git a/.github/workflows/python_simplified.yml b/.github/workflows/python_simplified.yml index c1fed63..18a0f8f 100644 --- a/.github/workflows/python_simplified.yml +++ b/.github/workflows/python_simplified.yml @@ -37,6 +37,7 @@ jobs: uv run examples/kem.py uv run examples/sig.py uv run examples/rand.py + uv run examples/stfl_sig.py - name: Run unit tests run: | diff --git a/Dockerfile b/Dockerfile index e443fd8..0423c08 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,8 +7,12 @@ RUN apt-get -y update && \ # Get liboqs RUN git clone --depth 1 --branch main https://github.com/open-quantum-safe/liboqs -# Install liboqs -RUN cmake -S liboqs -B liboqs/build -DBUILD_SHARED_LIBS=ON && \ +# Install liboqs with stateful-signature algorithms enabled +RUN cmake -S liboqs -B liboqs/build \ + -DBUILD_SHARED_LIBS=ON \ + -DOQS_ENABLE_SIG_STFL_LMS=ON \ + -DOQS_ENABLE_SIG_STFL_XMSS=ON \ + -DOQS_HAZARDOUS_EXPERIMENTAL_ENABLE_SIG_STFL_KEY_SIG_GEN=ON && \ cmake --build liboqs/build --parallel 4 && \ cmake --build liboqs/build --target install diff --git a/README.md b/README.md index 2a5d267..6d79839 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ The project contains the following files and directories - `examples/kem.py`: key encapsulation example - `examples/rand.py`: RNG example - `examples/sig.py`: signature example +- `examples/stfl_sig.py`: stateful signature example - `tests`: unit tests --- @@ -141,6 +142,7 @@ Execute ```shell python3 liboqs-python/examples/kem.py python3 liboqs-python/examples/sig.py +python3 liboqs-python/examples/stfl_sig.py python3 liboqs-python/examples/rand.py ``` @@ -162,11 +164,14 @@ liboqs-python can be imported into Python programs with import oqs ``` -liboqs-python defines two main classes: `KeyEncapsulation` and `Signature`, -providing post-quantum key encapsulation and signature mechanisms, -respectively. Each must be instantiated with a string identifying one of -mechanisms supported by liboqs; these can be enumerated using the -`get_enabled_KEM_mechanisms()` and `get_enabled_sig_mechanisms()` functions. +liboqs-python defines three main classes: `KeyEncapsulation`, `Signature`, and +`StatefulSignature`, providing post-quantum key encapsulation as well as +stateless and stateful signature mechanisms. Each must be instantiated with a +string identifying one of the mechanisms supported by liboqs; these can be +enumerated using the `get_enabled_kem_mechanisms()`, +`get_enabled_sig_mechanisms()` and `get_enabled_stateful_sig_mechanisms()` +functions. ML-KEM key pairs can also be deterministically generated from a +seed using `KeyEncapsulation.generate_keypair_seed()`. The files in `examples/` demonstrate the wrapper's API. Support for alternative RNGs is provided via the `randombytes_*()` functions. diff --git a/RELEASE.md b/RELEASE.md index 65de4f2..bdebfa6 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,6 +1,14 @@ -# liboqs-python version 0.12.0 +# liboqs-python version 0.14.0 --- +# Added in version 0.14.0 July 2025 + +- Added stateful signature support via the `StatefulSignature` class. +- New enumeration helpers `get_enabled_stateful_sig_mechanisms()` and + `get_supported_stateful_sig_mechanisms()`. +- Updated to liboqs 0.14.0. +- ML-KEM keys can be generated from a seed via + `KeyEncapsulation.generate_keypair_seed()`. ## About @@ -24,9 +32,9 @@ See in particular limitations on intended use. ## Release notes -This release of liboqs-python was released on January 15, 2025. Its release +This release of liboqs-python was released on July 10, 2025. Its release page on GitHub is -https://github.com/open-quantum-safe/liboqs-python/releases/tag/0.12.0. +https://github.com/open-quantum-safe/liboqs-python/releases/tag/0.14.0. --- diff --git a/docker/Dockerfile b/docker/Dockerfile index cc7739f..8a2ce86 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,7 +1,9 @@ # Multi-stage build: First the full builder image: # liboqs build type variant; maximum portability of image; no openssl dependency: -ARG LIBOQS_BUILD_DEFINES="-DOQS_DIST_BUILD=ON -DBUILD_SHARED_LIBS=ON -DOQS_USE_OPENSSL=OFF" +ARG LIBOQS_BUILD_DEFINES="-DOQS_DIST_BUILD=ON -DBUILD_SHARED_LIBS=ON -DOQS_USE_OPENSSL=OFF \ + -DOQS_ENABLE_SIG_STFL_LMS=ON -DOQS_ENABLE_SIG_STFL_XMSS=ON \ + -DOQS_HAZARDOUS_EXPERIMENTAL_ENABLE_SIG_STFL_KEY_SIG_GEN=ON" # make build arguments: Adding -j here speeds up build but may tax hardware ARG MAKE_DEFINES="-j 2" diff --git a/docker/Dockerfile-simple b/docker/Dockerfile-simple index 4df6a6c..931639f 100644 --- a/docker/Dockerfile-simple +++ b/docker/Dockerfile-simple @@ -1,7 +1,9 @@ # Multi-stage build: First the full builder image: # liboqs build type variant; maximum portability of image; no openssl dependency: -ARG LIBOQS_BUILD_DEFINES="-DOQS_DIST_BUILD=ON -DBUILD_SHARED_LIBS=ON -DOQS_USE_OPENSSL=OFF" +ARG LIBOQS_BUILD_DEFINES="-DOQS_DIST_BUILD=ON -DBUILD_SHARED_LIBS=ON -DOQS_USE_OPENSSL=OFF \ + -DOQS_ENABLE_SIG_STFL_LMS=ON -DOQS_ENABLE_SIG_STFL_XMSS=ON \ + -DOQS_HAZARDOUS_EXPERIMENTAL_ENABLE_SIG_STFL_KEY_SIG_GEN=ON" FROM alpine:3.16 as intermediate # Take in all global args diff --git a/docker/minitest.py b/docker/minitest.py index 13d14a7..5cc7bb1 100644 --- a/docker/minitest.py +++ b/docker/minitest.py @@ -1,4 +1,5 @@ import ssl +import sys import urllib.request import json import os @@ -7,7 +8,7 @@ # Example code testing oqs signature functionality. See more example code at # https://github.com/open-quantum-safe/liboqs-python/tree/main/examples -message = "This is the message to sign".encode() +message = b"This is the message to sign" # create signer and verifier with sample signature mechanisms sigalg = "Dilithium2" @@ -17,28 +18,32 @@ signature = signer.sign(message) is_valid = verifier.verify(message, signature, signer_public_key) -if (not is_valid): +if not is_valid: print("Failed to validate signature. Exiting.") - exit(1) + sys.exit(1) else: print("Validated signature for OQS algorithm %s" % (sigalg)) # Example code iterating over all supported OQS algorithms integrated into TLS -sslContext= ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) +sslContext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) sslContext.verify_mode = ssl.CERT_REQUIRED # Trust LetsEncrypt root CA: sslContext.load_verify_locations(cafile="isrgrootx1.pem") # Retrieve interop test server root CA -with urllib.request.urlopen('https://test.openquantumsafe.org/CA.crt', context=sslContext) as response: - data=response.read() +with urllib.request.urlopen( + "https://test.openquantumsafe.org/CA.crt", context=sslContext +) as response: + data = response.read() with open("CA.crt", "w+b") as f: f.write(data) # Retrieve JSON structure of all alg/port combinations: -with urllib.request.urlopen('https://test.openquantumsafe.org/assignments.json', context=sslContext) as response: - assignments=json.loads(response.read()) +with urllib.request.urlopen( + "https://test.openquantumsafe.org/assignments.json", context=sslContext +) as response: + assignments = json.loads(response.read()) # Trust test.openquantumsafe.org root CA: sslContext.load_verify_locations(cafile="CA.crt") @@ -46,17 +51,23 @@ # Iterate over all algorithm/port combinations: for sigs, kexs in assignments.items(): for kex, port in kexs.items(): - if (kex != "*"): # '*' denoting any classic KEX alg + if kex != "*": # '*' denoting any classic KEX alg # Enable use of the specific QSC KEX algorithm - os.environ["TLS_DEFAULT_GROUPS"]=kex - try: - with urllib.request.urlopen('https://test.openquantumsafe.org:'+str(port), context=sslContext) as response: - if response.getcode() != 200: - print("Failed to test %s successfully" % (kex)) - else: - print("Success testing %s at port %d" % (kex, port)) - except: - print("Test of algorithm combination SIG %s/KEX %s failed. Are all algorithms supported by current OQS library?" % (sigs, kex)) + os.environ["TLS_DEFAULT_GROUPS"] = kex + try: + with urllib.request.urlopen( + "https://test.openquantumsafe.org:" + str(port), context=sslContext + ) as response: + if response.getcode() != 200: + print("Failed to test %s successfully" % (kex)) + else: + print("Success testing %s at port %d" % (kex, port)) + except: + print( + "Test of algorithm combination SIG %s/KEX %s failed. " + "Are all algorithms supported by current OQS library?" + % (sigs, kex) + ) if "SHORT_TEST" in os.environ: - exit(0) + sys.exit(0) diff --git a/examples/kem.py b/examples/kem.py index 799f68b..7a382f4 100644 --- a/examples/kem.py +++ b/examples/kem.py @@ -42,3 +42,30 @@ "Shared secretes coincide: %s", shared_secret_client == shared_secret_server, ) + +# Example for using a seed to generate a keypair. +kemalg = "ML-KEM-512" +seed = b"This is a 64-byte seed for key generation" + b"\x00" * 23 +with oqs.KeyEncapsulation(kemalg) as client: + with oqs.KeyEncapsulation(kemalg) as server: + logger.info("Key encapsulation details:\n%s", pformat(client.details)) + + # Client generates its keypair + public_key_client = client.generate_keypair_seed(seed) + # Optionally, the secret key can be obtained by calling export_secret_key() + # and the client can later be re-instantiated with the key pair: + # secret_key_client = client.export_secret_key() + + # Store key pair, wait... (session resumption): + # client = oqs.KeyEncapsulation(kemalg, secret_key_client) + + # The server encapsulates its secret using the client's public key + ciphertext, shared_secret_server = server.encap_secret(public_key_client) + + # The client decapsulates the server's ciphertext to obtain the shared secret + shared_secret_client = client.decap_secret(ciphertext) + + logger.info( + "Shared secretes coincide: %s", + shared_secret_client == shared_secret_server, + ) diff --git a/examples/stfl_sig.py b/examples/stfl_sig.py new file mode 100644 index 0000000..da312a2 --- /dev/null +++ b/examples/stfl_sig.py @@ -0,0 +1,44 @@ +# Stateful signature examples + +import logging +from pprint import pformat +from sys import stdout + +import oqs +from oqs import StatefulSignature + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) +logger.addHandler(logging.StreamHandler(stdout)) + +logger.info("liboqs version: %s", oqs.oqs_version()) +logger.info("liboqs-python version: %s", oqs.oqs_python_version()) +logger.info( + "Enabled stateful signature mechanisms:\n%s", + pformat(oqs.get_enabled_stateful_sig_mechanisms(), compact=True), +) + +message = b"This is the message to sign" + +# Create signer and verifier with sample signature mechanisms +stfl_sigalg = "XMSS-SHA2_10_256" +with StatefulSignature(stfl_sigalg) as signer, StatefulSignature(stfl_sigalg) as verifier: + logger.info("Signature details:\n%s", pformat(signer.details)) + + # Signer generates its keypair + signer_public_key = signer.generate_keypair() + logger.info("Generated public key:\n%s", signer_public_key.hex()) + # Optionally, the secret key can be obtained by calling export_secret_key() + # and the signer can later be re-instantiated with the key pair: + # secret_key = signer.export_secret_key() + + # Store key pair, wait... (session resumption): + # signer = oqs.Signature(sigalg, secret_key) + + # Signer signs the message + signature = signer.sign(message) + + # Verifier verifies the signature + is_valid = verifier.verify(message, signature, signer_public_key) + + logger.info("Valid signature? %s", is_valid) diff --git a/oqs/__init__.py b/oqs/__init__.py index f48ffe3..0fc5f83 100644 --- a/oqs/__init__.py +++ b/oqs/__init__.py @@ -5,12 +5,16 @@ MechanismNotEnabledError, MechanismNotSupportedError, Signature, + StatefulSignature, get_enabled_kem_mechanisms, get_enabled_sig_mechanisms, + get_enabled_stateful_sig_mechanisms, get_supported_kem_mechanisms, get_supported_sig_mechanisms, + get_supported_stateful_sig_mechanisms, is_kem_enabled, is_sig_enabled, + sig_supports_context, native, oqs_python_version, oqs_version, @@ -23,13 +27,17 @@ "MechanismNotEnabledError", "MechanismNotSupportedError", "Signature", + "StatefulSignature", "get_enabled_kem_mechanisms", "get_enabled_sig_mechanisms", + "get_enabled_stateful_sig_mechanisms", "get_supported_kem_mechanisms", "get_supported_sig_mechanisms", + "get_supported_stateful_sig_mechanisms", "is_kem_enabled", "is_sig_enabled", "native", "oqs_python_version", "oqs_version", + "sig_supports_context", ) diff --git a/oqs/oqs.py b/oqs/oqs.py index 1320a37..df5bc20 100644 --- a/oqs/oqs.py +++ b/oqs/oqs.py @@ -17,18 +17,32 @@ import subprocess import tempfile # to install liboqs on demand import time + +try: + import tomllib # Python 3.11+ +except ImportError: # Fallback for older versions + import tomli as tomllib import warnings from os import environ from pathlib import Path from sys import stdout -from typing import TYPE_CHECKING, Any, ClassVar, Final, TypeVar, Union, cast - +from typing import ( + TYPE_CHECKING, + Any, + ClassVar, + Final, + TypeVar, + Union, + cast, + Optional, +) if TYPE_CHECKING: - from collections.abc import Sequence + from collections.abc import Sequence, Iterable from types import TracebackType TKeyEncapsulation = TypeVar("TKeyEncapsulation", bound="KeyEncapsulation") TSignature = TypeVar("TSignature", bound="Signature") +TStatefulSignature = TypeVar("TStatefulSignature", bound="StatefulSignature") logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) @@ -45,7 +59,20 @@ def oqs_python_version() -> Union[str, None]: result = importlib.metadata.version("liboqs-python") except importlib.metadata.PackageNotFoundError: warnings.warn("Please install liboqs-python using pip install", stacklevel=2) - return None + pyproject = Path(__file__).resolve().parent.parent / "pyproject.toml" + try: + # Fallback to version specified in pyproject.toml when running from the + # source tree. This allows workflows to use the correct liboqs version + # before the package is installed. + with pyproject.open("rb") as f: + data = tomllib.load(f) + return data["project"]["version"] + except (FileNotFoundError, KeyError, tomllib.TOMLDecodeError): + warnings.warn( + "Please install liboqs-python using pip install", + stacklevel=2, + ) + return None return result @@ -119,6 +146,10 @@ def _install_liboqs( oqs_version_to_install: Union[str, None] = None, ) -> None: """Install liboqs version oqs_version (if None, installs latest at HEAD) in the target_directory.""" # noqa: E501 + if "rc" in oqs_version_to_install: + # removed the "-" from the version string + tmp = oqs_version_to_install.split("rc") + oqs_version_to_install = tmp[0] + "-rc" + tmp[1] with tempfile.TemporaryDirectory() as tmpdirname: oqs_install_cmd = [ "cd", @@ -143,6 +174,11 @@ def _install_liboqs( "liboqs/build", "-DBUILD_SHARED_LIBS=ON", "-DOQS_BUILD_ONLY_LIB=ON", + # Stateful signature algorithms: + "-DOQS_ENABLE_SIG_STFL_LMS=ON", # LMS family + "-DOQS_ENABLE_SIG_STFL_XMSS=ON", # XMSS family + # To support key-generation. + "-DOQS_HAZARDOUS_EXPERIMENTAL_ENABLE_SIG_STFL_KEY_SIG_GEN=ON", f"-DCMAKE_INSTALL_PREFIX={target_directory}", ], ) @@ -254,19 +290,40 @@ def oqs_version() -> str: class MechanismNotSupportedError(Exception): """Exception raised when an algorithm is not supported by OQS.""" - def __init__(self, alg_name: str) -> None: - """:param alg_name: requested algorithm name.""" + def __init__(self, alg_name: str, supported: Optional[Iterable[str]] = None) -> None: + """ + Initialize the exception. + + :param alg_name: Requested algorithm name. + :param supported: A list of supported algorithms to include in the message. + Defaults to `None`. + """ + supported_str = "" + if supported is not None: + supported_str = ", ".join(supported) + supported_str = f". Supported algorithms: {supported_str}" + self.alg_name = alg_name - self.message = f"{alg_name} is not supported by OQS" + self.message = f"{alg_name} is not supported by OQS" + supported_str class MechanismNotEnabledError(MechanismNotSupportedError): """Exception raised when an algorithm is supported but not enabled by OQS.""" - def __init__(self, alg_name: str) -> None: - """:param alg_name: requested algorithm name.""" + def __init__(self, alg_name: str, enabled: Optional[Iterable[str]] = None) -> None: + """ + Initialize the exception. + + :param alg_name: Requested algorithm name. + :param enabled: A list of enabled algorithms to include in the message. Defaults to `None`. + """ + enabled_str = "" + if enabled is not None: + enabled_str = ", ".join(enabled) + enabled_str = f". Enabled algorithms: {enabled_str}" + self.alg_name = alg_name - self.message = f"{alg_name} is supported but not enabled by OQS" + self.message = f"{alg_name} is supported but not enabled by OQS" + enabled_str class KeyEncapsulation(ct.Structure): @@ -275,23 +332,26 @@ class KeyEncapsulation(ct.Structure): The wrapper maps methods to the C equivalent as follows: - Python | C liboqs + Python | C liboqs ------------------------------- - generate_keypair | keypair - encap_secret | encaps - decap_secret | decaps - free | OQS_KEM_free + generate_keypair | keypair + generate_keypair_seed | keypair + encap_secret | encaps + decap_secret | decaps + free | OQS_KEM_free """ _fields_: ClassVar[Sequence[tuple[str, Any]]] = [ ("method_name", ct.c_char_p), ("alg_version", ct.c_char_p), ("claimed_nist_level", ct.c_ubyte), - ("ind_cca", ct.c_ubyte), + ("ind_cca", ct.c_bool), ("length_public_key", ct.c_size_t), ("length_secret_key", ct.c_size_t), ("length_ciphertext", ct.c_size_t), ("length_shared_secret", ct.c_size_t), + ("length_keypair_seed", ct.c_size_t), + ("keypair_derand_cb", ct.c_void_p), ("keypair_cb", ct.c_void_p), ("encaps_cb", ct.c_void_p), ("decaps_cb", ct.c_void_p), @@ -323,6 +383,7 @@ def __init__(self, alg_name: str, secret_key: Union[int, bytes, None] = None) -> self.length_secret_key = self._kem.contents.length_secret_key self.length_ciphertext = self._kem.contents.length_ciphertext self.length_shared_secret = self._kem.contents.length_shared_secret + self.length_keypair_seed = self._kem.contents.length_keypair_seed self.details = { "name": self.method_name.decode(), @@ -333,6 +394,7 @@ def __init__(self, alg_name: str, secret_key: Union[int, bytes, None] = None) -> "length_secret_key": int(self.length_secret_key), "length_ciphertext": int(self.length_ciphertext), "length_shared_secret": int(self.length_shared_secret), + "length_keypair_seed": int(self.length_keypair_seed), } if secret_key: @@ -352,6 +414,39 @@ def __exit__( ) -> None: self.free() + def generate_keypair_seed(self, seed: bytes) -> bytes: + """ + Generate a new keypair using the provided seed and returns the public key. + + :param seed: A seed to use for key generation. + If the seed is None, a random seed will be generated. + """ + if self.length_keypair_seed == 0: + msg = f"Key generation with seed is not supported. Got {self.alg_name}." + raise RuntimeError(msg) + + if len(seed) != self._kem.contents.length_keypair_seed: + msg = ( + f"Seed length must be {self._kem.contents.length_keypair_seed} bytes, " + f"got {len(seed)} bytes" + ) + raise ValueError(msg) + + c_seed = ct.create_string_buffer(seed, self._kem.contents.length_keypair_seed) + public_key = ct.create_string_buffer(self._kem.contents.length_public_key) + self.secret_key = ct.create_string_buffer(self._kem.contents.length_secret_key) + + rv = native().OQS_KEM_keypair_derand( + self._kem, + ct.byref(public_key), + ct.byref(self.secret_key), + c_seed, + ) + if rv == OQS_SUCCESS: + return bytes(public_key) + msg = "Can not generate keypair with provided seed" + raise RuntimeError(msg) + def generate_keypair(self) -> bytes: """ Generate a new keypair and returns the public key. @@ -484,14 +579,17 @@ class Signature(ct.Structure): ("method_name", ct.c_char_p), ("alg_version", ct.c_char_p), ("claimed_nist_level", ct.c_ubyte), - ("euf_cma", ct.c_ubyte), - ("sig_with_ctx_support", ct.c_ubyte), + ("euf_cma", ct.c_bool), + ("suf_cma", ct.c_bool), + ("sig_with_ctx_support", ct.c_bool), ("length_public_key", ct.c_size_t), ("length_secret_key", ct.c_size_t), ("length_signature", ct.c_size_t), ("keypair_cb", ct.c_void_p), ("sign_cb", ct.c_void_p), + ("sign_with_ctx_cb", ct.c_void_p), ("verify_cb", ct.c_void_p), + ("verify_with_ctx_cb", ct.c_void_p), ] def __init__(self, alg_name: str, secret_key: Union[int, bytes, None] = None) -> None: @@ -515,7 +613,7 @@ def __init__(self, alg_name: str, secret_key: Union[int, bytes, None] = None) -> self.alg_version = self._sig.contents.alg_version self.claimed_nist_level = self._sig.contents.claimed_nist_level self.euf_cma = self._sig.contents.euf_cma - self.sig_with_ctx_support = self._sig.contents.sig_with_ctx_support + self.sig_with_ctx_support = bool(self._sig.contents.sig_with_ctx_support) self.length_public_key = self._sig.contents.length_public_key self.length_secret_key = self._sig.contents.length_secret_key self.length_signature = self._sig.contents.length_signature @@ -525,6 +623,8 @@ def __init__(self, alg_name: str, secret_key: Union[int, bytes, None] = None) -> "version": self.alg_version.decode(), "claimed_nist_level": int(self.claimed_nist_level), "is_euf_cma": bool(self.euf_cma), + "is_suf_cma": bool(self.suf_cma), + "supports_context_signing": bool(self.sig_with_ctx_support), "sig_with_ctx_support": bool(self.sig_with_ctx_support), "length_public_key": int(self.length_public_key), "length_secret_key": int(self.length_secret_key), @@ -595,7 +695,7 @@ def sign(self, message: bytes) -> bytes: self.secret_key, ) if rv == OQS_SUCCESS: - return bytes(cast(bytes, c_signature[: c_signature_len.value])) + return bytes(cast("bytes", c_signature[: c_signature_len.value])) msg = "Can not sign message" raise RuntimeError(msg) @@ -635,7 +735,8 @@ def sign_with_ctx_str(self, message: bytes, context: bytes) -> bytes: :param message: the message to sign. """ if context and not self._sig.contents.sig_with_ctx_support: - msg = "Signing with context string not supported" + msg = (f"Signing with context is not supported for: " + f"{self._sig.contents.method_name.decode()}") raise RuntimeError(msg) # Provide length to avoid extra null char @@ -662,7 +763,7 @@ def sign_with_ctx_str(self, message: bytes, context: bytes) -> bytes: self.secret_key, ) if rv == OQS_SUCCESS: - return bytes(cast(bytes, c_signature[: c_signature_len.value])) + return bytes(cast("bytes", c_signature[: c_signature_len.value])) msg = "Can not sign message with context string" raise RuntimeError(msg) @@ -729,16 +830,27 @@ def __repr__(self) -> str: native().OQS_SIG_new.restype = ct.POINTER(Signature) native().OQS_SIG_alg_identifier.restype = ct.c_char_p +native().OQS_SIG_supports_ctx_str.restype = ct.c_bool + def is_sig_enabled(alg_name: str) -> bool: """ - Return True if the signature algorithm is enabled. + Return `True` if the signature algorithm is enabled. - :param alg_name: a signature mechanism algorithm name. + :param alg_name: A signature mechanism algorithm name. """ return native().OQS_SIG_alg_is_enabled(ct.create_string_buffer(alg_name.encode())) +def sig_supports_context(alg_name: str) -> bool: + """ + Return `True` if the signature algorithm supports signing with a context string. + + :param alg_name: A signature mechanism algorithm name. + """ + return bool(native().OQS_SIG_supports_ctx_str(ct.create_string_buffer(alg_name.encode()))) + + _sig_alg_ids = [native().OQS_SIG_alg_identifier(i) for i in range(native().OQS_SIG_alg_count())] _supported_sigs: tuple[str, ...] = tuple([i.decode() for i in _sig_alg_ids]) _enabled_sigs: tuple[str, ...] = tuple([i for i in _supported_sigs if is_sig_enabled(i)]) @@ -752,3 +864,329 @@ def get_enabled_sig_mechanisms() -> tuple[str, ...]: def get_supported_sig_mechanisms() -> tuple[str, ...]: """Return the list of supported signature mechanisms.""" return _supported_sigs + + +# Check enabled algorithms +native().OQS_SIG_STFL_alg_identifier.restype = ct.c_char_p + + +def is_stateful_sig_enabled(alg_name: str) -> bool: + """Check if a stateful signature algorithm is enabled.""" + return native().OQS_SIG_STFL_alg_is_enabled(ct.create_string_buffer(alg_name.encode())) + + +_supported_stateful_sigs: tuple[str, ...] = tuple( + native().OQS_SIG_STFL_alg_identifier(i).decode() + for i in range(native().OQS_SIG_STFL_alg_count()) +) +_enabled_stateful_sigs: tuple[str, ...] = tuple( + alg for alg in _supported_stateful_sigs if is_stateful_sig_enabled(alg) +) + + +def get_enabled_stateful_sig_mechanisms() -> tuple[str, ...]: + """Return a list of enabled stateful signature mechanisms.""" + return _enabled_stateful_sigs + + +def get_supported_stateful_sig_mechanisms() -> tuple[str, ...]: + """Return a list of supported stateful signature mechanisms.""" + return _supported_stateful_sigs + + +def _filter_stfl_names(alg_name: str, alg_names: Iterable[str]) -> Optional[list[str]]: + """Filter and return only stateful signature algorithm names.""" + if alg_name.startswith("LMS"): + return [name for name in alg_names if name.startswith("LMS")] + if alg_name.startswith("XMSS"): + return [name for name in alg_names if name.startswith("XMSS")] + if alg_name.startswith("XMSSMT"): + return [name for name in alg_names if name.startswith("XMSSMT")] + return None + + +def _check_alg(alg_name: str) -> None: + """Check if the algorithm is supported and enabled.""" + if alg_name not in _supported_stateful_sigs: + _filtered_names = _filter_stfl_names(alg_name, _supported_stateful_sigs) + if _filtered_names is None: + raise MechanismNotSupportedError(alg_name) + raise MechanismNotSupportedError(alg_name, supported=_filtered_names) + if alg_name not in _enabled_stateful_sigs: + sup = _filter_stfl_names(alg_name, _enabled_stateful_sigs) + raise MechanismNotEnabledError(alg_name, enabled=sup) + + +class StatefulSignature(ct.Structure): + """ + An OQS StatefulSignature wraps native/C liboqs OQS_SIG structs. + + The wrapper maps methods to the C equivalent as follows: + + Python | C liboqs + ------------------------------- + generate_keypair | keypair + sign | sign + verify | verify + free | OQS_SIG_STFL_free + sigs_remaining | OQS_SIG_STFL_sigs_remaining + sigs_total | OQS_SIG_STFL_sigs_total + + """ + + _fields_: ClassVar[Sequence[tuple[str, Any]]] = [ + ("oid", ct.c_uint32), + ("method_name", ct.c_char_p), + ("alg_version", ct.c_char_p), + ("euf_cma", ct.c_bool), + ("suf_cma", ct.c_bool), + ("length_public_key", ct.c_size_t), + ("length_secret_key", ct.c_size_t), + ("length_signature", ct.c_size_t), + ("keypair_cb", ct.c_void_p), + ("sign_cb", ct.c_void_p), + ("verify_cb", ct.c_void_p), + ("sigs_remaining_cb", ct.c_void_p), + ("sigs_total_cb", ct.c_void_p), + ] + + def __init__(self, alg_name: str, secret_key: Optional[bytes] = None) -> None: + """ + Create a new stateful signature instance with the given algorithm. + + :param alg_name: A stateful signature mechanism algorithm name. + :param secret_key: Optional secret key to load. + """ + super().__init__() + + _check_alg(alg_name) + self._sig = native().OQS_SIG_STFL_new(ct.create_string_buffer(alg_name.encode())) + if not self._sig: + msg = f"Could not allocate OQS_SIG_STFL for {alg_name}" + raise RuntimeError(msg) + + for field, _ctype in self._fields_: + if field == "oid" or field.endswith("cb"): + continue + setattr(self, field, getattr(self._sig.contents, field)) + + self._secret_key: ct.c_void_p | None = None + self._owns_secret = False + self._used_keys: list[bytes] = [] + self._store_cb: Optional[ct.CFUNCTYPE] = None + + if secret_key is not None: + self._load_secret_key(secret_key) + + self.details = { + "name": self.method_name.decode(), + "version": self.alg_version.decode(), + "is_euf_cma": bool(self.euf_cma), + "is_suf_cma": bool(self.suf_cma), + "length_public_key": int(self.length_public_key), + "length_secret_key": int(self.length_secret_key), + "length_signature": int(self.length_signature), + } + + def _attach_store_cb(self) -> None: + """Attach a callback to store used keys in the stateful signature.""" + + @ct.CFUNCTYPE(ct.c_int, ct.POINTER(ct.c_uint8), ct.c_size_t, ct.c_void_p) + def _cb(buf: bytes, length: int, _: ct.c_void_p) -> int: + self._used_keys.append(ct.string_at(buf, length)) + return OQS_SUCCESS + + self._store_cb = _cb # keep ref + native().OQS_SIG_STFL_SECRET_KEY_SET_store_cb(self._secret_key, self._store_cb, None) + + def _new_secret_key(self) -> None: + """Create a new secret key for the stateful signature.""" + self._secret_key = native().OQS_SIG_STFL_SECRET_KEY_new(self.method_name) + if not self._secret_key: + msg = "Could not allocate OQS_SIG_STFL_SECRET_KEY" + raise MemoryError(msg) + self._attach_store_cb() + + def _load_secret_key(self, data: bytes) -> None: + """Load a secret key from bytes.""" + self._new_secret_key() + buf = ct.create_string_buffer(data, len(data)) + rc = native().OQS_SIG_STFL_SECRET_KEY_deserialize(self._secret_key, buf, len(data), None) + if rc != OQS_SUCCESS: + msg = "Secret‑key deserialization failed" + raise RuntimeError(msg) + + def generate_keypair(self) -> bytes: + """ + Generate a new keypair for the stateful signature. + + :raise ValueError: If the keypair has already been generated. + :raise RuntimeError: If the keypair generation fails or if a keypair already exists. + :return: The generated public key as bytes. + """ + if self._secret_key is not None: + msg = "Keypair already generated, call free() to release the secret key" + raise ValueError(msg) + + self._secret_key = native().OQS_SIG_STFL_SECRET_KEY_new(self.method_name) + if not self._secret_key: + msg = "Could not allocate OQS_SIG_STFL_SECRET_KEY" + raise RuntimeError(msg) + self._attach_store_cb() + + sig_struct = self._sig.contents + pk_buf = ct.create_string_buffer(sig_struct.length_public_key) + + rc = native().OQS_SIG_STFL_keypair(self._sig, pk_buf, self._secret_key) + if rc != OQS_SUCCESS: + msg = "Keypair generation failed" + raise RuntimeError(msg) + return pk_buf.raw + + def sign(self, message: bytes) -> bytes: + """ + Sign the provided message and return the signature. + + :param message: The message to sign. + :raises NotImplementedError: If the method is LMS-based, as it is verify-only supported. + :raises RuntimeError: If the secret key is not initialized. + :raises ValueError: If the signing fails. + :return: The signature on the message as bytes. + """ + if self.method_name.startswith(b"LMS"): + msg = "LMS algorithms are verify‑only supported." + raise NotImplementedError(msg) + if self._secret_key is None: + msg = "Secret key not initialised – call generate_keypair() first" + raise RuntimeError(msg) + c_signature = ct.create_string_buffer(self.length_signature) + c_signature_len = ct.c_size_t(self.length_signature) + msg_buf = ct.create_string_buffer(message, len(message)) + rc = native().OQS_SIG_STFL_sign( + self._sig, + c_signature, + ct.byref(c_signature_len), + msg_buf, + len(message), + self._secret_key, + ) + if rc != OQS_SUCCESS: + msg = "Signing failed" + raise ValueError(msg) + return bytes(cast("bytes", c_signature[: c_signature_len.value])) + + def verify(self, message: bytes, signature: bytes, public_key: bytes) -> bool: + """ + Verify the provided signature on the message; returns True if valid. + + :param message: The signed message. + :param signature: The signature on the message. + :param public_key: The signer's public key. + :return: `True` if the signature is valid, `False` otherwise. + """ + msg = ct.create_string_buffer(message, len(message)) + sig = ct.create_string_buffer(signature, len(signature)) + pk = ct.create_string_buffer(public_key, len(public_key)) + rc = native().OQS_SIG_STFL_verify(self._sig, msg, len(message), sig, len(signature), pk) + return rc == OQS_SUCCESS + + def export_secret_key(self) -> bytes: + """ + Serialize the secret key to bytes. + + :return: The serialized secret key as bytes. + :raises ValueError: If the secret key is not initialized. + """ + if self._secret_key is None: + msg = "Secret key not initialised – call generate_keypair() first" + raise ValueError(msg) + buf_ptr = ct.POINTER(ct.c_uint8)() + buf_len = ct.c_size_t() + rc = native().OQS_SIG_STFL_SECRET_KEY_serialize( + ct.byref(buf_ptr), ct.byref(buf_len), self._secret_key + ) + if rc != OQS_SUCCESS: + msg = "Secret‑key serialization failed" + raise ValueError(msg) + data = ct.string_at(buf_ptr, buf_len.value) + ct.CDLL(ct.util.find_library("c")).free(buf_ptr) + return data + + def sigs_total(self) -> int: + """Get the total number of signatures that can be made with the secret key.""" + total = ct.c_uint64() + rc = native().OQS_SIG_STFL_sigs_total(self._sig, ct.byref(total), self._secret_key) + if rc != OQS_SUCCESS: + msg = "Failed to get total signature count" + raise RuntimeError(msg) + return total.value + + def sigs_remaining(self) -> int: + """Get the number of remaining signatures that can be made with the secret key.""" + if self._secret_key is None: + msg = "Secret key not initialised – call generate_keypair() first" + raise ValueError(msg) + remain = ct.c_uint64() + rc = native().OQS_SIG_STFL_sigs_remaining(self._sig, ct.byref(remain), self._secret_key) + if rc != OQS_SUCCESS: + msg = "Failed to get remaining signature count" + raise ValueError(msg) + return remain.value + + def export_used_keys(self) -> list[bytes]: + """Export the list of used keys.""" + return self._used_keys.copy() + + def __enter__(self) -> TStatefulSignature: + """Enter the context and return the StatefulSignature instance.""" + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + """Free the resources when exiting the context.""" + self.free() + + def free(self) -> None: + """Free the native resources.""" + if self._store_cb and self._secret_key: + native().OQS_SIG_STFL_SECRET_KEY_SET_store_cb(self._secret_key, None, None) + self._store_cb = None + if self._secret_key and self._owns_secret: + native().OQS_SIG_STFL_SECRET_KEY_free(self._secret_key) + + +native().OQS_SIG_STFL_new.restype = ct.POINTER(StatefulSignature) +native().OQS_SIG_STFL_SECRET_KEY_new.restype = ct.c_void_p +native().OQS_SIG_STFL_SECRET_KEY_new.argtypes = [ct.c_char_p] +native().OQS_SIG_STFL_SECRET_KEY_SET_store_cb.argtypes = [ct.c_void_p, ct.c_void_p, ct.c_void_p] +native().OQS_SIG_STFL_keypair.argtypes = [ct.POINTER(StatefulSignature), ct.c_void_p, ct.c_void_p] +native().OQS_SIG_STFL_sign.argtypes = [ + ct.POINTER(StatefulSignature), + ct.c_void_p, + ct.POINTER(ct.c_size_t), + ct.c_void_p, + ct.c_size_t, + ct.c_void_p, +] +native().OQS_SIG_STFL_verify.argtypes = [ + ct.POINTER(StatefulSignature), + ct.c_void_p, + ct.c_size_t, + ct.c_void_p, + ct.c_size_t, + ct.c_void_p, +] +native().OQS_SIG_STFL_sigs_remaining.argtypes = [ + ct.POINTER(StatefulSignature), + ct.POINTER(ct.c_uint64), + ct.c_void_p, +] +native().OQS_SIG_STFL_sigs_total.argtypes = [ + ct.POINTER(StatefulSignature), + ct.POINTER(ct.c_uint64), + ct.c_void_p, +] diff --git a/pyproject.toml b/pyproject.toml index cdeda47..86e31a6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,14 +1,16 @@ [project] name = "liboqs-python" requires-python = ">=3.9" -version = "0.12.0" +version = "0.14.0" description = "Python bindings for liboqs, providing post-quantum public key cryptography algorithms" authors = [ { name = "Open Quantum Safe project", email = "contact@openquantumsafe.org" }, ] readme = "README.md" license = { file = "LICENSE.txt" } -dependencies = [] +dependencies = [ + "tomli>=2; python_version < '3.11'", +] [tool.uv] package = true diff --git a/tests/test_kem.py b/tests/test_kem.py index fb0e482..a9b9c58 100644 --- a/tests/test_kem.py +++ b/tests/test_kem.py @@ -1,3 +1,4 @@ +import os import platform # to learn the OS we're on import random @@ -10,6 +11,28 @@ disabled_KEM_patterns = [""] # noqa: N816 +def test_seed_generation() -> tuple[None, str]: + for alg_name in oqs.get_enabled_kem_mechanisms(): + if any(item in alg_name for item in disabled_KEM_patterns): + continue + + if oqs.KeyEncapsulation(alg_name).length_keypair_seed == 0: + # Skip KEMs that do not support seed generation + continue + + yield check_seed_generation, alg_name + + +def check_seed_generation(alg_name: str) -> None: + with oqs.KeyEncapsulation(alg_name) as kem: + length = kem.length_keypair_seed + seed = os.urandom(length) # Ensure the seed can be generated + public_key = kem.generate_keypair_seed(seed) + ciphertext, shared_secret_server = kem.encap_secret(public_key) + shared_secret_client = kem.decap_secret(ciphertext) + assert shared_secret_client == shared_secret_server # noqa: S101 + + def test_correctness() -> tuple[None, str]: for alg_name in oqs.get_enabled_kem_mechanisms(): if any(item in alg_name for item in disabled_KEM_patterns): @@ -102,6 +125,10 @@ def test_python_attributes() -> None: if kem.length_shared_secret == 0: msg = "Incorrect oqs.KeyEncapsulation.length_shared_secret" raise AssertionError(msg) + # Just to check that the property exists. + if kem.length_keypair_seed is None: + msg = "Undefined oqs.KeyEncapsulation.length_keypair_seed" + raise AssertionError(msg) if __name__ == "__main__": diff --git a/tests/test_stfl_sig.py b/tests/test_stfl_sig.py new file mode 100644 index 0000000..c2459ed --- /dev/null +++ b/tests/test_stfl_sig.py @@ -0,0 +1,142 @@ +import logging +import platform # to learn the OS we're on +import random + +import oqs + +_skip_names = ["LMS_SHA256_H20_W8_H10_W8", "LMS_SHA256_H20_W8_H15_W8", "LMS_SHA256_H20_W8_H20_W8"] + + +# Sigs for which unit testing is disabled +disabled_sig_patterns = [] + +if platform.system() == "Windows": + disabled_sig_patterns = [""] + + +def test_correctness() -> tuple[None, str]: + for alg_name in oqs.get_enabled_stateful_sig_mechanisms(): + if any(item in alg_name for item in disabled_sig_patterns): + continue + yield check_correctness, alg_name + + +def check_correctness(alg_name: str) -> None: + with oqs.StatefulSignature(alg_name) as sig: + message = bytes(random.getrandbits(8) for _ in range(100)) + public_key = sig.generate_keypair() + signature = sig.sign(message) + assert sig.verify(message, signature, public_key) # noqa: S101 + + +def test_wrong_message() -> tuple[None, str]: + for alg_name in oqs.get_enabled_stateful_sig_mechanisms(): + if any(item in alg_name for item in disabled_sig_patterns): + continue + yield check_wrong_message, alg_name + + +def check_wrong_message(alg_name: str) -> None: + with oqs.StatefulSignature(alg_name) as sig: + message = bytes(random.getrandbits(8) for _ in range(100)) + public_key = sig.generate_keypair() + signature = sig.sign(message) + wrong_message = bytes(random.getrandbits(8) for _ in range(len(message))) + assert not (sig.verify(wrong_message, signature, public_key)) # noqa: S101 + + +def test_wrong_signature() -> tuple[None, str]: + for alg_name in oqs.get_enabled_stateful_sig_mechanisms(): + if any(item in alg_name for item in disabled_sig_patterns): + continue + yield check_wrong_signature, alg_name + + +def check_wrong_signature(alg_name: str) -> None: + with oqs.StatefulSignature(alg_name) as sig: + message = bytes(random.getrandbits(8) for _ in range(100)) + public_key = sig.generate_keypair() + signature = sig.sign(message) + wrong_signature = bytes(random.getrandbits(8) for _ in range(len(signature))) + assert not (sig.verify(message, wrong_signature, public_key)) # noqa: S101 + + +def test_wrong_public_key() -> tuple[None, str]: + for alg_name in oqs.get_enabled_stateful_sig_mechanisms(): + if any(item in alg_name for item in disabled_sig_patterns): + continue + yield check_wrong_public_key, alg_name + + +def check_wrong_public_key(alg_name: str) -> None: + with oqs.StatefulSignature(alg_name) as sig: + message = bytes(random.getrandbits(8) for _ in range(100)) + public_key = sig.generate_keypair() + signature = sig.sign(message) + wrong_public_key = bytes(random.getrandbits(8) for _ in range(len(public_key))) + assert not (sig.verify(message, signature, wrong_public_key)) # noqa: S101 + + +def test_not_supported() -> None: + try: + with oqs.StatefulSignature("unsupported_sig"): + pass + except oqs.MechanismNotSupportedError: + pass + except Exception as ex: + msg = f"An unexpected exception was raised: {ex}" + raise AssertionError(msg) from ex + else: + msg = "oqs.MechanismNotSupportedError was not raised." + raise AssertionError(msg) + + +def test_not_enabled() -> None: + for alg_name in oqs.get_supported_stateful_sig_mechanisms(): + if alg_name not in oqs.get_enabled_stateful_sig_mechanisms(): + # Found a non-enabled but supported alg + try: + with oqs.StatefulSignature(alg_name): + pass + except oqs.MechanismNotEnabledError: + pass + except Exception as ex: + msg = f"An unexpected exception was raised: {ex}" + raise AssertionError(msg) from ex + else: + msg = "oqs.MechanismNotEnabledError was not raised." + raise AssertionError(msg) + + +def test_python_attributes() -> None: + for alg_name in oqs.get_enabled_stateful_sig_mechanisms(): + if alg_name in _skip_names: + logging.info("Skipping %s as it is in the skip list.", alg_name) + continue + + with oqs.StatefulSignature(alg_name) as sig: + if sig.method_name.decode() != alg_name: + msg = "Incorrect oqs.StatefulSignature.method_name" + raise AssertionError(msg) + if sig.alg_version is None: + msg = "Undefined oqs.StatefulSignature.alg_version" + raise AssertionError(msg) + if sig.length_public_key == 0: + msg = "Incorrect oqs.StatefulSignature.length_public_key" + raise AssertionError(msg) + if sig.length_secret_key == 0: + msg = "Incorrect oqs.StatefulSignature.length_secret_key" + raise AssertionError(msg) + if sig.length_signature == 0: + msg = "Incorrect oqs.StatefulSignature.length_signature" + raise AssertionError(msg) + + +if __name__ == "__main__": + try: + import nose2 + + nose2.main() + except ImportError: + msg_ = "nose2 module not found. Please install it with 'pip install nose2'." + raise RuntimeError(msg_) from None