Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions import-automation/executor/app/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ class ExecutorConfig:
disable_email_notifications: bool = True
# Skip uploading the data to GCS (for local testing).
skip_gcs_upload: bool = False
# Skip uploading input files to GCS.
skip_input_upload: bool = False
# Maximum time a blocking call to the importer to
# perform an import can take in seconds.
importer_import_timeout: float = 20 * 60
Expand Down
262 changes: 137 additions & 125 deletions import-automation/executor/app/executor/import_executor.py

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion import-automation/executor/cloudbuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ steps:
python import_test.py
env:
- 'PROJECT_ID=$PROJECT_ID'
- 'LOCATION=$LOCATION'
- 'GCS_BUCKET=${_GCS_BUCKET}'
- 'IMAGE_URI=${_DOCKER_IMAGE}:${COMMIT_SHA}'
dir: 'import-automation/executor'
Expand Down
14 changes: 14 additions & 0 deletions import-automation/workflow/import-automation-workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ main:
memory: 32768
disk: 100
- resources: ${default(map.get(args, "resources"), defaultResources)}
- runIngestion: ${default(map.get(args, "runIngestion"), false)}
- ingestionArgs:
importList:
- ${text.split(importName, ":")[1]}
- runImportJob:
try:
call: googleapis.batch.v1.projects.locations.jobs.create
Expand Down Expand Up @@ -99,6 +103,16 @@ main:
override: false
comment: '${"import-workflow:" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}'
result: functionResponse
- runIngestion:
switch:
- condition: ${runIngestion}
steps:
- runSpannerIngestion:
call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.create
args:
parent: ${"projects/" + projectId + "/locations/" + region + "/workflows/spanner-ingestion-workflow"}
body:
argument: ${json.encode_to_string(ingestionArgs)}
- returnResult:
return:
jobId: ${jobId}
Expand Down
30 changes: 30 additions & 0 deletions import-automation/workflow/import-helper/import_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
GCS_BUCKET_ID = os.environ.get('GCS_BUCKET_ID')
INGESTION_HELPER_URL = f"https://{LOCATION}-{PROJECT_ID}.cloudfunctions.net/spanner-ingestion-helper"
WORKFLOW_ID = 'spanner-ingestion-workflow'
IMPORT_AUTOMATION_WORKFLOW_ID = 'import-automation-workflow'


def invoke_ingestion_workflow(import_name: str):
"""Triggers the graph ingestion workflows.
Expand All @@ -51,6 +53,34 @@ def invoke_ingestion_workflow(import_name: str):
)


def invoke_import_workflow(import_name: str,
                           latest_version: str,
                           run_ingestion: bool = False):
    """Kicks off the import automation workflow for a single import.

    Args:
        import_name: The name of the import.
        latest_version: The version of the import.
        run_ingestion: Whether to run the ingestion workflow after the import.
    """
    # importConfig is itself a JSON-encoded string nested inside the
    # (also JSON-encoded) workflow argument payload.
    config_json = json.dumps(
        {"user_script_args": [f"--version={latest_version}"]})
    args = {
        "importName": import_name,
        "importConfig": config_json,
        "runIngestion": run_ingestion,
    }
    parent = (f"projects/{PROJECT_ID}/locations/{LOCATION}"
              f"/workflows/{IMPORT_AUTOMATION_WORKFLOW_ID}")

    logging.info(f"Invoking {IMPORT_AUTOMATION_WORKFLOW_ID} for {import_name}")
    client = executions_v1.ExecutionsClient()
    execution = executions_v1.Execution(argument=json.dumps(args))
    response = client.create_execution(parent=parent, execution=execution)
    logging.info(
        f"Triggered workflow {IMPORT_AUTOMATION_WORKFLOW_ID} for {import_name}. Execution ID: {response.name}"
    )


def update_import_status(import_name,
import_status,
import_version,
Expand Down
16 changes: 4 additions & 12 deletions import-automation/workflow/import-helper/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,11 @@ def handle_feed_event(request):
return 'OK', 200

import_name = attributes.get('import_name')
import_status = 'STAGING'
latest_version = attributes.get(
'import_version',
datetime.now(timezone.utc).strftime("%Y-%m-%d"))
graph_path = attributes.get('graph_path', "/**/*.mcf*")
job_id = attributes.get('feed_name', 'cda_feed')
cron_schedule = attributes.get('cron_schedule', '')
post_process = attributes.get('post_process', '')
# Update import status in spanner
helper.update_import_status(import_name, import_status, latest_version,
graph_path, job_id, cron_schedule)

# Invoke ingestion workflow to trigger dataflow job
if post_process == 'spanner_ingestion_workflow':
helper.invoke_ingestion_workflow(import_name)
run_ingestion = True

# Invoke import job and ingestion workflow to trigger dataflow job
helper.invoke_import_workflow(import_name, latest_version, run_ingestion)
return 'OK', 200
52 changes: 52 additions & 0 deletions scripts/entities/download.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#!/bin/bash
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Downloads a versioned entity import (and, temporarily, its processing
# scripts) from GCS into the current working directory.
#
# Usage: ./download.sh --entity=<Entity> --version=<version>

set -e

# Parse arguments of the form --entity=<Entity> --version=<version>.
for i in "$@"; do
  case $i in
    --entity=*)
      ENTITY="${i#*=}"
      ;;
    --version=*)
      VERSION="${i#*=}"
      ;;
    *)
      # Skip unknown options
      ;;
  esac
done

# Fail fast on missing arguments instead of building a malformed GCS path.
if [[ -z "${ENTITY}" || -z "${VERSION}" ]]; then
  echo "Usage: $0 --entity=<Entity> --version=<version>" >&2
  exit 1
fi

BUCKET_NAME="datcom-prod-imports"
DIR_NAME=$(basename "$(pwd)")
GCS_FOLDER_PREFIX="scripts/${DIR_NAME}/${ENTITY}"
GCS_PATH="gs://${BUCKET_NAME}/${GCS_FOLDER_PREFIX}/${VERSION}"

echo "Downloading import ${ENTITY} for version ${VERSION} from ${GCS_PATH} to $(pwd)"
mkdir -p "${ENTITY}"
gcloud storage cp -r "${GCS_PATH}" "${ENTITY}/"
echo "Successfully downloaded ${ENTITY} version ${VERSION}"

# TODO: remove after scripts are checked in
# Download processing scripts from GCS.
SCRIPTS_GCS_PATH="gs://${BUCKET_NAME}/scripts/${DIR_NAME}/process/*"
SCRIPTS_LOCAL_PATH="../../import-automation/executor/scripts"
echo "Downloading scripts from ${SCRIPTS_GCS_PATH} to ${SCRIPTS_LOCAL_PATH}"
mkdir -p "${SCRIPTS_LOCAL_PATH}"
gcloud storage cp -r "${SCRIPTS_GCS_PATH}" "${SCRIPTS_LOCAL_PATH}/"
88 changes: 80 additions & 8 deletions scripts/entities/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,21 @@
"provenance_url": "https://datacommons.org",
"provenance_description": "Schema nodes for Data Commons",
"scripts": [
"process.py --entity=Schema"
"./download.sh --entity=Schema",
"./process.py --entity=Schema"
],
"import_inputs": [
{
"node_mcf": "**/*.mcf"
}
],
"import_inputs": [],
"source_files": [],
"cron_schedule": "15 3 * * *",
"config_override": {
"invoke_import_validation": false,
"invoke_import_tool": false,
"invoke_differ_tool": false
"invoke_import_validation": true,
"invoke_import_tool": true,
"invoke_differ_tool": true,
"skip_input_upload": true
}
},
{
Expand All @@ -27,15 +33,81 @@
"provenance_url": "https://datacommons.org",
"provenance_description": "Place nodes for Data Commons",
"scripts": [
"process.py --entity=Place"
"./download.sh --entity=Place",
"./process.py --entity=Place"
],
"import_inputs": [
{
"node_mcf": "**/*.mcf"
}
],
"source_files": [],
"cron_schedule": "15 3 * * 1",
"resource_limits": {
"cpu": 8,
"memory": 128,
"disk": 100
},
"config_override": {
"invoke_import_validation": false,
"invoke_import_tool": false,
"invoke_differ_tool": false,
"skip_input_upload": true
}
},
{
"import_name": "Provenance",
"curator_emails": [
"support@datacommons.org"
],
"provenance_url": "https://datacommons.org",
"provenance_description": "Provenance nodes for Data Commons",
"scripts": [
"./download.sh --entity=Provenance",
"./process.py --entity=Provenance"
],
"import_inputs": [
{
"node_mcf": "**/*.mcf"
}
],
"source_files": [],
"cron_schedule": "15 3 * * 1",
"config_override": {
"invoke_import_validation": false,
"invoke_import_tool": false,
"invoke_differ_tool": false,
"skip_input_upload": true
}
},
{
"import_name": "Event",
"curator_emails": [
"support@datacommons.org"
],
"provenance_url": "https://datacommons.org",
"provenance_description": "Event nodes for Data Commons",
"scripts": [
"./download.sh --entity=Event",
"./process.py --entity=Event"
],
"import_inputs": [
{
"node_mcf": "**/*.mcf"
}
],
"import_inputs": [],
"source_files": [],
"cron_schedule": "15 3 * * 1",
"resource_limits": {
"cpu": 8,
"memory": 128,
"disk": 100
},
"config_override": {
"invoke_import_validation": false,
"invoke_import_tool": false,
"invoke_differ_tool": false
"invoke_differ_tool": false,
"skip_input_upload": true
}
}
]
Expand Down
57 changes: 29 additions & 28 deletions scripts/entities/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,42 @@
from absl import app
from absl import flags
from absl import logging
import datetime
from google.cloud import storage
import os
import sys

# Add the scripts directory to sys.path
script_dir = os.path.abspath(
os.path.join(os.path.dirname(__file__), '..', '..', 'import-automation',
'executor', 'scripts'))
sys.path.append(script_dir)
import generate_provisional_nodes
import convert_dc_manifest

FLAGS = flags.FLAGS
flags.DEFINE_string("entity", "Schema", "Entity type (Schema/Place).")

BUCKET_NAME = 'datcom-prod-imports'
FILE_NAME = 'staging_version.txt'


def process(entity_type: str):
# Ensure the import data is available in GCS.
current_date = datetime.datetime.now(datetime.UTC).strftime("%Y-%m-%d")
logging.info(f'Checking import {entity_type} for date {current_date}')
file_path = os.path.join('scripts', os.path.basename(os.getcwd()),
entity_type, FILE_NAME)
storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)
blob = bucket.blob(file_path)
version = blob.download_as_text()
if version == current_date:
logging.info(
f'Successfully validated import {entity_type} for date {current_date}'
)
return 0
else:
raise RuntimeError(
f'{entity_type} data not present in GCS bucket {BUCKET_NAME} for date {current_date}'
)
flags.DEFINE_string("entity", "", "Entity type (Schema/Place).")
flags.DEFINE_string("version", "", "Import version.")


def process(entity_type: str, version: str):
    """Generates provisional MCF nodes for the downloaded entity data.

    Args:
        entity_type: Entity type (e.g. Schema/Place/Provenance/Event).
        version: Import version; names the subdirectory holding the data.

    Returns:
        0 on success.
    """
    logging.info(f'Processing import {entity_type} for version {version}')
    base_dir = os.path.dirname(__file__)
    # Data is expected under <script dir>/<entity>/<version>/.
    data_dir = os.path.abspath(os.path.join(base_dir, entity_type, version))

    # Provenance data ships as DC manifest files that are converted first.
    if entity_type == 'Provenance':
        logging.info(f'Processing DC manifest files in {data_dir}')
        convert_dc_manifest.process_directory(data_dir)

    logging.info(
        f'Generating provisional nodes for {entity_type} in {data_dir}')
    generate_provisional_nodes.generate_provisional_nodes(data_dir)
    return 0


def main(_):
    """Processes the entity/version selected by the --entity/--version flags.

    The unused positional argument is the leftover argv list, presumably
    passed in by absl.app.run — confirm against the module entry point.
    """
    process(FLAGS.entity, FLAGS.version)


if __name__ == "__main__":
Expand Down
5 changes: 4 additions & 1 deletion tools/import_differ/import_differ.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ def run_dataflow_job(self, project: str, job: str, current_data: str,
)
return status

def run_differ(self):
def run_differ(self) -> dict:
os.makedirs(self.output_path, exist_ok=True)
tmp_path = os.path.join(self.output_path, self.job_name)
os.makedirs(tmp_path, exist_ok=True)
Expand All @@ -368,6 +368,8 @@ def run_differ(self):
diff_path = os.path.join(self.output_path, 'schema-diff*')
logging.info("Loading schema diff data from: %s", diff_path)
schema_diff = differ_utils.load_csv_data(diff_path, tmp_path)
# TODO: populate summary for cloud mode
differ_summary = {}
else:
# Runs local Python differ.
current_dir = os.path.join(tmp_path, 'current')
Expand Down Expand Up @@ -424,6 +426,7 @@ def run_differ(self):
differ_utils.write_csv_data(obs_diff_samples, self.output_path,
'obs_diff_samples.csv', tmp_path)
logging.info(f'Differ output written to {self.output_path}')
return differ_summary
Comment thread
vish-cs marked this conversation as resolved.


def main(_):
Expand Down
2 changes: 1 addition & 1 deletion util/file_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ def file_get_matching(filepat: Union[str, list]) -> list:
for file in input_files:
if file_is_local(file):
# Expand local file pattern.
for f in glob.glob(file):
for f in glob.glob(file, recursive=True):
files.add(f)
elif file_is_gcs(file):
bucket = file_get_gcs_bucket(file)
Expand Down
Loading