From 205106f949e808323c86add53cee13b48ca4d280 Mon Sep 17 00:00:00 2001 From: Vishal Gupta Date: Tue, 7 Apr 2026 14:10:40 +0000 Subject: [PATCH 1/2] Add batch mode processing for entities --- import-automation/executor/app/configs.py | 2 + .../executor/app/executor/import_executor.py | 258 +++++++++--------- .../workflow/import-automation-workflow.yaml | 11 + .../workflow/import-helper/import_helper.py | 30 ++ .../workflow/import-helper/main.py | 10 +- scripts/entities/download.sh | 43 +++ scripts/entities/manifest.json | 46 +++- scripts/entities/process.py | 30 +- tools/import_differ/import_differ.py | 3 +- util/file_util.py | 2 +- 10 files changed, 271 insertions(+), 164 deletions(-) create mode 100755 scripts/entities/download.sh diff --git a/import-automation/executor/app/configs.py b/import-automation/executor/app/configs.py index f90516939f..ebfe551637 100644 --- a/import-automation/executor/app/configs.py +++ b/import-automation/executor/app/configs.py @@ -169,6 +169,8 @@ class ExecutorConfig: disable_email_notifications: bool = True # Skip uploading the data to GCS (for local testing). skip_gcs_upload: bool = False + # Skip uploading input files to GCS. + skip_input_upload: bool = False # Maximum time a blocking call to the importer to # perform an import can take in seconds. importer_import_timeout: float = 20 * 60 diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index 7cf27f1d7e..c5fdc8fbed 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -422,22 +422,19 @@ def _get_import_input_files(self, import_input, absolute_import_dir): for pattern in patterns: if pattern: files = glob.glob(os.path.join(absolute_import_dir, - pattern)) + pattern), + recursive=True) if not files and not glob.has_magic(pattern): errors.append( f'No matching files for {file_type}:{pattern}') else: input_files.extend(sorted(files)) - import_prefix = '' - if input_files: - import_prefix = os.path.splitext(os.path.basename( - input_files[0]))[0] if errors: logging.fatal( f'Missing import files in {absolute_import_dir}: {errors}') raise RuntimeError( 'Import job failed due to missing user script output files.') - return input_files, import_prefix + return input_files @log_function_call def _invoke_import_tool(self, absolute_import_dir: str, @@ -453,17 +450,12 @@ def _invoke_import_tool(self, absolute_import_dir: str, import_stage = ImportStage.GENMCF import_name = import_spec['import_name'] import_inputs = import_spec.get('import_inputs', []) - import_prefix_list = [] input_index = -1 for import_input in import_inputs: input_index += 1 - input_files, import_prefix = self._get_import_input_files( - import_input, absolute_import_dir) - import_prefix_list.append(import_prefix) - if not import_prefix: - logging.error( - 'Skipping genmcf due to missing import input spec.') - continue + input_files = self._get_import_input_files(import_input, + absolute_import_dir) + import_prefix = f'input{input_index}' output_path = os.path.join(absolute_import_dir, import_name, version, import_prefix, 'genmcf') @@ -521,7 +513,6 @@ def _invoke_import_tool(self, absolute_import_dir: str, import_name, import_stage, ImportStatus.SUCCESS, import_summary.import_stats.get('genmcf_execution_time', 0), import_summary.import_stats.get('mcf_data_size', 0)) - return import_prefix_list def _get_validation_config_file(self, repo_dir: str, absolute_import_dir: str, import_spec: dict, @@ -556,10 +547,65 @@ def _get_validation_config_file(self, repo_dir: str, validation_output_path) return base_config_path + @log_function_call + def _invoke_import_differ(self, genmcf_output_path: str, + validation_output_path: str, import_name: str, + import_prefix: str, version: str, + latest_version: str, import_input: dict, + absolute_import_dir: str) -> Tuple[str, str]: + """Invokes the differ tool to compare current data with previous data.""" + current_data_path = os.path.join(genmcf_output_path, '*.mcf') + previous_data_path = latest_version + f'/{import_prefix}/genmcf/*.mcf' + diff_found = True + # TODO: remove fallback logic once all imports move to new path. + if not file_util.file_get_matching(previous_data_path): + input_files = self._get_import_input_files(import_input, + absolute_import_dir) + import_prefix = os.path.splitext(os.path.basename( + input_files[0]))[0] + previous_data_path = latest_version + f'/{import_prefix}/genmcf/*.mcf' + if not file_util.file_get_matching(previous_data_path): + previous_data_path = latest_version + f'/{import_prefix}/validation/*.mcf' + # END + + differ_output = '' + differ_job_name = 'differ' + if latest_version and len( + file_util.file_get_matching(previous_data_path)) > 0: + logging.info( + f'Invoking differ tool comparing {import_prefix} with {latest_version}' + ) + timer = Timer() + differ = ImportDiffer(current_data=current_data_path, + previous_data=previous_data_path, + output_location=validation_output_path, + project_id=self.config.gcp_project_id, + job_name=differ_job_name, + file_format='mcf', + runner_mode='local') + differ_summary = differ.run_differ() + log_metric( + AUTO_IMPORT_JOB_STAGE, "INFO", + f"Import: {import_name}, differ for {import_prefix} {latest_version} vs {version}", + { + "stage": ImportStage.DIFFER.name, + "latency": timer.time(), + "import_input": import_prefix, + "previous_version": latest_version, + "current_version": version + }) + differ_output = validation_output_path + diff_found = differ_summary['obs_diff_size'] != 0 or differ_summary[ + 'schema_diff_size'] == 0 + else: + logging.error('Skipping differ tool due to missing latest mcf file') + + return differ_output, diff_found + @log_function_call def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str, absolute_import_dir: str, import_spec: dict, - version: str, import_prefix_list: list, + version: str, import_summary: ImportStatusSummary) -> bool: """ Performs validations on import data. @@ -569,20 +615,19 @@ def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str, data_size = 0 import_name = import_spec['import_name'] validation_status = True + differ_status = False validation_results = [] import_dir = f'{relative_import_dir}/{import_spec["import_name"]}' latest_version = self._get_latest_version(import_dir) logging.info(f'Latest version: {latest_version}') - differ_job_name = 'differ' # Trigger validations for each tmcf/csv under import_inputs. + import_inputs = import_spec.get('import_inputs', []) input_index = -1 - for import_prefix in import_prefix_list: + for import_input in import_inputs: input_index += 1 - if not import_prefix: - logging.error('Skipping validation due to missing import spec.') - continue + import_prefix = f'input{input_index}' genmcf_output_path = os.path.join(absolute_import_dir, import_name, version, import_prefix, 'genmcf') @@ -590,65 +635,42 @@ def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str, import_name, version, import_prefix, 'validation') os.makedirs(validation_output_path, exist_ok=True) - current_data_path = os.path.join(genmcf_output_path, '*.mcf') - previous_data_path = latest_version + f'/{import_prefix}/genmcf/*.mcf' - # TODO: remove fallback logic once all imports move to new path. - if latest_version and not file_util.file_get_matching( - previous_data_path): - previous_data_path = latest_version + f'/{import_prefix}/validation/*.mcf' - # END + summary_stats = os.path.join(genmcf_output_path, 'summary_report.csv') report_json = os.path.join(genmcf_output_path, 'report.json') validation_output_file = os.path.join(validation_output_path, 'validation_output.csv') - differ_output = os.path.join(validation_output_path, - 'obs_diff_summary.csv') + diff_found = True + differ_output = '' # Invoke differ and validation scripts. - differ_output_file = '' - if self.config.invoke_differ_tool and latest_version and len( - file_util.file_get_matching(previous_data_path)) > 0: - logging.info( - f'Invoking differ tool comparing {import_prefix} with {latest_version}' - ) - timer = Timer() - differ = ImportDiffer(current_data=current_data_path, - previous_data=previous_data_path, - output_location=validation_output_path, - project_id=self.config.gcp_project_id, - job_name=differ_job_name, - file_format='mcf', - runner_mode='local') - differ.run_differ() - log_metric( - AUTO_IMPORT_JOB_STAGE, "INFO", - f"Import: {import_name}, differ for {import_prefix} {latest_version} vs {version}", - { - "stage": ImportStage.DIFFER.name, - "latency": timer.time(), - "import_input": import_prefix, - "input_index": input_index, - "previous_version": latest_version, - "current_version": version - }) - differ_output_file = validation_output_path + if self.config.invoke_differ_tool: + differ_output, diff_found = self._invoke_import_differ( + genmcf_output_path=genmcf_output_path, + validation_output_path=validation_output_path, + import_name=import_name, + import_prefix=import_prefix, + version=version, + latest_version=latest_version, + import_input=import_input, + absolute_import_dir=absolute_import_dir) else: - differ_output_file = '' - logging.error( - 'Skipping differ tool due to missing latest mcf file') + logging.error('Skipping differ tool as per import config') + if not differ_status: + differ_status = diff_found timer = Timer() try: config_file_path = self._get_validation_config_file( repo_dir, absolute_import_dir, import_spec, validation_output_path) logging.info( - f'Invoking validation script with config: {config_file_path}, differ:{differ_output_file}, summary:{summary_stats}...' + f'Invoking validation script with config: {config_file_path}, differ:{differ_output}, summary:{summary_stats}...' ) validation = ValidationRunner( validation_config_path=config_file_path, - differ_output=differ_output_file, + differ_output=differ_output, stats_summary=summary_stats, lint_report=report_json, validation_output=validation_output_file) @@ -693,6 +715,16 @@ def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str, import_summary.import_stats.get('validation_execution_time', 0), import_summary.import_stats.get('validation_data_size', 0), validation_message) + if not self.config.ignore_validation_status and not validation_status: + logging.error( + "Marking import as VALIDATION due to validation failure.") + import_summary.status = ImportStatus.VALIDATION + elif not differ_status: + logging.info("Marking import as SKIP due to no data diff.") + import_summary.status = ImportStatus.SKIP + else: + import_summary.status = ImportStatus.STAGING + return validation_status def _get_validation_message( @@ -725,11 +757,11 @@ def _create_mount_point(self, gcs_volume_mount_dir: str, exist_ok=True) @log_function_call - def _invoke_import_job( - self, absolute_import_dir: str, relative_import_dir: str, - import_spec: dict, version: str, interpreter_path: str, - process: subprocess.CompletedProcess, - import_summary: ImportStatusSummary) -> import_service.ImportInputs: + def _invoke_import_job(self, absolute_import_dir: str, + relative_import_dir: str, import_spec: dict, + version: str, interpreter_path: str, + process: subprocess.CompletedProcess, + import_summary: ImportStatusSummary): script_paths = import_spec.get('scripts') import_name = import_spec['import_name'] start_timer = Timer() @@ -781,7 +813,7 @@ def _invoke_import_job( import_summary.import_stats['script_execution_time'] = start_timer.time( ) - inputs = self._upload_import_inputs( + self._upload_import_inputs( import_dir=absolute_import_dir, output_dir=f'{relative_import_dir}/{import_name}', version=version, @@ -792,7 +824,6 @@ def _invoke_import_job( import_name, import_stage, ImportStatus.SUCCESS, import_summary.import_stats.get('script_execution_time', 0), import_summary.import_stats.get('source_data_size', 0)) - return inputs def _update_latest_version(self, version, output_dir, import_spec, import_summary): @@ -854,7 +885,7 @@ def _import_one_helper( self.config.requirements_filename) timer = Timer() interpreter_path, process = _create_venv( - (central_requirements_path, requirements_path), + [central_requirements_path, requirements_path], tmpdir, timeout=self.config.venv_create_timeout, ) @@ -867,18 +898,17 @@ def _import_one_helper( }) process.check_returncode() - inputs = self._invoke_import_job( - absolute_import_dir=absolute_import_dir, - relative_import_dir=relative_import_dir, - import_spec=import_spec, - version=version, - interpreter_path=interpreter_path, - process=process, - import_summary=import_summary) + self._invoke_import_job(absolute_import_dir=absolute_import_dir, + relative_import_dir=relative_import_dir, + import_spec=import_spec, + version=version, + interpreter_path=interpreter_path, + process=process, + import_summary=import_summary) if self.config.invoke_import_tool: logging.info("Invoking import tool genmcf") - import_prefix_list = self._invoke_import_tool( + self._invoke_import_tool( absolute_import_dir=absolute_import_dir, relative_import_dir=relative_import_dir, version=version, @@ -894,7 +924,6 @@ def _import_one_helper( absolute_import_dir=absolute_import_dir, import_spec=import_spec, version=version, - import_prefix_list=import_prefix_list, import_summary=import_summary) logging.info( f'Validations for version {version} completed with status: {validation_status}' @@ -910,13 +939,6 @@ def _import_one_helper( import_summary.import_stats.get('validation_data_size', 0)) logging.info(import_summary) - if self.config.ignore_validation_status or validation_status: - import_summary.status = ImportStatus.STAGING - else: - logging.error( - "Staging latest version update due to validation failure.") - import_summary.status = ImportStatus.VALIDATION - self._update_latest_version(version, output_dir, import_spec, import_summary) @@ -935,6 +957,8 @@ def _import_one_helper( # there will be no previous import logging.warning(str(exc)) + # TODO: populate inputs for smart imports + inputs = import_service.ImportInputs() self.importer.smart_import( relative_import_dir, inputs, @@ -946,10 +970,9 @@ def _import_one_helper( logging.info(f'Import workflow completed successfully!') @log_function_call - def _upload_import_inputs( - self, import_dir: str, output_dir: str, version: str, - import_spec: dict, - import_summary: ImportStatusSummary) -> import_service.ImportInputs: + def _upload_import_inputs(self, import_dir: str, output_dir: str, + version: str, import_spec: dict, + import_summary: ImportStatusSummary): """Uploads the generated import data files. Data files are uploaded to //, where is a @@ -963,42 +986,23 @@ def _upload_import_inputs( output_dir: Path to the output directory, as a string. import_inputs: Specification of the import as a dict. - Returns: - ImportInputs object containing the paths to the uploaded inputs. """ - uploaded = import_service.ImportInputs() import_inputs = import_spec.get('import_inputs', []) errors = [] data_size = 0 + input_index = -1 for import_input in import_inputs: - for input_type in self.config.import_input_types: - path = import_input.get(input_type) - if not path: - continue - paths = [path] - if isinstance(path, list): - paths = path - for path in paths: - import_file_path = os.path.join(import_dir, path) - import_files = file_util.file_get_matching(import_file_path) - if import_files: - for file in import_files: - if file: - dest = f'{output_dir}/{version}/{os.path.basename(file)}' - data_size += os.path.getsize(file) - self._upload_file_helper( - src=file, - dest=dest, - ) - uploaded_dest = f'{output_dir}/{version}/{os.path.basename(path)}' - setattr(uploaded, input_type, uploaded_dest) - elif not glob.has_magic(path): - errors.append( - f'Missing file {input_type}:{import_file_path}') - else: - logging.warning( - f'Missing output file: {input_type}:{import_file_path}' - ) + input_index += 1 + import_files = self._get_import_input_files(import_input, + import_dir) + for file in import_files: + dest = f'{output_dir}/{version}/input{input_index}/{os.path.basename(file)}' + data_size += os.path.getsize(file) + if not self.config.skip_input_upload: + self._upload_file_helper( + src=file, + dest=dest, + ) # Upload any files downloaded from source source_files = [ os.path.join(import_dir, file) @@ -1008,17 +1012,17 @@ def _upload_import_inputs( for file in source_files: dest = f'{output_dir}/{version}/source_files/{os.path.relpath(file, import_dir)}' data_size += os.path.getsize(file) - self._upload_file_helper( - src=file, - dest=dest, - ) + if not self.config.skip_input_upload: + self._upload_file_helper( + src=file, + dest=dest, + ) import_summary.import_stats['source_data_size'] = data_size if errors: logging.fatal(f'Missing user_script outputs: {errors}') raise RuntimeError( f'Import job failed due to missing output files {errors}') - return uploaded def _upload_string_helper(self, text: str, dest: str) -> None: """Uploads a text to dest and also copies locally. diff --git a/import-automation/workflow/import-automation-workflow.yaml b/import-automation/workflow/import-automation-workflow.yaml index 378e422f1b..a3bcf30f73 100644 --- a/import-automation/workflow/import-automation-workflow.yaml +++ b/import-automation/workflow/import-automation-workflow.yaml @@ -21,6 +21,7 @@ main: memory: 32768 disk: 100 - resources: ${default(map.get(args, "resources"), defaultResources)} + - runIngestion: ${default(map.get(args, "runIngestion"), false)} - runImportJob: try: call: googleapis.batch.v1.projects.locations.jobs.create @@ -99,6 +100,16 @@ main: override: false comment: '${"import-workflow:" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}' result: functionResponse + - runIngestion: + switch: + - condition: ${runIngestion} + steps: + - runSpannerIngestion: + call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.create + args: + parent: ${"projects/" + projectId + "/locations/" + region + "/workflows/spanner-ingestion-workflow"} + body: + argument: ${json.encode_to_string({"importList": [text.split(importName, ":")[1]]})} - returnResult: return: jobId: ${jobId} diff --git a/import-automation/workflow/import-helper/import_helper.py b/import-automation/workflow/import-helper/import_helper.py index dd1be52343..ca37299fac 100644 --- a/import-automation/workflow/import-helper/import_helper.py +++ b/import-automation/workflow/import-helper/import_helper.py @@ -31,6 +31,8 @@ GCS_BUCKET_ID = os.environ.get('GCS_BUCKET_ID') INGESTION_HELPER_URL = f"https://{LOCATION}-{PROJECT_ID}.cloudfunctions.net/spanner-ingestion-helper" WORKFLOW_ID = 'spanner-ingestion-workflow' +IMPORT_AUTOMATION_WORKFLOW_ID = 'import-automation-workflow' + def invoke_ingestion_workflow(import_name: str): """Triggers the graph ingestion workflows. @@ -51,6 +53,34 @@ def invoke_ingestion_workflow(import_name: str): ) +def invoke_import_workflow(import_name: str, + latest_version: str, + run_ingestion: bool = False): + """Triggers the import automation workflow. + + Args: + import_name: The name of the import. + latest_version: The version of the import. + run_ingestion: Whether to run the ingestion workflow after the import. + """ + import_config = {"user_script_args": [f"--version={latest_version}"]} + workflow_args = { + "importName": import_name, + "importConfig": json.dumps(import_config), + "runIngestion": run_ingestion + } + + logging.info(f"Invoking {IMPORT_AUTOMATION_WORKFLOW_ID} for {import_name}") + execution_client = executions_v1.ExecutionsClient() + parent = f"projects/{PROJECT_ID}/locations/{LOCATION}/workflows/{IMPORT_AUTOMATION_WORKFLOW_ID}" + execution_req = executions_v1.Execution(argument=json.dumps(workflow_args)) + response = execution_client.create_execution(parent=parent, + execution=execution_req) + logging.info( + f"Triggered workflow {IMPORT_AUTOMATION_WORKFLOW_ID} for {import_name}. Execution ID: {response.name}" + ) + + def update_import_status(import_name, import_status, import_version, diff --git a/import-automation/workflow/import-helper/main.py b/import-automation/workflow/import-helper/main.py index d9cb4ec1bc..b7f8729bdc 100644 --- a/import-automation/workflow/import-helper/main.py +++ b/import-automation/workflow/import-helper/main.py @@ -47,12 +47,12 @@ def handle_feed_event(request): graph_path = attributes.get('graph_path', "/**/*.mcf*") job_id = attributes.get('feed_name', 'cda_feed') cron_schedule = attributes.get('cron_schedule', '') - post_process = attributes.get('post_process', '') + run_ingestion = 'Schema' in import_name or 'Place' in import_name + # Update import status in spanner helper.update_import_status(import_name, import_status, latest_version, - graph_path, job_id, cron_schedule) + graph_path, job_id, cron_schedule) - # Invoke ingestion workflow to trigger dataflow job - if post_process == 'spanner_ingestion_workflow': - helper.invoke_ingestion_workflow(import_name) + # Invoke import job and ingestion workflow to trigger dataflow job + helper.invoke_import_workflow(import_name, latest_version, run_ingestion) return 'OK', 200 diff --git a/scripts/entities/download.sh b/scripts/entities/download.sh new file mode 100755 index 0000000000..0da05c233b --- /dev/null +++ b/scripts/entities/download.sh @@ -0,0 +1,43 @@ +#!/bin/bash +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -e + +# Parse arguments +for i in "$@"; do + case $i in + --entity=*) + ENTITY="${i#*=}" + shift + ;; + --version=*) + VERSION="${i#*=}" + shift + ;; + *) + # Skip unknown options + ;; + esac +done + +BUCKET_NAME="datcom-prod-imports" +DIR_NAME=$(basename "$(pwd)") +GCS_FOLDER_PREFIX="scripts/${DIR_NAME}/${ENTITY}/${VERSION}" +GCS_PATH="gs://${BUCKET_NAME}/${GCS_FOLDER_PREFIX}" + +echo "Downloading import ${ENTITY} for version ${VERSION} from ${GCS_PATH} to $(pwd)" +mkdir -p "${ENTITY}" +gcloud storage cp -r "${GCS_PATH}" "${ENTITY}/" +echo "Successfully downloaded ${ENTITY} version ${VERSION}" diff --git a/scripts/entities/manifest.json b/scripts/entities/manifest.json index 0a363442ba..7a65f3d7e9 100644 --- a/scripts/entities/manifest.json +++ b/scripts/entities/manifest.json @@ -8,15 +8,21 @@ "provenance_url": "https://datacommons.org", "provenance_description": "Schema nodes for Data Commons", "scripts": [ - "process.py --entity=Schema" + "./download.sh --entity=Schema", + "./process.py --entity=Schema" + ], + "import_inputs": [ + { + "node_mcf": "**/*.mcf" + } ], - "import_inputs": [], "source_files": [], "cron_schedule": "15 3 * * *", "config_override": { - "invoke_import_validation": false, - "invoke_import_tool": false, - "invoke_differ_tool": false + "invoke_import_validation": true, + "invoke_import_tool": true, + "invoke_differ_tool": true, + "skip_input_upload": true } }, { @@ -27,11 +33,39 @@ "provenance_url": "https://datacommons.org", "provenance_description": "Place nodes for Data Commons", "scripts": [ - "process.py --entity=Place" + "./download.sh --entity=Place" ], "import_inputs": [], "source_files": [], "cron_schedule": "15 3 * * 1", + "resource_limits": { + "cpu": 8, + "memory": 128, + "disk": 100 + }, + "config_override": { + "invoke_import_validation": false, + "invoke_import_tool": true, + "invoke_differ_tool": false + } + }, + { + "import_name": "Provenance", + "curator_emails": [ + "support@datacommons.org" + ], + "provenance_url": "https://datacommons.org", + "provenance_description": "Provenance nodes for Data Commons", + "scripts": [ + "./download.sh --entity=Provenance" + ], + "import_inputs": [ + { + "node_mcf": "**/*.mcf" + } + ], + "source_files": [], + "cron_schedule": "15 3 * * 1", "config_override": { "invoke_import_validation": false, "invoke_import_tool": false, diff --git a/scripts/entities/process.py b/scripts/entities/process.py index fba269f2f5..16e8b53abe 100644 --- a/scripts/entities/process.py +++ b/scripts/entities/process.py @@ -25,35 +25,17 @@ FLAGS = flags.FLAGS flags.DEFINE_string("entity", "Schema", "Entity type (Schema/Place).") +flags.DEFINE_string("version", "", "Import version.") -BUCKET_NAME = 'datcom-prod-imports' -FILE_NAME = 'staging_version.txt' - - -def process(entity_type: str): - # Ensure the import data is available in GCS. - current_date = datetime.datetime.now(datetime.UTC).strftime("%Y-%m-%d") - logging.info(f'Checking import {entity_type} for date {current_date}') - file_path = os.path.join('scripts', os.path.basename(os.getcwd()), - entity_type, FILE_NAME) - storage_client = storage.Client() - bucket = storage_client.bucket(BUCKET_NAME) - blob = bucket.blob(file_path) - version = blob.download_as_text() - if version == current_date: - logging.info( - f'Successfully validated import {entity_type} for date {current_date}' - ) - return 0 - else: - raise RuntimeError( - f'{entity_type} data not present in GCS bucket {BUCKET_NAME} for date {current_date}' - ) + +def process(entity_type: str, version: str): + logging.info(f'Processing import {entity_type} for version {version}') + # TODO: add processing logic def main(_): """Runs the code.""" - process(FLAGS.entity) + process(FLAGS.entity, FLAGS.version) if __name__ == "__main__": diff --git a/tools/import_differ/import_differ.py b/tools/import_differ/import_differ.py index c9878685d0..53327fd7ff 100644 --- a/tools/import_differ/import_differ.py +++ b/tools/import_differ/import_differ.py @@ -347,7 +347,7 @@ def run_dataflow_job(self, project: str, job: str, current_data: str, ) return status - def run_differ(self): + def run_differ(self) -> dict: os.makedirs(self.output_path, exist_ok=True) tmp_path = os.path.join(self.output_path, self.job_name) os.makedirs(tmp_path, exist_ok=True) @@ -424,6 +424,7 @@ def run_differ(self): differ_utils.write_csv_data(obs_diff_samples, self.output_path, 'obs_diff_samples.csv', tmp_path) logging.info(f'Differ output written to {self.output_path}') + return differ_summary def main(_): diff --git a/util/file_util.py b/util/file_util.py index 9fde1a2984..c255bc365a 100644 --- a/util/file_util.py +++ b/util/file_util.py @@ -348,7 +348,7 @@ def file_get_matching(filepat: Union[str, list]) -> list: for file in input_files: if file_is_local(file): # Expand local file pattern. - for f in glob.glob(file): + for f in glob.glob(file, recursive=True): files.add(f) elif file_is_gcs(file): bucket = file_get_gcs_bucket(file) From 3df012c5a63def277647cbb8f12b3d7a4ede2f9e Mon Sep 17 00:00:00 2001 From: Vishal Gupta Date: Thu, 9 Apr 2026 15:20:04 +0000 Subject: [PATCH 2/2] Fixes for review comments --- .../executor/app/executor/import_executor.py | 46 +++++++++------- import-automation/executor/cloudbuild.yaml | 1 - .../workflow/import-automation-workflow.yaml | 5 +- .../workflow/import-helper/main.py | 10 +--- scripts/entities/download.sh | 17 ++++-- scripts/entities/manifest.json | 52 ++++++++++++++++--- scripts/entities/process.py | 27 ++++++++-- tools/import_differ/import_differ.py | 2 + 8 files changed, 115 insertions(+), 45 deletions(-) diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index c5fdc8fbed..fb7e063f55 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -555,7 +555,8 @@ def _invoke_import_differ(self, genmcf_output_path: str, absolute_import_dir: str) -> Tuple[str, str]: """Invokes the differ tool to compare current data with previous data.""" current_data_path = os.path.join(genmcf_output_path, '*.mcf') - previous_data_path = latest_version + f'/{import_prefix}/genmcf/*.mcf' + previous_data_path = os.path.join(latest_version, import_prefix, + 'genmcf', '*.mcf') diff_found = True # TODO: remove fallback logic once all imports move to new path. if not file_util.file_get_matching(previous_data_path): @@ -715,17 +716,7 @@ def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str, import_summary.import_stats.get('validation_execution_time', 0), import_summary.import_stats.get('validation_data_size', 0), validation_message) - if not self.config.ignore_validation_status and not validation_status: - logging.error( - "Marking import as VALIDATION due to validation failure.") - import_summary.status = ImportStatus.VALIDATION - elif not differ_status: - logging.info("Marking import as SKIP due to no data diff.") - import_summary.status = ImportStatus.SKIP - else: - import_summary.status = ImportStatus.STAGING - - return validation_status + return validation_status, differ_status def _get_validation_message( self, validation_results: List[ValidationResult]) -> str: @@ -916,9 +907,10 @@ def _import_one_helper( import_summary=import_summary) validation_status = True + differ_status = True if self.config.invoke_import_validation: logging.info("Invoking import validations") - validation_status = self._invoke_import_validation( + validation_status, differ_status = self._invoke_import_validation( repo_dir=repo_dir, relative_import_dir=relative_import_dir, absolute_import_dir=absolute_import_dir, @@ -932,6 +924,16 @@ def _import_one_helper( logging.info( 'Skipping import validations as per import config.') + if not self.config.ignore_validation_status and not validation_status: + logging.error( + "Marking import as VALIDATION due to validation failure.") + import_summary.status = ImportStatus.VALIDATION + elif not differ_status: + logging.info("Marking import as SKIP due to no data diff.") + import_summary.status = ImportStatus.SKIP + else: + import_summary.status = ImportStatus.STAGING + import_summary.execution_time = int(time.time() - start_time) import_summary.data_volume = int( import_summary.import_stats.get('source_data_size', 0) + @@ -987,6 +989,13 @@ def _upload_import_inputs(self, import_dir: str, output_dir: str, import_inputs: Specification of the import as a dict. """ + # Copy manifest file + manifest_file = os.path.join(import_dir, 'manifest.json') + dest = f'{output_dir}/{version}/{os.path.basename(manifest_file)}' + self._upload_file_helper( + src=manifest_file, + dest=dest, + ) import_inputs = import_spec.get('import_inputs', []) errors = [] data_size = 0 @@ -996,7 +1005,7 @@ def _upload_import_inputs(self, import_dir: str, output_dir: str, import_files = self._get_import_input_files(import_input, import_dir) for file in import_files: - dest = f'{output_dir}/{version}/input{input_index}/{os.path.basename(file)}' + dest = f'{output_dir}/{version}/{os.path.basename(file)}' data_size += os.path.getsize(file) if not self.config.skip_input_upload: self._upload_file_helper( @@ -1012,11 +1021,10 @@ def _upload_import_inputs(self, import_dir: str, output_dir: str, for file in source_files: dest = f'{output_dir}/{version}/source_files/{os.path.relpath(file, import_dir)}' data_size += os.path.getsize(file) - if not self.config.skip_input_upload: - self._upload_file_helper( - src=file, - dest=dest, - ) + self._upload_file_helper( + src=file, + dest=dest, + ) import_summary.import_stats['source_data_size'] = data_size if errors: diff --git a/import-automation/executor/cloudbuild.yaml b/import-automation/executor/cloudbuild.yaml index 0d65c659df..861ce86189 100644 --- a/import-automation/executor/cloudbuild.yaml +++ b/import-automation/executor/cloudbuild.yaml @@ -46,7 +46,6 @@ steps: python import_test.py env: - 'PROJECT_ID=$PROJECT_ID' - - 'LOCATION=$LOCATION' - 'GCS_BUCKET=${_GCS_BUCKET}' - 'IMAGE_URI=${_DOCKER_IMAGE}:${COMMIT_SHA}' dir: 'import-automation/executor' diff --git a/import-automation/workflow/import-automation-workflow.yaml b/import-automation/workflow/import-automation-workflow.yaml index a3bcf30f73..09bc07e4e5 100644 --- a/import-automation/workflow/import-automation-workflow.yaml +++ b/import-automation/workflow/import-automation-workflow.yaml @@ -22,6 +22,9 @@ main: disk: 100 - resources: ${default(map.get(args, "resources"), defaultResources)} - runIngestion: ${default(map.get(args, "runIngestion"), false)} + - ingestionArgs: + importList: + - ${text.split(importName, ":")[1]} - runImportJob: try: call: googleapis.batch.v1.projects.locations.jobs.create @@ -109,7 +112,7 @@ main: args: parent: ${"projects/" + projectId + "/locations/" + region + "/workflows/spanner-ingestion-workflow"} body: - argument: ${json.encode_to_string({"importList": [text.split(importName, ":")[1]]})} + argument: ${json.encode_to_string(ingestionArgs)} - returnResult: return: jobId: ${jobId} diff --git a/import-automation/workflow/import-helper/main.py b/import-automation/workflow/import-helper/main.py index b7f8729bdc..1fb5020202 100644 --- a/import-automation/workflow/import-helper/main.py +++ b/import-automation/workflow/import-helper/main.py @@ -40,18 +40,10 @@ def handle_feed_event(request): return 'OK', 200 import_name = attributes.get('import_name') - import_status = 'STAGING' latest_version = attributes.get( 'import_version', datetime.now(timezone.utc).strftime("%Y-%m-%d")) - graph_path = attributes.get('graph_path', "/**/*.mcf*") - job_id = attributes.get('feed_name', 'cda_feed') - cron_schedule = attributes.get('cron_schedule', '') - run_ingestion = 'Schema' in import_name or 'Place' in import_name - - # Update import status in spanner - helper.update_import_status(import_name, import_status, latest_version, - graph_path, job_id, cron_schedule) + run_ingestion = True # Invoke import job and ingestion workflow to trigger dataflow job helper.invoke_import_workflow(import_name, latest_version, run_ingestion) diff --git a/scripts/entities/download.sh b/scripts/entities/download.sh index 0da05c233b..700c08ce02 100755 --- a/scripts/entities/download.sh +++ b/scripts/entities/download.sh @@ -20,11 +20,9 @@ for i in "$@"; do case $i in --entity=*) ENTITY="${i#*=}" - shift ;; --version=*) VERSION="${i#*=}" - shift ;; *) # Skip unknown options @@ -34,10 +32,21 @@ done BUCKET_NAME="datcom-prod-imports" DIR_NAME=$(basename "$(pwd)") -GCS_FOLDER_PREFIX="scripts/${DIR_NAME}/${ENTITY}/${VERSION}" -GCS_PATH="gs://${BUCKET_NAME}/${GCS_FOLDER_PREFIX}" +GCS_FOLDER_PREFIX="scripts/${DIR_NAME}/${ENTITY}" +GCS_PATH="gs://${BUCKET_NAME}/${GCS_FOLDER_PREFIX}/${VERSION}" echo "Downloading import ${ENTITY} for version ${VERSION} from ${GCS_PATH} to $(pwd)" mkdir -p "${ENTITY}" gcloud storage cp -r "${GCS_PATH}" "${ENTITY}/" echo "Successfully downloaded ${ENTITY} version ${VERSION}" + +# TODO: remove after scrpts are checked in +# Download scripts from GCS +SCRIPTS_GCS_PATH="gs://${BUCKET_NAME}/scripts/${DIR_NAME}/process/*" +SCRIPTS_LOCAL_PATH="../../import-automation/executor/scripts" +echo "Downloading scripts from ${SCRIPTS_GCS_PATH} to ${SCRIPTS_LOCAL_PATH}" +mkdir -p "${SCRIPTS_LOCAL_PATH}" +gcloud storage cp -r "${SCRIPTS_GCS_PATH}" "${SCRIPTS_LOCAL_PATH}/" + + + diff --git a/scripts/entities/manifest.json b/scripts/entities/manifest.json index 7a65f3d7e9..85d0983789 100644 --- a/scripts/entities/manifest.json +++ b/scripts/entities/manifest.json @@ -22,7 +22,7 @@ "invoke_import_validation": true, "invoke_import_tool": true, "invoke_differ_tool": true, - "skip_input_upload": true + "skip_input_upload": true } }, { @@ -33,9 +33,14 @@ "provenance_url": "https://datacommons.org", "provenance_description": "Place nodes for Data Commons", "scripts": [ - "./download.sh --entity=Place" + "./download.sh --entity=Place", + "./process.py --entity=Place" + ], + "import_inputs": [ + { + "node_mcf": "**/*.mcf" + } ], - "import_inputs": [], "source_files": [], "cron_schedule": "15 3 * * 1", "resource_limits": { @@ -45,8 +50,9 @@ }, "config_override": { "invoke_import_validation": false, - "invoke_import_tool": true, - "invoke_differ_tool": false + "invoke_import_tool": false, + "invoke_differ_tool": false, + "skip_input_upload": true } }, { @@ -57,7 +63,8 @@ "provenance_url": "https://datacommons.org", "provenance_description": "Provenance nodes for Data Commons", "scripts": [ - "./download.sh --entity=Provenance" + "./download.sh --entity=Provenance", + "./process.py --entity=Provenance" ], "import_inputs": [ { @@ -69,7 +76,38 @@ "config_override": { "invoke_import_validation": false, "invoke_import_tool": false, - "invoke_differ_tool": false + "invoke_differ_tool": false, + "skip_input_upload": true + } + }, + { + "import_name": "Event", + "curator_emails": [ + "support@datacommons.org" + ], + "provenance_url": "https://datacommons.org", + "provenance_description": "Event nodes for Data Commons", + "scripts": [ + "./download.sh --entity=Event", + "./process.py --entity=Event" + ], + "import_inputs": [ + { + "node_mcf": "**/*.mcf" + } + ], + "source_files": [], + "cron_schedule": "15 3 * * 1", + "resource_limits": { + "cpu": 8, + "memory": 128, + "disk": 100 + }, + "config_override": { + "invoke_import_validation": false, + "invoke_import_tool": false, + "invoke_differ_tool": false, + "skip_input_upload": true } } ] diff --git a/scripts/entities/process.py b/scripts/entities/process.py index 16e8b53abe..f86b9f877b 100644 --- a/scripts/entities/process.py +++ b/scripts/entities/process.py @@ -19,18 +19,37 @@ from absl import app from absl import flags from absl import logging -import datetime -from google.cloud import storage import os +import sys + +# Add the scripts directory to sys.path +script_dir = os.path.abspath( + os.path.join(os.path.dirname(__file__), '..', '..', 'import-automation', + 'executor', 'scripts')) +sys.path.append(script_dir) +import generate_provisional_nodes +import convert_dc_manifest FLAGS = flags.FLAGS -flags.DEFINE_string("entity", "Schema", "Entity type (Schema/Place).") +flags.DEFINE_string("entity", "", "Entity type (Schema/Place).") flags.DEFINE_string("version", "", "Import version.") def process(entity_type: str, version: str): logging.info(f'Processing import {entity_type} for version {version}') - # TODO: add processing logic + local_path = os.path.abspath( + os.path.join(os.path.dirname(__file__), entity_type, version)) + + if entity_type == 'Provenance': + # Local path to Provenance data + logging.info(f'Processing DC manifest files in {local_path}') + convert_dc_manifest.process_directory(local_path) + + # Local path to data + logging.info( + f'Generating provisional nodes for {entity_type} in {local_path}') + generate_provisional_nodes.generate_provisional_nodes(local_path) + return 0 def main(_): diff --git a/tools/import_differ/import_differ.py b/tools/import_differ/import_differ.py index 53327fd7ff..e0e2a1060c 100644 --- a/tools/import_differ/import_differ.py +++ b/tools/import_differ/import_differ.py @@ -368,6 +368,8 @@ def run_differ(self) -> dict: diff_path = os.path.join(self.output_path, 'schema-diff*') logging.info("Loading schema diff data from: %s", diff_path) schema_diff = differ_utils.load_csv_data(diff_path, tmp_path) + # TODO: populate summary for cloud mode + differ_summary = {} else: # Runs local Python differ. current_dir = os.path.join(tmp_path, 'current')