Skip to content

Commit 43360b2

Browse files
nuclearcatJenySadadia
authored andcommitted
api/pubsub: Add automatic cleanup of stale subscriptions
Implement background task to periodically remove subscriptions that haven't been polled recently, preventing memory leaks from abandoned connections. Configurable intervals and retry logic implemented too. Signed-off-by: Denys Fedoryshchenko <denys.f@collabora.com>
1 parent 5b6591e commit 43360b2

2 files changed

Lines changed: 46 additions & 1 deletion

File tree

api/main.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@
6868
from .metrics import Metrics
6969
from .maintenance import purge_old_nodes
7070

71+
SUBSCRIPTION_CLEANUP_INTERVAL_MINUTES = 15 # How often to run cleanup task
72+
SUBSCRIPTION_MAX_AGE_MINUTES = 15 # Max age before stale
73+
SUBSCRIPTION_CLEANUP_RETRY_MINUTES = 1 # Retry interval if cleanup fails
74+
7175

7276
@asynccontextmanager
7377
async def lifespan(app: FastAPI): # pylint: disable=redefined-outer-name
@@ -102,6 +106,25 @@ async def pubsub_startup():
102106
pubsub = await PubSub.create()
103107

104108

109+
async def subscription_cleanup_task():
110+
"""Background task to cleanup stale subscriptions"""
111+
while True:
112+
try:
113+
await asyncio.sleep(SUBSCRIPTION_CLEANUP_INTERVAL_MINUTES * 60)
114+
cleaned = await pubsub.cleanup_stale_subscriptions(
115+
SUBSCRIPTION_MAX_AGE_MINUTES)
116+
if cleaned > 0:
117+
print(f"Cleaned up {cleaned} stale subscriptions")
118+
except (ConnectionError, OSError, RuntimeError) as e:
119+
print(f"Subscription cleanup error: {e}")
120+
await asyncio.sleep(SUBSCRIPTION_CLEANUP_RETRY_MINUTES * 60)
121+
122+
123+
async def start_background_tasks():
124+
"""Start background cleanup tasks"""
125+
asyncio.create_task(subscription_cleanup_task())
126+
127+
105128
async def create_indexes():
106129
"""Startup event handler to create database indexes"""
107130
await db.create_indexes()
@@ -1081,6 +1104,7 @@ async def purge_handler(current_user: User = Depends(get_current_superuser),
10811104
pubsub_startup,
10821105
create_indexes,
10831106
initialize_beanie,
1107+
start_background_tasks,
10841108
]
10851109
)
10861110

api/pubsub.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import asyncio
99

1010
import json
11-
from datetime import datetime
11+
from datetime import datetime, timedelta
1212
from redis import asyncio as aioredis
1313
from cloudevents.http import CloudEvent, to_json
1414
from .models import Subscription, SubscriptionStats
@@ -225,3 +225,24 @@ async def subscription_stats(self):
225225
)
226226
subscriptions.append(stats)
227227
return subscriptions
228+
229+
async def cleanup_stale_subscriptions(self, max_age_minutes=30):
230+
"""Remove subscriptions not polled recently"""
231+
cutoff = datetime.utcnow() - timedelta(minutes=max_age_minutes)
232+
stale_ids = []
233+
234+
async with self._lock:
235+
for sub_id, sub_data in self._subscriptions.items():
236+
last_poll = sub_data.get('last_poll')
237+
if last_poll and last_poll < cutoff:
238+
stale_ids.append(sub_id)
239+
240+
# Clean up stale subscriptions (internal call, no user check)
241+
for sub_id in stale_ids:
242+
try:
243+
await self.unsubscribe(sub_id)
244+
except KeyError:
245+
# Already removed
246+
pass
247+
248+
return len(stale_ids)

0 commit comments

Comments
 (0)