From 7d44dba256511f2b669f05107059599b1313c799 Mon Sep 17 00:00:00 2001 From: vsoch Date: Wed, 29 Jan 2025 19:09:17 -0700 Subject: [PATCH 1/4] attempt: adding satisfy I can format the jobspec correctly, but then when the formatting errors go away it silently returns no response and no errors, so I cannot debug further. Signed-off-by: vsoch --- README.md | 1 + api/v1alpha1/fluxjob_types.go | 4 +- api/v1alpha1/submit.go | 16 +-- chart/templates/fluxjob-crd.yaml | 2 +- ...jobs.converged-computing.org_fluxjobs.yaml | 2 +- go.mod | 15 +-- go.sum | 13 +-- hack/quick-build-kind.sh | 2 +- hack/quick-deploy-kind.sh | 2 +- pkg/fluxqueue/fluxqueue.go | 2 +- pkg/fluxqueue/strategy/workers/job.go | 100 ++++++++++++++---- pkg/jobspec/jobspec.go | 86 +++++++++------ pkg/jobspec/resources.go | 54 ++++++++++ 13 files changed, 214 insertions(+), 85 deletions(-) create mode 100644 pkg/jobspec/resources.go diff --git a/README.md b/README.md index b016456..12eab68 100644 --- a/README.md +++ b/README.md @@ -239,6 +239,7 @@ SELECT * from reservations; - we need to use shrink or partial cancel here. And a shrink down to size 0 I assume is a cancel. - [ ] For cancel, we would issue a cancel for every pod associated with a job. How can we avoid that (or is that OK?) - [ ] we will eventually need another mechanism to move schedule queue aside from new submission +- What if instead of loop we do reservation every N jobs? Then we wouldn't need a loop? - [ ] scheduleAt can be used to AskFlux in the future - [ ] Nodes that are currently assigned need to be taken into account - Right now they aren't included in resources, but instead should be "given" to Fluxion. diff --git a/api/v1alpha1/fluxjob_types.go b/api/v1alpha1/fluxjob_types.go index a6d8468..950aa8c 100644 --- a/api/v1alpha1/fluxjob_types.go +++ b/api/v1alpha1/fluxjob_types.go @@ -104,9 +104,9 @@ type FluxJobSpec struct { // +optional Reservation bool `json:"reservation,omitempty"` - // Nodes needed for the job + // Slots needed for the job // +optional - Nodes int32 `json:"nodes"` + Slots int32 `json:"nodes"` // Resources assigned // +optional diff --git a/api/v1alpha1/submit.go b/api/v1alpha1/submit.go index 6fde320..b742054 100644 --- a/api/v1alpha1/submit.go +++ b/api/v1alpha1/submit.go @@ -2,9 +2,7 @@ package v1alpha1 import ( "context" - "fmt" - jobspec "github.com/compspec/jobspec-go/pkg/jobspec/v1" jspec "github.com/converged-computing/fluxqueue/pkg/jobspec" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -21,13 +19,14 @@ var ( ) // SubmitFluxJob wraps a pod or job spec into a FluxJob -// We essentially create a CRD for a a FluxJob +// We essentially create a CRD for a a FluxJob. Note that we are asking +// for SLOTS and not nodes - a slot can be a part of a node. func SubmitFluxJob( ctx context.Context, jobType JobWrapped, name string, namespace string, - nodes int32, + slots int32, containers []corev1.Container, ) error { @@ -51,14 +50,15 @@ func SubmitFluxJob( slog.Error(err, "Issue with getting job", "Namespace", namespace, "Name", jobName) return err } - resources := jspec.GeneratePodResources(containers) + resources := jspec.GeneratePodResources(containers, slots) // Artificially create a command for the name and namespace - command := fmt.Sprintf("echo %s %s", namespace, name) + command := []string{"echo", namespace, name} // Generate a jobspec for that many nodes (starting simple) // TODO will need to add GPU and memory here... 
if Flux supports memory - js, err := jobspec.NewSimpleJobspec(name, command, nodes, resources.Cpu) + js, err := jspec.NewJobspec(name, command, resources) + if err != nil { slog.Error(err, "Issue with creating job", "Namespace", namespace, "Name", jobName) return err @@ -80,7 +80,7 @@ func SubmitFluxJob( ObjectMeta: metav1.ObjectMeta{Name: jobName, Namespace: namespace}, Spec: FluxJobSpec{ JobSpec: jsString, - Nodes: nodes, + Slots: slots, Type: jobType, Name: name, }, diff --git a/chart/templates/fluxjob-crd.yaml b/chart/templates/fluxjob-crd.yaml index 7da0897..7a3f797 100644 --- a/chart/templates/fluxjob-crd.yaml +++ b/chart/templates/fluxjob-crd.yaml @@ -76,7 +76,7 @@ spec: description: Original name of the job type: string nodes: - description: Nodes needed for the job + description: Slots needed for the job format: int32 type: integer object: diff --git a/config/crd/bases/jobs.converged-computing.org_fluxjobs.yaml b/config/crd/bases/jobs.converged-computing.org_fluxjobs.yaml index 4ee48dc..753389f 100644 --- a/config/crd/bases/jobs.converged-computing.org_fluxjobs.yaml +++ b/config/crd/bases/jobs.converged-computing.org_fluxjobs.yaml @@ -63,7 +63,7 @@ spec: description: Original name of the job type: string nodes: - description: Nodes needed for the job + description: Slots needed for the job format: int32 type: integer object: diff --git a/go.mod b/go.mod index a64e7a1..0e89a0a 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,6 @@ module github.com/converged-computing/fluxqueue go 1.22.0 require ( - github.com/compspec/jobspec-go v0.0.0-20240510054255-ee02cdc7d3d4 - github.com/converged-computing/fluxion v0.0.0-20250105140137-04388a62d0fa - github.com/jackc/pgx v3.6.2+incompatible github.com/jackc/pgx/v5 v5.7.2 github.com/onsi/ginkgo/v2 v2.19.0 github.com/onsi/gomega v1.33.1 @@ -14,14 +11,18 @@ require ( github.com/riverqueue/river/rivershared v0.15.0 google.golang.org/grpc v1.65.0 google.golang.org/protobuf v1.34.2 + gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.31.2 k8s.io/apimachinery v0.31.2 k8s.io/client-go v0.31.2 - k8s.io/klog v1.0.0 - k8s.io/klog/v2 v2.130.1 sigs.k8s.io/controller-runtime v0.19.3 ) +require ( + github.com/compspec/jobspec-go v0.0.0-20250130015255-577b7ffe2599 // indirect + github.com/converged-computing/fluxion v0.0.0-20250130005615-dcc147715430 // indirect +) + require ( github.com/antlr4-go/antlr/v4 v4.13.0 // indirect github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a // indirect @@ -30,7 +31,7 @@ require ( github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect - github.com/emicklei/go-restful/v3 v3.11.0 // indirect + github.com/emicklei/go-restful/v3 v3.11.0 // jo github.com/evanphx/json-patch/v5 v5.9.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect @@ -106,11 +107,11 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect gopkg.in/inf.v0 v0.9.1 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.31.2 // indirect k8s.io/apiserver v0.31.2 // indirect k8s.io/component-base v0.31.2 // indirect + k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect 
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3 // indirect diff --git a/go.sum b/go.sum index f13573e..7b0736e 100644 --- a/go.sum +++ b/go.sum @@ -10,10 +10,10 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK3 github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/compspec/jobspec-go v0.0.0-20240510054255-ee02cdc7d3d4 h1:4MaTp3OcUmp6HFEojeI//GthUt7GMYnB8K5OSZdKxZA= -github.com/compspec/jobspec-go v0.0.0-20240510054255-ee02cdc7d3d4/go.mod h1:BaJyxaOhESe2DD4lqBdwTEWOw0TaTZVJGPrFh6KyXQM= -github.com/converged-computing/fluxion v0.0.0-20250105140137-04388a62d0fa h1:F5/pXXI5F0jC4XG/nAgN65gBS7+2SYKlwdzkVzawPdI= -github.com/converged-computing/fluxion v0.0.0-20250105140137-04388a62d0fa/go.mod h1:tNlvJY1yFWpp/QqdqJnq8YMGYG99K6YMxDmpu9IVS1E= +github.com/compspec/jobspec-go v0.0.0-20250130015255-577b7ffe2599 h1:GbFDx46/I/hkB+GJ+7/xqIX8fg9JJcNkhAQwmy5Eofw= +github.com/compspec/jobspec-go v0.0.0-20250130015255-577b7ffe2599/go.mod h1:BaJyxaOhESe2DD4lqBdwTEWOw0TaTZVJGPrFh6KyXQM= +github.com/converged-computing/fluxion v0.0.0-20250130005615-dcc147715430 h1:IvAuMqxEVmjbq112eRIYuqE4zf2IGFFo2ADF4ueSy1I= +github.com/converged-computing/fluxion v0.0.0-20250130005615-dcc147715430/go.mod h1:tNlvJY1yFWpp/QqdqJnq8YMGYG99K6YMxDmpu9IVS1E= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -32,7 +32,6 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E= github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= -github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -81,8 +80,6 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx v3.6.2+incompatible h1:2zP5OD7kiyR3xzRYMhOcXVvkDZsImVXfj+yIyTQf3/o= -github.com/jackc/pgx v3.6.2+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I= github.com/jackc/pgx/v5 v5.7.2 h1:mLoDLV6sonKlvjIEsV56SkWNCnuNv531l94GaIzO+XI= github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= @@ -280,8 +277,6 @@ k8s.io/client-go v0.31.2 h1:Y2F4dxU5d3AQj+ybwSMqQnpZH9F30//1ObxOKlTI9yc= k8s.io/client-go v0.31.2/go.mod h1:NPa74jSVR/+eez2dFsEIHNa+3o09vtNaWwWwb1qSxSs= k8s.io/component-base v0.31.2 
h1:Z1J1LIaC0AV+nzcPRFqfK09af6bZ4D1nAOpWsy9owlA= k8s.io/component-base v0.31.2/go.mod h1:9PeyyFN/drHjtJZMCTkSpQJS3U9OXORnHQqMLDz0sUQ= -k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8= -k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7FjZpUb45WallggurYhKGag= diff --git a/hack/quick-build-kind.sh b/hack/quick-build-kind.sh index 12fe2e4..f3acbdd 100755 --- a/hack/quick-build-kind.sh +++ b/hack/quick-build-kind.sh @@ -28,7 +28,7 @@ helm install \ --set scheduler.image=${REGISTRY}/fluxqueue-scheduler:latest \ --set postgres.image=${REGISTRY}/fluxqueue-postgres:latest \ --set controllerManager.manager.imagePullPolicy=Never \ - --set controllerManager.fluxion.image.tag=grow-api \ + --set controllerManager.fluxion.image.tag=satisfy \ --namespace ${NAMESPACE} \ --create-namespace \ --set scheduler.pullPolicy=Never \ diff --git a/hack/quick-deploy-kind.sh b/hack/quick-deploy-kind.sh index c7b57fc..c6ca9df 100755 --- a/hack/quick-deploy-kind.sh +++ b/hack/quick-deploy-kind.sh @@ -25,7 +25,7 @@ helm install \ --set scheduler.image=${REGISTRY}/fluxqueue-scheduler:latest \ --set postgres.image=${REGISTRY}/fluxqueue-postgres:latest \ --set controllerManager.manager.imagePullPolicy=Never \ - --set controllerManager.fluxion.image.tag=grow-api \ + --set controllerManager.fluxion.image.tag=satisfy \ --namespace ${NAMESPACE} \ --create-namespace \ --set scheduler.pullPolicy=Never \ diff --git a/pkg/fluxqueue/fluxqueue.go b/pkg/fluxqueue/fluxqueue.go index 819154a..47e4899 100644 --- a/pkg/fluxqueue/fluxqueue.go +++ b/pkg/fluxqueue/fluxqueue.go @@ -192,7 +192,7 @@ func (q *Queue) Enqueue(spec *api.FluxJob) (types.EnqueueStatus, error) { spec.Spec.Type, reservation, spec.Spec.Duration, - spec.Spec.Nodes, + spec.Spec.Slots, ) // If unknown, we won't give status submit, and it should requeue to try again diff --git a/pkg/fluxqueue/strategy/workers/job.go b/pkg/fluxqueue/strategy/workers/job.go index 3459a28..cc2fe40 100644 --- a/pkg/fluxqueue/strategy/workers/job.go +++ b/pkg/fluxqueue/strategy/workers/job.go @@ -10,7 +10,9 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" + "gopkg.in/yaml.v2" + jobspec "github.com/compspec/jobspec-go/pkg/jobspec/v1" "github.com/converged-computing/fluxion/pkg/client" pb "github.com/converged-computing/fluxion/pkg/fluxion-grpc" api "github.com/converged-computing/fluxqueue/api/v1alpha1" @@ -66,14 +68,24 @@ type JobArgs struct { // Work performs the AskFlux action. Any error returned that is due to not having resources means // the job will remain in the worker queue to AskFluxion again. func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error { - wlog.Info("Asking Fluxion to schedule job", "Namespace", job.Args.Namespace, "Name", job.Args.Name, "Nodes", job.Args.Size) + wlog.Info("Asking Fluxion to schedule job", + "Namespace", job.Args.Namespace, "Name", job.Args.Name, "Nodes", job.Args.Size) + + fmt.Println(job.Args.Jobspec) // Let's ask Flux if we can allocate nodes for the job! 
fluxionCtx, cancel := context.WithTimeout(context.Background(), 200*time.Second) defer cancel() + // Get rid of the attributes section (which should be empty) + parts := strings.Split(job.Args.Jobspec, "resources:") + satisfyJobspec := "attributes: {}\nresources:\n" + parts[len(parts)-1] + fmt.Println(satisfyJobspec) + // Prepare the request to allocate - convert string to bytes - request := &pb.MatchRequest{Jobspec: job.Args.Jobspec, Reservation: job.Args.Reservation == 1} + // This Jobspec includes all slots (pods) so we get an allocation that considers that + // We assume reservation allows for the satisfy to be in the future + request := &pb.SatisfyRequest{Jobspec: satisfyJobspec, Reservation: job.Args.Reservation == 1} // This is the host where fluxion is running, will be localhost 4242 for sidecar fluxion, err := client.NewClient("127.0.0.1:4242") @@ -83,14 +95,14 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error { } defer fluxion.Close() - // An error here is an error with making the request, nothing about - // the match/allocation itself. - response, err := fluxion.Match(fluxionCtx, request) + // An error here is an error with making the request, nothing about the allocation + response, err := fluxion.Satisfy(fluxionCtx, request) if err != nil { - wlog.Info("[WORK] Fluxion did not receive any match response", "Error", err) + wlog.Info("[WORK] Fluxion did not receive any satisfy response", "Error", err) return err } + // For each node assignment, we make an exact job with that request // If we asked for a reservation, and it wasn't reserved AND not allocated, this means it's not possible // We currently don't have grow/shrink added so this means it will never be possible. // We will unsuspend the job but add a label that indicates it is not schedulable. @@ -100,35 +112,79 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error { return river.JobCancel(fmt.Errorf("fluxion could not allocate nodes for %s/%s, likely Unsatisfiable", job.Args.Namespace, job.Args.Name)) } - // Flux job identifier (known to fluxion) - fluxID := response.GetJobid() - - // If it's reserved, add the id to our reservation table - // TODO need to clean up this table... but these tasks run async... - if response.Reserved { - w.reserveJob(fluxionCtx, job.Args, fluxID) - } - - // This means we didn't get an allocation - we might have a reservation + // This means we didn't get an allocation - proceeding is not possible if response.GetAllocation() == "" { // This will have the job be retried in the queue, still based on sorted schedule time and priority return fmt.Errorf("fluxion could not allocate nodes for job %s/%s", job.Args.Namespace, job.Args.Name) } + // Parse the jobspec back into form to add the constraint for each node + var js jobspec.Jobspec + err = yaml.Unmarshal([]byte(job.Args.Jobspec), &js) + if err != nil { + wlog.Error(err, "Reparsing Jobspec to add constraint") + return err + } + + // Add the constraint - we will be added one node assignment per pod. + // This means that instead of slot N, each request is for one slot + js.Attributes = jobspec.Attributes{ + System: jobspec.System{ + Constraints: jobspec.Constraints{}, + }, + } + js.Resources[0].Count = 1 + // Now get the nodes. 
These are actually cores assigned to nodes, so we need to keep count nodes, err := parseNodes(response.Allocation) if err != nil { wlog.Info("Error parsing nodes from fluxion response", "Namespace", job.Args.Namespace, "Name", job.Args.Name, "Error", err) return err } - wlog.Info("Fluxion allocation response", "Nodes", nodes) + // TODO is this nodes or slots? + wlog.Info("Fluxion allocation response", "Slots", nodes) + + for slotNumber, node := range nodes { + + // Update the jobspec for one node + js.Attributes.System.Constraints.Node = []string{node} + slotJobspec, err := yaml.Marshal(js) + + // TODO we probably want to cancel here + if err != nil { + wlog.Info("Error creating single slot jobspec", "Namespace", job.Args.Namespace, "Name", job.Args.Name, "Error", err) + return err + } + + // The strategy will allow some reservation depth of jobs, so we allow for all slots of the job + request := &pb.MatchRequest{Jobspec: string(slotJobspec), Reservation: job.Args.Reservation == 1} + + // An error here is an error with making the request, nothing about the allocation + match, err := fluxion.Match(fluxionCtx, request) + if err != nil { + wlog.Info("[WORK] Fluxion did not receive any match response", "Error", err) + return err + } + + // Try printing first. + // Flux job identifier (known to fluxion) + fluxID := match.GetJobid() + + // If it's reserved, add the id to our reservation table + // TODO need to clean up this table... but these tasks run async... + if response.Reserved { + w.reserveJob(fluxionCtx, job.Args, fluxID) + } + + // Unsuspend the job or ungate the pods, adding the node assignments as labels for the scheduler + err = w.releaseJob(ctx, job.Args, fluxID, nodes) + if err != nil { + return err + } + wlog.Info("Slot allocated for job", "Slot", slotNumber, "JobId", fluxID, "Namespace", job.Args.Namespace, "Name", job.Args.Name) - // Unsuspend the job or ungate the pods, adding the node assignments as labels for the scheduler - err = w.releaseJob(ctx, job.Args, fluxID, nodes) - if err != nil { - return err } - wlog.Info("Fluxion finished allocating nodes for job", "JobId", fluxID, "Nodes", nodes, "Namespace", job.Args.Namespace, "Name", job.Args.Name) + wlog.Info("Fluxion finished allocating nodes for job", "Nodes", nodes, "Namespace", job.Args.Namespace, "Name", job.Args.Name) return nil } diff --git a/pkg/jobspec/jobspec.go b/pkg/jobspec/jobspec.go index 97fd03c..39500f4 100644 --- a/pkg/jobspec/jobspec.go +++ b/pkg/jobspec/jobspec.go @@ -1,45 +1,67 @@ package jobspec import ( - corev1 "k8s.io/api/core/v1" + v1 "github.com/compspec/jobspec-go/pkg/jobspec/v1" ) -// https://github.com/kubernetes/kubectl/blob/master/pkg/describe/describe.go#L4211-L4213 -type Resources struct { - Cpu int32 - Memory int64 - Gpu int64 - Storage int64 - Labels []string -} - -// GeneratePodResources returns resources for a pod, which can -// be used to populate the flux JobSpec -func GeneratePodResources(containers []corev1.Container) *Resources { +// NewJobSpec generates a jobspec for some number of slots in a cluster +// We associate each "slot" with a pod, so the request asks for a specific number of cpu. +// We also are assuming now that each pod is equivalent, so slots are equivalent. +// If we want to change this, we will need an ability to define slots of different types. 
+func NewJobspec(name string, command []string, resources *Resources) (*v1.Jobspec, error) { - // We will sum cpu and memory across containers - // For GPU, we could make a more complex jobspec, but for now - // assume one container is representative for GPU needed. - resources := Resources{} + // This is creating the resources for the slot Cores are always set to minimally 1 + slotSpec := newSlotSpec(resources) - for _, container := range containers { + // Create the top level resources spec (with a slot) + rSpec := []v1.Resource{ + { + Type: "slot", + Count: resources.Count, + Label: "default", + With: slotSpec, + }, + } - // Add on Cpu, Memory, GPU from container requests - // This is a limited set of resources owned by the pod - resources.Cpu += int32(container.Resources.Requests.Cpu().Value()) - resources.Memory += container.Resources.Requests.Memory().Value() - resources.Storage += container.Resources.Requests.StorageEphemeral().Value() + // Create the task spec + tasks := []v1.Tasks{ + { + Command: command, + Slot: "default", + Count: v1.Count{PerSlot: 1}, + }, + } - // We assume that a pod (node) only has access to the same GPU - gpus, ok := container.Resources.Limits["nvidia.com/gpu"] - if ok { - resources.Gpu += gpus.Value() - } + // Start preparing the spec + spec := v1.Jobspec{ + Version: 1, + Resources: rSpec, + Tasks: tasks, } - // If we have zero cpus, assume 1 - if resources.Cpu == 0 { - resources.Cpu = 1 + // Attributes are for the system, we aren't going to add them yet + // attributes: + // system: + // duration: 3600. + // cwd: "/home/flux" + // environment: + // HOME: "/home/flux" + // This is verison 1 as defined by v1 above + return &spec, nil +} + +// newSlotSpec creates a spec for one slot, which is one pod (a set of containers) +func newSlotSpec(resources *Resources) []v1.Resource { + slotSpec := []v1.Resource{ + {Type: "core", Count: resources.Slot.Cpu}, + } + // If we have memory or gpu specified, they are appended + if resources.Slot.Gpu > 0 { + slotSpec = append(slotSpec, v1.Resource{Type: "gpu", Count: int32(resources.Slot.Gpu)}) + } + if resources.Slot.Memory > 0 { + toMB := resources.Slot.Memory >> 20 + slotSpec = append(slotSpec, v1.Resource{Type: "memory", Count: int32(toMB)}) } - return &resources + return slotSpec } diff --git a/pkg/jobspec/resources.go b/pkg/jobspec/resources.go new file mode 100644 index 0000000..fe1d671 --- /dev/null +++ b/pkg/jobspec/resources.go @@ -0,0 +1,54 @@ +package jobspec + +import ( + corev1 "k8s.io/api/core/v1" +) + +// https://github.com/kubernetes/kubectl/blob/master/pkg/describe/describe.go#L4211-L4213 +// We want each slot to coincide with one pod. We will ask flux for all N slots to schedule, +// and then based on the response we get back, assign specific nodes. This currently +// assumes slots are each the same, but this is subject to change. +// QUESTION: what to add here for labels? +type Resources struct { + Labels []string + Slot Slot + Count int32 +} + +type Slot struct { + Cpu int32 + Memory int64 + Gpu int64 + Storage int64 +} + +// GeneratePodResources returns resources for a pod, which can +// be used to populate the flux JobSpec. +func GeneratePodResources(containers []corev1.Container, slots int32) *Resources { + + // We will sum cpu and memory across containers + // For GPU, we could make a more complex jobspec, but for now + // assume one container is representative for GPU needed. 
+ resources := Resources{Slot: Slot{}, Count: slots} + + for _, container := range containers { + + // Add on Cpu, Memory, GPU from container requests + // This is a limited set of resources owned by the pod + resources.Slot.Cpu += int32(container.Resources.Requests.Cpu().Value()) + resources.Slot.Memory += container.Resources.Requests.Memory().Value() + resources.Slot.Storage += container.Resources.Requests.StorageEphemeral().Value() + + // We assume that a pod (node) only has access to the same GPU + gpus, ok := container.Resources.Limits["nvidia.com/gpu"] + if ok { + resources.Slot.Gpu += gpus.Value() + } + } + + // If we have zero cpus, assume 1 + if resources.Slot.Cpu == 0 { + resources.Slot.Cpu = 1 + } + return &resources +} From 3a8341c9bd414c45381162d22002f8ef83430d0e Mon Sep 17 00:00:00 2001 From: vsoch Date: Wed, 29 Jan 2025 20:28:26 -0700 Subject: [PATCH 2/4] save state of trying with one node Signed-off-by: vsoch --- api/v1alpha1/submit.go | 2 -- go.mod | 4 ++-- go.sum | 8 +++---- pkg/fluxqueue/strategy/workers/job.go | 33 ++++++++++++--------------- pkg/jobspec/jobspec.go | 9 -------- 5 files changed, 21 insertions(+), 35 deletions(-) diff --git a/api/v1alpha1/submit.go b/api/v1alpha1/submit.go index b742054..53ee249 100644 --- a/api/v1alpha1/submit.go +++ b/api/v1alpha1/submit.go @@ -9,7 +9,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -58,7 +57,6 @@ func SubmitFluxJob( // Generate a jobspec for that many nodes (starting simple) // TODO will need to add GPU and memory here... if Flux supports memory js, err := jspec.NewJobspec(name, command, resources) - if err != nil { slog.Error(err, "Issue with creating job", "Namespace", namespace, "Name", jobName) return err diff --git a/go.mod b/go.mod index 0e89a0a..5f21e20 100644 --- a/go.mod +++ b/go.mod @@ -19,8 +19,8 @@ require ( ) require ( - github.com/compspec/jobspec-go v0.0.0-20250130015255-577b7ffe2599 // indirect - github.com/converged-computing/fluxion v0.0.0-20250130005615-dcc147715430 // indirect + github.com/compspec/jobspec-go v0.0.0-20250130030627-58df7d7ed642 // indirect + github.com/converged-computing/fluxion v0.0.0-20250130025038-615e35a80230 // indirect ) require ( diff --git a/go.sum b/go.sum index 7b0736e..91dd108 100644 --- a/go.sum +++ b/go.sum @@ -10,10 +10,10 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK3 github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/compspec/jobspec-go v0.0.0-20250130015255-577b7ffe2599 h1:GbFDx46/I/hkB+GJ+7/xqIX8fg9JJcNkhAQwmy5Eofw= -github.com/compspec/jobspec-go v0.0.0-20250130015255-577b7ffe2599/go.mod h1:BaJyxaOhESe2DD4lqBdwTEWOw0TaTZVJGPrFh6KyXQM= -github.com/converged-computing/fluxion v0.0.0-20250130005615-dcc147715430 h1:IvAuMqxEVmjbq112eRIYuqE4zf2IGFFo2ADF4ueSy1I= -github.com/converged-computing/fluxion v0.0.0-20250130005615-dcc147715430/go.mod h1:tNlvJY1yFWpp/QqdqJnq8YMGYG99K6YMxDmpu9IVS1E= +github.com/compspec/jobspec-go v0.0.0-20250130030627-58df7d7ed642 h1:kLwazFe8Cl7ZUuF7LidS91IwBjPpcWVVZKEN2VOq6g8= +github.com/compspec/jobspec-go v0.0.0-20250130030627-58df7d7ed642/go.mod h1:BaJyxaOhESe2DD4lqBdwTEWOw0TaTZVJGPrFh6KyXQM= +github.com/converged-computing/fluxion 
v0.0.0-20250130025038-615e35a80230 h1:Nzr3Jywwinpdo2Tt13RQPiKiMJhwAbxkbmancpPMCTM= +github.com/converged-computing/fluxion v0.0.0-20250130025038-615e35a80230/go.mod h1:tNlvJY1yFWpp/QqdqJnq8YMGYG99K6YMxDmpu9IVS1E= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/pkg/fluxqueue/strategy/workers/job.go b/pkg/fluxqueue/strategy/workers/job.go index cc2fe40..27a15fc 100644 --- a/pkg/fluxqueue/strategy/workers/job.go +++ b/pkg/fluxqueue/strategy/workers/job.go @@ -10,7 +10,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" - "gopkg.in/yaml.v2" + "sigs.k8s.io/yaml" jobspec "github.com/compspec/jobspec-go/pkg/jobspec/v1" "github.com/converged-computing/fluxion/pkg/client" @@ -77,15 +77,10 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error { fluxionCtx, cancel := context.WithTimeout(context.Background(), 200*time.Second) defer cancel() - // Get rid of the attributes section (which should be empty) - parts := strings.Split(job.Args.Jobspec, "resources:") - satisfyJobspec := "attributes: {}\nresources:\n" + parts[len(parts)-1] - fmt.Println(satisfyJobspec) - // Prepare the request to allocate - convert string to bytes // This Jobspec includes all slots (pods) so we get an allocation that considers that // We assume reservation allows for the satisfy to be in the future - request := &pb.SatisfyRequest{Jobspec: satisfyJobspec, Reservation: job.Args.Reservation == 1} + request := &pb.MatchRequest{Jobspec: job.Args.Jobspec, Reservation: job.Args.Reservation == 1} // This is the host where fluxion is running, will be localhost 4242 for sidecar fluxion, err := client.NewClient("127.0.0.1:4242") @@ -96,7 +91,7 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error { defer fluxion.Close() // An error here is an error with making the request, nothing about the allocation - response, err := fluxion.Satisfy(fluxionCtx, request) + response, err := fluxion.Match(fluxionCtx, request) if err != nil { wlog.Info("[WORK] Fluxion did not receive any satisfy response", "Error", err) return err @@ -118,6 +113,15 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error { return fmt.Errorf("fluxion could not allocate nodes for job %s/%s", job.Args.Namespace, job.Args.Name) } + // If we get here, we have an allocation. Immediately cancel it. + fluxID := response.GetJobid() + cancelRequest := &pb.CancelRequest{JobID: fluxID} + _, err = fluxion.Cancel(fluxionCtx, cancelRequest) + if err != nil { + wlog.Error(err, "Canceling initial match alloc without constraint") + return err + } + // Parse the jobspec back into form to add the constraint for each node var js jobspec.Jobspec err = yaml.Unmarshal([]byte(job.Args.Jobspec), &js) @@ -125,15 +129,7 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error { wlog.Error(err, "Reparsing Jobspec to add constraint") return err } - - // Add the constraint - we will be added one node assignment per pod. - // This means that instead of slot N, each request is for one slot - js.Attributes = jobspec.Attributes{ - System: jobspec.System{ - Constraints: jobspec.Constraints{}, - }, - } - js.Resources[0].Count = 1 + js.Resources[0].Count = int32(1) // Now get the nodes. 
These are actually cores assigned to nodes, so we need to keep count nodes, err := parseNodes(response.Allocation) @@ -147,8 +143,9 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error { for slotNumber, node := range nodes { // Update the jobspec for one node - js.Attributes.System.Constraints.Node = []string{node} + js.Attributes.System.Constraints.Hostlist = []string{node} slotJobspec, err := yaml.Marshal(js) + fmt.Println(string(slotJobspec)) // TODO we probably want to cancel here if err != nil { diff --git a/pkg/jobspec/jobspec.go b/pkg/jobspec/jobspec.go index 39500f4..da3a025 100644 --- a/pkg/jobspec/jobspec.go +++ b/pkg/jobspec/jobspec.go @@ -38,15 +38,6 @@ func NewJobspec(name string, command []string, resources *Resources) (*v1.Jobspe Resources: rSpec, Tasks: tasks, } - - // Attributes are for the system, we aren't going to add them yet - // attributes: - // system: - // duration: 3600. - // cwd: "/home/flux" - // environment: - // HOME: "/home/flux" - // This is verison 1 as defined by v1 above return &spec, nil } From 43b93faa2a9095dd5dec7be5f05376597ccf2724 Mon Sep 17 00:00:00 2001 From: vsoch Date: Wed, 29 Jan 2025 23:22:16 -0700 Subject: [PATCH 3/4] feat: start of partial cancel This is the start of work for partial cancel. Specifically, we needed to refactor the match and jobspec to ask for resources on the level of a core for a slot (and not node as previously done). This means we need to get back the graph and parse the cores into units based on the size of the slot (pod). Signed-off-by: vsoch --- api/v1alpha1/fluxjob_types.go | 4 + api/v1alpha1/submit.go | 1 + build/postgres/create-tables.sql | 3 +- chart/templates/fluxjob-crd.yaml | 4 + ...jobs.converged-computing.org_fluxjobs.yaml | 4 + pkg/fluxqueue/fluxqueue.go | 1 + pkg/fluxqueue/queries/queries.go | 4 +- pkg/fluxqueue/strategy/easy.go | 1 + pkg/fluxqueue/strategy/workers/job.go | 141 +++++++++--------- pkg/fluxqueue/types/types.go | 1 + 10 files changed, 88 insertions(+), 76 deletions(-) diff --git a/api/v1alpha1/fluxjob_types.go b/api/v1alpha1/fluxjob_types.go index 950aa8c..36b8af3 100644 --- a/api/v1alpha1/fluxjob_types.go +++ b/api/v1alpha1/fluxjob_types.go @@ -108,6 +108,10 @@ type FluxJobSpec struct { // +optional Slots int32 `json:"nodes"` + // Cores per pod (slot) + // +optional + Cores int32 `json:"cores"` + // Resources assigned // +optional Resources Resources `json:"resources"` diff --git a/api/v1alpha1/submit.go b/api/v1alpha1/submit.go index 53ee249..3125b95 100644 --- a/api/v1alpha1/submit.go +++ b/api/v1alpha1/submit.go @@ -81,6 +81,7 @@ func SubmitFluxJob( Slots: slots, Type: jobType, Name: name, + Cores: resources.Slot.Cpu, }, Status: FluxJobStatus{ SubmitStatus: SubmitStatusNew, diff --git a/build/postgres/create-tables.sql b/build/postgres/create-tables.sql index 180cb4d..d4bd4e0 100644 --- a/build/postgres/create-tables.sql +++ b/build/postgres/create-tables.sql @@ -8,7 +8,8 @@ CREATE TABLE pending_queue ( reservation INTEGER NOT NULL, duration INTEGER NOT NULL, created_at timestamptz NOT NULL default NOW(), - size INTEGER NOT NULL + size INTEGER NOT NULL, + cores INTEGER NOT NULL ); CREATE UNIQUE INDEX pending_index ON pending_queue (name, namespace); diff --git a/chart/templates/fluxjob-crd.yaml b/chart/templates/fluxjob-crd.yaml index 7a3f797..45a56c5 100644 --- a/chart/templates/fluxjob-crd.yaml +++ b/chart/templates/fluxjob-crd.yaml @@ -65,6 +65,10 @@ spec: A FluxJob is a mapping of a Kubernetes abstraction (e.g., job) into a Flux JobSpec, one that Fluxion can 
digest. properties: + cores: + description: Cores per pod (slot) + format: int32 + type: integer duration: description: Duration is the maximum runtime of the job format: int32 diff --git a/config/crd/bases/jobs.converged-computing.org_fluxjobs.yaml b/config/crd/bases/jobs.converged-computing.org_fluxjobs.yaml index 753389f..190d832 100644 --- a/config/crd/bases/jobs.converged-computing.org_fluxjobs.yaml +++ b/config/crd/bases/jobs.converged-computing.org_fluxjobs.yaml @@ -52,6 +52,10 @@ spec: A FluxJob is a mapping of a Kubernetes abstraction (e.g., job) into a Flux JobSpec, one that Fluxion can digest. properties: + cores: + description: Cores per pod (slot) + format: int32 + type: integer duration: description: Duration is the maximum runtime of the job format: int32 diff --git a/pkg/fluxqueue/fluxqueue.go b/pkg/fluxqueue/fluxqueue.go index 47e4899..f685a7d 100644 --- a/pkg/fluxqueue/fluxqueue.go +++ b/pkg/fluxqueue/fluxqueue.go @@ -193,6 +193,7 @@ func (q *Queue) Enqueue(spec *api.FluxJob) (types.EnqueueStatus, error) { reservation, spec.Spec.Duration, spec.Spec.Slots, + spec.Spec.Cores, ) // If unknown, we won't give status submit, and it should requeue to try again diff --git a/pkg/fluxqueue/queries/queries.go b/pkg/fluxqueue/queries/queries.go index 34bda74..6c4fa7b 100644 --- a/pkg/fluxqueue/queries/queries.go +++ b/pkg/fluxqueue/queries/queries.go @@ -8,7 +8,7 @@ const ( IsPendingQuery = "select * from pending_queue where name = $1 and namespace = $2;" // Insert into pending queue (assumes after above query, we've checked it does not exist) - InsertIntoPending = "insert into pending_queue (jobspec, flux_job_name, namespace, name, type, reservation, duration, size) values ($1, $2, $3, $4, $5, $6, $7, $8);" + InsertIntoPending = "insert into pending_queue (jobspec, flux_job_name, namespace, name, type, reservation, duration, size, cores) values ($1, $2, $3, $4, $5, $6, $7, $8, $9);" // TODO add back created_at // We remove from pending to allow another group submission of the same name on cleanup @@ -16,7 +16,7 @@ const ( // Easy Queries to get jobs // Select jobs based on creation timestamp - SelectPendingByCreation = "select jobspec, name, flux_job_name, namespace, type, reservation, duration, size from pending_queue order by created_at desc;" + SelectPendingByCreation = "select jobspec, name, flux_job_name, namespace, type, reservation, duration, size, cores from pending_queue order by created_at desc;" // Reservations AddReservationQuery = "insert into reservations (name, flux_id) values ($1, $2);" diff --git a/pkg/fluxqueue/strategy/easy.go b/pkg/fluxqueue/strategy/easy.go index 5dce6f7..dd1c4f6 100644 --- a/pkg/fluxqueue/strategy/easy.go +++ b/pkg/fluxqueue/strategy/easy.go @@ -199,6 +199,7 @@ func (s EasyBackfill) ReadyJobs(ctx context.Context, pool *pgxpool.Pool) ([]work Reservation: model.Reservation, Size: model.Size, Duration: model.Duration, + Cores: model.Cores, } jobs = append(jobs, jobArgs) } diff --git a/pkg/fluxqueue/strategy/workers/job.go b/pkg/fluxqueue/strategy/workers/job.go index 27a15fc..1920ad8 100644 --- a/pkg/fluxqueue/strategy/workers/job.go +++ b/pkg/fluxqueue/strategy/workers/job.go @@ -10,9 +10,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" - "sigs.k8s.io/yaml" - jobspec "github.com/compspec/jobspec-go/pkg/jobspec/v1" "github.com/converged-computing/fluxion/pkg/client" pb "github.com/converged-computing/fluxion/pkg/fluxion-grpc" api "github.com/converged-computing/fluxqueue/api/v1alpha1" @@ -56,6 +54,10 @@ type JobArgs struct 
{ FluxJobName string `json:"flux_job_name"` Type string `json:"type"` + // This is the number of cores per pod + // We use this to calculate / create a final node list + Cores int32 `json:"cores"` + // If true, we are allowed to ask Fluxion for a reservation Reservation int32 `json:"reservation"` Duration int32 `json:"duration"` @@ -83,6 +85,7 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error { request := &pb.MatchRequest{Jobspec: job.Args.Jobspec, Reservation: job.Args.Reservation == 1} // This is the host where fluxion is running, will be localhost 4242 for sidecar + // TODO try again to put this client on the class so we don't connect each time fluxion, err := client.NewClient("127.0.0.1:4242") if err != nil { wlog.Error(err, "Fluxion error connecting to server") @@ -107,81 +110,35 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error { return river.JobCancel(fmt.Errorf("fluxion could not allocate nodes for %s/%s, likely Unsatisfiable", job.Args.Namespace, job.Args.Name)) } - // This means we didn't get an allocation - proceeding is not possible - if response.GetAllocation() == "" { - // This will have the job be retried in the queue, still based on sorted schedule time and priority - return fmt.Errorf("fluxion could not allocate nodes for job %s/%s", job.Args.Namespace, job.Args.Name) - } - - // If we get here, we have an allocation. Immediately cancel it. + // Flux job identifier (known to fluxion) fluxID := response.GetJobid() - cancelRequest := &pb.CancelRequest{JobID: fluxID} - _, err = fluxion.Cancel(fluxionCtx, cancelRequest) - if err != nil { - wlog.Error(err, "Canceling initial match alloc without constraint") - return err + + // If it's reserved, add the id to our reservation table + // TODO need to clean up this table... but these tasks run async... + if response.Reserved { + w.reserveJob(fluxionCtx, job.Args, fluxID) } - // Parse the jobspec back into form to add the constraint for each node - var js jobspec.Jobspec - err = yaml.Unmarshal([]byte(job.Args.Jobspec), &js) - if err != nil { - wlog.Error(err, "Reparsing Jobspec to add constraint") - return err + // This means we didn't get an allocation - we might have a reservation + if response.GetAllocation() == "" { + // This will have the job be retried in the queue, still based on sorted schedule time and priority + return fmt.Errorf("fluxion could not allocate nodes for job %s/%s", job.Args.Namespace, job.Args.Name) } - js.Resources[0].Count = int32(1) // Now get the nodes. These are actually cores assigned to nodes, so we need to keep count - nodes, err := parseNodes(response.Allocation) + nodes, err := parseNodes(response.Allocation, job.Args.Cores) if err != nil { wlog.Info("Error parsing nodes from fluxion response", "Namespace", job.Args.Namespace, "Name", job.Args.Name, "Error", err) return err } - // TODO is this nodes or slots? 
- wlog.Info("Fluxion allocation response", "Slots", nodes) - - for slotNumber, node := range nodes { - - // Update the jobspec for one node - js.Attributes.System.Constraints.Hostlist = []string{node} - slotJobspec, err := yaml.Marshal(js) - fmt.Println(string(slotJobspec)) - - // TODO we probably want to cancel here - if err != nil { - wlog.Info("Error creating single slot jobspec", "Namespace", job.Args.Namespace, "Name", job.Args.Name, "Error", err) - return err - } - - // The strategy will allow some reservation depth of jobs, so we allow for all slots of the job - request := &pb.MatchRequest{Jobspec: string(slotJobspec), Reservation: job.Args.Reservation == 1} - - // An error here is an error with making the request, nothing about the allocation - match, err := fluxion.Match(fluxionCtx, request) - if err != nil { - wlog.Info("[WORK] Fluxion did not receive any match response", "Error", err) - return err - } - - // Try printing first. - // Flux job identifier (known to fluxion) - fluxID := match.GetJobid() - - // If it's reserved, add the id to our reservation table - // TODO need to clean up this table... but these tasks run async... - if response.Reserved { - w.reserveJob(fluxionCtx, job.Args, fluxID) - } - - // Unsuspend the job or ungate the pods, adding the node assignments as labels for the scheduler - err = w.releaseJob(ctx, job.Args, fluxID, nodes) - if err != nil { - return err - } - wlog.Info("Slot allocated for job", "Slot", slotNumber, "JobId", fluxID, "Namespace", job.Args.Namespace, "Name", job.Args.Name) + wlog.Info("Fluxion allocation response", "Nodes", nodes) + // Unsuspend the job or ungate the pods, adding the node assignments as labels for the scheduler + err = w.releaseJob(ctx, job.Args, fluxID, nodes) + if err != nil { + return err } - wlog.Info("Fluxion finished allocating nodes for job", "Nodes", nodes, "Namespace", job.Args.Namespace, "Name", job.Args.Name) + wlog.Info("Fluxion finished allocating nodes for job", "JobId", fluxID, "Nodes", nodes, "Namespace", job.Args.Namespace, "Name", job.Args.Name) return nil } @@ -250,10 +207,13 @@ func (w JobWorker) reserveJob(ctx context.Context, args JobArgs, fluxID int64) e // parseNodes parses the allocation nodes into a lookup with core counts // We will add these as labels onto each pod for the scheduler, or as one -func parseNodes(allocation string) ([]string, error) { +// This means that we get back some allocation graph with the slot defined at cores, +// so the group size will likely not coincide with the number of nodes. For +// this reason, we have to divide to place them. The final number should +// match the group size. +func parseNodes(allocation string, cores int32) ([]string, error) { // We can eventually send over more metadata, for now just a list of nodes - nodesWithCores := map[string]int{} nodes := []string{} // The response is the graph with assignments. Here we parse the graph into a struct to get nodes. @@ -263,17 +223,52 @@ func parseNodes(allocation string) ([]string, error) { return nodes, err } - // To start, just parse nodes and not cores (since we can't bind on level of core) + // Parse nodes first and get containment and name lookup + lookup := map[string]string{} for _, node := range graph.Graph.Nodes { if node.Metadata.Type == "node" { + nodePath := node.Metadata.Paths["containment"] nodeId := node.Metadata.Basename - _, ok := nodesWithCores[nodeId] + lookup[nodePath] = nodeId + } + } + + // We are going to first make a count of cores per node. 
We do this + // by parsing the containment path. It should always look like: + // "/cluster0/0/kind-worker1/core0 for a core + coreCounts := map[string]int32{} + for _, node := range graph.Graph.Nodes { + if node.Metadata.Type == "core" { + corePath := node.Metadata.Paths["containment"] + coreName := fmt.Sprintf("core%d", node.Metadata.Id) + nodePath := strings.TrimRight(corePath, "/"+coreName) + nodeId, ok := lookup[nodePath] + + // This shouldn't happen, but if it does, we should catch it if !ok { - nodesWithCores[nodeId] = 0 - nodes = append(nodes, nodeId) + return nodes, fmt.Errorf("unknown node path %s", nodePath) } - // Keep a record of cores assigned per node - nodesWithCores[nodeId] += 1 + + // Update core counts for the node + _, ok = coreCounts[nodeId] + if !ok { + coreCounts[nodeId] = int32(0) + } + + // Each core is one + coreCounts[nodeId] += 1 + } + } + fmt.Printf("Distributing %d cores per pod into core counts ", cores) + fmt.Println(coreCounts) + + // Now we need to divide by the slot size (number of cores per pod) + // and add those nodes to a list (there will be repeats) + for nodeId, totalCores := range coreCounts { + fmt.Printf("Node %s has %d cores to fit %d core(s)\n", nodeId, totalCores, cores) + numberSlots := totalCores / cores + for _ = range int32(numberSlots) { + nodes = append(nodes, nodeId) } } return nodes, nil diff --git a/pkg/fluxqueue/types/types.go b/pkg/fluxqueue/types/types.go index c35a914..ce699d6 100644 --- a/pkg/fluxqueue/types/types.go +++ b/pkg/fluxqueue/types/types.go @@ -29,6 +29,7 @@ type JobModel struct { Reservation int32 `db:"reservation"` Duration int32 `db:"duration"` Size int32 `db:"size"` + Cores int32 `db:"cores"` } type ReservationModel struct { From 61cfda997858bd0134172fbc5a846fd82db105d1 Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 30 Jan 2025 19:20:32 -0700 Subject: [PATCH 4/4] wip for partial cancel Signed-off-by: vsoch --- pkg/fluxqueue/strategy/workers/job.go | 147 ++++++++++++++++++++++---- pkg/jgf/jgf.go | 9 +- pkg/jgf/types.go | 4 +- pkg/types/types.go | 29 ----- 4 files changed, 137 insertions(+), 52 deletions(-) delete mode 100644 pkg/types/types.go diff --git a/pkg/fluxqueue/strategy/workers/job.go b/pkg/fluxqueue/strategy/workers/job.go index 1920ad8..2e80399 100644 --- a/pkg/fluxqueue/strategy/workers/job.go +++ b/pkg/fluxqueue/strategy/workers/job.go @@ -2,7 +2,6 @@ package workers import ( "context" - "encoding/json" "fmt" "os" "strings" @@ -16,7 +15,7 @@ import ( api "github.com/converged-computing/fluxqueue/api/v1alpha1" "github.com/converged-computing/fluxqueue/pkg/defaults" "github.com/converged-computing/fluxqueue/pkg/fluxqueue/queries" - "github.com/converged-computing/fluxqueue/pkg/types" + jgf "github.com/converged-computing/fluxqueue/pkg/jgf" "github.com/riverqueue/river" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" patchTypes "k8s.io/apimachinery/pkg/types" @@ -126,7 +125,7 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error { } // Now get the nodes. 
These are actually cores assigned to nodes, so we need to keep count - nodes, err := parseNodes(response.Allocation, job.Args.Cores) + nodes, cancelResponses, err := parseNodes(response.Allocation, job.Args.Cores) if err != nil { wlog.Info("Error parsing nodes from fluxion response", "Namespace", job.Args.Namespace, "Name", job.Args.Name, "Error", err) return err @@ -134,7 +133,7 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error { wlog.Info("Fluxion allocation response", "Nodes", nodes) // Unsuspend the job or ungate the pods, adding the node assignments as labels for the scheduler - err = w.releaseJob(ctx, job.Args, fluxID, nodes) + err = w.releaseJob(ctx, job.Args, fluxID, nodes, cancelResponses) if err != nil { return err } @@ -143,7 +142,7 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error { } // Release job will unsuspend a job or ungate pods to allow for scheduling -func (w JobWorker) releaseJob(ctx context.Context, args JobArgs, fluxID int64, nodes []string) error { +func (w JobWorker) releaseJob(ctx context.Context, args JobArgs, fluxID int64, nodes []string, cancelResponses []string) error { var err error if args.Type == api.JobWrappedJob.String() { @@ -211,42 +210,85 @@ func (w JobWorker) reserveJob(ctx context.Context, args JobArgs, fluxID int64) e // so the group size will likely not coincide with the number of nodes. For // this reason, we have to divide to place them. The final number should // match the group size. -func parseNodes(allocation string, cores int32) ([]string, error) { +func parseNodes(allocation string, cores int32) ([]string, []string, error) { // We can eventually send over more metadata, for now just a list of nodes nodes := []string{} - // The response is the graph with assignments. Here we parse the graph into a struct to get nodes. - var graph types.AllocationResponse - err := json.Unmarshal([]byte(allocation), &graph) + // We also need to save a corresponding cancel request + cancelRequests := []string{} + + // Also try serailizing back into graph + g, err := jgf.LoadFluxJGF(allocation) if err != nil { - return nodes, err + return nodes, cancelRequests, err + } + fmt.Println(g) + + // For each pod, we will need to be able to do partial cancel. + // We can do this by saving the initial graph (without cores) + // and adding them on to the cancel request. We first need a lookup + // for the path between cluster->subnet->nodes->cores. + // This logic will need to be updated if we change the graph. 
+ nodeLookup := map[string]jgf.Node{} + + // Store nodes based on paths + nodePaths := map[string]jgf.Node{} + edgeLookup := map[string][]jgf.Edge{} + + // Parse nodes first so we can match the containment path to the host + for _, node := range g.Graph.Nodes { + nodeLookup[node.Id] = node + nodePaths[node.Metadata.Paths["containment"]] = node } - // Parse nodes first and get containment and name lookup + // The edge lookup will allow us to add connected nodes + // We need to be able to map a node path to a list of edges + // The node path gets us the node id (source) + var addEdge = func(node *jgf.Node, edge *jgf.Edge) { + path := node.Metadata.Paths["containment"] + _, ok := edgeLookup[path] + if !ok { + edgeLookup[path] = []jgf.Edge{} + } + edgeLookup[path] = append(edgeLookup[path], *edge) + } + for _, edge := range g.Graph.Edges { + targetNode := nodeLookup[edge.Target] + sourceNode := nodeLookup[edge.Source] + addEdge(&targetNode, &edge) + addEdge(&sourceNode, &edge) + } + + // Parse nodes first so we can match the containment path to the host lookup := map[string]string{} - for _, node := range graph.Graph.Nodes { + for _, node := range g.Graph.Nodes { + nodePath := node.Metadata.Paths["containment"] + nodeLookup[fmt.Sprintf("%d", node.Metadata.Id)] = node if node.Metadata.Type == "node" { - nodePath := node.Metadata.Paths["containment"] nodeId := node.Metadata.Basename lookup[nodePath] = nodeId } } + // We also need to know the exact cores that are assigned to each node + coresByNode := map[string][]jgf.Node{} + // We are going to first make a count of cores per node. We do this // by parsing the containment path. It should always look like: // "/cluster0/0/kind-worker1/core0 for a core coreCounts := map[string]int32{} - for _, node := range graph.Graph.Nodes { + for _, node := range g.Graph.Nodes { + path := node.Metadata.Paths["containment"] + if node.Metadata.Type == "core" { - corePath := node.Metadata.Paths["containment"] coreName := fmt.Sprintf("core%d", node.Metadata.Id) - nodePath := strings.TrimRight(corePath, "/"+coreName) + nodePath := strings.TrimRight(path, "/"+coreName) nodeId, ok := lookup[nodePath] // This shouldn't happen, but if it does, we should catch it if !ok { - return nodes, fmt.Errorf("unknown node path %s", nodePath) + return nodes, cancelRequests, fmt.Errorf("unknown node path %s", nodePath) } // Update core counts for the node @@ -257,21 +299,86 @@ func parseNodes(allocation string, cores int32) ([]string, error) { // Each core is one coreCounts[nodeId] += 1 + + // This is a list of cores (node) assigned to the physical node + // We do this based on ids so we can use the edge lookup + assignedCores, ok := coresByNode[nodePath] + if !ok { + assignedCores = []jgf.Node{} + } + assignedCores = append(assignedCores, node) + coresByNode[nodeId] = assignedCores } } fmt.Printf("Distributing %d cores per pod into core counts ", cores) fmt.Println(coreCounts) // Now we need to divide by the slot size (number of cores per pod) - // and add those nodes to a list (there will be repeats) + // and add those nodes to a list (there will be repeats). For each slot + // (pod) we need to generate a JGF that includes resources for cancel. 
for nodeId, totalCores := range coreCounts { - fmt.Printf("Node %s has %d cores to fit %d core(s)\n", nodeId, totalCores, cores) + fmt.Printf("Node %s has %d cores across slots to fit %d core(s) per slot\n", nodeId, totalCores, cores) numberSlots := totalCores / cores for _ = range int32(numberSlots) { + + // Prepare a graph for a cancel response + graph := jgf.NewFluxJGF() + seenEdges := map[string]bool{} + coreNodes := coresByNode[nodeId] + + // addNewEdges to the graph Edges if we haven't yet + var addNewEdges = func(path string) { + addEdges, ok := edgeLookup[path] + if ok { + for _, addEdge := range addEdges { + edgeId := fmt.Sprintf("%s-%s", addEdge.Source, addEdge.Target) + _, alreadyAdded := seenEdges[edgeId] + if !alreadyAdded { + graph.Graph.Edges = append(graph.Graph.Edges, addEdge) + seenEdges[edgeId] = true + } + } + } + } + + // The cancel response needs only units from the graph associated + // with the specific cores assigned. + for _, coreNode := range coreNodes { + path := coreNode.Metadata.Paths["containment"] + _, ok := graph.NodeMap[path] + if !ok { + graph.NodeMap[path] = coreNode + graph.Graph.Nodes = append(graph.Graph.Nodes, coreNode) + addNewEdges(path) + } + // Parse the entire path and add nodes up root + parts := strings.Split(path, "/") + for idx := range len(parts) { + if idx == 0 { + continue + } + path := strings.Join(parts[0:idx], "/") + fmt.Println(path) + _, ok := graph.NodeMap[path] + if !ok { + graph.NodeMap[path] = nodePaths[path] + graph.Graph.Nodes = append(graph.Graph.Nodes, nodePaths[path]) + addNewEdges(path) + } + } + } nodes = append(nodes, nodeId) + + // Serialize the cancel request to string + graphStr, err := graph.ToJson() + if err != nil { + return nodes, cancelRequests, err + } + cancelRequests = append(cancelRequests, graphStr) + fmt.Println(graphStr) } } - return nodes, nil + return nodes, cancelRequests, nil } // Unsuspend the job, adding an annotation for nodes along with the fluxion scheduler diff --git a/pkg/jgf/jgf.go b/pkg/jgf/jgf.go index 284c9d0..12ca953 100644 --- a/pkg/jgf/jgf.go +++ b/pkg/jgf/jgf.go @@ -51,6 +51,13 @@ func NewFluxJGF() FluxJGF { } } +// Load a graph payload into a JGF structure +func LoadFluxJGF(payload string) (FluxJGF, error) { + var graph FluxJGF + err := json.Unmarshal([]byte(payload), &graph) + return graph, err +} + // ToJson returns a Json string of the graph func (g *FluxJGF) ToJson() (string, error) { toprint, err := json.MarshalIndent(g.Graph, "", "\t") @@ -81,7 +88,7 @@ func (g *FluxJGF) MakeBidirectionalEdge(parent, child string) { // MakeEdge creates an edge for the JGF func (g *FluxJGF) MakeEdge(source string, target string, contains string) { - newedge := edge{ + newedge := Edge{ Source: source, Target: target, Metadata: edgeMetadata{Subsystem: containmentKey}, diff --git a/pkg/jgf/types.go b/pkg/jgf/types.go index ca7fe3d..f03cfca 100644 --- a/pkg/jgf/types.go +++ b/pkg/jgf/types.go @@ -8,7 +8,7 @@ type Node struct { Metadata nodeMetadata `json:"metadata,omitempty"` } -type edge struct { +type Edge struct { Source string `json:"source"` Relation string `json:"relation,omitempty"` Target string `json:"target"` @@ -36,7 +36,7 @@ type nodeMetadata struct { type graph struct { Nodes []Node `json:"nodes"` - Edges []edge `json:"edges"` + Edges []Edge `json:"edges"` // Metadata metadata `json:"metadata,omitempty"` Directed bool `json:"directed,omitempty"` } diff --git a/pkg/types/types.go b/pkg/types/types.go deleted file mode 100644 index 96cb6cc..0000000 --- a/pkg/types/types.go +++ /dev/null @@ 
-1,29 +0,0 @@ -package types - -type AllocationResponse struct { - Graph Graph `json:"graph"` -} -type Paths struct { - Containment string `json:"containment"` -} -type Node struct { - ID string `json:"id"` - Metadata Metadata `json:"metadata,omitempty"` -} -type Edges struct { - Source string `json:"source"` - Target string `json:"target"` -} -type Metadata struct { - Type string `json:"type"` - Id int32 `json:"id"` - Rank int32 `json:"rank"` - Basename string `json:"basename"` - Exclusive bool `json:"exclusive"` - Paths map[string]string `json:"paths"` -} - -type Graph struct { - Nodes []Node `json:"nodes"` - Edges []Edges `json:"edges"` -}
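To make the slot-division step in parseNodes concrete: fluxion returns a containment graph, the assigned cores are counted per node, and each node's count is divided by the per-pod core count to yield one node assignment per slot, so the final list length matches the group size even though the match is expressed at the core level. Below is a minimal, self-contained sketch of that arithmetic; the divideIntoSlots helper and the node names are illustrative only, and it assumes every slot requests the same number of cores, as the patch currently does.

package main

import "fmt"

// divideIntoSlots mirrors the counting step in parseNodes: given how many
// cores fluxion assigned on each node, emit one entry per slot (pod),
// repeating a node's name once for every coresPerSlot cores it received.
func divideIntoSlots(coreCounts map[string]int32, coresPerSlot int32) []string {
	assignments := []string{}
	for nodeID, totalCores := range coreCounts {
		numberSlots := totalCores / coresPerSlot
		for i := int32(0); i < numberSlots; i++ {
			assignments = append(assignments, nodeID)
		}
	}
	return assignments
}

func main() {
	// Example: 4 cores granted on kind-worker and 2 on kind-worker2, with each
	// pod (slot) needing 2 cores -> two pods land on kind-worker, one on kind-worker2.
	counts := map[string]int32{"kind-worker": 4, "kind-worker2": 2}
	fmt.Println(divideIntoSlots(counts, 2))
}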