Skip to content

Commit 40f1d63

Browse files
committed
Fixes for review comments
1 parent 57d525a commit 40f1d63

8 files changed

Lines changed: 115 additions & 45 deletions

File tree

import-automation/executor/app/executor/import_executor.py

Lines changed: 27 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -555,7 +555,8 @@ def _invoke_import_differ(self, genmcf_output_path: str,
555555
absolute_import_dir: str) -> Tuple[str, str]:
556556
"""Invokes the differ tool to compare current data with previous data."""
557557
current_data_path = os.path.join(genmcf_output_path, '*.mcf')
558-
previous_data_path = latest_version + f'/{import_prefix}/genmcf/*.mcf'
558+
previous_data_path = os.path.join(latest_version, import_prefix,
559+
'genmcf', '*.mcf')
559560
diff_found = True
560561
# TODO: remove fallback logic once all imports move to new path.
561562
if not file_util.file_get_matching(previous_data_path):
@@ -715,17 +716,7 @@ def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str,
715716
import_summary.import_stats.get('validation_execution_time', 0),
716717
import_summary.import_stats.get('validation_data_size',
717718
0), validation_message)
718-
if not self.config.ignore_validation_status and not validation_status:
719-
logging.error(
720-
"Marking import as VALIDATION due to validation failure.")
721-
import_summary.status = ImportStatus.VALIDATION
722-
elif not differ_status:
723-
logging.info("Marking import as SKIP due to no data diff.")
724-
import_summary.status = ImportStatus.SKIP
725-
else:
726-
import_summary.status = ImportStatus.STAGING
727-
728-
return validation_status
719+
return validation_status, differ_status
729720

730721
def _get_validation_message(
731722
self, validation_results: List[ValidationResult]) -> str:
@@ -916,9 +907,10 @@ def _import_one_helper(
916907
import_summary=import_summary)
917908

918909
validation_status = True
910+
differ_status = True
919911
if self.config.invoke_import_validation:
920912
logging.info("Invoking import validations")
921-
validation_status = self._invoke_import_validation(
913+
validation_status, differ_status = self._invoke_import_validation(
922914
repo_dir=repo_dir,
923915
relative_import_dir=relative_import_dir,
924916
absolute_import_dir=absolute_import_dir,
@@ -932,6 +924,16 @@ def _import_one_helper(
932924
logging.info(
933925
'Skipping import validations as per import config.')
934926

927+
if not self.config.ignore_validation_status and not validation_status:
928+
logging.error(
929+
"Marking import as VALIDATION due to validation failure.")
930+
import_summary.status = ImportStatus.VALIDATION
931+
elif not differ_status:
932+
logging.info("Marking import as SKIP due to no data diff.")
933+
import_summary.status = ImportStatus.SKIP
934+
else:
935+
import_summary.status = ImportStatus.STAGING
936+
935937
import_summary.execution_time = int(time.time() - start_time)
936938
import_summary.data_volume = int(
937939
import_summary.import_stats.get('source_data_size', 0) +
@@ -987,6 +989,13 @@ def _upload_import_inputs(self, import_dir: str, output_dir: str,
987989
import_inputs: Specification of the import as a dict.
988990
989991
"""
992+
# Copy manifest file
993+
manifest_file = os.path.join(import_dir, 'manifest.json')
994+
dest = f'{output_dir}/{version}/{os.path.basename(manifest_file)}'
995+
self._upload_file_helper(
996+
src=manifest_file,
997+
dest=dest,
998+
)
990999
import_inputs = import_spec.get('import_inputs', [])
9911000
errors = []
9921001
data_size = 0
@@ -996,7 +1005,7 @@ def _upload_import_inputs(self, import_dir: str, output_dir: str,
9961005
import_files = self._get_import_input_files(import_input,
9971006
import_dir)
9981007
for file in import_files:
999-
dest = f'{output_dir}/{version}/input{input_index}/{os.path.basename(file)}'
1008+
dest = f'{output_dir}/{version}/{os.path.basename(file)}'
10001009
data_size += os.path.getsize(file)
10011010
if not self.config.skip_input_upload:
10021011
self._upload_file_helper(
@@ -1012,11 +1021,10 @@ def _upload_import_inputs(self, import_dir: str, output_dir: str,
10121021
for file in source_files:
10131022
dest = f'{output_dir}/{version}/source_files/{os.path.relpath(file, import_dir)}'
10141023
data_size += os.path.getsize(file)
1015-
if not self.config.skip_input_upload:
1016-
self._upload_file_helper(
1017-
src=file,
1018-
dest=dest,
1019-
)
1024+
self._upload_file_helper(
1025+
src=file,
1026+
dest=dest,
1027+
)
10201028

10211029
import_summary.import_stats['source_data_size'] = data_size
10221030
if errors:

import-automation/executor/cloudbuild.yaml

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -46,7 +46,6 @@ steps:
4646
python import_test.py
4747
env:
4848
- 'PROJECT_ID=$PROJECT_ID'
49-
- 'LOCATION=$LOCATION'
5049
- 'GCS_BUCKET=${_GCS_BUCKET}'
5150
- 'IMAGE_URI=${_DOCKER_IMAGE}:${COMMIT_SHA}'
5251
dir: 'import-automation/executor'

import-automation/workflow/import-automation-workflow.yaml

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,9 @@ main:
2222
disk: 100
2323
- resources: ${default(map.get(args, "resources"), defaultResources)}
2424
- runIngestion: ${default(map.get(args, "runIngestion"), false)}
25+
- ingestionArgs:
26+
importList:
27+
- ${text.split(importName, ":")[1]}
2528
- runImportJob:
2629
try:
2730
call: googleapis.batch.v1.projects.locations.jobs.create
@@ -109,7 +112,7 @@ main:
109112
args:
110113
parent: ${"projects/" + projectId + "/locations/" + region + "/workflows/spanner-ingestion-workflow"}
111114
body:
112-
argument: ${json.encode_to_string({"importList": [text.split(importName, ":")[1]]})}
115+
argument: ${json.encode_to_string(ingestionArgs)}
113116
- returnResult:
114117
return:
115118
jobId: ${jobId}

import-automation/workflow/import-helper/main.py

Lines changed: 1 addition & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -40,18 +40,10 @@ def handle_feed_event(request):
4040
return 'OK', 200
4141

4242
import_name = attributes.get('import_name')
43-
import_status = 'STAGING'
4443
latest_version = attributes.get(
4544
'import_version',
4645
datetime.now(timezone.utc).strftime("%Y-%m-%d"))
47-
graph_path = attributes.get('graph_path', "/**/*.mcf*")
48-
job_id = attributes.get('feed_name', 'cda_feed')
49-
cron_schedule = attributes.get('cron_schedule', '')
50-
run_ingestion = 'Schema' in import_name or 'Place' in import_name
51-
52-
# Update import status in spanner
53-
helper.update_import_status(import_name, import_status, latest_version,
54-
graph_path, job_id, cron_schedule)
46+
run_ingestion = True
5547

5648
# Invoke import job and ingestion workflow to trigger dataflow job
5749
helper.invoke_import_workflow(import_name, latest_version, run_ingestion)

scripts/entities/download.sh

Lines changed: 13 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -20,11 +20,9 @@ for i in "$@"; do
2020
case $i in
2121
--entity=*)
2222
ENTITY="${i#*=}"
23-
shift
2423
;;
2524
--version=*)
2625
VERSION="${i#*=}"
27-
shift
2826
;;
2927
*)
3028
# Skip unknown options
@@ -34,10 +32,21 @@ done
3432

3533
BUCKET_NAME="datcom-prod-imports"
3634
DIR_NAME=$(basename "$(pwd)")
37-
GCS_FOLDER_PREFIX="scripts/${DIR_NAME}/${ENTITY}/${VERSION}"
38-
GCS_PATH="gs://${BUCKET_NAME}/${GCS_FOLDER_PREFIX}"
35+
GCS_FOLDER_PREFIX="scripts/${DIR_NAME}/${ENTITY}"
36+
GCS_PATH="gs://${BUCKET_NAME}/${GCS_FOLDER_PREFIX}/${VERSION}"
3937

4038
echo "Downloading import ${ENTITY} for version ${VERSION} from ${GCS_PATH} to $(pwd)"
4139
mkdir -p "${ENTITY}"
4240
gcloud storage cp -r "${GCS_PATH}" "${ENTITY}/"
4341
echo "Successfully downloaded ${ENTITY} version ${VERSION}"
42+
43+
# TODO: remove after scripts are checked in
44+
# Download scripts from GCS
45+
SCRIPTS_GCS_PATH="gs://${BUCKET_NAME}/scripts/${DIR_NAME}/process/*"
46+
SCRIPTS_LOCAL_PATH="../../import-automation/executor/scripts"
47+
echo "Downloading scripts from ${SCRIPTS_GCS_PATH} to ${SCRIPTS_LOCAL_PATH}"
48+
mkdir -p "${SCRIPTS_LOCAL_PATH}"
49+
gcloud storage cp -r "${SCRIPTS_GCS_PATH}" "${SCRIPTS_LOCAL_PATH}/"
50+
51+
52+

scripts/entities/manifest.json

Lines changed: 45 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,7 @@
2222
"invoke_import_validation": true,
2323
"invoke_import_tool": true,
2424
"invoke_differ_tool": true,
25-
"skip_input_upload": true
25+
"skip_input_upload": true
2626
}
2727
},
2828
{
@@ -33,9 +33,14 @@
3333
"provenance_url": "https://datacommons.org",
3434
"provenance_description": "Place nodes for Data Commons",
3535
"scripts": [
36-
"./download.sh --entity=Place"
36+
"./download.sh --entity=Place",
37+
"./process.py --entity=Place"
38+
],
39+
"import_inputs": [
40+
{
41+
"node_mcf": "**/*.mcf"
42+
}
3743
],
38-
"import_inputs": [],
3944
"source_files": [],
4045
"cron_schedule": "15 3 * * 1",
4146
"resource_limits": {
@@ -45,8 +50,9 @@
4550
},
4651
"config_override": {
4752
"invoke_import_validation": false,
48-
"invoke_import_tool": true,
49-
"invoke_differ_tool": false
53+
"invoke_import_tool": false,
54+
"invoke_differ_tool": false,
55+
"skip_input_upload": true
5056
}
5157
},
5258
{
@@ -57,7 +63,8 @@
5763
"provenance_url": "https://datacommons.org",
5864
"provenance_description": "Provenance nodes for Data Commons",
5965
"scripts": [
60-
"./download.sh --entity=Provenance"
66+
"./download.sh --entity=Provenance",
67+
"./process.py --entity=Provenance"
6168
],
6269
"import_inputs": [
6370
{
@@ -69,7 +76,38 @@
6976
"config_override": {
7077
"invoke_import_validation": false,
7178
"invoke_import_tool": false,
72-
"invoke_differ_tool": false
79+
"invoke_differ_tool": false,
80+
"skip_input_upload": true
81+
}
82+
},
83+
{
84+
"import_name": "Event",
85+
"curator_emails": [
86+
"support@datacommons.org"
87+
],
88+
"provenance_url": "https://datacommons.org",
89+
"provenance_description": "Event nodes for Data Commons",
90+
"scripts": [
91+
"./download.sh --entity=Event",
92+
"./process.py --entity=Event"
93+
],
94+
"import_inputs": [
95+
{
96+
"node_mcf": "**/*.mcf"
97+
}
98+
],
99+
"source_files": [],
100+
"cron_schedule": "15 3 * * 1",
101+
"resource_limits": {
102+
"cpu": 8,
103+
"memory": 128,
104+
"disk": 100
105+
},
106+
"config_override": {
107+
"invoke_import_validation": false,
108+
"invoke_import_tool": false,
109+
"invoke_differ_tool": false,
110+
"skip_input_upload": true
73111
}
74112
}
75113
]

scripts/entities/process.py

Lines changed: 23 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -19,18 +19,37 @@
1919
from absl import app
2020
from absl import flags
2121
from absl import logging
22-
import datetime
23-
from google.cloud import storage
2422
import os
23+
import sys
24+
25+
# Add the scripts directory to sys.path
26+
script_dir = os.path.abspath(
27+
os.path.join(os.path.dirname(__file__), '..', '..', 'import-automation',
28+
'executor', 'scripts'))
29+
sys.path.append(script_dir)
30+
import generate_provisional_nodes
31+
import convert_dc_manifest
2532

2633
FLAGS = flags.FLAGS
27-
flags.DEFINE_string("entity", "Schema", "Entity type (Schema/Place).")
34+
flags.DEFINE_string("entity", "", "Entity type (Schema/Place).")
2835
flags.DEFINE_string("version", "", "Import version.")
2936

3037

3138
def process(entity_type: str, version: str):
3239
logging.info(f'Processing import {entity_type} for version {version}')
33-
# TODO: add processing logic
40+
local_path = os.path.abspath(
41+
os.path.join(os.path.dirname(__file__), entity_type, version))
42+
43+
if entity_type == 'Provenance':
44+
# Local path to Provenance data
45+
logging.info(f'Processing DC manifest files in {local_path}')
46+
convert_dc_manifest.process_directory(local_path)
47+
48+
# Local path to data
49+
logging.info(
50+
f'Generating provisional nodes for {entity_type} in {local_path}')
51+
generate_provisional_nodes.generate_provisional_nodes(local_path)
52+
return 0
3453

3554

3655
def main(_):

tools/import_differ/import_differ.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -368,6 +368,8 @@ def run_differ(self) -> dict:
368368
diff_path = os.path.join(self.output_path, 'schema-diff*')
369369
logging.info("Loading schema diff data from: %s", diff_path)
370370
schema_diff = differ_utils.load_csv_data(diff_path, tmp_path)
371+
# TODO: populate summary for cloud mode
372+
differ_summary = {}
371373
else:
372374
# Runs local Python differ.
373375
current_dir = os.path.join(tmp_path, 'current')

0 commit comments

Comments
 (0)