Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions api/v1alpha1/nodeset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,18 @@ type NodeSetSpec struct {
// +kubebuilder:default="20%"
MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"`

// MaxConcurrentStartup caps the number of worker pods created in parallel
// during initial NodeSet scale-out (i.e. cluster creation or NodeSet growth).
// Value can be an absolute number (ex: 500) or a percentage of desired pods (ex: 10%).
// Maps to the underlying kruise AdvancedStatefulSet's scaleStrategy.maxUnavailable.
// Prevents overloading the Slurm controller with simultaneous slurmd registrations
// on large clusters.
// Defaults to 500.
//
// +kubebuilder:validation:Optional
// +kubebuilder:default=500
MaxConcurrentStartup *intstr.IntOrString `json:"maxConcurrentStartup,omitempty"`

// EphemeralNodes enables ephemeral node behavior for this NodeSet.
// When true, nodes will use dynamic topology injection instead of legacy topology.conf.
// Topology data is read from the topology-node-labels ConfigMap at runtime
Expand Down
5 changes: 5 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions config/crd/bases/slurm.nebius.ai_nodesets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2562,6 +2562,20 @@ spec:
format: int32
minimum: 0
type: integer
maxConcurrentStartup:
anyOf:
- type: integer
- type: string
default: 500
description: |-
MaxConcurrentStartup caps the number of worker pods created in parallel
during initial NodeSet scale-out (i.e. cluster creation or NodeSet growth).
Value can be an absolute number (ex: 500) or a percentage of desired pods (ex: 10%).
Maps to the underlying kruise AdvancedStatefulSet's scaleStrategy.maxUnavailable.
Prevents overloading the Slurm controller with simultaneous slurmd registrations
on large clusters.
Defaults to 500.
x-kubernetes-int-or-string: true
maxUnavailable:
anyOf:
- type: integer
Expand Down
4 changes: 4 additions & 0 deletions helm/nodesets/templates/nodeset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ spec:
maxUnavailable: {{ . }}
{{- end }}

{{- with .maxConcurrentStartup }}
maxConcurrentStartup: {{ . }}
{{- end }}

{{- if .ephemeralNodes }}
ephemeralNodes: {{ .ephemeralNodes }}
{{- end }}
Expand Down
53 changes: 53 additions & 0 deletions helm/nodesets/tests/node_config_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,59 @@ tests:
value: 2
documentIndex: 1

- it: should configure maxConcurrentStartup when set and omit it when unset
set:
nodesets:
- name: gpu-workers
replicas: 3
maxConcurrentStartup: 250
slurmd:
image:
repository: "test/slurm"
resources:
cpu: "4"
memory: "8Gi"
volumes:
spool:
emptyDir: {}
jail:
emptyDir: {}
jailSubMounts: []
munge:
image:
repository: "test/munge"
resources:
cpu: "100m"
memory: "128Mi"
- name: cpu-workers
replicas: 5
slurmd:
image:
repository: "test/slurm"
resources:
cpu: "2"
memory: "4Gi"
volumes:
spool:
emptyDir: {}
jail:
emptyDir: {}
jailSubMounts: []
munge:
image:
repository: "test/munge"
resources:
cpu: "50m"
memory: "64Mi"
asserts:
- equal:
path: spec.maxConcurrentStartup
value: 250
documentIndex: 0
- notExists:
path: spec.maxConcurrentStartup
documentIndex: 1

- it: should configure worker annotations correctly
set:
nodesets:
Expand Down
6 changes: 6 additions & 0 deletions helm/nodesets/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ nodesets:
# Could be a count (number) or percent (string)
# Optional, defaults to 20%
maxUnavailable: 1
# Maximum number of worker pods that can be created in parallel during
# initial scale-out, to avoid overloading the Slurm controller with
# simultaneous slurmd registrations.
# Could be a count (number) or percent (string).
# Optional, defaults to 500
maxConcurrentStartup: 500
# Enable ephemeral node behavior for this NodeSet.
# When true, nodes will use dynamic topology injection instead of legacy topology.conf.
# Topology data is read from the topology-node-labels ConfigMap at runtime.
Expand Down
14 changes: 14 additions & 0 deletions helm/soperator-crds/templates/slurmcluster-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17656,6 +17656,20 @@ spec:
format: int32
minimum: 0
type: integer
maxConcurrentStartup:
anyOf:
- type: integer
- type: string
default: 500
description: |-
MaxConcurrentStartup caps the number of worker pods created in parallel
during initial NodeSet scale-out (i.e. cluster creation or NodeSet growth).
Value can be an absolute number (ex: 500) or a percentage of desired pods (ex: 10%).
Maps to the underlying kruise AdvancedStatefulSet's scaleStrategy.maxUnavailable.
Prevents overloading the Slurm controller with simultaneous slurmd registrations
on large clusters.
Defaults to 500.
x-kubernetes-int-or-string: true
maxUnavailable:
anyOf:
- type: integer
Expand Down
14 changes: 14 additions & 0 deletions helm/soperator/crds/slurmcluster-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17656,6 +17656,20 @@ spec:
format: int32
minimum: 0
type: integer
maxConcurrentStartup:
anyOf:
- type: integer
- type: string
default: 500
description: |-
MaxConcurrentStartup caps the number of worker pods created in parallel
during initial NodeSet scale-out (i.e. cluster creation or NodeSet growth).
Value can be an absolute number (ex: 500) or a percentage of desired pods (ex: 10%).
Maps to the underlying kruise AdvancedStatefulSet's scaleStrategy.maxUnavailable.
Prevents overloading the Slurm controller with simultaneous slurmd registrations
on large clusters.
Defaults to 500.
x-kubernetes-int-or-string: true
maxUnavailable:
anyOf:
- type: integer
Expand Down
1 change: 1 addition & 0 deletions internal/controller/reconciler/k8s_statefulset_advanced.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func (r *AdvancedStatefulSetReconciler) patch(existing, desired client.Object) (
}
dst.Spec.Replicas = src.Spec.Replicas
dst.Spec.UpdateStrategy = src.Spec.UpdateStrategy
dst.Spec.ScaleStrategy = src.Spec.ScaleStrategy
dst.Spec.Template.Spec = src.Spec.Template.Spec
dst.Spec.ReserveOrdinals = src.Spec.ReserveOrdinals
dst.Spec.PersistentVolumeClaimRetentionPolicy = src.Spec.PersistentVolumeClaimRetentionPolicy
Expand Down
43 changes: 43 additions & 0 deletions internal/controller/reconciler/k8s_statefulset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

slurmv1 "nebius.ai/slurm-operator/api/v1"
Expand Down Expand Up @@ -184,6 +185,48 @@ func TestAdvancedStatefulSetPatchCopiesPVCDeletionPolicy(t *testing.T) {
}
}

func TestAdvancedStatefulSetPatchCopiesScaleStrategy(t *testing.T) {
tests := []struct {
name string
existing *kruisev1b1.StatefulSetScaleStrategy
desired *kruisev1b1.StatefulSetScaleStrategy
}{
{
name: "absolute MaxUnavailable is propagated",
existing: &kruisev1b1.StatefulSetScaleStrategy{MaxUnavailable: ptrIntOrString(intstr.FromInt32(100))},
desired: &kruisev1b1.StatefulSetScaleStrategy{MaxUnavailable: ptrIntOrString(intstr.FromInt32(500))},
},
{
name: "percentage MaxUnavailable is propagated",
existing: &kruisev1b1.StatefulSetScaleStrategy{MaxUnavailable: ptrIntOrString(intstr.FromString("5%"))},
desired: &kruisev1b1.StatefulSetScaleStrategy{MaxUnavailable: ptrIntOrString(intstr.FromString("25%"))},
},
{
name: "newly set ScaleStrategy is populated",
existing: nil,
desired: &kruisev1b1.StatefulSetScaleStrategy{MaxUnavailable: ptrIntOrString(intstr.FromInt32(500))},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
existing := &kruisev1b1.StatefulSet{Spec: kruisev1b1.StatefulSetSpec{ScaleStrategy: tt.existing}}
desired := &kruisev1b1.StatefulSet{Spec: kruisev1b1.StatefulSetSpec{ScaleStrategy: tt.desired}}

r := &AdvancedStatefulSetReconciler{}
if _, err := r.patch(existing, desired); err != nil {
t.Fatalf("patch returned error: %v", err)
}

if !equality.Semantic.DeepEqual(existing.Spec.ScaleStrategy, tt.desired) {
t.Fatalf("expected ScaleStrategy=%+v, got %+v", tt.desired, existing.Spec.ScaleStrategy)
}
})
}
}

func ptrIntOrString(v intstr.IntOrString) *intstr.IntOrString { return &v }

func TestStatefulSetPatchCopiesPVCDeletionPolicy(t *testing.T) {
existing := &appsv1.StatefulSet{
Spec: appsv1.StatefulSetSpec{
Expand Down
109 changes: 57 additions & 52 deletions internal/render/common/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,15 @@ func AddNodeSetsToSlurmConfig(res *renderutils.PropertiesConfig, cluster *values
}
}

// AddNodesToSlurmConfig adds all node names to the slurm config
// AddNodesToSlurmConfig adds all node names to the slurm config.
//
// All replicas in a NodeSet share an identical body, so we emit one line per
// NodeSet using Slurm hostlist range syntax. This keeps the ConfigMap well
// under the Kubernetes 1 MiB object size limit on large clusters.
//
// Example output:
// NodeName=gb200-0-0 State=CLOUD NodeHostname=gb200-0-0 NodeAddr=gb200-0-0.gb200-0.soperator.svc RealMemory=1612639 Features=platform-gb200,gb200-rack-0 Gres=gpu:nvidia-b200:4 NodeCPUs=128 Boards=1 SocketsPerBoard=2 CoresPerSocket=32 ThreadsPerCode=2
// NodeName=gb200-0-1 State=CLOUD NodeHostname=gb200-0-1 NodeAddr=gb200-0-1.gb200-0.soperator.svc RealMemory=1612639 Features=platform-gb200,gb200-rack-0 Gres=gpu:nvidia-b200:4 NodeCPUs=128 Boards=1 SocketsPerBoard=2 CoresPerSocket=32 ThreadsPerCode=2
// NodeName=gb200-0-0 State=CLOUD NodeAddr=gb200-0-0.gb200-0.soperator.svc RealMemory=1612639 Feature=platform-gb200,gb200-rack-0 Gres=gpu:nvidia-b200:4 NodeCPUs=128 Boards=1 SocketsPerBoard=2 CoresPerSocket=32 ThreadsPerCode=2
// NodeName=gb200-1-[0-17] State=CLOUD NodeAddr=gb200-1-[0-17].gb200-1.soperator.svc RealMemory=1612639 Feature=platform-gb200,gb200-rack-1 Gres=gpu:nvidia-b200:4 NodeCPUs=128 Boards=1 SocketsPerBoard=2 CoresPerSocket=32 ThreadsPerCode=2
func AddNodesToSlurmConfig(res *renderutils.PropertiesConfig, cluster *values.SlurmCluster) {
res.AddComment("Nodes section")

Expand All @@ -125,59 +129,60 @@ func AddNodesToSlurmConfig(res *renderutils.PropertiesConfig, cluster *values.Sl
continue
}

for i := int32(0); i < nodeSet.Spec.Replicas; i++ {
nodeName := fmt.Sprintf("%s-%d", nodeSet.Name, i)

nodeAddr := fmt.Sprintf(
"%s.%s",
nodeName,
naming.BuildNodeSetUmbrellaServiceFQDN(nodeSet.Namespace, cluster.Name),
)
realMemory := strconv.FormatInt(
RenderRealMemorySlurmd(corev1.ResourceRequirements{Requests: nodeSet.Spec.Slurmd.Resources}),
10,
)

var nodeConfigParts []string
nodeConfigParts = append(nodeConfigParts,
fmt.Sprintf("NodeHostname=%s", nodeName),
fmt.Sprintf("NodeAddr=%s", nodeAddr),
fmt.Sprintf("RealMemory=%s", realMemory),
)
if nodeSet.Spec.Slurmd.Port != 0 {
nodeConfigParts = append(nodeConfigParts, fmt.Sprintf("Port=%d", nodeSet.Spec.Slurmd.Port))
}
nodeConfig := strings.Join(nodeConfigParts, " ")
var nodeRange string
if nodeSet.Spec.Replicas == 1 {
nodeRange = fmt.Sprintf("%s-0", nodeSet.Name)
} else {
nodeRange = fmt.Sprintf("%s-[0-%d]", nodeSet.Name, nodeSet.Spec.Replicas-1)
}

if len(nodeSet.Spec.NodeConfig.Features) > 0 {
features := strings.Join(nodeSet.Spec.NodeConfig.Features, ",")
nodeConfig = fmt.Sprintf("%s %s%s", nodeConfig, nodeFeatureKey, features)
}
nodeAddr := fmt.Sprintf(
"%s.%s",
nodeRange,
naming.BuildNodeSetUmbrellaServiceFQDN(nodeSet.Namespace, cluster.Name),
)
realMemory := strconv.FormatInt(
RenderRealMemorySlurmd(corev1.ResourceRequirements{Requests: nodeSet.Spec.Slurmd.Resources}),
10,
)

var nodeConfigParts []string
nodeConfigParts = append(nodeConfigParts,
fmt.Sprintf("NodeAddr=%s", nodeAddr),
fmt.Sprintf("RealMemory=%s", realMemory),
)
if nodeSet.Spec.Slurmd.Port != 0 {
nodeConfigParts = append(nodeConfigParts, fmt.Sprintf("Port=%d", nodeSet.Spec.Slurmd.Port))
}
nodeConfig := strings.Join(nodeConfigParts, " ")

// Remove feature and state overrides
staticConfig := strings.Join(
slices.Filter(
nil,
strings.Split(nodeSet.Spec.NodeConfig.Static, " "),
func(s string) bool {
return !strings.HasPrefix(s, nodeFeatureKey) &&
!strings.HasPrefix(s, stateKey)
},
),
" ",
)

if len(nodeConfig) > 0 {
nodeConfig = fmt.Sprintf("%s %s", nodeConfig, staticConfig)
}
if len(nodeSet.Spec.NodeConfig.Features) > 0 {
features := strings.Join(nodeSet.Spec.NodeConfig.Features, ",")
nodeConfig = fmt.Sprintf("%s %s%s", nodeConfig, nodeFeatureKey, features)
}

// Create static nodes with state CLOUD.
// Otherwise, nodes will disappear from the Slurm state every time the corresponding K8s pods don't run.
res.AddProperty(
"NodeName",
fmt.Sprintf("%s State=CLOUD %s", nodeName, nodeConfig),
)
staticConfig := strings.Join(
slices.Filter(
nil,
strings.Split(nodeSet.Spec.NodeConfig.Static, " "),
func(s string) bool {
return !strings.HasPrefix(s, nodeFeatureKey) &&
!strings.HasPrefix(s, stateKey)
},
),
" ",
)

if len(nodeConfig) > 0 {
nodeConfig = fmt.Sprintf("%s %s", nodeConfig, staticConfig)
}

// State=CLOUD keeps nodes registered in Slurm even when the
// corresponding K8s pods are not running.
res.AddProperty(
"NodeName",
fmt.Sprintf("%s State=CLOUD %s", nodeRange, nodeConfig),
)
}
}

Expand Down
Loading
Loading