Skip to content

Commit 0f0e6ec

Browse files
authored
Import executor clean up (#1903)
1 parent db24a79 commit 0f0e6ec

8 files changed

Lines changed: 34 additions & 166 deletions

File tree

import-automation/executor/app/executor/cloud_batch.py

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -141,28 +141,26 @@ def get_gce_instance(required_cpu: float,
141141
return suitable_instances[0]['name']
142142

143143

144-
def create_job_request(import_name: str, import_config: dict, import_spec: dict,
145-
default_resources: dict, timeout: int) -> str:
146-
resources = import_spec.get('resource_limits', default_resources)
147-
machine_type = get_gce_instance(resources['cpu'], resources['memory'])
144+
def create_job_request(import_name: str, import_config: dict,
145+
import_spec: dict) -> str:
146+
resources = {}
147+
if 'resource_limits' in import_spec:
148+
resources = import_spec.get('resource_limits', {})
149+
machine_type = get_gce_instance(resources.get('cpu', 0),
150+
resources.get('memory', 0))
151+
resources[
152+
"machine"] = machine_type if machine_type is not None else 'n2-standard-8'
153+
resources["cpu"] = resources["cpu"] * 1000
154+
resources["memory"] = resources["memory"] * 1024
148155

149-
resources[
150-
"machine"] = machine_type if machine_type is not None else 'n2-standard-8'
151-
152-
resources["cpu"] = resources["cpu"] * 1000
153-
resources["memory"] = resources["memory"] * 1024
154-
schedule = import_spec.get('cron_schedule')
155156
import_config_string = json.dumps(import_config)
156-
job_name = import_name.split(':')[1]
157-
job_name = job_name.replace("_", "-").lower()
158157
argument_payload = {
159-
"jobName": job_name,
160158
"importName": import_name,
161159
"importConfig": import_config_string,
162-
"resources": resources,
163-
"timeout": timeout,
164-
"schedule": schedule
165160
}
161+
if resources:
162+
argument_payload["resources"] = resources
163+
166164
argument_string = json.dumps(argument_payload)
167165
final_payload = {
168166
"argument": argument_string,

import-automation/executor/app/executor/import_executor.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
from timer import Timer
4848
from import_differ.import_differ import ImportDiffer
4949
from tools.import_validation.runner import ValidationRunner
50+
from tools.import_validation.result import ValidationStatus, ValidationResult
5051
from tools.import_validation.validation_config import merge_and_save_config
5152
from app import configs
5253
from app import utils
@@ -568,6 +569,7 @@ def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str,
568569
data_size = 0
569570
import_name = import_spec['import_name']
570571
validation_status = True
572+
validation_results = []
571573

572574
import_dir = f'{relative_import_dir}/{import_spec["import_name"]}'
573575
latest_version = self._get_latest_version(import_dir)
@@ -650,7 +652,8 @@ def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str,
650652
stats_summary=summary_stats,
651653
lint_report=report_json,
652654
validation_output=validation_output_file)
653-
overall_status, _ = validation.run_validations()
655+
overall_status, current_results = validation.run_validations()
656+
validation_results.extend(current_results)
654657
if validation_status:
655658
validation_status = overall_status
656659
except ValueError as e:
@@ -683,13 +686,26 @@ def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str,
683686
import_summary.import_stats[
684687
'validation_execution_time'] = start_timer.time()
685688
import_summary.import_stats['validation_data_size'] = data_size
689+
validation_message = self._get_validation_message(validation_results)
686690
log_import_status(
687691
import_name, import_stage,
688692
ImportStatus.SUCCESS if validation_status else ImportStatus.FAILURE,
689693
import_summary.import_stats.get('validation_execution_time', 0),
690-
import_summary.import_stats.get('validation_data_size', 0))
694+
import_summary.import_stats.get('validation_data_size',
695+
0), validation_message)
691696
return validation_status
692697

698+
def _get_validation_message(
699+
self, validation_results: List[ValidationResult]) -> str:
700+
"""Generates a summary message of validation results."""
701+
failed_validations = []
702+
703+
for res in validation_results:
704+
if res.status.name == 'FAILED':
705+
failed_validations.append(res.name)
706+
707+
return f"FAILED: {', '.join(failed_validations) if failed_validations else 'None'}"
708+
693709
def _create_mount_point(self, gcs_volume_mount_dir: str,
694710
cleanup_gcs_volume_mount: bool,
695711
absolute_import_dir: str, import_name: str) -> None:
@@ -824,10 +840,6 @@ def _import_one_helper(
824840
output_dir = f'{relative_import_dir}/{import_name}'
825841
version = self.config.import_version_override if self.config.import_version_override else _clean_time(
826842
utils.pacific_time())
827-
# Used for imports using CDA feed tranfers with a date placeholder in the GCS path,
828-
# thus, we can determine the path using the current date (instead of a variable timestamp).
829-
if version == 'DATE_VERSION_PLACEHOLDER':
830-
version = datetime.datetime.now(datetime.UTC).strftime("%Y-%m-%d")
831843
import_summary.latest_version = 'gs://' + os.path.join(
832844
self.config.storage_prod_bucket_name, output_dir, version)
833845
import_summary.graph_path = self.config.graph_data_path

import-automation/executor/app/executor/scheduler_job_manager.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,7 @@ def create_or_update_import_schedule(absolute_import_name: str,
158158
# This set up a cloud scheduler job which periodically invokes a GCP workflow job.
159159
# The workflow job runs a CLOUD BATCH job with the specified configuration in the request.
160160
json_encoded_body = cloud_batch.create_job_request(
161-
absolute_import_name, override_config, import_spec, resources,
162-
timeout)
161+
absolute_import_name, override_config, import_spec)
163162
cloud_batch_job_url = f'https://workflowexecutions.googleapis.com/v1/projects/{config.gcp_project_id}/locations/{config.scheduler_location}/workflows/{config.cloud_workflow_id}/executions'
164163
req = cloud_scheduler.cloud_batch_job_request(
165164
absolute_import_name, schedule, cloud_batch_job_url,

import-automation/executor/cloud_batch_import_test.sh

Lines changed: 0 additions & 104 deletions
This file was deleted.

import-automation/executor/cloud_run_import_test.sh

Lines changed: 0 additions & 33 deletions
This file was deleted.

import-automation/executor/reschedule_all_imports.sh renamed to import-automation/executor/scripts/reschedule_all_imports.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ mapfile -t IMPORT_LIST < <(gcloud scheduler jobs list --location us-central1 --p
1616
SCRIPT_DIR=$(realpath $(dirname $0))
1717
for import in "${IMPORT_LIST[@]}"; do
1818
echo "Scheduling import: $import"
19-
$SCRIPT_DIR/schedule_update_import.sh -s $PROJECT $import
19+
$SCRIPT_DIR/../schedule_update_import.sh -s $PROJECT $import
2020
done
2121
echo "Batch update finished."
2222

import-automation/executor/update_import_version.sh renamed to import-automation/executor/scripts/update_import_version.sh

File renamed without changes.

scripts/entities/manifest.json

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414
"source_files": [],
1515
"cron_schedule": "15 3 * * *",
1616
"config_override": {
17-
"import_version_override": "DATE_VERSION_PLACEHOLDER",
18-
"graph_data_path": "/**/*mcf*",
1917
"invoke_import_validation": false,
2018
"invoke_import_tool": false,
2119
"invoke_differ_tool": false
@@ -35,8 +33,6 @@
3533
"source_files": [],
3634
"cron_schedule": "15 3 * * 1",
3735
"config_override": {
38-
"import_version_override": "DATE_VERSION_PLACEHOLDER",
39-
"graph_data_path": "/**/*mcf*",
4036
"invoke_import_validation": false,
4137
"invoke_import_tool": false,
4238
"invoke_differ_tool": false

0 commit comments

Comments
 (0)