diff --git a/refresh_versions.toml b/refresh_versions.toml index 25bb7b2..c25bcbc 100644 --- a/refresh_versions.toml +++ b/refresh_versions.toml @@ -1,11 +1,11 @@ charm_major = 1 -workload = "16.13" +workload = "16.14" [snap] name = "charmed-postgresql" [snap.revisions] # amd64 -x86_64 = "332" +x86_64 = "360" # arm64 -aarch64 = "331" +aarch64 = "359" diff --git a/spread.yaml b/spread.yaml index fa77fac..1a10d2d 100644 --- a/spread.yaml +++ b/spread.yaml @@ -43,7 +43,7 @@ backends: CONCIERGE_EXTRA_SNAPS: charmcraft CONCIERGE_EXTRA_DEBS: pipx systems: - - ubuntu-22.04: + - ubuntu-24.04: username: runner prepare: | systemctl disable --now unattended-upgrades.service @@ -89,15 +89,8 @@ backends: # Manually pass specific environment variables environment: CI: '$(HOST: echo $CI)' - AWS_ACCESS_KEY: '$(HOST: echo $AWS_ACCESS_KEY)' - AWS_SECRET_KEY: '$(HOST: echo $AWS_SECRET_KEY)' - GCP_ACCESS_KEY: '$(HOST: echo $GCP_ACCESS_KEY)' - GCP_SECRET_KEY: '$(HOST: echo $GCP_SECRET_KEY)' - UBUNTU_PRO_TOKEN: '$(HOST: echo $UBUNTU_PRO_TOKEN)' - LANDSCAPE_ACCOUNT_NAME: '$(HOST: echo $LANDSCAPE_ACCOUNT_NAME)' - LANDSCAPE_REGISTRATION_KEY: '$(HOST: echo $LANDSCAPE_REGISTRATION_KEY)' systems: - - ubuntu-22.04: + - ubuntu-24.04: username: runner - ubuntu-24.04-arm: username: runner diff --git a/src/charm.py b/src/charm.py index b94bef8..8312d1f 100755 --- a/src/charm.py +++ b/src/charm.py @@ -17,15 +17,19 @@ from charmlibs import snap from charms.data_platform_libs.v1.data_models import TypedCharmBase from ops import ( + ActiveStatus, BlockedStatus, JujuVersion, + MaintenanceStatus, Relation, SecretRemoveEvent, main, ) -from single_kernel_postgresql.config.literals import PEER +from single_kernel_postgresql.config.literals import PEER, Substrates +from single_kernel_postgresql.utils import _change_owner from config import CharmConfig +from constants import SNAP_COMMON_PATH from raft_controller import install_service from relations.watcher_requirer import WatcherRequirerHandler @@ -81,7 +85,13 @@ def is_compatible( def refresh_snap( self, *, snap_name: str, snap_revision: str, refresh: charm_refresh.Machines ) -> None: - pass + self._charm.set_unit_status(MaintenanceStatus("refreshing the snap"), refresh=refresh) + self._charm.watcher_requirer.stop_services() + # Compatibility with storage refactoring + _change_owner(Substrates.VM, SNAP_COMMON_PATH) + self._charm._install_snap_package(revision=snap_revision, refresh=refresh) + + self._charm._post_snap_refresh(refresh) class PostgresqlWatcherCharm(TypedCharmBase[CharmConfig]): @@ -102,6 +112,8 @@ def __init__(self, *args): # Set tracing_endpoint for @trace_charm decorator compatibility self.tracing_endpoint = None + self.framework.observe(self.on.collect_unit_status, self._reconcile_refresh_status) + self.refresh: charm_refresh.Machines | None try: self.refresh = charm_refresh.Machines( @@ -125,6 +137,8 @@ def _post_snap_refresh(self, refresh: charm_refresh.Machines): Called after snap refresh """ install_service() + self.watcher_requirer.start_services() + self.set_unit_status(ActiveStatus(), refresh=refresh) refresh.next_unit_allowed_to_refresh = True def set_unit_status( @@ -147,9 +161,20 @@ def set_unit_status( return self.unit.status = status + def set_app_status(self) -> None: + """Set the app status.""" + if self.refresh is not None and self.refresh.app_status_higher_priority: + self.app.status = self.refresh.app_status_higher_priority + return + if self._peers is None: + return + self.app.status = ActiveStatus() + def _reconcile_refresh_status(self, _=None): # Workaround for other unit statuses being set in a stateful way (i.e. unable to recompute # status on every event) + if self.unit.is_leader(): + self.set_app_status() path = pathlib.Path(".last_refresh_unit_status.json") try: last_refresh_unit_status = json.loads(path.read_text()) diff --git a/src/constants.py b/src/constants.py index ec8b813..6ab1efd 100644 --- a/src/constants.py +++ b/src/constants.py @@ -7,6 +7,9 @@ PATRONI_CLUSTER_STATUS_ENDPOINT = "cluster" PEER = "database-peers" +# Snap constants. +SNAP_COMMON_PATH = "/var/snap/charmed-postgresql/common" + # # Watcher constants WATCHER_RELATION = "watcher" diff --git a/src/raft_controller.py b/src/raft_controller.py index 2048ed8..1576af3 100644 --- a/src/raft_controller.py +++ b/src/raft_controller.py @@ -184,6 +184,8 @@ def configure( render_file(Substrates.VM, self.ca_file, cas, 0o600) logger.info(f"Raft controller configured: self={self_addr}, partners={partner_addrs}") + self.restart() + self.check_watcher_connection(f"{self_addr}:{self_port}", password, partner_addrs) return True def check_watcher_connection( diff --git a/src/relations/watcher_requirer.py b/src/relations/watcher_requirer.py index 104a55b..71307b9 100644 --- a/src/relations/watcher_requirer.py +++ b/src/relations/watcher_requirer.py @@ -38,8 +38,10 @@ UpdateStatusEvent, WaitingStatus, ) +from single_kernel_postgresql.config.literals import Substrates +from single_kernel_postgresql.utils import _change_owner -from constants import RAFT_PORT, WATCHER_RELATION +from constants import RAFT_PORT, SNAP_COMMON_PATH, WATCHER_RELATION from raft_controller import RaftController, install_service if typing.TYPE_CHECKING: @@ -262,6 +264,7 @@ def _on_install(self, event: InstallEvent) -> None: # Install the charmed PostgreSQL snap. self.charm._install_snap_package(revision=None) + _change_owner(Substrates.VM, SNAP_COMMON_PATH) install_service() def _on_start(self, event: StartEvent) -> None: @@ -279,6 +282,32 @@ def _on_config_changed(self, _) -> None: def _on_leader_elected(self, _) -> None: self._update_unit_address_if_changed() + def stop_services(self) -> None: + """Stop all services.""" + for relation in self.model.relations.get(WATCHER_RELATION, []): + raft_controller = RaftController(self.charm, f"rel{relation.id}") + raft_controller.remove_service() + + def start_services(self) -> None: + """Start all services.""" + for relation in self.model.relations.get(WATCHER_RELATION, []): + if ( + not self._is_disabled(relation) + and (raft_password := self._get_raft_password(relation)) + and (partner_addrs := self._get_raft_partner_addrs(relation)) + and self._should_watcher_vote(partner_addrs) + ): + port = self._get_port_for_relation(relation.id) + + raft_controller = RaftController(self.charm, f"rel{relation.id}") + raft_controller.configure( + port, + self.unit_ip, + partner_addrs, + raft_password, + self._get_patroni_cas(relation), + ) + def _update_unit_address_if_changed(self) -> None: """Update unit-address in relation data if IP has changed, for ALL relations.""" if not (new_address := self.unit_ip) or not self.charm.unit.is_leader(): @@ -307,27 +336,21 @@ def _update_unit_address_if_changed(self) -> None: if ( address_changed + and not self._is_disabled(relation) and (raft_password := self._get_raft_password(relation)) and (partner_addrs := self._get_raft_partner_addrs(relation)) + and self._should_watcher_vote(partner_addrs) ): port = self._get_port_for_relation(relation.id) watcher_addr = f"{new_address}:{port}" raft_controller = RaftController(self.charm, f"rel{relation.id}") - changed = raft_controller.configure( + raft_controller.configure( port, new_address, partner_addrs, raft_password, self._get_patroni_cas(relation), ) - if changed and service_running(raft_controller.service_name): - logger.info( - f"Restarting Raft controller for relation {relation.id} due to IP change" - ) - raft_controller.restart() - raft_controller.check_watcher_connection( - watcher_addr, raft_password, partner_addrs - ) raft_controller.cleanup_raft_cluster(watcher_addr, raft_password, partner_addrs) self.charm.app_peer_data["unit-address"] = new_address @@ -488,17 +511,9 @@ def _on_watcher_relation_changed( relation.data[self.charm.app]["raft-status"] = "disabled" return - if raft_controller.configure( + raft_controller.configure( port, unit_ip, partner_addrs, raft_password, self._get_patroni_cas(relation) - ): - logger.info( - f"Restarting Raft controller for relation {relation.id} to apply config changes" - ) - raft_controller.restart() - raft_controller.check_watcher_connection( - watcher_addr, raft_password, partner_addrs - ) - + ) relation.data[self.charm.unit]["unit-address"] = unit_ip relation.data[self.charm.app]["watcher-raft-port"] = str(port) if unit_az := os.environ.get("JUJU_AVAILABILITY_ZONE"): diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 580a011..0b2e553 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -2,6 +2,7 @@ # See LICENSE file for licensing details. import logging +import jubilant import pytest from . import architecture @@ -15,3 +16,20 @@ def charm(): # juju bundle files expect local charms to begin with `./` or `/` to distinguish them from # Charmhub charms. return f"./postgresql-watcher_ubuntu@24.04-{architecture.architecture}.charm" + + +@pytest.fixture(scope="module") +def juju(request: pytest.FixtureRequest): + """Pytest fixture that wraps :meth:`jubilant.with_model`. + + This adds command line parameter ``--keep-models`` (see help for details). + """ + model = request.config.getoption("--model") + keep_models = bool(request.config.getoption("--keep-models")) + + if model: + juju = jubilant.Juju(model=model) + yield juju + else: + with jubilant.temp_model(keep=keep_models) as juju: + yield juju diff --git a/tests/integration/ha_tests/test_stereo_mode.py b/tests/integration/ha_tests/test_stereo_mode.py index f24f986..c0aee5b 100644 --- a/tests/integration/ha_tests/test_stereo_mode.py +++ b/tests/integration/ha_tests/test_stereo_mode.py @@ -18,13 +18,14 @@ import logging import pytest -from constants import RAFT_PARTNER_PREFIX from pysyncobj.utility import TcpUtility from pytest_operator.plugin import OpsTest from tenacity import Retrying, stop_after_delay, wait_fixed from yaml import safe_load -from ..helpers import APPLICATION_NAME, DATABASE_APP_NAME +from constants import RAFT_PARTNER_PREFIX + +from ..helpers import APPLICATION_NAME, DATABASE_APP_NAME, get_machine_from_unit, stop_machine from .helpers import APPLICATION_NAME as TEST_APP_NAME from .helpers import ( are_writes_increasing, @@ -38,6 +39,7 @@ ) WATCHER_APP_NAME = "postgresql-watcher" +SECOND_PG_APP_NAME = "postgresql-b" async def start_writes(ops_test: OpsTest) -> None: @@ -190,11 +192,14 @@ async def test_build_and_deploy_stereo_mode(ops_test: OpsTest, charm) -> None: channel="edge", ) - # Wait for initial deployment - await ops_test.model.wait_for_idle( - apps=[DATABASE_APP_NAME, WATCHER_APP_NAME], - timeout=1200, - raise_on_error=False, # Watcher may be waiting for relation + logger.info("Deploying second PostgreSQL cluster for multi-cluster watcher test") + await ops_test.model.deploy( + DATABASE_APP_NAME, + application_name=SECOND_PG_APP_NAME, + num_units=2, + series="noble", + channel="16/edge", + config={"profile": "testing", "synchronous-mode-strict": False}, ) # Relate PostgreSQL (watcher-offer) to watcher (watcher) @@ -210,13 +215,6 @@ async def test_build_and_deploy_stereo_mode(ops_test: OpsTest, charm) -> None: else: raise - # Wait for watcher to join Raft cluster - await ops_test.model.wait_for_idle( - apps=[DATABASE_APP_NAME, WATCHER_APP_NAME], - status="active", - timeout=600, - ) - # Relate PostgreSQL to test app try: await ops_test.model.integrate(DATABASE_APP_NAME, f"{APPLICATION_NAME}:database") @@ -260,6 +258,7 @@ async def test_replica_shutdown_with_watcher(ops_test: OpsTest, continuous_write logger.info(f"Shutting down replica: {replica}") # Shutdown the replica + await stop_machine(ops_test, await get_machine_from_unit(ops_test, replica)) await ops_test.model.destroy_unit(replica, force=True, destroy_storage=False, max_wait=1500) # Wait for the cluster to stabilize after unit removal @@ -339,6 +338,7 @@ async def test_primary_shutdown_with_watcher(ops_test: OpsTest, continuous_write logger.info(f"Shutting down primary: {original_primary}") # Shutdown the primary + await stop_machine(ops_test, await get_machine_from_unit(ops_test, original_primary)) await ops_test.model.destroy_unit( original_primary, force=True, destroy_storage=False, max_wait=1500 ) @@ -433,6 +433,7 @@ async def test_watcher_shutdown_no_outage(ops_test: OpsTest, continuous_writes) # Remove the watcher watcher_unit = ops_test.model.applications[WATCHER_APP_NAME].units[0] + await stop_machine(ops_test, await get_machine_from_unit(ops_test, watcher_unit.name)) await ops_test.model.destroy_unit(watcher_unit.name, force=True, max_wait=300) # Verify writes continue without interruption @@ -519,12 +520,13 @@ async def test_primary_network_isolation_with_watcher( # Wait for cluster to stabilize with restored network # The old primary may take time to rejoin after getting a new IP address, # so we use raise_on_error=False and wait longer - await ops_test.model.wait_for_idle( - apps=[DATABASE_APP_NAME], - timeout=900, - idle_period=30, - raise_on_error=False, # Old primary may be in error while rejoining - ) + async with ops_test.fast_forward(fast_interval="60s"): + await ops_test.model.wait_for_idle( + apps=[DATABASE_APP_NAME], + timeout=900, + idle_period=30, + raise_on_error=False, # Old primary may be in error while rejoining + ) # Wait for the old primary to rejoin as replica # This can take a while as it needs to recover with a new IP @@ -674,29 +676,14 @@ async def test_multi_cluster_watcher(ops_test: OpsTest, charm) -> None: to multiple PostgreSQL clusters simultaneously. Each relation gets its own Raft instance with a dedicated port and data directory. """ - second_pg_app = "postgresql-b" - try: # Deploy a second PostgreSQL cluster - logger.info("Deploying second PostgreSQL cluster for multi-cluster watcher test") - await ops_test.model.deploy( - DATABASE_APP_NAME, - application_name=second_pg_app, - num_units=2, - series="noble", - channel="16/edge", - config={"profile": "testing", "synchronous-mode-strict": False}, - ) - await ops_test.model.wait_for_idle( - apps=[second_pg_app], - status="active", - timeout=1200, - ) + await ops_test.model.wait_for_idle(apps=[SECOND_PG_APP_NAME], status="active", timeout=330) # Relate the watcher to the second cluster logger.info("Relating watcher to second PostgreSQL cluster") await ops_test.model.integrate( - f"{second_pg_app}:watcher-offer", f"{WATCHER_APP_NAME}:watcher" + f"{SECOND_PG_APP_NAME}:watcher-offer", f"{WATCHER_APP_NAME}:watcher" ) # Use fast_forward to trigger update_status quickly, which runs @@ -704,7 +691,7 @@ async def test_multi_cluster_watcher(ops_test: OpsTest, charm) -> None: async with ops_test.fast_forward(): # Wait for the watcher to connect to both clusters await ops_test.model.wait_for_idle( - apps=[DATABASE_APP_NAME, second_pg_app, WATCHER_APP_NAME], + apps=[DATABASE_APP_NAME, SECOND_PG_APP_NAME, WATCHER_APP_NAME], status="active", timeout=600, ) @@ -716,14 +703,14 @@ async def test_multi_cluster_watcher(ops_test: OpsTest, charm) -> None: ) # Check second cluster await verify_raft_cluster_health( - ops_test, second_pg_app, WATCHER_APP_NAME, expected_members=3 + ops_test, SECOND_PG_APP_NAME, WATCHER_APP_NAME, expected_members=3 ) finally: # Clean up the second cluster relation and app - if second_pg_app in ops_test.model.applications: + if SECOND_PG_APP_NAME in ops_test.model.applications: await ops_test.model.remove_application( - second_pg_app, block_until_done=True, force=True + SECOND_PG_APP_NAME, block_until_done=True, force=True ) # Verify original watcher is still healthy after removing the second cluster diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 8a1266d..7a4c0db 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -4,12 +4,12 @@ import itertools import json import logging +import subprocess from pathlib import Path import psycopg2 import requests import yaml -from constants import PEER from juju.model import Model from pytest_operator.plugin import OpsTest from tenacity import ( @@ -18,6 +18,8 @@ wait_exponential, ) +from constants import PEER + CHARM_BASE = "ubuntu@22.04" METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) DATABASE_APP_NAME = "postgresql" @@ -194,3 +196,45 @@ async def run_command_on_unit(ops_test: OpsTest, unit_name: str, command: str) - f"Expected command '{command}' to succeed instead it failed: {return_code}" ) return stdout + + +async def stop_machine(ops_test: OpsTest, machine_name: str) -> None: + """Stop the machine where a unit run on. + + Args: + ops_test: The ops test framework instance + machine_name: The name of the machine to stop + """ + stop_machine_command = f"lxc stop {machine_name}" + subprocess.check_call(stop_machine_command.split()) + + +### Ported Mysql jubilant helpers + + +def execute_queries_on_unit( + unit_address: str, username: str, password: str, queries: list[str], database: str +) -> list: + """Execute given PostgreSQL queries on a unit. + + Args: + unit_address: The public IP address of the unit to execute the queries on + username: The PostgreSQL username + password: The PostgreSQL password + queries: A list of queries to execute + database: Database to execute in + + Returns: + A list of rows that were potentially queried + """ + with ( + psycopg2.connect( + f"dbname='{database}' user='{username}' host='{unit_address}' password='{password}' connect_timeout=10" + ) as connection, + connection.cursor() as cursor, + ): + for query in queries: + cursor.execute(query) + output = list(itertools.chain(*cursor.fetchall())) + + return output diff --git a/tests/integration/high_availability/__init__.py b/tests/integration/high_availability/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration/high_availability/conftest.py b/tests/integration/high_availability/conftest.py new file mode 100644 index 0000000..7d02b89 --- /dev/null +++ b/tests/integration/high_availability/conftest.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python3 +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. + +import logging + +import pytest +from tenacity import Retrying, stop_after_attempt + +from .high_availability_helpers_new import get_app_leader + +logger = logging.getLogger(__name__) + +DB_TEST_APP_NAME = "postgresql-test-app" + + +@pytest.fixture() +def continuous_writes(juju): + """Starts continuous writes to the MySQL cluster for a test and clear the writes at the end.""" + application_unit = get_app_leader(juju, DB_TEST_APP_NAME) + + logger.info("Clearing continuous writes") + juju.run(unit=application_unit, action="clear-continuous-writes", wait=120).raise_on_failure() + + logger.info("Starting continuous writes") + + for attempt in Retrying(stop=stop_after_attempt(10), reraise=True): + with attempt: + result = juju.run(unit=application_unit, action="start-continuous-writes") + result.raise_on_failure() + + assert result.results["result"] == "True" + + yield + + logger.info("Clearing continuous writes") + juju.run(unit=application_unit, action="clear-continuous-writes", wait=120).raise_on_failure() diff --git a/tests/integration/high_availability/high_availability_helpers_new.py b/tests/integration/high_availability/high_availability_helpers_new.py new file mode 100644 index 0000000..34b29d8 --- /dev/null +++ b/tests/integration/high_availability/high_availability_helpers_new.py @@ -0,0 +1,199 @@ +#!/usr/bin/env python3 +# Copyright 2025 Canonical Ltd. +# See LICENSE file for licensing details. +import logging +from collections.abc import Callable + +import jubilant +from jubilant import Juju +from jubilant.statustypes import Status, UnitStatus +from pysyncobj.utility import TcpUtility +from tenacity import Retrying, stop_after_delay, wait_fixed +from yaml import safe_load + +from constants import PEER, RAFT_PARTNER_PREFIX + +from ..helpers import execute_queries_on_unit + +MINUTE_SECS = 60 +SERVER_CONFIG_USERNAME = "operator" + +JujuModelStatusFn = Callable[[Status], bool] +JujuAppsStatusFn = Callable[[Status, str], bool] + + +def check_db_units_writes_increment( + juju: Juju, + app_name: str, + app_units: list[str] | None = None, + db_name: str = "postgresql_test_app_database", +) -> None: + """Ensure that continuous writes is incrementing on all units. + + Also, ensure that all continuous writes up to the max written value is available + on all units (ensure that no committed data is lost). + """ + if not app_units: + app_units = get_app_units(juju, app_name) + + app_primary = get_db_primary_unit(juju, app_name) + app_max_value = get_db_max_written_value(juju, app_name, app_primary, db_name) + + for unit_name in app_units: + for attempt in Retrying( + reraise=True, + stop=stop_after_delay(5 * MINUTE_SECS), + wait=wait_fixed(10), + ): + with attempt: + unit_max_value = get_db_max_written_value(juju, app_name, unit_name, db_name) + assert unit_max_value > app_max_value, "Writes not incrementing" + app_max_value = unit_max_value + + +def get_app_leader(juju: Juju, app_name: str) -> str: + """Get the leader unit for the given application.""" + model_status = juju.status() + app_status = model_status.apps[app_name] + for name, status in app_status.units.items(): + if status.leader: + return name + + raise Exception("No leader unit found") + + +def get_app_units(juju: Juju, app_name: str) -> dict[str, UnitStatus]: + """Get the units for the given application.""" + model_status = juju.status() + app_status = model_status.apps[app_name] + return app_status.units + + +def get_unit_ip(juju: Juju, app_name: str, unit_name: str) -> str: + """Get the application unit IP.""" + model_status = juju.status() + app_status = model_status.apps[app_name] + for name, status in app_status.units.items(): + if name == unit_name: + return status.public_address + + raise Exception("No application unit found") + + +def get_db_primary_unit(juju: Juju, app_name: str) -> str: + """Get the current primary node of the cluster.""" + postgresql_primary = get_app_leader(juju, app_name) + task = juju.run(unit=postgresql_primary, action="get-primary", wait=5 * MINUTE_SECS) + task.raise_on_failure() + + primary = task.results.get("primary") + if primary != "None": + return primary + + raise Exception("No primary node found") + + +def get_db_max_written_value( + juju: Juju, app_name: str, unit_name: str, db_name: str = "postgresql_test_app_database" +) -> int: + """Retrieve the max written value in the PostgreSQL database. + + Args: + juju: The Juju model. + app_name: The application name. + unit_name: The unit name. + db_name: The database to connect to. + """ + password = get_user_password(juju, app_name, SERVER_CONFIG_USERNAME) + + output = execute_queries_on_unit( + get_unit_ip(juju, app_name, unit_name), + SERVER_CONFIG_USERNAME, + password, + ["SELECT MAX(number) FROM continuous_writes;"], + db_name, + ) + return output[0] + + +def wait_for_apps_status(jubilant_status_func: JujuAppsStatusFn, *apps: str) -> JujuModelStatusFn: + """Waits for Juju agents to be idle, and for applications to reach a certain status. + + Args: + jubilant_status_func: The Juju apps status function to wait for. + apps: The applications to wait for. + + Returns: + Juju model status function. + """ + return lambda status: all(( + jubilant.all_agents_idle(status, *apps), + jubilant_status_func(status, *apps), + )) + + +# PG helpers + + +def get_user_password(juju: Juju, app_name: str, user: str) -> str | None: + """Get a system user's password.""" + for secret in juju.secrets(): + if secret.label == f"{PEER}.{app_name}.app": + revealed_secret = juju.show_secret(secret.uri, reveal=True) + return revealed_secret.content.get(f"{user}-password") + + +def verify_raft_cluster_health( + juju: Juju, + db_app_name: str, + watcher_app_name: str, + expected_members: int = 3, + check_watcher_ip: bool = True, +) -> None: + """Verify that the Raft cluster has the expected number of members and quorum.""" + logging.info(f"Verifying Raft cluster health with {expected_members} expected members") + + # Get watcher address for verification using juju exec to avoid cached IPs + model_status = juju.status() + watcher_unit = next(unit for unit in model_status.apps[watcher_app_name].units) + ip_task = juju.exec("unit-get private-address", unit=watcher_unit) + assert ip_task.return_code == 0, f"Failed to get watcher address from {watcher_unit}" + watcher_ip = ip_task.stdout.strip() + + for attempt in Retrying(stop=stop_after_delay(180), wait=wait_fixed(5), reraise=True): + with attempt: + for unit in model_status.apps[db_app_name].units: + # Get the Raft password from Patroni config using juju exec directly + # We need to avoid shell interpretation issues with run_command_on_unit + complete_command = ( + "cat /var/snap/charmed-postgresql/current/etc/patroni/patroni.yaml" + ) + exec_task = juju.exec(complete_command, unit=unit) + assert exec_task.return_code == 0, f"Failed to read patroni.yaml on {unit}" + + conf = safe_load(exec_task.stdout) + password = conf.get("raft", {}).get("password") + self_addr = conf.get("raft", {}).get("self_addr") + assert password, f"Could not find Raft password in patroni.yaml on {unit}" + + # Check Raft status using the password + syncobj_util = TcpUtility(password=password, timeout=3) + status = syncobj_util.executeCommand(self_addr, ["status"]) + logging.info(f"Raft status on {unit}: {status}") + + # Verify quorum + assert status["has_quorum"] is True, f"Unit {unit} does not have Raft quorum" + + assert status["partner_nodes_count"] + 1 == expected_members + + # Verify watcher is in the cluster (if requested) + # After network isolation tests, the watcher may have been redeployed + # with a new IP that isn't yet updated in the Raft configuration + if check_watcher_ip: + assert watcher_ip in [ + key.split(":")[0].split(RAFT_PARTNER_PREFIX)[-1] + for key in status + if key.startswith(RAFT_PARTNER_PREFIX) + ], f"Watcher {watcher_ip} not found in Raft cluster on {unit}" + + logging.info("Raft cluster health verified successfully") diff --git a/tests/integration/high_availability/test_upgrade.py b/tests/integration/high_availability/test_upgrade.py new file mode 100644 index 0000000..d649468 --- /dev/null +++ b/tests/integration/high_availability/test_upgrade.py @@ -0,0 +1,181 @@ +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + +import logging +import platform +import shutil +import zipfile +from pathlib import Path + +import jubilant +import tomli +import tomli_w +from jubilant import Juju + +from .high_availability_helpers_new import ( + check_db_units_writes_increment, + get_app_leader, + get_app_units, + verify_raft_cluster_health, + wait_for_apps_status, +) + +DB_APP_NAME = "postgresql" +WATCHER_APP_NAME = "postgresql-watcher" +DB_TEST_APP_NAME = "postgresql-test-app" + +MINUTE_SECS = 60 + +logging.getLogger("jubilant.wait").setLevel(logging.WARNING) + + +def test_deploy_latest(juju: Juju) -> None: + """Simple test to ensure that the PostgreSQL and application charms get deployed.""" + logging.info("Deploying PostgreSQL cluster") + juju.deploy( + charm=DB_APP_NAME, + app=DB_APP_NAME, + base="ubuntu@24.04", + channel="16/edge", + config={"profile": "testing", "synchronous-mode-strict": False}, + num_units=2, + ) + juju.deploy( + charm=WATCHER_APP_NAME, + app=WATCHER_APP_NAME, + base="ubuntu@24.04", + channel="16/edge", + config={"profile": "testing"}, + num_units=1, + ) + juju.deploy( + charm=DB_TEST_APP_NAME, + app=DB_TEST_APP_NAME, + base="ubuntu@24.04", + channel="latest/edge", + num_units=1, + ) + + juju.integrate( + f"{DB_APP_NAME}:watcher-offer", + f"{WATCHER_APP_NAME}:watcher", + ) + juju.integrate( + f"{DB_APP_NAME}:database", + f"{DB_TEST_APP_NAME}:database", + ) + + logging.info("Wait for applications to become active") + juju.wait( + ready=wait_for_apps_status( + jubilant.all_active, DB_APP_NAME, DB_TEST_APP_NAME, WATCHER_APP_NAME + ), + timeout=20 * MINUTE_SECS, + ) + + +def test_pre_refresh_check(juju: Juju) -> None: + """Test that the pre-refresh-check action runs successfully.""" + watcher_leader = get_app_leader(juju, WATCHER_APP_NAME) + + logging.info("Run pre-refresh-check action") + juju.run(unit=watcher_leader, action="pre-refresh-check") + + juju.wait(jubilant.all_agents_idle, timeout=5 * MINUTE_SECS) + + +def test_upgrade_from_edge(juju: Juju, charm: str, continuous_writes) -> None: + """Update the second cluster.""" + logging.info("Ensure continuous writes are incrementing") + check_db_units_writes_increment(juju, DB_APP_NAME) + + logging.info("Refresh the charm") + juju.refresh(app=WATCHER_APP_NAME, path=charm) + logging.info("Wait for refresh to block as paused or incompatible") + try: + juju.wait(lambda status: status.apps[WATCHER_APP_NAME].is_blocked, timeout=5 * MINUTE_SECS) + + units = get_app_units(juju, WATCHER_APP_NAME) + unit_names = sorted(units.keys()) + + if "Refresh incompatible" in juju.status().apps[WATCHER_APP_NAME].app_status.message: + logging.info("Application refresh is blocked due to incompatibility") + juju.run( + unit=unit_names[-1], + action="force-refresh-start", + params={"check-compatibility": False}, + wait=5 * MINUTE_SECS, + ) + + juju.wait(jubilant.all_agents_idle, timeout=5 * MINUTE_SECS) + except TimeoutError: + logging.info("Upgrade completed without snap refresh (charm.py upgrade only)") + assert juju.status().apps[WATCHER_APP_NAME].is_active + + logging.info("Wait for upgrade to complete") + juju.wait( + ready=wait_for_apps_status(jubilant.all_active, WATCHER_APP_NAME), timeout=20 * MINUTE_SECS + ) + + logging.info("Ensure continuous writes are incrementing") + check_db_units_writes_increment(juju, DB_APP_NAME) + verify_raft_cluster_health(juju, DB_APP_NAME, WATCHER_APP_NAME) + + +def test_fail_and_rollback(juju: Juju, charm: str, continuous_writes) -> None: + """Test an upgrade failure and its rollback.""" + watcher_app_leader = get_app_leader(juju, WATCHER_APP_NAME) + + logging.info("Run pre-refresh-check action") + juju.run(unit=watcher_app_leader, action="pre-refresh-check") + + juju.wait(jubilant.all_agents_idle, timeout=5 * MINUTE_SECS) + + tmp_folder = Path("tmp") + tmp_folder.mkdir(exist_ok=True) + tmp_folder_charm = Path(tmp_folder, charm).absolute() + + shutil.copy(charm, tmp_folder_charm) + + logging.info("Inject dependency fault") + inject_dependency_fault(juju, WATCHER_APP_NAME, tmp_folder_charm) + + logging.info("Refresh the charm") + juju.refresh(app=WATCHER_APP_NAME, path=tmp_folder_charm) + + logging.info("Wait for upgrade to fail on leader") + juju.wait( + ready=wait_for_apps_status(jubilant.any_blocked, WATCHER_APP_NAME), + timeout=10 * MINUTE_SECS, + ) + + logging.info("Ensure continuous writes on all units") + check_db_units_writes_increment(juju, DB_APP_NAME) + + logging.info("Re-refresh the charm") + juju.refresh(app=WATCHER_APP_NAME, path=charm) + + logging.info("Wait for upgrade to complete") + juju.wait( + ready=wait_for_apps_status(jubilant.all_active, WATCHER_APP_NAME), timeout=20 * MINUTE_SECS + ) + + logging.info("Ensure continuous writes after rollback procedure") + check_db_units_writes_increment(juju, DB_APP_NAME) + verify_raft_cluster_health(juju, DB_APP_NAME, WATCHER_APP_NAME) + + # Remove fault charm file + tmp_folder_charm.unlink() + + +def inject_dependency_fault(juju: Juju, app_name: str, charm_file: str | Path) -> None: + """Inject a dependency fault into the PostgreSQL charm.""" + with Path("refresh_versions.toml").open("rb") as file: + versions = tomli.load(file) + + versions["charm"] = "16/0.0.0" + versions["snap"]["revisions"][platform.machine()] = "1" + + # Overwrite refresh_versions.toml with incompatible version. + with zipfile.ZipFile(charm_file, mode="a") as charm_zip: + charm_zip.writestr("refresh_versions.toml", tomli_w.dumps(versions)) diff --git a/tests/integration/pyproject.toml b/tests/integration/pyproject.toml deleted file mode 100644 index f2bc1ab..0000000 --- a/tests/integration/pyproject.toml +++ /dev/null @@ -1,72 +0,0 @@ -# Copyright 2026 Canonical Ltd. -# See LICENSE file for licensing details. - -# Linting tools configuration -[tool.ruff] -# preview and explicit preview are enabled for CPY001 -preview = true -target-version = "py310" -src = ["."] -line-length = 99 - -[tool.ruff.lint] -explicit-preview-rules = true -select = [ - "A", - "E", - "W", - "F", - "C", - "N", - "D", - "I001", - "B", - "CPY001", - "RUF", - "S", - "SIM", - "UP", - "TC", -] -extend-ignore = [ - "D203", - "D204", - "D213", - "D215", - "D400", - "D404", - "D406", - "D407", - "D408", - "D409", - "D413", - "B904", -] -# Ignore E501 because using black creates errors with this -# Ignore D107 Missing docstring in __init__ -ignore = ["E501", "D107"] - -[tool.ruff.lint.per-file-ignores] -"*" = [ - "D100", - "D101", - "D102", - "D103", - "D104", - # Asserts - "B011", - # Disable security checks for tests - "S", -] - -[tool.ruff.lint.flake8-copyright] -# Check for properly formatted copyright header in each file -author = "Canonical Ltd." -notice-rgx = "Copyright\\s\\d{4}([-,]\\d{4})*\\s+" -min-file-size = 1 - -[tool.ruff.lint.mccabe] -max-complexity = 10 - -[tool.ruff.lint.pydocstyle] -convention = "google" diff --git a/tests/spread/test_upgrade.py/task.yaml b/tests/spread/test_upgrade.py/task.yaml new file mode 100644 index 0000000..f99ac69 --- /dev/null +++ b/tests/spread/test_upgrade.py/task.yaml @@ -0,0 +1,7 @@ +summary: test_upgrade.py +environment: + TEST_MODULE: high_availability/test_upgrade.py +execute: | + tox run -e integration -- "tests/integration/$TEST_MODULE" --model testing --alluredir="$SPREAD_TASK/allure-results" +artifacts: + - allure-results diff --git a/tests/unit/test_raft_controller.py b/tests/unit/test_raft_controller.py index 543c7a2..718028f 100644 --- a/tests/unit/test_raft_controller.py +++ b/tests/unit/test_raft_controller.py @@ -38,6 +38,10 @@ def test_configure(tmp_path: Path, controller: RaftController): with ( patch("raft_controller.render_file") as _render_file, patch("raft_controller.create_directory") as _create_directory, + patch("raft_controller.RaftController.restart") as _restart, + patch( + "raft_controller.RaftController.check_watcher_connection" + ) as _check_watcher_connection, ): assert controller.configure(2222, "10.0.0.1", ["10.0.0.2"], "secret") @@ -52,6 +56,8 @@ def test_configure(tmp_path: Path, controller: RaftController): expected_content, 0o600, ) + _restart.assert_called_once_with() + _check_watcher_connection.assert_called_once_with("10.0.0.1:2222", "secret", ["10.0.0.2"]) def test_remove_service_disables_unit_and_deletes_dir(tmp_path: Path, controller: RaftController):