
Commit 57d525a

Add batch mode processing for entities
1 parent 587bf09 commit 57d525a

10 files changed

Lines changed: 271 additions & 164 deletions


import-automation/executor/app/configs.py

Lines changed: 2 additions & 0 deletions
@@ -169,6 +169,8 @@ class ExecutorConfig:
     disable_email_notifications: bool = True
     # Skip uploading the data to GCS (for local testing).
     skip_gcs_upload: bool = False
+    # Skip uploading input files to GCS.
+    skip_input_upload: bool = False
     # Maximum time a blocking call to the importer to
     # perform an import can take in seconds.
     importer_import_timeout: float = 20 * 60
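A minimal sketch of how the new flag might gate uploads. The helper and call site below are hypothetical; the real wiring lives in import_executor.py, whose diff is not rendered here.

from dataclasses import dataclass

@dataclass
class ExecutorConfig:
    skip_gcs_upload: bool = False    # skip all data uploads (local testing)
    skip_input_upload: bool = False  # skip only input-file uploads

def maybe_upload_inputs(config: ExecutorConfig, input_files: list) -> None:
    # Hypothetical gate: either flag suppresses the input upload.
    if config.skip_gcs_upload or config.skip_input_upload:
        print(f'Skipping upload of {len(input_files)} input file(s)')
        return
    for path in input_files:
        print(f'Uploading {path}')  # a real executor would call the GCS client here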

import-automation/executor/app/executor/import_executor.py

Lines changed: 131 additions & 127 deletions
Large diffs are not rendered by default.

import-automation/workflow/import-automation-workflow.yaml

Lines changed: 11 additions & 0 deletions
@@ -21,6 +21,7 @@ main:
               memory: 32768
               disk: 100
         - resources: ${default(map.get(args, "resources"), defaultResources)}
+        - runIngestion: ${default(map.get(args, "runIngestion"), false)}
     - runImportJob:
         try:
           call: googleapis.batch.v1.projects.locations.jobs.create
@@ -99,6 +100,16 @@ main:
             override: false
             comment: '${"import-workflow:" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}'
         result: functionResponse
+    - runIngestion:
+        switch:
+          - condition: ${runIngestion}
+            steps:
+              - runSpannerIngestion:
+                  call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.create
+                  args:
+                    parent: ${"projects/" + projectId + "/locations/" + region + "/workflows/spanner-ingestion-workflow"}
+                    body:
+                      argument: ${json.encode_to_string({"importList": [text.split(importName, ":")[1]]})}
     - returnResult:
         return:
           jobId: ${jobId}
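The runIngestion step splits the workflow's importName on ":" and forwards the second token to spanner-ingestion-workflow. A small Python sketch of the same argument construction; the example import name is an assumption based on the manifest further below.

import json

# Mirrors: json.encode_to_string({"importList": [text.split(importName, ":")[1]]})
import_name = "scripts/entities:Schema"  # assumed "<directory>:<import>" form
argument = json.dumps({"importList": [import_name.split(":")[1]]})
print(argument)  # {"importList": ["Schema"]}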

import-automation/workflow/import-helper/import_helper.py

Lines changed: 30 additions & 0 deletions
@@ -31,6 +31,8 @@
 GCS_BUCKET_ID = os.environ.get('GCS_BUCKET_ID')
 INGESTION_HELPER_URL = f"https://{LOCATION}-{PROJECT_ID}.cloudfunctions.net/spanner-ingestion-helper"
 WORKFLOW_ID = 'spanner-ingestion-workflow'
+IMPORT_AUTOMATION_WORKFLOW_ID = 'import-automation-workflow'
+

 def invoke_ingestion_workflow(import_name: str):
     """Triggers the graph ingestion workflows.
@@ -51,6 +53,34 @@ def invoke_ingestion_workflow(import_name: str):
     )


+def invoke_import_workflow(import_name: str,
+                           latest_version: str,
+                           run_ingestion: bool = False):
+    """Triggers the import automation workflow.
+
+    Args:
+        import_name: The name of the import.
+        latest_version: The version of the import.
+        run_ingestion: Whether to run the ingestion workflow after the import.
+    """
+    import_config = {"user_script_args": [f"--version={latest_version}"]}
+    workflow_args = {
+        "importName": import_name,
+        "importConfig": json.dumps(import_config),
+        "runIngestion": run_ingestion
+    }
+
+    logging.info(f"Invoking {IMPORT_AUTOMATION_WORKFLOW_ID} for {import_name}")
+    execution_client = executions_v1.ExecutionsClient()
+    parent = f"projects/{PROJECT_ID}/locations/{LOCATION}/workflows/{IMPORT_AUTOMATION_WORKFLOW_ID}"
+    execution_req = executions_v1.Execution(argument=json.dumps(workflow_args))
+    response = execution_client.create_execution(parent=parent,
+                                                 execution=execution_req)
+    logging.info(
+        f"Triggered workflow {IMPORT_AUTOMATION_WORKFLOW_ID} for {import_name}. Execution ID: {response.name}"
+    )
+
+
 def update_import_status(import_name,
                          import_status,
                          import_version,
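A hypothetical call to the new helper; the import name and version values are illustrative.

invoke_import_workflow(import_name="scripts/entities:Schema",
                       latest_version="2025-08-01",
                       run_ingestion=True)
# Creates a workflow execution whose argument is equivalent to:
# {"importName": "scripts/entities:Schema",
#  "importConfig": "{\"user_script_args\": [\"--version=2025-08-01\"]}",
#  "runIngestion": true}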

import-automation/workflow/import-helper/main.py

Lines changed: 5 additions & 5 deletions
@@ -47,12 +47,12 @@ def handle_feed_event(request):
     graph_path = attributes.get('graph_path', "/**/*.mcf*")
     job_id = attributes.get('feed_name', 'cda_feed')
     cron_schedule = attributes.get('cron_schedule', '')
-    post_process = attributes.get('post_process', '')
+    run_ingestion = 'Schema' in import_name or 'Place' in import_name
+
     # Update import status in spanner
     helper.update_import_status(import_name, import_status, latest_version,
-                                graph_path, job_id, cron_schedule)
+                                graph_path, job_id, cron_schedule)

-    # Invoke ingestion workflow to trigger dataflow job
-    if post_process == 'spanner_ingestion_workflow':
-        helper.invoke_ingestion_workflow(import_name)
+    # Invoke import job and ingestion workflow to trigger dataflow job
+    helper.invoke_import_workflow(import_name, latest_version, run_ingestion)
     return 'OK', 200
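The substring heuristic enables ingestion only for schema and place imports. A quick check, with assumed import names:

for name in ("scripts/entities:Schema", "scripts/entities:Place",
             "scripts/entities:Provenance"):
    print(name, 'Schema' in name or 'Place' in name)
# scripts/entities:Schema True
# scripts/entities:Place True
# scripts/entities:Provenance False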

scripts/entities/download.sh

Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
+#!/bin/bash
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -e
+
+# Parse arguments
+for i in "$@"; do
+  case $i in
+    --entity=*)
+      ENTITY="${i#*=}"
+      shift
+      ;;
+    --version=*)
+      VERSION="${i#*=}"
+      shift
+      ;;
+    *)
+      # Skip unknown options
+      ;;
+  esac
+done
+
+BUCKET_NAME="datcom-prod-imports"
+DIR_NAME=$(basename "$(pwd)")
+GCS_FOLDER_PREFIX="scripts/${DIR_NAME}/${ENTITY}/${VERSION}"
+GCS_PATH="gs://${BUCKET_NAME}/${GCS_FOLDER_PREFIX}"
+
+echo "Downloading import ${ENTITY} for version ${VERSION} from ${GCS_PATH} to $(pwd)"
+mkdir -p "${ENTITY}"
+gcloud storage cp -r "${GCS_PATH}" "${ENTITY}/"
+echo "Successfully downloaded ${ENTITY} version ${VERSION}"

scripts/entities/manifest.json

Lines changed: 40 additions & 6 deletions
@@ -8,15 +8,21 @@
       "provenance_url": "https://datacommons.org",
       "provenance_description": "Schema nodes for Data Commons",
       "scripts": [
-        "process.py --entity=Schema"
+        "./download.sh --entity=Schema",
+        "./process.py --entity=Schema"
+      ],
+      "import_inputs": [
+        {
+          "node_mcf": "**/*.mcf"
+        }
       ],
-      "import_inputs": [],
       "source_files": [],
       "cron_schedule": "15 3 * * *",
       "config_override": {
-        "invoke_import_validation": false,
-        "invoke_import_tool": false,
-        "invoke_differ_tool": false
+        "invoke_import_validation": true,
+        "invoke_import_tool": true,
+        "invoke_differ_tool": true,
+        "skip_input_upload": true
       }
     },
     {
@@ -27,11 +33,39 @@
       "provenance_url": "https://datacommons.org",
       "provenance_description": "Place nodes for Data Commons",
       "scripts": [
-        "process.py --entity=Place"
+        "./download.sh --entity=Place"
       ],
       "import_inputs": [],
       "source_files": [],
       "cron_schedule": "15 3 * * 1",
+      "resource_limits": {
+        "cpu": 8,
+        "memory": 128,
+        "disk": 100
+      },
+      "config_override": {
+        "invoke_import_validation": false,
+        "invoke_import_tool": true,
+        "invoke_differ_tool": false
+      }
+    },
+    {
+      "import_name": "Provenance",
+      "curator_emails": [
+        "support@datacommons.org"
+      ],
+      "provenance_url": "https://datacommons.org",
+      "provenance_description": "Provenance nodes for Data Commons",
+      "scripts": [
+        "./download.sh --entity=Provenance"
+      ],
+      "import_inputs": [
+        {
+          "node_mcf": "**/*.mcf"
+        }
+      ],
+      "source_files": [],
+      "cron_schedule": "15 3 * * 1",
       "config_override": {
         "invoke_import_validation": false,
         "invoke_import_tool": false,

scripts/entities/process.py

Lines changed: 6 additions & 24 deletions
@@ -25,35 +25,17 @@

 FLAGS = flags.FLAGS
 flags.DEFINE_string("entity", "Schema", "Entity type (Schema/Place).")
+flags.DEFINE_string("version", "", "Import version.")

-BUCKET_NAME = 'datcom-prod-imports'
-FILE_NAME = 'staging_version.txt'
-
-
-def process(entity_type: str):
-    # Ensure the import data is available in GCS.
-    current_date = datetime.datetime.now(datetime.UTC).strftime("%Y-%m-%d")
-    logging.info(f'Checking import {entity_type} for date {current_date}')
-    file_path = os.path.join('scripts', os.path.basename(os.getcwd()),
-                             entity_type, FILE_NAME)
-    storage_client = storage.Client()
-    bucket = storage_client.bucket(BUCKET_NAME)
-    blob = bucket.blob(file_path)
-    version = blob.download_as_text()
-    if version == current_date:
-        logging.info(
-            f'Successfully validated import {entity_type} for date {current_date}'
-        )
-        return 0
-    else:
-        raise RuntimeError(
-            f'{entity_type} data not present in GCS bucket {BUCKET_NAME} for date {current_date}'
-        )
+
+def process(entity_type: str, version: str):
+    logging.info(f'Processing import {entity_type} for version {version}')
+    # TODO: add processing logic


 def main(_):
     """Runs the code."""
-    process(FLAGS.entity)
+    process(FLAGS.entity, FLAGS.version)


 if __name__ == "__main__":

tools/import_differ/import_differ.py

Lines changed: 2 additions & 1 deletion
@@ -347,7 +347,7 @@ def run_dataflow_job(self, project: str, job: str, current_data: str,
         )
         return status

-    def run_differ(self):
+    def run_differ(self) -> dict:
         os.makedirs(self.output_path, exist_ok=True)
         tmp_path = os.path.join(self.output_path, self.job_name)
         os.makedirs(tmp_path, exist_ok=True)
@@ -424,6 +424,7 @@ def run_differ(self):
         differ_utils.write_csv_data(obs_diff_samples, self.output_path,
                                     'obs_diff_samples.csv', tmp_path)
         logging.info(f'Differ output written to {self.output_path}')
+        return differ_summary


 def main(_):
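With run_differ now returning the summary, callers can act on the result directly. A sketch; the summary's structure is not shown in this diff, so the emptiness check is an assumption.

# `differ` stands for an already-constructed differ instance from this module.
differ_summary = differ.run_differ()
if differ_summary:  # assumed: a non-empty summary means differences were found
    print(f'Differ reported changes: {differ_summary}')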

util/file_util.py

Lines changed: 1 addition & 1 deletion
@@ -348,7 +348,7 @@ def file_get_matching(filepat: Union[str, list]) -> list:
     for file in input_files:
         if file_is_local(file):
             # Expand local file pattern.
-            for f in glob.glob(file):
+            for f in glob.glob(file, recursive=True):
                 files.add(f)
         elif file_is_gcs(file):
             bucket = file_get_gcs_bucket(file)
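This fix matters because without recursive=True, Python's glob treats "**" like a single "*" and never descends into subdirectories, so manifest patterns such as "**/*.mcf" would miss nested files:

import glob

pattern = 'Schema/**/*.mcf'                 # e.g. the manifest's node_mcf pattern
shallow = glob.glob(pattern)                # '**' behaves like '*': one level only
deep = glob.glob(pattern, recursive=True)   # walks every subdirectory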
