Skip to content

Commit 67dd926

Browse files
committed
Merge branch 'feature/db-sync' into develop
2 parents f849b4f + 384c013 commit 67dd926

11 files changed

Lines changed: 842 additions & 143 deletions

File tree

Makefile

Lines changed: 59 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,9 @@ GBL_ADMIN_REMOTE_DIR ?= /opt/data/pgdump
5858
GBL_ADMIN_DUMP_GLOB ?= pgdump-geoportal_production-*.sql.gz
5959
GBL_ADMIN_LOCAL_DIR ?= tmp
6060
GBL_ADMIN_SQL_GLOB ?= pgdump-geoportal_production-*.sql
61+
GBL_ADMIN_RETAIN_DBS ?= 2
6162
GBL_ADMIN_IMPORT_CONFLICT ?= update
63+
GBL_ADMIN_RETIRE_MISSING ?= false
6264
GBL_ADMIN_DISTRIBUTIONS_BATCH_SIZE ?= 2000
6365
KAMAL_APP_ROLE ?= web
6466
KAMAL_PYTHON ?= /opt/venv/bin/python
@@ -415,7 +417,7 @@ gbl-admin-db-unzip: ## Decompress latest GBL Admin dump
415417
gunzip -c "$$LOCAL_GZ" > "$$LOCAL_SQL" || { echo "ERROR: gunzip failed (check disk space)."; exit 1; }; \
416418
echo "Decompressed SQL: $$LOCAL_SQL"
417419

418-
# Restore production GBL Admin dump to local ParadeDB. Uses .sql if present, otherwise streams from .gz (no extra disk).
420+
# Restore production GBL Admin dump to local ParadeDB. Uses the newest local .sql or .sql.gz dump.
419421
gbl-admin-db-restore: ## Restore GBL Admin dump to local ParadeDB
420422
@echo "Restoring production GBL Admin SQL into local ParadeDB..."
421423
@if ! command -v docker >/dev/null 2>&1; then \
@@ -427,19 +429,30 @@ gbl-admin-db-restore: ## Restore GBL Admin dump to local ParadeDB
427429
echo "Start it with: docker compose up -d paradedb"; \
428430
exit 1; \
429431
fi
430-
@LOCAL_SQL=$$(ls -1t "$(GBL_ADMIN_LOCAL_DIR)"/$(GBL_ADMIN_SQL_GLOB) 2>/dev/null | head -n 1); \
431-
LOCAL_GZ=$$(ls -1t "$(GBL_ADMIN_LOCAL_DIR)"/$(GBL_ADMIN_DUMP_GLOB) 2>/dev/null | head -n 1); \
432-
if [ -n "$$LOCAL_SQL" ]; then \
433-
SOURCE="$$LOCAL_SQL"; \
434-
DUMP_DATE=$$(basename "$$LOCAL_SQL" | sed -E 's/^pgdump-geoportal_production-([0-9]{8})\.sql$$/\1/'); \
435-
elif [ -n "$$LOCAL_GZ" ]; then \
436-
SOURCE="$$LOCAL_GZ"; \
437-
DUMP_DATE=$$(basename "$$LOCAL_GZ" | sed -E 's/^pgdump-geoportal_production-([0-9]{8})\.sql\.gz$$/\1/'); \
438-
else \
432+
@SOURCE=$$( \
433+
for file in "$(GBL_ADMIN_LOCAL_DIR)"/$(GBL_ADMIN_SQL_GLOB) "$(GBL_ADMIN_LOCAL_DIR)"/$(GBL_ADMIN_DUMP_GLOB); do \
434+
[ -f "$$file" ] || continue; \
435+
MTIME=$$(stat -f %m "$$file" 2>/dev/null || stat -c %Y "$$file" 2>/dev/null); \
436+
[ -n "$$MTIME" ] || continue; \
437+
printf "%s\t%s\n" "$$MTIME" "$$file"; \
438+
done | sort -nr | head -n 1 | cut -f2- \
439+
); \
440+
if [ -z "$$SOURCE" ]; then \
439441
echo "ERROR: No dump found in $(GBL_ADMIN_LOCAL_DIR) (need $(GBL_ADMIN_SQL_GLOB) or $(GBL_ADMIN_DUMP_GLOB))."; \
440442
echo "Run 'make gbl-admin-db-download' first."; \
441443
exit 1; \
442444
fi; \
445+
case "$$SOURCE" in \
446+
*.sql) \
447+
RESTORE_MODE="sql"; \
448+
DUMP_DATE=$$(basename "$$SOURCE" | sed -E 's/^pgdump-geoportal_production-([0-9]{8})\.sql$$/\1/') ;; \
449+
*.sql.gz) \
450+
RESTORE_MODE="gz"; \
451+
DUMP_DATE=$$(basename "$$SOURCE" | sed -E 's/^pgdump-geoportal_production-([0-9]{8})\.sql\.gz$$/\1/') ;; \
452+
*) \
453+
echo "ERROR: Unrecognized dump filename: $$SOURCE"; \
454+
exit 1 ;; \
455+
esac; \
443456
if ! echo "$$DUMP_DATE" | grep -Eq '^[0-9]{8}$$'; then \
444457
echo "ERROR: Could not parse dump date from $$SOURCE."; \
445458
exit 1; \
@@ -450,12 +463,13 @@ gbl-admin-db-restore: ## Restore GBL Admin dump to local ParadeDB
450463
docker compose exec -T paradedb psql -U postgres -d postgres -c "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '$$DB_NAME' AND pid <> pg_backend_pid();" || true; \
451464
docker compose exec -T paradedb psql -U postgres -d postgres -c "DROP DATABASE IF EXISTS \"$$DB_NAME\";"; \
452465
docker compose exec -T paradedb psql -U postgres -d postgres -c "CREATE DATABASE \"$$DB_NAME\" OWNER postgres;"; \
453-
if [ -n "$$LOCAL_SQL" ]; then \
454-
echo "Restoring from decompressed SQL: $$LOCAL_SQL"; \
455-
cat "$$LOCAL_SQL" | docker compose exec -T paradedb psql -U postgres -d "$$DB_NAME"; \
466+
echo "Selected newest local dump: $$SOURCE"; \
467+
if [ "$$RESTORE_MODE" = "sql" ]; then \
468+
echo "Restoring from decompressed SQL: $$SOURCE"; \
469+
cat "$$SOURCE" | docker compose exec -T paradedb psql -U postgres -d "$$DB_NAME"; \
456470
else \
457-
echo "Streaming from compressed dump: $$LOCAL_GZ (no extra disk used)"; \
458-
gunzip -c "$$LOCAL_GZ" | docker compose exec -T paradedb psql -U postgres -d "$$DB_NAME"; \
471+
echo "Streaming from compressed dump: $$SOURCE (no extra disk used)"; \
472+
gunzip -c "$$SOURCE" | docker compose exec -T paradedb psql -U postgres -d "$$DB_NAME"; \
459473
fi; \
460474
echo "Restore complete."; \
461475
echo "Dump used: $$SOURCE"; \
@@ -475,7 +489,22 @@ gbl-admin-db-restore: ## Restore GBL Admin dump to local ParadeDB
475489
-e DB_PORT="5432" \
476490
-e DB_USER="postgres" \
477491
-e DB_PASSWORD="$$DB_PASSWORD" \
478-
api bash -lc 'cd /app/backend && python db/migrations/bridge_old_production.py --create-view'
492+
api bash -lc 'cd /app/backend && python db/migrations/bridge_old_production.py --create-view'; \
493+
if [ "$(GBL_ADMIN_RETAIN_DBS)" -lt 1 ]; then \
494+
echo "ERROR: GBL_ADMIN_RETAIN_DBS must be at least 1."; \
495+
exit 1; \
496+
fi; \
497+
PRUNE_DBS=$$(docker compose exec -T paradedb psql -U postgres -d postgres -Atc "WITH ranked AS ( SELECT datname, ROW_NUMBER() OVER ( ORDER BY CASE WHEN datname = '$$DB_NAME' THEN 0 ELSE 1 END, datname DESC ) AS rn FROM pg_database WHERE datname LIKE 'geoportal_production_%' ) SELECT datname FROM ranked WHERE rn > $(GBL_ADMIN_RETAIN_DBS);"); \
498+
if [ -n "$$PRUNE_DBS" ]; then \
499+
echo "Pruning older restored GBL Admin databases (retaining $(GBL_ADMIN_RETAIN_DBS))..."; \
500+
for PRUNE_DB in $$PRUNE_DBS; do \
501+
echo "Dropping old restored DB: $$PRUNE_DB"; \
502+
docker compose exec -T paradedb psql -U postgres -d postgres -c "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '$$PRUNE_DB' AND pid <> pg_backend_pid();" || true; \
503+
docker compose exec -T paradedb psql -U postgres -d postgres -c "DROP DATABASE IF EXISTS \"$$PRUNE_DB\";"; \
504+
done; \
505+
else \
506+
echo "No older restored GBL Admin databases to prune."; \
507+
fi
479508

480509
# End-to-end: download latest dump and restore (uses the newest local .sql or .sql.gz dump)
481510
gbl-admin-db-sync: gbl-admin-db-download gbl-admin-db-restore ## Download + restore GBL Admin dump
@@ -510,6 +539,12 @@ gbl-admin-db-import-resources: ## Import resources from GBL Admin bridge
510539
echo "ERROR: Could not read POSTGRES_PASSWORD from paradedb container."; \
511540
exit 1; \
512541
fi; \
542+
IMPORT_FLAGS="--conflict $(GBL_ADMIN_IMPORT_CONFLICT) --verify"; \
543+
case "$(GBL_ADMIN_RETIRE_MISSING)" in \
544+
1|true|TRUE|yes|YES) \
545+
IMPORT_FLAGS="$$IMPORT_FLAGS --retire-missing"; \
546+
echo "Missing resources will be marked retired after import." ;; \
547+
esac; \
513548
echo "OLD_DB_NAME=$$RESOLVED_OLD_DB_NAME"; \
514549
docker compose exec -T \
515550
-e OLD_DB_NAME="$$RESOLVED_OLD_DB_NAME" \
@@ -518,7 +553,7 @@ gbl-admin-db-import-resources: ## Import resources from GBL Admin bridge
518553
-e DB_PORT="5432" \
519554
-e DB_USER="postgres" \
520555
-e DB_PASSWORD="$$DB_PASSWORD" \
521-
api bash -lc 'cd /app/backend && python db/migrations/import_from_old_production.py --conflict $(GBL_ADMIN_IMPORT_CONFLICT) --verify'
556+
api bash -lc "cd /app/backend && python db/migrations/import_from_old_production.py $$IMPORT_FLAGS"
522557

523558
# Populate resource_distributions from legacy document_distributions.
524559
# Uses the latest restored geoportal_production_* DB if OLD_DB_NAME is unset.
@@ -589,7 +624,13 @@ populate-data-dictionaries: ## Populate data dictionaries from legacy tables
589624
api bash -lc 'cd /app/backend && python db/migrations/migrate_resource_data_dictionaries.py'
590625

591626
# Full GBL Admin import pipeline after restore.
592-
gbl-admin-db-import-all: gbl-admin-db-add-latest-btaa-fields gbl-admin-db-import-resources populate-distributions populate-data-dictionaries populate-relationships reindex ## Full GBL Admin import pipeline
627+
gbl-admin-db-import-all: ## Full GBL Admin import pipeline
628+
@$(MAKE) gbl-admin-db-add-latest-btaa-fields
629+
@$(MAKE) gbl-admin-db-import-resources GBL_ADMIN_RETIRE_MISSING=true
630+
@$(MAKE) populate-distributions
631+
@$(MAKE) populate-data-dictionaries
632+
@$(MAKE) populate-relationships
633+
@$(MAKE) reindex
593634
@echo "GBL Admin full import pipeline complete!"
594635

595636
# Search indexing tasks

backend/app/services/ogm_harvest/importer.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515
)
1616
from app.services.ogm_harvest.aardvark_reader import extract_record_id
1717
from app.services.ogm_harvest.repository import OGMHarvestRepository
18+
from app.services.relationship_sync import (
19+
sync_relationships_for_batch,
20+
sync_relationships_for_resource_ids,
21+
)
1822
from db.database import database
1923
from db.models import resources
2024

@@ -75,6 +79,16 @@ def _parse_iso_date(value: str) -> Optional[date]:
7579
return None
7680

7781

82+
def _normalize_scalar_string(value: Any) -> Optional[str]:
83+
if value is None:
84+
return None
85+
if isinstance(value, list):
86+
values = [str(v).strip() for v in value if str(v).strip()]
87+
return values[0] if values else None
88+
normalized = str(value).strip()
89+
return normalized or None
90+
91+
7892
class OGMResourceImporter:
7993
"""
8094
Import Aardvark records into the `resources` table (UPSERT).
@@ -180,6 +194,12 @@ def _normalize_record(self, record: Dict[str, Any], repo_name: str) -> Dict[str,
180194
workflow_list = [str(v).strip() for v in out["b1g_harvestWorkflow_s"] if str(v).strip()]
181195
out["b1g_harvestWorkflow_s"] = workflow_list[0] if workflow_list else None
182196

197+
publication_state = _normalize_scalar_string(out.get("publication_state"))
198+
b1g_publication_state = _normalize_scalar_string(out.get("b1g_publication_state_s"))
199+
effective_publication_state = publication_state or b1g_publication_state or "published"
200+
out["publication_state"] = effective_publication_state
201+
out["b1g_publication_state_s"] = effective_publication_state
202+
183203
# Tag injection
184204
tags: List[str] = []
185205
existing = out.get("b1g_adminTags_sm")
@@ -301,6 +321,15 @@ def _add_error_sample(stage: str, error: Exception, rid: Optional[str] = None) -
301321
str(dist_err),
302322
)
303323

324+
try:
325+
await sync_relationships_for_resource_ids([str(rid)])
326+
except Exception as rel_err:
327+
logger.warning(
328+
"Relationship sync failed for %s; continuing. err=%s",
329+
rid,
330+
str(rel_err),
331+
)
332+
304333
# Mark seen for missing tracking
305334
await self.repo.upsert_resource_seen(
306335
ogm_repo_name=repo_name,
@@ -433,6 +462,13 @@ async def _flush_rows(rows: List[Dict[str, Any]], seen: List[Dict[str, Any]]) ->
433462
"Distribution sync failed for batch; continuing. err=%s",
434463
str(dist_err),
435464
)
465+
try:
466+
await sync_relationships_for_batch(rows)
467+
except Exception as rel_err:
468+
logger.warning(
469+
"Relationship sync failed for batch; continuing. err=%s",
470+
str(rel_err),
471+
)
436472
await self.repo.upsert_resources_seen_batch(repo_name, seen)
437473
return len(rows)
438474
except Exception as e:
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
from typing import Any, Dict, Iterable, List, Sequence, Set, Tuple
5+
6+
from sqlalchemy import delete, or_, select
7+
8+
from db.database import database
9+
from db.models import resource_relationships, resources
10+
11+
logger = logging.getLogger(__name__)
12+
13+
14+
# Each family is (multivalued field on `resources`, forward predicate,
# inverse predicate) mirrored into `resource_relationships`.
RELATIONSHIP_FAMILIES: Tuple[Tuple[str, str, str], ...] = (
    ("dct_relation_sm", "dct:relation", "dct:relation"),
    ("dct_isPartOf_sm", "dct:isPartOf", "dct:hasPart"),
    ("pcdm_memberOf_sm", "pcdm:memberOf", "pcdm:hasMember"),
    ("dct_source_sm", "dct:source", "dct:sourceOf"),
    ("dct_isVersionOf_sm", "dct:isVersionOf", "dct:hasVersion"),
    ("dct_replaces_sm", "dct:replaces", "dct:isReplacedBy"),
    ("dct_isReplacedBy_sm", "dct:isReplacedBy", "dct:replaces"),
)

# Sorted union of every forward and inverse predicate declared above.
ALL_RELATIONSHIP_PREDICATES: Tuple[str, ...] = tuple(
    sorted({p for _, fwd, inv in RELATIONSHIP_FAMILIES for p in (fwd, inv)})
)
33+
34+
35+
def _normalize_resource_ids(resource_ids: Sequence[Any]) -> List[str]:
36+
normalized: List[str] = []
37+
seen: Set[str] = set()
38+
for resource_id in resource_ids:
39+
value = str(resource_id or "").strip()
40+
if not value or value in seen:
41+
continue
42+
seen.add(value)
43+
normalized.append(value)
44+
return normalized
45+
46+
47+
def _normalize_related_ids(value: Any) -> List[str]:
48+
if value is None:
49+
return []
50+
if isinstance(value, (list, tuple, set)):
51+
raw_values = value
52+
else:
53+
raw_values = [value]
54+
55+
related_ids: List[str] = []
56+
seen: Set[str] = set()
57+
for item in raw_values:
58+
candidate = str(item or "").strip()
59+
if not candidate or candidate in seen:
60+
continue
61+
seen.add(candidate)
62+
related_ids.append(candidate)
63+
return related_ids
64+
65+
66+
def _build_relationship_rows(
    rows_by_family: Dict[Tuple[str, str, str], Iterable[Dict[str, Any]]],
    tracked_ids: Sequence[Any],
) -> List[Dict[str, str]]:
    """Expand per-family resource rows into deduplicated forward + inverse triples.

    Only pairs touching at least one tracked id are kept; self-references are
    dropped. Output is sorted so inserts are deterministic.
    """
    tracked = set(_normalize_resource_ids(tracked_ids))
    triples: Set[Tuple[str, str, str]] = set()

    for (field_name, forward, inverse), family_rows in rows_by_family.items():
        for record in family_rows:
            subject = str(record.get("id") or "").strip()
            if not subject:
                continue

            for target in _normalize_related_ids(record.get(field_name)):
                if target == subject:
                    continue
                # Skip pairs where neither endpoint is among the tracked ids.
                if subject not in tracked and target not in tracked:
                    continue
                triples.add((subject, forward, target))
                triples.add((target, inverse, subject))

    return [
        {"subject_id": s, "predicate": p, "object_id": o}
        for s, p, o in sorted(triples)
    ]
92+
93+
94+
async def sync_relationships_for_resource_ids(resource_ids: Sequence[Any]) -> int:
    """Rebuild relationship rows touching the given resources.

    Deletes existing rows for the managed predicates that reference any of the
    ids, re-derives forward/inverse pairs from the `resources` table, then
    inserts the fresh set. Returns the number of rows inserted.
    """
    ids = _normalize_resource_ids(resource_ids)
    if not ids:
        return 0

    if not database.is_connected:
        await database.connect()

    # Clear managed relationship rows that touch any tracked resource, on
    # either side of the triple.
    await database.execute(
        delete(resource_relationships).where(
            resource_relationships.c.predicate.in_(ALL_RELATIONSHIP_PREDICATES),
            or_(
                resource_relationships.c.subject_id.in_(ids),
                resource_relationships.c.object_id.in_(ids),
            ),
        )
    )

    # For each family, fetch rows that either are tracked or point at a
    # tracked id (`&&` is the Postgres array-overlap operator).
    rows_by_family: Dict[Tuple[str, str, str], Iterable[Dict[str, Any]]] = {}
    for family in RELATIONSHIP_FAMILIES:
        column = resources.c[family[0]]
        stmt = select(resources.c.id, column).where(
            or_(resources.c.id.in_(ids), column.op("&&")(ids))
        )
        rows_by_family[family] = [dict(row) for row in await database.fetch_all(stmt)]

    pairs = _build_relationship_rows(rows_by_family, ids)
    if not pairs:
        logger.info(
            "Relationship sync complete for %d resources: no relationship rows to insert.",
            len(ids),
        )
        return 0

    await database.execute_many(query=resource_relationships.insert(), values=pairs)
    logger.info(
        "Relationship sync complete for %d resources: inserted %d rows.",
        len(ids),
        len(pairs),
    )
    return len(pairs)
137+
138+
139+
async def sync_relationships_for_batch(resource_rows: Sequence[Dict[str, Any]]) -> int:
    """Convenience wrapper: sync relationships for every row carrying an id."""
    candidate_ids = [row.get("id") for row in resource_rows if row.get("id")]
    return await sync_relationships_for_resource_ids(candidate_ids)

0 commit comments

Comments
 (0)