
Commit f8ee99c

Authored by nsingla (Nelesh Singla), claude, Copilot, and mprahl
fix(backend): resolve use_secret_as_env pipeline params in ParallelFor (#13128)
* fix(sdk): propagate platform-config inputs across sub-DAG boundaries

  When `use_secret_as_env()` (or another Kubernetes platform helper) references a `PipelineParameterChannel`, the channel was not registered in the task's `_channel_inputs`. The compiler therefore did not "punch through" the parameter across sub-DAG boundaries (e.g. ParallelFor), causing the driver to fail with "parent DAG does not have input parameter". Additionally, the platform config references were not rewritten to use the prefixed parameter names that the compiler creates when surfacing parameters through sub-DAG boundaries.

  Changes:
  - Add an `ensure_channel_input()` shared helper in common.py that registers PipelineChannel values in `task._channel_inputs` for sub-DAG propagation
  - Call it from `parse_k8s_parameter_input()` (secrets) and `_assign_pvc_name_to_msg()` (PVC mounts) for consistent behavior
  - Add `_rewrite_platform_config_input_references()`, which rewrites both `componentInputParameter` and `taskOutputParameter` references in platform config when tasks are inside sub-DAGs
  - Pass parent component inputs and the DAG task list to `platform_config_to_platform_spec()` for rewriting context
  - Apply the rewriting in both `build_spec_by_group()` and the exit handler paths

  Fixes #13078

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
  Signed-off-by: Nelesh Singla <117123879+nsingla@users.noreply.github.com>

* test: add ParallelFor pipeline param propagation test to local execution tests

  Add a new test pipeline exercising ParallelFor with pipeline parameter propagation (without use_secret_as_env) and include it in the local execution test matrix for both SubprocessRunner and DockerRunner.

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
  Signed-off-by: Nelesh Singla <117123879+nsingla@users.noreply.github.com>

* test: add Argo compiled workflow golden files for secret and ParallelFor pipelines

  Add KFP IR and Argo compiled workflow golden files for:
  - pipeline_2_parallel_for_secret: use_secret_as_env with a pipeline param inside ParallelFor
  - pipeline_4_nested_parallel_for_secret: use_secret_as_env with an outer task output inside ParallelFor
  - pipeline_with_parallelfor_pipeline_param: ParallelFor with pipeline param propagation (no secrets)

  Generated via: cd backend/test/compiler && ginkgo -v -- -createGoldenFiles=true

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
  Signed-off-by: Nelesh Singla <117123879+nsingla@users.noreply.github.com>

* fixes to the newly added pipelines with ParallelFor and secrets

  Signed-off-by: Nelesh Singla <117123879+nsingla@users.noreply.github.com>

* fix: address review feedback on task channel registration

  Co-authored-by: mprahl <11711106+mprahl@users.noreply.github.com>
  Agent-Logs-Url: https://github.com/kubeflow/pipelines/sessions/495cd171-f647-454f-b39a-5b303ee802c0
  (cherry picked from commit 5d146fd)

* fixing formatting issues and resolving PR comments

  Increase the pipeline run wait time to 12 minutes, split the parallel and nested pipelines, and fix the Argo versions in the API tests.

  Signed-off-by: Nelesh Singla <117123879+nsingla@users.noreply.github.com>

---------

Signed-off-by: Nelesh Singla <117123879+nsingla@users.noreply.github.com>
Co-authored-by: Nelesh Singla <bot@ambient-code.local>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: mprahl <11711106+mprahl@users.noreply.github.com>
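The channel-registration fix described above can be sketched as follows. This is a minimal illustration of the behavior the commit message describes, not the SDK's actual implementation: `PipelineChannel` and `Task` here are simplified stand-in classes for the real kfp.dsl types, and the helper body only shows the registration idea.

```python
class PipelineChannel:
    """Stand-in for a kfp PipelineParameterChannel (hypothetical, simplified)."""

    def __init__(self, name: str):
        self.name = name


class Task:
    """Stand-in for a kfp PipelineTask with its tracked channel inputs."""

    def __init__(self):
        self._channel_inputs = []


def ensure_channel_input(task: Task, value) -> None:
    """Register `value` in task._channel_inputs if it is a channel.

    Registering the channel is what lets the compiler "punch through" the
    parameter across sub-DAG boundaries (e.g. dsl.ParallelFor) instead of
    failing with "parent DAG does not have input parameter".
    """
    if isinstance(value, PipelineChannel) and value not in task._channel_inputs:
        task._channel_inputs.append(value)


task = Task()
secret = PipelineChannel("secret_name")
ensure_channel_input(task, secret)     # registers the channel
ensure_channel_input(task, secret)     # idempotent: no duplicate entry
ensure_channel_input(task, "literal")  # plain strings are ignored
print([c.name for c in task._channel_inputs])  # → ['secret_name']
```

In the SDK this registration happens inside `parse_k8s_parameter_input()` and `_assign_pvc_name_to_msg()`, so every platform helper that accepts a channel gets the same propagation behavior.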
1 parent e6f578f · commit f8ee99c

185 files changed: 10,642 additions and 315 deletions


.github/actions/deploy/action.yml

Lines changed: 10 additions & 0 deletions
```diff
@@ -2,6 +2,10 @@ name: "Set up KFP on KinD"
 description: "Step to start and configure KFP on Kind"
 
 inputs:
+  cluster_name:
+    description: "Kind cluster name"
+    default: 'kfp'
+    required: false
   pipeline_store:
     description: "Flag to deploy KFP with K8s Native API"
     default: 'database'
@@ -70,6 +74,12 @@ runs:
         docker image rm ${{ inputs.image_registry }}/$app:${{ inputs.image_tag }}
         docker image rm localhost:5000/$app:${{ inputs.image_tag }}
       done
+      docker pull python:3.11
+      docker pull registry.access.redhat.com/ubi9/python-311:latest
+      kind load docker-image python:3.11 --name ${{ inputs.cluster_name }}
+      kind load docker-image registry.access.redhat.com/ubi9/python-311:latest --name ${{ inputs.cluster_name }}
+      docker image rm python:3.11
+      docker image rm registry.access.redhat.com/ubi9/python-311:latest
 
   - name: Configure Args
     shell: bash
```

.github/workflows/api-server-tests.yml

Lines changed: 3 additions & 3 deletions
```diff
@@ -66,13 +66,13 @@ jobs:
         k8s_version: [ "v1.34.0" ]
         cache_enabled: [ "true", "false" ]
         proxy: [ "true", "false" ]
-        argo_version: [ "v3.7.3", "v3.6.7" ]
+        argo_version: [ "v3.7.3", "v3.5.14" ]
         pipeline_store: [ "database" ]
         pod_to_pod_tls_enabled: [ "false" ]
         include:
           - k8s_version: "v1.29.2"
             cache_enabled: "true"
-            argo_version: "v3.6.7"
+            argo_version: "v3.7.3"
           - k8s_version: "v1.29.2"
             cache_enabled: "true"
             argo_version: "v3.5.14"
@@ -167,7 +167,7 @@ jobs:
         k8s_version: [ "v1.34.0", "v1.29.2" ]
         cache_enabled: [ "true" ]
         uploadPipelinesWithKubernetesClient: [ "true", "false" ]
-        argo_version: [ "v3.7.3", "v3.6.7" ]
+        argo_version: [ "v3.7.3", "v3.5.14" ]
         pipeline_store: [ "kubernetes" ]
         pod_to_pod_tls_enabled: [ "false" ]
         include:
```

.github/workflows/e2e-test.yml

Lines changed: 7 additions & 6 deletions
```diff
@@ -1,7 +1,7 @@
 name: KFP E2E Pipeline tests
 env:
   E2E_TESTS_DIR: "./backend/test/end2end"
-  NUMBER_OF_PARALLEL_NODES: 10
+  NUMBER_OF_PARALLEL_NODES: 2
   CLUSTER_NAME: "kfp"
   NAMESPACE: "kubeflow"
   PYTHON_VERSION: "3.9"
@@ -63,9 +63,9 @@ jobs:
       matrix:
         k8s_version: ["v1.34.0"]
         cache_enabled: ["true", "false"]
-        argo_version: [ "v3.7.3", "v3.6.7", "v3.5.14"]
+        argo_version: [ "v3.7.3", "v3.5.14"]
         proxy: [ "false" ]
-        test_label: [ "E2ECritical" ]
+        test_label: [ "E2ECritical", "E2EEssential", "E2EParallelNested" ]
         pod_to_pod_tls_enabled: [ "false" ]
         include:
           - k8s_version: "v1.29.2"
@@ -76,9 +76,6 @@ jobs:
             cache_enabled: "false"
             proxy: "true"
             test_label: "E2EProxy"
-          - k8s_version: "v1.34.0"
-            cache_enabled: "false"
-            test_label: "E2EEssential"
           - k8s_version: "v1.34.0"
             cache_enabled: "false"
             test_label: "E2EFailure"
@@ -131,6 +128,10 @@ jobs:
            NAMESPACE=${{ inputs.namespace }}
          fi
 
+         if [ "$TEST_LABEL" == "E2EParallelNested" ]; then
+           NUMBER_OF_NODES=1
+         fi
+
          {
            echo "NUMBER_OF_NODES=$NUMBER_OF_NODES"
            echo "TEST_LABEL=$TEST_LABEL"
```

.github/workflows/kfp-sdk-tests.yml

Lines changed: 3 additions & 1 deletion
```diff
@@ -20,7 +20,7 @@ concurrency:
 jobs:
   sdk-tests:
     runs-on: ubuntu-latest
-    timeout-minutes: 45
+    timeout-minutes: 50
     strategy:
       matrix:
         python-version: ['3.9', '3.13']
@@ -49,6 +49,7 @@ jobs:
         pip install pytest
         pip install docker
         pip install pytest-cov
+        pip install pytest-xdist
         pip install google_cloud_pipeline_components
         pip install requests==2.28.1
@@ -58,5 +59,6 @@ jobs:
         SETUP_ENV: false
         REPO_NAME: ${{ github.repository }}
         PULL_NUMBER: ${{ github.event.pull_request.number }}
+        PYTEST_PARALLEL_WORKERS: 2
       run: |
         ./test/presubmit-tests-sdk.sh
```

README.md

Lines changed: 4 additions & 4 deletions
```diff
@@ -26,10 +26,10 @@ The Kubeflow pipelines service has the following goals:
 
 ### Dependencies Compatibility Matrix
 
-| Dependency     | Versions         |
-| -------------- |------------------|
-| Argo Workflows | v3.5, v3.6, v3.7 |
-| MySQL          | v8               |
+| Dependency     | Versions     |
+| -------------- |--------------|
+| Argo Workflows | v3.5, v3.7   |
+| MySQL          | v8           |
 
 ## Documentation
```

backend/src/v2/cmd/driver/main.go

Lines changed: 2 additions & 2 deletions
```diff
@@ -312,7 +312,7 @@ func handleExecution(execution *driver.Execution, driverType string, executionPa
 			return fmt.Errorf("failed to write iteration count to file: %w", err)
 		}
 	} else {
-		if driverType == ROOT_DAG {
+		if driverType == ROOT_DAG || driverType == DAG {
 			if err := writeFile(executionPaths.IterationCount, []byte("0")); err != nil {
 				return fmt.Errorf("failed to write iteration count to file: %w", err)
 			}
@@ -329,7 +329,7 @@ func handleExecution(execution *driver.Execution, driverType string, executionPa
 		}
 	} else {
 		// nil is a valid value for Condition
-		if driverType == ROOT_DAG || driverType == CONTAINER {
+		if driverType == ROOT_DAG || driverType == DAG || driverType == CONTAINER {
 			if err := writeFile(executionPaths.Condition, []byte("nil")); err != nil {
 				return fmt.Errorf("failed to write condition to file: %w", err)
 			}
```

backend/src/v2/cmd/driver/main_test.go

Lines changed: 20 additions & 0 deletions
```diff
@@ -89,6 +89,26 @@ func Test_handleExecutionRootDAG(t *testing.T) {
 	cleanup(t, executionPaths)
 }
 
+func Test_handleExecutionDAG(t *testing.T) {
+	execution := &driver.Execution{}
+
+	executionPaths := &ExecutionPaths{
+		IterationCount: "iteration_count.txt",
+		Condition:      "condition.txt",
+	}
+
+	err := handleExecution(execution, DAG, executionPaths)
+
+	if err != nil {
+		t.Errorf("Unexpected error: %v", err)
+	}
+
+	verifyFileContent(t, executionPaths.IterationCount, "0")
+	verifyFileContent(t, executionPaths.Condition, "nil")
+
+	cleanup(t, executionPaths)
+}
+
 func cleanup(t *testing.T, executionPaths *ExecutionPaths) {
 	removeIfExists(t, executionPaths.IterationCount)
 	removeIfExists(t, executionPaths.ExecutionID)
```

backend/test/compiler/compiler_visitor_test.go

Lines changed: 1 addition & 1 deletion
```diff
@@ -92,7 +92,7 @@ var _ = Describe("Verify iteration over the pipeline components >", Label(POSITI
 		},
 	},
 	{
-		pipelineSpecPath: "critical/nested_pipeline_opt_input_child_level_compiled.yaml",
+		pipelineSpecPath: "parallel_and_nested/nested_pipeline_opt_input_child_level_compiled.yaml",
 		expectedVisited: []string{
 			"container(name=\"comp-component-a-bool\")",
 			"container(name=\"comp-component-a-int\")",
```

backend/test/constants/test_features.go

Lines changed: 2 additions & 0 deletions
```diff
@@ -28,6 +28,8 @@ const (
 	E2eFailed string = "E2EFailure"
 	// E2eCritical - For pipelines that verify the critical functionality of the system
 	E2eCritical string = "E2ECritical"
+	// E2eParallelNested - For parallel or nested pipelines for regression testing
+	E2eParallelNested string = "E2EParallelNested"
 	// E2eProxy - For pipeline that runs with a proxy
 	E2eProxy string = "E2EProxy"
```

backend/test/end2end/e2e_suite_test.go

Lines changed: 1 addition & 1 deletion
```diff
@@ -38,7 +38,7 @@ var randomName string
 var experimentID *string = nil
 var userToken string
 
-const maxPipelineWaitTime = 540 // In Seconds
+const maxPipelineWaitTime = 720 // In Seconds
 
 var (
 	pipelineUploadClient apiserver.PipelineUploadInterface
```
