Skip to content

Commit 162fb10

Browse files
committed
Add script to retry failed ingestion job
1 parent dc738c8 commit 162fb10

1 file changed

Lines changed: 298 additions & 0 deletions

File tree

Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
#!/usr/bin/env python3
2+
# Copyright 2026 Google LLC
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
"""Script to automate the retry process for failed Dataflow ingestion jobs.
16+
17+
This script identifies failed imports within a specific Dataflow job,
18+
reverts the failed imports in the Spanner database to their last known
19+
good version, resets any 'PENDING' imports back to 'STAGING', and
20+
optionally retriggers the Spanner ingestion workflow.
21+
"""
22+
23+
import json
24+
import sys
25+
26+
from absl import app
27+
from absl import flags
28+
from absl import logging
29+
from google.cloud import spanner
30+
from googleapiclient.discovery import build
31+
from googleapiclient.errors import HttpError
32+
33+
FLAGS = flags.FLAGS

# Command-line configuration. Defaults target the production
# import-automation and datcom-store projects; only --job_id is required.
flags.DEFINE_string("project_id", "datcom-import-automation-prod",
                    "GCP Project ID")
flags.DEFINE_string("location", "us-central1", "GCP Location")
flags.DEFINE_string("job_id", None, "Failed Dataflow Job ID", required=True)
flags.DEFINE_string("spanner_project", "datcom-store", "Spanner Project ID")
flags.DEFINE_string("spanner_instance", "dc-kg-test", "Spanner Instance ID")
flags.DEFINE_string("spanner_database", "dc_graph_import",
                    "Spanner Database ID")
flags.DEFINE_string("workflow_name", "spanner-ingestion-workflow",
                    "Workflow name")
45+
46+
47+
def get_failed_imports(dataflow, project_id, location, job_id):
    """Identifies failed import names from Dataflow job messages and parameters.

    Reads the job's 'importList' pipeline parameter to learn which imports
    were involved, then scans all JOB_MESSAGE_ERROR messages (following
    pagination) for mentions of those import names. If the job failed or was
    cancelled but no specific import could be blamed, all involved imports
    are returned as a safety net.

    Args:
        dataflow: An authenticated Google Cloud Dataflow API client.
        project_id: The GCP Project ID where the Dataflow job ran.
        location: The GCP location of the Dataflow job (e.g., 'us-central1').
        job_id: The unique ID of the failed Dataflow job.

    Returns:
        A list of strings representing the names of the imports that failed
        during the Dataflow job.
    """
    failed_imports = set()
    try:
        logging.info(f"Fetching job details for {job_id}...")
        job = dataflow.projects().locations().jobs().get(
            projectId=project_id,
            location=location,
            jobId=job_id,
            view='JOB_VIEW_ALL').execute()

        # Get all imports involved in this job from displayData.
        all_import_names = set()
        import_list_json = None
        for item in job.get('pipelineDescription', {}).get('displayData', []):
            if item.get('key') == 'importList':
                import_list_json = item.get('strValue')
                break

        if import_list_json:
            try:
                all_imports_data = json.loads(import_list_json)
                # Import names look like 'group:sub:name'; keep the short name.
                all_import_names = {
                    i['importName'].split(':')[-1] for i in all_imports_data
                }
                logging.info(
                    f"Imports involved in this job: {all_import_names}")
            except (json.JSONDecodeError, KeyError, TypeError) as e:
                # Only parsing/shape problems are expected here; anything else
                # should surface rather than be swallowed.
                logging.error(f"Error parsing importList parameter: {e}")

        logging.info(f"Fetching error messages for Dataflow job {job_id}...")
        # Error messages are paginated; follow nextPageToken so failures
        # reported beyond the first page are not silently missed.
        page_token = None
        while True:
            messages = dataflow.projects().locations().jobs().messages().list(
                projectId=project_id,
                location=location,
                jobId=job_id,
                minimumImportance='JOB_MESSAGE_ERROR',
                pageToken=page_token).execute()

            for msg in messages.get('jobMessages', []):
                text = msg.get('messageText', '')
                # If we find a valid import name in the error message, add it.
                for name in all_import_names:
                    if name in text:
                        failed_imports.add(name)

            page_token = messages.get('nextPageToken')
            if not page_token:
                break

        # If the job failed globally and we haven't identified specific
        # imports, include all imports to be safe.
        if job.get('currentState') in [
                'JOB_STATE_FAILED', 'JOB_STATE_CANCELLED'
        ]:
            if not failed_imports:
                logging.warning(
                    "Job failed globally. Reverting all involved imports.")
                failed_imports.update(all_import_names)
            else:
                logging.info(
                    f"Job failed. Identified specific failed imports: {failed_imports}"
                )

    except HttpError as e:
        logging.error(f"Error querying Dataflow API: {e}")

    return list(failed_imports)
120+
121+
122+
def revert_import(database, import_name, job_id):
    """Reverts an import to its previous version in Spanner and sets it to STAGING.

    This function updates the ImportStatus and ImportVersionHistory tables in
    Spanner to effectively rollback an import that failed during the Dataflow
    job.

    Args:
        database: An initialized Spanner Database object.
        import_name: The name of the import to revert (e.g., 'foo:bar:import').
        job_id: The ID of the failed Dataflow job, used for commenting.

    Returns:
        True if the revert was successful, 'ALREADY_REVERTED' if the import
        was recently reverted, or False if no version history was found or an
        error occurred.
    """
    short_name = import_name.split(':')[-1]

    def _revert_txn(transaction):
        # 1. Fetch the most recent version and comment from ImportVersionHistory.
        sql = """
        SELECT Version, Comment FROM ImportVersionHistory
        WHERE ImportName = @importName
        ORDER BY UpdateTimestamp DESC
        LIMIT 1
        """
        results = transaction.execute_sql(
            sql,
            params={'importName': short_name},
            param_types={'importName': spanner.param_types.STRING})
        rows = list(results)

        if not rows:
            logging.warning(
                f"No version history found for '{short_name}'. Cannot revert.")
            return False

        previous_version = rows[0][0]
        previous_comment = rows[0][1] if len(rows[0]) > 1 else ""

        # Guard against retry loops: if the newest history entry is itself a
        # revert, a previous run of this script already handled this import.
        if previous_comment and "Reverted due to Dataflow failure" in previous_comment:
            logging.error(
                f"Import '{short_name}' was already reverted recently. Aborting to prevent infinite loop."
            )
            return "ALREADY_REVERTED"

        logging.info(
            f"Reverting '{short_name}' to last known good version: {previous_version}"
        )

        # 2. Update ImportStatus to point to the previous version and set state to STAGING.
        param_types = {
            'version': spanner.param_types.STRING,
            'importName': spanner.param_types.STRING
        }

        transaction.execute_update("""
            UPDATE ImportStatus
            SET LatestVersion = @version, State = 'STAGING', StatusUpdateTimestamp = PENDING_COMMIT_TIMESTAMP()
            WHERE ImportName = @importName
            """,
                                   params={
                                       'version': previous_version,
                                       'importName': short_name
                                   },
                                   param_types=param_types)

        # 3. Add an entry to ImportVersionHistory to record the revert action.
        transaction.execute_update(
            """
            INSERT INTO ImportVersionHistory (ImportName, Version, UpdateTimestamp, Comment)
            VALUES (@importName, @version, PENDING_COMMIT_TIMESTAMP(), @comment)
            """,
            params={
                'importName': short_name,
                'version': previous_version,
                'comment': f"Reverted due to Dataflow failure ({job_id})"
            },
            param_types={
                'importName': spanner.param_types.STRING,
                'version': spanner.param_types.STRING,
                'comment': spanner.param_types.STRING
            })
        return True

    try:
        # Bug fix: return the transaction's result. Previously the value was
        # discarded and this function always returned None, so main() could
        # never observe the documented 'ALREADY_REVERTED' outcome.
        return database.run_in_transaction(_revert_txn)
    except Exception as e:
        logging.error(f"Transaction failed for '{short_name}': {e}")
        return False
211+
212+
213+
def reset_pending_imports(database):
    """Resets all imports with state 'PENDING' back to 'STAGING'.

    Any import left stuck in PENDING by the failure is returned to the
    STAGING queue so the next workflow run picks it up again.

    Args:
        database: An initialized Spanner Database object.
    """
    logging.info("Resetting all PENDING imports to STAGING...")

    def _apply(txn):
        # execute_update returns the number of rows it modified.
        return txn.execute_update("""
            UPDATE ImportStatus
            SET State = 'STAGING', StatusUpdateTimestamp = PENDING_COMMIT_TIMESTAMP()
            WHERE State = 'PENDING'
        """)

    try:
        rows_changed = database.run_in_transaction(_apply)
    except Exception as e:
        logging.error(f"Failed to reset PENDING imports: {e}")
    else:
        logging.info(
            f"Successfully reset {rows_changed} PENDING imports to STAGING.")
236+
237+
238+
def retrigger_workflow(project_id, location, workflow_name):
    """Retriggers the Spanner ingestion workflow with a retry flag.

    Best-effort: any failure is logged rather than raised, so a broken
    retrigger never masks the revert/reset work already done.

    Args:
        project_id: The GCP Project ID where the workflow is hosted.
        location: The GCP location of the workflow (e.g., 'us-central1').
        workflow_name: The name of the workflow to execute.
    """
    logging.info(f"Retriggering workflow '{workflow_name}'...")
    try:
        client = build('workflowexecutions', 'v1', cache_discovery=False)
        parent = f"projects/{project_id}/locations/{location}/workflows/{workflow_name}"
        executions = client.projects().locations().workflows().executions()
        # Pass is_retry=True as a workflow argument.
        body = {"argument": json.dumps({"is_retry": True})}
        execution = executions.create(parent=parent, body=body).execute()
        logging.info(
            f"Retriggered successfully. New execution: {execution.get('name')}")
    except Exception as e:
        logging.error(f"Failed to retrigger workflow: {e}")
260+
261+
262+
def main(argv):
    """Main entry point for the script.

    Args:
        argv: Command-line arguments passed to the script.
    """
    dataflow = build('dataflow', 'v1b3', cache_discovery=False)
    failed_imports = get_failed_imports(dataflow, FLAGS.project_id,
                                        FLAGS.location, FLAGS.job_id)

    if not failed_imports:
        logging.error("No failed imports identified.")
        sys.exit(1)

    logging.info(f"Processing failed imports: {failed_imports}")
    spanner_client = spanner.Client(project=FLAGS.spanner_project)
    database = spanner_client.instance(FLAGS.spanner_instance).database(
        FLAGS.spanner_database)

    # A single already-reverted import blocks the retrigger: a prior retry
    # attempt already ran, and looping again would never converge.
    statuses = [
        revert_import(database, imp, FLAGS.job_id) for imp in failed_imports
    ]
    if any(status == "ALREADY_REVERTED" for status in statuses):
        logging.warning(
            "Skipping re-trigger because at least one import was already reverted."
        )
    else:
        reset_pending_imports(database)
        retrigger_workflow(FLAGS.project_id, FLAGS.location,
                           FLAGS.workflow_name)
295+
296+
297+
if __name__ == "__main__":
    # absl's app.run parses the command-line flags, then invokes main(argv).
    app.run(main)

0 commit comments

Comments
 (0)