Skip to content

Commit aa9ed43

Browse files
committed
Add batch mode for schema
1 parent 587bf09 commit aa9ed43

10 files changed

Lines changed: 191 additions & 114 deletions

File tree

import-automation/executor/app/executor/import_executor.py

Lines changed: 42 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -422,22 +422,19 @@ def _get_import_input_files(self, import_input, absolute_import_dir):
422422
for pattern in patterns:
423423
if pattern:
424424
files = glob.glob(os.path.join(absolute_import_dir,
425-
pattern))
425+
pattern),
426+
recursive=True)
426427
if not files and not glob.has_magic(pattern):
427428
errors.append(
428429
f'No matching files for {file_type}:{pattern}')
429430
else:
430431
input_files.extend(sorted(files))
431-
import_prefix = ''
432-
if input_files:
433-
import_prefix = os.path.splitext(os.path.basename(
434-
input_files[0]))[0]
435432
if errors:
436433
logging.fatal(
437434
f'Missing import files in {absolute_import_dir}: {errors}')
438435
raise RuntimeError(
439436
'Import job failed due to missing user script output files.')
440-
return input_files, import_prefix
437+
return input_files
441438

442439
@log_function_call
443440
def _invoke_import_tool(self, absolute_import_dir: str,
@@ -453,17 +450,12 @@ def _invoke_import_tool(self, absolute_import_dir: str,
453450
import_stage = ImportStage.GENMCF
454451
import_name = import_spec['import_name']
455452
import_inputs = import_spec.get('import_inputs', [])
456-
import_prefix_list = []
457453
input_index = -1
458454
for import_input in import_inputs:
459455
input_index += 1
460-
input_files, import_prefix = self._get_import_input_files(
461-
import_input, absolute_import_dir)
462-
import_prefix_list.append(import_prefix)
463-
if not import_prefix:
464-
logging.error(
465-
'Skipping genmcf due to missing import input spec.')
466-
continue
456+
input_files = self._get_import_input_files(import_input,
457+
absolute_import_dir)
458+
import_prefix = f'input{input_index}'
467459
output_path = os.path.join(absolute_import_dir, import_name,
468460
version, import_prefix, 'genmcf')
469461

@@ -521,7 +513,6 @@ def _invoke_import_tool(self, absolute_import_dir: str,
521513
import_name, import_stage, ImportStatus.SUCCESS,
522514
import_summary.import_stats.get('genmcf_execution_time', 0),
523515
import_summary.import_stats.get('mcf_data_size', 0))
524-
return import_prefix_list
525516

526517
def _get_validation_config_file(self, repo_dir: str,
527518
absolute_import_dir: str, import_spec: dict,
@@ -559,7 +550,7 @@ def _get_validation_config_file(self, repo_dir: str,
559550
@log_function_call
560551
def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str,
561552
absolute_import_dir: str, import_spec: dict,
562-
version: str, import_prefix_list: list,
553+
version: str,
563554
import_summary: ImportStatusSummary) -> bool:
564555
"""
565556
Performs validations on import data.
@@ -577,12 +568,11 @@ def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str,
577568
differ_job_name = 'differ'
578569

579570
# Trigger validations for each tmcf/csv under import_inputs.
571+
import_inputs = import_spec.get('import_inputs', [])
580572
input_index = -1
581-
for import_prefix in import_prefix_list:
573+
for import_input in import_inputs:
582574
input_index += 1
583-
if not import_prefix:
584-
logging.error('Skipping validation due to missing import spec.')
585-
continue
575+
import_prefix = f'input{input_index}'
586576

587577
genmcf_output_path = os.path.join(absolute_import_dir, import_name,
588578
version, import_prefix, 'genmcf')
@@ -593,17 +583,20 @@ def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str,
593583
current_data_path = os.path.join(genmcf_output_path, '*.mcf')
594584
previous_data_path = latest_version + f'/{import_prefix}/genmcf/*.mcf'
595585
# TODO: remove fallback logic once all imports move to new path.
596-
if latest_version and not file_util.file_get_matching(
597-
previous_data_path):
586+
if not file_util.file_get_matching(previous_data_path):
587+
input_files = self._get_import_input_files(
588+
import_input, absolute_import_dir)
589+
import_prefix = os.path.splitext(
590+
os.path.basename(input_files[0]))[0]
591+
previous_data_path = latest_version + f'/{import_prefix}/genmcf/*.mcf'
592+
if not file_util.file_get_matching(previous_data_path):
598593
previous_data_path = latest_version + f'/{import_prefix}/validation/*.mcf'
599594
# END
600595
summary_stats = os.path.join(genmcf_output_path,
601596
'summary_report.csv')
602597
report_json = os.path.join(genmcf_output_path, 'report.json')
603598
validation_output_file = os.path.join(validation_output_path,
604599
'validation_output.csv')
605-
differ_output = os.path.join(validation_output_path,
606-
'obs_diff_summary.csv')
607600

608601
# Invoke differ and validation scripts.
609602
differ_output_file = ''
@@ -620,7 +613,7 @@ def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str,
620613
job_name=differ_job_name,
621614
file_format='mcf',
622615
runner_mode='local')
623-
differ.run_differ()
616+
differ_summary = differ.run_differ()
624617
log_metric(
625618
AUTO_IMPORT_JOB_STAGE, "INFO",
626619
f"Import: {import_name}, differ for {import_prefix} {latest_version} vs {version}",
@@ -633,6 +626,11 @@ def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str,
633626
"current_version": version
634627
})
635628
differ_output_file = validation_output_path
629+
if differ_summary.get('obs_diff_size',
630+
'0') == 0 and differ_summary.get(
631+
'schema_diff_size', '0') == 0:
632+
import_summary.status = ImportStatus.SKIP
633+
logging.info("Marking import as SKIP due to empty diff.")
636634
else:
637635
differ_output_file = ''
638636
logging.error(
@@ -693,6 +691,13 @@ def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str,
693691
import_summary.import_stats.get('validation_execution_time', 0),
694692
import_summary.import_stats.get('validation_data_size',
695693
0), validation_message)
694+
if self.config.ignore_validation_status or validation_status:
695+
import_summary.status = ImportStatus.STAGING
696+
else:
697+
logging.error(
698+
"Marking import as VALIDATION due to validation failure.")
699+
import_summary.status = ImportStatus.VALIDATION
700+
696701
return validation_status
697702

698703
def _get_validation_message(
@@ -853,11 +858,13 @@ def _import_one_helper(
853858
repo_dir, 'import-automation', 'executor',
854859
self.config.requirements_filename)
855860
timer = Timer()
856-
interpreter_path, process = _create_venv(
857-
(central_requirements_path, requirements_path),
858-
tmpdir,
859-
timeout=self.config.venv_create_timeout,
860-
)
861+
interpreter_path = sys.executable
862+
process = subprocess.CompletedProcess(args=[], returncode=0)
863+
# interpreter_path, process = _create_venv(
864+
# [requirements_path],
865+
# tmpdir,
866+
# timeout=self.config.venv_create_timeout,
867+
# )
861868

862869
_log_process(process=process,
863870
import_name=import_name,
@@ -878,7 +885,7 @@ def _import_one_helper(
878885

879886
if self.config.invoke_import_tool:
880887
logging.info("Invoking import tool genmcf")
881-
import_prefix_list = self._invoke_import_tool(
888+
self._invoke_import_tool(
882889
absolute_import_dir=absolute_import_dir,
883890
relative_import_dir=relative_import_dir,
884891
version=version,
@@ -894,7 +901,6 @@ def _import_one_helper(
894901
absolute_import_dir=absolute_import_dir,
895902
import_spec=import_spec,
896903
version=version,
897-
import_prefix_list=import_prefix_list,
898904
import_summary=import_summary)
899905
logging.info(
900906
f'Validations for version {version} completed with status: {validation_status}'
@@ -910,13 +916,6 @@ def _import_one_helper(
910916
import_summary.import_stats.get('validation_data_size', 0))
911917
logging.info(import_summary)
912918

913-
if self.config.ignore_validation_status or validation_status:
914-
import_summary.status = ImportStatus.STAGING
915-
else:
916-
logging.error(
917-
"Staging latest version update due to validation failure.")
918-
import_summary.status = ImportStatus.VALIDATION
919-
920919
self._update_latest_version(version, output_dir, import_spec,
921920
import_summary)
922921

@@ -970,7 +969,9 @@ def _upload_import_inputs(
970969
import_inputs = import_spec.get('import_inputs', [])
971970
errors = []
972971
data_size = 0
972+
input_index = -1
973973
for import_input in import_inputs:
974+
input_index += 1
974975
for input_type in self.config.import_input_types:
975976
path = import_input.get(input_type)
976977
if not path:
@@ -984,13 +985,13 @@ def _upload_import_inputs(
984985
if import_files:
985986
for file in import_files:
986987
if file:
987-
dest = f'{output_dir}/{version}/{os.path.basename(file)}'
988+
dest = f'{output_dir}/{version}/input{input_index}/{os.path.basename(file)}'
988989
data_size += os.path.getsize(file)
989990
self._upload_file_helper(
990991
src=file,
991992
dest=dest,
992993
)
993-
uploaded_dest = f'{output_dir}/{version}/{os.path.basename(path)}'
994+
uploaded_dest = f'{output_dir}/{version}/input{input_index}/{os.path.basename(path)}'
994995
setattr(uploaded, input_type, uploaded_dest)
995996
elif not glob.has_magic(path):
996997
errors.append(

import-automation/executor/cloudbuild.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ steps:
4646
python import_test.py
4747
env:
4848
- 'PROJECT_ID=$PROJECT_ID'
49-
- 'LOCATION=$LOCATION'
5049
- 'GCS_BUCKET=${_GCS_BUCKET}'
5150
- 'IMAGE_URI=${_DOCKER_IMAGE}:${COMMIT_SHA}'
5251
dir: 'import-automation/executor'

import-automation/workflow/import-automation-workflow.yaml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ main:
2121
memory: 32768
2222
disk: 100
2323
- resources: ${default(map.get(args, "resources"), defaultResources)}
24+
- runIngestion: ${default(map.get(args, "runIngestion"), false)}
2425
- runImportJob:
2526
try:
2627
call: googleapis.batch.v1.projects.locations.jobs.create
@@ -99,6 +100,16 @@ main:
99100
override: false
100101
comment: '${"import-workflow:" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}'
101102
result: functionResponse
103+
- runIngestion:
104+
switch:
105+
- condition: ${runIngestion}
106+
steps:
107+
- runSpannerIngestion:
108+
call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.create
109+
args:
110+
parent: ${"projects/" + projectId + "/locations/" + region + "/workflows/spanner-ingestion-workflow"}
111+
body:
112+
argument: ${json.encode_to_string({"importList": [text.split(importName, ":")[1]]})}
102113
- returnResult:
103114
return:
104115
jobId: ${jobId}

import-automation/workflow/import-helper/import_helper.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
GCS_BUCKET_ID = os.environ.get('GCS_BUCKET_ID')
3232
INGESTION_HELPER_URL = f"https://{LOCATION}-{PROJECT_ID}.cloudfunctions.net/spanner-ingestion-helper"
3333
WORKFLOW_ID = 'spanner-ingestion-workflow'
34+
IMPORT_AUTOMATION_WORKFLOW_ID = 'import-automation-workflow'
35+
3436

3537
def invoke_ingestion_workflow(import_name: str):
3638
"""Triggers the graph ingestion workflows.
@@ -51,6 +53,34 @@ def invoke_ingestion_workflow(import_name: str):
5153
)
5254

5355

56+
def invoke_import_workflow(import_name: str,
57+
latest_version: str,
58+
run_ingestion: bool = False):
59+
"""Triggers the import automation workflow.
60+
61+
Args:
62+
import_name: The name of the import.
63+
latest_version: The version of the import.
64+
run_ingestion: Whether to run the ingestion workflow after the import.
65+
"""
66+
import_config = {"user_script_args": [f"--version={latest_version}"]}
67+
workflow_args = {
68+
"importName": import_name,
69+
"importConfig": json.dumps(import_config),
70+
"runIngestion": run_ingestion
71+
}
72+
73+
logging.info(f"Invoking {IMPORT_AUTOMATION_WORKFLOW_ID} for {import_name}")
74+
execution_client = executions_v1.ExecutionsClient()
75+
parent = f"projects/{PROJECT_ID}/locations/{LOCATION}/workflows/{IMPORT_AUTOMATION_WORKFLOW_ID}"
76+
execution_req = executions_v1.Execution(argument=json.dumps(workflow_args))
77+
response = execution_client.create_execution(parent=parent,
78+
execution=execution_req)
79+
logging.info(
80+
f"Triggered workflow {IMPORT_AUTOMATION_WORKFLOW_ID} for {import_name}. Execution ID: {response.name}"
81+
)
82+
83+
5484
def update_import_status(import_name,
5585
import_status,
5686
import_version,

import-automation/workflow/import-helper/main.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,12 @@ def handle_feed_event(request):
4747
graph_path = attributes.get('graph_path', "/**/*.mcf*")
4848
job_id = attributes.get('feed_name', 'cda_feed')
4949
cron_schedule = attributes.get('cron_schedule', '')
50-
post_process = attributes.get('post_process', '')
50+
run_ingestion = 'Schema' in import_name or 'Place' in import_name
51+
5152
# Update import status in spanner
5253
helper.update_import_status(import_name, import_status, latest_version,
53-
graph_path, job_id, cron_schedule)
54+
graph_path, job_id, cron_schedule)
5455

55-
# Invoke ingestion workflow to trigger dataflow job
56-
if post_process == 'spanner_ingestion_workflow':
57-
helper.invoke_ingestion_workflow(import_name)
56+
# Invoke import job and ingestion workflow to trigger dataflow job
57+
helper.invoke_import_workflow(import_name, latest_version, run_ingestion)
5858
return 'OK', 200

scripts/entities/manifest.json

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,18 @@
88
"provenance_url": "https://datacommons.org",
99
"provenance_description": "Schema nodes for Data Commons",
1010
"scripts": [
11-
"process.py --entity=Schema"
11+
"./process.sh --entity=Schema"
12+
],
13+
"import_inputs": [
14+
{
15+
"node_mcf": "**/*.mcf"
16+
}
1217
],
13-
"import_inputs": [],
1418
"source_files": [],
1519
"cron_schedule": "15 3 * * *",
1620
"config_override": {
17-
"invoke_import_validation": false,
18-
"invoke_import_tool": false,
21+
"invoke_import_validation": true,
22+
"invoke_import_tool": true,
1923
"invoke_differ_tool": false
2024
}
2125
},
@@ -27,11 +31,39 @@
2731
"provenance_url": "https://datacommons.org",
2832
"provenance_description": "Place nodes for Data Commons",
2933
"scripts": [
30-
"process.py --entity=Place"
34+
"./process.sh --entity=Place"
3135
],
3236
"import_inputs": [],
3337
"source_files": [],
3438
"cron_schedule": "15 3 * * 1",
39+
"resource_limits": {
40+
"cpu": 8,
41+
"memory": 128,
42+
"disk": 100
43+
},
44+
"config_override": {
45+
"invoke_import_validation": false,
46+
"invoke_import_tool": true,
47+
"invoke_differ_tool": false
48+
}
49+
},
50+
{
51+
"import_name": "Provenance",
52+
"curator_emails": [
53+
"support@datacommons.org"
54+
],
55+
"provenance_url": "https://datacommons.org",
56+
"provenance_description": "Provenance nodes for Data Commons",
57+
"scripts": [
58+
"./process.sh --entity=Provenance"
59+
],
60+
"import_inputs": [
61+
{
62+
"node_mcf": "**/*.mcf"
63+
}
64+
],
65+
"source_files": [],
66+
"cron_schedule": "15 3 * * 1",
3567
"config_override": {
3668
"invoke_import_validation": false,
3769
"invoke_import_tool": false,

0 commit comments

Comments
 (0)