Skip to content

Commit c6781ba

Browse files
author
aditya-systems-hub
committed
Fix non-idempotent spark-submit in RetryOnConflict causing orphaned driver pods
1 parent 90c6dfc commit c6781ba

2 files changed

Lines changed: 144 additions & 15 deletions

File tree

internal/controller/sparkapplication/controller.go

Lines changed: 67 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -306,20 +306,34 @@ func (r *Reconciler) handleSparkApplicationDeletion(ctx context.Context, req ctr
306306
func (r *Reconciler) reconcileNewSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
307307
logger := log.FromContext(ctx)
308308
key := req.NamespacedName
309+
310+
// Fetch and validate state outside the retry loop.
311+
old, err := r.getSparkApplication(ctx, key)
312+
if err != nil {
313+
return ctrl.Result{Requeue: true}, err
314+
}
315+
if old.Status.AppState.State != v1beta2.ApplicationStateNew {
316+
return ctrl.Result{}, nil
317+
}
318+
319+
// Submit once, outside the retry loop. submitSparkApplication is idempotent —
320+
// if a driver pod already exists it will recover state instead of re-submitting.
321+
app := old.DeepCopy()
322+
r.submitSparkApplication(ctx, app)
323+
324+
// Use RetryOnConflict only for the status update.
309325
retryErr := retry.RetryOnConflict(
310326
retry.DefaultRetry,
311327
func() error {
312-
old, err := r.getSparkApplication(ctx, key)
328+
fresh, err := r.getSparkApplication(ctx, key)
313329
if err != nil {
314330
return err
315331
}
316-
if old.Status.AppState.State != v1beta2.ApplicationStateNew {
332+
if fresh.Status.AppState.State != v1beta2.ApplicationStateNew {
317333
return nil
318334
}
319-
app := old.DeepCopy()
320-
321-
r.submitSparkApplication(ctx, app)
322-
if err := r.updateSparkApplicationStatus(ctx, app); err != nil {
335+
fresh.Status = app.Status
336+
if err := r.updateSparkApplicationStatus(ctx, fresh); err != nil {
323337
return err
324338
}
325339
return nil
@@ -831,22 +845,34 @@ func (r *Reconciler) reconcileSuspendedSparkApplication(ctx context.Context, req
831845
func (r *Reconciler) reconcileResumingSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
832846
logger := log.FromContext(ctx)
833847
key := req.NamespacedName
848+
849+
// Fetch and validate state outside the retry loop.
850+
old, err := r.getSparkApplication(ctx, key)
851+
if err != nil {
852+
return ctrl.Result{Requeue: true}, err
853+
}
854+
if old.Status.AppState.State != v1beta2.ApplicationStateResuming {
855+
return ctrl.Result{}, nil
856+
}
857+
858+
// Submit once, outside the retry loop. submitSparkApplication is idempotent.
859+
app := old.DeepCopy()
860+
r.recordSparkApplicationEvent(app)
861+
r.submitSparkApplication(ctx, app)
862+
863+
// Use RetryOnConflict only for the status update.
834864
retryErr := retry.RetryOnConflict(
835865
retry.DefaultRetry,
836866
func() error {
837-
old, err := r.getSparkApplication(ctx, key)
867+
fresh, err := r.getSparkApplication(ctx, key)
838868
if err != nil {
839869
return err
840870
}
841-
if old.Status.AppState.State != v1beta2.ApplicationStateResuming {
871+
if fresh.Status.AppState.State != v1beta2.ApplicationStateResuming {
842872
return nil
843873
}
844-
app := old.DeepCopy()
845-
846-
r.recordSparkApplicationEvent(app)
847-
848-
r.submitSparkApplication(ctx, app)
849-
if err := r.updateSparkApplicationStatus(ctx, app); err != nil {
874+
fresh.Status = app.Status
875+
if err := r.updateSparkApplicationStatus(ctx, fresh); err != nil {
850876
return err
851877
}
852878
return nil
@@ -897,13 +923,39 @@ func (r *Reconciler) getSparkApplication(ctx context.Context, key types.Namespac
897923

898924
// submitSparkApplication creates a new submission for the given SparkApplication and submits it using spark-submit.
899925
// The submission results are recorded in app.Status.{AppState,ExecutionAttempts}.
926+
// This method is idempotent: if a driver pod already exists for this application (e.g. from a previous
927+
// submission attempt whose status update failed), the submission state is recovered from the existing pod
928+
// instead of re-running spark-submit.
900929
func (r *Reconciler) submitSparkApplication(ctx context.Context, app *v1beta2.SparkApplication) {
901930
logger := log.FromContext(ctx)
902931
logger.Info("Submitting SparkApplication", "state", app.Status.AppState.State)
903932

933+
// Check if a driver pod from a previous submission attempt already exists.
934+
driverPodName := util.GetDriverPodName(app)
935+
existingPod := &corev1.Pod{}
936+
podKey := types.NamespacedName{Name: driverPodName, Namespace: app.Namespace}
937+
if err := r.client.Get(ctx, podKey, existingPod); err == nil {
938+
// Pod exists — verify it belongs to this application and was launched by the operator.
939+
if existingPod.Labels[common.LabelSparkAppName] == app.Name &&
940+
existingPod.Labels[common.LabelLaunchedBySparkOperator] == "true" {
941+
logger.Info("Found existing driver pod from previous submission attempt, recovering state",
942+
"pod", driverPodName, "submissionID", existingPod.Labels[common.LabelSubmissionID])
943+
app.Status.SubmissionID = existingPod.Labels[common.LabelSubmissionID]
944+
app.Status.DriverInfo.PodName = driverPodName
945+
app.Status.LastSubmissionAttemptTime = metav1.Now()
946+
app.Status.SubmissionAttempts = app.Status.SubmissionAttempts + 1
947+
app.Status.AppState = v1beta2.ApplicationState{
948+
State: v1beta2.ApplicationStateSubmitted,
949+
}
950+
app.Status.ExecutionAttempts = app.Status.ExecutionAttempts + 1
951+
r.recordSparkApplicationEvent(app)
952+
return
953+
}
954+
}
955+
904956
// SubmissionID must be set before creating any resources to ensure all the resources are labeled.
905957
app.Status.SubmissionID = uuid.New().String()
906-
app.Status.DriverInfo.PodName = util.GetDriverPodName(app)
958+
app.Status.DriverInfo.PodName = driverPodName
907959
app.Status.LastSubmissionAttemptTime = metav1.Now()
908960
app.Status.SubmissionAttempts = app.Status.SubmissionAttempts + 1
909961

internal/controller/sparkapplication/controller_test.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1374,6 +1374,83 @@ var _ = Describe("SparkApplication Controller", func() {
13741374
})
13751375
})
13761376
})
1377+
1378+
Context("When reconciling a new SparkApplication with a pre-existing driver pod", func() {
1379+
ctx := context.Background()
1380+
appName := "test-idempotent"
1381+
appNamespace := "default"
1382+
key := types.NamespacedName{
1383+
Name: appName,
1384+
Namespace: appNamespace,
1385+
}
1386+
existingSubmissionID := "previous-submission-id-12345"
1387+
1388+
BeforeEach(func() {
1389+
By("Creating a SparkApplication in New state")
1390+
app := &v1beta2.SparkApplication{
1391+
ObjectMeta: metav1.ObjectMeta{
1392+
Name: appName,
1393+
Namespace: appNamespace,
1394+
},
1395+
Spec: v1beta2.SparkApplicationSpec{
1396+
MainApplicationFile: ptr.To("local:///dummy.jar"),
1397+
},
1398+
}
1399+
v1beta2.SetSparkApplicationDefaults(app)
1400+
Expect(k8sClient.Create(ctx, app)).To(Succeed())
1401+
1402+
By("Pre-creating a driver pod with submission labels")
1403+
driverPod := createDriverPod(appName, appNamespace)
1404+
driverPod.Labels[common.LabelSubmissionID] = existingSubmissionID
1405+
Expect(k8sClient.Create(ctx, driverPod)).To(Succeed())
1406+
})
1407+
1408+
AfterEach(func() {
1409+
By("Deleting the test SparkApplication")
1410+
app := &v1beta2.SparkApplication{
1411+
ObjectMeta: metav1.ObjectMeta{
1412+
Name: appName,
1413+
Namespace: appNamespace,
1414+
},
1415+
}
1416+
Expect(client.IgnoreNotFound(k8sClient.Delete(ctx, app))).To(Succeed())
1417+
1418+
By("Deleting the driver pod")
1419+
driverKey := getDriverNamespacedName(appName, appNamespace)
1420+
driver := &corev1.Pod{
1421+
ObjectMeta: metav1.ObjectMeta{
1422+
Name: driverKey.Name,
1423+
Namespace: driverKey.Namespace,
1424+
},
1425+
}
1426+
Expect(client.IgnoreNotFound(k8sClient.Delete(ctx, driver))).To(Succeed())
1427+
})
1428+
1429+
It("Should recover submission state from existing driver pod instead of re-submitting", func() {
1430+
By("Reconciling the new SparkApplication")
1431+
reconciler := sparkapplication.NewReconciler(
1432+
nil,
1433+
k8sClient.Scheme(),
1434+
k8sClient,
1435+
record.NewFakeRecorder(3),
1436+
nil,
1437+
&sparkapplication.SparkSubmitter{},
1438+
sparkapplication.Options{Namespaces: []string{appNamespace}},
1439+
)
1440+
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
1441+
Expect(err).NotTo(HaveOccurred())
1442+
Expect(result.Requeue).To(BeFalse())
1443+
1444+
By("Checking that the app transitioned to Submitted with the recovered SubmissionID")
1445+
app := &v1beta2.SparkApplication{}
1446+
Expect(k8sClient.Get(ctx, key, app)).NotTo(HaveOccurred())
1447+
Expect(app.Status.AppState.State).To(Equal(v1beta2.ApplicationStateSubmitted))
1448+
Expect(app.Status.SubmissionID).To(Equal(existingSubmissionID))
1449+
Expect(app.Status.DriverInfo.PodName).To(Equal(fmt.Sprintf("%s-driver", appName)))
1450+
Expect(app.Status.SubmissionAttempts).To(Equal(int32(1)))
1451+
Expect(app.Status.ExecutionAttempts).To(Equal(int32(1)))
1452+
})
1453+
})
13771454
})
13781455

13791456
func getDriverNamespacedName(appName string, appNamespace string) types.NamespacedName {

0 commit comments

Comments
 (0)