Add MultiKueue integration-test coverage for RayJob.

What this patch does, file by file:

  * raycluster, rayjob and webhook/jobs suites: point rayCrdPath at
    dep-crds/ray-operator/crd/bases (was dep-crds/ray-operator).
  * multikueue/suite_test.go: add the same Ray CRD directory to DepCRDPaths
    and register the RayJob field indexes, reconciler and webhook in
    managerSetup, following the MPIJob setup that precedes them.
  * multikueue/multikueue_test.go: add the spec "Should run a RayJob on worker
    if admitted". It creates a RayJob on the manager, reserves quota for its
    workload, sets the job status on worker2 and expects the manager copy to
    report the same JobDeploymentStatus, then marks the worker job as
    succeeded/complete and waits for the manager workload to finish and the
    remote workload to be deleted.

Review fixes relative to the original patch:

  * The top-level "Multikueue" container stays ginkgo.Describe. The original
    switched it to ginkgo.FDescribe, which focuses the container so that the
    non-focused specs in the suite are skipped.
  * "ray.io/rayjob" and "ray.io/raycluster" are no longer appended to
    defaultEnabledIntegrations: both are already listed on the first line of
    that sets.New(...) call.
  * With both edits gone that hunk is empty and is removed, so the last hunk
    of multikueue_test.go now starts at new-file line 1445 instead of 1446.
  * The post-image blob id on the multikueue_test.go index line is carried
    over from the original patch and no longer matches the patched content.

diff --git a/test/integration/controller/jobs/raycluster/suite_test.go b/test/integration/controller/jobs/raycluster/suite_test.go
index 6dd6ab1f7e..84e439d211 100644
--- a/test/integration/controller/jobs/raycluster/suite_test.go
+++ b/test/integration/controller/jobs/raycluster/suite_test.go
@@ -46,7 +46,7 @@ var (
 	ctx        context.Context
 	fwk        *framework.Framework
 	crdPath    = filepath.Join("..", "..", "..", "..", "..", "config", "components", "crd", "bases")
-	rayCrdPath = filepath.Join("..", "..", "..", "..", "..", "dep-crds", "ray-operator")
+	rayCrdPath = filepath.Join("..", "..", "..", "..", "..", "dep-crds", "ray-operator", "crd", "bases")
 )
 
 func TestAPIs(t *testing.T) {
diff --git a/test/integration/controller/jobs/rayjob/suite_test.go b/test/integration/controller/jobs/rayjob/suite_test.go
index c59ac16141..959c2ff383 100644
--- a/test/integration/controller/jobs/rayjob/suite_test.go
+++ b/test/integration/controller/jobs/rayjob/suite_test.go
@@ -45,7 +45,7 @@ var (
 	ctx        context.Context
 	fwk        *framework.Framework
 	crdPath    = filepath.Join("..", "..", "..", "..", "..", "config", "components", "crd", "bases")
-	rayCrdPath = filepath.Join("..", "..", "..", "..", "..", "dep-crds", "ray-operator")
+	rayCrdPath = filepath.Join("..", "..", "..", "..", "..", "dep-crds", "ray-operator", "crd", "bases")
 )
 
 func TestAPIs(t *testing.T) {
diff --git a/test/integration/multikueue/multikueue_test.go b/test/integration/multikueue/multikueue_test.go
index d43573d24a..f173d412b1 100644
--- a/test/integration/multikueue/multikueue_test.go
+++ b/test/integration/multikueue/multikueue_test.go
@@ -28,6 +28,7 @@ import (
 	kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
 	"github.com/onsi/ginkgo/v2"
 	"github.com/onsi/gomega"
+	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	apimeta "k8s.io/apimachinery/pkg/api/meta"
@@ -49,6 +50,7 @@ import (
 	workloadtfjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/tfjob"
 	workloadxgboostjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/xgboostjob"
 	workloadmpijob "sigs.k8s.io/kueue/pkg/controller/jobs/mpijob"
+	workloadrayjob "sigs.k8s.io/kueue/pkg/controller/jobs/rayjob"
 	"sigs.k8s.io/kueue/pkg/features"
 	utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
 	testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job"
@@ -56,6 +58,7 @@ import (
 	testingmpijob "sigs.k8s.io/kueue/pkg/util/testingjobs/mpijob"
 	testingpaddlejob "sigs.k8s.io/kueue/pkg/util/testingjobs/paddlejob"
 	testingpytorchjob "sigs.k8s.io/kueue/pkg/util/testingjobs/pytorchjob"
+	testingrayjob "sigs.k8s.io/kueue/pkg/util/testingjobs/rayjob"
 	testingtfjob "sigs.k8s.io/kueue/pkg/util/testingjobs/tfjob"
 	testingxgboostjob "sigs.k8s.io/kueue/pkg/util/testingjobs/xgboostjob"
 	"sigs.k8s.io/kueue/pkg/workload"
@@ -1442,6 +1445,56 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure,
 			}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
 		})
 	})
+
+	ginkgo.It("Should run a RayJob on worker if admitted", func() {
+		admission := utiltesting.MakeAdmission(managerCq.Name).PodSets(
+			kueue.PodSetAssignment{
+				Name: "head",
+			}, kueue.PodSetAssignment{
+				Name: "workers-group-0",
+			},
+		)
+		rayjob := testingrayjob.MakeJob("rayjob1", managerNs.Name).
+			Queue(managerLq.Name).
+			RayJobSpecsDefault().
+			Obj()
+		gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, rayjob)).Should(gomega.Succeed())
+		wlLookupKey := types.NamespacedName{Name: workloadrayjob.GetWorkloadNameForRayJob(rayjob.Name, rayjob.UID), Namespace: managerNs.Name}
+		gomega.Eventually(func(g gomega.Gomega) {
+			createdWorkload := &kueue.Workload{}
+			g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
+			g.Expect(util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, createdWorkload, admission.Obj())).To(gomega.Succeed())
+		}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
+
+		admitWorkloadAndCheckWorkerCopies(multikueueAC.Name, wlLookupKey, admission)
+
+		ginkgo.By("changing the status of the RayJob in the worker, updates the manager's RayJob status", func() {
+			gomega.Eventually(func(g gomega.Gomega) {
+				createdRayJob := rayv1.RayJob{}
+				g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, client.ObjectKeyFromObject(rayjob), &createdRayJob)).To(gomega.Succeed())
+				createdRayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusRunning
+				g.Expect(worker2TestCluster.client.Status().Update(worker2TestCluster.ctx, &createdRayJob)).To(gomega.Succeed())
+			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+			gomega.Eventually(func(g gomega.Gomega) {
+				createdRayJob := rayv1.RayJob{}
+				g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, client.ObjectKeyFromObject(rayjob), &createdRayJob)).To(gomega.Succeed())
+				g.Expect(createdRayJob.Status.JobDeploymentStatus).To(gomega.Equal(rayv1.JobDeploymentStatusRunning))
+			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+		})
+
+		ginkgo.By("finishing the worker RayJob, the manager's wl is marked as finished and the worker2 wl removed", func() {
+			finishJobReason := ""
+			gomega.Eventually(func(g gomega.Gomega) {
+				createdRayJob := rayv1.RayJob{}
+				g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, client.ObjectKeyFromObject(rayjob), &createdRayJob)).To(gomega.Succeed())
+				createdRayJob.Status.JobStatus = rayv1.JobStatusSucceeded
+				createdRayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusComplete
+				g.Expect(worker2TestCluster.client.Status().Update(worker2TestCluster.ctx, &createdRayJob)).To(gomega.Succeed())
+			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+
+			waitForWorkloadToFinishAndRemoteWorkloadToBeDeleted(wlLookupKey, finishJobReason)
+		})
+	})
 })
 
 var _ = ginkgo.Describe("Multikueue no GC", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
diff --git a/test/integration/multikueue/suite_test.go b/test/integration/multikueue/suite_test.go
index 6425fab0dd..fb374a74af 100644
--- a/test/integration/multikueue/suite_test.go
+++ b/test/integration/multikueue/suite_test.go
@@ -49,6 +49,7 @@ import (
 	workloadtfjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/tfjob"
 	workloadxgboostjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/xgboostjob"
 	workloadmpijob "sigs.k8s.io/kueue/pkg/controller/jobs/mpijob"
+	workloadrayjob "sigs.k8s.io/kueue/pkg/controller/jobs/rayjob"
 	"sigs.k8s.io/kueue/pkg/queue"
 	"sigs.k8s.io/kueue/pkg/util/kubeversion"
 	utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
@@ -95,6 +96,7 @@ func createCluster(setupFnc framework.ManagerSetup, apiFeatureGates ...string) c
 		DepCRDPaths: []string{filepath.Join("..", "..", "..", "dep-crds", "jobset-operator"),
 			filepath.Join("..", "..", "..", "dep-crds", "training-operator-crds"),
 			filepath.Join("..", "..", "..", "dep-crds", "mpi-operator"),
+			filepath.Join("..", "..", "..", "dep-crds", "ray-operator", "crd", "bases"),
 		},
 		APIServerFeatureGates: apiFeatureGates,
 	}
@@ -206,6 +208,18 @@ func managerSetup(ctx context.Context, mgr manager.Manager) {
 
 	err = workloadmpijob.SetupMPIJobWebhook(mgr, jobframework.WithCache(cCache), jobframework.WithQueues(queues))
 	gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+	err = workloadrayjob.SetupIndexes(ctx, mgr.GetFieldIndexer())
+	gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+	rayJobReconciler := workloadrayjob.NewReconciler(
+		mgr.GetClient(),
+		mgr.GetEventRecorderFor(constants.JobControllerName))
+	err = rayJobReconciler.SetupWithManager(mgr)
+	gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+	err = workloadrayjob.SetupRayJobWebhook(mgr, jobframework.WithCache(cCache), jobframework.WithQueues(queues))
+	gomega.Expect(err).NotTo(gomega.HaveOccurred())
 }
 
 func managerAndMultiKueueSetup(ctx context.Context, mgr manager.Manager, gcInterval time.Duration, enabledIntegrations sets.Set[string]) {
diff --git a/test/integration/webhook/jobs/suite_test.go b/test/integration/webhook/jobs/suite_test.go
index 8260b74a96..f49b5ea480 100644
--- a/test/integration/webhook/jobs/suite_test.go
+++ b/test/integration/webhook/jobs/suite_test.go
@@ -46,6 +46,6 @@ var (
 	webhookPath     = filepath.Join("..", "..", "..", "..", "config", "components", "webhook")
 	mpiCrdPath      = filepath.Join("..", "..", "..", "..", "dep-crds", "mpi-operator")
 	jobsetCrdPath   = filepath.Join("..", "..", "..", "..", "dep-crds", "jobset-operator")
-	rayCrdPath      = filepath.Join("..", "..", "..", "..", "dep-crds", "ray-operator")
+	rayCrdPath      = filepath.Join("..", "..", "..", "..", "dep-crds", "ray-operator", "crd", "bases")
 	kubeflowCrdPath = filepath.Join("..", "..", "..", "..", "dep-crds", "training-operator-crds")
 )