Skip to content

Commit 3852b25

Browse files
committed
feat: update EEAClient to download and parse Parquet files from EEA API
chore: add pyarrow dependency for Parquet file handling
1 parent 8ad09c2 commit 3852b25

2 files changed

Lines changed: 45 additions & 89 deletions

File tree

api/clients/eea_client.py

Lines changed: 44 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -3,28 +3,34 @@
33
import os
44
import logging
55
from typing import Any, Dict, List, Optional
6-
import csv
7-
import io
86

7+
import io
98
import requests
10-
9+
import pandas as pd
1110
from api.utils.schema import ensure_eea_env_schema
1211

1312
logger = logging.getLogger(__name__)
1413

1514

15+
1616
class EEAClient:
    """Client for EEA environmental datasets (Parquet API).

    Talks to the EEA downloads service at
    https://eeadmz1-downloads-api-appservice.azurewebsites.net and fetches
    datasets as Parquet files.
    """

    # Base endpoint of the EEA downloads service.
    BASE_URL = "https://eeadmz1-downloads-api-appservice.azurewebsites.net"

    # Logical dataset name -> EEA dataset identifier.
    # These dataset IDs should be updated if EEA changes them.
    DATASET_IDS = {
        "renewables": "share-of-energy-from-renewable-sources",  # Example, update as needed
        "industrial_pollution": "industrial-releases-of-pollutants-to-water",  # Example, update as needed
    }

    def __init__(self) -> None:
        """Create an HTTP session with headers suited to binary (Parquet) downloads."""
        session = requests.Session()
        default_headers = {
            "Accept": "application/octet-stream",
            "User-Agent": "project-permit-api/1.0 (+https://github.com/hk-dev13)",
        }
        session.headers.update(default_headers)
        self.session = session

@@ -35,42 +41,17 @@ def create_sample_data(self) -> List[Dict[str, Any]]:
3541
{"country": "PL", "indicator": "GHG", "year": 2023, "value": 210.2, "unit": "MtCO2e"},
3642
]
3743

def _download_parquet(self, dataset_id: str) -> List[Dict[str, Any]]:
    """Download a dataset's Parquet file from the EEA API and return its rows.

    Builds the download URL from ``BASE_URL`` and *dataset_id*, fetches the
    file over the shared session, and parses it with pandas. Best-effort:
    any download or parse failure is logged and an empty list is returned
    rather than raising.
    """
    url = f"{self.BASE_URL}/api/Download/{dataset_id}"
    try:
        response = self.session.get(url, timeout=60)
        response.raise_for_status()
        frame = pd.read_parquet(io.BytesIO(response.content))
        return frame.to_dict(orient="records")
    except Exception as e:
        logger.error(f"EEA Parquet download/parse error for {dataset_id}: {e}")
        return []
7556

7657
def get_indicator(self, *, indicator: str = "GHG", country: Optional[str] = None, year: Optional[int] = None, limit: int = 100) -> List[Dict[str, Any]]:
@@ -127,48 +108,34 @@ def _load_csv_any(self, source: str) -> List[Dict[str, Any]]:
127108
# treat as local path
128109
return self._read_local_csv(source)
129110

130-
def get_countries_renewables(self, source: Optional[str] = None) -> List[Dict[str, Any]]:
131-
"""Load country renewables share dataset and normalize keys.
132111

133-
Prefers env EEA_RENEWABLES_SOURCE, then EEA_CSV_URL, then local file in repo.
134-
"""
135-
src = (source or os.getenv("EEA_RENEWABLES_SOURCE") or os.getenv("EEA_CSV_URL")
136-
or os.path.join(os.getcwd(), "countries-breakdown-actual-res-progress-13.csv"))
137-
data = self._load_csv_any(src)
112+
def get_countries_renewables(self) -> List[Dict[str, Any]]:
113+
"""Load country renewables share dataset from EEA Parquet API."""
114+
data = self._download_parquet(self.DATASET_IDS["renewables"])
138115
out: List[Dict[str, Any]] = []
139116
for r in data:
140-
# Headers may contain type suffix after ':'
141-
def g(key: str) -> Optional[str]:
142-
if key in r:
143-
return r.get(key)
144-
# try without type suffix
145-
for k in r.keys():
146-
if k.split(":")[0].strip().lower() == key.lower():
147-
return r.get(k)
148-
return None
149-
150-
country = (g("Country") or "").strip()
117+
# Adjust these keys as needed to match the Parquet schema
118+
country = (r.get("Country") or r.get("country") or "").strip()
151119
if not country:
152120
continue
153-
val20 = g("Renewable energy share 2020")
154-
val21 = g("Renewable energy share 2021")
155-
target20 = g("2020 Target")
121+
ren20 = r.get("Renewable energy share 2020") or r.get("renewable_energy_share_2020")
122+
ren21 = r.get("Renewable energy share 2021") or r.get("renewable_energy_share_2021")
123+
tgt20 = r.get("2020 Target") or r.get("target_2020")
156124
try:
157-
ren20 = float(val20) if (val20 is not None and str(val20).strip() != "") else None
125+
ren20 = float(ren20) if ren20 is not None and str(ren20).strip() != "" else None
158126
except Exception:
159127
ren20 = None
160128
try:
161-
ren21 = float(val21) if (val21 is not None and str(val21).strip() != "") else None
129+
ren21 = float(ren21) if ren21 is not None and str(ren21).strip() != "" else None
162130
except Exception:
163131
ren21 = None
164132
try:
165-
tgt20 = float(target20) if (target20 is not None and str(target20).strip() != "") else None
133+
tgt20 = float(tgt20) if tgt20 is not None and str(tgt20).strip() != "" else None
166134
except Exception:
167135
tgt20 = None
168136
out.append({
169137
"country": country,
170138
"renewable_energy_share_2020": ren20,
171-
# as requested: use 2021 column as proxy
172139
"renewable_energy_share_2021_proxy": ren21,
173140
"target_2020": tgt20,
174141
})
@@ -183,25 +150,14 @@ def get_country_renewables(self, country: Optional[str], source: Optional[str] =
183150
return r
184151
return None
185152

186-
def get_industrial_pollution(self, source: Optional[str] = None) -> List[Dict[str, Any]]:
187-
"""Load industrial pollutants time series (index=2010=100 style).
188153

189-
Prefers env EEA_POLLUTION_SOURCE, then local repo CSV.
190-
"""
191-
src = (source or os.getenv("EEA_POLLUTION_SOURCE")
192-
or os.path.join(os.getcwd(), "industrial-releases-of-pollutants-to.csv"))
193-
data = self._load_csv_any(src)
194-
# Normalize
154+
def get_industrial_pollution(self) -> List[Dict[str, Any]]:
155+
"""Load industrial pollutants time series from EEA Parquet API."""
156+
data = self._download_parquet(self.DATASET_IDS["industrial_pollution"])
195157
norm: List[Dict[str, Any]] = []
196158
for r in data:
197-
def g(key: str) -> Optional[str]:
198-
if key in r:
199-
return r.get(key)
200-
for k in r.keys():
201-
if k.split(":")[0].strip().lower() == key.lower():
202-
return r.get(k)
203-
return None
204-
y = g("Year")
159+
# Adjust keys as needed to match the Parquet schema
160+
y = r.get("Year") or r.get("year")
205161
try:
206162
year = int(float(y)) if y not in (None, "") else None
207163
except Exception:
@@ -215,13 +171,12 @@ def to_float(v):
215171
return None
216172
norm.append({
217173
"year": year,
218-
"cd_hg_ni_pb": to_float(g("Cd, Hg, Ni, Pb")),
219-
"toc": to_float(g("TOC")),
220-
"total_n": to_float(g("Total N")),
221-
"total_p": to_float(g("Total P")),
222-
"gva": to_float(g("GVA")),
174+
"cd_hg_ni_pb": to_float(r.get("Cd, Hg, Ni, Pb") or r.get("cd_hg_ni_pb")),
175+
"toc": to_float(r.get("TOC") or r.get("toc")),
176+
"total_n": to_float(r.get("Total N") or r.get("total_n")),
177+
"total_p": to_float(r.get("Total P") or r.get("total_p")),
178+
"gva": to_float(r.get("GVA") or r.get("gva")),
223179
})
224-
# sort by year
225180
norm.sort(key=lambda x: x.get("year", 0))
226181
return norm
227182

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@ Werkzeug==3.0.1
99
typing-extensions>=4.7.0
1010
pytest==7.4.0
1111
httpx==0.27.0
12+
pyarrow==16.1.0

0 commit comments

Comments
 (0)