From b13ce9a7ffad92e6d8f64fea0d2fb2aa6e4c98e2 Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Mon, 30 Dec 2024 12:02:30 +0100 Subject: [PATCH] Add the RayJob Multikueue e2e tests --- charts/kueue/templates/rbac/role.yaml | 7 +++ config/components/rbac/role.yaml | 7 +++ test/e2e/multikueue/e2e_test.go | 85 +++++++++++++++++++++------ test/e2e/multikueue/suite_test.go | 8 ++- test/util/e2e.go | 9 +++ 5 files changed, 98 insertions(+), 18 deletions(-) diff --git a/charts/kueue/templates/rbac/role.yaml b/charts/kueue/templates/rbac/role.yaml index 6a27740947..b8435f8801 100644 --- a/charts/kueue/templates/rbac/role.yaml +++ b/charts/kueue/templates/rbac/role.yaml @@ -308,9 +308,16 @@ rules: - rayclusters/finalizers - rayclusters/status - rayjobs/finalizers + verbs: + - get + - update + - apiGroups: + - ray.io + resources: - rayjobs/status verbs: - get + - patch - update - apiGroups: - scheduling.k8s.io diff --git a/config/components/rbac/role.yaml b/config/components/rbac/role.yaml index 9d971bf309..699309bfe9 100644 --- a/config/components/rbac/role.yaml +++ b/config/components/rbac/role.yaml @@ -307,9 +307,16 @@ rules: - rayclusters/finalizers - rayclusters/status - rayjobs/finalizers + verbs: + - get + - update +- apiGroups: + - ray.io + resources: - rayjobs/status verbs: - get + - patch - update - apiGroups: - scheduling.k8s.io diff --git a/test/e2e/multikueue/e2e_test.go b/test/e2e/multikueue/e2e_test.go index 817ed7e65a..2d04620a05 100644 --- a/test/e2e/multikueue/e2e_test.go +++ b/test/e2e/multikueue/e2e_test.go @@ -19,12 +19,14 @@ package mke2e import ( "fmt" "os/exec" + "runtime" "github.com/google/go-cmp/cmp/cmpopts" kfmpi "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1" kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" apimeta "k8s.io/apimachinery/pkg/api/meta" @@ -40,11 +42,13 @@ import ( workloadjobset "sigs.k8s.io/kueue/pkg/controller/jobs/jobset" workloadpytorchjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/pytorchjob" workloadmpijob "sigs.k8s.io/kueue/pkg/controller/jobs/mpijob" + workloadrayjob "sigs.k8s.io/kueue/pkg/controller/jobs/rayjob" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job" testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset" testingmpijob "sigs.k8s.io/kueue/pkg/util/testingjobs/mpijob" testingpytorchjob "sigs.k8s.io/kueue/pkg/util/testingjobs/pytorchjob" + testingrayjob "sigs.k8s.io/kueue/pkg/util/testingjobs/rayjob" "sigs.k8s.io/kueue/pkg/workload" "sigs.k8s.io/kueue/test/util" ) @@ -491,6 +495,54 @@ var _ = ginkgo.Describe("MultiKueue", func() { }, util.Timeout, util.Interval).Should(gomega.Succeed()) }) }) + + ginkgo.It("Should run a RayJob on worker if admitted", func() { + E2eKuberayTestImage := "rayproject/ray:2.9.0" + if runtime.GOOS == "darwin" { + E2eKuberayTestImage = "rayproject/ray:2.9.0-aarch64" + } + + // Since it requires 1.5 CPU, this job can only be admitted in worker 1. + rayjob := testingrayjob.MakeJob("rayjob1", managerNs.Name). + Queue(managerLq.Name). + RayJobSpecsDefault(). + Request(rayv1.HeadNode, corev1.ResourceCPU, "1"). + Request(rayv1.WorkerNode, corev1.ResourceCPU, "0.5"). + Entrypoint("python -c \"import ray; ray.init(); print(ray.cluster_resources())\""). + RayVersion("2.9.0"). + Image(rayv1.HeadNode, E2eKuberayTestImage, []string{}). 
+ Image(rayv1.WorkerNode, E2eKuberayTestImage, []string{}). + Obj() + + ginkgo.By("Creating the RayJob", func() { + gomega.Expect(k8sManagerClient.Create(ctx, rayjob)).Should(gomega.Succeed()) + }) + + wlLookupKey := types.NamespacedName{Name: workloadrayjob.GetWorkloadNameForRayJob(rayjob.Name, rayjob.UID), Namespace: managerNs.Name} + // the execution should be given to the worker1 + waitForJobAdmitted(wlLookupKey, multiKueueAc.Name, "worker1") + + ginkgo.By("Waiting for the RayJob to finish", func() { + gomega.Eventually(func(g gomega.Gomega) { + createdRayJob := &rayv1.RayJob{} + g.Expect(k8sManagerClient.Get(ctx, client.ObjectKeyFromObject(rayjob), createdRayJob)).To(gomega.Succeed()) + g.Expect(createdRayJob.Status.JobDeploymentStatus).To(gomega.Equal(rayv1.JobDeploymentStatusComplete)) + finishReasonMessage := "Job finished successfully." + checkFinishStatusCondition(g, wlLookupKey, finishReasonMessage) + }, 5*util.LongTimeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("Checking no objects are left in the worker clusters and the RayJob is completed", func() { + gomega.Eventually(func(g gomega.Gomega) { + workerWl := &kueue.Workload{} + g.Expect(k8sWorker1Client.Get(ctx, wlLookupKey, workerWl)).To(utiltesting.BeNotFoundError()) + g.Expect(k8sWorker2Client.Get(ctx, wlLookupKey, workerWl)).To(utiltesting.BeNotFoundError()) + workerRayJob := &rayv1.RayJob{} + g.Expect(k8sWorker1Client.Get(ctx, client.ObjectKeyFromObject(rayjob), workerRayJob)).To(utiltesting.BeNotFoundError()) + g.Expect(k8sWorker2Client.Get(ctx, client.ObjectKeyFromObject(rayjob), workerRayJob)).To(utiltesting.BeNotFoundError()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + }) }) ginkgo.When("The connection to a worker cluster is unreliable", func() { ginkgo.It("Should update the cluster status to reflect the connection state", func() { @@ -571,23 +623,22 @@ var _ = ginkgo.Describe("MultiKueue", func() { }) func waitForJobAdmitted(wlLookupKey types.NamespacedName, acName, workerName string) { - ginkgo.By(fmt.Sprintf("Waiting to be admitted in %s and manager", workerName), func() { - gomega.Eventually(func(g gomega.Gomega) { - createdWorkload := &kueue.Workload{} - g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) - g.Expect(apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadAdmitted)).To(gomega.BeComparableTo(&metav1.Condition{ - Type: kueue.WorkloadAdmitted, - Status: metav1.ConditionTrue, - Reason: "Admitted", - Message: "The workload is admitted", - }, util.IgnoreConditionTimestampsAndObservedGeneration)) - g.Expect(workload.FindAdmissionCheck(createdWorkload.Status.AdmissionChecks, acName)).To(gomega.BeComparableTo(&kueue.AdmissionCheckState{ - Name: acName, - State: kueue.CheckStateReady, - Message: fmt.Sprintf(`The workload got reservation on "%s"`, workerName), - }, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime"))) - }, util.Timeout, util.Interval).Should(gomega.Succeed()) - }) + ginkgo.By(fmt.Sprintf("Waiting to be admitted in %s and manager", workerName)) + gomega.EventuallyWithOffset(1, func(g gomega.Gomega) { + createdWorkload := &kueue.Workload{} + g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadAdmitted)).To(gomega.BeComparableTo(&metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionTrue, + Reason: "Admitted", + Message: "The workload is 
admitted", + }, util.IgnoreConditionTimestampsAndObservedGeneration)) + g.Expect(workload.FindAdmissionCheck(createdWorkload.Status.AdmissionChecks, acName)).To(gomega.BeComparableTo(&kueue.AdmissionCheckState{ + Name: acName, + State: kueue.CheckStateReady, + Message: fmt.Sprintf(`The workload got reservation on "%s"`, workerName), + }, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime"))) + }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) } func checkFinishStatusCondition(g gomega.Gomega, wlLookupKey types.NamespacedName, finishReasonMessage string) { diff --git a/test/e2e/multikueue/suite_test.go b/test/e2e/multikueue/suite_test.go index a6a2d4ac9b..ec6ae2f776 100644 --- a/test/e2e/multikueue/suite_test.go +++ b/test/e2e/multikueue/suite_test.go @@ -27,6 +27,7 @@ import ( kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" authenticationv1 "k8s.io/api/authentication/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -98,6 +99,8 @@ func kubeconfigForMultiKueueSA(ctx context.Context, c client.Client, restConfig policyRule(kftraining.SchemeGroupVersion.Group, "xgboostjobs/status", "get"), policyRule(kfmpi.SchemeGroupVersion.Group, "mpijobs", resourceVerbs...), policyRule(kfmpi.SchemeGroupVersion.Group, "mpijobs/status", "get"), + policyRule(rayv1.SchemeGroupVersion.Group, "rayjobs", resourceVerbs...), + policyRule(rayv1.SchemeGroupVersion.Group, "rayjobs/status", "get"), }, } err := c.Create(ctx, cr) @@ -273,7 +276,10 @@ var _ = ginkgo.BeforeSuite(func() { util.WaitForKubeFlowMPIOperatorAvailability(ctx, k8sWorker1Client) util.WaitForKubeFlowMPIOperatorAvailability(ctx, k8sWorker2Client) - ginkgo.GinkgoLogr.Info("Kueue and JobSet operators are available in all the clusters", "waitingTime", time.Since(waitForAvailableStart)) + util.WaitForKubeRayOperatorAvailability(ctx, k8sWorker1Client) + util.WaitForKubeRayOperatorAvailability(ctx, k8sWorker2Client) + + ginkgo.GinkgoLogr.Info("Kueue and all integration operators are available in all the clusters", "waitingTime", time.Since(waitForAvailableStart)) discoveryClient, err := discovery.NewDiscoveryClientForConfig(managerCfg) gomega.Expect(err).NotTo(gomega.HaveOccurred()) diff --git a/test/util/e2e.go b/test/util/e2e.go index 722600f00a..2119f268b9 100644 --- a/test/util/e2e.go +++ b/test/util/e2e.go @@ -9,6 +9,7 @@ import ( kfmpi "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1" kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" "github.com/onsi/gomega" + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" @@ -60,6 +61,9 @@ func CreateClientUsingCluster(kContext string) (client.WithWatch, *rest.Config) err = kfmpi.AddToScheme(scheme.Scheme) gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred()) + err = rayv1.AddToScheme(scheme.Scheme) + gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred()) + client, err := client.NewWithWatch(cfg, client.Options{Scheme: scheme.Scheme}) gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred()) return client, cfg @@ -130,3 +134,8 @@ func WaitForKubeFlowMPIOperatorAvailability(ctx context.Context, k8sClient clien kftoKey := types.NamespacedName{Namespace: "mpi-operator", Name: "mpi-operator"} waitForOperatorAvailability(ctx, k8sClient, kftoKey) } + +func 
WaitForKubeRayOperatorAvailability(ctx context.Context, k8sClient client.Client) { + kroKey := types.NamespacedName{Namespace: "ray-system", Name: "kuberay-operator"} + waitForOperatorAvailability(ctx, k8sClient, kroKey) +}
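
Note: the new `WaitForKubeRayOperatorAvailability` helper follows the existing JobSet/Kubeflow/MPI waiters and delegates to the shared `waitForOperatorAvailability` helper in `test/util/e2e.go`. For reference only, below is a minimal, self-contained sketch of the kind of readiness poll that helper is assumed to perform — the `ray-system/kuberay-operator` deployment key and the `Timeout`/`Interval` constants are taken from the patch above, while the Deployment-Available polling logic is an assumption about the shared helper, not its actual implementation.

```go
package util

import (
	"context"

	"github.com/onsi/gomega"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// waitForRayOperatorAvailabilitySketch is a hypothetical illustration of an
// operator readiness poll: it repeatedly fetches the kuberay-operator
// Deployment and waits until the Available condition is reported as True.
func waitForRayOperatorAvailabilitySketch(ctx context.Context, k8sClient client.Client) {
	key := types.NamespacedName{Namespace: "ray-system", Name: "kuberay-operator"}
	gomega.Eventually(func(g gomega.Gomega) {
		deployment := &appsv1.Deployment{}
		g.Expect(k8sClient.Get(ctx, key, deployment)).To(gomega.Succeed())
		available := false
		for _, cond := range deployment.Status.Conditions {
			if cond.Type == appsv1.DeploymentAvailable && cond.Status == corev1.ConditionTrue {
				available = true
			}
		}
		g.Expect(available).To(gomega.BeTrue(), "kuberay-operator deployment is not Available yet")
	}, Timeout, Interval).Should(gomega.Succeed())
}
```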