Skip to content

Commit 4dd43af

Browse files
committed
Add SQL transformations for the BigQuery Observation table
1 parent 2fa7921 commit 4dd43af

1 file changed

Lines changed: 59 additions & 22 deletions

File tree

scripts/noaa_gfs/dc_bq_ingest.py

Lines changed: 59 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,39 +23,72 @@
2323

2424
# --- FLAG DEFINITIONS ---
# absl command-line flags controlling the GCS -> BigQuery ingestion run.
FLAGS = flags.FLAGS
# GCP project that owns the target BigQuery dataset.
flags.DEFINE_string('project_id', 'datcom-external', 'GCP Project ID.')
# Bucket holding the CSV files produced by the NOAA GFS import pipeline.
flags.DEFINE_string('bucket_name', 'datcom-prod-imports', 'GCS Bucket containing the CSVs.')
# Folder prefix under the bucket where the output CSVs live.
flags.DEFINE_string('gcs_prefix', 'scripts/noaa_gfs/NOAA_GlobalForecastSystem/output/', 'GCS prefix (folder path).')
# Dataset containing both the staging and the final tables.
flags.DEFINE_string('dataset_id', 'data_commons_noaa_gfs', 'BigQuery Dataset ID.')
# Final table that receives the mapped observations.
flags.DEFINE_string('table_id', 'Observation', 'BigQuery Table ID.')
# Raw CSV rows land here first; cleared after each successful mapping run.
flags.DEFINE_string('staging_table_id', 'Observation_Staging', 'Temporary Staging Table ID.')

32-
def upload_gcs_to_bq(bq_client, gcs_uri):
33+
def run_mapping_query(bq_client):
    """
    Maps rows from the staging table into the final Observation table.

    Runs an INSERT ... SELECT that renames the staging table's camelCase
    CSV columns to the final table's snake_case schema, casts value and
    observationDate to STRING, and stamps every row with the NOAA GFS
    provenance id. On a successful INSERT the staging table is truncated
    so the next ingestion batch starts clean.

    Args:
        bq_client: An initialized google.cloud.bigquery.Client.

    Returns:
        True if the INSERT succeeded (even when the follow-up TRUNCATE
        failed — the rows are already mapped), False if the INSERT
        itself failed.
    """
    final_table = f"{FLAGS.project_id}.{FLAGS.dataset_id}.{FLAGS.table_id}"
    staging_table = f"{FLAGS.project_id}.{FLAGS.dataset_id}.{FLAGS.staging_table_id}"

    query = f"""
    INSERT INTO `{final_table}` (
        observation_about,
        variable_measured,
        value,
        observation_date,
        measurement_method,
        unit,
        prov_id
    )
    SELECT
        placeName,
        variableMeasured,
        CAST(value AS STRING),
        CAST(observationDate AS STRING),
        measurementMethod,
        unit,
        'dc/base/NOAA_GlobalForecastSystem'
    FROM `{staging_table}`;
    """

    try:
        logging.info("Starting transformation query...")
        bq_client.query(query).result()  # Wait for completion
    except Exception as e:
        logging.error(f"Mapping query failed: {e}")
        return False

    # Clear staging in a SEPARATE step. If the TRUNCATE shared the
    # INSERT's try-block (as before), a truncate failure after a
    # successful INSERT would report overall failure, and a retry of
    # this function would re-run the INSERT and duplicate every row.
    try:
        bq_client.query(f"TRUNCATE TABLE `{staging_table}`").result()
        logging.info("Transformation complete and staging table cleared.")
    except Exception as e:
        logging.error(f"Staging truncate failed (rows already mapped): {e}")
    return True
73+
74+
def upload_gcs_to_staging(bq_client, gcs_uri):
    """
    Appends the raw CSV rows at *gcs_uri* to the staging table.

    The load job skips the single header row and lets BigQuery
    autodetect column types. WRITE_APPEND accumulates every CSV in
    staging so one SQL transformation can map them all afterwards.

    Args:
        bq_client: An initialized google.cloud.bigquery.Client.
        gcs_uri: Fully-qualified gs:// URI of one CSV file.

    Returns:
        True when the load job completed, False when it raised.
    """
    destination = ".".join(
        (FLAGS.project_id, FLAGS.dataset_id, FLAGS.staging_table_id))

    load_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True,
        # WRITE_APPEND used here to collect all CSVs before the final SQL transformation
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )

    try:
        logging.info(f"Loading to staging: {gcs_uri}")
        job = bq_client.load_table_from_uri(
            gcs_uri, destination, job_config=load_config)
        job.result()
    except Exception as e:
        logging.error(f"Failed to load {gcs_uri}: {e}")
        return False
    return True
@@ -80,13 +113,17 @@ def main(argv):
80113

81114
logging.info(f"Found {len(csv_uris)} files in GCS for ingestion.")
82115

83-
# Loop and Trigger Load Jobs
116+
# Step 1: Bulk Load everything into Staging
84117
success_count = 0
85118
for uri in csv_uris:
86-
if upload_gcs_to_bq(bq_client, uri):
119+
if upload_gcs_to_staging(bq_client, uri):
87120
success_count += 1
88121

89122
logging.info(f"Ingestion batch complete. {success_count}/{len(csv_uris)} URIs processed.")
90123

124+
# Step 2: Run Mapping SQL if at least some files loaded
125+
if success_count > 0:
126+
run_mapping_query(bq_client)
127+
91128
if __name__ == "__main__":
    # absl entry point: parses the flags defined above, then calls main(argv).
    app.run(main)

0 commit comments

Comments
 (0)