Changes from all commits · 82 commits
fee7a82
adding files related to NOA GFS poc
balit-raibot Jan 4, 2026
adcf7ab
resolving comments
balit-raibot Jan 6, 2026
7e0150e
resolving comments
balit-raibot Jan 6, 2026
069eadd
refined schema
balit-raibot Jan 14, 2026
0e97f29
Merge branch 'master' into noa_gfs_poc
balit-raibot Jan 14, 2026
5b46602
refined schema
balit-raibot Jan 14, 2026
94c3769
refined schema
balit-raibot Jan 14, 2026
644825a
used existing stat var mcf file
balit-raibot Jan 15, 2026
6ab31c8
resolving comments
balit-raibot Jan 16, 2026
68b28b8
resolving comments
balit-raibot Jan 16, 2026
2af5ae9
resolving comments
balit-raibot Jan 16, 2026
e0bfb51
resolving comments
balit-raibot Jan 16, 2026
8532138
adding custom stat var processor script
balit-raibot Jan 19, 2026
b13690b
refined schema
balit-raibot Jan 22, 2026
0a643dd
test_data folder creation
balit-raibot Jan 22, 2026
18461a0
renamed parent directory
balit-raibot Jan 23, 2026
d60df2f
Merge branch 'master' into noa_gfs_poc
balit-raibot Jan 23, 2026
4e3b9f6
modified schema file
balit-raibot Jan 23, 2026
3b7efda
Merge branch 'master' into noa_gfs_poc
balit-raibot Jan 23, 2026
d50a1f3
Merge branch 'noa_gfs_poc' of https://github.com/balit-raibot/data in…
balit-raibot Jan 23, 2026
ab3db64
Merge branch 'master' into noa_gfs_poc
balit-raibot Jan 27, 2026
2c83c8f
Merge branch 'master' into noa_gfs_poc
balit-raibot Jan 30, 2026
d7940c5
adding files
smarthg-gi Feb 3, 2026
6200509
Merge branch 'master' into noa_gfs_poc
balit-raibot Mar 2, 2026
0cbda78
Merge branch 'master' into noa_gfs_poc
balit-raibot Mar 12, 2026
e979a2d
Merge branch 'master' into noa_gfs_poc
balit-raibot Mar 12, 2026
dbee392
Merge branch 'master' into noa_gfs_poc
balit-raibot Mar 26, 2026
ec24453
Merge branch 'master' into noa_gfs_poc
balit-raibot Mar 30, 2026
9d6b0a1
resolved comments
balit-raibot Mar 30, 2026
553d0e9
Merge branch 'noa_gfs_poc' of https://github.com/balit-raibot/data in…
balit-raibot Mar 30, 2026
d7987c2
added multiplier logic
balit-raibot Mar 30, 2026
413a3ce
adding run.sh
balit-raibot Mar 30, 2026
8e98ca5
adding manifest
balit-raibot Mar 30, 2026
ab52999
modified README
balit-raibot Mar 30, 2026
d31da68
modified resources
balit-raibot Mar 30, 2026
5827272
corrected custom_processor
balit-raibot Mar 31, 2026
f8ebe59
Merge branch 'master' into noa_gfs_poc
balit-raibot Mar 31, 2026
e258605
modifying measurementMethod
balit-raibot Apr 1, 2026
f7a7bb4
Merge branch 'noa_gfs_poc' of https://github.com/balit-raibot/data in…
balit-raibot Apr 1, 2026
e46638f
modified test data as per new schema
balit-raibot Apr 1, 2026
7973740
Merge branch 'master' into noa_gfs_poc
balit-raibot Apr 1, 2026
251b40e
Merge branch 'master' into noa_gfs_poc
balit-raibot Apr 2, 2026
8c8f65d
adding python based grib to csv conversion
balit-raibot Apr 5, 2026
cc14788
Merge branch 'noa_gfs_poc' of https://github.com/balit-raibot/data in…
balit-raibot Apr 5, 2026
0c14460
adding python based grib to csv conversion
balit-raibot Apr 5, 2026
8ca0b39
Merge branch 'master' into noa_gfs_poc
balit-raibot Apr 5, 2026
710441e
adding absl flags
balit-raibot Apr 5, 2026
2e67bf8
Merge branch 'noa_gfs_poc' of https://github.com/balit-raibot/data in…
balit-raibot Apr 5, 2026
7131ea9
removed wgrib tool from run.sh
balit-raibot Apr 5, 2026
88887d0
changed location of tmcf file
balit-raibot Apr 5, 2026
f18b508
added map for static equalities
balit-raibot Apr 5, 2026
969e7f1
modified pipeline to convert grib to dcid csv
balit-raibot Apr 6, 2026
78b965f
bug fixes
balit-raibot Apr 6, 2026
edef7a9
added download script
balit-raibot Apr 6, 2026
9db56a7
adding libeccodes and pygrib dependencies
balit-raibot Apr 6, 2026
a2bf01a
bug fixes
balit-raibot Apr 6, 2026
621e909
valid mask filter edge case
balit-raibot Apr 6, 2026
763f723
refactored script for parallel execution
balit-raibot Apr 7, 2026
b4215a7
bug fixes
balit-raibot Apr 7, 2026
07e72bb
adding comments
balit-raibot Apr 7, 2026
55bcbe7
Merge branch 'master' into noa_gfs_poc
balit-raibot Apr 7, 2026
c3d5473
Merge branch 'master' into noa_gfs_poc
balit-raibot Apr 7, 2026
c86408f
Merge branch 'master' into noa_gfs_poc
balit-raibot Apr 7, 2026
45ece2d
Merge branch 'master' into noa_gfs_poc
balit-raibot Apr 7, 2026
7be51e8
renamed file
balit-raibot Apr 7, 2026
5dee26f
Merge branch 'noa_gfs_poc' of https://github.com/balit-raibot/data in…
balit-raibot Apr 7, 2026
84402c8
Merge branch 'master' into noa_gfs_poc
balit-raibot Apr 10, 2026
ee18d74
moved code from statvar_imports to scripts
balit-raibot Apr 10, 2026
92e4e04
Merge branch 'noa_gfs_poc' of https://github.com/balit-raibot/data in…
balit-raibot Apr 10, 2026
eb612a3
adding state.json for incremental ingestion
balit-raibot Apr 12, 2026
1656e88
Merge branch 'master' into noa_gfs_poc
balit-raibot Apr 12, 2026
2b5b657
adding dc_bq_ingest.py
balit-raibot Apr 12, 2026
1170e35
Merge branch 'noa_gfs_poc' of https://github.com/balit-raibot/data in…
balit-raibot Apr 12, 2026
7e09438
added import name folder for state.json
balit-raibot Apr 13, 2026
ee2ac1d
moved output to GCS than local
balit-raibot Apr 13, 2026
e434357
bq ingestion from GCS than local
balit-raibot Apr 13, 2026
2fa7921
changed bq table id
balit-raibot Apr 13, 2026
5412a72
Merge branch 'master' into noa_gfs_poc
balit-raibot Apr 13, 2026
4dd43af
adding transformations for bq table
balit-raibot Apr 14, 2026
7b1f0b8
Merge branch 'noa_gfs_poc' of https://github.com/balit-raibot/data in…
balit-raibot Apr 14, 2026
8f39cf0
updated cron and README
balit-raibot Apr 14, 2026
e396836
Merge branch 'master' into noa_gfs_poc
balit-raibot Apr 14, 2026
3 changes: 2 additions & 1 deletion import-automation/executor/Dockerfile
@@ -36,7 +36,8 @@ fonts-liberation \
xdg-utils \
chromium \
chromium-driver \
p7zip-full
p7zip-full \
libeccodes
# Install the Google Cloud CLI
RUN apt-get update && \
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | gpg --dearmor -o /usr/share/keyrings/cloud.google.gpg && \
1 change: 1 addition & 0 deletions import-automation/executor/requirements.txt
@@ -41,6 +41,7 @@ omegaconf
prettytable
protobuf
psutil
pygrib
pylint
pyspellchecker
pytest
61 changes: 61 additions & 0 deletions scripts/noaa_gfs/README.md
@@ -0,0 +1,61 @@
# NOAA: Global Forecast System Dataset
## Overview
The NOAA-GFS 0.25 Atmos dataset provides high-resolution global atmospheric and land-surface data on a 0.25-degree (~28 km) equidistant cylindrical grid that covers the entire Earth's surface, with up to 127 vertical atmospheric layers. It includes a wide range of meteorological variables, such as temperature, wind, humidity, precipitation, and soil moisture, generated four times daily with forecasts extending up to 16 days (384 hours).
The data is distributed in GRIB2 (Gridded Binary Edition 2) format via the NOAA Operational Model Archive and Distribution System (NOMADS) and is a public-domain product of the United States Government.
This pipeline automates the ingestion, format conversion, and standardized mapping of GFS GRIB2 files into Data Commons-compatible StatVar observations.

## Data Source & Provenance
* **Source URL:** [NOMADS NCEP GFS Production](https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/)
* **Provider:** National Centers for Environmental Prediction (NCEP / NOAA).
* **Update Frequency:** 4 times daily (00z, 06z, 12z, 18z).
* **Variable Inventory:** [NCO Product Description](https://www.nco.ncep.noaa.gov/pmb/products/gfs/gfs.t00z.pgrb2.0p25.anl.shtml)


## Automated Pipeline Logic
The pipeline is Python-driven and configured through a `manifest.json` import specification.

### 1. Data Ingestion (`download_noaa_gfs_grib.py`)
* **Stateful Tracking:** The script retrieves its last successful run checkpoint from `gs://{bucket}/state.json` (see the state-file sketch after this list).
* **Chronological Integrity:** It identifies missing 6-hour slots (00z, 06z, 12z, 18z) and performs memory-efficient streamed downloads of GRIB2 files into local `input_files/` directories.
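For reference, the checkpoint is a small JSON object; the sketch below mirrors the default that `load_state()` in `download_noaa_gfs_grib.py` constructs when no state file exists yet (the values shown are examples only):

```json
{"date": "20260412", "cycle": "18"}
```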

### 2. Transformation & Mapping (`grib_statvar_processor.py`)
This stage converts binary meteorological data into structured CSVs using the `pygrib` library.
* **Parallel Processing:** Utilizes `multiprocessing.Pool` to process GRIB messages across available CPU cores.
* **Coordinate Normalization:** Longitudes are transformed from the 0–360 range to the -180 to 180 range.
* **StatVar Mapping** (see the sketch after this list):
  * **DCID Construction:** Maps GRIB short codes (e.g., `TMP`, `UGRD`) and vertical levels to formal Data Commons identifiers such as `dcid:Temperature_Place_850Millibar`.
  * **Unit Scaling:** Automatically applies multipliers to scale variables such as land and ice cover.
* **GCS Streaming:** Processed CSVs are merged and uploaded directly to the GCS output prefix.
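Because `grib_statvar_processor.py` is not part of this diff, the snippet below is only an illustrative sketch of the two mappings described above; the helper names and the short-code table are assumptions, with the `Temperature_Place_850Millibar` DCID taken from the example in this README:

```python
# Illustrative sketch only: helper names and the mapping table are assumed,
# not taken from grib_statvar_processor.py.
GRIB_SHORT_CODE_TO_VARIABLE = {"TMP": "Temperature"}  # assumed subset

def grib_to_dcid(short_code: str, level_mb: int) -> str:
    """Builds a Data Commons StatVar DCID from a GRIB short code and level."""
    return f"dcid:{GRIB_SHORT_CODE_TO_VARIABLE[short_code]}_Place_{level_mb}Millibar"

def normalize_longitude(lon: float) -> float:
    """Shifts a 0-360 longitude into the -180..180 range."""
    return lon - 360.0 if lon > 180.0 else lon

assert grib_to_dcid("TMP", 850) == "dcid:Temperature_Place_850Millibar"
assert normalize_longitude(270.0) == -90.0
```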

### 3. BigQuery Ingestion (`dc_bq_ingest.py`)
* **Staging Pattern:** Bulk loads raw CSVs from GCS into a staging table (`Observation_Staging`).
* **SQL Transformation:** Executes an `INSERT INTO` query to map staging data to the final production schema, handling type casting and attaching the provenance ID (`dc/base/NOAA_GlobalForecastSystem`).

---

## Pipeline Configuration (`manifest.json`)
The pipeline is governed by specific resource requirements for high-concurrency GRIB decompression:
* **Cron Schedule:** `30 04,10,16,22 * * *` (runs roughly 30 minutes after each GFS cycle's files are typically posted to NOMADS).
* **Resource Limits:** 64 CPUs | 256GB RAM | 4GB Disk.
* **Timeout:** 1 hour (`3600s`).
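A minimal sketch of what the corresponding `manifest.json` entry might look like; the key names here are illustrative assumptions, and only the values come from this README:

```json
{
  "import_name": "NOAA_GlobalForecastSystem",
  "cron_schedule": "30 04,10,16,22 * * *",
  "resource_limits": {"cpu": 64, "memory": "256G", "disk": "4G"},
  "user_script_timeout": 3600
}
```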

---

## Usage Instructions

### Prerequisites
* **Python Libraries:** `pygrib`, `numpy`, `google-cloud-storage`, `google-cloud-bigquery`, `absl-py`.
* **System Requirements:** Requires the `eccodes` GRIB-decoding library (the successor to the deprecated GRIB-API) installed on the host system; the executor Dockerfile installs it as `libeccodes`.
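A possible setup on a Debian-based host; this is a sketch, assuming the `libeccodes` package name used by the executor Dockerfile, so adjust for your distribution and environment:

```bash
# Assumed package and pip names; eccodes is the system library pygrib links against.
sudo apt-get install -y libeccodes
pip install pygrib numpy google-cloud-storage google-cloud-bigquery absl-py requests
```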

### Manual Execution
Although the pipeline is designed for automated execution, each stage can be run manually for debugging:

```bash
# 1. Download missing data
python3 download_noaa_gfs_grib.py --project_id=YOUR_PROJECT_ID

# 2. Process GRIB to CSV and upload to GCS
python3 grib_statvar_processor.py --input=./input_files

# 3. Ingest from GCS to BigQuery
python3 dc_bq_ingest.py --project_id=YOUR_PROJECT_ID --dataset_id=YOUR_DATASET
```
129 changes: 129 additions & 0 deletions scripts/noaa_gfs/dc_bq_ingest.py
@@ -0,0 +1,129 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Automates ingestion of processed NOAA GFS meteorological data into BigQuery.
"""

import os
from absl import app, flags, logging
from google.cloud import bigquery
from google.cloud import storage

# --- FLAG DEFINITIONS ---
FLAGS = flags.FLAGS
flags.DEFINE_string('project_id', 'datcom-external', 'GCP Project ID.')
flags.DEFINE_string('bucket_name', 'datcom-prod-imports', 'GCS Bucket containing the CSVs.')
flags.DEFINE_string('gcs_prefix', 'scripts/noaa_gfs/NOAA_GlobalForecastSystem/output/', 'GCS prefix (folder path).')
flags.DEFINE_string('dataset_id', 'data_commons_noaa_gfs', 'BigQuery Dataset ID.')
flags.DEFINE_string('table_id', 'Observation', 'BigQuery Table ID.')
flags.DEFINE_string('staging_table_id', 'Observation_Staging', 'Temporary Staging Table ID.')

def run_mapping_query(bq_client):
    """
    Executes the SQL transformation to map data from Staging to Final table.
    """
    final_table = f"{FLAGS.project_id}.{FLAGS.dataset_id}.{FLAGS.table_id}"
    staging_table = f"{FLAGS.project_id}.{FLAGS.dataset_id}.{FLAGS.staging_table_id}"

    query = f"""
    INSERT INTO `{final_table}` (
        observation_about,

> **Contributor** (review comment on the line above): what about the geo location for latitude/longitude?

        variable_measured,
        value,
        observation_date,
        measurement_method,
        unit,
        prov_id
    )
    SELECT
        placeName,
        variableMeasured,
        CAST(value AS STRING),
        CAST(observationDate AS STRING),
        measurementMethod,
        unit,
        'dc/base/NOAA_GlobalForecastSystem'
    FROM `{staging_table}`;
    """

    try:
        logging.info("Starting transformation query...")
        query_job = bq_client.query(query)
        query_job.result()  # Wait for completion

        # Optional: Truncate staging table after successful migration
        bq_client.query(f"TRUNCATE TABLE `{staging_table}`").result()
        logging.info("Transformation complete and staging table cleared.")
        return True
    except Exception as e:
        logging.error(f"Mapping query failed: {e}")
        return False

def upload_gcs_to_staging(bq_client, gcs_uri):
    """
    Loads raw CSV data into the Staging table.
    """
    table_ref = f"{FLAGS.project_id}.{FLAGS.dataset_id}.{FLAGS.staging_table_id}"

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True,
        # WRITE_APPEND used here to collect all CSVs before the final SQL transformation
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )

    try:
        logging.info(f"Loading to staging: {gcs_uri}")
        load_job = bq_client.load_table_from_uri(gcs_uri, table_ref, job_config=job_config)
        load_job.result()
        return True
    except Exception as e:
        logging.error(f"Failed to load {gcs_uri}: {e}")
        return False

def main(argv):
    """Entry point for the GCS-to-BigQuery ingestion script."""
    # Initialize Clients
    bq_client = bigquery.Client(project=FLAGS.project_id)
    storage_client = storage.Client(project=FLAGS.project_id)

    # Get reference to the bucket and list blobs
    bucket = storage_client.bucket(FLAGS.bucket_name)
    blobs = bucket.list_blobs(prefix=FLAGS.gcs_prefix)

    # Filter for CSV files
    csv_uris = [f"gs://{FLAGS.bucket_name}/{blob.name}" for blob in blobs if blob.name.endswith('.csv')]

    if not csv_uris:
        logging.warning(f"No CSV files found at gs://{FLAGS.bucket_name}/{FLAGS.gcs_prefix}")
        return

    logging.info(f"Found {len(csv_uris)} files in GCS for ingestion.")

    # Step 1: Bulk Load everything into Staging
    success_count = 0
    for uri in csv_uris:
        if upload_gcs_to_staging(bq_client, uri):
            success_count += 1

    logging.info(f"Ingestion batch complete. {success_count}/{len(csv_uris)} URIs processed.")

    # Step 2: Run Mapping SQL if at least some files loaded
    if success_count > 0:
        run_mapping_query(bq_client)

if __name__ == "__main__":
    app.run(main)
135 changes: 135 additions & 0 deletions scripts/noaa_gfs/download_noaa_gfs_grib.py
@@ -0,0 +1,135 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Automates GFS GRIB2 source file retrieval from NOAA NOMADS.
This script manages dated directory structures and utilizes memory-efficient
HTTP streaming to download large-scale meteorological datasets for
downstream Data Commons ingestion.
"""

import os
import json
import requests
from datetime import datetime, timedelta
from pathlib import Path
from absl import app, flags, logging
from google.cloud import storage
from google.api_core import exceptions

# --- FLAG DEFINITIONS ---
FLAGS = flags.FLAGS
flags.DEFINE_string('project_id', 'datcom', 'The GCP Project ID.')
flags.DEFINE_string('bucket_name', 'datcom-prod-imports', 'The GCS bucket name.')
flags.DEFINE_string('state_path', 'scripts/noaa_gfs/NOAA_GlobalForecastSystem/state.json', 'The path within the bucket for state.json.')

def get_gcs_client():
    """Initializes the GCS client with a specific Project ID."""
    return storage.Client(project=FLAGS.project_id)

def load_state():
    """Reads state from GCS. Returns default if file doesn't exist."""
    client = get_gcs_client()
    bucket = client.bucket(FLAGS.bucket_name)
    blob = bucket.blob(FLAGS.state_path)

    try:
        state_data = blob.download_as_text()
        logging.info(f"Successfully loaded state from gs://{FLAGS.bucket_name}/{FLAGS.state_path}")
        return json.loads(state_data)
    except exceptions.NotFound:
        logging.warning("State file not found in GCS. Starting from default (24h ago).")
        # Default: Start 24 hours ago
        yesterday = datetime.now() - timedelta(days=1)
        return {"date": yesterday.strftime('%Y%m%d'), "cycle": "18"}

def get_next_slot(current_date_str, current_cycle):
    """Calculates the next 6-hour GFS slot."""
    current_dt = datetime.strptime(f"{current_date_str}{current_cycle}", '%Y%m%d%H')
    next_dt = current_dt + timedelta(hours=6)
    return next_dt.strftime('%Y%m%d'), next_dt.strftime('%H')

def download_gfs_file(date_stamp, cycle, fhour="000"):
    """Downloads the GRIB2 file from NOAA."""
    # 1. Setup Paths
    # Target directory: ./input_files/YYYYMMDD/
    target_dir = Path("./input_files") / date_stamp
    target_dir.mkdir(parents=True, exist_ok=True)

    file_name = f"gfs.t{cycle}z.pgrb2.0p25.f{fhour}"
    output_path = target_dir / file_name

    # 2. Construct URL
    url = (f"https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/"
           f"gfs.{date_stamp}/{cycle}/atmos/{file_name}")

    logging.info(f"Downloading: {url}")
    logging.info(f"Destination: {output_path}")

    # 3. Perform Streamed Download
    try:
        with requests.get(url, stream=True, timeout=60) as r:
            # Check if file exists on server (e.g., handles 404 if data isn't ready)
            r.raise_for_status()

            with open(output_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=1024 * 1024):  # 1MB chunks
                    if chunk:
                        f.write(chunk)

        logging.info(f"Successfully downloaded: {date_stamp} Cycle {cycle}")
        return str(output_path)

    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 404:
            logging.error(f"File not found on NOMADS. The {date_stamp} data might not be posted yet.")
        else:
            logging.error(f"HTTP Error: {e}")
    except Exception as e:
        logging.error(f"Download failed: {e}")

    return None

def main(argv):
    """Entry point for the download script."""
    state = load_state()
    current_date = state['date']
    current_cycle = state['cycle']

    # Get the latest possible slot (NOAA usually has a few hours delay)
    now = datetime.now() - timedelta(hours=4)

    logging.info(f"Iterating from: {current_date} {current_cycle}z")

    while True:
        # 1. Determine the next slot to try
        next_date, next_cycle = get_next_slot(current_date, current_cycle)
        next_dt = datetime.strptime(f"{next_date}{next_cycle}", '%Y%m%d%H')

        # 2. Stop if we are trying to download files from the future
        if next_dt > now:
            logging.info("All available files up to current time have been checked.")
            break

        # 3. Attempt Download
        if download_gfs_file(next_date, next_cycle):
            current_date, current_cycle = next_date, next_cycle
        else:
            # If a file isn't found, it might not be posted yet.
            # We stop here to maintain chronological integrity.
            logging.info(f"Reached the end of available data on server at {next_date} {next_cycle}z.")
            break

if __name__ == "__main__":
    app.run(main)