feat(storage): SQLAlchemy storage provider with PostgreSQL support #1161
Conversation
Pull request overview
Adds a new SQLAlchemy-based structured storage provider and schema definitions to enable multi-backend SQL storage (SQLite + PostgreSQL), updates test scaffolding to include SQLAlchemy/PG scenarios, and adjusts generated test values to fit PostgreSQL INTEGER bounds for certain columns.
Changes:
- Introduces `SQLAlchemyStorageProvider` (SQLAlchemy Core) and a shared SQLAlchemy schema (`TABLE_MAP`) for OpenWPM's structured tables.
- Updates storage test fixtures and adds SQLAlchemy provider tests, including optional PostgreSQL coverage gated on `pytest-postgresql`.
- Pins/adds PostgreSQL + psycopg2 + SQLAlchemy + pytest-postgresql dependencies; adjusts random test value ranges for `duration`/`response_status`.
Reviewed changes
Copilot reviewed 9 out of 10 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| `openwpm/storage/sqlalchemy_provider.py` | New SQLAlchemy-backed `StructuredStorageProvider` implementation (incl. reflection fallback + coercions). |
| `openwpm/storage/sqlalchemy_schema.py` | SQLAlchemy Core table definitions for the structured schema, with `BigInteger` choices for PG overflow avoidance. |
| `openwpm/storage/sql_provider.py` | Rewrites `SQLiteStorageProvider` into a thin wrapper delegating to `SQLAlchemyStorageProvider`. |
| `test/storage/test_sqlalchemy_provider.py` | New tests for schema equivalence, all-tables insert smoke tests, and `_coerce_record` behavior; adds optional PG scenario test. |
| `test/storage/fixtures.py` | Adds `sqlalchemy_sqlite` scenario and gated `postgresql` scenario to structured provider fixtures. |
| `test/storage/conftest.py` | Adds conditional `pytest-postgresql` fixtures for discovery when the dependency is installed. |
| `test/storage/test_values.py` | Narrows random ranges for `duration`/`response_status` to 32-bit int bounds. |
| `environment.yaml` | Adds pinned deps: PostgreSQL, psycopg2, pytest-postgresql, SQLAlchemy. |
| `scripts/environment-unpinned.yaml` | Adds corresponding unpinned dependency entries. |
| `.gitignore` | Ignores Crosslink-managed local state files. |
```python
        self._connection.execute(sa_table.insert(), record)
        self._sql_counter += 1
    except Exception as e:
        self.logger.error(
            "Unsupported record:\n%s\n%s\ntable=%s\n%s\n"
            % (type(e), e, table, repr(record))
        )
        # On PostgreSQL, a failed statement aborts the entire transaction.
        # All subsequent statements would fail with "InFailedSqlTransaction"
        # until a ROLLBACK is issued. We must rollback here so that
        # subsequent inserts can succeed.
        try:
            self._connection.rollback()
        except Exception as rollback_err:
            self.logger.error("Rollback failed: %s", rollback_err)
```
store_record rolls back the entire transaction on any insert error. That rollback will also undo any previous successful inserts since the last commit, causing silent data loss for a whole batch/visit when a single bad record arrives. Consider isolating each insert with a SAVEPOINT / nested transaction (e.g., begin_nested()), or committing more frequently and only rolling back the failed statement, while still recovering from PostgreSQL's aborted-transaction state.
Suggested change (replacing the rollback block with a nested transaction):

```python
        # Isolate each insert in a SAVEPOINT so a single bad record does
        # not roll back earlier successful inserts in the outer transaction.
        # This also recovers cleanly from PostgreSQL's aborted-transaction
        # state by rolling back only the failed statement scope.
        with self._connection.begin_nested():
            self._connection.execute(sa_table.insert(), record)
            self._sql_counter += 1
    except Exception as e:
        self.logger.error(
            "Unsupported record:\n%s\n%s\ntable=%s\n%s\n"
            % (type(e), e, table, repr(record))
        )
```
```python
    for table_name, test_data in test_table.items():
        await structured_provider.store_record(
            TableName(table_name), test_data["visit_id"], test_data
        )
```
This test passes the full test_data dict to store_record, but test_values injects a visit_id key into every table record (including task/crawl, which don't have a visit_id column). With SQLAlchemy inserts, that will raise (extra/unknown column) and, since the provider swallows exceptions, the test can succeed without actually inserting rows. Consider mirroring StorageController behavior here by deleting visit_id when it is INVALID_VISIT_ID (and/or asserting that inserts succeeded, e.g., by validating row counts).
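The reviewer's suggested fix could look like the following sketch. The sentinel name `INVALID_VISIT_ID` exists in OpenWPM, but the value `-1` and the helper `prepare_record` are assumptions for illustration, not code from the PR:

```python
# Sketch: mirror StorageController behavior by stripping the injected
# visit_id key before handing the record to store_record.
INVALID_VISIT_ID = -1  # assumed sentinel value; the real constant may differ

def prepare_record(record: dict) -> dict:
    """Return a copy of record without visit_id when it holds the sentinel."""
    record = dict(record)  # avoid mutating the caller's dict
    if record.get("visit_id") == INVALID_VISIT_ID:
        del record["visit_id"]
    return record
```

A row-count assertion after the insert loop would then catch silently swallowed failures.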
```python
    Compares column names, types, NOT NULL constraints, default values,
    and AUTOINCREMENT status for every table.
```
The docstring states this test compares default values, but the implementation never checks dflt_value from PRAGMA table_info. Either add an assertion for defaults (being careful about SQLite quoting differences) or adjust the docstring so it matches the actual coverage.
Suggested change:

```python
    Compares column names, types, NOT NULL constraints, and AUTOINCREMENT
    status for every table.
```
```python
    async def init(self) -> None:
        self._engine = create_engine(self.db_url, **self.engine_kwargs)
        self._connection = self._engine.connect()
        metadata.create_all(self._engine)
```
These methods are async but perform synchronous (potentially slow) DB work directly on the event loop (engine creation/connection + DDL). In the StorageController this can still block socket handling/other tasks, especially with PostgreSQL. Consider using SQLAlchemy's asyncio support (sqlalchemy.ext.asyncio) or offloading blocking DB work to a thread/executor.
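The executor-offload option the reviewer mentions could be sketched as below. This uses stdlib `sqlite3` to stay dependency-free; with SQLAlchemy the same pattern would wrap `create_engine()`, `connect()`, and `metadata.create_all()`. The class and method names here are illustrative, not the PR's code:

```python
import asyncio
import sqlite3


class SketchProvider:
    """Sketch: run blocking DB setup in a worker thread via asyncio.to_thread,
    so the StorageController's event loop stays responsive during init."""

    def __init__(self, db_path: str) -> None:
        self.db_path = db_path
        self._connection: sqlite3.Connection | None = None

    async def init(self) -> None:
        # asyncio.to_thread delegates the blocking call to the default executor.
        self._connection = await asyncio.to_thread(self._blocking_init)

    def _blocking_init(self) -> sqlite3.Connection:
        conn = sqlite3.connect(self.db_path, check_same_thread=False)
        conn.execute("CREATE TABLE IF NOT EXISTS t (id INTEGER PRIMARY KEY)")
        return conn


async def main() -> bool:
    provider = SketchProvider(":memory:")
    await provider.init()
    return provider._connection is not None


print(asyncio.run(main()))  # → True
```

SQLAlchemy's `sqlalchemy.ext.asyncio` engines are the other route the reviewer names; that avoids threads entirely but requires an async driver.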
Add `SQLAlchemyStorageProvider` backed by SQLAlchemy Core (not ORM) that supports any SQLAlchemy-compatible database. `SQLiteStorageProvider` is now a thin wrapper delegating to `SQLAlchemyStorageProvider` with a `sqlite:///` URL.

New files:
- `sqlalchemy_schema.py`: All 13 tables as SQLAlchemy Table objects
- `sqlalchemy_provider.py`: `StructuredStorageProvider` implementation
- `test_sqlalchemy_provider.py`: Schema equivalence, all-tables, coercion tests

Key decisions:
- DATETIME columns → Text for cross-dialect compatibility
- No foreign keys (test data violates them, SQLite ignores them)
- PostgreSQL transaction abort recovery via rollback in `store_record`
- Table reflection fallback for custom tables (e.g. `page_links`)

Closes #1143
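The "DATETIME columns → Text" decision implies a coercion step before insert. A minimal sketch of that kind of coercion is below; the provider's actual `_coerce_record` implementation is not shown in this thread, so `coerce_value` is a hypothetical stand-in:

```python
from datetime import datetime


def coerce_value(value):
    """Sketch: since DATETIME columns are declared as Text, serialize
    datetime objects to ISO-8601 strings before insert. The real
    _coerce_record in the PR may differ."""
    if isinstance(value, datetime):
        return value.isoformat(sep=" ")
    return value


row = {"visit_id": 1, "dtg": datetime(2024, 5, 1, 12, 0)}
coerced = {k: coerce_value(v) for k, v in row.items()}
print(coerced["dtg"])  # → 2024-05-01 12:00:00
```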
… [CL-5] Add psycopg2, pytest-postgresql, and postgresql to environment.yaml and extend test/storage/fixtures.py with a postgresql scenario backed by SQLAlchemyStorageProvider, so the existing parametrized all-tables insertion tests also run against a real PostgreSQL instance.
…tures [CL-5]
- Change Integer to BigInteger for columns holding large values
- Gate PostgreSQL scenario so existing SQLite tests run without pg
- Add missing deps to environment-unpinned.yaml
…re discovery [CL-5]
- Gate postgresql_scenarios on HAS_PYTEST_POSTGRESQL so SQLite tests work without pytest-postgresql installed
- Remove psycopg2 from deps (mutually exclusive with psycopg2-binary)
- Add explanatory comment for intentional ImportError pass in conftest
psycopg2 is a C extension that requires libpq, which is incompatible with python_abi 3.14 cp314t on conda-forge. psycopg (v3) is a pure-Python driver that works on all Python versions.
- Replace psycopg2 with psycopg in environment files
- Remove postgresql server package (pytest-postgresql manages its own)
- Update SQLAlchemy dialect from postgresql+psycopg2 to postgresql+psycopg
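Concretely, the dialect update is just a change of URL prefix passed to `create_engine`. The host, port, and credentials below are placeholders, not values from the PR:

```python
# Sketch: the SQLAlchemy connection URL before and after the driver swap.
# user/pass/localhost/openwpm are placeholder connection parameters.
OLD_URL = "postgresql+psycopg2://user:pass@localhost:5432/openwpm"
NEW_URL = OLD_URL.replace("postgresql+psycopg2", "postgresql+psycopg")
print(NEW_URL)  # → postgresql+psycopg://user:pass@localhost:5432/openwpm
```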
Force-pushed 75e9c42 to 267ac02
SQLite requires exactly INTEGER (not BIGINT) for AUTOINCREMENT primary keys. The SQLAlchemy schema used BigInteger for task_id, which rendered as BIGINT in DDL, causing OperationalError during table creation. Since SQLiteStorageProvider now delegates to SQLAlchemyStorageProvider, this crashed the StorageController process, and TaskManager blocked forever on status_queue.get(), hanging ALL test groups in CI.

Changes:
- task.task_id: BigInteger → Integer (matches schema.sql)
- crawl.task_id: BigInteger → Integer (foreign key consistency)
- test_values: cap task_id range to 2^31-1 for PostgreSQL compat
- dns_responses: add missing redirect_url column from schema.sql
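The SQLite rule behind this fix can be demonstrated with stdlib `sqlite3` alone (no OpenWPM code involved): AUTOINCREMENT is only accepted on a column typed exactly INTEGER, which is why the BigInteger/BIGINT rendering broke table creation.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# BIGINT PRIMARY KEY AUTOINCREMENT is rejected by SQLite...
try:
    conn.execute("CREATE TABLE bad (task_id BIGINT PRIMARY KEY AUTOINCREMENT)")
    bigint_ok = True
except sqlite3.OperationalError:
    # "AUTOINCREMENT is only allowed on an INTEGER PRIMARY KEY"
    bigint_ok = False

# ...while INTEGER PRIMARY KEY AUTOINCREMENT is accepted.
conn.execute("CREATE TABLE task (task_id INTEGER PRIMARY KEY AUTOINCREMENT)")
print(bigint_ok)  # → False
```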
Codecov Report
❌ Patch coverage is

Additional details and impacted files:

```
@@            Coverage Diff             @@
##           master    #1161      +/-   ##
==========================================
+ Coverage   62.18%   62.73%   +0.55%
==========================================
  Files          40       42       +2
  Lines        3898     3953      +55
==========================================
+ Hits         2424     2480      +56
+ Misses       1474     1473       -1
```
Summary
- `HAS_PYTEST_POSTGRESQL` flag

Supersedes #1149. Incorporates fixes from 5 rounds of adversarial review (VDD methodology).
VDD Review History
Test plan
`pre-commit run --all-files` passes