Skip to content

Commit df7748c

Browse files
committed
Draft a psycopg journal
1 parent ff6e151 commit df7748c

7 files changed

Lines changed: 144 additions & 3 deletions

File tree

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ dependencies = [
2929

3030
[project.optional-dependencies]
3131
django = ["django>=6.0", "django-cmd>=3.0"]
32+
psycopg = ["psycopg>=3.3"]
3233

3334
[project.entry-points.djp]
3435
queueio = "queueio.django.djp"
@@ -50,6 +51,7 @@ dev = [
5051
"ruff>=0.9.6",
5152
"textual-dev>=1.7.0",
5253
"types-pika>=1.2.0b1",
54+
"psycopg>=3.3.3",
5355
]
5456

5557
[tool.ruff.lint]

pyrightconfig.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
{
22
"pythonVersion": "3.14",
33
"typeCheckingMode": "standard",
4-
"exclude": ["site", ".venv"]
4+
"exclude": ["site", ".venv"],
5+
"ignore": ["queueio/psycopg"]
56
}

queueio/freethreading_test.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@ def test_all_queueio_modules_preserve_freethreading():
2222
for _, modname, _ in pkgutil.walk_packages(
2323
queueio.__path__, f"{queueio.__name__}."
2424
):
25-
if not modname.endswith("_test") and ".django" not in modname:
25+
if (
26+
not modname.endswith("_test")
27+
and ".django" not in modname
28+
and ".psycopg" not in modname
29+
):
2630
importlib.import_module(modname)
2731

2832
assert sys._is_gil_enabled() is False

queueio/psycopg/__init__.py

Whitespace-only changes.

queueio/psycopg/journal.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
"""WARNING: This module is a work in progress.
2+
3+
The PsycopgJournal implementation has not been extensively reviewed. It passes
4+
the base test suite but may contain bugs or design flaws. You shouldn't use this yet.
5+
"""
6+
7+
from collections.abc import Iterator
8+
from contextlib import suppress
9+
from threading import Lock
10+
11+
import psycopg
12+
13+
from queueio.journal import Journal
14+
15+
16+
class PsycopgJournal(Journal):
17+
@classmethod
18+
def from_uri(cls, uri: str, /):
19+
return cls(uri)
20+
21+
def __init__(self, uri: str):
22+
self.__publish_conn = psycopg.connect(uri, autocommit=True)
23+
self.__subscribe_conn = psycopg.connect(uri, autocommit=True)
24+
self.__publish_lock = Lock()
25+
self.__shutdown_lock = Lock()
26+
self.__shutdown = False
27+
28+
# Ensure schema exists
29+
self.__publish_conn.execute("""
30+
CREATE TABLE IF NOT EXISTS queueio_journal (
31+
id BIGSERIAL PRIMARY KEY,
32+
body BYTEA NOT NULL,
33+
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
34+
)
35+
""")
36+
37+
# LISTEN before SELECT to avoid race condition:
38+
# any NOTIFY after LISTEN will be buffered, and the subsequent
39+
# SELECT establishes our starting position.
40+
self.__subscribe_conn.execute("LISTEN queueio_journal")
41+
row = self.__subscribe_conn.execute(
42+
"SELECT COALESCE(MAX(id), 0) FROM queueio_journal"
43+
).fetchone()
44+
assert row is not None
45+
self.__last_id: int = row[0]
46+
47+
def publish(self, message: bytes):
48+
with self.__publish_lock, self.__publish_conn.transaction():
49+
self.__publish_conn.execute(
50+
t"INSERT INTO queueio_journal (body) VALUES ({message})"
51+
)
52+
self.__publish_conn.execute("NOTIFY queueio_journal")
53+
54+
def subscribe(self) -> Iterator[bytes]:
55+
try:
56+
while not self.__shutdown:
57+
rows = self.__subscribe_conn.execute(
58+
t"""
59+
SELECT id, body FROM queueio_journal
60+
WHERE id > {self.__last_id}
61+
ORDER BY id
62+
"""
63+
).fetchall()
64+
65+
for row in rows:
66+
self.__last_id = row[0]
67+
yield row[1]
68+
69+
if not rows and not self.__shutdown:
70+
# Wait for a notification to wake us up
71+
for _ in self.__subscribe_conn.notifies(timeout=1.0):
72+
break # Got one, go back to query loop
73+
finally:
74+
with suppress(Exception):
75+
self.__subscribe_conn.execute("UNLISTEN queueio_journal")
76+
with suppress(Exception):
77+
self.__subscribe_conn.close()
78+
79+
def shutdown(self):
80+
with self.__shutdown_lock:
81+
if self.__shutdown:
82+
return
83+
self.__shutdown = True
84+
85+
# Wake up the subscriber by sending a notification
86+
with suppress(Exception):
87+
self.__publish_conn.execute("NOTIFY queueio_journal")
88+
89+
with suppress(Exception):
90+
self.__publish_conn.close()

queueio/psycopg/journal_test.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import os
2+
3+
import pytest
4+
5+
psycopg = pytest.importorskip(
6+
"psycopg", reason="psycopg not available", exc_type=ImportError
7+
)
8+
9+
from queueio.journal_test import BaseJournalTest # noqa: E402
10+
11+
from .journal import PsycopgJournal # noqa: E402
12+
13+
PSYCOPG_TEST_URI = os.environ.get(
14+
"QUEUEIO_PSYCOPG_TEST", "postgresql://postgres@localhost/queueio_test"
15+
)
16+
17+
18+
def psycopg_available() -> bool:
19+
try:
20+
conn = psycopg.connect(PSYCOPG_TEST_URI)
21+
conn.close()
22+
return True
23+
except Exception:
24+
return False
25+
26+
27+
pytestmark = pytest.mark.skipif(
28+
not psycopg_available(), reason="PostgreSQL not available"
29+
)
30+
31+
32+
class TestPsycopgJournal(BaseJournalTest):
33+
@pytest.fixture
34+
def journal(self):
35+
journal = PsycopgJournal.from_uri(PSYCOPG_TEST_URI)
36+
37+
# Truncate to isolate tests
38+
with psycopg.connect(PSYCOPG_TEST_URI, autocommit=True) as conn:
39+
conn.execute("TRUNCATE queueio_journal")
40+
41+
yield journal
42+
journal.shutdown()

queueio/queueio.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,9 @@ def __default_journal(self) -> Journal:
140140
)
141141
if not (uri.startswith("postgresql:") or uri.startswith("postgres:")):
142142
raise ValueError(f"URI scheme must be 'postgresql:', got: {uri}")
143-
raise ValueError("Journal backend 'psycopg' is not yet implemented.")
143+
from .psycopg.journal import PsycopgJournal
144+
145+
return PsycopgJournal.from_uri(uri)
144146

145147
raise ValueError(f"Unknown journal backend: {backend}")
146148

0 commit comments

Comments
 (0)