Commit b36ac62

Import workflow fixes (#1886)
1 parent 0f0e6ec commit b36ac62

15 files changed: 550 additions & 283 deletions

import-automation/workflow/aggregation-helper/main.py

Lines changed: 1 addition & 3 deletions
@@ -40,9 +40,7 @@ def aggregation_helper(request):
     HTTP Cloud Function that takes importName and runs a BQ query.
     """
     request_json = request.get_json(silent=True)
-    import_list = request_json.get('importList')
-    if not import_list:
-        return ("'importList' parameter is missing", 400)
+    import_list = request_json.get('importList', [])
     logging.info(f"Received request for importList: {import_list}")
     results = []
     try:
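
With the 400 guard removed, a request that omits 'importList' now proceeds with an empty list (and simply yields empty results) instead of failing. A minimal sketch of the new default behavior, with hypothetical payloads:

    # Hypothetical request payloads illustrating the new default.
    request_json = {}                                  # 'importList' missing
    import_list = request_json.get('importList', [])
    assert import_list == []                           # previously returned a 400

    request_json = {'importList': ['treasury']}
    import_list = request_json.get('importList', [])
    assert import_list == ['treasury']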

import-automation/workflow/cloudbuild.yaml

Lines changed: 0 additions & 5 deletions
@@ -30,24 +30,19 @@ steps:
   - id: 'import-automation-workflow'
     name: 'gcr.io/cloud-builders/gcloud'
     args: ['workflows', 'deploy', 'import-automation-workflow', '--project', '${_PROJECT_ID}', '--location', '${_LOCATION}', '--source', 'import-automation-workflow.yaml', '--set-env-vars', 'LOCATION=${_LOCATION},GCS_BUCKET_ID=${_GCS_BUCKET_ID},GCS_MOUNT_BUCKET=${_GCS_MOUNT_BUCKET}']
-    dir: 'import-automation/workflow'

   - id: 'spanner-ingestion-workflow'
     name: 'gcr.io/cloud-builders/gcloud'
     args: ['workflows', 'deploy', 'spanner-ingestion-workflow', '--project', '${_PROJECT_ID}', '--location', '${_LOCATION}', '--source', 'spanner-ingestion-workflow.yaml', '--set-env-vars', 'LOCATION=${_LOCATION},PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID}']
-    dir: 'import-automation/workflow'

   - id: 'spanner-ingestion-helper'
     name: 'gcr.io/cloud-builders/gcloud'
     args: ['functions', 'deploy', 'spanner-ingestion-helper', '--gen2', '--project', '${_PROJECT_ID}', '--region', '${_LOCATION}', '--runtime', 'python312', '--source', 'ingestion-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'ingestion_helper', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},GCS_BUCKET_ID=${_GCS_BUCKET_ID},LOCATION=${_LOCATION}']
-    dir: 'import-automation/workflow'

   - id: 'import-aggregation-helper'
     name: 'gcr.io/cloud-builders/gcloud'
     args: ['functions', 'deploy', 'import-aggregation-helper', '--runtime', 'python312', '--source', 'aggregation-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'aggregation_helper', '--project', '${_PROJECT_ID}', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},GCS_BUCKET_ID=${_GCS_BUCKET_ID},LOCATION=${_LOCATION},BQ_DATASET_ID=${_BQ_DATASET_ID}']
-    dir: 'import-automation/workflow'

   - id: 'import-automation-helper'
     name: 'gcr.io/cloud-builders/gcloud'
     args: ['functions', 'deploy', 'import-automation-helper', '--gen2', '--project', '${_PROJECT_ID}', '--region', '${_LOCATION}', '--runtime', 'python312', '--source', 'import-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'handle_feed_event', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},LOCATION=${_LOCATION},GCS_BUCKET_ID=${_GCS_BUCKET_ID}']
-    dir: 'import-automation/workflow'

import-automation/workflow/cloudbuild_main.yaml

Lines changed: 1 addition & 1 deletion
@@ -38,7 +38,7 @@ steps:
       - '.'
       - '--config=cloudbuild.yaml'
       - '--project=${_PROJECT_ID}'
-      - '--substitutions=_ENV=staging,_PROJECT_ID=${_PROJECT_ID},_SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},_SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},_SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},_GCS_BUCKET_ID=${_GCS_BUCKET_ID},_LOCATION=${_LOCATION},_GCS_MOUNT_BUCKET=${_GCS_MOUNT_BUCKET},_BQ_DATASET_ID=${_BQ_DATASET_ID}'
+      - '--substitutions=_PROJECT_ID=${_PROJECT_ID},_SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},_SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},_SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},_GCS_BUCKET_ID=${_GCS_BUCKET_ID},_LOCATION=${_LOCATION},_GCS_MOUNT_BUCKET=${_GCS_MOUNT_BUCKET},_BQ_DATASET_ID=${_BQ_DATASET_ID}'
     dir: 'import-automation/workflow'

   # 2. Run E2E Tests on Staging

import-automation/workflow/import-automation-workflow.yaml

Lines changed: 19 additions & 14 deletions
@@ -6,15 +6,21 @@ main:
     - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
     - region: ${sys.get_env("LOCATION")}
     - imageUri: ${default(map.get(args, "imageUri"), "gcr.io/datcom-ci/dc-import-executor:stable")}
-    - jobId: ${text.substring(args.jobName, 0, 50) + "-" + string(int(sys.now()))}
+    - jobId: ${text.replace_all(text.to_lower(text.substring(text.split(args.importName, ":")[1], 0, 50) + "-" + string(int(sys.now()))), "_", "-")}
     - importName: ${args.importName}
-    - importConfig: ${args.importConfig}
+    - importConfig: ${default(map.get(args, "importConfig"), "{}")}
     - gcsMountBucket: ${sys.get_env("GCS_MOUNT_BUCKET")}
     - gcsImportBucket: ${sys.get_env("GCS_BUCKET_ID")}
     - gcsMountPath: "/tmp/gcs"
     - ingestionHelper: "spanner-ingestion-helper"
     - functionUrl: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/" + ingestionHelper}
     - startTime: ${sys.now()}
+    - defaultResources:
+        machine: "n2-standard-8"
+        cpu: 8000
+        memory: 32768
+        disk: 100
+    - resources: ${default(map.get(args, "resources"), defaultResources)}
 - runImportJob:
     try:
       call: googleapis.batch.v1.projects.locations.jobs.create
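
The job ID is now derived from the import name rather than a separate jobName argument: the segment after the ':' is truncated to 50 characters, a timestamp suffix is appended, and the result is lowercased with underscores replaced by hyphens (Batch job IDs only allow lowercase letters, digits, and hyphens). A rough Python equivalent of the Workflows expression, assuming an importName of the form 'path:name':

    import time

    def derive_job_id(import_name: str) -> str:
        # e.g. "scripts/us_fed:Treasury_Rates" -> "treasury-rates-1716400000"
        base = import_name.split(":")[1][:50]
        return (base + "-" + str(int(time.time()))).lower().replace("_", "-")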
@@ -25,11 +31,11 @@ main:
         allocationPolicy:
           instances:
             - policy:
-                machineType: ${args.resources.machine}
+                machineType: ${resources.machine}
                 provisioningModel: "STANDARD"
                 bootDisk:
                   image: "projects/debian-cloud/global/images/family/debian-12"
-                  size_gb: ${args.resources.disk}
+                  size_gb: ${resources.disk}
                 installOpsAgent: true
         taskGroups:
           taskSpec:
@@ -38,14 +44,14 @@ main:
                 remotePath: ${gcsMountBucket}
                 mountPath: ${gcsMountPath}
             computeResource:
-              cpuMilli: ${args.resources.cpu}
-              memoryMib: ${args.resources.memory}
+              cpuMilli: ${resources.cpu}
+              memoryMib: ${resources.memory}
             runnables:
               - container:
                   imageUri: ${imageUri}
                   commands:
-                    - ${"--import_name=" + args.importName}
-                    - ${"--import_config=" + args.importConfig}
+                    - ${"--import_name=" + importName}
+                    - ${"--import_config=" + importConfig}
             environment:
               variables:
                 IMPORT_NAME: ${importName}
@@ -73,11 +79,10 @@ main:
         body:
           actionType: 'update_import_status'
           jobId: ${jobId}
-          importName: ${args.importName}
-          status: 'FAILED'
+          importName: ${importName}
+          status: 'FAILURE'
           executionTime: ${int(sys.now() - startTime)}
-          latestVersion: ${"gs://" + gcsImportBucket + "/" + text.replace_all(args.importName, ":", "/")}
-          schedule: ${default(map.get(args, "schedule"), "")}
+          latestVersion: ${"gs://" + gcsImportBucket + "/" + text.replace_all(importName, ":", "/")}
         result: functionResponse
     - failWorkflow:
         raise: ${e}
@@ -89,8 +94,8 @@ main:
           type: OIDC
         body:
           actionType: 'update_import_version'
-          importName: ${args.importName}
-          version: 'staging'
+          importName: ${importName}
+          version: 'STAGING'
           override: false
           comment: '${"import-workflow:" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}'
         result: functionResponse
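
With importConfig and resources now defaulted, a caller only has to supply importName; the resource shape can still be overridden per execution. A sketch of triggering the workflow with the Workflows executions client — the project, location, and import name below are hypothetical:

    import json
    from google.cloud.workflows import executions_v1

    client = executions_v1.ExecutionsClient()
    parent = ("projects/my-project/locations/us-central1"
              "/workflows/import-automation-workflow")  # hypothetical project/region
    args = {
        "importName": "scripts/us_fed:treasury",  # hypothetical import name
        # Optional override; otherwise defaultResources above applies:
        # "resources": {"machine": "n2-standard-16", "cpu": 16000,
        #               "memory": 65536, "disk": 200},
    }
    execution = executions_v1.Execution(argument=json.dumps(args))
    response = client.create_execution(parent=parent, execution=execution)
    print(response.name)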
Lines changed: 148 additions & 0 deletions
@@ -0,0 +1,148 @@
+# Copyright 2026 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import base64
+import json
+import logging
+import os
+import croniter
+from datetime import datetime, timezone
+from google.auth.transport.requests import Request
+from google.oauth2 import id_token
+from google.cloud import storage
+from google.cloud.workflows import executions_v1
+import requests
+
+logging.getLogger().setLevel(logging.INFO)
+
+PROJECT_ID = os.environ.get('PROJECT_ID')
+LOCATION = os.environ.get('LOCATION')
+GCS_BUCKET_ID = os.environ.get('GCS_BUCKET_ID')
+INGESTION_HELPER_URL = f"https://{LOCATION}-{PROJECT_ID}.cloudfunctions.net/spanner-ingestion-helper"
+WORKFLOW_ID = 'spanner-ingestion-workflow'
+
+
+def invoke_ingestion_workflow(import_name: str):
+    """Triggers the graph ingestion workflows.
+
+    Args:
+      import_name: The name of the import.
+    """
+    workflow_args = {"importList": [import_name.split(':')[-1]]}
+
+    logging.info(f"Invoking {WORKFLOW_ID} for {import_name}")
+    execution_client = executions_v1.ExecutionsClient()
+    parent = f"projects/{PROJECT_ID}/locations/{LOCATION}/workflows/{WORKFLOW_ID}"
+    execution_req = executions_v1.Execution(argument=json.dumps(workflow_args))
+    response = execution_client.create_execution(parent=parent,
+                                                 execution=execution_req)
+    logging.info(
+        f"Triggered workflow {WORKFLOW_ID} for {import_name}. Execution ID: {response.name}"
+    )
+
+
+def update_import_status(import_name,
+                         import_status,
+                         import_version,
+                         graph_path,
+                         job_id,
+                         cron_schedule=None):
+    """Updates the status for the specified import job.
+
+    Args:
+      import_name: The name of the import.
+      import_status: The new status of the import.
+      import_version: The version of the import.
+      graph_path: The graph path for the import.
+      job_id: The job ID associated with the import.
+      cron_schedule: The cron schedule for the import (optional).
+    """
+    logging.info(f"Updating {import_name} status: {import_status}")
+    latest_version = 'gs://' + GCS_BUCKET_ID + '/' + import_name.replace(
+        ':', '/') + '/' + import_version
+    request = {
+        'actionType': 'update_import_status',
+        'importName': import_name,
+        'status': import_status,
+        'job_id': job_id,
+        'latestVersion': latest_version,
+        'graphPath': graph_path
+    }
+    if cron_schedule:
+        try:
+            next_refresh = croniter.croniter(
+                cron_schedule,
+                datetime.now(timezone.utc)).get_next(datetime).isoformat()
+            request['nextRefresh'] = next_refresh
+        except croniter.CroniterError as e:
+            logging.error(
+                f"Error calculating next refresh from schedule '{cron_schedule}': {e}"
+            )
+    logging.info(f"Update request: {request}")
+    auth_req = Request()
+    token = id_token.fetch_id_token(auth_req, INGESTION_HELPER_URL)
+    headers = {'Authorization': f'Bearer {token}'}
+    response = requests.post(INGESTION_HELPER_URL,
+                             json=request,
+                             headers=headers)
+    response.raise_for_status()
+    logging.info(f"Updated status for {import_name}")
+
+
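When a cron schedule is supplied, update_import_status above computes nextRefresh with croniter, relative to the current UTC time. A standalone sketch of that calculation (the schedule string is a hypothetical example):

    from datetime import datetime, timezone
    import croniter

    schedule = "0 6 * * *"  # hypothetical: daily at 06:00 UTC
    next_refresh = croniter.croniter(
        schedule, datetime.now(timezone.utc)).get_next(datetime).isoformat()
    print(next_refresh)     # e.g. "2026-02-03T06:00:00+00:00"
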
+def parse_message(request) -> dict:
+    """Processes the incoming Pub/Sub message.
+
+    Args:
+      request: The flask request object.
+
+    Returns:
+      A dictionary containing the message data, or None if invalid.
+    """
+    request_json = request.get_json(silent=True)
+    if not request_json or 'message' not in request_json:
+        logging.error('Invalid Pub/Sub message format')
+        return None
+
+    pubsub_message = request_json['message']
+    logging.info(f"Received Pub/Sub message: {pubsub_message}")
+    try:
+        data_bytes = base64.b64decode(pubsub_message["data"])
+        notification_json = data_bytes.decode("utf-8")
+        logging.info(f"Notification content: {notification_json}")
+    except Exception as e:
+        logging.error(f"Error decoding message data: {e}")
+
+    return pubsub_message
+
+
+def check_duplicate(message_id: str):
+    """Checks for duplicate messages using a GCS file.
+
+    Args:
+      message_id: The ID of the message to check.
+
+    Returns:
+      True if the message is a duplicate, False otherwise.
+    """
+    duplicate = False
+    if not message_id:
+        return duplicate
+    logging.info(f"Checking for existing message: {message_id}")
+    storage_client = storage.Client()
+    bucket = storage_client.bucket(GCS_BUCKET_ID)
+    blob = bucket.blob(f"google3/transfers/{message_id}")
+    try:
+        blob.upload_from_string("", if_generation_match=0)
+    except Exception:
+        duplicate = True
+    return duplicate
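
check_duplicate leans on a GCS precondition for idempotency: upload_from_string with if_generation_match=0 only succeeds when the marker object does not yet exist, so a redelivered Pub/Sub message hits the precondition failure and is reported as a duplicate. A sketch of how a push handler might combine these helpers — the handler name, message field access, and import name are assumptions, not part of this commit:

    # Hypothetical handler flow built on the helpers above.
    def handle_push(request):
        message = parse_message(request)
        if message is None:
            return ('Bad Request', 400)
        if check_duplicate(message.get('messageId')):
            logging.info('Duplicate message; skipping.')
            return ('OK', 200)
        invoke_ingestion_workflow('scripts/us_fed:treasury')  # hypothetical name
        return ('OK', 200)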
