Skip to content

Commit

Permalink
Introduce Kuberay Multikueue integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mszadkow committed Dec 19, 2024
1 parent 728d32e commit d27f12c
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 5 deletions.
2 changes: 1 addition & 1 deletion test/integration/controller/jobs/raycluster/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var (
ctx context.Context
fwk *framework.Framework
crdPath = filepath.Join("..", "..", "..", "..", "..", "config", "components", "crd", "bases")
rayCrdPath = filepath.Join("..", "..", "..", "..", "..", "dep-crds", "ray-operator")
rayCrdPath = filepath.Join("..", "..", "..", "..", "..", "dep-crds", "ray-operator", "crd", "bases")
)

func TestAPIs(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion test/integration/controller/jobs/rayjob/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var (
ctx context.Context
fwk *framework.Framework
crdPath = filepath.Join("..", "..", "..", "..", "..", "config", "components", "crd", "bases")
rayCrdPath = filepath.Join("..", "..", "..", "..", "..", "dep-crds", "ray-operator")
rayCrdPath = filepath.Join("..", "..", "..", "..", "..", "dep-crds", "ray-operator", "crd", "bases")
)

func TestAPIs(t *testing.T) {
Expand Down
58 changes: 56 additions & 2 deletions test/integration/multikueue/multikueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -49,13 +50,15 @@ import (
workloadtfjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/tfjob"
workloadxgboostjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/xgboostjob"
workloadmpijob "sigs.k8s.io/kueue/pkg/controller/jobs/mpijob"
workloadrayjob "sigs.k8s.io/kueue/pkg/controller/jobs/rayjob"
"sigs.k8s.io/kueue/pkg/features"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job"
testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset"
testingmpijob "sigs.k8s.io/kueue/pkg/util/testingjobs/mpijob"
testingpaddlejob "sigs.k8s.io/kueue/pkg/util/testingjobs/paddlejob"
testingpytorchjob "sigs.k8s.io/kueue/pkg/util/testingjobs/pytorchjob"
testingrayjob "sigs.k8s.io/kueue/pkg/util/testingjobs/rayjob"
testingtfjob "sigs.k8s.io/kueue/pkg/util/testingjobs/tfjob"
testingxgboostjob "sigs.k8s.io/kueue/pkg/util/testingjobs/xgboostjob"
"sigs.k8s.io/kueue/pkg/workload"
Expand All @@ -66,9 +69,10 @@ import (
var defaultEnabledIntegrations sets.Set[string] = sets.New(
"batch/job", "kubeflow.org/mpijob", "ray.io/rayjob", "ray.io/raycluster",
"jobset.x-k8s.io/jobset", "kubeflow.org/mxjob", "kubeflow.org/paddlejob",
"kubeflow.org/pytorchjob", "kubeflow.org/tfjob", "kubeflow.org/xgboostjob")
"kubeflow.org/pytorchjob", "kubeflow.org/tfjob", "kubeflow.org/xgboostjob",
"ray.io/rayjob", "ray.io/raycluster")

// Use Describe, not FDescribe: a focused container makes Ginkgo skip every
// non-focused spec in the suite and exit with a failing status code, so it
// must not be committed.
var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
var (
managerNs *corev1.Namespace
worker1Ns *corev1.Namespace
Expand Down Expand Up @@ -1442,6 +1446,56 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure,
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
})
})

ginkgo.It("Should run a RayJob on worker if admitted", func() {
admission := utiltesting.MakeAdmission(managerCq.Name).PodSets(
kueue.PodSetAssignment{
Name: "head",
}, kueue.PodSetAssignment{
Name: "workers-group-0",
},
)
rayjob := testingrayjob.MakeJob("rayjob1", managerNs.Name).
Queue(managerLq.Name).
RayJobSpecsDefault().
Obj()
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, rayjob)).Should(gomega.Succeed())
wlLookupKey := types.NamespacedName{Name: workloadrayjob.GetWorkloadNameForRayJob(rayjob.Name, rayjob.UID), Namespace: managerNs.Name}
gomega.Eventually(func(g gomega.Gomega) {
createdWorkload := &kueue.Workload{}
g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, createdWorkload, admission.Obj())).To(gomega.Succeed())
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())

admitWorkloadAndCheckWorkerCopies(multikueueAC.Name, wlLookupKey, admission)

ginkgo.By("changing the status of the RayJob in the worker, updates the manager's RayJob status", func() {
gomega.Eventually(func(g gomega.Gomega) {
createdRayJob := rayv1.RayJob{}
g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, client.ObjectKeyFromObject(rayjob), &createdRayJob)).To(gomega.Succeed())
createdRayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusRunning
g.Expect(worker2TestCluster.client.Status().Update(worker2TestCluster.ctx, &createdRayJob)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
gomega.Eventually(func(g gomega.Gomega) {
createdRayJob := rayv1.RayJob{}
g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, client.ObjectKeyFromObject(rayjob), &createdRayJob)).To(gomega.Succeed())
g.Expect(createdRayJob.Status.JobDeploymentStatus).To(gomega.Equal(rayv1.JobDeploymentStatusRunning))
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("finishing the worker RayJob, the manager's wl is marked as finished and the worker2 wl removed", func() {
finishJobReason := ""
gomega.Eventually(func(g gomega.Gomega) {
createdRayJob := rayv1.RayJob{}
g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, client.ObjectKeyFromObject(rayjob), &createdRayJob)).To(gomega.Succeed())
createdRayJob.Status.JobStatus = rayv1.JobStatusSucceeded
createdRayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusComplete
g.Expect(worker2TestCluster.client.Status().Update(worker2TestCluster.ctx, &createdRayJob)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())

waitForWorkloadToFinishAndRemoteWorkloadToBeDeleted(wlLookupKey, finishJobReason)
})
})
})

var _ = ginkgo.Describe("Multikueue no GC", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
Expand Down
14 changes: 14 additions & 0 deletions test/integration/multikueue/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
workloadtfjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/tfjob"
workloadxgboostjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/xgboostjob"
workloadmpijob "sigs.k8s.io/kueue/pkg/controller/jobs/mpijob"
workloadrayjob "sigs.k8s.io/kueue/pkg/controller/jobs/rayjob"
"sigs.k8s.io/kueue/pkg/queue"
"sigs.k8s.io/kueue/pkg/util/kubeversion"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
Expand Down Expand Up @@ -95,6 +96,7 @@ func createCluster(setupFnc framework.ManagerSetup, apiFeatureGates ...string) c
DepCRDPaths: []string{filepath.Join("..", "..", "..", "dep-crds", "jobset-operator"),
filepath.Join("..", "..", "..", "dep-crds", "training-operator-crds"),
filepath.Join("..", "..", "..", "dep-crds", "mpi-operator"),
filepath.Join("..", "..", "..", "dep-crds", "ray-operator", "crd", "bases"),
},
APIServerFeatureGates: apiFeatureGates,
}
Expand Down Expand Up @@ -206,6 +208,18 @@ func managerSetup(ctx context.Context, mgr manager.Manager) {

err = workloadmpijob.SetupMPIJobWebhook(mgr, jobframework.WithCache(cCache), jobframework.WithQueues(queues))
gomega.Expect(err).NotTo(gomega.HaveOccurred())

err = workloadrayjob.SetupIndexes(ctx, mgr.GetFieldIndexer())
gomega.Expect(err).NotTo(gomega.HaveOccurred())

rayJobReconciler := workloadrayjob.NewReconciler(
mgr.GetClient(),
mgr.GetEventRecorderFor(constants.JobControllerName))
err = rayJobReconciler.SetupWithManager(mgr)
gomega.Expect(err).NotTo(gomega.HaveOccurred())

err = workloadrayjob.SetupRayJobWebhook(mgr, jobframework.WithCache(cCache), jobframework.WithQueues(queues))
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}

func managerAndMultiKueueSetup(ctx context.Context, mgr manager.Manager, gcInterval time.Duration, enabledIntegrations sets.Set[string]) {
Expand Down
2 changes: 1 addition & 1 deletion test/integration/webhook/jobs/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var (
webhookPath = filepath.Join("..", "..", "..", "..", "config", "components", "webhook")
mpiCrdPath = filepath.Join("..", "..", "..", "..", "dep-crds", "mpi-operator")
jobsetCrdPath = filepath.Join("..", "..", "..", "..", "dep-crds", "jobset-operator")
rayCrdPath = filepath.Join("..", "..", "..", "..", "dep-crds", "ray-operator")
rayCrdPath = filepath.Join("..", "..", "..", "..", "dep-crds", "ray-operator", "crd", "bases")
kubeflowCrdPath = filepath.Join("..", "..", "..", "..", "dep-crds", "training-operator-crds")
)

Expand Down

0 comments on commit d27f12c

Please sign in to comment.