
Commit fbdac9c

Add batch mode processing for entities (#1911)
1 parent: cc4e120

11 files changed

Lines changed: 353 additions & 176 deletions


import-automation/executor/app/configs.py

Lines changed: 2 additions & 0 deletions
@@ -169,6 +169,8 @@ class ExecutorConfig:
     disable_email_notifications: bool = True
     # Skip uploading the data to GCS (for local testing).
     skip_gcs_upload: bool = False
+    # Skip uploading input files to GCS.
+    skip_input_upload: bool = False
     # Maximum time a blocking call to the importer to
     # perform an import can take in seconds.
     importer_import_timeout: float = 20 * 60
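
The new flag pairs with the existing skip_gcs_upload for upload-free local runs. A minimal sketch (not part of the commit), assuming ExecutorConfig is a dataclass importable from app.configs with defaults for its other fields:

# Hypothetical local-testing override; only the two flag names come from the hunk above.
from app.configs import ExecutorConfig

config = ExecutorConfig(
    skip_gcs_upload=True,     # existing flag: skip uploading output data to GCS
    skip_input_upload=True,   # new flag: skip uploading input files to GCS
)
assert config.skip_input_upload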

import-automation/executor/app/executor/import_executor.py

Lines changed: 137 additions & 125 deletions
Large diffs are not rendered by default.

import-automation/executor/cloudbuild.yaml

Lines changed: 0 additions & 1 deletion
@@ -46,7 +46,6 @@ steps:
       python import_test.py
   env:
   - 'PROJECT_ID=$PROJECT_ID'
-  - 'LOCATION=$LOCATION'
   - 'GCS_BUCKET=${_GCS_BUCKET}'
   - 'IMAGE_URI=${_DOCKER_IMAGE}:${COMMIT_SHA}'
   dir: 'import-automation/executor'

import-automation/workflow/import-automation-workflow.yaml

Lines changed: 14 additions & 0 deletions
@@ -21,6 +21,10 @@ main:
             memory: 32768
             disk: 100
     - resources: ${default(map.get(args, "resources"), defaultResources)}
+    - runIngestion: ${default(map.get(args, "runIngestion"), false)}
+    - ingestionArgs:
+        importList:
+          - ${text.split(importName, ":")[1]}
     - runImportJob:
         try:
           call: googleapis.batch.v1.projects.locations.jobs.create
@@ -99,6 +103,16 @@ main:
             override: false
             comment: '${"import-workflow:" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}'
         result: functionResponse
+    - runIngestion:
+        switch:
+          - condition: ${runIngestion}
+            steps:
+              - runSpannerIngestion:
+                  call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.create
+                  args:
+                    parent: ${"projects/" + projectId + "/locations/" + region + "/workflows/spanner-ingestion-workflow"}
+                    body:
+                      argument: ${json.encode_to_string(ingestionArgs)}
     - returnResult:
         return:
           jobId: ${jobId}
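
The new steps read a runIngestion flag from the workflow arguments and, when set, fan out to spanner-ingestion-workflow with an importList derived from the part of importName after the colon. A sketch of the argument payload the updated workflow expects (field names per the YAML above; the "dir:name" importName format is inferred from the text.split(importName, ":") expression, and the concrete values are hypothetical):

import json

workflow_args = {
    "importName": "scripts/entities:Schema",  # hypothetical import name
    "importConfig": json.dumps({"user_script_args": ["--version=2025-01-01"]}),
    "runIngestion": True,  # triggers the new runSpannerIngestion step
}
print(json.dumps(workflow_args))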

import-automation/workflow/import-helper/import_helper.py

Lines changed: 30 additions & 0 deletions
@@ -31,6 +31,8 @@
 GCS_BUCKET_ID = os.environ.get('GCS_BUCKET_ID')
 INGESTION_HELPER_URL = f"https://{LOCATION}-{PROJECT_ID}.cloudfunctions.net/spanner-ingestion-helper"
 WORKFLOW_ID = 'spanner-ingestion-workflow'
+IMPORT_AUTOMATION_WORKFLOW_ID = 'import-automation-workflow'
+

 def invoke_ingestion_workflow(import_name: str):
     """Triggers the graph ingestion workflows.
@@ -51,6 +53,34 @@ def invoke_ingestion_workflow(import_name: str):
     )


+def invoke_import_workflow(import_name: str,
+                           latest_version: str,
+                           run_ingestion: bool = False):
+    """Triggers the import automation workflow.
+
+    Args:
+        import_name: The name of the import.
+        latest_version: The version of the import.
+        run_ingestion: Whether to run the ingestion workflow after the import.
+    """
+    import_config = {"user_script_args": [f"--version={latest_version}"]}
+    workflow_args = {
+        "importName": import_name,
+        "importConfig": json.dumps(import_config),
+        "runIngestion": run_ingestion
+    }
+
+    logging.info(f"Invoking {IMPORT_AUTOMATION_WORKFLOW_ID} for {import_name}")
+    execution_client = executions_v1.ExecutionsClient()
+    parent = f"projects/{PROJECT_ID}/locations/{LOCATION}/workflows/{IMPORT_AUTOMATION_WORKFLOW_ID}"
+    execution_req = executions_v1.Execution(argument=json.dumps(workflow_args))
+    response = execution_client.create_execution(parent=parent,
+                                                 execution=execution_req)
+    logging.info(
+        f"Triggered workflow {IMPORT_AUTOMATION_WORKFLOW_ID} for {import_name}. Execution ID: {response.name}"
+    )
+
+
 def update_import_status(import_name,
                          import_status,
                          import_version,
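
A minimal usage sketch for the new helper (not part of the commit); it assumes the PROJECT_ID, LOCATION, and GCS_BUCKET_ID environment variables that import_helper reads at import time are set, and the import name is hypothetical:

import import_helper as helper

# Kicks off import-automation-workflow and, via runIngestion, the Spanner ingestion.
helper.invoke_import_workflow(import_name="scripts/entities:Schema",
                              latest_version="2025-01-01",
                              run_ingestion=True)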

import-automation/workflow/import-helper/main.py

Lines changed: 4 additions & 12 deletions
@@ -40,19 +40,11 @@ def handle_feed_event(request):
         return 'OK', 200

     import_name = attributes.get('import_name')
-    import_status = 'STAGING'
     latest_version = attributes.get(
         'import_version',
         datetime.now(timezone.utc).strftime("%Y-%m-%d"))
-    graph_path = attributes.get('graph_path', "/**/*.mcf*")
-    job_id = attributes.get('feed_name', 'cda_feed')
-    cron_schedule = attributes.get('cron_schedule', '')
-    post_process = attributes.get('post_process', '')
-    # Update import status in spanner
-    helper.update_import_status(import_name, import_status, latest_version,
-                                graph_path, job_id, cron_schedule)
-
-    # Invoke ingestion workflow to trigger dataflow job
-    if post_process == 'spanner_ingestion_workflow':
-        helper.invoke_ingestion_workflow(import_name)
+    run_ingestion = True
+
+    # Invoke import job and ingestion workflow to trigger dataflow job
+    helper.invoke_import_workflow(import_name, latest_version, run_ingestion)
     return 'OK', 200
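
For reference, the simplified handler now only consumes two message attributes. A sketch of a triggering payload (attribute names per the diff; values are hypothetical):

attributes = {
    "import_name": "scripts/entities:Event",
    # Optional; handle_feed_event falls back to today's UTC date.
    "import_version": "2025-01-01",
}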

scripts/entities/download.sh

Lines changed: 52 additions & 0 deletions
@@ -0,0 +1,52 @@
+#!/bin/bash
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -e
+
+# Parse arguments
+for i in "$@"; do
+  case $i in
+    --entity=*)
+      ENTITY="${i#*=}"
+      ;;
+    --version=*)
+      VERSION="${i#*=}"
+      ;;
+    *)
+      # Skip unknown options
+      ;;
+  esac
+done
+
+BUCKET_NAME="datcom-prod-imports"
+DIR_NAME=$(basename "$(pwd)")
+GCS_FOLDER_PREFIX="scripts/${DIR_NAME}/${ENTITY}"
+GCS_PATH="gs://${BUCKET_NAME}/${GCS_FOLDER_PREFIX}/${VERSION}"
+
+echo "Downloading import ${ENTITY} for version ${VERSION} from ${GCS_PATH} to $(pwd)"
+mkdir -p "${ENTITY}"
+gcloud storage cp -r "${GCS_PATH}" "${ENTITY}/"
+echo "Successfully downloaded ${ENTITY} version ${VERSION}"
+
+# TODO: remove after scripts are checked in
+# Download scripts from GCS
+SCRIPTS_GCS_PATH="gs://${BUCKET_NAME}/scripts/${DIR_NAME}/process/*"
+SCRIPTS_LOCAL_PATH="../../import-automation/executor/scripts"
+echo "Downloading scripts from ${SCRIPTS_GCS_PATH} to ${SCRIPTS_LOCAL_PATH}"
+mkdir -p "${SCRIPTS_LOCAL_PATH}"
+gcloud storage cp -r "${SCRIPTS_GCS_PATH}" "${SCRIPTS_LOCAL_PATH}/"
+
+
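
A hedged usage sketch (not in the commit): the script is meant to be run from the entity's directory with both flags it parses supplied; the version value below is hypothetical:

import subprocess

# Fetches gs://datcom-prod-imports/scripts/<dir>/Schema/<version> into ./Schema/.
subprocess.run(["./download.sh", "--entity=Schema", "--version=2025-01-01"],
               check=True)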

scripts/entities/manifest.json

Lines changed: 80 additions & 8 deletions
@@ -8,15 +8,21 @@
         "provenance_url": "https://datacommons.org",
         "provenance_description": "Schema nodes for Data Commons",
         "scripts": [
-            "process.py --entity=Schema"
+            "./download.sh --entity=Schema",
+            "./process.py --entity=Schema"
+        ],
+        "import_inputs": [
+            {
+                "node_mcf": "**/*.mcf"
+            }
         ],
-        "import_inputs": [],
         "source_files": [],
         "cron_schedule": "15 3 * * *",
         "config_override": {
-            "invoke_import_validation": false,
-            "invoke_import_tool": false,
-            "invoke_differ_tool": false
+            "invoke_import_validation": true,
+            "invoke_import_tool": true,
+            "invoke_differ_tool": true,
+            "skip_input_upload": true
         }
     },
     {
@@ -27,15 +33,81 @@
         "provenance_url": "https://datacommons.org",
         "provenance_description": "Place nodes for Data Commons",
         "scripts": [
-            "process.py --entity=Place"
+            "./download.sh --entity=Place",
+            "./process.py --entity=Place"
+        ],
+        "import_inputs": [
+            {
+                "node_mcf": "**/*.mcf"
+            }
+        ],
+        "source_files": [],
+        "cron_schedule": "15 3 * * 1",
+        "resource_limits": {
+            "cpu": 8,
+            "memory": 128,
+            "disk": 100
+        },
+        "config_override": {
+            "invoke_import_validation": false,
+            "invoke_import_tool": false,
+            "invoke_differ_tool": false,
+            "skip_input_upload": true
+        }
+    },
+    {
+        "import_name": "Provenance",
+        "curator_emails": [
+            "support@datacommons.org"
+        ],
+        "provenance_url": "https://datacommons.org",
+        "provenance_description": "Provenance nodes for Data Commons",
+        "scripts": [
+            "./download.sh --entity=Provenance",
+            "./process.py --entity=Provenance"
+        ],
+        "import_inputs": [
+            {
+                "node_mcf": "**/*.mcf"
+            }
+        ],
+        "source_files": [],
+        "cron_schedule": "15 3 * * 1",
+        "config_override": {
+            "invoke_import_validation": false,
+            "invoke_import_tool": false,
+            "invoke_differ_tool": false,
+            "skip_input_upload": true
+        }
+    },
+    {
+        "import_name": "Event",
+        "curator_emails": [
+            "support@datacommons.org"
+        ],
+        "provenance_url": "https://datacommons.org",
+        "provenance_description": "Event nodes for Data Commons",
+        "scripts": [
+            "./download.sh --entity=Event",
+            "./process.py --entity=Event"
+        ],
+        "import_inputs": [
+            {
+                "node_mcf": "**/*.mcf"
+            }
         ],
-        "import_inputs": [],
         "source_files": [],
         "cron_schedule": "15 3 * * 1",
+        "resource_limits": {
+            "cpu": 8,
+            "memory": 128,
+            "disk": 100
+        },
         "config_override": {
             "invoke_import_validation": false,
             "invoke_import_tool": false,
-            "invoke_differ_tool": false
+            "invoke_differ_tool": false,
+            "skip_input_upload": true
         }
     }
 ]
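
A small sanity-check sketch for the updated manifest (not part of the commit). The top-level shape is an assumption; the entries may sit under an "import_specifications" key, so both shapes are handled:

import json

with open("scripts/entities/manifest.json") as f:
    manifest = json.load(f)

specs = manifest.get("import_specifications", []) if isinstance(manifest, dict) else manifest
for spec in specs:
    # Every entity entry now downloads its inputs before processing them.
    assert spec["scripts"][0].startswith("./download.sh"), spec["import_name"]
    print(spec["import_name"], spec.get("resource_limits", "default resources"))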

scripts/entities/process.py

Lines changed: 29 additions & 28 deletions
@@ -19,41 +19,42 @@
 from absl import app
 from absl import flags
 from absl import logging
-import datetime
-from google.cloud import storage
 import os
+import sys
+
+# Add the scripts directory to sys.path
+script_dir = os.path.abspath(
+    os.path.join(os.path.dirname(__file__), '..', '..', 'import-automation',
+                 'executor', 'scripts'))
+sys.path.append(script_dir)
+import generate_provisional_nodes
+import convert_dc_manifest

 FLAGS = flags.FLAGS
-flags.DEFINE_string("entity", "Schema", "Entity type (Schema/Place).")
-
-
-BUCKET_NAME = 'datcom-prod-imports'
-FILE_NAME = 'staging_version.txt'
-
-
-def process(entity_type: str):
-    # Ensure the import data is available in GCS.
-    current_date = datetime.datetime.now(datetime.UTC).strftime("%Y-%m-%d")
-    logging.info(f'Checking import {entity_type} for date {current_date}')
-    file_path = os.path.join('scripts', os.path.basename(os.getcwd()),
-                             entity_type, FILE_NAME)
-    storage_client = storage.Client()
-    bucket = storage_client.bucket(BUCKET_NAME)
-    blob = bucket.blob(file_path)
-    version = blob.download_as_text()
-    if version == current_date:
-        logging.info(
-            f'Successfully validated import {entity_type} for date {current_date}'
-        )
-        return 0
-    else:
-        raise RuntimeError(
-            f'{entity_type} data not present in GCS bucket {BUCKET_NAME} for date {current_date}'
-        )
+flags.DEFINE_string("entity", "", "Entity type (Schema/Place).")
+flags.DEFINE_string("version", "", "Import version.")
+
+
+def process(entity_type: str, version: str):
+    logging.info(f'Processing import {entity_type} for version {version}')
+    local_path = os.path.abspath(
+        os.path.join(os.path.dirname(__file__), entity_type, version))
+
+    if entity_type == 'Provenance':
+        # Local path to Provenance data
+        logging.info(f'Processing DC manifest files in {local_path}')
+        convert_dc_manifest.process_directory(local_path)
+
+    # Local path to data
+    logging.info(
+        f'Generating provisional nodes for {entity_type} in {local_path}')
+    generate_provisional_nodes.generate_provisional_nodes(local_path)
+    return 0


 def main(_):
     """Runs the code."""
-    process(FLAGS.entity)
+    process(FLAGS.entity, FLAGS.version)


 if __name__ == "__main__":
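
A hypothetical direct call (not in the commit), assuming download.sh has already placed the data under ./Provenance/2025-01-01 and that the GCS-downloaded helper modules are importable via the sys.path tweak above:

import process

# Converts DC manifest files, then generates provisional nodes for the entity.
process.process(entity_type="Provenance", version="2025-01-01")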

tools/import_differ/import_differ.py

Lines changed: 4 additions & 1 deletion
@@ -347,7 +347,7 @@ def run_dataflow_job(self, project: str, job: str, current_data: str,
         )
         return status

-    def run_differ(self):
+    def run_differ(self) -> dict:
         os.makedirs(self.output_path, exist_ok=True)
         tmp_path = os.path.join(self.output_path, self.job_name)
         os.makedirs(tmp_path, exist_ok=True)
@@ -368,6 +368,8 @@ def run_differ(self):
             diff_path = os.path.join(self.output_path, 'schema-diff*')
             logging.info("Loading schema diff data from: %s", diff_path)
             schema_diff = differ_utils.load_csv_data(diff_path, tmp_path)
+            # TODO: populate summary for cloud mode
+            differ_summary = {}
         else:
             # Runs local Python differ.
             current_dir = os.path.join(tmp_path, 'current')
@@ -424,6 +426,7 @@ def run_differ(self):
         differ_utils.write_csv_data(obs_diff_samples, self.output_path,
                                     'obs_diff_samples.csv', tmp_path)
         logging.info(f'Differ output written to {self.output_path}')
+        return differ_summary


 def main(_):
