From 3585c81bc3fb404cd96a7ea658b0f6fd7b926154 Mon Sep 17 00:00:00 2001 From: Mykhailo Bobrovskyi Date: Thu, 2 Jan 2025 18:31:42 +0200 Subject: [PATCH] Test --- charts/kueue/templates/rbac/role.yaml | 7 - config/components/rbac/role.yaml | 7 - hack/e2e-common.sh | 35 ---- hack/multikueue-e2e-test.sh | 9 +- pkg/controller/jobframework/validation.go | 5 +- .../jobs/rayjob/rayjob_controller.go | 10 +- .../jobs/rayjob/rayjob_multikueue_adapter.go | 123 -------------- .../rayjob/rayjob_multikueue_adapter_test.go | 158 ------------------ pkg/util/testingjobs/rayjob/wrappers.go | 144 +++------------- .../create-multikueue-kubeconfig.sh | 32 ---- test/e2e/multikueue/e2e_test.go | 86 ++-------- test/e2e/multikueue/suite_test.go | 8 +- .../raycluster/raycluster_controller_test.go | 1 - .../jobs/rayjob/rayjob_controller_test.go | 21 +-- test/integration/framework/framework.go | 1 - .../integration/multikueue/multikueue_test.go | 54 ------ test/integration/multikueue/suite_test.go | 13 -- .../webhook/jobs/raycluster_webhook_test.go | 1 - .../webhook/jobs/rayjob_webhook_test.go | 3 +- test/util/e2e.go | 9 - 20 files changed, 53 insertions(+), 674 deletions(-) delete mode 100644 pkg/controller/jobs/rayjob/rayjob_multikueue_adapter.go delete mode 100644 pkg/controller/jobs/rayjob/rayjob_multikueue_adapter_test.go diff --git a/charts/kueue/templates/rbac/role.yaml b/charts/kueue/templates/rbac/role.yaml index 8a1e090dfa..900b77d8d2 100644 --- a/charts/kueue/templates/rbac/role.yaml +++ b/charts/kueue/templates/rbac/role.yaml @@ -298,16 +298,9 @@ rules: - rayclusters/finalizers - rayclusters/status - rayjobs/finalizers - verbs: - - get - - update - - apiGroups: - - ray.io - resources: - rayjobs/status verbs: - get - - patch - update - apiGroups: - scheduling.k8s.io diff --git a/config/components/rbac/role.yaml b/config/components/rbac/role.yaml index 34609fdab1..8b85ebde6c 100644 --- a/config/components/rbac/role.yaml +++ b/config/components/rbac/role.yaml @@ -297,16 +297,9 @@ rules: - rayclusters/finalizers - rayclusters/status - rayjobs/finalizers - verbs: - - get - - update -- apiGroups: - - ray.io - resources: - rayjobs/status verbs: - get - - patch - update - apiGroups: - scheduling.k8s.io diff --git a/hack/e2e-common.sh b/hack/e2e-common.sh index fa472ca13f..75ade48c10 100644 --- a/hack/e2e-common.sh +++ b/hack/e2e-common.sh @@ -40,14 +40,6 @@ if [[ -n ${KUBEFLOW_MPI_VERSION:-} ]]; then export KUBEFLOW_MPI_IMAGE=mpioperator/mpi-operator:${KUBEFLOW_MPI_VERSION/#v} fi -if [[ -n ${KUBERAY_VERSION:-} ]]; then - export KUBERAY_MANIFEST="${ROOT_DIR}/dep-crds/ray-operator/default/" - export KUBERAY_IMAGE=bitnami/kuberay-operator:${KUBERAY_VERSION/#v} - export KUBERAY_RAY_IMAGE=rayproject/ray:2.9.0 - export KUBERAY_RAY_IMAGE_ARM=rayproject/ray:2.9.0-aarch64 - export KUBERAY_CRDS=${ROOT_DIR}/dep-crds/ray-operator/crd/bases -fi - # sleep image to use for testing. export E2E_TEST_SLEEP_IMAGE_OLD=gcr.io/k8s-staging-perf-tests/sleep:v0.0.3@sha256:00ae8e01dd4439edfb7eb9f1960ac28eba16e952956320cce7f2ac08e3446e6b E2E_TEST_SLEEP_IMAGE_OLD_WITHOUT_SHA=${E2E_TEST_SLEEP_IMAGE_OLD%%@*} @@ -97,17 +89,6 @@ function prepare_docker_images { if [[ -n ${KUBEFLOW_MPI_VERSION:-} ]]; then docker pull "${KUBEFLOW_MPI_IMAGE}" fi - if [[ -n ${KUBERAY_VERSION:-} ]]; then - docker pull "${KUBERAY_IMAGE}" - - # Extra e2e images required for Kuberay - unamestr=$(uname) - if [[ "$unamestr" == 'Linux' ]]; then - docker pull "${KUBERAY_RAY_IMAGE}" - elif [[ "$unamestr" == 'Darwin' ]]; then - docker pull "${KUBERAY_RAY_IMAGE_ARM}" - fi - fi } # $1 cluster @@ -155,22 +136,6 @@ function install_mpi { kubectl apply --server-side -f "${KUBEFLOW_MPI_MANIFEST}" } -#$1 - cluster name -function install_kuberay { - # Extra e2e images required for Kuberay - unamestr=$(uname) - if [[ "$unamestr" == 'Linux' ]]; then - cluster_kind_load_image "${1}" "${KUBERAY_RAY_IMAGE}" - elif [[ "$unamestr" == 'Darwin' ]]; then - cluster_kind_load_image "${1}" "${KUBERAY_RAY_IMAGE_ARM}" - fi - - cluster_kind_load_image "${1}" "${KUBERAY_IMAGE}" - kubectl config use-context "kind-${1}" - # create used instead of apply - https://github.com/ray-project/kuberay/issues/504 - kubectl create -k "${KUBERAY_MANIFEST}" -} - INITIAL_IMAGE=$($YQ '.images[] | select(.name == "controller") | [.newName, .newTag] | join(":")' config/components/manager/kustomization.yaml) export INITIAL_IMAGE diff --git a/hack/multikueue-e2e-test.sh b/hack/multikueue-e2e-test.sh index b626452549..8dd3d87dd1 100755 --- a/hack/multikueue-e2e-test.sh +++ b/hack/multikueue-e2e-test.sh @@ -100,17 +100,10 @@ function kind_load { install_kubeflow "$WORKER1_KIND_CLUSTER_NAME" install_kubeflow "$WORKER2_KIND_CLUSTER_NAME" - ## MPI + ## MPI install_mpi "$MANAGER_KIND_CLUSTER_NAME" install_mpi "$WORKER1_KIND_CLUSTER_NAME" install_mpi "$WORKER2_KIND_CLUSTER_NAME" - - ## KUBERAY - kubectl config use-context "kind-${MANAGER_KIND_CLUSTER_NAME}" - kubectl apply --server-side -f "${KUBERAY_CRDS}" - - install_kuberay "$WORKER1_KIND_CLUSTER_NAME" - install_kuberay "$WORKER2_KIND_CLUSTER_NAME" } function kueue_deploy { diff --git a/pkg/controller/jobframework/validation.go b/pkg/controller/jobframework/validation.go index 0576d1c1f7..98be6809da 100644 --- a/pkg/controller/jobframework/validation.go +++ b/pkg/controller/jobframework/validation.go @@ -23,7 +23,6 @@ import ( kfmpi "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1" kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" batchv1 "k8s.io/api/batch/v1" apivalidation "k8s.io/apimachinery/pkg/api/validation" "k8s.io/apimachinery/pkg/util/sets" @@ -48,9 +47,7 @@ var ( kftraining.SchemeGroupVersion.WithKind(kftraining.PaddleJobKind).String(), kftraining.SchemeGroupVersion.WithKind(kftraining.PyTorchJobKind).String(), kftraining.SchemeGroupVersion.WithKind(kftraining.XGBoostJobKind).String(), - kfmpi.SchemeGroupVersion.WithKind(kfmpi.Kind).String(), - rayv1.SchemeGroupVersion.WithKind("RayJob").String(), - rayv1.SchemeGroupVersion.WithKind("RayCluster").String()) + kfmpi.SchemeGroupVersion.WithKind(kfmpi.Kind).String()) ) // ValidateJobOnCreate encapsulates all GenericJob validations that must be performed on a Create operation diff --git a/pkg/controller/jobs/rayjob/rayjob_controller.go b/pkg/controller/jobs/rayjob/rayjob_controller.go index c93ac18ed7..16488da894 100644 --- a/pkg/controller/jobs/rayjob/rayjob_controller.go +++ b/pkg/controller/jobs/rayjob/rayjob_controller.go @@ -26,7 +26,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -56,13 +55,12 @@ func init() { JobType: &rayv1.RayJob{}, AddToScheme: rayv1.AddToScheme, IsManagingObjectsOwner: isRayJob, - MultiKueueAdapter: &multikueueAdapter{}, })) } // +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update // +kubebuilder:rbac:groups=ray.io,resources=rayjobs,verbs=get;list;watch;update;patch -// +kubebuilder:rbac:groups=ray.io,resources=rayjobs/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=ray.io,resources=rayjobs/status,verbs=get;update // +kubebuilder:rbac:groups=ray.io,resources=rayjobs/finalizers,verbs=get;update // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/status,verbs=get;update;patch @@ -84,16 +82,12 @@ func (j *RayJob) Object() client.Object { return (*rayv1.RayJob)(j) } -func fromObject(obj runtime.Object) *RayJob { - return (*RayJob)(obj.(*rayv1.RayJob)) -} - func (j *RayJob) IsSuspended() bool { return j.Spec.Suspend } func (j *RayJob) IsActive() bool { - return (j.Status.JobDeploymentStatus != rayv1.JobDeploymentStatusSuspended) && (j.Status.JobDeploymentStatus != rayv1.JobDeploymentStatusNew) + return j.Status.JobDeploymentStatus != rayv1.JobDeploymentStatusSuspended } func (j *RayJob) Suspend() { diff --git a/pkg/controller/jobs/rayjob/rayjob_multikueue_adapter.go b/pkg/controller/jobs/rayjob/rayjob_multikueue_adapter.go deleted file mode 100644 index 79ae8e0598..0000000000 --- a/pkg/controller/jobs/rayjob/rayjob_multikueue_adapter.go +++ /dev/null @@ -1,123 +0,0 @@ -/* -Copyright 2024 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package rayjob - -import ( - "context" - "errors" - "fmt" - - rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" - "k8s.io/klog/v2" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - - kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" - "sigs.k8s.io/kueue/pkg/controller/constants" - "sigs.k8s.io/kueue/pkg/controller/jobframework" - "sigs.k8s.io/kueue/pkg/util/api" - clientutil "sigs.k8s.io/kueue/pkg/util/client" -) - -type multikueueAdapter struct{} - -var _ jobframework.MultiKueueAdapter = (*multikueueAdapter)(nil) - -func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error { - log := ctrl.LoggerFrom(ctx) - - localJob := rayv1.RayJob{} - err := localClient.Get(ctx, key, &localJob) - if err != nil { - return err - } - - remoteJob := rayv1.RayJob{} - err = remoteClient.Get(ctx, key, &remoteJob) - if client.IgnoreNotFound(err) != nil { - return err - } - - // if the remote exists, just copy the status - if err == nil { - if fromObject(&localJob).IsSuspended() { - // Ensure the job is unsuspended before updating its status; otherwise, it will fail when patching the spec. - log.V(2).Info("Skipping the sync since the local job is still suspended") - return nil - } - return clientutil.PatchStatus(ctx, localClient, &localJob, func() (bool, error) { - localJob.Status = remoteJob.Status - return true, nil - }) - } - - remoteJob = rayv1.RayJob{ - ObjectMeta: api.CloneObjectMetaForCreation(&localJob.ObjectMeta), - Spec: *localJob.Spec.DeepCopy(), - } - - // add the prebuilt workload - if remoteJob.Labels == nil { - remoteJob.Labels = make(map[string]string, 2) - } - remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName - remoteJob.Labels[kueue.MultiKueueOriginLabel] = origin - - return remoteClient.Create(ctx, &remoteJob) -} - -func (b *multikueueAdapter) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error { - job := rayv1.RayJob{} - job.SetName(key.Name) - job.SetNamespace(key.Namespace) - return client.IgnoreNotFound(remoteClient.Delete(ctx, &job)) -} - -func (b *multikueueAdapter) KeepAdmissionCheckPending() bool { - return false -} - -func (b *multikueueAdapter) IsJobManagedByKueue(ctx context.Context, c client.Client, key types.NamespacedName) (bool, string, error) { - return true, "", nil -} - -func (b *multikueueAdapter) GVK() schema.GroupVersionKind { - return gvk -} - -var _ jobframework.MultiKueueWatcher = (*multikueueAdapter)(nil) - -func (*multikueueAdapter) GetEmptyList() client.ObjectList { - return &rayv1.RayJobList{} -} - -func (*multikueueAdapter) WorkloadKeyFor(o runtime.Object) (types.NamespacedName, error) { - job, isJob := o.(*rayv1.RayJob) - if !isJob { - return types.NamespacedName{}, errors.New("not a rayjob") - } - - prebuiltWl, hasPrebuiltWorkload := job.Labels[constants.PrebuiltWorkloadLabel] - if !hasPrebuiltWorkload { - return types.NamespacedName{}, fmt.Errorf("no prebuilt workload found for rayjob: %s", klog.KObj(job)) - } - - return types.NamespacedName{Name: prebuiltWl, Namespace: job.Namespace}, nil -} diff --git a/pkg/controller/jobs/rayjob/rayjob_multikueue_adapter_test.go b/pkg/controller/jobs/rayjob/rayjob_multikueue_adapter_test.go deleted file mode 100644 index f173edda23..0000000000 --- a/pkg/controller/jobs/rayjob/rayjob_multikueue_adapter_test.go +++ /dev/null @@ -1,158 +0,0 @@ -/* -Copyright 2024 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package rayjob - -import ( - "context" - "testing" - - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/interceptor" - - kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" - "sigs.k8s.io/kueue/pkg/controller/constants" - "sigs.k8s.io/kueue/pkg/util/slices" - utiltesting "sigs.k8s.io/kueue/pkg/util/testing" - utiltestingrayjob "sigs.k8s.io/kueue/pkg/util/testingjobs/rayjob" -) - -const ( - TestNamespace = "ns" -) - -func TestMultikueueAdapter(t *testing.T) { - objCheckOpts := []cmp.Option{ - cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion"), - cmpopts.EquateEmpty(), - } - - rayJobBuilder := utiltestingrayjob.MakeJob("rayjob1", TestNamespace).Suspend(false) - - cases := map[string]struct { - managersRayJobs []rayv1.RayJob - workerRayJobs []rayv1.RayJob - - operation func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error - - wantError error - wantManagersRayJobs []rayv1.RayJob - wantWorkerRayJobs []rayv1.RayJob - }{ - "sync creates missing remote rayjob": { - managersRayJobs: []rayv1.RayJob{ - *rayJobBuilder.DeepCopy(), - }, - operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error { - return adapter.SyncJob(ctx, managerClient, workerClient, types.NamespacedName{Name: "rayjob1", Namespace: TestNamespace}, "wl1", "origin1") - }, - - wantManagersRayJobs: []rayv1.RayJob{ - *rayJobBuilder.DeepCopy(), - }, - wantWorkerRayJobs: []rayv1.RayJob{ - *rayJobBuilder.Clone(). - Label(constants.PrebuiltWorkloadLabel, "wl1"). - Label(kueue.MultiKueueOriginLabel, "origin1"). - Obj(), - }, - }, - "sync status from remote rayjob": { - managersRayJobs: []rayv1.RayJob{ - *rayJobBuilder.DeepCopy(), - }, - workerRayJobs: []rayv1.RayJob{ - *rayJobBuilder.Clone(). - Label(constants.PrebuiltWorkloadLabel, "wl1"). - Label(kueue.MultiKueueOriginLabel, "origin1"). - JobDeploymentStatus(rayv1.JobDeploymentStatusComplete). - Obj(), - }, - operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error { - return adapter.SyncJob(ctx, managerClient, workerClient, types.NamespacedName{Name: "rayjob1", Namespace: TestNamespace}, "wl1", "origin1") - }, - - wantManagersRayJobs: []rayv1.RayJob{ - *rayJobBuilder.Clone(). - JobDeploymentStatus(rayv1.JobDeploymentStatusComplete). - Obj(), - }, - wantWorkerRayJobs: []rayv1.RayJob{ - *rayJobBuilder.Clone(). - Label(constants.PrebuiltWorkloadLabel, "wl1"). - Label(kueue.MultiKueueOriginLabel, "origin1"). - JobDeploymentStatus(rayv1.JobDeploymentStatusComplete). - Obj(), - }, - }, - "remote rayjob is deleted": { - workerRayJobs: []rayv1.RayJob{ - *rayJobBuilder.Clone(). - Label(constants.PrebuiltWorkloadLabel, "wl1"). - Label(kueue.MultiKueueOriginLabel, "origin1"). - Obj(), - }, - operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error { - return adapter.DeleteRemoteObject(ctx, workerClient, types.NamespacedName{Name: "rayjob1", Namespace: TestNamespace}) - }, - }, - } - for name, tc := range cases { - t.Run(name, func(t *testing.T) { - managerBuilder := utiltesting.NewClientBuilder(rayv1.AddToScheme).WithInterceptorFuncs(interceptor.Funcs{SubResourcePatch: utiltesting.TreatSSAAsStrategicMerge}) - managerBuilder = managerBuilder.WithLists(&rayv1.RayJobList{Items: tc.managersRayJobs}) - managerBuilder = managerBuilder.WithStatusSubresource(slices.Map(tc.managersRayJobs, func(w *rayv1.RayJob) client.Object { return w })...) - managerClient := managerBuilder.Build() - - workerBuilder := utiltesting.NewClientBuilder(rayv1.AddToScheme).WithInterceptorFuncs(interceptor.Funcs{SubResourcePatch: utiltesting.TreatSSAAsStrategicMerge}) - workerBuilder = workerBuilder.WithLists(&rayv1.RayJobList{Items: tc.workerRayJobs}) - workerClient := workerBuilder.Build() - - ctx, _ := utiltesting.ContextWithLog(t) - - adapter := &multikueueAdapter{} - - gotErr := tc.operation(ctx, adapter, managerClient, workerClient) - - if diff := cmp.Diff(tc.wantError, gotErr, cmpopts.EquateErrors()); diff != "" { - t.Errorf("unexpected error (-want/+got):\n%s", diff) - } - - gotManagersRayJobs := &rayv1.RayJobList{} - if err := managerClient.List(ctx, gotManagersRayJobs); err != nil { - t.Errorf("unexpected list manager's rayjobs error %s", err) - } else { - if diff := cmp.Diff(tc.wantManagersRayJobs, gotManagersRayJobs.Items, objCheckOpts...); diff != "" { - t.Errorf("unexpected manager's rayjobs (-want/+got):\n%s", diff) - } - } - - gotWorkerRayJobs := &rayv1.RayJobList{} - if err := workerClient.List(ctx, gotWorkerRayJobs); err != nil { - t.Errorf("unexpected list worker's rayjobs error %s", err) - } else { - if diff := cmp.Diff(tc.wantWorkerRayJobs, gotWorkerRayJobs.Items, objCheckOpts...); diff != "" { - t.Errorf("unexpected worker's rayjobs (-want/+got):\n%s", diff) - } - } - }) - } -} diff --git a/pkg/util/testingjobs/rayjob/wrappers.go b/pkg/util/testingjobs/rayjob/wrappers.go index 47f5adad03..4718f7cfae 100644 --- a/pkg/util/testingjobs/rayjob/wrappers.go +++ b/pkg/util/testingjobs/rayjob/wrappers.go @@ -40,59 +40,40 @@ func MakeJob(name, ns string) *JobWrapper { Spec: rayv1.RayJobSpec{ ShutdownAfterJobFinishes: true, RayClusterSpec: &rayv1.RayClusterSpec{ - HeadGroupSpec: rayv1.HeadGroupSpec{}, - WorkerGroupSpecs: make([]rayv1.WorkerGroupSpec, 0, 1), - }, - Suspend: true, - }, - }} -} - -func (j *JobWrapper) RayJobSpecsDefault() *JobWrapper { - j.Spec.RayClusterSpec.HeadGroupSpec = rayv1.HeadGroupSpec{ - RayStartParams: map[string]string{}, - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{ - RestartPolicy: "Never", - Containers: []corev1.Container{ - { - Name: "head-container", - Command: []string{}, - Resources: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{}, - Limits: corev1.ResourceList{}, + HeadGroupSpec: rayv1.HeadGroupSpec{ + RayStartParams: map[string]string{"p1": "v1"}, + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "head-container", + }, + }, }, }, }, - NodeSelector: map[string]string{}, - }, - }, - } - - j.Spec.RayClusterSpec.WorkerGroupSpecs = append(j.Spec.RayClusterSpec.WorkerGroupSpecs, rayv1.WorkerGroupSpec{ - GroupName: "workers-group-0", - Replicas: ptr.To[int32](1), - MinReplicas: ptr.To[int32](0), - MaxReplicas: ptr.To[int32](10), - RayStartParams: map[string]string{}, - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{ - RestartPolicy: "Never", - Containers: []corev1.Container{ + WorkerGroupSpecs: []rayv1.WorkerGroupSpec{ { - Name: "worker-container", - Command: []string{}, - Resources: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{}, - Limits: corev1.ResourceList{}, + GroupName: "workers-group-0", + Replicas: ptr.To[int32](1), + MinReplicas: ptr.To[int32](0), + MaxReplicas: ptr.To[int32](10), + RayStartParams: map[string]string{"p1": "v1"}, + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "worker-container", + }, + }, + }, }, }, }, - NodeSelector: map[string]string{}, }, + Suspend: true, }, - }) - return j + }} } // Obj returns the inner Job. @@ -199,78 +180,3 @@ func (j *JobWrapper) Generation(num int64) *JobWrapper { func (j *JobWrapper) Clone() *JobWrapper { return &JobWrapper{*j.DeepCopy()} } - -// Label sets the label key and value -func (j *JobWrapper) Label(key, value string) *JobWrapper { - if j.Labels == nil { - j.Labels = make(map[string]string) - } - j.Labels[key] = value - return j -} - -// JobDeploymentStatus sets a deployment status of the job -func (j *JobWrapper) JobDeploymentStatus(ds rayv1.JobDeploymentStatus) *JobWrapper { - j.Status.JobDeploymentStatus = ds - return j -} - -// JobStatus sets a status of the job -func (j *JobWrapper) JobStatus(s rayv1.JobStatus) *JobWrapper { - j.Status.JobStatus = s - return j -} - -// Request adds a resource request to the default container. -func (j *JobWrapper) Request(rayType rayv1.RayNodeType, r corev1.ResourceName, v string) *JobWrapper { - if rayType == rayv1.HeadNode { - j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Resources.Requests[r] = resource.MustParse(v) - } else if rayType == rayv1.WorkerNode { - j.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Resources.Requests[r] = resource.MustParse(v) - } - return j -} - -func (j *JobWrapper) Image(rayType rayv1.RayNodeType, image string, args []string) *JobWrapper { - if rayType == rayv1.HeadNode { - j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Image = image - j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Args = args - j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].ImagePullPolicy = corev1.PullIfNotPresent - } else if rayType == rayv1.WorkerNode { - j.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Image = image - j.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Args = args - j.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].ImagePullPolicy = corev1.PullIfNotPresent - } - return j -} - -func (j *JobWrapper) Entrypoint(e string) *JobWrapper { - j.Spec.Entrypoint = e - return j -} - -func (j *JobWrapper) RayVersion(rv string) *JobWrapper { - j.Spec.RayClusterSpec.RayVersion = rv - return j -} - -func (j *JobWrapper) RayStartParams(sp map[string]string) *JobWrapper { - j.Spec.RayClusterSpec.HeadGroupSpec.RayStartParams = sp - j.Spec.RayClusterSpec.WorkerGroupSpecs[0].RayStartParams = sp - return j -} - -func (j *JobWrapper) Env(rayType rayv1.RayNodeType, name, value string) *JobWrapper { - if rayType == rayv1.HeadNode { - if j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Env == nil { - j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Env = make([]corev1.EnvVar, 0) - } - j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Env = append(j.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Env, corev1.EnvVar{Name: name, Value: value}) - } else if rayType == rayv1.WorkerNode { - if j.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Env == nil { - j.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Env = make([]corev1.EnvVar, 0) - } - j.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Env = append(j.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Env, corev1.EnvVar{Name: name, Value: value}) - } - return j -} diff --git a/site/static/examples/multikueue/create-multikueue-kubeconfig.sh b/site/static/examples/multikueue/create-multikueue-kubeconfig.sh index 0790025642..4ad71e36df 100644 --- a/site/static/examples/multikueue/create-multikueue-kubeconfig.sh +++ b/site/static/examples/multikueue/create-multikueue-kubeconfig.sh @@ -165,38 +165,6 @@ rules: - mpijobs/status verbs: - get -- apiGroups: - - ray.io - resources: - - rayjobs - verbs: - - create - - delete - - get - - list - - watch -- apiGroups: - - ray.io - resources: - - rayjobs/status - verbs: - - get -- apiGroups: - - ray.io - resources: - - rayclusters - verbs: - - create - - delete - - get - - list - - watch -- apiGroups: - - ray.io - resources: - - rayclusters/status - verbs: - - get --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding diff --git a/test/e2e/multikueue/e2e_test.go b/test/e2e/multikueue/e2e_test.go index 38e9d144d7..817ed7e65a 100644 --- a/test/e2e/multikueue/e2e_test.go +++ b/test/e2e/multikueue/e2e_test.go @@ -19,14 +19,12 @@ package mke2e import ( "fmt" "os/exec" - "runtime" "github.com/google/go-cmp/cmp/cmpopts" kfmpi "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1" kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" - rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" apimeta "k8s.io/apimachinery/pkg/api/meta" @@ -42,13 +40,11 @@ import ( workloadjobset "sigs.k8s.io/kueue/pkg/controller/jobs/jobset" workloadpytorchjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/pytorchjob" workloadmpijob "sigs.k8s.io/kueue/pkg/controller/jobs/mpijob" - workloadrayjob "sigs.k8s.io/kueue/pkg/controller/jobs/rayjob" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job" testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset" testingmpijob "sigs.k8s.io/kueue/pkg/util/testingjobs/mpijob" testingpytorchjob "sigs.k8s.io/kueue/pkg/util/testingjobs/pytorchjob" - testingrayjob "sigs.k8s.io/kueue/pkg/util/testingjobs/rayjob" "sigs.k8s.io/kueue/pkg/workload" "sigs.k8s.io/kueue/test/util" ) @@ -495,55 +491,6 @@ var _ = ginkgo.Describe("MultiKueue", func() { }, util.Timeout, util.Interval).Should(gomega.Succeed()) }) }) - - ginkgo.It("Should run a RayJob on worker if admitted", func() { - E2eKuberayTestImage := "rayproject/ray:2.9.0" - if runtime.GOOS == "darwin" { - E2eKuberayTestImage = "rayproject/ray:2.9.0-aarch64" - } - - // Since it requires 1.5 CPU, this job can only be admitted in worker 1. - rayjob := testingrayjob.MakeJob("rayjob1", managerNs.Name). - Queue(managerLq.Name). - RayJobSpecsDefault(). - WithSubmissionMode(rayv1.K8sJobMode). - Request(rayv1.HeadNode, corev1.ResourceCPU, "1"). - Request(rayv1.WorkerNode, corev1.ResourceCPU, "0.5"). - Entrypoint("python -c \"import ray; ray.init(); print(ray.cluster_resources())\""). - RayVersion("2.9.0"). - Image(rayv1.HeadNode, E2eKuberayTestImage, []string{}). - Image(rayv1.WorkerNode, E2eKuberayTestImage, []string{}). - Obj() - - ginkgo.By("Creating the RayJob", func() { - gomega.Expect(k8sManagerClient.Create(ctx, rayjob)).Should(gomega.Succeed()) - }) - - wlLookupKey := types.NamespacedName{Name: workloadrayjob.GetWorkloadNameForRayJob(rayjob.Name, rayjob.UID), Namespace: managerNs.Name} - // the execution should be given to the worker1 - waitForJobAdmitted(wlLookupKey, multiKueueAc.Name, "worker1") - - ginkgo.By("Waiting for the RayJob to finish", func() { - gomega.Eventually(func(g gomega.Gomega) { - createdRayJob := &rayv1.RayJob{} - g.Expect(k8sManagerClient.Get(ctx, client.ObjectKeyFromObject(rayjob), createdRayJob)).To(gomega.Succeed()) - g.Expect(createdRayJob.Status.JobDeploymentStatus).To(gomega.Equal(rayv1.JobDeploymentStatusComplete)) - finishReasonMessage := "Job finished successfully." - checkFinishStatusCondition(g, wlLookupKey, finishReasonMessage) - }, 5*util.LongTimeout, util.Interval).Should(gomega.Succeed()) - }) - - ginkgo.By("Checking no objects are left in the worker clusters and the RayJob is completed", func() { - gomega.Eventually(func(g gomega.Gomega) { - workerWl := &kueue.Workload{} - g.Expect(k8sWorker1Client.Get(ctx, wlLookupKey, workerWl)).To(utiltesting.BeNotFoundError()) - g.Expect(k8sWorker2Client.Get(ctx, wlLookupKey, workerWl)).To(utiltesting.BeNotFoundError()) - workerRayJob := &rayv1.RayJob{} - g.Expect(k8sWorker1Client.Get(ctx, client.ObjectKeyFromObject(rayjob), workerRayJob)).To(utiltesting.BeNotFoundError()) - g.Expect(k8sWorker2Client.Get(ctx, client.ObjectKeyFromObject(rayjob), workerRayJob)).To(utiltesting.BeNotFoundError()) - }, util.Timeout, util.Interval).Should(gomega.Succeed()) - }) - }) }) ginkgo.When("The connection to a worker cluster is unreliable", func() { ginkgo.It("Should update the cluster status to reflect the connection state", func() { @@ -624,22 +571,23 @@ var _ = ginkgo.Describe("MultiKueue", func() { }) func waitForJobAdmitted(wlLookupKey types.NamespacedName, acName, workerName string) { - ginkgo.By(fmt.Sprintf("Waiting to be admitted in %s and manager", workerName)) - gomega.EventuallyWithOffset(1, func(g gomega.Gomega) { - createdWorkload := &kueue.Workload{} - g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) - g.Expect(apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadAdmitted)).To(gomega.BeComparableTo(&metav1.Condition{ - Type: kueue.WorkloadAdmitted, - Status: metav1.ConditionTrue, - Reason: "Admitted", - Message: "The workload is admitted", - }, util.IgnoreConditionTimestampsAndObservedGeneration)) - g.Expect(workload.FindAdmissionCheck(createdWorkload.Status.AdmissionChecks, acName)).To(gomega.BeComparableTo(&kueue.AdmissionCheckState{ - Name: acName, - State: kueue.CheckStateReady, - Message: fmt.Sprintf(`The workload got reservation on "%s"`, workerName), - }, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime"))) - }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) + ginkgo.By(fmt.Sprintf("Waiting to be admitted in %s and manager", workerName), func() { + gomega.Eventually(func(g gomega.Gomega) { + createdWorkload := &kueue.Workload{} + g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadAdmitted)).To(gomega.BeComparableTo(&metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionTrue, + Reason: "Admitted", + Message: "The workload is admitted", + }, util.IgnoreConditionTimestampsAndObservedGeneration)) + g.Expect(workload.FindAdmissionCheck(createdWorkload.Status.AdmissionChecks, acName)).To(gomega.BeComparableTo(&kueue.AdmissionCheckState{ + Name: acName, + State: kueue.CheckStateReady, + Message: fmt.Sprintf(`The workload got reservation on "%s"`, workerName), + }, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime"))) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) } func checkFinishStatusCondition(g gomega.Gomega, wlLookupKey types.NamespacedName, finishReasonMessage string) { diff --git a/test/e2e/multikueue/suite_test.go b/test/e2e/multikueue/suite_test.go index ec6ae2f776..a6a2d4ac9b 100644 --- a/test/e2e/multikueue/suite_test.go +++ b/test/e2e/multikueue/suite_test.go @@ -27,7 +27,6 @@ import ( kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" - rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" authenticationv1 "k8s.io/api/authentication/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -99,8 +98,6 @@ func kubeconfigForMultiKueueSA(ctx context.Context, c client.Client, restConfig policyRule(kftraining.SchemeGroupVersion.Group, "xgboostjobs/status", "get"), policyRule(kfmpi.SchemeGroupVersion.Group, "mpijobs", resourceVerbs...), policyRule(kfmpi.SchemeGroupVersion.Group, "mpijobs/status", "get"), - policyRule(rayv1.SchemeGroupVersion.Group, "rayjobs", resourceVerbs...), - policyRule(rayv1.SchemeGroupVersion.Group, "rayjobs/status", "get"), }, } err := c.Create(ctx, cr) @@ -276,10 +273,7 @@ var _ = ginkgo.BeforeSuite(func() { util.WaitForKubeFlowMPIOperatorAvailability(ctx, k8sWorker1Client) util.WaitForKubeFlowMPIOperatorAvailability(ctx, k8sWorker2Client) - util.WaitForKubeRayOperatorAvailability(ctx, k8sWorker1Client) - util.WaitForKubeRayOperatorAvailability(ctx, k8sWorker2Client) - - ginkgo.GinkgoLogr.Info("Kueue and all integration operators are available in all the clusters", "waitingTime", time.Since(waitForAvailableStart)) + ginkgo.GinkgoLogr.Info("Kueue and JobSet operators are available in all the clusters", "waitingTime", time.Since(waitForAvailableStart)) discoveryClient, err := discovery.NewDiscoveryClientForConfig(managerCfg) gomega.Expect(err).NotTo(gomega.HaveOccurred()) diff --git a/test/integration/controller/jobs/raycluster/raycluster_controller_test.go b/test/integration/controller/jobs/raycluster/raycluster_controller_test.go index bc5be30c66..d40cd17652 100644 --- a/test/integration/controller/jobs/raycluster/raycluster_controller_test.go +++ b/test/integration/controller/jobs/raycluster/raycluster_controller_test.go @@ -283,7 +283,6 @@ var _ = ginkgo.Describe("Job controller RayCluster for workloads when only jobs ginkgo.It("Should suspend a cluster if the parent's workload does not exist or is not admitted", func() { ginkgo.By("Creating the parent job which has a queue name") parentJob := testingrayjob.MakeJob("parent-job", ns.Name). - RayJobSpecsDefault(). Queue("test"). Suspend(false). Obj() diff --git a/test/integration/controller/jobs/rayjob/rayjob_controller_test.go b/test/integration/controller/jobs/rayjob/rayjob_controller_test.go index 5230843ec4..ac7c2405f5 100644 --- a/test/integration/controller/jobs/rayjob/rayjob_controller_test.go +++ b/test/integration/controller/jobs/rayjob/rayjob_controller_test.go @@ -99,7 +99,6 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu }() job := testingrayjob.MakeJob(jobName, ns.Name). - RayJobSpecsDefault(). Suspend(false). WithPriorityClassName(priorityClassName). Obj() @@ -262,7 +261,6 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu ginkgo.It("A RayJob created in an unmanaged namespace is not suspended and a workload is not created", func() { ginkgo.By("Creating an unsuspended job without a queue-name in unmanaged-ns") job := testingrayjob.MakeJob(jobName, "unmanaged-ns"). - RayJobSpecsDefault(). Suspend(false). Obj() err := k8sClient.Create(ctx, job) @@ -305,7 +303,7 @@ var _ = ginkgo.Describe("Job controller for workloads when only jobs with queue ginkgo.It("Should reconcile jobs only when queue is set", func() { ginkgo.By("checking the workload is not created when queue name is not set") - job := testingrayjob.MakeJob(jobName, ns.Name).RayJobSpecsDefault().Obj() + job := testingrayjob.MakeJob(jobName, ns.Name).Obj() gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed()) lookupKey := types.NamespacedName{Name: jobName, Namespace: ns.Name} createdJob := &rayv1.RayJob{} @@ -373,10 +371,7 @@ var _ = ginkgo.Describe("Job controller when waitForPodsReady enabled", ginkgo.O ginkgo.DescribeTable("Single job at different stages of progress towards completion", func(podsReadyTestSpec podsReadyTestSpec) { ginkgo.By("Create a job") - job := testingrayjob.MakeJob(jobName, ns.Name). - RayJobSpecsDefault(). - WithSubmissionMode(rayv1.K8sJobMode). - Obj() + job := testingrayjob.MakeJob(jobName, ns.Name).WithSubmissionMode(rayv1.K8sJobMode).Obj() jobQueueName := "test-queue" job.Annotations = map[string]string{constants.QueueAnnotation: jobQueueName} gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed()) @@ -581,9 +576,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed()) ginkgo.By("checking a dev job starts") - job := testingrayjob.MakeJob("dev-job", ns.Name). - RayJobSpecsDefault(). - Queue(localQueue.Name). + job := testingrayjob.MakeJob("dev-job", ns.Name).Queue(localQueue.Name). RequestHead(corev1.ResourceCPU, "3"). RequestWorkerGroup(corev1.ResourceCPU, "4"). Obj() @@ -657,9 +650,7 @@ var _ = ginkgo.Describe("Job controller with preemption enabled", ginkgo.Ordered ginkgo.It("Should preempt lower priority rayJobs when resource insufficient", func() { ginkgo.By("Create a low priority rayJob") - lowPriorityJob := testingrayjob.MakeJob("rayjob-with-low-priority", ns.Name). - RayJobSpecsDefault(). - Queue(localQueue.Name). + lowPriorityJob := testingrayjob.MakeJob("rayjob-with-low-priority", ns.Name).Queue(localQueue.Name). RequestHead(corev1.ResourceCPU, "1"). RequestWorkerGroup(corev1.ResourceCPU, "2"). Obj() @@ -674,9 +665,7 @@ var _ = ginkgo.Describe("Job controller with preemption enabled", ginkgo.Ordered }, util.Timeout, util.Interval).Should(gomega.Succeed()) ginkgo.By("Create a high priority rayJob which will preempt the lower one") - highPriorityJob := testingrayjob.MakeJob("rayjob-with-high-priority", ns.Name). - RayJobSpecsDefault(). - Queue(localQueue.Name). + highPriorityJob := testingrayjob.MakeJob("rayjob-with-high-priority", ns.Name).Queue(localQueue.Name). RequestHead(corev1.ResourceCPU, "2"). WithPriorityClassName(priorityClassName). RequestWorkerGroup(corev1.ResourceCPU, "2"). diff --git a/test/integration/framework/framework.go b/test/integration/framework/framework.go index 5db1d397b1..7ad49b5a6e 100644 --- a/test/integration/framework/framework.go +++ b/test/integration/framework/framework.go @@ -96,7 +96,6 @@ func (f *Framework) Init() *rest.Config { var err error cfg, err = f.testEnv.Start() - fmt.Println("KACZKA", err) gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred()) gomega.ExpectWithOffset(1, cfg).NotTo(gomega.BeNil()) }) diff --git a/test/integration/multikueue/multikueue_test.go b/test/integration/multikueue/multikueue_test.go index b352b8e277..d43573d24a 100644 --- a/test/integration/multikueue/multikueue_test.go +++ b/test/integration/multikueue/multikueue_test.go @@ -28,7 +28,6 @@ import ( kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" - rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" apimeta "k8s.io/apimachinery/pkg/api/meta" @@ -50,7 +49,6 @@ import ( workloadtfjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/tfjob" workloadxgboostjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/xgboostjob" workloadmpijob "sigs.k8s.io/kueue/pkg/controller/jobs/mpijob" - workloadrayjob "sigs.k8s.io/kueue/pkg/controller/jobs/rayjob" "sigs.k8s.io/kueue/pkg/features" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job" @@ -58,7 +56,6 @@ import ( testingmpijob "sigs.k8s.io/kueue/pkg/util/testingjobs/mpijob" testingpaddlejob "sigs.k8s.io/kueue/pkg/util/testingjobs/paddlejob" testingpytorchjob "sigs.k8s.io/kueue/pkg/util/testingjobs/pytorchjob" - testingrayjob "sigs.k8s.io/kueue/pkg/util/testingjobs/rayjob" testingtfjob "sigs.k8s.io/kueue/pkg/util/testingjobs/tfjob" testingxgboostjob "sigs.k8s.io/kueue/pkg/util/testingjobs/xgboostjob" "sigs.k8s.io/kueue/pkg/workload" @@ -1445,57 +1442,6 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure, }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) }) }) - - ginkgo.It("Should run a RayJob on worker if admitted", func() { - admission := utiltesting.MakeAdmission(managerCq.Name).PodSets( - kueue.PodSetAssignment{ - Name: "head", - }, kueue.PodSetAssignment{ - Name: "workers-group-0", - }, - ) - rayjob := testingrayjob.MakeJob("rayjob1", managerNs.Name). - RayJobSpecsDefault(). - WithSubmissionMode(rayv1.UserMode). - Queue(managerLq.Name). - Obj() - gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, rayjob)).Should(gomega.Succeed()) - wlLookupKey := types.NamespacedName{Name: workloadrayjob.GetWorkloadNameForRayJob(rayjob.Name, rayjob.UID), Namespace: managerNs.Name} - gomega.Eventually(func(g gomega.Gomega) { - createdWorkload := &kueue.Workload{} - g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) - g.Expect(util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, createdWorkload, admission.Obj())).To(gomega.Succeed()) - }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) - - admitWorkloadAndCheckWorkerCopies(multikueueAC.Name, wlLookupKey, admission) - - ginkgo.By("changing the status of the RayJob in the worker, updates the manager's RayJob status", func() { - gomega.Eventually(func(g gomega.Gomega) { - createdRayJob := rayv1.RayJob{} - g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, client.ObjectKeyFromObject(rayjob), &createdRayJob)).To(gomega.Succeed()) - createdRayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusRunning - g.Expect(worker2TestCluster.client.Status().Update(worker2TestCluster.ctx, &createdRayJob)).To(gomega.Succeed()) - }, util.Timeout, util.Interval).Should(gomega.Succeed()) - gomega.Eventually(func(g gomega.Gomega) { - createdRayJob := rayv1.RayJob{} - g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, client.ObjectKeyFromObject(rayjob), &createdRayJob)).To(gomega.Succeed()) - g.Expect(createdRayJob.Status.JobDeploymentStatus).To(gomega.Equal(rayv1.JobDeploymentStatusRunning)) - }, util.Timeout, util.Interval).Should(gomega.Succeed()) - }) - - ginkgo.By("finishing the worker RayJob, the manager's wl is marked as finished and the worker2 wl removed", func() { - finishJobReason := "" - gomega.Eventually(func(g gomega.Gomega) { - createdRayJob := rayv1.RayJob{} - g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, client.ObjectKeyFromObject(rayjob), &createdRayJob)).To(gomega.Succeed()) - createdRayJob.Status.JobStatus = rayv1.JobStatusSucceeded - createdRayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusComplete - g.Expect(worker2TestCluster.client.Status().Update(worker2TestCluster.ctx, &createdRayJob)).To(gomega.Succeed()) - }, util.Timeout, util.Interval).Should(gomega.Succeed()) - - waitForWorkloadToFinishAndRemoteWorkloadToBeDeleted(wlLookupKey, finishJobReason) - }) - }) }) var _ = ginkgo.Describe("Multikueue no GC", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() { diff --git a/test/integration/multikueue/suite_test.go b/test/integration/multikueue/suite_test.go index fb374a74af..c719756e80 100644 --- a/test/integration/multikueue/suite_test.go +++ b/test/integration/multikueue/suite_test.go @@ -49,7 +49,6 @@ import ( workloadtfjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/tfjob" workloadxgboostjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/xgboostjob" workloadmpijob "sigs.k8s.io/kueue/pkg/controller/jobs/mpijob" - workloadrayjob "sigs.k8s.io/kueue/pkg/controller/jobs/rayjob" "sigs.k8s.io/kueue/pkg/queue" "sigs.k8s.io/kueue/pkg/util/kubeversion" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" @@ -208,18 +207,6 @@ func managerSetup(ctx context.Context, mgr manager.Manager) { err = workloadmpijob.SetupMPIJobWebhook(mgr, jobframework.WithCache(cCache), jobframework.WithQueues(queues)) gomega.Expect(err).NotTo(gomega.HaveOccurred()) - - err = workloadrayjob.SetupIndexes(ctx, mgr.GetFieldIndexer()) - gomega.Expect(err).NotTo(gomega.HaveOccurred()) - - rayJobReconciler := workloadrayjob.NewReconciler( - mgr.GetClient(), - mgr.GetEventRecorderFor(constants.JobControllerName)) - err = rayJobReconciler.SetupWithManager(mgr) - gomega.Expect(err).NotTo(gomega.HaveOccurred()) - - err = workloadrayjob.SetupRayJobWebhook(mgr, jobframework.WithCache(cCache), jobframework.WithQueues(queues)) - gomega.Expect(err).NotTo(gomega.HaveOccurred()) } func managerAndMultiKueueSetup(ctx context.Context, mgr manager.Manager, gcInterval time.Duration, enabledIntegrations sets.Set[string]) { diff --git a/test/integration/webhook/jobs/raycluster_webhook_test.go b/test/integration/webhook/jobs/raycluster_webhook_test.go index 8501ec4c27..20aa4ee35a 100644 --- a/test/integration/webhook/jobs/raycluster_webhook_test.go +++ b/test/integration/webhook/jobs/raycluster_webhook_test.go @@ -121,7 +121,6 @@ var _ = ginkgo.Describe("RayCluster Webhook", func() { ginkgo.It("Should not suspend a cluster if the parent's workload exist and is admitted", func() { ginkgo.By("Creating the parent job which has a queue name") parentJob := testingrayjob.MakeJob("parent-job", ns.Name). - RayJobSpecsDefault(). Queue("test"). Suspend(false). Obj() diff --git a/test/integration/webhook/jobs/rayjob_webhook_test.go b/test/integration/webhook/jobs/rayjob_webhook_test.go index 9fec67d1f6..a087608d5f 100644 --- a/test/integration/webhook/jobs/rayjob_webhook_test.go +++ b/test/integration/webhook/jobs/rayjob_webhook_test.go @@ -52,7 +52,7 @@ var _ = ginkgo.Describe("RayJob Webhook", func() { }) ginkgo.It("the creation doesn't succeed if the queue name is invalid", func() { - job := testingjob.MakeJob("rayjob", ns.Name).Queue("indexed_job").RayJobSpecsDefault().Obj() + job := testingjob.MakeJob("rayjob", ns.Name).Queue("indexed_job").Obj() err := k8sClient.Create(ctx, job) gomega.Expect(err).Should(gomega.HaveOccurred()) gomega.Expect(err).Should(testing.BeForbiddenError()) @@ -60,7 +60,6 @@ var _ = ginkgo.Describe("RayJob Webhook", func() { ginkgo.It("invalid configuration shutdown after job finishes", func() { job := testingjob.MakeJob("rayjob", ns.Name). - RayJobSpecsDefault(). Queue("queue-name"). ShutdownAfterJobFinishes(false). Obj() diff --git a/test/util/e2e.go b/test/util/e2e.go index 2119f268b9..722600f00a 100644 --- a/test/util/e2e.go +++ b/test/util/e2e.go @@ -9,7 +9,6 @@ import ( kfmpi "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1" kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" "github.com/onsi/gomega" - rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" @@ -61,9 +60,6 @@ func CreateClientUsingCluster(kContext string) (client.WithWatch, *rest.Config) err = kfmpi.AddToScheme(scheme.Scheme) gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred()) - err = rayv1.AddToScheme(scheme.Scheme) - gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred()) - client, err := client.NewWithWatch(cfg, client.Options{Scheme: scheme.Scheme}) gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred()) return client, cfg @@ -134,8 +130,3 @@ func WaitForKubeFlowMPIOperatorAvailability(ctx context.Context, k8sClient clien kftoKey := types.NamespacedName{Namespace: "mpi-operator", Name: "mpi-operator"} waitForOperatorAvailability(ctx, k8sClient, kftoKey) } - -func WaitForKubeRayOperatorAvailability(ctx context.Context, k8sClient client.Client) { - kroKey := types.NamespacedName{Namespace: "ray-system", Name: "kuberay-operator"} - waitForOperatorAvailability(ctx, k8sClient, kroKey) -}