From 804629086a6cd4ecfd441cf2aa63040f4e8aa552 Mon Sep 17 00:00:00 2001 From: Gautier DI FOLCO Date: Tue, 28 Apr 2026 12:52:49 +0200 Subject: [PATCH 1/7] WPB-24076: Add meeting cleaner job in `background-worker` --- changelog.d/5-internal/WPB-24076 | 1 + .../background-worker/configmap.yaml | 4 + charts/wire-server/values.yaml | 114 ++++++++------- integration/test/Test/Meetings.hs | 96 +++++++++++++ .../wire-subsystems/src/Wire/MeetingsStore.hs | 8 ++ .../src/Wire/MeetingsStore/Postgres.hs | 57 ++++++++ .../src/Wire/MeetingsSubsystemCleaning.hs | 32 +++++ .../MeetingsSubsystemCleaning/Interpreter.hs | 73 ++++++++++ .../Wire/MockInterpreters/MeetingsStore.hs | 12 ++ libs/wire-subsystems/wire-subsystems.cabal | 2 + .../background-worker/background-worker.cabal | 3 + .../background-worker.integration.yaml | 6 + services/background-worker/default.nix | 4 + .../src/Wire/BackgroundWorker.hs | 6 + .../src/Wire/BackgroundWorker/Env.hs | 11 ++ .../src/Wire/BackgroundWorker/Options.hs | 27 ++++ .../src/Wire/MeetingsCleanupWorker.hs | 134 ++++++++++++++++++ .../Wire/BackendNotificationPusherSpec.hs | 4 +- .../background-worker/test/Test/Wire/Util.hs | 1 + services/galley/galley.integration.yaml | 2 +- 20 files changed, 543 insertions(+), 54 deletions(-) create mode 100644 changelog.d/5-internal/WPB-24076 create mode 100644 libs/wire-subsystems/src/Wire/MeetingsSubsystemCleaning.hs create mode 100644 libs/wire-subsystems/src/Wire/MeetingsSubsystemCleaning/Interpreter.hs create mode 100644 services/background-worker/src/Wire/MeetingsCleanupWorker.hs diff --git a/changelog.d/5-internal/WPB-24076 b/changelog.d/5-internal/WPB-24076 new file mode 100644 index 00000000000..caf6d968821 --- /dev/null +++ b/changelog.d/5-internal/WPB-24076 @@ -0,0 +1 @@ +Add meeting cleaner job in `background-worker`. diff --git a/charts/wire-server/templates/background-worker/configmap.yaml b/charts/wire-server/templates/background-worker/configmap.yaml index 594bd5d1a92..8ff8fc6ad18 100644 --- a/charts/wire-server/templates/background-worker/configmap.yaml +++ b/charts/wire-server/templates/background-worker/configmap.yaml @@ -91,6 +91,10 @@ data: {{toYaml .backendNotificationPusher | indent 6 }} {{- with .backgroundJobs }} backgroundJobs: +{{ toYaml . | indent 6 }} + {{- end }} + {{- with .meetingsCleanup }} + meetingsCleanup: {{ toYaml . | indent 6 }} {{- end }} {{- if $.Values.galley.config.postgresMigration }} diff --git a/charts/wire-server/values.yaml b/charts/wire-server/values.yaml index fc50c17dfad..e84a199f1b5 100644 --- a/charts/wire-server/values.yaml +++ b/charts/wire-server/values.yaml @@ -7,7 +7,7 @@ tags: legalhold: false - federation: false + federation: false backoffice: false mlsstats: false integration: false @@ -145,7 +145,8 @@ galley: # To disable proteus for new federated conversations: # federationProtocols: ["mls"] - featureFlags: # see #RefConfigOptions in `/docs/reference` (https://github.com/wireapp/wire-server/) + featureFlags: + # see #RefConfigOptions in `/docs/reference` (https://github.com/wireapp/wire-server/) appLock: defaults: config: @@ -340,8 +341,7 @@ galley: seccompProfile: type: RuntimeDefault tests: - config: - {} + config: {} # uploadXml: # baseUrl: s3://bucket/path/ @@ -509,8 +509,8 @@ cannon: # name: # key: - # See also the section 'Controlling the speed of websocket draining during - # cannon pod replacement' in docs/how-to/install/configuration-options.rst + # See also the section 'Controlling the speed of websocket draining during + # cannon pod replacement' in docs/how-to/install/configuration-options.rst drainOpts: # The following drains a minimum of 400 connections/second # for a total of 10000 over 25 seconds @@ -719,21 +719,21 @@ gundeck: # name: # key: - # To enable additional writes during a migration: - # redisAdditionalWrite: - # host: redis-two - # port: 6379 - # connectionMode: master - # enableTls: false - # insecureSkipVerifyTls: false - # - # # To configure custom TLS CA, please provide one of these: - # # tlsCa: - # # - # # Or refer to an existing secret (containing the CA): - # # tlsCaSecretRef: - # # name: - # # key: + # To enable additional writes during a migration: + # redisAdditionalWrite: + # host: redis-two + # port: 6379 + # connectionMode: master + # enableTls: false + # insecureSkipVerifyTls: false + # + # # To configure custom TLS CA, please provide one of these: + # # tlsCa: + # # + # # Or refer to an existing secret (containing the CA): + # # tlsCaSecretRef: + # # name: + # # key: aws: region: "eu-west-1" proxy: {} @@ -759,7 +759,6 @@ gundeck: # will be deleted and thus not delivered. # The default is 28 days. notificationTTL: 2419200 - # To enable cells notifications # cellsEventQueue: cells_events @@ -835,7 +834,6 @@ spar: # Disable one ore more API versions. Please make sure the configuration value is the same in all these charts: # brig, cannon, cargohold, galley, gundeck, proxy, spar. disabledAPIVersions: [development] - # SAML - ServiceProvider configuration # Usually, one would configure one set of options for a single domain. For # multi-ingress setups (one backend is available through multiple domains), @@ -982,8 +980,22 @@ background-worker: # Total attempts, including the first try maxAttempts: 3 - secrets: - {} + # Meetings cleanup configuration + meetingsCleanup: + # Delete meetings older than this many hours (48 hours = 2 days) + cleanOlderThanHours: 48.0 + # Maximum number of meetings to delete per batch + batchSize: 1000 + # Cron schedule for the cleanup job (0 * * * * = every hour) + schedule: "0 * * * *" + + # Controls where conversation data is stored/accessed + postgresMigration: + conversation: cassandra + conversationCodes: cassandra + teamFeatures: cassandra + + secrets: {} podSecurityContext: allowPrivilegeEscalation: false @@ -1031,7 +1043,7 @@ brig: # tlsCaSecretRef: # name: # key: - + elasticsearch: scheme: http host: elasticsearch-client @@ -1062,7 +1074,7 @@ brig: sesEndpoint: https://email.eu-west-1.amazonaws.com sqsEndpoint: https://sqs.eu-west-1.amazonaws.com # dynamoDBEndpoint: https://dynamodb.eu-west-1.amazonaws.com - # -- If set to false, 'dynamoDBEndpoint' _must_ be set. + # -- If set to false, 'dynamoDBEndpoint' _must_ be set. randomPrekeys: true useSES: true multiSFT: @@ -1077,19 +1089,19 @@ brig: # tlsCaSecretRef: # name: # key: - - # Postgres connection settings - # - # Values are described in https://www.postgresql.org/docs/17/libpq-connect.html#LIBPQ-PARAMKEYWORDS - # To set the password via a brig secret see `secrets.pgPassword`. - # - # `additionalVolumeMounts` and `additionalVolumes` can be used to mount - # additional files (e.g. certificates) into the brig container. This way - # does not work for password files (parameter `passfile`), because - # libpq-connect requires access rights (mask 0600) for them that we cannot - # provide for random uids. - # - # Below is an example configuration we're using for our CI tests. + + # Postgres connection settings + # + # Values are described in https://www.postgresql.org/docs/17/libpq-connect.html#LIBPQ-PARAMKEYWORDS + # To set the password via a brig secret see `secrets.pgPassword`. + # + # `additionalVolumeMounts` and `additionalVolumes` can be used to mount + # additional files (e.g. certificates) into the brig container. This way + # does not work for password files (parameter `passfile`), because + # libpq-connect requires access rights (mask 0600) for them that we cannot + # provide for random uids. + # + # Below is an example configuration we're using for our CI tests. postgresql: host: postgresql # DNS name without protocol port: "5432" @@ -1100,7 +1112,7 @@ brig: acquisitionTimeout: 10s agingTimeout: 1d idlenessTimeout: 10m - + emailSMS: general: templateBranding: @@ -1195,25 +1207,25 @@ brig: maxRateLimitedKeys: 100000 # Estimated memory usage: 4 MB # setAuditLogEmailRecipient: security@wire.com setEphemeralUserCreationEnabled: true - + smtp: passwordFile: /etc/wire/brig/secrets/smtp-password.txt proxy: {} wireServerEnterprise: enabled: false - + turnStatic: v1: - turn:localhost:3478 v2: - turn:localhost:3478 - turn:localhost:3478?transport=tcp - + turn: serversSource: files # files | dns # baseDomain: turn.wire.example # Must be configured if serversSource is dns discoveryIntervalSeconds: 10 # Used only if serversSource is dns - + serviceAccount: # When setting this to 'false', either make sure that a service account named # 'brig' exists or change the 'name' field to 'default' @@ -1221,9 +1233,9 @@ brig: name: brig annotations: {} automountServiceAccountToken: true - + secrets: {} - + podSecurityContext: allowPrivilegeEscalation: false capabilities: @@ -1233,15 +1245,14 @@ brig: seccompProfile: type: RuntimeDefault tests: - config: - {} + config: {} # uploadXml: # baseUrl: s3://bucket/path/ - + secrets: # uploadXmlAwsAccessKeyId: # uploadXmlAwsSecretAccessKey: - + # These "secrets" are only used in tests and are therefore safe to be stored unencrypted providerPrivateKey: | -----BEGIN RSA PRIVATE KEY----- @@ -1303,7 +1314,6 @@ brig: hZMuK3BWD3fzkQVfW0yMwz6fWRXB483ZmekGkgndOTDoJQMdJXZxHpI3t2FcxQYj T45GXxRd18neXtuYa/OoAw9UQFDN5XfXN0g= -----END CERTIFICATE----- - # pgPassword: test: elasticsearch: diff --git a/integration/test/Test/Meetings.hs b/integration/test/Test/Meetings.hs index f3d47dbb698..86fe6f87867 100644 --- a/integration/test/Test/Meetings.hs +++ b/integration/test/Test/Meetings.hs @@ -4,10 +4,16 @@ module Test.Meetings where import API.Galley import qualified API.GalleyInternal as I +import Control.Monad.Reader (ask) +import qualified Data.Text as Text +import qualified Data.Text.Encoding as Text import Data.Time.Clock import qualified Data.Time.Format as Time import SetupHelpers +import System.Timeout (timeout) import Testlib.Prelude +import Text.Regex.TDFA ((=~)) +import UnliftIO.Concurrent (threadDelay) -- Helper to extract meetingId and domain from a meeting JSON object getMeetingIdAndDomain :: (HasCallStack) => Value -> App (String, String) @@ -359,3 +365,93 @@ testMeetingDeleteUnauthorized = do meeting <- getJSON 201 r1 (meetingId, domain) <- getMeetingIdAndDomain meeting deleteMeeting otherUser domain meetingId >>= assertStatus 404 + +testMeetingCleanup :: (HasCallStack) => App () +testMeetingCleanup = do + env <- ask + timedOutResult <- liftIO $ timeout (2 * 60 * 1_000_000) $ runAppWithEnv env $ do + -- 2 minutes timeout + (owner, _tid, _members) <- createTeam OwnDomain 1 + now <- liftIO getCurrentTime + -- Create a meeting that ends now. + -- Configured retention is 0.0014 hours (~5 seconds). + -- cutoffTime will be now' - 5s. + -- We need end_date < cutoffTime. + -- If we wait 6 seconds, now' = now + 6s. + -- cutoffTime = now + 6s - 5s = now + 1s. + -- end_date (now) < cutoffTime (now + 1s). + let startTime = addUTCTime (negate 3600) now + endTime = now + newMeeting = defaultMeetingJson "Cleanup Test" startTime endTime [] + + r1 <- postMeetings owner newMeeting + assertSuccess r1 + meeting <- getJSON 201 r1 + (meetingId, domain) <- getMeetingIdAndDomain meeting + + -- Wait 6 seconds to ensure meeting is old enough + liftIO $ threadDelay 6_000_000 + + -- Wait for cleanup job to run + waitForCleanupJob OwnDomain + + -- Check it's gone + getMeeting owner domain meetingId >>= assertStatus 404 + + case timedOutResult of + Just () -> pure () + Nothing -> assertFailure "testMeetingCleanup timed out after 2 minutes" + +waitForCleanupJob :: (HasCallStack, MakesValue domain) => domain -> App () +waitForCleanupJob domain = do + initialMetrics <- getMetricsBody domain + let initialCount = getRunCount initialMetrics + + waitForIncrease domain initialCount + where + getMetricsBody d = do + getMetrics d BackgroundWorker `bindResponse` \resp -> do + resp.status `shouldMatchInt` 200 + pure $ Text.unpack $ Text.decodeUtf8 resp.body + + getRunCount metrics = + let (_, _, _, matches) :: (String, String, String, [String]) = (metrics =~ "wire_meetings_cleanup_runs_total ([0-9]+)") + in case matches of + [val] -> read val :: Int + _ -> 0 + + waitForIncrease d oldVal = do + metrics <- getMetricsBody d + let newVal = getRunCount metrics + -- We wait until it increases. + -- Note: if oldVal was 0 (metric didn't exist), getting 0 again means it hasn't run. + -- If it runs, it should become >= 1. + -- But wait, if matches is empty, we return 0. + -- If the metric appears, it will be >= 1 (initialized at 0? Counter starts at 0). + -- If it runs, it increments. + when (newVal <= oldVal) $ do + liftIO $ threadDelay 1_000_000 -- Wait 1s + waitForIncrease d oldVal + +testMeetingExpiration :: (HasCallStack) => App () +testMeetingExpiration = do + (owner, _tid, _members) <- createTeam OwnDomain 1 + now <- liftIO getCurrentTime + let startTime = addUTCTime (negate 3600) now + -- meetingValidityPeriodSeconds is configured to 5 seconds in galley.integration.yaml + endTime = now + newMeeting = defaultMeetingJson "Expiring Meeting" startTime endTime [] + + r1 <- postMeetings owner newMeeting + assertSuccess r1 + meeting <- getJSON 201 r1 + (meetingId, domain) <- getMeetingIdAndDomain meeting + + -- Check it is accessible immediately (endDate = now, so valid until now + 5s) + getMeeting owner domain meetingId >>= assertStatus 200 + + -- Wait 6 seconds + liftIO $ threadDelay 6_000_000 + + -- Check it is expired + getMeeting owner domain meetingId >>= assertStatus 404 diff --git a/libs/wire-subsystems/src/Wire/MeetingsStore.hs b/libs/wire-subsystems/src/Wire/MeetingsStore.hs index e2e0c7b55d6..2819f809f87 100644 --- a/libs/wire-subsystems/src/Wire/MeetingsStore.hs +++ b/libs/wire-subsystems/src/Wire/MeetingsStore.hs @@ -167,5 +167,13 @@ data MeetingsStore m a where MeetingId -> [EmailAddress] -> MeetingsStore m () + -- Cleanup operations + GetOldMeetings :: + UTCTime -> + Int -> + MeetingsStore m [StoredMeeting] + DeleteMeetingBatch :: + [MeetingId] -> + MeetingsStore m Int64 makeSem ''MeetingsStore diff --git a/libs/wire-subsystems/src/Wire/MeetingsStore/Postgres.hs b/libs/wire-subsystems/src/Wire/MeetingsStore/Postgres.hs index 6e83d61ff5d..1102cbac0b4 100644 --- a/libs/wire-subsystems/src/Wire/MeetingsStore/Postgres.hs +++ b/libs/wire-subsystems/src/Wire/MeetingsStore/Postgres.hs @@ -64,6 +64,10 @@ interpretMeetingsStoreToPostgres = addInvitedEmailsImpl meetingId email RemoveInvitedEmails meetingId emails -> removeInvitedEmailsImpl meetingId emails + GetOldMeetings cutoffTime batchSize -> + getOldMeetingsImpl cutoffTime batchSize + DeleteMeetingBatch meetingIds -> + deleteMeetingBatchImpl meetingIds -- * Create @@ -363,3 +367,56 @@ removeInvitedEmailsImpl meetingId emails = do updated_at = NOW() WHERE id = ($2 :: uuid) |] + +getOldMeetingsImpl :: + ( Member (Input Pool) r, + Member (Embed IO) r, + Member (Error UsageError) r + ) => + UTCTime -> + Int -> + Sem r [StoredMeeting] +getOldMeetingsImpl cutoffTime batchSize = do + pool <- input + result <- liftIO $ use pool session + either throw pure result + where + session :: Session [StoredMeeting] + session = statement (cutoffTime, fromIntegral batchSize) $ V.toList <$> listStatement + listStatement :: Statement (UTCTime, Int32) (V.Vector StoredMeeting) + listStatement = + refineResult + (traverse (postgresUnmarshall @StoredMeetingTuple @StoredMeeting)) + $ [vectorStatement| + SELECT + id :: uuid, title :: text, creator :: uuid, + start_time :: timestamptz, end_time :: timestamptz, + recurrence_frequency :: text?, recurrence_interval :: int4?, recurrence_until :: timestamptz?, + conversation_id :: uuid, invited_emails :: text[], trial :: boolean, + created_at :: timestamptz, updated_at :: timestamptz + FROM meetings + WHERE end_time < ($1 :: timestamptz) + ORDER BY end_time ASC + LIMIT ($2 :: int4) + |] + +deleteMeetingBatchImpl :: + ( Member (Input Pool) r, + Member (Embed IO) r, + Member (Error UsageError) r + ) => + [MeetingId] -> + Sem r Int64 +deleteMeetingBatchImpl meetingIds = do + pool <- input + result <- liftIO $ use pool session + either throw pure result + where + session :: Session Int64 + session = statement (V.fromList (toUUID <$> meetingIds)) deleteStatement + deleteStatement :: Statement (V.Vector UUID) Int64 + deleteStatement = + [rowsAffectedStatement| + DELETE FROM meetings + WHERE id IN (SELECT unnest($1::uuid[])) + |] diff --git a/libs/wire-subsystems/src/Wire/MeetingsSubsystemCleaning.hs b/libs/wire-subsystems/src/Wire/MeetingsSubsystemCleaning.hs new file mode 100644 index 00000000000..13910851c97 --- /dev/null +++ b/libs/wire-subsystems/src/Wire/MeetingsSubsystemCleaning.hs @@ -0,0 +1,32 @@ +{-# LANGUAGE TemplateHaskell #-} + +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2026 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Wire.MeetingsSubsystemCleaning where + +import Data.Time.Clock (UTCTime) +import Imports +import Polysemy + +data MeetingsSubsystemCleaning m a where + CleanupOldMeetings :: + UTCTime -> + Int -> + MeetingsSubsystemCleaning m Int64 + +makeSem ''MeetingsSubsystemCleaning diff --git a/libs/wire-subsystems/src/Wire/MeetingsSubsystemCleaning/Interpreter.hs b/libs/wire-subsystems/src/Wire/MeetingsSubsystemCleaning/Interpreter.hs new file mode 100644 index 00000000000..f11d42b0728 --- /dev/null +++ b/libs/wire-subsystems/src/Wire/MeetingsSubsystemCleaning/Interpreter.hs @@ -0,0 +1,73 @@ +{-# LANGUAGE DuplicateRecordFields #-} + +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2026 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Wire.MeetingsSubsystemCleaning.Interpreter where + +import Data.Time.Clock (UTCTime) +import Imports +import Polysemy +import Wire.API.Conversation (GroupConvType (MeetingConversation), cnvmGroupConvType) +import Wire.ConversationStore qualified as ConvStore +import Wire.MeetingsStore qualified as Store +import Wire.MeetingsSubsystemCleaning +import Wire.StoredConversation (StoredConversation (..)) + +interpretMeetingsSubsystemCleaning :: + ( Member Store.MeetingsStore r, + Member ConvStore.ConversationStore r + ) => + InterpreterFor MeetingsSubsystemCleaning r +interpretMeetingsSubsystemCleaning = interpret $ \case + CleanupOldMeetings cutoffTime batchSize -> + cleanupOldMeetingsImpl cutoffTime batchSize + +cleanupOldMeetingsImpl :: + ( Member Store.MeetingsStore r, + Member ConvStore.ConversationStore r + ) => + UTCTime -> + Int -> + Sem r Int64 +cleanupOldMeetingsImpl cutoffTime batchSize = do + -- 1. Fetch old meetings + oldMeetings <- Store.getOldMeetings cutoffTime batchSize + + if null oldMeetings + then pure 0 + else do + -- 2. Extract meeting IDs and conversation IDs + let meetingIds = map (\Store.StoredMeeting {id = mid} -> mid) oldMeetings + convIds = map (\Store.StoredMeeting {conversationId = cid} -> cid) oldMeetings + + -- 3. Delete meetings from database + deletedCount <- Store.deleteMeetingBatch meetingIds + + -- 4. Delete associated conversations if they are meeting conversations + -- We need to check if conversation has GroupConvType = MeetingConversation + for_ (zip oldMeetings convIds) $ \(meeting, convId) -> do + maybeConv <- ConvStore.getConversation convId + case maybeConv of + Just conv + | conv.metadata.cnvmGroupConvType == Just MeetingConversation, + conv.id_ == convId, + meeting.conversationId == convId -> + ConvStore.deleteConversation convId + _ -> pure () + + pure deletedCount diff --git a/libs/wire-subsystems/test/unit/Wire/MockInterpreters/MeetingsStore.hs b/libs/wire-subsystems/test/unit/Wire/MockInterpreters/MeetingsStore.hs index 60906b23c67..d53d37e0677 100644 --- a/libs/wire-subsystems/test/unit/Wire/MockInterpreters/MeetingsStore.hs +++ b/libs/wire-subsystems/test/unit/Wire/MockInterpreters/MeetingsStore.hs @@ -103,3 +103,15 @@ inMemoryMeetingsStoreInterpreter = interpret $ \case } modify (Map.insert mid updatedMeeting) DeleteMeeting mid -> modify (Map.delete mid) + GetOldMeetings cutoffTime batchSize -> + gets $ + take batchSize + . List.sortOn (.endTime) + . filter (\sm -> sm.endTime < cutoffTime) + . Map.elems + DeleteMeetingBatch meetingIds -> do + let deleteOne mid = do + exists <- gets (Map.member mid) + when exists $ modify (Map.delete mid) + pure exists + fromIntegral . length <$> traverse deleteOne meetingIds diff --git a/libs/wire-subsystems/wire-subsystems.cabal b/libs/wire-subsystems/wire-subsystems.cabal index 0f4c739c004..abc73e6d7ad 100644 --- a/libs/wire-subsystems/wire-subsystems.cabal +++ b/libs/wire-subsystems/wire-subsystems.cabal @@ -373,6 +373,8 @@ library Wire.MeetingsStore.Postgres Wire.MeetingsSubsystem Wire.MeetingsSubsystem.Interpreter + Wire.MeetingsSubsystemCleaning + Wire.MeetingsSubsystemCleaning.Interpreter Wire.Migration Wire.MigrationLock Wire.NotificationSubsystem diff --git a/services/background-worker/background-worker.cabal b/services/background-worker/background-worker.cabal index e6433d047c5..927e09d2bc4 100644 --- a/services/background-worker/background-worker.cabal +++ b/services/background-worker/background-worker.cabal @@ -21,6 +21,7 @@ library Wire.BackgroundWorker.Options Wire.BackgroundWorker.Util Wire.DeadUserNotificationWatcher + Wire.MeetingsCleanupWorker Wire.PostgresMigrations hs-source-dirs: src @@ -39,6 +40,7 @@ library , bytestring-conversion , cassandra-util , containers + , cron , data-timeout , exceptions , extended @@ -61,6 +63,7 @@ library , ssl-util , tagged , text + , time , tinylog , transformers , transformers-base diff --git a/services/background-worker/background-worker.integration.yaml b/services/background-worker/background-worker.integration.yaml index fa398766188..eab860a6e88 100644 --- a/services/background-worker/background-worker.integration.yaml +++ b/services/background-worker/background-worker.integration.yaml @@ -65,6 +65,12 @@ backgroundJobs: jobTimeout: 5s maxAttempts: 3 +# Meetings cleanup configuration for integration +meetingsCleanup: + cleanOlderThanHours: 0.0014 # Clean meetings older than ~5 seconds + batchSize: 100 + schedule: "* * * * *" # Run every minute + postgresMigration: conversation: postgresql conversationCodes: postgresql diff --git a/services/background-worker/default.nix b/services/background-worker/default.nix index 6137da6af87..524be8f3efa 100644 --- a/services/background-worker/default.nix +++ b/services/background-worker/default.nix @@ -11,6 +11,7 @@ , bytestring-conversion , cassandra-util , containers +, cron , data-default , data-timeout , exceptions @@ -44,6 +45,7 @@ , ssl-util , tagged , text +, time , tinylog , transformers , transformers-base @@ -71,6 +73,7 @@ mkDerivation { bytestring-conversion cassandra-util containers + cron data-timeout exceptions extended @@ -93,6 +96,7 @@ mkDerivation { ssl-util tagged text + time tinylog transformers transformers-base diff --git a/services/background-worker/src/Wire/BackgroundWorker.hs b/services/background-worker/src/Wire/BackgroundWorker.hs index 315bea5bd3b..fed929f38ae 100644 --- a/services/background-worker/src/Wire/BackgroundWorker.hs +++ b/services/background-worker/src/Wire/BackgroundWorker.hs @@ -34,6 +34,7 @@ import Wire.BackgroundWorker.Health qualified as Health import Wire.BackgroundWorker.Jobs.Consumer qualified as Jobs import Wire.BackgroundWorker.Options import Wire.DeadUserNotificationWatcher qualified as DeadUserNotificationWatcher +import Wire.MeetingsCleanupWorker qualified as MeetingsCleanupWorker import Wire.Migration import Wire.Options.Galley qualified as Galley import Wire.PostgresMigrations qualified as Migrations @@ -82,6 +83,10 @@ run opts galleyOpts = do runAppT env $ withNamedLogger "background-job-consumer" $ Jobs.startWorker amqpEP + cleanupMeetings <- + runAppT env $ + withNamedLogger "meetings-cleanup" $ + MeetingsCleanupWorker.startWorker opts.meetingsCleanup let cleanup = void $ runConcurrently $ @@ -93,6 +98,7 @@ run opts galleyOpts = do <*> Concurrently cleanupTeamFeaturesMigration <*> Concurrently cleanupDomainRegistrationMigration <*> Concurrently cleanupJobs + <*> Concurrently cleanupMeetings let server = defaultServer (T.unpack opts.backgroundWorker.host) opts.backgroundWorker.port env.logger let settings = newSettings server diff --git a/services/background-worker/src/Wire/BackgroundWorker/Env.hs b/services/background-worker/src/Wire/BackgroundWorker/Env.hs index 981ae2139f6..01754e066be 100644 --- a/services/background-worker/src/Wire/BackgroundWorker/Env.hs +++ b/services/background-worker/src/Wire/BackgroundWorker/Env.hs @@ -77,6 +77,7 @@ data Env = Env httpManager :: Manager, defederationTimeout :: ResponseTimeout, backendNotificationMetrics :: BackendNotificationMetrics, + meetingsCleanupMetrics :: MeetingsCleanupMetrics, backendNotificationsConfig :: BackendNotificationsConfig, backgroundJobsConfig :: BackgroundJobsConfig, workerRunningGauge :: Vector Text Gauge, @@ -112,6 +113,10 @@ data BackendNotificationMetrics = BackendNotificationMetrics stuckQueuesGauge :: Vector Text Gauge } +data MeetingsCleanupMetrics = MeetingsCleanupMetrics + { runsCounter :: Counter + } + mkBackendNotificationMetrics :: IO BackendNotificationMetrics mkBackendNotificationMetrics = BackendNotificationMetrics @@ -119,6 +124,11 @@ mkBackendNotificationMetrics = <*> register (vector "targetDomain" $ counter $ Prometheus.Info "wire_backend_notifications_errors" "Number of errors that occurred while pushing notifications") <*> register (vector "targetDomain" $ gauge $ Prometheus.Info "wire_backend_notifications_stuck_queues" "Set to 1 when pushing notifications is stuck") +mkMeetingsCleanupMetrics :: IO MeetingsCleanupMetrics +mkMeetingsCleanupMetrics = + MeetingsCleanupMetrics + <$> register (counter $ Prometheus.Info "wire_meetings_cleanup_runs_total" "Number of times the meetings cleanup job has run") + mkWorkerRunningGauge :: IO (Vector Text Gauge) mkWorkerRunningGauge = register (vector "worker" $ gauge $ Prometheus.Info "wire_background_worker_running_workers" "Set to 1 when a worker is running") @@ -146,6 +156,7 @@ mkEnv opts galleyOpts = do (BackgroundJobConsumer, False) ] backendNotificationMetrics <- mkBackendNotificationMetrics + meetingsCleanupMetrics <- mkMeetingsCleanupMetrics let backendNotificationsConfig = opts.backendNotificationPusher backgroundJobsConfig = opts.backgroundJobs federationDomain = galleyOpts._settings._federationDomain diff --git a/services/background-worker/src/Wire/BackgroundWorker/Options.hs b/services/background-worker/src/Wire/BackgroundWorker/Options.hs index 2d1078fe8f1..c3256dc7e88 100644 --- a/services/background-worker/src/Wire/BackgroundWorker/Options.hs +++ b/services/background-worker/src/Wire/BackgroundWorker/Options.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE RecordWildCards #-} + -- This file is part of the Wire Server implementation. -- -- Copyright (C) 2025 Wire Swiss GmbH @@ -18,12 +20,14 @@ module Wire.BackgroundWorker.Options where import Data.Aeson +import Data.Aeson.Types (JSONPathElement (Key), parserThrowError) import Data.Misc import Data.Range (Range) import GHC.Generics import Hasql.Pool.Extended import Imports import Network.AMQP.Extended +import System.Cron (CronSchedule, parseCronSchedule) import System.Logger.Extended import Util.Options import Wire.Migration @@ -51,6 +55,7 @@ data Opts = Opts migrateConversationCodes :: !Bool, migrateTeamFeatures :: !Bool, migrateDomainRegistration :: !Bool, + meetingsCleanup :: MeetingsCleanupConfig, backgroundJobs :: BackgroundJobsConfig } deriving (Show, Generic) @@ -93,3 +98,25 @@ data BackgroundJobsConfig = BackgroundJobsConfig } deriving (Show, Generic) deriving (FromJSON) via Generically BackgroundJobsConfig + +data MeetingsCleanupConfig = MeetingsCleanupConfig + { -- | Delete meetings older than this many hours + cleanOlderThanHours :: Double, + -- | Maximum number of meetings to delete per batch + batchSize :: Int, + -- | Cron schedule for the cleanup job + schedule :: CronSchedule + } + deriving (Show, Generic) + +instance FromJSON MeetingsCleanupConfig where + parseJSON = + withObject "MeetingsCleanupConfig" $ \o -> do + cleanOlderThanHours <- o .: "cleanOlderThanHours" + batchSize <- o .: "batchSize" + scheduleRaw <- o .: "schedule" + schedule <- + case parseCronSchedule scheduleRaw of + Left e -> parserThrowError [Key "schedule"] $ "Cannot parse cronjob syntax: " <> e + Right x -> pure x + pure $ MeetingsCleanupConfig {..} diff --git a/services/background-worker/src/Wire/MeetingsCleanupWorker.hs b/services/background-worker/src/Wire/MeetingsCleanupWorker.hs new file mode 100644 index 00000000000..2fa5afcb887 --- /dev/null +++ b/services/background-worker/src/Wire/MeetingsCleanupWorker.hs @@ -0,0 +1,134 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2026 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Wire.MeetingsCleanupWorker + ( startWorker, + cleanupOldMeetings, + CleanupConfig (..), + ) +where + +import Data.Bifunctor (first) +import Data.Time.Clock +import Hasql.Pool (UsageError) +import Imports +import Polysemy +import Polysemy.Error (runError) +import Polysemy.Input (runInputConst) +import Prometheus (incCounter) +import System.Cron (Job (..), forkJob) +import System.Logger qualified as Log +import Wire.BackgroundWorker.Env (AppT, Env (..), MeetingsCleanupMetrics (..), runAppT) +import Wire.BackgroundWorker.Options (MeetingsCleanupConfig (..)) +import Wire.BackgroundWorker.Util (CleanupAction) +import Wire.ConversationStore.Postgres (interpretConversationStoreToPostgres) +import Wire.MeetingsStore.Postgres (interpretMeetingsStoreToPostgres) +import Wire.MeetingsSubsystemCleaning qualified as Meetings +import Wire.MeetingsSubsystemCleaning.Interpreter (interpretMeetingsSubsystemCleaning) + +data CleanupConfig = CleanupConfig + { retentionHours :: Double, + batchSize :: Int + } + deriving (Show, Eq) + +-- | Start the meetings cleanup worker thread +-- +-- This worker runs periodically to clean up old meetings based on the configuration. +startWorker :: + MeetingsCleanupConfig -> + AppT IO CleanupAction +startWorker config = do + env <- ask + Log.info env.logger $ + Log.msg (Log.val "Starting meetings cleanup worker") + . Log.field "schedule" (show config.schedule) + . Log.field "clean_older_than_hours" config.cleanOlderThanHours + + void . liftIO $ do + forkJob $ + Job config.schedule $ + runAppT env $ do + Log.info env.logger $ Log.msg (Log.val "Starting scheduled meetings cleanup") + cleanupOldMeetings (configFromOptions config) + liftIO $ incCounter env.meetingsCleanupMetrics.runsCounter + + pure $ pure () + +-- | Convert MeetingsCleanupConfig to CleanupConfig +configFromOptions :: MeetingsCleanupConfig -> CleanupConfig +configFromOptions cfg = + CleanupConfig + { retentionHours = cfg.cleanOlderThanHours, + batchSize = cfg.batchSize + } + +-- | Main cleanup function that orchestrates the cleanup process +cleanupOldMeetings :: CleanupConfig -> AppT IO () +cleanupOldMeetings config = do + env <- ask + now <- liftIO getCurrentTime + let cutoffTime = addUTCTime (negate $ realToFrac config.retentionHours * 3600) now + + Log.info env.logger $ + Log.msg (Log.val "Starting cleanup of old meetings") + . Log.field "cutoff_time" (show cutoffTime) + . Log.field "retention_hours" config.retentionHours + . Log.field "batch_size" config.batchSize + + -- Loop until no more meetings are deleted + totalDeleted <- cleanupLoop env cutoffTime config.batchSize 0 + + Log.info env.logger $ + Log.msg (Log.val "Completed cleanup of old meetings") + . Log.field "total_deleted" totalDeleted + +cleanupLoop :: Env -> UTCTime -> Int -> Int64 -> AppT IO Int64 +cleanupLoop env cutoffTime batchSize totalSoFar = do + -- Run the subsystem to handle cleanup logic + result <- liftIO $ runMeetingsCleanup env cutoffTime batchSize + + case result of + Left err -> do + Log.err env.logger $ + Log.msg (Log.val "Failed to cleanup old meetings batch") + . Log.field "error" (show err) + . Log.field "total_deleted_so_far" totalSoFar + pure totalSoFar + Right deletedCount -> do + let newTotal = totalSoFar + deletedCount + Log.info env.logger $ + Log.msg (Log.val "Cleaned up meetings batch") + . Log.field "batch_deleted" deletedCount + . Log.field "total_deleted" newTotal + -- Continue if we deleted a full batch (meaning there might be more) + if deletedCount >= fromIntegral batchSize + then cleanupLoop env cutoffTime batchSize newTotal + else pure newTotal + +-- Run the meetings cleanup using the subsystem +runMeetingsCleanup :: Env -> UTCTime -> Int -> IO (Either String Int64) +runMeetingsCleanup env cutoffTime batchSize = + fmap (first show) + . runM + . runError @UsageError + . runInputConst env.hasqlPool + . interpretMeetingsStoreToPostgres + . runInputConst env.hasqlPool + . interpretConversationStoreToPostgres + . interpretMeetingsSubsystemCleaning + $ Meetings.cleanupOldMeetings cutoffTime batchSize diff --git a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs index 1b4715d07a1..3e38adbed9c 100644 --- a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs +++ b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs @@ -57,7 +57,7 @@ import Test.Hspec import Test.QuickCheck import Test.Wire.Util import UnliftIO.Async -import Util.Options (Endpoint (..), PasswordHashingOptions (..)) +import Util.Options import Wire.API.Conversation.Action import Wire.API.Federation.API import Wire.API.Federation.API.Brig @@ -385,6 +385,7 @@ spec = do passwordHashingRateLimitEnv <- newRateLimitEnv defTestRateLimitConfig backendNotificationMetrics <- mkBackendNotificationMetrics + meetingsCleanupMetrics <- mkMeetingsCleanupMetrics workerRunningGauge <- mkWorkerRunningGauge domains <- runAppT Env {..} $ getRemoteDomains (fromJust rabbitmqAdminClient) domains `shouldBe` map Domain ["foo.example", "bar.example", "baz.example"] @@ -437,6 +438,7 @@ spec = do passwordHashingRateLimitEnv <- newRateLimitEnv defTestRateLimitConfig backendNotificationMetrics <- mkBackendNotificationMetrics + meetingsCleanupMetrics <- mkMeetingsCleanupMetrics workerRunningGauge <- mkWorkerRunningGauge domainsThread <- async $ runAppT Env {..} $ getRemoteDomains (fromJust rabbitmqAdminClient) diff --git a/services/background-worker/test/Test/Wire/Util.hs b/services/background-worker/test/Test/Wire/Util.hs index a3e23d4ea56..033a2347305 100644 --- a/services/background-worker/test/Test/Wire/Util.hs +++ b/services/background-worker/test/Test/Wire/Util.hs @@ -49,6 +49,7 @@ testEnv = do } statuses <- newIORef mempty backendNotificationMetrics <- mkBackendNotificationMetrics + meetingsCleanupMetrics <- mkMeetingsCleanupMetrics workerRunningGauge <- mkWorkerRunningGauge httpManager <- newManager defaultManagerSettings let federatorInternal = Endpoint "localhost" 0 diff --git a/services/galley/galley.integration.yaml b/services/galley/galley.integration.yaml index 9d1c23291cb..04cd4f9c8a0 100644 --- a/services/galley/galley.integration.yaml +++ b/services/galley/galley.integration.yaml @@ -93,7 +93,7 @@ settings: maxRateLimitedKeys: 100000 # Estimated memory usage: 4 MB meetings: - validityPeriod: "48h" + validityPeriod: "5s" # We explicitly do not disable any API version. Please make sure the configuration value is the same in all these configs: # brig, cannon, cargohold, galley, gundeck, proxy, spar. From 9e78888bb016b356129524aac133340250efe696 Mon Sep 17 00:00:00 2001 From: Gautier DI FOLCO Date: Mon, 4 May 2026 10:02:17 +0200 Subject: [PATCH 2/7] fix(leif): feedbacks --- charts/integration/templates/configmap.yaml | 10 +++---- hack/helm_vars/wire-server/values.yaml.gotmpl | 6 +++++ .../MeetingsSubsystemCleaning/Interpreter.hs | 26 +++++++------------ .../src/Wire/BackgroundWorker/Options.hs | 3 +++ .../src/Wire/MeetingsCleanupWorker.hs | 21 ++++++++++++++- 5 files changed, 43 insertions(+), 23 deletions(-) diff --git a/charts/integration/templates/configmap.yaml b/charts/integration/templates/configmap.yaml index b5f2d351732..a1b2592c310 100644 --- a/charts/integration/templates/configmap.yaml +++ b/charts/integration/templates/configmap.yaml @@ -54,7 +54,7 @@ data: port: 8080 backgroundWorker: - host: backgroundWorker.{{ .Release.Namespace }}.svc.cluster.local + host: background-worker.{{ .Release.Namespace }}.svc.cluster.local port: 8080 # Background jobs defaults for integration tests backgroundJobs: @@ -145,7 +145,7 @@ data: port: 8080 backgroundWorker: - host: backgroundWorker.{{ .Release.Namespace }}-fed2.svc.cluster.local + host: background-worker.{{ .Release.Namespace }}-fed2.svc.cluster.local port: 8080 stern: @@ -212,7 +212,7 @@ data: host: proxy.wire-federation-v0.svc.cluster.local port: 8080 backgroundWorker: - host: backgroundWorker.wire-federation-v0.svc.cluster.local + host: background-worker.wire-federation-v0.svc.cluster.local port: 8080 stern: host: stern.wire-federation-v0.svc.cluster.local @@ -255,7 +255,7 @@ data: host: proxy.wire-federation-v1.svc.cluster.local port: 8080 backgroundWorker: - host: backgroundWorker.wire-federation-v1.svc.cluster.local + host: background-worker.wire-federation-v1.svc.cluster.local port: 8080 stern: host: stern.wire-federation-v1.svc.cluster.local @@ -298,7 +298,7 @@ data: host: proxy.wire-federation-v2.svc.cluster.local port: 8080 backgroundWorker: - host: backgroundWorker.wire-federation-v2.svc.cluster.local + host: background-worker.wire-federation-v2.svc.cluster.local port: 8080 stern: host: stern.wire-federation-v2.svc.cluster.local diff --git a/hack/helm_vars/wire-server/values.yaml.gotmpl b/hack/helm_vars/wire-server/values.yaml.gotmpl index 116b9315c68..2879cc50249 100644 --- a/hack/helm_vars/wire-server/values.yaml.gotmpl +++ b/hack/helm_vars/wire-server/values.yaml.gotmpl @@ -314,6 +314,8 @@ galley: # See helmfile for the real value federationDomain: integration.example.com disabledAPIVersions: [] + meetings: + validityPeriod: "5s" # These values are insecure, against anyone getting hold of the hash, # but its not a concern for the integration tests. @@ -646,6 +648,10 @@ background-worker: concurrency: 8 jobTimeout: 60s maxAttempts: 3 + meetingsCleanup: + cleanOlderThanHours: 0.0014 + batchSize: 100 + schedule: "* * * * *" # Cassandra clusters used by background-worker cassandra: host: {{ .Values.cassandraHost }} diff --git a/libs/wire-subsystems/src/Wire/MeetingsSubsystemCleaning/Interpreter.hs b/libs/wire-subsystems/src/Wire/MeetingsSubsystemCleaning/Interpreter.hs index f11d42b0728..46dfe744a2f 100644 --- a/libs/wire-subsystems/src/Wire/MeetingsSubsystemCleaning/Interpreter.hs +++ b/libs/wire-subsystems/src/Wire/MeetingsSubsystemCleaning/Interpreter.hs @@ -45,29 +45,21 @@ cleanupOldMeetingsImpl :: Int -> Sem r Int64 cleanupOldMeetingsImpl cutoffTime batchSize = do - -- 1. Fetch old meetings oldMeetings <- Store.getOldMeetings cutoffTime batchSize - if null oldMeetings then pure 0 else do - -- 2. Extract meeting IDs and conversation IDs - let meetingIds = map (\Store.StoredMeeting {id = mid} -> mid) oldMeetings - convIds = map (\Store.StoredMeeting {conversationId = cid} -> cid) oldMeetings - - -- 3. Delete meetings from database - deletedCount <- Store.deleteMeetingBatch meetingIds - - -- 4. Delete associated conversations if they are meeting conversations - -- We need to check if conversation has GroupConvType = MeetingConversation - for_ (zip oldMeetings convIds) $ \(meeting, convId) -> do - maybeConv <- ConvStore.getConversation convId + -- 2. Delete associated conversations first (before meetings) + -- This ensures proper cleanup: conversation data should be removed before meeting records + -- We only delete conversations that are meeting conversations (GroupConvType = MeetingConversation) + for_ oldMeetings $ \meeting -> do + maybeConv <- ConvStore.getConversation meeting.conversationId case maybeConv of Just conv | conv.metadata.cnvmGroupConvType == Just MeetingConversation, - conv.id_ == convId, - meeting.conversationId == convId -> - ConvStore.deleteConversation convId + conv.id_ == meeting.conversationId -> + ConvStore.deleteConversation meeting.conversationId _ -> pure () - pure deletedCount + -- 3. Now delete the meeting records from the database + Store.deleteMeetingBatch $ map (\Store.StoredMeeting {id = mid} -> mid) oldMeetings diff --git a/services/background-worker/src/Wire/BackgroundWorker/Options.hs b/services/background-worker/src/Wire/BackgroundWorker/Options.hs index c3256dc7e88..1fa04d5d6d9 100644 --- a/services/background-worker/src/Wire/BackgroundWorker/Options.hs +++ b/services/background-worker/src/Wire/BackgroundWorker/Options.hs @@ -114,6 +114,9 @@ instance FromJSON MeetingsCleanupConfig where withObject "MeetingsCleanupConfig" $ \o -> do cleanOlderThanHours <- o .: "cleanOlderThanHours" batchSize <- o .: "batchSize" + when (batchSize <= 0) $ + parserThrowError [Key "batchSize"] $ + "batchSize must be greater than 0, got: " <> show batchSize scheduleRaw <- o .: "schedule" schedule <- case parseCronSchedule scheduleRaw of diff --git a/services/background-worker/src/Wire/MeetingsCleanupWorker.hs b/services/background-worker/src/Wire/MeetingsCleanupWorker.hs index 2fa5afcb887..fd388187fa1 100644 --- a/services/background-worker/src/Wire/MeetingsCleanupWorker.hs +++ b/services/background-worker/src/Wire/MeetingsCleanupWorker.hs @@ -39,6 +39,7 @@ import Wire.ConversationStore.Postgres (interpretConversationStoreToPostgres) import Wire.MeetingsStore.Postgres (interpretMeetingsStoreToPostgres) import Wire.MeetingsSubsystemCleaning qualified as Meetings import Wire.MeetingsSubsystemCleaning.Interpreter (interpretMeetingsSubsystemCleaning) +import Wire.PostgresMigrationOpts data CleanupConfig = CleanupConfig { retentionHours :: Double, @@ -99,6 +100,11 @@ cleanupOldMeetings config = do cleanupLoop :: Env -> UTCTime -> Int -> Int64 -> AppT IO Int64 cleanupLoop env cutoffTime batchSize totalSoFar = do + when (batchSize <= 0) $ do + Log.err env.logger $ + Log.msg (Log.val "Invalid batch size: must be greater than 0") + . Log.field "batch_size" batchSize + error "Invalid batch size: must be greater than 0" -- Run the subsystem to handle cleanup logic result <- liftIO $ runMeetingsCleanup env cutoffTime batchSize @@ -121,8 +127,21 @@ cleanupLoop env cutoffTime batchSize totalSoFar = do else pure newTotal -- Run the meetings cleanup using the subsystem +-- Note: Meetings are stored only in PostgreSQL. Conversations may be in Cassandra, +-- PostgreSQL, or both during migration. Currently we only support PostgreSQL conversations. +-- For Cassandra or migration modes, conversations won't be properly cleaned up. +-- TODO: Add full migration awareness to support Cassandra and migration modes. runMeetingsCleanup :: Env -> UTCTime -> Int -> IO (Either String Int64) -runMeetingsCleanup env cutoffTime batchSize = +runMeetingsCleanup env cutoffTime batchSize = do + -- Log a warning if we're not in pure PostgreSQL mode + case env.postgresMigration.conversation of + CassandraStorage -> do + Log.err env.logger $ + Log.msg (Log.val "Meetings cleanup: conversations are in Cassandra mode, but cleanup only supports PostgreSQL") + MigrationToPostgresql -> do + Log.warn env.logger $ + Log.msg (Log.val "Meetings cleanup: conversations are in migration mode. Only PostgreSQL conversations will be cleaned up.") + PostgresqlStorage -> pure () fmap (first show) . runM . runError @UsageError From 9cbea4493b95218daa30556fea7d5e8e896c7ca2 Mon Sep 17 00:00:00 2001 From: Gautier DI FOLCO Date: Tue, 5 May 2026 17:15:25 +0200 Subject: [PATCH 3/7] fix(leif): handle all possible conversation backends --- .../src/Wire/ConversationStore/Cassandra.hs | 21 +++++++++ .../src/Wire/MeetingsStore/Postgres.hs | 3 +- .../src/Wire/BackgroundWorker.hs | 2 +- .../Wire/BackgroundWorker/Jobs/Registry.hs | 12 +++-- .../src/Wire/MeetingsCleanupWorker.hs | 44 +++++++++++-------- services/galley/src/Galley/App.hs | 22 ++-------- 6 files changed, 58 insertions(+), 46 deletions(-) diff --git a/libs/wire-subsystems/src/Wire/ConversationStore/Cassandra.hs b/libs/wire-subsystems/src/Wire/ConversationStore/Cassandra.hs index dbddef3cd34..9ec296ea43a 100644 --- a/libs/wire-subsystems/src/Wire/ConversationStore/Cassandra.hs +++ b/libs/wire-subsystems/src/Wire/ConversationStore/Cassandra.hs @@ -19,6 +19,7 @@ module Wire.ConversationStore.Cassandra ( interpretMLSCommitLockStoreToCassandra, interpretConversationStoreToCassandra, interpretConversationStoreToCassandraAndPostgres, + interpretConversationStoreByMigration, MigrationError (..), ) where @@ -81,6 +82,7 @@ import Wire.ConversationStore.Migration.Cleanup import Wire.ConversationStore.Postgres (interpretConversationStoreToPostgres) import Wire.MigrationLock import Wire.Postgres +import Wire.PostgresMigrationOpts (StorageLocation (..)) import Wire.Sem.Paging.Cassandra import Wire.StoredConversation import Wire.StoredConversation qualified as StoreConv @@ -1606,3 +1608,22 @@ withMigrationLocksAndUserCleanup cassClient lockType maxWait userIds action = . runInputConst cassClient $ cleanupIfNecessary (Right <$> userIds) action + +interpretConversationStoreByMigration :: + forall r a. + ( Member TinyLog r, + PGConstraints r, + Member Async r, + Member (Error MigrationError) r, + Member Race r, + Member Resource r + ) => + StorageLocation -> + ClientState -> + Sem (ConversationStore ': r) a -> + Sem r a +interpretConversationStoreByMigration storageLocation client = + case storageLocation of + CassandraStorage -> interpretConversationStoreToCassandra client + MigrationToPostgresql -> interpretConversationStoreToCassandraAndPostgres client + PostgresqlStorage -> interpretConversationStoreToPostgres diff --git a/libs/wire-subsystems/src/Wire/MeetingsStore/Postgres.hs b/libs/wire-subsystems/src/Wire/MeetingsStore/Postgres.hs index 1102cbac0b4..c57afaba032 100644 --- a/libs/wire-subsystems/src/Wire/MeetingsStore/Postgres.hs +++ b/libs/wire-subsystems/src/Wire/MeetingsStore/Postgres.hs @@ -31,11 +31,12 @@ import Data.Time.Clock import Data.UUID (UUID, nil) import Data.Vector qualified as V import Hasql.Pool +import Hasql.Session import Hasql.Statement import Hasql.TH import Imports import Polysemy -import Polysemy.Error (Error) +import Polysemy.Error (Error, throw) import Polysemy.Input import Wire.API.Meeting (Recurrence) import Wire.API.PostgresMarshall (PostgresMarshall (..), PostgresUnmarshall (..), dimapPG) diff --git a/services/background-worker/src/Wire/BackgroundWorker.hs b/services/background-worker/src/Wire/BackgroundWorker.hs index fed929f38ae..0cbe491ec32 100644 --- a/services/background-worker/src/Wire/BackgroundWorker.hs +++ b/services/background-worker/src/Wire/BackgroundWorker.hs @@ -90,7 +90,7 @@ run opts galleyOpts = do let cleanup = void $ runConcurrently $ - (,,,,,,) + (,,,,,,,) <$> Concurrently cleanupDeadUserNotifWatcher <*> Concurrently cleanupBackendNotifPusher <*> Concurrently cleanupConvMigration diff --git a/services/background-worker/src/Wire/BackgroundWorker/Jobs/Registry.hs b/services/background-worker/src/Wire/BackgroundWorker/Jobs/Registry.hs index e595330f457..e8f985c8fc6 100644 --- a/services/background-worker/src/Wire/BackgroundWorker/Jobs/Registry.hs +++ b/services/background-worker/src/Wire/BackgroundWorker/Jobs/Registry.hs @@ -70,8 +70,7 @@ import Wire.ClientSubsystem.Error (ClientError) import Wire.CodeStore.Cassandra (interpretCodeStoreToCassandra) import Wire.CodeStore.DualWrite (interpretCodeStoreToCassandraAndPostgres) import Wire.CodeStore.Postgres (interpretCodeStoreToPostgres) -import Wire.ConversationStore.Cassandra -import Wire.ConversationStore.Postgres (interpretConversationStoreToPostgres) +import Wire.ConversationStore.Cassandra (MigrationError (..), interpretConversationStoreByMigration, interpretMLSCommitLockStoreToCassandra) import Wire.ConversationSubsystem.Interpreter (ConversationSubsystemError, GroupInfoCheckEnabled (..), IntraListing (..), interpretConversationSubsystem) import Wire.ExternalAccess.External import Wire.FeaturesConfigSubsystem (getAllTeamFeaturesForServer) @@ -171,13 +170,11 @@ dispatchJob job = do env <- ask @Env let disableTlsV1 = True extEnv <- liftIO (initExtEnv disableTlsV1) - liftIO $ runInterpreters env extEnv $ runJob job + let mergeErrors = either (Left . T.pack . show) id + liftIO $ fmap mergeErrors $ runInterpreters env extEnv $ runJob job where convStoreInterpreter env = - case env.postgresMigration.conversation of - CassandraStorage -> interpretConversationStoreToCassandra env.cassandraGalley - MigrationToPostgresql -> interpretConversationStoreToCassandraAndPostgres env.cassandraGalley - PostgresqlStorage -> interpretConversationStoreToPostgres + interpretConversationStoreByMigration env.postgresMigration.conversation env.cassandraGalley runInterpreters env extEnv = do let federationAPIAccessConfig = FederationAPIAccessConfig @@ -203,6 +200,7 @@ dispatchJob job = do . interpretRace . runDelay . resourceToIOFinal + . runError @MigrationError . runError . mapError @DynError (.eMessage) . mapError @JSONResponse (T.pack . show . (.value)) diff --git a/services/background-worker/src/Wire/MeetingsCleanupWorker.hs b/services/background-worker/src/Wire/MeetingsCleanupWorker.hs index fd388187fa1..f5e9904e21c 100644 --- a/services/background-worker/src/Wire/MeetingsCleanupWorker.hs +++ b/services/background-worker/src/Wire/MeetingsCleanupWorker.hs @@ -27,15 +27,21 @@ import Data.Time.Clock import Hasql.Pool (UsageError) import Imports import Polysemy +import Polysemy.Async (asyncToIOFinal) +import Wire.Sem.Concurrency.IO (unsafelyPerformConcurrency) +import Polysemy.Conc import Polysemy.Error (runError) import Polysemy.Input (runInputConst) +import Polysemy.Resource (resourceToIOFinal) +import Polysemy.TinyLog qualified as P import Prometheus (incCounter) import System.Cron (Job (..), forkJob) import System.Logger qualified as Log +import System.Logger.Class (Logger) import Wire.BackgroundWorker.Env (AppT, Env (..), MeetingsCleanupMetrics (..), runAppT) import Wire.BackgroundWorker.Options (MeetingsCleanupConfig (..)) import Wire.BackgroundWorker.Util (CleanupAction) -import Wire.ConversationStore.Postgres (interpretConversationStoreToPostgres) +import Wire.ConversationStore.Cassandra (MigrationError (..), interpretConversationStoreByMigration) import Wire.MeetingsStore.Postgres (interpretMeetingsStoreToPostgres) import Wire.MeetingsSubsystemCleaning qualified as Meetings import Wire.MeetingsSubsystemCleaning.Interpreter (interpretMeetingsSubsystemCleaning) @@ -126,28 +132,30 @@ cleanupLoop env cutoffTime batchSize totalSoFar = do then cleanupLoop env cutoffTime batchSize newTotal else pure newTotal +interpretTinyLog :: + (Member (Embed IO) r) => + Logger -> + Sem (P.TinyLog ': r) a -> + Sem r a +interpretTinyLog logger = interpret $ \case + P.Log lvl msg -> Log.log logger lvl msg + -- Run the meetings cleanup using the subsystem --- Note: Meetings are stored only in PostgreSQL. Conversations may be in Cassandra, --- PostgreSQL, or both during migration. Currently we only support PostgreSQL conversations. --- For Cassandra or migration modes, conversations won't be properly cleaned up. --- TODO: Add full migration awareness to support Cassandra and migration modes. runMeetingsCleanup :: Env -> UTCTime -> Int -> IO (Either String Int64) -runMeetingsCleanup env cutoffTime batchSize = do - -- Log a warning if we're not in pure PostgreSQL mode - case env.postgresMigration.conversation of - CassandraStorage -> do - Log.err env.logger $ - Log.msg (Log.val "Meetings cleanup: conversations are in Cassandra mode, but cleanup only supports PostgreSQL") - MigrationToPostgresql -> do - Log.warn env.logger $ - Log.msg (Log.val "Meetings cleanup: conversations are in migration mode. Only PostgreSQL conversations will be cleaned up.") - PostgresqlStorage -> pure () - fmap (first show) - . runM +runMeetingsCleanup env cutoffTime batchSize = + fmap (either (Left . show) (first show)) + . runFinal @IO + . unsafelyPerformConcurrency + . resourceToIOFinal + . runError @MigrationError . runError @UsageError + . embedToFinal @IO + . interpretTinyLog env.logger + . interpretRace + . asyncToIOFinal . runInputConst env.hasqlPool . interpretMeetingsStoreToPostgres . runInputConst env.hasqlPool - . interpretConversationStoreToPostgres + . interpretConversationStoreByMigration env.postgresMigration.conversation env.cassandraGalley . interpretMeetingsSubsystemCleaning $ Meetings.cleanupOldMeetings cutoffTime batchSize diff --git a/services/galley/src/Galley/App.hs b/services/galley/src/Galley/App.hs index b9cf3fe3fa8..3e9708a1b7d 100644 --- a/services/galley/src/Galley/App.hs +++ b/services/galley/src/Galley/App.hs @@ -103,8 +103,7 @@ import Wire.CodeStore.Cassandra import Wire.CodeStore.DualWrite import Wire.CodeStore.Postgres import Wire.ConversationStore (ConversationStore, MLSCommitLockStore) -import Wire.ConversationStore.Cassandra -import Wire.ConversationStore.Postgres +import Wire.ConversationStore.Cassandra (MigrationError (..), interpretConversationStoreByMigration, interpretMLSCommitLockStoreToCassandra) import Wire.ConversationSubsystem import Wire.ConversationSubsystem.Interpreter (ConversationSubsystemError, GroupInfoCheckEnabled (..), IntraListing (IntraListing), interpretConversationSubsystem) import Wire.CustomBackendStore @@ -142,7 +141,6 @@ import Wire.Options.Galley hiding (brig, endpoint, federator) import Wire.Options.Galley qualified as O import Wire.Options.Keys import Wire.ParseException -import Wire.Postgres (PGConstraints) import Wire.ProposalStore (ProposalStore) import Wire.ProposalStore.Cassandra import Wire.RateLimit @@ -402,22 +400,8 @@ logAndMapError fErr fLog logMsg action = evalGalley :: Env -> Sem GalleyEffects a -> ExceptT JSONResponse IO a evalGalley e = - let convStoreInterpreter :: - forall r a. - ( Member TinyLog r, - PGConstraints r, - Member Async r, - Member (Error MigrationError) r, - Member Race r, - Member Resource r - ) => - Sem (ConversationStore ': r) a -> - Sem r a - convStoreInterpreter = - case (e ^. options . postgresMigration).conversation of - CassandraStorage -> interpretConversationStoreToCassandra (e ^. cstate) - MigrationToPostgresql -> interpretConversationStoreToCassandraAndPostgres (e ^. cstate) - PostgresqlStorage -> interpretConversationStoreToPostgres + let convStoreInterpreter = + interpretConversationStoreByMigration (e ^. options . postgresMigration).conversation (e ^. cstate) convCodesStoreInterpreter = case (e ^. options . postgresMigration).conversationCodes of CassandraStorage -> interpretCodeStoreToCassandra From cf184f9a40a1a23a925bc3ee35851d7bb2f4245a Mon Sep 17 00:00:00 2001 From: Gautier DI FOLCO Date: Wed, 6 May 2026 15:16:30 +0200 Subject: [PATCH 4/7] refactor(leif): rely on `Actions` for deletion --- .../src/Wire/ConversationSubsystem.hs | 3 + .../src/Wire/ConversationSubsystem/Action.hs | 21 + .../Wire/ConversationSubsystem/Interpreter.hs | 3 + .../wire-subsystems/src/Wire/MeetingsStore.hs | 3 - .../src/Wire/MeetingsStore/Postgres.hs | 31 +- .../src/Wire/MeetingsSubsystem.hs | 5 + .../src/Wire/MeetingsSubsystem/Interpreter.hs | 41 +- .../src/Wire/MeetingsSubsystemCleaning.hs | 32 -- .../MeetingsSubsystemCleaning/Interpreter.hs | 65 --- .../Wire/MeetingsSubsystem/InterpreterSpec.hs | 3 + .../MockInterpreters/ConversationSubsystem.hs | 4 + .../Wire/MockInterpreters/MeetingsStore.hs | 6 - libs/wire-subsystems/wire-subsystems.cabal | 2 - .../background-worker/background-worker.cabal | 1 + .../Wire/BackgroundWorker/Jobs/Registry.hs | 277 +------------ .../background-worker/src/Wire/Effects.hs | 389 ++++++++++++++++++ .../src/Wire/MeetingsCleanupWorker.hs | 63 +-- 17 files changed, 498 insertions(+), 451 deletions(-) delete mode 100644 libs/wire-subsystems/src/Wire/MeetingsSubsystemCleaning.hs delete mode 100644 libs/wire-subsystems/src/Wire/MeetingsSubsystemCleaning/Interpreter.hs create mode 100644 services/background-worker/src/Wire/Effects.hs diff --git a/libs/wire-subsystems/src/Wire/ConversationSubsystem.hs b/libs/wire-subsystems/src/Wire/ConversationSubsystem.hs index 44605b64467..cdf585c9150 100644 --- a/libs/wire-subsystems/src/Wire/ConversationSubsystem.hs +++ b/libs/wire-subsystems/src/Wire/ConversationSubsystem.hs @@ -339,6 +339,9 @@ data ConversationSubsystem m a where ConnId -> Local ConvId -> ConversationSubsystem m (UpdateResult Event) + InternalDeleteLocalConversation :: + Local ConvId -> + ConversationSubsystem m () GetMLSPublicKeys :: Maybe MLSPublicKeyFormat -> ConversationSubsystem m (MLSKeysByPurpose (MLSKeys SomeKey)) diff --git a/libs/wire-subsystems/src/Wire/ConversationSubsystem/Action.hs b/libs/wire-subsystems/src/Wire/ConversationSubsystem/Action.hs index 3c81ed700f9..7ca234495fc 100644 --- a/libs/wire-subsystems/src/Wire/ConversationSubsystem/Action.hs +++ b/libs/wire-subsystems/src/Wire/ConversationSubsystem/Action.hs @@ -29,6 +29,7 @@ module Wire.ConversationSubsystem.Action updateLocalConversationLeave, updateLocalConversationMemberUpdate, updateLocalConversationDelete, + updateLocalConversationDeleteUnchecked, updateLocalConversationRename, updateLocalConversationMessageTimerUpdate, updateLocalConversationReceiptModeUpdate, @@ -1088,6 +1089,26 @@ updateLocalConversationDelete :: updateLocalConversationDelete lcnvId uid connId = updateLocalConversation @'ConversationDeleteTag lcnvId uid connId () +updateLocalConversationDeleteUnchecked :: + ( Member (ErrorS 'InvalidOperation) r, + Member (ErrorS 'ConvNotFound) r, + Member CodeStore r, + Member E.ConversationStore r, + Member (ErrorS 'NotATeamMember) r, + Member ProposalStore r + ) => + Local ConvId -> + Sem r () +updateLocalConversationDeleteUnchecked lcnv = do + let tag = sing @'ConversationDeleteTag + conv <- getConversationWithError lcnv + -- check that the action does not bypass the underlying protocol + unless (protocolValidAction conv.protocol tag ()) $ + throwS @'InvalidOperation + -- perform all authorisation checks and, if successful, then update itself + let lconv = qualifyAs lcnv conv + void $ performAction @'ConversationDeleteTag lconv (error "not used") Nothing () + updateLocalConversationRename :: ( Member (Error FederationError) r, Member (ErrorS ('ActionDenied (ConversationActionPermission 'ConversationRenameTag))) r, diff --git a/libs/wire-subsystems/src/Wire/ConversationSubsystem/Interpreter.hs b/libs/wire-subsystems/src/Wire/ConversationSubsystem/Interpreter.hs index 672372cf8bc..d1e3f48b8b5 100644 --- a/libs/wire-subsystems/src/Wire/ConversationSubsystem/Interpreter.hs +++ b/libs/wire-subsystems/src/Wire/ConversationSubsystem/Interpreter.hs @@ -44,6 +44,7 @@ import Wire.CodeStore (CodeStore) import Wire.ConversationStore (ConversationStore) import Wire.ConversationStore qualified as ConvStore import Wire.ConversationSubsystem (ConversationSubsystem (..)) +import Wire.ConversationSubsystem.Action qualified as Action import Wire.ConversationSubsystem.Action.Notify qualified as ActionNotify import Wire.ConversationSubsystem.Clients as Clients import Wire.ConversationSubsystem.Create qualified as Create @@ -210,6 +211,8 @@ interpretConversationSubsystem = interpret $ \case mapErrors $ Update.postProteusBroadcast lusr con msg DeleteLocalConversation lusr con lcnv -> mapErrors $ Update.deleteLocalConversation lusr con lcnv + InternalDeleteLocalConversation lcnv -> + mapErrors $ Action.updateLocalConversationDeleteUnchecked lcnv GetMLSPublicKeys fmt -> mapErrors $ MLS.getMLSPublicKeys fmt ResetMLSConversation lusr reset -> diff --git a/libs/wire-subsystems/src/Wire/MeetingsStore.hs b/libs/wire-subsystems/src/Wire/MeetingsStore.hs index 2819f809f87..63ad71f3902 100644 --- a/libs/wire-subsystems/src/Wire/MeetingsStore.hs +++ b/libs/wire-subsystems/src/Wire/MeetingsStore.hs @@ -172,8 +172,5 @@ data MeetingsStore m a where UTCTime -> Int -> MeetingsStore m [StoredMeeting] - DeleteMeetingBatch :: - [MeetingId] -> - MeetingsStore m Int64 makeSem ''MeetingsStore diff --git a/libs/wire-subsystems/src/Wire/MeetingsStore/Postgres.hs b/libs/wire-subsystems/src/Wire/MeetingsStore/Postgres.hs index c57afaba032..4ee034fccd2 100644 --- a/libs/wire-subsystems/src/Wire/MeetingsStore/Postgres.hs +++ b/libs/wire-subsystems/src/Wire/MeetingsStore/Postgres.hs @@ -67,8 +67,6 @@ interpretMeetingsStoreToPostgres = removeInvitedEmailsImpl meetingId emails GetOldMeetings cutoffTime batchSize -> getOldMeetingsImpl cutoffTime batchSize - DeleteMeetingBatch meetingIds -> - deleteMeetingBatchImpl meetingIds -- * Create @@ -396,28 +394,7 @@ getOldMeetingsImpl cutoffTime batchSize = do conversation_id :: uuid, invited_emails :: text[], trial :: boolean, created_at :: timestamptz, updated_at :: timestamptz FROM meetings - WHERE end_time < ($1 :: timestamptz) - ORDER BY end_time ASC - LIMIT ($2 :: int4) - |] - -deleteMeetingBatchImpl :: - ( Member (Input Pool) r, - Member (Embed IO) r, - Member (Error UsageError) r - ) => - [MeetingId] -> - Sem r Int64 -deleteMeetingBatchImpl meetingIds = do - pool <- input - result <- liftIO $ use pool session - either throw pure result - where - session :: Session Int64 - session = statement (V.fromList (toUUID <$> meetingIds)) deleteStatement - deleteStatement :: Statement (V.Vector UUID) Int64 - deleteStatement = - [rowsAffectedStatement| - DELETE FROM meetings - WHERE id IN (SELECT unnest($1::uuid[])) - |] + WHERE end_time < ($1 :: timestamptz) + ORDER BY end_time ASC + LIMIT ($2 :: int4) + |] diff --git a/libs/wire-subsystems/src/Wire/MeetingsSubsystem.hs b/libs/wire-subsystems/src/Wire/MeetingsSubsystem.hs index 8a126f1be0b..ea17c8a5ee9 100644 --- a/libs/wire-subsystems/src/Wire/MeetingsSubsystem.hs +++ b/libs/wire-subsystems/src/Wire/MeetingsSubsystem.hs @@ -21,6 +21,7 @@ module Wire.MeetingsSubsystem where import Data.Id import Data.Qualified +import Data.Time.Clock (UTCTime) import Imports import Polysemy import Wire.API.Meeting @@ -59,5 +60,9 @@ data MeetingsSubsystem m a where Qualified MeetingId -> [EmailAddress] -> MeetingsSubsystem m Bool + CleanupOldMeetings :: + UTCTime -> + Int -> + MeetingsSubsystem m Int64 makeSem ''MeetingsSubsystem diff --git a/libs/wire-subsystems/src/Wire/MeetingsSubsystem/Interpreter.hs b/libs/wire-subsystems/src/Wire/MeetingsSubsystem/Interpreter.hs index a37eb03b338..5083f6a0d3b 100644 --- a/libs/wire-subsystems/src/Wire/MeetingsSubsystem/Interpreter.hs +++ b/libs/wire-subsystems/src/Wire/MeetingsSubsystem/Interpreter.hs @@ -26,13 +26,14 @@ import Data.Default (def) import Data.Domain (Domain) import Data.Id import Data.Map qualified as Map -import Data.Qualified (Local, Qualified (..), qualifyAs, tDomain, tUnqualified) +import Data.Qualified (Local, Qualified (..), inputQualifyLocal, qualifyAs, tDomain, tUnqualified) import Data.Range (Range, unsafeRange) import Data.Set qualified as Set import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime) import Imports import Polysemy import Polysemy.Error +import Polysemy.Input (Input) import Wire.API.Conversation hiding (Member) import Wire.API.Conversation.Role (roleNameWireAdmin) import Wire.API.Meeting qualified as API @@ -73,7 +74,8 @@ interpretMeetingsSubsystem :: Member TeamSubsystem r, Member FeaturesConfigSubsystem r, Member Now r, - Member (Error MeetingError) r + Member (Error MeetingError) r, + Member (Input (Local ())) r ) => NominalDiffTime -> InterpreterFor MeetingsSubsystem r @@ -92,6 +94,8 @@ interpretMeetingsSubsystem validityPeriod = interpret $ \case addInvitedEmailsImpl zUser meetingId emails validityPeriod RemoveInvitedEmails zUser meetingId emails -> removeInvitedEmailsImpl zUser meetingId emails validityPeriod + CleanupOldMeetings cutoffTime batchSize -> + cleanupOldMeetingsImpl cutoffTime batchSize createMeetingImpl :: ( Member Store.MeetingsStore r, @@ -405,3 +409,36 @@ removeInvitedEmailsImpl zUser meetingId emails validityPeriod = do lift $ Store.removeInvitedEmails (qUnqualified meetingId) emails pure $ isJust result + +cleanupOldMeetingsImpl :: + ( Member Store.MeetingsStore r, + Member ConversationSubsystem r, + Member (Input (Local ())) r + ) => + UTCTime -> + Int -> + Sem r Int64 +cleanupOldMeetingsImpl cutoffTime batchSize = do + oldMeetings <- Store.getOldMeetings cutoffTime batchSize + if null oldMeetings + then pure 0 + else do + for_ oldMeetings forceDeleteMeeting + pure $ fromIntegral $ length oldMeetings + +forceDeleteMeeting :: + ( Member Store.MeetingsStore r, + Member ConversationSubsystem r, + Member (Input (Local ())) r + ) => + Store.StoredMeeting -> + Sem r () +forceDeleteMeeting meeting = do + maybeConv <- ConversationSubsystem.internalGetConversation meeting.conversationId + case maybeConv of + Just conv + | conv.metadata.cnvmGroupConvType == Just MeetingConversation, + conv.id_ == meeting.conversationId -> + ConversationSubsystem.internalDeleteLocalConversation =<< inputQualifyLocal meeting.conversationId + _ -> pure () + Store.deleteMeeting meeting.id diff --git a/libs/wire-subsystems/src/Wire/MeetingsSubsystemCleaning.hs b/libs/wire-subsystems/src/Wire/MeetingsSubsystemCleaning.hs deleted file mode 100644 index 13910851c97..00000000000 --- a/libs/wire-subsystems/src/Wire/MeetingsSubsystemCleaning.hs +++ /dev/null @@ -1,32 +0,0 @@ -{-# LANGUAGE TemplateHaskell #-} - --- This file is part of the Wire Server implementation. --- --- Copyright (C) 2026 Wire Swiss GmbH --- --- This program is free software: you can redistribute it and/or modify it under --- the terms of the GNU Affero General Public License as published by the Free --- Software Foundation, either version 3 of the License, or (at your option) any --- later version. --- --- This program is distributed in the hope that it will be useful, but WITHOUT --- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS --- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more --- details. --- --- You should have received a copy of the GNU Affero General Public License along --- with this program. If not, see . - -module Wire.MeetingsSubsystemCleaning where - -import Data.Time.Clock (UTCTime) -import Imports -import Polysemy - -data MeetingsSubsystemCleaning m a where - CleanupOldMeetings :: - UTCTime -> - Int -> - MeetingsSubsystemCleaning m Int64 - -makeSem ''MeetingsSubsystemCleaning diff --git a/libs/wire-subsystems/src/Wire/MeetingsSubsystemCleaning/Interpreter.hs b/libs/wire-subsystems/src/Wire/MeetingsSubsystemCleaning/Interpreter.hs deleted file mode 100644 index 46dfe744a2f..00000000000 --- a/libs/wire-subsystems/src/Wire/MeetingsSubsystemCleaning/Interpreter.hs +++ /dev/null @@ -1,65 +0,0 @@ -{-# LANGUAGE DuplicateRecordFields #-} - --- This file is part of the Wire Server implementation. --- --- Copyright (C) 2026 Wire Swiss GmbH --- --- This program is free software: you can redistribute it and/or modify it under --- the terms of the GNU Affero General Public License as published by the Free --- Software Foundation, either version 3 of the License, or (at your option) any --- later version. --- --- This program is distributed in the hope that it will be useful, but WITHOUT --- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS --- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more --- details. --- --- You should have received a copy of the GNU Affero General Public License along --- with this program. If not, see . - -module Wire.MeetingsSubsystemCleaning.Interpreter where - -import Data.Time.Clock (UTCTime) -import Imports -import Polysemy -import Wire.API.Conversation (GroupConvType (MeetingConversation), cnvmGroupConvType) -import Wire.ConversationStore qualified as ConvStore -import Wire.MeetingsStore qualified as Store -import Wire.MeetingsSubsystemCleaning -import Wire.StoredConversation (StoredConversation (..)) - -interpretMeetingsSubsystemCleaning :: - ( Member Store.MeetingsStore r, - Member ConvStore.ConversationStore r - ) => - InterpreterFor MeetingsSubsystemCleaning r -interpretMeetingsSubsystemCleaning = interpret $ \case - CleanupOldMeetings cutoffTime batchSize -> - cleanupOldMeetingsImpl cutoffTime batchSize - -cleanupOldMeetingsImpl :: - ( Member Store.MeetingsStore r, - Member ConvStore.ConversationStore r - ) => - UTCTime -> - Int -> - Sem r Int64 -cleanupOldMeetingsImpl cutoffTime batchSize = do - oldMeetings <- Store.getOldMeetings cutoffTime batchSize - if null oldMeetings - then pure 0 - else do - -- 2. Delete associated conversations first (before meetings) - -- This ensures proper cleanup: conversation data should be removed before meeting records - -- We only delete conversations that are meeting conversations (GroupConvType = MeetingConversation) - for_ oldMeetings $ \meeting -> do - maybeConv <- ConvStore.getConversation meeting.conversationId - case maybeConv of - Just conv - | conv.metadata.cnvmGroupConvType == Just MeetingConversation, - conv.id_ == meeting.conversationId -> - ConvStore.deleteConversation meeting.conversationId - _ -> pure () - - -- 3. Now delete the meeting records from the database - Store.deleteMeetingBatch $ map (\Store.StoredMeeting {id = mid} -> mid) oldMeetings diff --git a/libs/wire-subsystems/test/unit/Wire/MeetingsSubsystem/InterpreterSpec.hs b/libs/wire-subsystems/test/unit/Wire/MeetingsSubsystem/InterpreterSpec.hs index 8bb558b8998..c3ed6170d08 100644 --- a/libs/wire-subsystems/test/unit/Wire/MeetingsSubsystem/InterpreterSpec.hs +++ b/libs/wire-subsystems/test/unit/Wire/MeetingsSubsystem/InterpreterSpec.hs @@ -32,6 +32,7 @@ import Data.Time.Clock import Imports import Polysemy import Polysemy.Error +import Polysemy.Input import Polysemy.State import System.Random (StdGen, mkStdGen) import Test.Hspec @@ -70,6 +71,7 @@ type TestStack = GalleyAPIAccess, Now, State UTCTime, + Input (Local ()), Random, State StdGen, ErrorS 'TeamMemberNotFound, @@ -107,6 +109,7 @@ runTestStack now gen teams configs = . runError @(Tagged 'TeamMemberNotFound ()) . evalState gen . randomToStatefulStdGen + . runInputConst (toLocalUnsafe (Domain "my-domain") ()) . evalState now . interpretNowAsState . miniGalleyAPIAccess teams configs diff --git a/libs/wire-subsystems/test/unit/Wire/MockInterpreters/ConversationSubsystem.hs b/libs/wire-subsystems/test/unit/Wire/MockInterpreters/ConversationSubsystem.hs index 27457d83433..38bd471efe6 100644 --- a/libs/wire-subsystems/test/unit/Wire/MockInterpreters/ConversationSubsystem.hs +++ b/libs/wire-subsystems/test/unit/Wire/MockInterpreters/ConversationSubsystem.hs @@ -84,6 +84,10 @@ inMemoryConversationSubsystemInterpreter = interpretH $ \case modify @(Map ConvId StoredConversation) (Map.delete (tUnqualified lcnv)) modify @ConversationMembers (Map.delete (tUnqualified lcnv)) pureT Unchanged + InternalDeleteLocalConversation lcnv -> do + modify @(Map ConvId StoredConversation) (Map.delete (tUnqualified lcnv)) + modify @ConversationMembers (Map.delete (tUnqualified lcnv)) + pureT () GetConversationIds _lusr _range _pagingState -> do pureT $ MultiTablePaging.MultiTablePage [] False (Public.ConversationPagingState MultiTablePaging.PagingLocals Nothing) GetConversations cids -> do diff --git a/libs/wire-subsystems/test/unit/Wire/MockInterpreters/MeetingsStore.hs b/libs/wire-subsystems/test/unit/Wire/MockInterpreters/MeetingsStore.hs index d53d37e0677..385eac0c791 100644 --- a/libs/wire-subsystems/test/unit/Wire/MockInterpreters/MeetingsStore.hs +++ b/libs/wire-subsystems/test/unit/Wire/MockInterpreters/MeetingsStore.hs @@ -109,9 +109,3 @@ inMemoryMeetingsStoreInterpreter = interpret $ \case . List.sortOn (.endTime) . filter (\sm -> sm.endTime < cutoffTime) . Map.elems - DeleteMeetingBatch meetingIds -> do - let deleteOne mid = do - exists <- gets (Map.member mid) - when exists $ modify (Map.delete mid) - pure exists - fromIntegral . length <$> traverse deleteOne meetingIds diff --git a/libs/wire-subsystems/wire-subsystems.cabal b/libs/wire-subsystems/wire-subsystems.cabal index abc73e6d7ad..0f4c739c004 100644 --- a/libs/wire-subsystems/wire-subsystems.cabal +++ b/libs/wire-subsystems/wire-subsystems.cabal @@ -373,8 +373,6 @@ library Wire.MeetingsStore.Postgres Wire.MeetingsSubsystem Wire.MeetingsSubsystem.Interpreter - Wire.MeetingsSubsystemCleaning - Wire.MeetingsSubsystemCleaning.Interpreter Wire.Migration Wire.MigrationLock Wire.NotificationSubsystem diff --git a/services/background-worker/background-worker.cabal b/services/background-worker/background-worker.cabal index 927e09d2bc4..a3943555eb7 100644 --- a/services/background-worker/background-worker.cabal +++ b/services/background-worker/background-worker.cabal @@ -21,6 +21,7 @@ library Wire.BackgroundWorker.Options Wire.BackgroundWorker.Util Wire.DeadUserNotificationWatcher + Wire.Effects Wire.MeetingsCleanupWorker Wire.PostgresMigrations diff --git a/services/background-worker/src/Wire/BackgroundWorker/Jobs/Registry.hs b/services/background-worker/src/Wire/BackgroundWorker/Jobs/Registry.hs index e8f985c8fc6..7e4bbcc0648 100644 --- a/services/background-worker/src/Wire/BackgroundWorker/Jobs/Registry.hs +++ b/services/background-worker/src/Wire/BackgroundWorker/Jobs/Registry.hs @@ -20,287 +20,22 @@ module Wire.BackgroundWorker.Jobs.Registry ) where -import Bilge qualified -import Bilge.Retry -import Cassandra (ClientState) -import Control.Monad.Catch -import Control.Retry -import Data.ByteString qualified as BS -import Data.ByteString.Lazy.Char8 qualified as LC8 -import Data.Id -import Data.Misc -import Data.Qualified -import Data.Tagged (Tagged) -import Data.Text qualified as T -import Data.Text.Lazy qualified as TL -import Galley.Types.Error (InternalError, internalErrorDescription, legalHoldServiceUnavailable) -import Hasql.Pool (UsageError) -import Hasql.Pool qualified as Hasql import Imports -import Network.HTTP.Client qualified as Http -import Network.Wai.Utilities.JSONResponse (JSONResponse (..)) -import OpenSSL.Session qualified as SSL -import Polysemy -import Polysemy.Async (asyncToIOFinal) -import Polysemy.Conc -import Polysemy.Error -import Polysemy.Input -import Polysemy.Resource (resourceToIOFinal) -import Polysemy.TinyLog qualified as P -import Ssl.Util -import System.Logger as Logger -import System.Logger.Class qualified as Log -import URI.ByteString (uriPath) import Wire.API.BackgroundJobs (Job (..)) -import Wire.API.Conversation.Config (ConversationSubsystemConfig (..)) -import Wire.API.Error (APIError (toResponse), DynError (..)) -import Wire.API.Error.Galley -import Wire.API.Federation.Error (FederationError) -import Wire.API.MLS.Keys (MLSKeysByPurpose, MLSPrivateKeys) -import Wire.API.Team.Collaborator (TeamCollaboratorsError) -import Wire.API.Team.Feature (LegalholdConfig) -import Wire.API.Team.FeatureFlags (FanoutLimit, FeatureDefaults (FeatureLegalHoldDisabledPermanently), currentFanoutLimit) -import Wire.BackendNotificationQueueAccess.RabbitMq qualified as BackendNotificationQueueAccess import Wire.BackgroundJobsPublisher.RabbitMQ (interpretBackgroundJobsPublisherRabbitMQ) import Wire.BackgroundJobsRunner (runJob) import Wire.BackgroundJobsRunner.Interpreter hiding (runJob) import Wire.BackgroundWorker.Env (AppT, Env (..)) -import Wire.BrigAPIAccess.Rpc -import Wire.ClientSubsystem.Error (ClientError) -import Wire.CodeStore.Cassandra (interpretCodeStoreToCassandra) -import Wire.CodeStore.DualWrite (interpretCodeStoreToCassandraAndPostgres) -import Wire.CodeStore.Postgres (interpretCodeStoreToPostgres) -import Wire.ConversationStore.Cassandra (MigrationError (..), interpretConversationStoreByMigration, interpretMLSCommitLockStoreToCassandra) -import Wire.ConversationSubsystem.Interpreter (ConversationSubsystemError, GroupInfoCheckEnabled (..), IntraListing (..), interpretConversationSubsystem) +import Wire.Effects import Wire.ExternalAccess.External -import Wire.FeaturesConfigSubsystem (getAllTeamFeaturesForServer) -import Wire.FeaturesConfigSubsystem.Interpreter (runFeaturesConfigSubsystem) -import Wire.FeaturesConfigSubsystem.Types (ExposeInvitationURLsAllowlist (..)) -import Wire.FederationAPIAccess.Interpreter (FederationAPIAccessConfig (..), interpretFederationAPIAccess) -import Wire.FederationSubsystem.Interpreter (runFederationSubsystem) -import Wire.FireAndForget (interpretFireAndForget) -import Wire.GalleyAPIAccess -import Wire.GalleyAPIAccess.Rpc (interpretGalleyAPIAccessToRpc) -import Wire.GundeckAPIAccess -import Wire.HashPassword.Interpreter (runHashPassword) -import Wire.LegalHoldStore.Cassandra (interpretLegalHoldStoreToCassandra) -import Wire.LegalHoldStore.Env (LegalHoldEnv (..)) -import Wire.NotificationSubsystem.Interpreter -import Wire.Options.Galley (GuestLinkTTLSeconds) -import Wire.ParseException -import Wire.PostgresMigrationOpts -import Wire.ProposalStore.Cassandra (interpretProposalStoreToCassandra) -import Wire.RateLimit (RateLimitExceeded) -import Wire.RateLimit.Interpreter (interpretRateLimit) -import Wire.Rpc -import Wire.RpcException -import Wire.Sem.Concurrency (ConcurrencySafety (Unsafe)) -import Wire.Sem.Concurrency.IO (unsafelyPerformConcurrency) -import Wire.Sem.Delay (runDelay) -import Wire.Sem.Logger (mapLogger) -import Wire.Sem.Logger.TinyLog (loggerToTinyLog) -import Wire.Sem.Now.IO (nowToIO) -import Wire.Sem.Random.IO (randomToIO) -import Wire.ServiceStore.Cassandra (interpretServiceStoreToCassandra) -import Wire.SparAPIAccess.Rpc (interpretSparAPIAccessToRpc) -import Wire.TeamCollaboratorsStore.Postgres (interpretTeamCollaboratorsStoreToPostgres) -import Wire.TeamCollaboratorsSubsystem.Interpreter (interpretTeamCollaboratorsSubsystem) -import Wire.TeamFeatureStore.Cassandra (interpretTeamFeatureStoreToCassandra) -import Wire.TeamFeatureStore.Error (TeamFeatureStoreError) -import Wire.TeamJournal.Aws (interpretTeamJournal) -import Wire.TeamStore.Cassandra (interpretTeamStoreToCassandra) -import Wire.TeamSubsystem.Interpreter (TeamSubsystemConfig (..), interpretTeamSubsystem) -import Wire.UserClientIndexStore.Cassandra -import Wire.UserGroupStore.Postgres (interpretUserGroupStoreToPostgres) - --- Helper functions for LegalHoldEnv --- Adapted from Galley.External.LegalHoldService.Internal -makeVerifiedRequestWithManagerIO :: - Logger -> - Http.Manager -> - ([Fingerprint Rsa] -> SSL.SSL -> IO ()) -> - Fingerprint Rsa -> - HttpsUrl -> - (Http.Request -> Http.Request) -> - IO (Http.Response LC8.ByteString) -makeVerifiedRequestWithManagerIO logger mgr verifyFingerprints fpr (HttpsUrl url) reqBuilder = do - let verified = verifyFingerprints [fpr] - extHandleAll (errHandler logger) $ do - recovering legalHoldRetryPolicy httpHandlers $ - const $ - withVerifiedSslConnection verified mgr (reqBuilderMods . reqBuilder) $ - \req -> - Http.httpLbs req mgr - where - reqBuilderMods = - maybe id Bilge.host (Bilge.extHost url) - . Bilge.port (fromMaybe 443 (Bilge.extPort url)) - . Bilge.secure - . prependPath (uriPath url) - errHandler logger' e = do - Logger.info logger' $ Log.msg ("error making request to legalhold service: " <> displayException e) - throwM (legalHoldServiceUnavailable e) - prependPath :: BS.ByteString -> Http.Request -> Http.Request - prependPath pth req = req {Http.path = pth `BS.append` Http.path req} -- Modified to use BS.append - -- () from System.FilePath, but here we just need to append. - -- Assuming a simple append is sufficient for URI path segments for this context. - legalHoldRetryPolicy :: RetryPolicy - legalHoldRetryPolicy = limitRetries 3 <> exponentialBackoff 100000 - extHandleAll :: (MonadCatch m) => (SomeException -> m a) -> m a -> m a - extHandleAll f ma = - catches - ma - [ Handler $ \(ex :: SomeAsyncException) -> throwM ex, - Handler $ \(ex :: SomeException) -> f ex - ] - -makeVerifiedRequestIO :: Logger -> ExtEnv -> Fingerprint Rsa -> HttpsUrl -> (Http.Request -> Http.Request) -> IO (Http.Response LC8.ByteString) -makeVerifiedRequestIO logger extEnv fpr url reqBuilder = do - let (mgr, verifyFingerprints) = extGetManager extEnv - makeVerifiedRequestWithManagerIO logger mgr verifyFingerprints fpr url reqBuilder - -makeVerifiedRequestFreshManagerIO :: Logger -> Fingerprint Rsa -> HttpsUrl -> (Http.Request -> Http.Request) -> IO (Http.Response LC8.ByteString) -makeVerifiedRequestFreshManagerIO logger fpr url reqBuilder = do - let disableTlsV1 = True - ExtEnv (mgr, verifyFingerprints) <- initExtEnv disableTlsV1 - makeVerifiedRequestWithManagerIO logger mgr verifyFingerprints fpr url reqBuilder dispatchJob :: Job -> AppT IO (Either Text ()) dispatchJob job = do env <- ask @Env let disableTlsV1 = True extEnv <- liftIO (initExtEnv disableTlsV1) - let mergeErrors = either (Left . T.pack . show) id - liftIO $ fmap mergeErrors $ runInterpreters env extEnv $ runJob job - where - convStoreInterpreter env = - interpretConversationStoreByMigration env.postgresMigration.conversation env.cassandraGalley - runInterpreters env extEnv = do - let federationAPIAccessConfig = - FederationAPIAccessConfig - { ownDomain = env.federationDomain, - federatorEndpoint = Just env.federatorInternal, - http2Manager = env.http2Manager, - requestId = job.requestId - } - teamSubsystemConfig = TeamSubsystemConfig {concurrentDeletionEvents = 1} - legalHoldEnv = - let makeReq fpr url rb = makeVerifiedRequestIO env.logger extEnv fpr url rb - makeReqFresh fpr url rb = makeVerifiedRequestFreshManagerIO env.logger fpr url rb - in LegalHoldEnv {makeVerifiedRequest = makeReq, makeVerifiedRequestFreshManager = makeReqFresh} - convCodesStoreInterpreter = - case env.postgresMigration.conversationCodes of - CassandraStorage -> interpretCodeStoreToCassandra - MigrationToPostgresql -> interpretCodeStoreToCassandraAndPostgres - PostgresqlStorage -> interpretCodeStoreToPostgres - runFinal @IO - . unsafelyPerformConcurrency @_ @'Unsafe - . embedToFinal @IO - . asyncToIOFinal - . interpretRace - . runDelay - . resourceToIOFinal - . runError @MigrationError - . runError - . mapError @DynError (.eMessage) - . mapError @JSONResponse (T.pack . show . (.value)) - . mapError @ConversationSubsystemError toResponse - . mapError @RpcException (T.pack . displayException) - . mapError @ClientError (T.pack . displayException) - . mapError @FederationError (T.pack . displayException) - . mapError @UsageError (T.pack . show) - . mapError @ParseException (T.pack . displayException) - . mapError @MigrationError (T.pack . show) - . mapError @InternalError (TL.toStrict . internalErrorDescription) - . mapError @UnreachableBackends (T.pack . show) - . mapError @TeamCollaboratorsError (const ("Team collaborators error" :: Text)) - . mapError @TeamFeatureStoreError (const ("Team feature store error" :: Text)) - . mapError @(Tagged 'NotATeamMember ()) (const ("Not a team member" :: Text)) - . mapError @(Tagged 'ConvAccessDenied ()) (const ("Conversation access denied" :: Text)) - . mapError @(Tagged 'TeamNotFound ()) (const ("Team not found" :: Text)) - . mapError @(Tagged 'TeamMemberNotFound ()) (const ("Team member not found" :: Text)) - . mapError @(Tagged 'AccessDenied ()) (const ("Access denied" :: Text)) - . mapError @NonFederatingBackends (const ("Non federating backends" :: Text)) - . mapError @UnreachableBackendsLegacy (const ("Unreachable backends legacy" :: Text)) - . mapError @RateLimitExceeded (const ("Rate limit exceeded" :: Text)) - . interpretTinyLog env job.requestId job.jobId - . runInputConst @Hasql.Pool env.hasqlPool - . runInputConst @(Local ()) (toLocalUnsafe env.federationDomain ()) - . runInputConst @(FeatureDefaults LegalholdConfig) FeatureLegalHoldDisabledPermanently - . runInputConst @ClientState env.cassandraGalley - . runInputConst @LegalHoldEnv legalHoldEnv - . runInputConst @ExposeInvitationURLsAllowlist (ExposeInvitationURLsAllowlist $ fromMaybe [] env.exposeInvitationURLsTeamAllowlist) - . runInputConst @(Either HttpsUrl (Map Text HttpsUrl)) env.convCodeURI - . runInputConst @IntraListing (IntraListing env.intraListing) - . runInputConst @(Maybe GroupInfoCheckEnabled) (GroupInfoCheckEnabled <$> env.checkGroupInfo) - . runInputConst @(Maybe GuestLinkTTLSeconds) env.guestLinkTTLSeconds - . runInputConst @FanoutLimit (currentFanoutLimit env.maxTeamSize env.maxFanoutSize) - . interpretMLSCommitLockStoreToCassandra env.cassandraGalley - . interpretProposalStoreToCassandra - . interpretServiceStoreToCassandra env.cassandraBrig - . interpretUserGroupStoreToPostgres - . interpretTeamFeatureStoreToCassandra - . interpretUserClientIndexStoreToCassandra env.cassandraGalley - . convStoreInterpreter env - . interpretTeamStoreToCassandra - . interpretTeamCollaboratorsStoreToPostgres - . interpretLegalHoldStoreToCassandra FeatureLegalHoldDisabledPermanently - . interpretTeamJournal Nothing - . interpretBackgroundJobsPublisherRabbitMQ job.requestId env.amqpJobsPublisherChannel - . nowToIO - . randomToIO - . interpretFireAndForget - . BackendNotificationQueueAccess.interpretBackendNotificationQueueAccess (Just $ backendQueueEnv env) - . runRpcWithHttp env.httpManager job.requestId - . runGundeckAPIAccess env.gundeckEndpoint - -- FUTUREWORK: Currently the brig access effect is needed for the interpreter of ExternalAccess. - -- At the time of implementation the only function used from ExternalAccess is deliverAsync, which will not call brig access. - -- However, to prevent the background worker to require HTTP access to brig, we should consider refactoring this at some point. - . interpretBrigAccess env.brigEndpoint - . interpretGalleyAPIAccessToRpc mempty env.galleyEndpoint - . runInputSem getConversationSubsystemConfig - . runInputSem @(Maybe (MLSKeysByPurpose MLSPrivateKeys)) (inputs @ConversationSubsystemConfig (.mlsKeys)) - . runInputSem getConfiguredFeatureFlags - . runHashPassword env.passwordHashingOptions - . interpretRateLimit env.passwordHashingRateLimitEnv - . convCodesStoreInterpreter - . interpretExternalAccess extEnv - . interpretSparAPIAccessToRpc env.sparEndpoint - . runNotificationSubsystemGundeck (defaultNotificationSubsystemConfig job.requestId) - . interpretFederationAPIAccess federationAPIAccessConfig - . interpretTeamSubsystem teamSubsystemConfig - . ( \m -> do - p <- inputs @ConversationSubsystemConfig (.federationProtocols) - runFederationSubsystem p m - ) - . runFeaturesConfigSubsystem - . runInputSem getAllTeamFeaturesForServer - . interpretTeamCollaboratorsSubsystem - . interpretConversationSubsystem - . interpretBackgroundJobsRunner - - getConversationSubsystemConfig :: - (Member GalleyAPIAccess r) => - Sem r ConversationSubsystemConfig - getConversationSubsystemConfig = getConversationConfig - - backendQueueEnv :: Env -> BackendNotificationQueueAccess.Env - backendQueueEnv env = - BackendNotificationQueueAccess.Env - { channelMVar = env.amqpBackendNotificationsChannel, - logger = env.logger, - local = toLocalUnsafe env.federationDomain (), - requestId = job.requestId - } - -interpretTinyLog :: - (Member (Embed IO) r) => - Env -> - RequestId -> - JobId -> - Sem (P.TinyLog ': r) a -> - Sem r a -interpretTinyLog e reqId jobId = - loggerToTinyLog e.logger - . mapLogger ((field "request" (unRequestId reqId) . field "job" (idToText jobId)) .) - . raiseUnder @P.TinyLog + liftIO + $ runBackgroundWorkerEffects env extEnv job.requestId (Just job.jobId) + . interpretBackgroundJobsPublisherRabbitMQ job.requestId env.amqpJobsPublisherChannel + . interpretBackgroundJobsRunner + $ runJob job diff --git a/services/background-worker/src/Wire/Effects.hs b/services/background-worker/src/Wire/Effects.hs new file mode 100644 index 00000000000..5f474cd1078 --- /dev/null +++ b/services/background-worker/src/Wire/Effects.hs @@ -0,0 +1,389 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2025 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Wire.Effects + ( runBackgroundWorkerEffects, + ) +where + +import Bilge qualified +import Bilge.Retry +import Cassandra (ClientState) +import Control.Monad.Catch +import Control.Retry +import Data.ByteString qualified as BS +import Data.ByteString.Lazy qualified as LBS +import Data.Id +import Data.Misc +import Data.Qualified +import Data.Tagged (Tagged) +import Data.Text qualified as T +import Data.Text.Lazy qualified as TL +import Galley.Types.Error (InternalError, internalErrorDescription, legalHoldServiceUnavailable) +import Hasql.Pool (UsageError) +import Hasql.Pool qualified as Hasql +import Imports +import Network.HTTP.Client qualified as Http +import Network.Wai.Utilities.JSONResponse (JSONResponse (..)) +import OpenSSL.Session qualified as SSL +import Polysemy +import Polysemy.Async (Async, asyncToIOFinal) +import Polysemy.Conc +import Polysemy.Error +import Polysemy.Input +import Polysemy.Resource (Resource, resourceToIOFinal) +import Polysemy.TinyLog qualified as P +import Ssl.Util +import System.Logger as Logger +import System.Logger qualified as Log +import URI.ByteString (uriPath) +import Wire.API.Conversation.Config (ConversationSubsystemConfig (..)) +import Wire.API.Error (APIError (toResponse), DynError (..)) +import Wire.API.Error.Galley +import Wire.API.Federation.Client (FederatorClient) +import Wire.API.Federation.Error (FederationError) +import Wire.API.MLS.Keys (MLSKeysByPurpose, MLSPrivateKeys) +import Wire.API.Team.Collaborator (TeamCollaboratorsError) +import Wire.API.Team.Feature (AllTeamFeatures, LegalholdConfig) +import Wire.API.Team.FeatureFlags (FanoutLimit, FeatureDefaults (FeatureLegalHoldDisabledPermanently), FeatureFlags, currentFanoutLimit) +import Wire.BackendNotificationQueueAccess (BackendNotificationQueueAccess) +import Wire.BackendNotificationQueueAccess.RabbitMq qualified as BackendNotificationQueueAccess +import Wire.BackgroundWorker.Env (Env (..)) +import Wire.BrigAPIAccess (BrigAPIAccess) +import Wire.BrigAPIAccess.Rpc +import Wire.ClientSubsystem.Error (ClientError) +import Wire.CodeStore (CodeStore) +import Wire.CodeStore.Cassandra (interpretCodeStoreToCassandra) +import Wire.CodeStore.DualWrite (interpretCodeStoreToCassandraAndPostgres) +import Wire.CodeStore.Postgres (interpretCodeStoreToPostgres) +import Wire.ConversationStore (ConversationStore, MLSCommitLockStore) +import Wire.ConversationStore.Cassandra (MigrationError (..), interpretConversationStoreByMigration, interpretMLSCommitLockStoreToCassandra) +import Wire.ConversationSubsystem (ConversationSubsystem) +import Wire.ConversationSubsystem.Interpreter (ConversationSubsystemError, GroupInfoCheckEnabled (..), IntraListing (..), interpretConversationSubsystem) +import Wire.ExternalAccess (ExternalAccess) +import Wire.ExternalAccess.External +import Wire.FeaturesConfigSubsystem (FeaturesConfigSubsystem, getAllTeamFeaturesForServer) +import Wire.FeaturesConfigSubsystem.Interpreter (runFeaturesConfigSubsystem) +import Wire.FeaturesConfigSubsystem.Types (ExposeInvitationURLsAllowlist (..)) +import Wire.FederationAPIAccess (FederationAPIAccess) +import Wire.FederationAPIAccess.Interpreter (FederationAPIAccessConfig (..), interpretFederationAPIAccess) +import Wire.FederationSubsystem (FederationSubsystem) +import Wire.FederationSubsystem.Interpreter (runFederationSubsystem) +import Wire.FireAndForget (FireAndForget, interpretFireAndForget) +import Wire.GalleyAPIAccess +import Wire.GalleyAPIAccess.Rpc (interpretGalleyAPIAccessToRpc) +import Wire.GundeckAPIAccess +import Wire.HashPassword (HashPassword) +import Wire.HashPassword.Interpreter (runHashPassword) +import Wire.LegalHoldStore (LegalHoldStore) +import Wire.LegalHoldStore.Cassandra (interpretLegalHoldStoreToCassandra) +import Wire.LegalHoldStore.Env (LegalHoldEnv (..)) +import Wire.NotificationSubsystem (NotificationSubsystem) +import Wire.NotificationSubsystem.Interpreter +import Wire.Options.Galley (GuestLinkTTLSeconds) +import Wire.ParseException +import Wire.PostgresMigrationOpts +import Wire.ProposalStore (ProposalStore) +import Wire.ProposalStore.Cassandra (interpretProposalStoreToCassandra) +import Wire.RateLimit (RateLimit, RateLimitExceeded) +import Wire.RateLimit.Interpreter (interpretRateLimit) +import Wire.Rpc +import Wire.Sem.Concurrency (Concurrency, ConcurrencySafety (Unsafe)) +import Wire.Sem.Concurrency.IO (unsafelyPerformConcurrency) +import Wire.Sem.Delay (Delay, runDelay) +import Wire.Sem.Logger (mapLogger) +import Wire.Sem.Logger.TinyLog (loggerToTinyLog) +import Wire.Sem.Now (Now) +import Wire.Sem.Now.IO (nowToIO) +import Wire.Sem.Random (Random) +import Wire.Sem.Random.IO (randomToIO) +import Wire.ServiceStore (ServiceStore) +import Wire.ServiceStore.Cassandra (interpretServiceStoreToCassandra) +import Wire.SparAPIAccess (SparAPIAccess) +import Wire.SparAPIAccess.Rpc (interpretSparAPIAccessToRpc) +import Wire.TeamCollaboratorsStore (TeamCollaboratorsStore) +import Wire.TeamCollaboratorsStore.Postgres (interpretTeamCollaboratorsStoreToPostgres) +import Wire.TeamCollaboratorsSubsystem (TeamCollaboratorsSubsystem) +import Wire.TeamCollaboratorsSubsystem.Interpreter (interpretTeamCollaboratorsSubsystem) +import Wire.TeamFeatureStore (TeamFeatureStore) +import Wire.TeamFeatureStore.Cassandra (interpretTeamFeatureStoreToCassandra) +import Wire.TeamFeatureStore.Error (TeamFeatureStoreError) +import Wire.TeamJournal (TeamJournal) +import Wire.TeamJournal.Aws (interpretTeamJournal) +import Wire.TeamStore (TeamStore) +import Wire.TeamStore.Cassandra (interpretTeamStoreToCassandra) +import Wire.TeamSubsystem (TeamSubsystem) +import Wire.TeamSubsystem.Interpreter (TeamSubsystemConfig (..), interpretTeamSubsystem) +import Wire.UserClientIndexStore (UserClientIndexStore) +import Wire.UserClientIndexStore.Cassandra +import Wire.UserGroupStore (UserGroupStore) +import Wire.UserGroupStore.Postgres (interpretUserGroupStoreToPostgres) + +makeVerifiedRequestWithManagerIO :: + Logger -> + Http.Manager -> + ([Fingerprint Rsa] -> SSL.SSL -> IO ()) -> + Fingerprint Rsa -> + HttpsUrl -> + (Http.Request -> Http.Request) -> + IO (Http.Response LBS.ByteString) +makeVerifiedRequestWithManagerIO logger mgr verifyFingerprints fpr (HttpsUrl url) reqBuilder = do + let verified = verifyFingerprints [fpr] + extHandleAll (errHandler logger) $ do + recovering legalHoldRetryPolicy httpHandlers $ + const $ + withVerifiedSslConnection verified mgr (reqBuilderMods . reqBuilder) $ + \req -> + Http.httpLbs req mgr + where + reqBuilderMods = + maybe id Bilge.host (Bilge.extHost url) + . Bilge.port (fromMaybe 443 (Bilge.extPort url)) + . Bilge.secure + . prependPath (uriPath url) + errHandler logger' e = do + Logger.info logger' $ Log.msg ("error making request to legalhold service: " <> displayException e) + throwM (legalHoldServiceUnavailable e) + prependPath :: BS.ByteString -> Http.Request -> Http.Request + prependPath pth req = req {Http.path = pth `BS.append` Http.path req} + legalHoldRetryPolicy :: RetryPolicy + legalHoldRetryPolicy = limitRetries 3 <> exponentialBackoff 100000 + extHandleAll :: (MonadCatch m) => (SomeException -> m a) -> m a -> m a + extHandleAll f ma = + catches + ma + [ Handler $ \(ex :: SomeAsyncException) -> throwM ex, + Handler $ \(ex :: SomeException) -> f ex + ] + +makeVerifiedRequestIO :: Logger -> ExtEnv -> Fingerprint Rsa -> HttpsUrl -> (Http.Request -> Http.Request) -> IO (Http.Response LBS.ByteString) +makeVerifiedRequestIO logger extEnv fpr url reqBuilder = do + let (mgr, verifyFingerprints) = extGetManager extEnv + makeVerifiedRequestWithManagerIO logger mgr verifyFingerprints fpr url reqBuilder + +makeVerifiedRequestFreshManagerIO :: Logger -> Fingerprint Rsa -> HttpsUrl -> (Http.Request -> Http.Request) -> IO (Http.Response LBS.ByteString) +makeVerifiedRequestFreshManagerIO logger fpr url reqBuilder = do + let disableTlsV1 = True + ExtEnv (mgr, verifyFingerprints) <- initExtEnv disableTlsV1 + makeVerifiedRequestWithManagerIO logger mgr verifyFingerprints fpr url reqBuilder + +type BackgroundWorkerEffects = + '[ ConversationSubsystem, + TeamCollaboratorsSubsystem, + Input AllTeamFeatures, + FeaturesConfigSubsystem, + FederationSubsystem, + TeamSubsystem, + FederationAPIAccess FederatorClient, + NotificationSubsystem, + SparAPIAccess, + ExternalAccess, + RateLimit, + HashPassword, + Input FeatureFlags, + Input (Maybe (MLSKeysByPurpose MLSPrivateKeys)), + Input ConversationSubsystemConfig, + GalleyAPIAccess, + BrigAPIAccess, + GundeckAPIAccess, + Rpc, + CodeStore, + BackendNotificationQueueAccess, + FireAndForget, + Random, + Now, + TeamJournal, + LegalHoldStore, + TeamCollaboratorsStore, + TeamStore, + ConversationStore, + UserClientIndexStore, + TeamFeatureStore, + UserGroupStore, + ServiceStore, + ProposalStore, + MLSCommitLockStore, + Input FanoutLimit, + Input (Maybe GuestLinkTTLSeconds), + Input (Maybe GroupInfoCheckEnabled), + Input IntraListing, + Input (Either HttpsUrl (Map Text HttpsUrl)), + Input ExposeInvitationURLsAllowlist, + Input LegalHoldEnv, + Input ClientState, + Input (FeatureDefaults LegalholdConfig), + Input (Local ()), + Input Hasql.Pool, + P.TinyLog, + Error RateLimitExceeded, + Error UnreachableBackendsLegacy, + Error NonFederatingBackends, + Error (Tagged AccessDenied ()), + Error (Tagged TeamMemberNotFound ()), + Error (Tagged TeamNotFound ()), + Error (Tagged ConvAccessDenied ()), + Error (Tagged NotATeamMember ()), + Error TeamFeatureStoreError, + Error TeamCollaboratorsError, + Error UnreachableBackends, + Error InternalError, + Error MigrationError, + Error ParseException, + Error UsageError, + Error FederationError, + Error ClientError, + Error ConversationSubsystemError, + Error JSONResponse, + Error DynError, + Error Text, + Resource, + Delay, + Race, + Async, + Embed IO, + Concurrency Unsafe, + Final IO + ] + +runBackgroundWorkerEffects :: + Env -> + ExtEnv -> + RequestId -> + Maybe JobId -> + Sem BackgroundWorkerEffects a -> + IO (Either Text a) +runBackgroundWorkerEffects env extEnv requestId mJobId = + runFinal @IO + . unsafelyPerformConcurrency @_ @'Unsafe + . embedToFinal @IO + . asyncToIOFinal + . interpretRace + . runDelay + . resourceToIOFinal + . runError + . mapError @DynError (.eMessage) + . mapError @JSONResponse (T.pack . show . (.value)) + . mapError @ConversationSubsystemError toResponse + . mapError @ClientError (T.pack . displayException) + . mapError @FederationError (T.pack . displayException) + . mapError @UsageError (T.pack . show) + . mapError @ParseException (T.pack . displayException) + . mapError @MigrationError (T.pack . show) + . mapError @InternalError (TL.toStrict . internalErrorDescription) + . mapError @UnreachableBackends (T.pack . show) + . mapError @TeamCollaboratorsError (const ("Team collaborators error" :: Text)) + . mapError @TeamFeatureStoreError (const ("Team feature store error" :: Text)) + . mapError @(Tagged 'NotATeamMember ()) (const ("Not a team member" :: Text)) + . mapError @(Tagged 'ConvAccessDenied ()) (const ("Conversation access denied" :: Text)) + . mapError @(Tagged 'TeamNotFound ()) (const ("Team not found" :: Text)) + . mapError @(Tagged 'TeamMemberNotFound ()) (const ("Team member not found" :: Text)) + . mapError @(Tagged 'AccessDenied ()) (const ("Access denied" :: Text)) + . mapError @NonFederatingBackends (const ("Non federating backends" :: Text)) + . mapError @UnreachableBackendsLegacy (const ("Unreachable backends legacy" :: Text)) + . mapError @RateLimitExceeded (const ("Rate limit exceeded" :: Text)) + . interpretTinyLog + . runInputConst @Hasql.Pool env.hasqlPool + . runInputConst @(Local ()) (toLocalUnsafe env.federationDomain ()) + . runInputConst @(FeatureDefaults LegalholdConfig) FeatureLegalHoldDisabledPermanently + . runInputConst @ClientState env.cassandraGalley + . runInputConst @LegalHoldEnv legalHoldEnv + . runInputConst @ExposeInvitationURLsAllowlist (ExposeInvitationURLsAllowlist $ fromMaybe [] env.exposeInvitationURLsTeamAllowlist) + . runInputConst @(Either HttpsUrl (Map Text HttpsUrl)) env.convCodeURI + . runInputConst @IntraListing (IntraListing env.intraListing) + . runInputConst @(Maybe GroupInfoCheckEnabled) (GroupInfoCheckEnabled <$> env.checkGroupInfo) + . runInputConst @(Maybe GuestLinkTTLSeconds) env.guestLinkTTLSeconds + . runInputConst @FanoutLimit (currentFanoutLimit env.maxTeamSize env.maxFanoutSize) + . interpretMLSCommitLockStoreToCassandra env.cassandraGalley + . interpretProposalStoreToCassandra + . interpretServiceStoreToCassandra env.cassandraBrig + . interpretUserGroupStoreToPostgres + . interpretTeamFeatureStoreToCassandra + . interpretUserClientIndexStoreToCassandra env.cassandraGalley + . interpretConversationStoreByMigration env.postgresMigration.conversation env.cassandraGalley + . interpretTeamStoreToCassandra + . interpretTeamCollaboratorsStoreToPostgres + . interpretLegalHoldStoreToCassandra FeatureLegalHoldDisabledPermanently + . interpretTeamJournal Nothing + . nowToIO + . randomToIO + . interpretFireAndForget + . BackendNotificationQueueAccess.interpretBackendNotificationQueueAccess (Just backendQueueEnv) + . convCodesStoreInterpreter + . runRpcWithHttp env.httpManager requestId + . runGundeckAPIAccess env.gundeckEndpoint + -- FUTUREWORK: Currently the brig access effect is needed for the interpreter of ExternalAccess. + -- At the time of implementation the only function used from ExternalAccess is deliverAsync, which will not call brig access. + -- However, to prevent the background worker to require HTTP access to brig, we should consider refactoring this at some point. + . interpretBrigAccess env.brigEndpoint + . interpretGalleyAPIAccessToRpc mempty env.galleyEndpoint + . runInputSem getConversationSubsystemConfig + . runInputSem @(Maybe (MLSKeysByPurpose MLSPrivateKeys)) (inputs @ConversationSubsystemConfig (.mlsKeys)) + . runInputSem getConfiguredFeatureFlags + . runHashPassword env.passwordHashingOptions + . interpretRateLimit env.passwordHashingRateLimitEnv + . interpretExternalAccess extEnv + . interpretSparAPIAccessToRpc env.sparEndpoint + . runNotificationSubsystemGundeck (defaultNotificationSubsystemConfig requestId) + . interpretFederationAPIAccess federationAPIAccessConfig + . interpretTeamSubsystem teamSubsystemConfig + . ( \m -> do + p <- inputs @ConversationSubsystemConfig (.federationProtocols) + runFederationSubsystem p m + ) + . runFeaturesConfigSubsystem + . runInputSem getAllTeamFeaturesForServer + . interpretTeamCollaboratorsSubsystem + . interpretConversationSubsystem + where + convCodesStoreInterpreter = + case env.postgresMigration.conversationCodes of + CassandraStorage -> interpretCodeStoreToCassandra + MigrationToPostgresql -> interpretCodeStoreToCassandraAndPostgres + PostgresqlStorage -> interpretCodeStoreToPostgres + legalHoldEnv = + let makeReq fpr url rb = makeVerifiedRequestIO env.logger extEnv fpr url rb + makeReqFresh fpr url rb = makeVerifiedRequestFreshManagerIO env.logger fpr url rb + in LegalHoldEnv {makeVerifiedRequest = makeReq, makeVerifiedRequestFreshManager = makeReqFresh} + teamSubsystemConfig = TeamSubsystemConfig {concurrentDeletionEvents = 1} + federationAPIAccessConfig = + FederationAPIAccessConfig + { ownDomain = env.federationDomain, + federatorEndpoint = Just env.federatorInternal, + http2Manager = env.http2Manager, + requestId = requestId + } + getConversationSubsystemConfig :: + (Member GalleyAPIAccess r) => + Sem r ConversationSubsystemConfig + getConversationSubsystemConfig = getConversationConfig + backendQueueEnv = + BackendNotificationQueueAccess.Env + { channelMVar = env.amqpBackendNotificationsChannel, + logger = env.logger, + local = toLocalUnsafe env.federationDomain (), + requestId = requestId + } + interpretTinyLog :: (Member (Embed IO) r) => Sem (P.TinyLog ': r) a -> Sem r a + interpretTinyLog = + loggerToTinyLog env.logger + . mapLogger (loggerFields .) + . raiseUnder @P.TinyLog + loggerFields :: Log.Msg -> Log.Msg + loggerFields = + case mJobId of + Nothing -> field "request" (unRequestId requestId) + Just jId -> field "request" (unRequestId requestId) . field "job" (idToText jId) diff --git a/services/background-worker/src/Wire/MeetingsCleanupWorker.hs b/services/background-worker/src/Wire/MeetingsCleanupWorker.hs index f5e9904e21c..b3068af2152 100644 --- a/services/background-worker/src/Wire/MeetingsCleanupWorker.hs +++ b/services/background-worker/src/Wire/MeetingsCleanupWorker.hs @@ -17,35 +17,26 @@ module Wire.MeetingsCleanupWorker ( startWorker, - cleanupOldMeetings, CleanupConfig (..), ) where -import Data.Bifunctor (first) +import Data.Id (RequestId (RequestId)) +import Data.Text qualified as T import Data.Time.Clock -import Hasql.Pool (UsageError) import Imports -import Polysemy -import Polysemy.Async (asyncToIOFinal) -import Wire.Sem.Concurrency.IO (unsafelyPerformConcurrency) -import Polysemy.Conc import Polysemy.Error (runError) -import Polysemy.Input (runInputConst) -import Polysemy.Resource (resourceToIOFinal) -import Polysemy.TinyLog qualified as P import Prometheus (incCounter) import System.Cron (Job (..), forkJob) import System.Logger qualified as Log -import System.Logger.Class (Logger) import Wire.BackgroundWorker.Env (AppT, Env (..), MeetingsCleanupMetrics (..), runAppT) import Wire.BackgroundWorker.Options (MeetingsCleanupConfig (..)) import Wire.BackgroundWorker.Util (CleanupAction) -import Wire.ConversationStore.Cassandra (MigrationError (..), interpretConversationStoreByMigration) +import Wire.Effects +import Wire.ExternalAccess.External import Wire.MeetingsStore.Postgres (interpretMeetingsStoreToPostgres) -import Wire.MeetingsSubsystemCleaning qualified as Meetings -import Wire.MeetingsSubsystemCleaning.Interpreter (interpretMeetingsSubsystemCleaning) -import Wire.PostgresMigrationOpts +import Wire.MeetingsSubsystem +import Wire.MeetingsSubsystem.Interpreter data CleanupConfig = CleanupConfig { retentionHours :: Double, @@ -71,7 +62,7 @@ startWorker config = do Job config.schedule $ runAppT env $ do Log.info env.logger $ Log.msg (Log.val "Starting scheduled meetings cleanup") - cleanupOldMeetings (configFromOptions config) + runCleanupOldMeetings (configFromOptions config) liftIO $ incCounter env.meetingsCleanupMetrics.runsCounter pure $ pure () @@ -85,8 +76,8 @@ configFromOptions cfg = } -- | Main cleanup function that orchestrates the cleanup process -cleanupOldMeetings :: CleanupConfig -> AppT IO () -cleanupOldMeetings config = do +runCleanupOldMeetings :: CleanupConfig -> AppT IO () +runCleanupOldMeetings config = do env <- ask now <- liftIO getCurrentTime let cutoffTime = addUTCTime (negate $ realToFrac config.retentionHours * 3600) now @@ -132,30 +123,16 @@ cleanupLoop env cutoffTime batchSize totalSoFar = do then cleanupLoop env cutoffTime batchSize newTotal else pure newTotal -interpretTinyLog :: - (Member (Embed IO) r) => - Logger -> - Sem (P.TinyLog ': r) a -> - Sem r a -interpretTinyLog logger = interpret $ \case - P.Log lvl msg -> Log.log logger lvl msg - -- Run the meetings cleanup using the subsystem -runMeetingsCleanup :: Env -> UTCTime -> Int -> IO (Either String Int64) -runMeetingsCleanup env cutoffTime batchSize = - fmap (either (Left . show) (first show)) - . runFinal @IO - . unsafelyPerformConcurrency - . resourceToIOFinal - . runError @MigrationError - . runError @UsageError - . embedToFinal @IO - . interpretTinyLog env.logger - . interpretRace - . asyncToIOFinal - . runInputConst env.hasqlPool +runMeetingsCleanup :: Env -> UTCTime -> Int -> IO (Either Text Int64) +runMeetingsCleanup env cutoffTime batchSize = do + let disableTlsV1 = True + extEnv <- initExtEnv disableTlsV1 + let validityPeriod = realToFrac batchSize * 3600 + let mergeErrors = either (Left . T.pack . show) Right + fmap (either Left mergeErrors) + . runBackgroundWorkerEffects env extEnv (RequestId "meetings-cleanup") Nothing . interpretMeetingsStoreToPostgres - . runInputConst env.hasqlPool - . interpretConversationStoreByMigration env.postgresMigration.conversation env.cassandraGalley - . interpretMeetingsSubsystemCleaning - $ Meetings.cleanupOldMeetings cutoffTime batchSize + . runError @MeetingError + . interpretMeetingsSubsystem validityPeriod + $ Wire.MeetingsSubsystem.cleanupOldMeetings cutoffTime batchSize From 7a20b025902ec5f33bc07591c82d1f5b47f50979 Mon Sep 17 00:00:00 2001 From: Gautier DI FOLCO Date: Tue, 26 May 2026 18:01:29 +0200 Subject: [PATCH 5/7] chore: rebase --- services/background-worker/src/Wire/Effects.hs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/services/background-worker/src/Wire/Effects.hs b/services/background-worker/src/Wire/Effects.hs index 5f474cd1078..842ad5402d2 100644 --- a/services/background-worker/src/Wire/Effects.hs +++ b/services/background-worker/src/Wire/Effects.hs @@ -102,6 +102,7 @@ import Wire.ProposalStore.Cassandra (interpretProposalStoreToCassandra) import Wire.RateLimit (RateLimit, RateLimitExceeded) import Wire.RateLimit.Interpreter (interpretRateLimit) import Wire.Rpc +import Wire.RpcException (RpcException) import Wire.Sem.Concurrency (Concurrency, ConcurrencySafety (Unsafe)) import Wire.Sem.Concurrency.IO (unsafelyPerformConcurrency) import Wire.Sem.Delay (Delay, runDelay) @@ -246,6 +247,7 @@ type BackgroundWorkerEffects = Error UsageError, Error FederationError, Error ClientError, + Error RpcException, Error ConversationSubsystemError, Error JSONResponse, Error DynError, @@ -278,6 +280,7 @@ runBackgroundWorkerEffects env extEnv requestId mJobId = . mapError @DynError (.eMessage) . mapError @JSONResponse (T.pack . show . (.value)) . mapError @ConversationSubsystemError toResponse + . mapError @RpcException (T.pack . displayException) . mapError @ClientError (T.pack . displayException) . mapError @FederationError (T.pack . displayException) . mapError @UsageError (T.pack . show) From 4a7e23c7542615b3ad4c146b9a114a82dbefc2fa Mon Sep 17 00:00:00 2001 From: Gautier DI FOLCO Date: Thu, 28 May 2026 16:53:39 +0200 Subject: [PATCH 6/7] fix(leif): remove extra `postgresMigration` --- charts/wire-server/values.yaml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/charts/wire-server/values.yaml b/charts/wire-server/values.yaml index e84a199f1b5..a9c98b33422 100644 --- a/charts/wire-server/values.yaml +++ b/charts/wire-server/values.yaml @@ -989,12 +989,6 @@ background-worker: # Cron schedule for the cleanup job (0 * * * * = every hour) schedule: "0 * * * *" - # Controls where conversation data is stored/accessed - postgresMigration: - conversation: cassandra - conversationCodes: cassandra - teamFeatures: cassandra - secrets: {} podSecurityContext: From e96975a5e2b1c91c851c160c9c8254cde26f9cef Mon Sep 17 00:00:00 2001 From: Gautier DI FOLCO Date: Thu, 28 May 2026 16:55:47 +0200 Subject: [PATCH 7/7] fix: revert unrelated changes in `charts/wire-server/values.yaml` --- charts/wire-server/values.yaml | 96 ++++++++++++++++++---------------- 1 file changed, 50 insertions(+), 46 deletions(-) diff --git a/charts/wire-server/values.yaml b/charts/wire-server/values.yaml index a9c98b33422..444594944c4 100644 --- a/charts/wire-server/values.yaml +++ b/charts/wire-server/values.yaml @@ -7,7 +7,7 @@ tags: legalhold: false - federation: false + federation: false backoffice: false mlsstats: false integration: false @@ -145,8 +145,7 @@ galley: # To disable proteus for new federated conversations: # federationProtocols: ["mls"] - featureFlags: - # see #RefConfigOptions in `/docs/reference` (https://github.com/wireapp/wire-server/) + featureFlags: # see #RefConfigOptions in `/docs/reference` (https://github.com/wireapp/wire-server/) appLock: defaults: config: @@ -341,7 +340,8 @@ galley: seccompProfile: type: RuntimeDefault tests: - config: {} + config: + {} # uploadXml: # baseUrl: s3://bucket/path/ @@ -509,8 +509,8 @@ cannon: # name: # key: - # See also the section 'Controlling the speed of websocket draining during - # cannon pod replacement' in docs/how-to/install/configuration-options.rst + # See also the section 'Controlling the speed of websocket draining during + # cannon pod replacement' in docs/how-to/install/configuration-options.rst drainOpts: # The following drains a minimum of 400 connections/second # for a total of 10000 over 25 seconds @@ -719,21 +719,21 @@ gundeck: # name: # key: - # To enable additional writes during a migration: - # redisAdditionalWrite: - # host: redis-two - # port: 6379 - # connectionMode: master - # enableTls: false - # insecureSkipVerifyTls: false - # - # # To configure custom TLS CA, please provide one of these: - # # tlsCa: - # # - # # Or refer to an existing secret (containing the CA): - # # tlsCaSecretRef: - # # name: - # # key: + # To enable additional writes during a migration: + # redisAdditionalWrite: + # host: redis-two + # port: 6379 + # connectionMode: master + # enableTls: false + # insecureSkipVerifyTls: false + # + # # To configure custom TLS CA, please provide one of these: + # # tlsCa: + # # + # # Or refer to an existing secret (containing the CA): + # # tlsCaSecretRef: + # # name: + # # key: aws: region: "eu-west-1" proxy: {} @@ -759,6 +759,7 @@ gundeck: # will be deleted and thus not delivered. # The default is 28 days. notificationTTL: 2419200 + # To enable cells notifications # cellsEventQueue: cells_events @@ -834,6 +835,7 @@ spar: # Disable one ore more API versions. Please make sure the configuration value is the same in all these charts: # brig, cannon, cargohold, galley, gundeck, proxy, spar. disabledAPIVersions: [development] + # SAML - ServiceProvider configuration # Usually, one would configure one set of options for a single domain. For # multi-ingress setups (one backend is available through multiple domains), @@ -1037,7 +1039,7 @@ brig: # tlsCaSecretRef: # name: # key: - + elasticsearch: scheme: http host: elasticsearch-client @@ -1068,7 +1070,7 @@ brig: sesEndpoint: https://email.eu-west-1.amazonaws.com sqsEndpoint: https://sqs.eu-west-1.amazonaws.com # dynamoDBEndpoint: https://dynamodb.eu-west-1.amazonaws.com - # -- If set to false, 'dynamoDBEndpoint' _must_ be set. + # -- If set to false, 'dynamoDBEndpoint' _must_ be set. randomPrekeys: true useSES: true multiSFT: @@ -1083,19 +1085,19 @@ brig: # tlsCaSecretRef: # name: # key: - - # Postgres connection settings - # - # Values are described in https://www.postgresql.org/docs/17/libpq-connect.html#LIBPQ-PARAMKEYWORDS - # To set the password via a brig secret see `secrets.pgPassword`. - # - # `additionalVolumeMounts` and `additionalVolumes` can be used to mount - # additional files (e.g. certificates) into the brig container. This way - # does not work for password files (parameter `passfile`), because - # libpq-connect requires access rights (mask 0600) for them that we cannot - # provide for random uids. - # - # Below is an example configuration we're using for our CI tests. + + # Postgres connection settings + # + # Values are described in https://www.postgresql.org/docs/17/libpq-connect.html#LIBPQ-PARAMKEYWORDS + # To set the password via a brig secret see `secrets.pgPassword`. + # + # `additionalVolumeMounts` and `additionalVolumes` can be used to mount + # additional files (e.g. certificates) into the brig container. This way + # does not work for password files (parameter `passfile`), because + # libpq-connect requires access rights (mask 0600) for them that we cannot + # provide for random uids. + # + # Below is an example configuration we're using for our CI tests. postgresql: host: postgresql # DNS name without protocol port: "5432" @@ -1106,7 +1108,7 @@ brig: acquisitionTimeout: 10s agingTimeout: 1d idlenessTimeout: 10m - + emailSMS: general: templateBranding: @@ -1201,25 +1203,25 @@ brig: maxRateLimitedKeys: 100000 # Estimated memory usage: 4 MB # setAuditLogEmailRecipient: security@wire.com setEphemeralUserCreationEnabled: true - + smtp: passwordFile: /etc/wire/brig/secrets/smtp-password.txt proxy: {} wireServerEnterprise: enabled: false - + turnStatic: v1: - turn:localhost:3478 v2: - turn:localhost:3478 - turn:localhost:3478?transport=tcp - + turn: serversSource: files # files | dns # baseDomain: turn.wire.example # Must be configured if serversSource is dns discoveryIntervalSeconds: 10 # Used only if serversSource is dns - + serviceAccount: # When setting this to 'false', either make sure that a service account named # 'brig' exists or change the 'name' field to 'default' @@ -1227,9 +1229,9 @@ brig: name: brig annotations: {} automountServiceAccountToken: true - + secrets: {} - + podSecurityContext: allowPrivilegeEscalation: false capabilities: @@ -1239,14 +1241,15 @@ brig: seccompProfile: type: RuntimeDefault tests: - config: {} + config: + {} # uploadXml: # baseUrl: s3://bucket/path/ - + secrets: # uploadXmlAwsAccessKeyId: # uploadXmlAwsSecretAccessKey: - + # These "secrets" are only used in tests and are therefore safe to be stored unencrypted providerPrivateKey: | -----BEGIN RSA PRIVATE KEY----- @@ -1308,6 +1311,7 @@ brig: hZMuK3BWD3fzkQVfW0yMwz6fWRXB483ZmekGkgndOTDoJQMdJXZxHpI3t2FcxQYj T45GXxRd18neXtuYa/OoAw9UQFDN5XfXN0g= -----END CERTIFICATE----- + # pgPassword: test: elasticsearch: