"""Live integration tests against the local FastAPI service."""

import json
import os
from pathlib import Path
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
from urllib.request import Request, urlopen

import pymysql
import pytest

pytestmark = pytest.mark.integration
LOCAL_BASE_URL = "http://127.0.0.1:5000"
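# How to run (illustrative): this assumes the "integration" marker is
# registered in the project's pytest configuration and that the service is
# already listening on LOCAL_BASE_URL, e.g.:
#
#     pytest -m integration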

def _api_get(path: str, params: dict | None = None, expected_status: int | None = 200) -> dict:
    """Submit a GET request to the local API and return its status, parsed payload, and headers."""
    query = f"?{urlencode(params or {}, doseq=True)}" if params else ""
    req = Request(
        f"{LOCAL_BASE_URL}{path}{query}",
        method="GET",
        headers={
            "User-Agent": "Pytest Integration Suite/1.0 (integration-tests@example.org)",
            "Accept": "application/json",
        },
    )

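    # urlopen raises HTTPError for any non-2xx status; it is caught below so a
    # test can still assert on the error status and body instead of crashing.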
    try:
        with urlopen(req, timeout=120) as res:
            status = res.status
            body_bytes = res.read()
            headers = dict(res.headers.items())
    except HTTPError as e:
        status = e.code
        body_bytes = e.read()
        headers = dict(e.headers.items()) if e.headers else {}
    except URLError as e:
        pytest.fail(f"Local API is unreachable at {LOCAL_BASE_URL}: {e}")

    body_text = body_bytes.decode("utf-8", errors="replace")
    try:
        payload = json.loads(body_text)
    except json.JSONDecodeError:
        payload = body_text

    if expected_status is not None:
        assert status == expected_status, f"{path} expected {expected_status}, got {status}: {payload}"

    return {"status": status, "payload": payload, "headers": headers}

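# Minimal usage sketch for _api_get (illustrative; the real endpoint contracts
# are asserted in the tests below):
#
#     result = _api_get("/docs")
#     status, payload, headers = result["status"], result["payload"], result["headers"]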

def _load_env_file() -> dict[str, str]:
    """Load key-value pairs from the project's ``.env`` file, if present."""
    env_path = Path(__file__).resolve().parents[2] / ".env"
    out: dict[str, str] = {}
    if not env_path.exists():
        return out

    for raw_line in env_path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        normalized_value = value.strip()
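        # Strip one matching pair of surrounding quotes so KEY="value" and
        # KEY='value' both yield the bare value, as common dotenv loaders do.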
        if (
            len(normalized_value) >= 2
            and normalized_value[0] == normalized_value[-1]
            and normalized_value[0] in {"'", '"'}
        ):
            normalized_value = normalized_value[1:-1]
        out[key.strip()] = normalized_value
    return out


def _db_config() -> dict[str, str | int]:
    """Build the DB connection config from environment variables, ``.env`` values, and defaults."""
    env_file = _load_env_file()

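    # Precedence, as implemented below: real environment variables win over
    # .env entries, which in turn win over the hard-coded defaults. DB_NAME is
    # special-cased: its .env fallback checks DB_NAME_LABEL before DB_NAME.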
    user = os.environ.get("DB_USER") or env_file.get("DB_USER", "root")
    password = os.environ.get("DB_PASS")
    if password is None:
        password = env_file.get("DB_PASS", "")

    db_name = os.environ.get("DB_NAME")
    if db_name is None:
        db_name = env_file.get("DB_NAME_LABEL") or env_file.get("DB_NAME", "label")

    return {
        "host": os.environ.get("DB_HOST") or env_file.get("DB_HOST", "127.0.0.1"),
        "port": int(os.environ.get("DB_PORT") or env_file.get("DB_PORT", "3306")),
        "user": user,
        "password": password,
        "database": db_name,
    }


def _db_connect():
    """Open a DB connection for cache verification queries."""
    cfg = _db_config()
    return pymysql.connect(
        host=cfg["host"],
        port=cfg["port"],
        user=cfg["user"],
        password=cfg["password"],
        database=cfg["database"],
        charset="utf8mb4",
        autocommit=True,
    )

def test_docs_route_is_reachable():
    """Validate that the docs route is reachable and serves HTML."""
    result = _api_get("/docs", expected_status=200)
    content_type = result["headers"].get("Content-Type") or result["headers"].get("content-type", "")
    assert "text/html" in content_type


def test_entity_query_json_contract_for_multi_ids():
    """Validate the JSON response contract for a multi-ID query."""
    result = _api_get(
        "/",
        params={
            "id": "Q42,Q2",
            "format": "json",
            "lang": "en",
            "pid": "P31",
        },
        expected_status=200,
    )
    payload = result["payload"]

    assert isinstance(payload, dict)
    assert set(payload.keys()) == {"Q42", "Q2"}
    assert isinstance(payload["Q42"], dict)
    assert payload["Q42"]["QID"] == "Q42"
    assert "claims" in payload["Q42"]

def test_entity_query_text_contract_for_single_id():
    """Validate the text-format response contract for a single-ID query."""
    result = _api_get(
        "/",
        params={
            "id": "Q42",
            "format": "text",
            "lang": "en",
            "pid": "P31",
        },
        expected_status=200,
    )
    payload = result["payload"]

    assert isinstance(payload, dict)
    assert "Q42" in payload
    assert isinstance(payload["Q42"], str)
    assert payload["Q42"]

def test_cache_writes_and_reuses_label_entries():
    """Validate label cache rows are written and then reused across repeated requests."""
    tracked_ids = ["P31", "Q5"]

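    # Phase 1 (setup): delete any cached rows for the tracked IDs so the first
    # request below is forced to repopulate the label cache.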
    try:
        with _db_connect() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "DELETE FROM labels WHERE id IN (%s, %s)",
                    (tracked_ids[0], tracked_ids[1]),
                )
    except pymysql.err.OperationalError as e:
        pytest.skip(f"Cannot connect to MariaDB for cache verification: {e}")

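    # Phase 2: the first request should cause the service to write fresh rows
    # to the labels table for the tracked IDs.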
    first = _api_get(
        "/",
        params={
            "id": "Q42,Q2",
            "format": "json",
            "lang": "en",
            "pid": "P31",
        },
        expected_status=200,
    )
    assert isinstance(first["payload"], dict)

    with _db_connect() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT id, date_added FROM labels WHERE id IN (%s, %s)",
                (tracked_ids[0], tracked_ids[1]),
            )
            rows_first = cur.fetchall()

    assert rows_first, "Expected label cache entries to be created after first request."
    first_dates = {row[0]: row[1] for row in rows_first}
    assert "P31" in first_dates

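    # Phase 3: repeat the identical request; if the cache is reused rather than
    # rewritten, the stored date_added timestamps should be unchanged.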
    second = _api_get(
        "/",
        params={
            "id": "Q42,Q2",
            "format": "json",
            "lang": "en",
            "pid": "P31",
        },
        expected_status=200,
    )
    assert isinstance(second["payload"], dict)

    with _db_connect() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT id, date_added FROM labels WHERE id IN (%s, %s)",
                (tracked_ids[0], tracked_ids[1]),
            )
            rows_second = cur.fetchall()

    second_dates = {row[0]: row[1] for row in rows_second}
    assert second_dates["P31"] == first_dates["P31"]