Skip to content

Commit f0dc316

Browse files
Add workaround for failed exports of queries: create a fallback table
For each query to export, optionally provide a fallback `CREATE TABLE` query which will run if the export job for the query fails. Implement this by calling an API route `/api/create-fallback-table-after-failed-export` after an export for a query fails for any reason. This works around the bug where queries with an empty result fail to export to Seafowl, see: splitgraph/seafowl#423
1 parent 7e1ae87 commit f0dc316

11 files changed

Lines changed: 551 additions & 70 deletions

File tree

examples/nextjs-import-airbyte-github-export-seafowl/.env.test.local

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,13 @@ GITHUB_PAT_SECRET="github_pat_**********************_***************************
1919
# e.g. To intercept requests to Splitgraph API sent from madatdata libraries in API routes
2020
# You can also set this by running: yarn dev-mitm (see package.json)
2121
# MITMPROXY_ADDRESS="http://localhost:7979"
22+
23+
# OPTIONAL: Set Seafowl environment variables to use for creating fallback tables when exports fail
24+
# NOTE 1: At the moment the instance URL must be https://demo.seafowl.cloud, because that is where
25+
# the Splitgraph export API exports tables to when no instance URL is specified. We do not currently
26+
# specify an instance URL when starting exports; it is only used when creating fallback tables.
27+
# NOTE 2: The dbname (SEAFOWL_INSTANCE_DATABASE) MUST match NEXT_PUBLIC_SPLITGRAPH_GITHUB_ANALYTICS_META_NAMESPACE
28+
#
29+
# SEAFOWL_INSTANCE_URL="https://demo.seafowl.cloud"
30+
# SEAFOWL_INSTANCE_SECRET="********************************"
31+
# SEAFOWL_INSTANCE_DATABASE="**********"

examples/nextjs-import-airbyte-github-export-seafowl/components/ImportExportStepper/DebugPanel.tsx

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,15 @@ export const DebugPanel = () => {
88
<pre style={{ minWidth: "80%", minHeight: "300px" }}>
99
{JSON.stringify(
1010
state,
11-
(_key, value) => (value instanceof Set ? Array.from(value) : value),
11+
(_key, value) => {
12+
if (value instanceof Set) {
13+
return Array.from(value);
14+
} else if (value instanceof Map) {
15+
return Object.fromEntries(value);
16+
} else {
17+
return value;
18+
}
19+
},
1220
2
1321
)}
1422
</pre>

examples/nextjs-import-airbyte-github-export-seafowl/components/ImportExportStepper/ExportPanel.tsx

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,11 @@ export const ExportPanel = () => {
8787
destinationTable,
8888
destinationSchema,
8989
sourceQuery,
90+
fallbackCreateTableQuery: queriesToExport.find(
91+
(q) =>
92+
q.destinationSchema === destinationSchema &&
93+
q.destinationTable === destinationTable
94+
)?.fallbackCreateTableQuery,
9095
})
9196
),
9297
...data["tables"].map(

examples/nextjs-import-airbyte-github-export-seafowl/components/ImportExportStepper/export-hooks.tsx

Lines changed: 122 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,10 @@ export const useFindMatchingExportTable = (
4646
};
4747

4848
export const usePollExportTasks = () => {
49-
const [{ stepperState, loadingExportTasks }, dispatch] = useStepper();
49+
const [
50+
{ stepperState, loadingExportTasks, exportedTablesLoading },
51+
dispatch,
52+
] = useStepper();
5053

5154
useEffect(() => {
5255
if (stepperState !== "awaiting_export") {
@@ -71,11 +74,50 @@ export const usePollExportTasks = () => {
7174
type: "export_task_complete",
7275
completedTask: { taskId },
7376
}),
74-
onError: ({ taskId, error }) =>
77+
onError: async ({ taskId, error }) => {
78+
// If the task failed but we're not going to retry, then check if
79+
// there is a fallback query to create the table, and if so,
80+
// create it before marking the task as complete.
81+
if (!error.retryable) {
82+
// NOTE: There is an implicit assumption that `exportedTablesLoading`
83+
// and `loadingExportTasks` are updated at the same time, which they
84+
// are, by the reducer that handles the `export_task_start` and
85+
// `export_task_complete` actions.
86+
const maybeExportedQueryWithCreateTableFallback = Array.from(
87+
exportedTablesLoading
88+
).find(
89+
(t) => t.taskId === taskId && t.fallbackCreateTableQuery
90+
);
91+
92+
if (maybeExportedQueryWithCreateTableFallback) {
93+
await createFallbackTableAfterFailedExport({
94+
destinationSchema:
95+
maybeExportedQueryWithCreateTableFallback.destinationSchema,
96+
destinationTable:
97+
maybeExportedQueryWithCreateTableFallback.destinationTable,
98+
fallbackCreateTableQuery:
99+
maybeExportedQueryWithCreateTableFallback.fallbackCreateTableQuery,
100+
101+
// On error or success, we update the `error` variable, which
102+
// is passed to `dispatch` after this conditional.
103+
onError: (errorCreatingFallbackTable) => {
104+
error.message = `${error.message} (and also error creating fallback: ${errorCreatingFallbackTable.message})`;
105+
},
106+
onSuccess: () => {
107+
error = undefined; // No error because we consider the task complete after creating the fallback table.
108+
},
109+
});
110+
}
111+
}
112+
75113
dispatch({
76-
type: "export_error",
77-
error: `Error exporting ${taskId}: ${error.message}`,
78-
}),
114+
type: "export_task_complete",
115+
completedTask: {
116+
taskId,
117+
error,
118+
},
119+
});
120+
},
79121
abortSignal: abortController.signal,
80122
})
81123
)
@@ -86,7 +128,7 @@ export const usePollExportTasks = () => {
86128
clearInterval(interval);
87129
abortController.abort();
88130
};
89-
}, [loadingExportTasks, stepperState, dispatch]);
131+
}, [loadingExportTasks, exportedTablesLoading, stepperState, dispatch]);
90132
};
91133

92134
const pollExportTaskOnce = async ({
@@ -97,7 +139,13 @@ const pollExportTaskOnce = async ({
97139
}: {
98140
taskId: string;
99141
onSuccess: ({ taskId }: { taskId: string }) => void;
100-
onError: ({ taskId, error }: { taskId: string; error: any }) => void;
142+
onError: ({
143+
taskId,
144+
error,
145+
}: {
146+
taskId: string;
147+
error: { message: string; retryable: boolean };
148+
}) => void;
101149
abortSignal: AbortSignal;
102150
}) => {
103151
try {
@@ -118,6 +166,7 @@ const pollExportTaskOnce = async ({
118166
} else if (data.error) {
119167
if (!data.completed) {
120168
console.log("WARN: Failed status, not completed:", data.error);
169+
onError({ taskId, error: { message: data.error, retryable: false } });
121170
} else {
122171
throw new Error(data.error);
123172
}
@@ -127,6 +176,71 @@ const pollExportTaskOnce = async ({
127176
return;
128177
}
129178

130-
onError({ taskId, error });
179+
onError({
180+
taskId,
181+
error: {
182+
message: `Error exporting ${taskId}: ${
183+
error.message ?? error.name ?? "unknown"
184+
}`,
185+
retryable: true,
186+
},
187+
});
188+
}
189+
};
190+
191+
/**
192+
* Call the API route to create a fallback table after a failed export.
193+
*
194+
* Note that both `destinationTable` and `destinationSchema` should already
195+
* be included in the `fallbackCreateTableQuery`, but we need them so that
196+
* the endpoint can separately `CREATE SCHEMA` and `DROP TABLE` in case the
197+
* schema does not yet exist, or the table already exists (we overwrite it to
198+
* be consistent with the behavior of the Splitgraph export API).
199+
*/
200+
const createFallbackTableAfterFailedExport = async ({
201+
destinationSchema,
202+
destinationTable,
203+
fallbackCreateTableQuery,
204+
onSuccess,
205+
onError,
206+
}: Required<
207+
Pick<
208+
ExportTable,
209+
"destinationSchema" | "destinationTable" | "fallbackCreateTableQuery"
210+
>
211+
> & {
212+
onSuccess: () => void;
213+
onError: (error: { message: string }) => void;
214+
}) => {
215+
try {
216+
const response = await fetch(
217+
"/api/create-fallback-table-after-failed-export",
218+
{
219+
method: "POST",
220+
headers: {
221+
"Content-Type": "application/json",
222+
},
223+
body: JSON.stringify({
224+
destinationSchema,
225+
destinationTable,
226+
fallbackCreateTableQuery,
227+
}),
228+
}
229+
);
230+
const data = await response.json();
231+
if (data.error || !data.success) {
232+
console.log(
233+
`FAIL: error from endpoint creating fallback table: ${data.error}`
234+
);
235+
onError({ message: data.error ?? "unknown" });
236+
} else {
237+
console.log("SUCCESS: created fallback table");
238+
onSuccess();
239+
}
240+
} catch (error) {
241+
console.log(`FAIL: caught error while creating fallback table: ${error}`);
242+
onError({
243+
message: `${error.message ?? error.name ?? "unknown"}`,
244+
});
131245
}
132246
};

examples/nextjs-import-airbyte-github-export-seafowl/components/ImportExportStepper/stepper-states.ts

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ export type ExportTable = {
88
destinationTable: string;
99
taskId: string;
1010
sourceQuery?: string;
11+
fallbackCreateTableQuery?: string;
1112
};
1213

1314
// NOTE: Multiple tables can have the same taskId, so we track them separately
1415
// in order to not need to redundantly poll the API for each table individually
1516
export type ExportTask = {
1617
taskId: string;
18+
error?: { message: string; retryable: boolean };
1719
};
1820

1921
export type StepperState = {
@@ -35,6 +37,7 @@ export type StepperState = {
3537
debug?: string | null;
3638
loadingExportTasks?: Set<ExportTask>;
3739
completedExportTasks?: Set<ExportTask>;
40+
tasksWithError?: Map<string, string[]>; // taskId -> errors
3841
};
3942

4043
export type StepperAction =
@@ -65,6 +68,7 @@ const initialState: StepperState = {
6568
exportedTablesCompleted: new Set<ExportTable>(),
6669
loadingExportTasks: new Set<ExportTask>(),
6770
completedExportTasks: new Set<ExportTask>(),
71+
tasksWithError: new Map<string, string[]>(),
6872
importError: null,
6973
exportError: null,
7074
debug: null,
@@ -217,12 +221,14 @@ const stepperReducer = (
217221
destinationTable,
218222
destinationSchema,
219223
sourceQuery,
224+
fallbackCreateTableQuery,
220225
taskId,
221226
} of tables) {
222227
exportedTablesLoading.add({
223228
destinationTable,
224229
destinationSchema,
225230
sourceQuery,
231+
fallbackCreateTableQuery,
226232
taskId,
227233
});
228234
}
@@ -248,11 +254,47 @@ const stepperReducer = (
248254
stepperState: "awaiting_export",
249255
};
250256

257+
/**
258+
* NOTE: A task is eventually marked "completed" even if it received an error. If
259+
* `error.retryable` is `true`, it is first left loading to be retried until it has failed maxRetryCount times.
260+
*
261+
* That is, _all tasks_ will eventually "complete," whether successfully or not.
262+
*/
251263
case "export_task_complete":
252264
const {
253-
completedTask: { taskId: completedTaskId },
265+
completedTask: { taskId: completedTaskId, error: maybeError },
254266
} = action;
255267

268+
const maxRetryCount = 3;
269+
270+
const updatedTasksWithError = new Map(state.tasksWithError);
271+
const previousErrors = updatedTasksWithError.get(completedTaskId) ?? [];
272+
const hadPreviousError = previousErrors.length > 0;
273+
274+
if (!maybeError && hadPreviousError) {
275+
updatedTasksWithError.delete(completedTaskId);
276+
} else if (maybeError) {
277+
updatedTasksWithError.set(completedTaskId, [
278+
...previousErrors,
279+
maybeError.message,
280+
]);
281+
const numAttempts = updatedTasksWithError.get(completedTaskId).length;
282+
283+
if (maybeError.retryable && numAttempts < maxRetryCount) {
284+
console.log("RETRY: ", completedTaskId, `(${numAttempts} so far)`);
285+
return {
286+
...state,
287+
tasksWithError: updatedTasksWithError,
288+
};
289+
} else {
290+
console.log(
291+
"FAIL: ",
292+
completedTaskId,
293+
`(${numAttempts} reached max ${maxRetryCount})`
294+
);
295+
}
296+
}
297+
256298
// One taskId could match multiple tables, so find reference to each of them
257299
// and then use that reference to delete them from loading set and add them to completed set
258300
const completedTables = Array.from(state.exportedTablesLoading).filter(
@@ -394,7 +436,7 @@ const useMarkAsComplete = (
394436

395437
if (!data.status) {
396438
throw new Error(
397-
"Got unexpected resposne shape when marking import/export complete"
439+
"Got unexpected response shape when marking import/export complete"
398440
);
399441
}
400442

examples/nextjs-import-airbyte-github-export-seafowl/env-vars.d.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,30 @@ namespace NodeJS {
3838
*/
3939
MITMPROXY_ADDRESS?: string;
4040

41+
/**
42+
* Optionally provide the SEAFOWL_INSTANCE_URL to use for creating fallback tables
43+
* when an export fails.
44+
*
45+
* Note that at the moment, this must only be set to https://demo.seafowl.cloud
46+
* because that's where Splitgraph exports to by default, and we are not currently
47+
* passing any instance URL to the Splitgraph export API.
48+
*/
49+
SEAFOWL_INSTANCE_URL?: "https://demo.seafowl.cloud";
50+
51+
/**
52+
* Optionally provide the SEAFOWL_INSTANCE_SECRET to use for creating fallback tables
53+
* when an export fails.
54+
*/
55+
SEAFOWL_INSTANCE_SECRET?: string;
56+
57+
/**
58+
* Optionally provide the dbname to use for creating fallback tables
59+
* when an export fails.
60+
*
61+
* Note this MUST match the NEXT_PUBLIC_SPLITGRAPH_GITHUB_ANALYTICS_META_NAMESPACE
62+
*/
63+
SEAFOWL_INSTANCE_DATABASE?: string;
64+
4165
/**
4266
* The namespace of the repository in Splitgraph where metadata is stored
4367
* containing the state of imported GitHub repositories, which should contain

0 commit comments

Comments
 (0)