diff --git a/test/e2e/performanceprofile/functests/7_performance_kubelet_node/cgroups.go b/test/e2e/performanceprofile/functests/7_performance_kubelet_node/cgroups.go index fe32b53621..7942c3d8fc 100644 --- a/test/e2e/performanceprofile/functests/7_performance_kubelet_node/cgroups.go +++ b/test/e2e/performanceprofile/functests/7_performance_kubelet_node/cgroups.go @@ -23,8 +23,10 @@ import ( performancev2 "github.com/openshift/cluster-node-tuning-operator/pkg/apis/performanceprofile/v2" testutils "github.com/openshift/cluster-node-tuning-operator/test/e2e/performanceprofile/functests/utils" + "github.com/openshift/cluster-node-tuning-operator/test/e2e/performanceprofile/functests/utils/baseload" "github.com/openshift/cluster-node-tuning-operator/test/e2e/performanceprofile/functests/utils/cgroup" testclient "github.com/openshift/cluster-node-tuning-operator/test/e2e/performanceprofile/functests/utils/client" + "github.com/openshift/cluster-node-tuning-operator/test/e2e/performanceprofile/functests/utils/cluster" "github.com/openshift/cluster-node-tuning-operator/test/e2e/performanceprofile/functests/utils/discovery" "github.com/openshift/cluster-node-tuning-operator/test/e2e/performanceprofile/functests/utils/images" "github.com/openshift/cluster-node-tuning-operator/test/e2e/performanceprofile/functests/utils/label" @@ -37,7 +39,14 @@ import ( "github.com/openshift/cluster-node-tuning-operator/test/e2e/performanceprofile/functests/utils/systemd" ) -const minRequiredCPUs = 8 +const ( + minRequiredCPUs = 8 + cpuSetReserved = "reserved" + cpuSetIsolated = "isolated" + cpuSetShared = "shared" + cpuSetOfflined = "offlined" + cpuSetGUPod = "guPod" +) var _ = Describe("[performance] Cgroups and affinity", Ordered, Label(string(label.OVSPinning)), func() { const ( @@ -45,16 +54,17 @@ var _ = Describe("[performance] Cgroups and affinity", Ordered, Label(string(lab cgroupRoot string = "/rootfs/sys/fs/cgroup" ) var ( - onlineCPUSet cpuset.CPUSet - workerRTNode *corev1.Node - workerRTNodes []corev1.Node - profile, initialProfile *performancev2.PerformanceProfile - poolName string - ovsSliceCgroup string - ctx context.Context = context.Background() - ovsSystemdServices []string - isCgroupV2 bool - err error + reservedCPUSet cpuset.CPUSet + isolatedCPUSet cpuset.CPUSet + workerRTNode *corev1.Node + workerRTNodes []corev1.Node + profile, initialProfile *performancev2.PerformanceProfile + poolName string + ovsSliceCgroup string + ctx context.Context = context.Background() + ovsSystemdServices []string + isCgroupV2 bool + isWorkloadPartitioningEnabled bool ) BeforeAll(func() { @@ -68,6 +78,14 @@ var _ = Describe("[performance] Cgroups and affinity", Ordered, Label(string(lab Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("error looking for the optional selector: %v", err)) workerRTNode = &workerRTNodes[0] + if _, ok := workerRTNode.Labels["node-role.kubernetes.io/control-plane"]; ok { + isSchedulable, err := cluster.IsControlPlaneSchedulable(ctx) + Expect(err).ToNot(HaveOccurred(), "Unable to check if control plane is schedulable") + if !isSchedulable { + Skip("workerRTNode is a control plane node but masters are not schedulable") + } + } + profile, err = profiles.GetByNodeLabels(testutils.NodeSelectorLabels) Expect(err).ToNot(HaveOccurred()) @@ -78,16 +96,15 @@ var _ = Describe("[performance] Cgroups and affinity", Ordered, Label(string(lab ovsSystemdServices = ovsSystemdServicesOnOvsSlice(ctx, workerRTNode) - }) - - BeforeEach(func() { - By(fmt.Sprintf("Checking the profile %s with cpus %s", profile.Name, cpuSpecToString(profile.Spec.CPU))) + isWorkloadPartitioningEnabled, err = cluster.IsWorkloadPartitioningEnabled(ctx) + Expect(err).ToNot(HaveOccurred(), "Unable to check if workload partitioning is enabled") + testlog.Infof("Workload partitioning enabled: %v", isWorkloadPartitioningEnabled) - Expect(profile.Spec.CPU.Isolated).NotTo(BeNil()) - Expect(profile.Spec.CPU.Reserved).NotTo(BeNil()) - - onlineCPUSet, err = nodes.GetOnlineCPUsSet(context.TODO(), workerRTNode) - Expect(err).ToNot(HaveOccurred()) + profileCPUSets := parseProfileCPUSets(profile) + reservedCPUSet = profileCPUSets[cpuSetReserved] + isolatedCPUSet = profileCPUSets[cpuSetIsolated] + testlog.Infof("Reserved CPUSet: %s", reservedCPUSet) + testlog.Infof("Isolated CPUSet: %s", isolatedCPUSet) }) Describe("[rfe_id: 64006][Dynamic OVS Pinning]", Ordered, Label(string(label.Tier0)), func() { @@ -128,7 +145,15 @@ var _ = Describe("[performance] Cgroups and affinity", Ordered, Label(string(lab cpus := testutils.ToString(out) containerCpuset, err := cpuset.Parse(cpus) Expect(err).ToNot(HaveOccurred()) - Expect(containerCpuset).To(Equal(onlineCPUSet), "Burstable pod containers cpuset.cpus do not match total online cpus") + if isWorkloadPartitioningEnabled { + Expect(containerCpuset.Equals(reservedCPUSet)).To(BeTrue(), + "Under workload partitioning, OVN pod cpuset.cpus should match reserved cpus, got %s expected %s", containerCpuset, reservedCPUSet) + } else { + onlineCPUSet, err := nodes.GetOnlineCPUsSet(context.TODO(), workerRTNode) + Expect(err).ToNot(HaveOccurred()) + Expect(containerCpuset.Equals(onlineCPUSet)).To(BeTrue(), + "Burstable pod containers cpuset.cpus do not match total online cpus, got %s expected %s", containerCpuset, onlineCPUSet) + } } }) @@ -246,9 +271,11 @@ var _ = Describe("[performance] Cgroups and affinity", Ordered, Label(string(lab Expect(err).ToNot(HaveOccurred()) Expect(result).To(Not(BeEmpty())) - result, err = chkOvsCgrpCpuset(workerRTNode) - Expect(err).ToNot(HaveOccurred(), "Unable to read cpuset.cpus") - ovsCPUSet, err := cpuset.Parse(result) + ovsCPUSetRaw, err := chkOvsCgrpCpuset(workerRTNode) + Expect(err).ToNot(HaveOccurred()) + ovsCPUSet, err := cpuset.Parse(ovsCPUSetRaw) + Expect(err).ToNot(HaveOccurred()) + onlineCPUSet, err := nodes.GetOnlineCPUsSet(context.TODO(), workerRTNode) Expect(err).ToNot(HaveOccurred()) Expect(ovsCPUSet).To(Equal(onlineCPUSet)) @@ -265,205 +292,91 @@ var _ = Describe("[performance] Cgroups and affinity", Ordered, Label(string(lab var ctx context.Context = context.TODO() Context("ovn-kubenode Pods affinity ", Label(string(label.Tier2)), func() { It("[test_id:64100] matches with ovs process affinity", func() { - ovnPod, err := ovnCnfNodePod(ctx, workerRTNode) - Expect(err).ToNot(HaveOccurred(), "Unable to get ovnPod") - ovnContainerids, err := ovnPodContainers(&ovnPod) - Expect(err).ToNot(HaveOccurred()) - // Generally there are many containers inside a kubenode pods - // we don't need to check cpus used by all the containers - // we take first container - containerPid, err := nodes.ContainerPid(ctx, workerRTNode, ovnContainerids[0]) - Expect(err).ToNot(HaveOccurred()) - // we need to wait as process affinity can change - time.Sleep(30 * time.Second) - ctnCpuset := taskSet(ctx, containerPid, workerRTNode) - testlog.Infof("Cpus used by ovn Containers are %s", ctnCpuset) - pidList, err := ovsPids(ctx, ovsSystemdServices, workerRTNode) - Expect(err).ToNot(HaveOccurred()) - pidToCPUs, err := getCPUMaskForPids(ctx, pidList, workerRTNode) - Expect(err).ToNot(HaveOccurred()) - for pid, cpumask := range pidToCPUs { - testlog.Infof("OVS service pid %s is using cpus %s", pid, cpumask) - Expect(ctnCpuset.Equals(cpumask)).To(BeTrue(), "affinity of ovn kube node pods(%s) do not match with ovservices pid %s (%s)", ctnCpuset, pid, cpumask) - } - + By("Collecting OVN container and OVS process affinities") + ovnAffinity := getOvnContainerAffinity(ctx, workerRTNode) + ovsAffinities := getOvsAffinities(ctx, ovsSystemdServices, workerRTNode) + + By("Verifying OVS affinity matches expected") + verifyOvsMatchesExpected(ovnAffinity, ovsAffinities, + isWorkloadPartitioningEnabled, map[string]cpuset.CPUSet{ + cpuSetReserved: reservedCPUSet, + cpuSetIsolated: isolatedCPUSet, + cpuSetGUPod: cpuset.New(), + }) }) It("[test_id:64101] Creating gu pods modifies affinity of ovs", func() { - var testpod *corev1.Pod - var err error - testpod = pods.GetTestPod() - testpod.Namespace = testutils.NamespaceTesting - testpod.Spec.Containers[0].Resources = corev1.ResourceRequirements{Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("2"), - corev1.ResourceMemory: resource.MustParse("200Mi"), - }, - } - testpod.Spec.NodeSelector = map[string]string{testutils.LabelHostname: workerRTNode.Name} - err = testclient.DataPlaneClient.Create(ctx, testpod) - Expect(err).ToNot(HaveOccurred()) - testpod, err = pods.WaitForCondition(ctx, client.ObjectKeyFromObject(testpod), corev1.PodReady, corev1.ConditionTrue, 5*time.Minute) - Expect(err).ToNot(HaveOccurred()) - Expect(testpod.Status.QOSClass).To(Equal(corev1.PodQOSGuaranteed)) - - cmd := []string{"taskset", "-pc", "1"} - outputb, err := pods.ExecCommandOnPod(testclient.K8sClient, testpod, "", cmd) - Expect(err).ToNot(HaveOccurred()) - testpodCpus := bytes.Split(outputb, []byte(":")) - testlog.Infof("%v pod is using cpus %v", testpod.Name, string(testpodCpus[1])) + By("Creating a guaranteed pod on the worker node") + testpod := createGuPod(ctx, workerRTNode) + + By("Collecting OVN container and OVS process affinities") + ovnAffinity := getOvnContainerAffinity(ctx, workerRTNode) + ovsAffinities := getOvsAffinities(ctx, ovsSystemdServices, workerRTNode) + + By("Verifying OVS affinity excludes guaranteed pod CPUs") + guPodCPUs := getGuPodCPUs(ctx, testpod) + verifyOvsMatchesExpected(ovnAffinity, ovsAffinities, + isWorkloadPartitioningEnabled, map[string]cpuset.CPUSet{ + cpuSetReserved: reservedCPUSet, + cpuSetIsolated: isolatedCPUSet, + cpuSetGUPod: guPodCPUs, + }) - By("Get ovnpods running on the worker cnf node") - ovnPod, err := ovnCnfNodePod(ctx, workerRTNode) - Expect(err).ToNot(HaveOccurred(), "Unable to get ovnPod") - - By("Get cpu used by ovn pod containers") - // We are fetching the container Process pid and - // using taskset we are fetching cpus used by the container process - // instead of using containers cpuset.cpus - ovnContainers, err := ovnPodContainers(&ovnPod) - Expect(err).ToNot(HaveOccurred()) - containerPid, err := nodes.ContainerPid(ctx, workerRTNode, ovnContainers[0]) - Expect(err).ToNot(HaveOccurred()) - // we need to wait as process affinity can change - time.Sleep(30 * time.Second) - ctnCpuset := taskSet(ctx, containerPid, workerRTNode) - testlog.Infof("Container of ovn pod %s is using cpus %s", ovnPod.Name, ctnCpuset) - - pidList, err := ovsPids(ctx, ovsSystemdServices, workerRTNode) - Expect(err).ToNot(HaveOccurred()) - - pidToCPUs, err := getCPUMaskForPids(ctx, pidList, workerRTNode) - Expect(err).ToNot(HaveOccurred()) - for pid, cpumask := range pidToCPUs { - testlog.Infof("OVS service pid %s is using cpus %s", pid, cpumask) - Expect(ctnCpuset.Equals(cpumask)).To(BeTrue(), "affinity of ovn kube node pods(%s) do not match with ovservices pid %s (%s)", ctnCpuset, pid, cpumask) - } Expect(pods.DeleteAndSync(ctx, testclient.DataPlaneClient, testpod)).To(Succeed()) - }) It("[test_id:64102] Create and remove gu pods to verify affinity of ovs are changed appropriately", func() { - var testpod1, testpod2 *corev1.Pod - var err error - ovnPod, err := ovnCnfNodePod(ctx, workerRTNode) - Expect(err).ToNot(HaveOccurred()) checkCpuCount(ctx, workerRTNode) - // Create testpod1 - testpod1 = pods.GetTestPod() - testpod1.Namespace = testutils.NamespaceTesting - testpod1.Spec.Containers[0].Resources = corev1.ResourceRequirements{ - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("2"), - corev1.ResourceMemory: resource.MustParse("200Mi"), - }, - } - testpod1.Spec.NodeSelector = map[string]string{testutils.LabelHostname: workerRTNode.Name} - err = testclient.DataPlaneClient.Create(ctx, testpod1) - Expect(err).ToNot(HaveOccurred()) - testpod1, err = pods.WaitForCondition(ctx, client.ObjectKeyFromObject(testpod1), corev1.PodReady, corev1.ConditionTrue, 5*time.Minute) - Expect(err).ToNot(HaveOccurred()) - Expect(testpod1.Status.QOSClass).To(Equal(corev1.PodQOSGuaranteed)) - - tasksetcmd := []string{"taskset", "-pc", "1"} - testpod1Cpus, err := pods.ExecCommandOnPod(testclient.K8sClient, testpod1, "", tasksetcmd) - Expect(err).ToNot(HaveOccurred()) - testlog.Infof("%v pod is using %v cpus", testpod1.Name, string(testpod1Cpus)) - - // Create testpod2 - testpod2 = pods.GetTestPod() - testpod2.Namespace = testutils.NamespaceTesting - testpod2.Spec.Containers[0].Resources = corev1.ResourceRequirements{ - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("2"), - corev1.ResourceMemory: resource.MustParse("200Mi"), - }, - } - testpod2.Spec.NodeSelector = map[string]string{testutils.LabelHostname: workerRTNode.Name} - err = testclient.DataPlaneClient.Create(ctx, testpod2) - Expect(err).ToNot(HaveOccurred()) - testpod2, err = pods.WaitForCondition(ctx, client.ObjectKeyFromObject(testpod2), corev1.PodReady, corev1.ConditionTrue, 5*time.Minute) - Expect(err).ToNot(HaveOccurred()) - Expect(testpod1.Status.QOSClass).To(Equal(corev1.PodQOSGuaranteed)) - - By("fetch cpus used by container process using taskset") - testpod2Cpus, err := pods.ExecCommandOnPod(testclient.K8sClient, testpod2, "", tasksetcmd) - Expect(err).ToNot(HaveOccurred()) - testlog.Infof("%v pod is using %v cpus", testpod2.Name, string(testpod2Cpus)) - - // Get cpus used by the ovnkubenode-pods containers - // Each kubenode pods have many containers, we check cpus of only 1 container - ovnContainers, err := ovnPodContainers(&ovnPod) - Expect(err).ToNot(HaveOccurred()) - containerPid, err := nodes.ContainerPid(context.TODO(), workerRTNode, ovnContainers[0]) - Expect(err).ToNot(HaveOccurred()) - // we need to wait as process affinity can change - time.Sleep(30 * time.Second) - ovnContainerCpuset1 := taskSet(ctx, containerPid, workerRTNode) - testlog.Infof("Container of ovn pod %s is using cpus %s", ovnPod.Name, ovnContainerCpuset1) - pidList, err := ovsPids(ctx, ovsSystemdServices, workerRTNode) - Expect(err).ToNot(HaveOccurred()) - - // We wait for 30 seconds for ovs process cpu affinity to be updated - time.Sleep(30 * time.Second) - // Verify ovs-vswitchd and ovsdb-server process affinity is updated - pidToCPUs, err := getCPUMaskForPids(ctx, pidList, workerRTNode) - Expect(err).ToNot(HaveOccurred()) - for pid, cpumask := range pidToCPUs { - testlog.Infof("OVS service pid %s is using cpus %s", pid, cpumask) - Expect(ovnContainerCpuset1.Equals(cpumask)).To(BeTrue(), "affinity of ovn kube node pods(%s) do not match with ovservices pid %s (%s)", ovnContainerCpuset1, pid, cpumask) - } - // Delete testpod1 - testlog.Infof("Deleting pod %v", testpod1.Name) - Expect(pods.DeleteAndSync(context.TODO(), testclient.DataPlaneClient, testpod1)).To(Succeed()) - - time.Sleep(30 * time.Second) - // Check the cpus of ovnkubenode pods - ovnContainerCpuset2 := taskSet(ctx, containerPid, workerRTNode) - testlog.Infof("cpus used by ovn kube node pods after deleting pod %v is %v", testpod1.Name, ovnContainerCpuset2) - // we wait some time for ovs process affinity to change - time.Sleep(30 * time.Second) - - // Verify ovs-vswitchd and ovsdb-server process affinity is updated - pidToCPUs, err = getCPUMaskForPids(ctx, pidList, workerRTNode) - Expect(err).ToNot(HaveOccurred()) - for pid, cpumask := range pidToCPUs { - testlog.Infof("OVS service pid %s is using cpus %s", pid, cpumask) - Expect(ovnContainerCpuset2.Equals(cpumask)).To(BeTrue(), "affinity of ovn kube node pods(%s) do not match with ovservices pid %s (%s)", ovnContainerCpuset2, pid, cpumask) - } - // Delete testpod2 - Expect(pods.DeleteAndSync(context.TODO(), testclient.DataPlaneClient, testpod2)).To(Succeed()) + By("Creating two guaranteed pods on the worker node") + testpod1 := createGuPod(ctx, workerRTNode) + testpod2 := createGuPod(ctx, workerRTNode) + + By("Collecting affinities with both GU pods running") + ovnAffinity := getOvnContainerAffinity(ctx, workerRTNode) + ovsAffinities := getOvsAffinities(ctx, ovsSystemdServices, workerRTNode) + + By("Verifying OVS affinity excludes both GU pods' CPUs") + bothGUPodCPUs := getGuPodCPUs(ctx, testpod1).Union(getGuPodCPUs(ctx, testpod2)) + verifyOvsMatchesExpected(ovnAffinity, ovsAffinities, + isWorkloadPartitioningEnabled, map[string]cpuset.CPUSet{ + cpuSetReserved: reservedCPUSet, + cpuSetIsolated: isolatedCPUSet, + cpuSetGUPod: bothGUPodCPUs, + }) + + By("Deleting first GU pod and verifying OVS affinity adjusts") + Expect(pods.DeleteAndSync(ctx, testclient.DataPlaneClient, testpod1)).To(Succeed()) + ovnAffinityAfterDelete := getOvnContainerAffinity(ctx, workerRTNode) + ovsAffinities = getOvsAffinities(ctx, ovsSystemdServices, workerRTNode) + verifyOvsMatchesExpected(ovnAffinityAfterDelete, ovsAffinities, + isWorkloadPartitioningEnabled, map[string]cpuset.CPUSet{ + cpuSetReserved: reservedCPUSet, + cpuSetIsolated: isolatedCPUSet, + cpuSetGUPod: getGuPodCPUs(ctx, testpod2), + }) + + By("Cleaning up second GU pod") + Expect(pods.DeleteAndSync(ctx, testclient.DataPlaneClient, testpod2)).To(Succeed()) }) It("[test_id:64103] ovs process affinity still excludes guaranteed pods after reboot", func() { - checkCpuCount(context.TODO(), workerRTNode) - var dp *appsv1.Deployment = newDeployment() - // create a deployment to deploy gu pods - testNode := make(map[string]string) - testNode["kubernetes.io/hostname"] = workerRTNode.Name - dp.Spec.Template.Spec.NodeSelector = testNode - err := testclient.DataPlaneClient.Create(ctx, dp) - Expect(err).ToNot(HaveOccurred(), "Unable to create Deployment") + checkCpuCount(ctx, workerRTNode) + + By("Creating a deployment with guaranteed pods on the worker node") + dp := newDeployment() + dp.Spec.Template.Spec.NodeSelector = map[string]string{"kubernetes.io/hostname": workerRTNode.Name} + Expect(testclient.DataPlaneClient.Create(ctx, dp)).To(Succeed(), "Unable to create Deployment") defer func() { - // delete deployment - testlog.Infof("Deleting Deployment %v", dp.Name) - err := testclient.DataPlaneClient.Delete(ctx, dp) - Expect(err).ToNot(HaveOccurred()) - // once deployment is deleted - // wait till the ovs process affinity is reverted back - pidList, err := ovsPids(ctx, ovsSystemdServices, workerRTNode) - Expect(err).ToNot(HaveOccurred()) + testlog.Infof("Deleting Deployment %s from Namespace %s", dp.Name, dp.Namespace) + Expect(testclient.DataPlaneClient.Delete(ctx, dp)).To(Succeed()) + baselineCpus := reservedCPUSet.Union(isolatedCPUSet) Eventually(func() bool { - pidToCPUs, err := getCPUMaskForPids(ctx, pidList, workerRTNode) - Expect(err).ToNot(HaveOccurred(), "Unable to fetch affinity of ovs services") - for pid, cpumask := range pidToCPUs { - // since cpuset.CPUSet contains map in its struct field we can't compare - // the structs directly. After the deployment is deleted, the cpu mask - // of ovs services should contain all cpus , which is generally 0-N (where - // N is total number of cpus, this should be easy to compare. - if !cpumask.Equals(onlineCPUSet) { - testlog.Warningf("ovs servics pid %s cpu mask is %s instead of %s", pid, cpumask, onlineCPUSet) + affinities := getOvsAffinities(ctx, ovsSystemdServices, workerRTNode) + for pid, mask := range affinities { + if !mask.Equals(baselineCpus) { + testlog.Warningf("OVS pid %s mask is %s instead of %s", pid, mask, baselineCpus) return false } } @@ -471,131 +384,134 @@ var _ = Describe("[performance] Cgroups and affinity", Ordered, Label(string(lab }, 5*time.Minute, 10*time.Second).Should(BeTrue()) }() - ovnPod, err := ovnCnfNodePod(ctx, workerRTNode) - Expect(err).ToNot(HaveOccurred(), "Unable to get ovnPod") - ovnContainerids, err := ovnPodContainers(&ovnPod) - Expect(err).ToNot(HaveOccurred()) - containerPid, err := nodes.ContainerPid(ctx, workerRTNode, ovnContainerids[0]) - Expect(err).ToNot(HaveOccurred()) - // we need to wait as process affinity can change - time.Sleep(30 * time.Second) - ovnContainerCpuset := taskSet(ctx, containerPid, workerRTNode) - testlog.Infof("Container of ovn pod %s is using cpus %s", ovnPod.Name, ovnContainerCpuset) - pidList, err := ovsPids(ctx, ovsSystemdServices, workerRTNode) - Expect(err).ToNot(HaveOccurred()) + dpListOpts := &client.ListOptions{ + Namespace: dp.Namespace, + FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": workerRTNode.Name}), + LabelSelector: labels.SelectorFromSet(labels.Set{"type": "telco"}), + } - //wait for 30 seconds for ovs process to have its cpu affinity updated - time.Sleep(30 * time.Second) - // Verify ovs-vswitchd and ovsdb-server process affinity is updated - pidToCPUs1, err := getCPUMaskForPids(ctx, pidList, workerRTNode) - Expect(err).ToNot(HaveOccurred()) - for pid, cpumask := range pidToCPUs1 { - testlog.Infof("OVS service pid %s is using cpus %s", pid, cpumask) - Expect(ovnContainerCpuset).To(Equal(cpumask), "affinity of ovn kube node pods(%s) do not match with ovservices pid %s (%s)", ovnContainerCpuset, pid, cpumask) + collectAndVerify := func(phase string) { + ovnAffinity := getOvnContainerAffinity(ctx, workerRTNode) + ovsAffinities := getOvsAffinities(ctx, ovsSystemdServices, workerRTNode) + guPodCPUs := collectGuCPUsFromPodList(ctx, dpListOpts) + testlog.Infof("Phase %s: GU CPUs = %s", phase, guPodCPUs) + verifyOvsMatchesExpected(ovnAffinity, ovsAffinities, + isWorkloadPartitioningEnabled, map[string]cpuset.CPUSet{ + cpuSetReserved: reservedCPUSet, + cpuSetIsolated: isolatedCPUSet, + cpuSetGUPod: guPodCPUs, + }) } - testlog.Info("Rebooting the node") - rebootCmd := "chroot /rootfs systemctl reboot" - testlog.TaggedInfof("Reboot", "Node %q: Rebooting", workerRTNode.Name) - _, _ = nodes.ExecCommand(ctx, workerRTNode, []string{"sh", "-c", rebootCmd}) - testlog.Info("Node Rebooted") - By("Waiting for node to go into not ready state after reboot") + waitForDeploymentReady(ctx, dp, dpListOpts, 2) + + By("Verifying affinities before reboot") + collectAndVerify("before-reboot") + + By(fmt.Sprintf("Rebooting the worker node %q", workerRTNode.Name)) + _, _ = nodes.ExecCommand(ctx, workerRTNode, []string{"sh", "-c", "chroot /rootfs systemctl reboot"}) nodes.WaitForNotReadyOrFail("Reboot", workerRTNode.Name, 10*time.Minute, 30*time.Second) - By("Waiting for node to be ready again after reboot") nodes.WaitForReadyOrFail("Reboot", workerRTNode.Name, 10*time.Minute, 30*time.Second) - // After reboot verify test pod created using deployment is running - // Get pods from the deployment - listOptions := &client.ListOptions{ - Namespace: dp.Namespace, - FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": workerRTNode.Name}), - LabelSelector: labels.SelectorFromSet(labels.Set{"type": "telco"}), + By("Waiting for deployment pods to be ready after reboot") + waitForDeploymentReady(ctx, dp, dpListOpts, 2) + + By("Verifying affinities after reboot") + collectAndVerify("after-reboot") + }) + + // In this test setup ROLE_WORKER_CNF can target control-plane nodes + // (for example ROLE_WORKER_CNF=masters). Keep the baseline aligned with + // the selected profile by validating OVS affinity against + // reserved+isolated from that profile. + It("[test_id: 89066] Verify OVS affinity is not restricted to reserved CPUs after control plane node reboot", func() { + isSchedulable, err := cluster.IsControlPlaneSchedulable(ctx) + Expect(err).ToNot(HaveOccurred(), "Unable to check if control plane is schedulable") + if !isSchedulable { + Skip("Control plane nodes are not schedulable") } - podList := &corev1.PodList{} - dpObj := client.ObjectKeyFromObject(dp) - Eventually(func() bool { - if err := testclient.DataPlaneClient.List(context.TODO(), podList, listOptions); err != nil { - return false - } - if err = testclient.DataPlaneClient.Get(context.TODO(), dpObj, dp); err != nil { - return false - } - if dp.Status.ReadyReplicas != int32(2) { - testlog.Warningf("Waiting for deployment: %q to have %d replicas ready, current number of replicas: %d", dpObj.String(), int32(2), dp.Status.ReadyReplicas) - return false - } - for _, s := range podList.Items[0].Status.ContainerStatuses { - if !s.Ready { - return false + + By("Finding a control plane node with OVS dynamic pinning") + var cpNode *corev1.Node + + if _, ok := workerRTNode.Labels["node-role.kubernetes.io/control-plane"]; ok { + cpNode = workerRTNode + } else { + cpNodes, err := nodes.GetByLabels(map[string]string{"node-role.kubernetes.io/control-plane": ""}) + Expect(err).ToNot(HaveOccurred()) + for i := range cpNodes { + cmd := []string{"ls", activation_file} + _, err := nodes.ExecCommand(ctx, &cpNodes[i], cmd) + if err == nil { + cpNode = &cpNodes[i] + break } } - return true - }, 5*time.Minute, 10*time.Second).Should(BeTrue()) - ovnPodAfterReboot, err := ovnCnfNodePod(ctx, workerRTNode) - Expect(err).ToNot(HaveOccurred(), "Unable to get ovnPod") - ovnContainerIdsAfterReboot, err := ovnPodContainers(&ovnPodAfterReboot) - Expect(err).ToNot(HaveOccurred()) - containerPid, err = nodes.ContainerPid(ctx, workerRTNode, ovnContainerIdsAfterReboot[0]) - Expect(err).ToNot(HaveOccurred()) - // we need to wait as process affinity can change - time.Sleep(30 * time.Second) - ovnContainerCpusetAfterReboot := taskSet(ctx, containerPid, workerRTNode) - testlog.Infof("cpus used by ovn kube node pods %v", ovnContainerCpusetAfterReboot) - pidListAfterReboot, err := ovsPids(ctx, ovsSystemdServices, workerRTNode) - - Expect(err).ToNot(HaveOccurred()) + } - // Verify ovs-vswitchd and ovsdb-server process affinity is updated - pidToCPUs2, err := getCPUMaskForPids(ctx, pidListAfterReboot, workerRTNode) - Expect(err).ToNot(HaveOccurred()) - for pid, cpumask := range pidToCPUs2 { - testlog.Infof("OVS service pid %s is using cpus %s", pid, cpumask) - Expect(ovnContainerCpusetAfterReboot).To(Equal(cpumask), "affinity of ovn kube node pods(%s) do not match with ovservices pid %s (%s)", ovnContainerCpusetAfterReboot, pid, cpumask) + if cpNode == nil { + Skip("No control plane node with OVS dynamic pinning found") } + testlog.Infof("Using control plane node: %s", cpNode.Name) + + cpOvsServices := ovsSystemdServicesOnOvsSlice(ctx, cpNode) + baselineCPUs := reservedCPUSet.Union(isolatedCPUSet) + + By("Verify OVS affinity before reboot matches profile reserved+isolated") + ovsBeforeReboot := getOvsAffinities(ctx, cpOvsServices, cpNode) + Expect(ovsBeforeReboot).ToNot(BeEmpty(), "Expected non-empty OVS affinities on control-plane node before reboot") + verifyOvsAffinity(ovsBeforeReboot, baselineCPUs) + + By(fmt.Sprintf("Rebooting the control plane node %q", cpNode.Name)) + _, _ = nodes.ExecCommand(ctx, cpNode, []string{"sh", "-c", "chroot /rootfs systemctl reboot"}) + nodes.WaitForNotReadyOrFail("Reboot", cpNode.Name, 10*time.Minute, 30*time.Second) + nodes.WaitForReadyOrFail("Reboot", cpNode.Name, 10*time.Minute, 30*time.Second) + + By("Waiting for OVN pod to be ready on the control plane node") + Eventually(func() error { + _, err := ovnCnfNodePod(ctx, cpNode) + return err + }, 5*time.Minute, 10*time.Second).Should(Succeed(), + "OVN pod did not become ready on control plane node after reboot") + + By("Verify OVS affinity after reboot matches profile reserved+isolated") + ovsAfterReboot := getOvsAffinities(ctx, cpOvsServices, cpNode) + Expect(ovsAfterReboot).ToNot(BeEmpty(), "Expected non-empty OVS affinities on control-plane node after reboot") + verifyOvsAffinity(ovsAfterReboot, baselineCPUs) }) // Automates OCPBUGS-35347: ovs-vswitchd is using isolated cpu pool instead of reserved pool It("[test_id:75257] verify ovs-switchd threads inherit cpu affinity", func() { - checkCpuCount(context.TODO(), workerRTNode) - By("Get Thread Affinity of ovs-vswitchd process") - //Get ovs-switchd thread affinity + checkCpuCount(ctx, workerRTNode) + + By("Verifying ovs-vswitchd thread affinity covers reserved+isolated CPUs") threadAffinity, err := ovsSwitchdThreadAffinity(ctx, workerRTNode) Expect(err).ToNot(HaveOccurred()) - - // Verify thread affinity contains all the online cpu's + baselineCpus := reservedCPUSet.Union(isolatedCPUSet) for _, line := range threadAffinity { if line != "" { - cpumask := strings.Split(line, ":") - threadsCpuset, err := cpuset.Parse(strings.TrimSpace(cpumask[1])) + parts := strings.Split(line, ":") + threadsCpuset, err := cpuset.Parse(strings.TrimSpace(parts[1])) Expect(err).ToNot(HaveOccurred()) - Expect(threadsCpuset.Equals(onlineCPUSet), "actual cpuset %s not equals to expected cpuset %s", threadsCpuset, onlineCPUSet) + Expect(threadsCpuset.Equals(baselineCpus)).To(BeTrue(), + "actual cpuset %s not equals to expected cpuset %s", threadsCpuset, baselineCpus) } } - // create deployment with 2 replicas and each pod have 2 cpus - var dp *appsv1.Deployment = newDeployment() - testNode := make(map[string]string) - testNode["kubernetes.io/hostname"] = workerRTNode.Name - dp.Spec.Template.Spec.NodeSelector = testNode - err = testclient.DataPlaneClient.Create(ctx, dp) - Expect(err).ToNot(HaveOccurred(), "Unable to create Deployment") + By("Creating a deployment with 2 guaranteed replicas") + dp := newDeployment() + dp.Spec.Template.Spec.NodeSelector = map[string]string{"kubernetes.io/hostname": workerRTNode.Name} + Expect(testclient.DataPlaneClient.Create(ctx, dp)).To(Succeed(), "Unable to create Deployment") - // Delete deployment defer func() { - By("Delete Deployment") - testlog.Infof("Deleting Deployment %v", dp.Name) - err := testclient.DataPlaneClient.Delete(ctx, dp) - Expect(err).ToNot(HaveOccurred()) - // once deployment is deleted - // wait till the ovs process affinity is reverted back - pidList, err := ovsPids(ctx, ovsSystemdServices, workerRTNode) - Expect(err).ToNot(HaveOccurred()) + testlog.Infof("Deleting Deployment %s", dp.Name) + Expect(testclient.DataPlaneClient.Delete(ctx, dp)).To(Succeed()) + baselineCpus := reservedCPUSet.Union(isolatedCPUSet) Eventually(func() bool { - pidToCPUs, err := getCPUMaskForPids(ctx, pidList, workerRTNode) - Expect(err).ToNot(HaveOccurred()) - for pid, cpumask := range pidToCPUs { - if !cpumask.Equals(onlineCPUSet) { - testlog.Warningf("ovs servics pid %s cpu mask is %s instead of %s", pid, cpumask, onlineCPUSet) + affinities := getOvsAffinities(ctx, ovsSystemdServices, workerRTNode) + for pid, mask := range affinities { + if !mask.Equals(baselineCpus) { + testlog.Warningf("OVS pid %s mask is %s instead of %s", pid, mask, baselineCpus) return false } } @@ -603,135 +519,233 @@ var _ = Describe("[performance] Cgroups and affinity", Ordered, Label(string(lab }, 2*time.Minute, 10*time.Second).Should(BeTrue()) }() - testlog.Info("Get the pods list form the deployment") - // Get pods from the deployment - listOptions := &client.ListOptions{ + listOpts := &client.ListOptions{ Namespace: dp.Namespace, FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": workerRTNode.Name}), LabelSelector: labels.SelectorFromSet(labels.Set{"type": "telco"}), } - podList := &corev1.PodList{} - dpObj := client.ObjectKeyFromObject(dp) - Eventually(func() bool { - if err := testclient.DataPlaneClient.List(context.TODO(), podList, listOptions); err != nil { - return false - } - if err = testclient.DataPlaneClient.Get(context.TODO(), dpObj, dp); err != nil { - return false - } - if dp.Status.ReadyReplicas != int32(2) { - testlog.Warningf("Waiting for deployment: %q to have %d replicas ready, current number of replicas: %d", dpObj.String(), int32(2), dp.Status.ReadyReplicas) - return false - } - for _, s := range podList.Items[0].Status.ContainerStatuses { - if !s.Ready { - return false - } - } - return true - }, 10*time.Second, 5*time.Second).Should(BeTrue()) - testlog.Info("Get ovs-vswitchd threads affinity post deployment") - postDeploymentThreadAffinity, err := ovsSwitchdThreadAffinity(ctx, workerRTNode) + podList := waitForDeploymentReady(ctx, dp, listOpts, 2) + + By("Verifying GU pod CPUs are not a subset of ovs-vswitchd thread affinity") + postDeployThreadAffinity, err := ovsSwitchdThreadAffinity(ctx, workerRTNode) Expect(err).ToNot(HaveOccurred()) - for _, pod := range podList.Items { - cmd := []string{"taskset", "-pc", "1"} - outputb, err := pods.ExecCommandOnPod(testclient.K8sClient, &pod, "", cmd) - Expect(err).ToNot(HaveOccurred()) - testpodCpus := bytes.Split(outputb, []byte(":")) - testlog.Infof("%v pod is using cpus %v", pod.Name, string(testpodCpus[1])) - podcpus, err := cpuset.Parse(strings.TrimSpace(string(testpodCpus[1]))) - Expect(err).ToNot(HaveOccurred()) - for _, line := range postDeploymentThreadAffinity { + for i := range podList.Items { + podcpus := getGuPodCPUs(ctx, &podList.Items[i]) + for _, line := range postDeployThreadAffinity { if line != "" { - cpumask := strings.Split(line, ":") - threadsCpuset, err := cpuset.Parse(strings.TrimSpace(cpumask[1])) + parts := strings.Split(line, ":") + threadsCpuset, err := cpuset.Parse(strings.TrimSpace(parts[1])) Expect(err).ToNot(HaveOccurred()) - testlog.Infof("ovs-switchd thread CpuAffinity: %s, pod %s Affinity: %s", threadsCpuset, pod.Name, podcpus) - Expect(podcpus.IsSubsetOf(threadsCpuset)).To(Equal(false)) + testlog.Infof("ovs-vswitchd thread affinity: %s, pod %s affinity: %s", threadsCpuset, podList.Items[i].Name, podcpus) + Expect(podcpus.IsSubsetOf(threadsCpuset)).To(BeFalse()) } } } - testlog.Infof("Deleting one of the pods of the deployment %q", dpObj.String()) + + By("Deleting one pod and waiting for replacement") podToDelete := podList.Items[0] Expect(pods.DeleteAndSync(ctx, testclient.DataPlaneClient, &podToDelete)).To(Succeed()) + podList = waitForDeploymentReady(ctx, dp, listOpts, 2) - Eventually(func() bool { - err = testclient.DataPlaneClient.Get(context.TODO(), dpObj, dp) - if dp.Status.ReadyReplicas != int32(2) { - testlog.Warningf("Waiting for deployment: %q to have %d replicas ready, current number of replicas: %d", dpObj.String(), int32(2), dp.Status.ReadyReplicas) - return false + By("Verifying GU pod CPUs are still not a subset of ovs-vswitchd thread affinity") + refreshedThreadAffinity, err := ovsSwitchdThreadAffinity(ctx, workerRTNode) + Expect(err).ToNot(HaveOccurred()) + for i := range podList.Items { + podcpus := getGuPodCPUs(ctx, &podList.Items[i]) + for _, line := range refreshedThreadAffinity { + if line != "" { + parts := strings.Split(line, ":") + threadsCpuset, err := cpuset.Parse(strings.TrimSpace(parts[1])) + Expect(err).ToNot(HaveOccurred()) + testlog.Infof("ovs-vswitchd thread affinity: %s, pod %s affinity: %s", threadsCpuset, podList.Items[i].Name, podcpus) + Expect(podcpus.IsSubsetOf(threadsCpuset)).To(BeFalse()) + } } - return true - }).WithTimeout(time.Minute*5).WithPolling(time.Second*30).Should(BeTrue(), "deployment %q failed to have %d running replicas within the defined period", dpObj.String(), int32(2)) + } + }) + }) - // Get pods from the deployment - listOptions = &client.ListOptions{ - Namespace: dp.Namespace, - FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": workerRTNode.Name}), - LabelSelector: labels.SelectorFromSet(labels.Set{"type": "telco"}), + Context("Workload Partitioning OVS affinity", Label(string(label.Tier2)), func() { + BeforeEach(func() { + if !isWorkloadPartitioningEnabled { + Skip("Workload partitioning is not enabled on this cluster") } - podList = &corev1.PodList{} - Eventually(func() bool { - if err := testclient.DataPlaneClient.List(context.TODO(), podList, listOptions); err != nil { - return false - } - if len(podList.Items) < 1 { - return false + }) + + It("[test_id:89062] Verify OVN pod is restricted to reserved CPUs under workload partitioning", func() { + ovnPod, err := ovnCnfNodePod(ctx, workerRTNode) + Expect(err).ToNot(HaveOccurred(), "Unable to get ovnPod") + containerIds, err := ovnPodContainers(&ovnPod) + Expect(err).ToNot(HaveOccurred()) + + By("Verify each OVN container's cgroup cpuset and process affinity are restricted to reserved CPUs") + for _, ctn := range containerIds { + pid, err := nodes.ContainerPid(ctx, workerRTNode, ctn) + Expect(err).ToNot(HaveOccurred()) + + ctnAffinity := taskSet(ctx, pid, workerRTNode) + testlog.Infof("OVN container pid %s affinity: %s", pid, ctnAffinity) + Expect(ctnAffinity).To(Equal(reservedCPUSet), + "Under workload partitioning, OVN container pid %s affinity (%s) should equal reserved CPUs (%s)", + pid, ctnAffinity, reservedCPUSet) + } + }) + + It("[test_id:89063] Verify OVS affinity is wider than OVN pod under workload partitioning", func() { + By("Get OVN container affinity") + ovnPod, err := ovnCnfNodePod(ctx, workerRTNode) + Expect(err).ToNot(HaveOccurred(), "Unable to get ovnPod") + ovnContainerids, err := ovnPodContainers(&ovnPod) + Expect(err).ToNot(HaveOccurred()) + containerPid, err := nodes.ContainerPid(ctx, workerRTNode, ovnContainerids[0]) + Expect(err).ToNot(HaveOccurred()) + var ctnCpuset cpuset.CPUSet + Eventually(func() cpuset.CPUSet { + ctnCpuset = taskSet(ctx, containerPid, workerRTNode) + return ctnCpuset + }, 2*time.Minute, 5*time.Second).Should(Equal(reservedCPUSet), + "OVN container should be restricted to reserved CPUs under workload partitioning") + testlog.Infof("OVN container affinity: %s", ctnCpuset) + + By("Get OVS process affinity and verify it is wider") + pidList, err := ovsPids(ctx, ovsSystemdServices, workerRTNode) + Expect(err).ToNot(HaveOccurred()) + pidToCPUs, err := getCPUMaskForPids(ctx, pidList, workerRTNode) + Expect(err).ToNot(HaveOccurred()) + + for pid, ovsAffinity := range pidToCPUs { + testlog.Infof("OVS service pid %s affinity: %s", pid, ovsAffinity) + Expect(reservedCPUSet.IsSubsetOf(ovsAffinity)).To(BeTrue(), + "Reserved CPUs (%s) should be a subset of OVS affinity (%s)", reservedCPUSet, ovsAffinity) + Expect(ovsAffinity.Equals(reservedCPUSet)).To(BeFalse(), + "OVS affinity (%s) should be wider than reserved CPUs (%s) when no GU pods exist", ovsAffinity, reservedCPUSet) + } + }) + + It("[test_id:89064] Verify reserved CPUs are always included in OVS affinity under workload partitioning", func() { + checkCpuCount(ctx, workerRTNode) + + By("Verify reserved CPUs are part of OVS affinity before creating GU pods") + ovsAffinities := getOvsAffinities(ctx, ovsSystemdServices, workerRTNode) + for pid, mask := range ovsAffinities { + Expect(reservedCPUSet.IsSubsetOf(mask)).To(BeTrue(), + "Reserved CPUs (%s) should be a subset of OVS affinity (%s) for pid %s", reservedCPUSet, mask, pid) + } + + By("Create a GU pod and verify reserved CPUs remain in OVS affinity") + testpod := createGuPod(ctx, workerRTNode) + guPodCPUs := getGuPodCPUs(ctx, testpod) + + ovsAffinities = getOvsAffinities(ctx, ovsSystemdServices, workerRTNode) + for pid, mask := range ovsAffinities { + testlog.Infof("OVS pid %s affinity with GU pod: %s", pid, mask) + Expect(reservedCPUSet.IsSubsetOf(mask)).To(BeTrue(), + "Reserved CPUs (%s) should still be a subset of OVS affinity (%s) for pid %s", reservedCPUSet, mask, pid) + Expect(guPodCPUs.IsSubsetOf(mask)).To(BeFalse(), + "GU pod CPUs (%s) should NOT be a subset of OVS affinity (%s)", guPodCPUs, mask) + } + + By("Delete the GU pod and verify reserved CPUs are still in OVS affinity") + Expect(pods.DeleteAndSync(ctx, testclient.DataPlaneClient, testpod)).To(Succeed()) + + baselineCpus := reservedCPUSet.Union(isolatedCPUSet) + ovsAffinities = getOvsAffinities(ctx, ovsSystemdServices, workerRTNode) + for pid, mask := range ovsAffinities { + testlog.Infof("OVS pid %s affinity after GU pod deletion: %s", pid, mask) + Expect(reservedCPUSet.IsSubsetOf(mask)).To(BeTrue(), + "Reserved CPUs (%s) should be a subset of OVS affinity (%s) for pid %s after deletion", reservedCPUSet, mask, pid) + Expect(mask).To(Equal(baselineCpus), + "OVS affinity should return to reserved+isolated after GU pod deletion") + } + }) + + It("[test_id:89065] Verify OVS affinity excludes CPUs pinned by guaranteed pods under workload partitioning", func() { + checkCpuCount(ctx, workerRTNode) + + isolatedCPUs := isolatedCPUSet + isolatedCount := isolatedCPUs.Size() + if isolatedCount < 2 { + Skip("Not enough isolated CPUs to run this test") + } + + nodeLoad, err := baseload.ForNode(ctx, testclient.DataPlaneClient, workerRTNode.Name) + Expect(err).ToNot(HaveOccurred()) + availableCPUs := nodeLoad.AvailableCPUs(isolatedCount) + availableCPUs = availableCPUs &^ 1 // round down to even to satisfy SMT alignment + testlog.Infof("Isolated CPUs: %d, already consumed: %d, available (even-aligned): %d", + isolatedCount, nodeLoad.CPURequestedCores(), availableCPUs) + if availableCPUs < 2 { + Skip(fmt.Sprintf("Not enough available isolated CPUs: %d available out of %d total", availableCPUs, isolatedCount)) + } + + By(fmt.Sprintf("Creating GU pods to consume %d available isolated CPUs", availableCPUs)) + var guPods []*corev1.Pod + remainingIsolated := availableCPUs + for remainingIsolated > 0 { + cpuRequest := remainingIsolated + if cpuRequest > 2 { + cpuRequest = 2 } - for _, s := range podList.Items[0].Status.ContainerStatuses { - if !s.Ready { - return false - } + testpod := pods.GetTestPod() + testpod.Namespace = testutils.NamespaceTesting + testpod.Spec.Containers[0].Resources = corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse(fmt.Sprintf("%d", cpuRequest)), + corev1.ResourceMemory: resource.MustParse("200Mi"), + }, } - return true - }, 10*time.Second, 5*time.Second).Should(BeTrue()) - // Post deletion of deployment pods verify ovs-vswitchd thread affinity - refresshedThreadAffinity, err := ovsSwitchdThreadAffinity(ctx, workerRTNode) - Expect(err).ToNot(HaveOccurred()) - for _, pod := range podList.Items { - cmd := []string{"taskset", "-pc", "1"} - outputb, err := pods.ExecCommandOnPod(testclient.K8sClient, &pod, "", cmd) + testpod.Spec.NodeSelector = map[string]string{testutils.LabelHostname: workerRTNode.Name} + err := testclient.DataPlaneClient.Create(ctx, testpod) Expect(err).ToNot(HaveOccurred()) - testpodCpus := bytes.Split(outputb, []byte(":")) - testlog.Infof("%v pod is using cpus %v", pod.Name, string(testpodCpus[1])) - podcpus, err := cpuset.Parse(strings.TrimSpace(string(testpodCpus[1]))) + testpod, err = pods.WaitForCondition(ctx, client.ObjectKeyFromObject(testpod), corev1.PodReady, corev1.ConditionTrue, 5*time.Minute) Expect(err).ToNot(HaveOccurred()) - for _, line := range refresshedThreadAffinity { - if line != "" { - cpumask := strings.Split(line, ":") - threadsCpuset, err := cpuset.Parse(strings.TrimSpace(cpumask[1])) - Expect(err).ToNot(HaveOccurred()) - testlog.Infof("ovs-switchd thread CpuAffinity: %s, pod %s Affinity: %s", threadsCpuset, pod.Name, podcpus) - Expect(podcpus.IsSubsetOf(threadsCpuset)).To(Equal(false)) - } - } + Expect(testpod.Status.QOSClass).To(Equal(corev1.PodQOSGuaranteed)) + guPods = append(guPods, testpod) + remainingIsolated -= cpuRequest } + defer func() { + for _, p := range guPods { + testlog.Infof("Cleaning up GU pod %s", p.Name) + Expect(pods.DeleteAndSync(ctx, testclient.DataPlaneClient, p)).To(Succeed()) + } + baselineCpus := reservedCPUSet.Union(isolatedCPUSet) + Eventually(func() bool { + affinities := getOvsAffinities(ctx, ovsSystemdServices, workerRTNode) + for pid, mask := range affinities { + if !mask.Equals(baselineCpus) { + testlog.Warningf("OVS pid %s mask is %s instead of %s", pid, mask, baselineCpus) + return false + } + } + return true + }, 5*time.Minute, 10*time.Second).Should(BeTrue()) + }() + + guPodCPUs := cpuset.New() + for _, p := range guPods { + guPodCPUs = guPodCPUs.Union(getGuPodCPUs(ctx, p)) + } + expected := expectedOvsAffinity(reservedCPUSet, isolatedCPUSet, guPodCPUs) + testlog.Infof("GU pod CPUs: %s, expected OVS affinity: %s", guPodCPUs, expected) + + By("Verify OVS affinity excludes the guaranteed pods' pinned CPUs") + ovsAffinities := getOvsAffinities(ctx, ovsSystemdServices, workerRTNode) + for pid, mask := range ovsAffinities { + testlog.Infof("OVS pid %s affinity: %s (expected: %s)", pid, mask, expected) + Expect(mask.Equals(expected)).To(BeTrue(), + "OVS pid %s affinity (%s) should equal expected (%s) after GU pod pinning", + pid, mask, expected) + } }) }) }) }) -func cpuSpecToString(cpus *performancev2.CPU) string { - if cpus == nil { - return "" - } - sb := strings.Builder{} - if cpus.Reserved != nil { - fmt.Fprintf(&sb, "reserved=[%s]", *cpus.Reserved) - } - if cpus.Isolated != nil { - fmt.Fprintf(&sb, " isolated=[%s]", *cpus.Isolated) - } - if cpus.BalanceIsolated != nil { - fmt.Fprintf(&sb, " balanceIsolated=%t", *cpus.BalanceIsolated) - } - return sb.String() -} - // checkCpuCount check if the node has sufficient cpus func checkCpuCount(ctx context.Context, workerNode *corev1.Node) { + GinkgoHelper() out, err := nodes.ExecCommand(ctx, workerNode, []string{"nproc", "--all"}) if err != nil { Fail(fmt.Sprintf("Failed to fetch online CPUs: %v", err)) @@ -850,6 +864,7 @@ func newDeployment() *appsv1.Deployment { // ovsSystemdServicesOnOvsSlice returns the systemd services dependent on ovs.slice cgroup func ovsSystemdServicesOnOvsSlice(ctx context.Context, workerRTNode *corev1.Node) []string { + GinkgoHelper() ovsServices, err := systemd.ShowProperty(context.TODO(), "ovs.slice", "RequiredBy", workerRTNode) Expect(err).ToNot(HaveOccurred()) serviceList := strings.Split(strings.TrimSpace(ovsServices), "=") @@ -883,6 +898,7 @@ func ovsPids(ctx context.Context, ovsSystemdServices []string, workerRTNode *cor // taskSet returns cpus used by the pid func taskSet(ctx context.Context, pid string, workerRTNode *corev1.Node) cpuset.CPUSet { + GinkgoHelper() cmd := []string{"taskset", "-pc", pid} out, err := nodes.ExecCommand(ctx, workerRTNode, cmd) Expect(err).ToNot(HaveOccurred(), "unable to fetch cpus using taskset") @@ -910,3 +926,179 @@ func ovsSwitchdThreadAffinity(ctx context.Context, workerRTNode *corev1.Node) ([ threadAffinity := strings.Split(string(out), "\n") return threadAffinity, nil } + +// expectedOvsAffinity computes the expected OVN/OVS CPU affinity set. +// Formula: (reserved + isolated) - GU_Pinned +// reserved+isolated is the profile-derived baseline for OVS. Subtracting +// GU-pinned CPUs yields the expected affinity when GU pods are running. +func expectedOvsAffinity(reservedCPUs, isolatedCPUs, guPodCPUs cpuset.CPUSet) cpuset.CPUSet { + return reservedCPUs.Union(isolatedCPUs).Difference(guPodCPUs) +} + +// getOvnContainerAffinity returns the CPU affinity of the first OVN container +// on the given node, waiting 30s for affinity to stabilise. +func getOvnContainerAffinity(ctx context.Context, node *corev1.Node) cpuset.CPUSet { + GinkgoHelper() + ovnPod, err := ovnCnfNodePod(ctx, node) + Expect(err).ToNot(HaveOccurred(), "Unable to get ovnPod") + containerIds, err := ovnPodContainers(&ovnPod) + Expect(err).ToNot(HaveOccurred()) + pid, err := nodes.ContainerPid(ctx, node, containerIds[0]) + Expect(err).ToNot(HaveOccurred()) + time.Sleep(30 * time.Second) + affinity := taskSet(ctx, pid, node) + testlog.Infof("OVN container %s affinity: %s", ovnPod.Name, affinity) + return affinity +} + +// getOvsAffinities returns a pid-to-cpuset map for all OVS service processes +// on the given node, waiting 30s for affinity to stabilise. +func getOvsAffinities(ctx context.Context, services []string, node *corev1.Node) map[string]cpuset.CPUSet { + GinkgoHelper() + pidList, err := ovsPids(ctx, services, node) + Expect(err).ToNot(HaveOccurred()) + time.Sleep(30 * time.Second) + affinities, err := getCPUMaskForPids(ctx, pidList, node) + Expect(err).ToNot(HaveOccurred()) + return affinities +} + +// verifyOvsAffinity asserts that every OVS process has the given expected CPU set. +func verifyOvsAffinity(ovsAffinities map[string]cpuset.CPUSet, expected cpuset.CPUSet) { + GinkgoHelper() + for pid, mask := range ovsAffinities { + testlog.Infof("OVS pid %s affinity: %s (expected: %s)", pid, mask, expected) + Expect(mask).To(Equal(expected)) + } +} + +// verifyOvsMatchesExpected validates OVS affinity in both workload-partitioning +// and non-workload-partitioning modes. +func verifyOvsMatchesExpected(ovnAffinity cpuset.CPUSet, + ovsAffinities map[string]cpuset.CPUSet, + isWorkloadPartitioningEnabled bool, + expectedCPUSets map[string]cpuset.CPUSet) { + GinkgoHelper() + reservedCPUs, ok := expectedCPUSets[cpuSetReserved] + Expect(ok).To(BeTrue(), "expected CPU map should include %q", cpuSetReserved) + isolatedCPUs, ok := expectedCPUSets[cpuSetIsolated] + Expect(ok).To(BeTrue(), "expected CPU map should include %q", cpuSetIsolated) + guPodCPUs, ok := expectedCPUSets[cpuSetGUPod] + Expect(ok).To(BeTrue(), "expected CPU map should include %q", cpuSetGUPod) + + if !isWorkloadPartitioningEnabled { + expectedAffinity := expectedOvsAffinity(reservedCPUs, isolatedCPUs, guPodCPUs) + Expect(ovnAffinity).To(Equal(expectedAffinity), + "Without WP, OVN container should be restricted to reserved+isolated CPUs excluding GU pod CPUs") + verifyOvsAffinity(ovsAffinities, expectedAffinity) + return + } + Expect(ovnAffinity).To(Equal(reservedCPUs), + "Under WP, OVN container should be restricted to reserved cpus") + verifyOvsAffinity(ovsAffinities, expectedOvsAffinity(reservedCPUs, isolatedCPUs, guPodCPUs)) +} + +func parseProfileCPUSets(profile *performancev2.PerformanceProfile) map[string]cpuset.CPUSet { + GinkgoHelper() + cpuSets := map[string]cpuset.CPUSet{ + cpuSetReserved: cpuset.New(), + cpuSetIsolated: cpuset.New(), + cpuSetShared: cpuset.New(), + cpuSetOfflined: cpuset.New(), + } + + parseCPUSet := func(name string, raw *performancev2.CPUSet) { + if raw == nil { + return + } + parsed, err := cpuset.Parse(string(*raw)) + Expect(err).ToNot(HaveOccurred(), "Failed to parse %s CPUs from profile", name) + cpuSets[name] = parsed + } + + parseCPUSet(cpuSetReserved, profile.Spec.CPU.Reserved) + parseCPUSet(cpuSetIsolated, profile.Spec.CPU.Isolated) + parseCPUSet(cpuSetShared, profile.Spec.CPU.Shared) + parseCPUSet(cpuSetOfflined, profile.Spec.CPU.Offlined) + + return cpuSets +} + +// createGuPod creates a 2-CPU Guaranteed QoS pod on the given node, waits for +// it to be ready, and returns it. +func createGuPod(ctx context.Context, node *corev1.Node) *corev1.Pod { + GinkgoHelper() + testpod := pods.GetTestPod() + testpod.Namespace = testutils.NamespaceTesting + testpod.Spec.Containers[0].Resources = corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: resource.MustParse("200Mi"), + }, + } + testpod.Spec.NodeSelector = map[string]string{testutils.LabelHostname: node.Name} + err := testclient.DataPlaneClient.Create(ctx, testpod) + Expect(err).ToNot(HaveOccurred()) + testpod, err = pods.WaitForCondition(ctx, client.ObjectKeyFromObject(testpod), corev1.PodReady, corev1.ConditionTrue, 5*time.Minute) + Expect(err).ToNot(HaveOccurred()) + Expect(testpod.Status.QOSClass).To(Equal(corev1.PodQOSGuaranteed)) + testlog.Infof("GU pod %s pinned to cpus %s", testpod.Name, getGuPodCPUs(ctx, testpod)) + return testpod +} + +// waitForDeploymentReady polls until the deployment has the expected number of +// ready replicas and returns the matching pod list. +func waitForDeploymentReady(ctx context.Context, dp *appsv1.Deployment, listOpts *client.ListOptions, replicas int32) *corev1.PodList { + GinkgoHelper() + podList := &corev1.PodList{} + dpObj := client.ObjectKeyFromObject(dp) + Eventually(func() bool { + if err := testclient.DataPlaneClient.List(ctx, podList, listOpts); err != nil { + return false + } + if err := testclient.DataPlaneClient.Get(ctx, dpObj, dp); err != nil { + return false + } + if dp.Status.ReadyReplicas != replicas { + testlog.Warningf("Deployment %q: %d/%d replicas ready", dpObj.String(), dp.Status.ReadyReplicas, replicas) + return false + } + if len(podList.Items) == 0 { + return false + } + for _, s := range podList.Items[0].Status.ContainerStatuses { + if !s.Ready { + return false + } + } + return true + }, 5*time.Minute, 10*time.Second).Should(BeTrue()) + return podList +} + +// collectGuCPUsFromPodList lists pods matching the given options and returns +// the union of all their exclusively pinned CPUs. +func collectGuCPUsFromPodList(ctx context.Context, listOpts *client.ListOptions) cpuset.CPUSet { + GinkgoHelper() + podList := &corev1.PodList{} + err := testclient.DataPlaneClient.List(ctx, podList, listOpts) + Expect(err).ToNot(HaveOccurred()) + allCPUs := cpuset.New() + for i := range podList.Items { + allCPUs = allCPUs.Union(getGuPodCPUs(ctx, &podList.Items[i])) + } + return allCPUs +} + +// getGuPodCPUs extracts the exclusively pinned CPUs from a Guaranteed QoS pod +// by running taskset inside the pod. +func getGuPodCPUs(ctx context.Context, pod *corev1.Pod) cpuset.CPUSet { + GinkgoHelper() + cmd := []string{"taskset", "-pc", "1"} + outputb, err := pods.ExecCommandOnPod(testclient.K8sClient, pod, "", cmd) + Expect(err).ToNot(HaveOccurred()) + parts := bytes.Split(outputb, []byte(":")) + podcpus, err := cpuset.Parse(strings.TrimSpace(string(parts[1]))) + Expect(err).ToNot(HaveOccurred()) + return podcpus +} diff --git a/test/e2e/performanceprofile/functests/utils/cluster/cluster.go b/test/e2e/performanceprofile/functests/utils/cluster/cluster.go index 44bc1e2dba..03ce1a9ea0 100644 --- a/test/e2e/performanceprofile/functests/utils/cluster/cluster.go +++ b/test/e2e/performanceprofile/functests/utils/cluster/cluster.go @@ -4,12 +4,10 @@ import ( "context" "time" - clientconfigv1 "github.com/openshift/client-go/config/clientset/versioned/typed/config/v1" + configv1 "github.com/openshift/api/config/v1" testclient "github.com/openshift/cluster-node-tuning-operator/test/e2e/performanceprofile/functests/utils/client" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/config" ) // IsSingleNode validates if the environment is single node cluster @@ -34,16 +32,19 @@ func ComputeTestTimeout(baseTimeout time.Duration, isSno bool) time.Duration { // Check if the control plane nodes are schedulable, returns true if schedulable else false func IsControlPlaneSchedulable(ctx context.Context) (bool, error) { - // Get the rest.Config using the helper function - cfg, err := config.GetConfig() - if err != nil { + scheduler := &configv1.Scheduler{} + if err := testclient.ControlPlaneClient.Get(ctx, client.ObjectKey{Name: "cluster"}, scheduler); err != nil { return false, err } + return scheduler.Spec.MastersSchedulable, nil +} - openshiftConfigClient := clientconfigv1.NewForConfigOrDie(cfg) - schedulerInfo, err := openshiftConfigClient.Schedulers().Get(ctx, "cluster", metav1.GetOptions{}) - if err != nil { +// IsWorkloadPartitioningEnabled checks whether CPU partitioning is enabled +// cluster-wide by querying the Infrastructure resource's CPUPartitioning status. +func IsWorkloadPartitioningEnabled(ctx context.Context) (bool, error) { + infra := &configv1.Infrastructure{} + if err := testclient.ControlPlaneClient.Get(ctx, client.ObjectKey{Name: "cluster"}, infra); err != nil { return false, err } - return schedulerInfo.Spec.MastersSchedulable, nil + return infra.Status.CPUPartitioning == configv1.CPUPartitioningAllNodes, nil }