[Feature] Kuberay RayJob MultiKueue adapter #3892

Status: Open. Wants to merge 5 commits into base: main.
20 changes: 17 additions & 3 deletions Makefile-deps.mk
@@ -128,8 +128,22 @@ kf-training-operator-manifests: ## Copy whole manifests folder from the training
RAY_ROOT = $(shell $(GO_CMD) list -m -mod=readonly -f "{{.Dir}}" github.com/ray-project/kuberay/ray-operator)
.PHONY: ray-operator-crd
ray-operator-crd: ## Copy the CRDs from the ray-operator to the dep-crds directory.
mkdir -p $(EXTERNAL_CRDS_DIR)/ray-operator/
cp -f $(RAY_ROOT)/config/crd/bases/* $(EXTERNAL_CRDS_DIR)/ray-operator/
mkdir -p $(EXTERNAL_CRDS_DIR)/ray-operator-crds/
cp -f $(RAY_ROOT)/config/crd/bases/* $(EXTERNAL_CRDS_DIR)/ray-operator-crds/

.PHONY: ray-operator-manifests
ray-operator-manifests: ## Copy the whole manifests content from the ray-operator to the dep-crds directory.
## Full version of the manifest is required for e2e multikueue tests.
if [ -d "$(EXTERNAL_CRDS_DIR)/ray-operator" ]; then \
chmod -R u+w "$(EXTERNAL_CRDS_DIR)/ray-operator" && \
rm -rf "$(EXTERNAL_CRDS_DIR)/ray-operator"; \
fi
mkdir -p "$(EXTERNAL_CRDS_DIR)/ray-operator"; \
cp -rf "$(RAY_ROOT)/config/crd" "$(EXTERNAL_CRDS_DIR)/ray-operator"
cp -rf "$(RAY_ROOT)/config/default" "$(EXTERNAL_CRDS_DIR)/ray-operator"
cp -rf "$(RAY_ROOT)/config/rbac" "$(EXTERNAL_CRDS_DIR)/ray-operator"
cp -rf "$(RAY_ROOT)/config/manager" "$(EXTERNAL_CRDS_DIR)/ray-operator"


JOBSET_ROOT = $(shell $(GO_CMD) list -m -mod=readonly -f "{{.Dir}}" sigs.k8s.io/jobset)
.PHONY: jobset-operator-crd
@@ -144,7 +158,7 @@ cluster-autoscaler-crd: ## Copy the CRDs from the cluster-autoscaler to the dep-
cp -f $(CLUSTER_AUTOSCALER_ROOT)/config/crd/* $(EXTERNAL_CRDS_DIR)/cluster-autoscaler/

.PHONY: dep-crds
dep-crds: mpi-operator-crd kf-training-operator-crd ray-operator-crd jobset-operator-crd cluster-autoscaler-crd kf-training-operator-manifests ## Copy the CRDs from the external operators to the dep-crds directory.
dep-crds: mpi-operator-crd kf-training-operator-crd ray-operator-crd jobset-operator-crd cluster-autoscaler-crd kf-training-operator-manifests ray-operator-manifests ## Copy the CRDs from the external operators to the dep-crds directory.
@echo "Copying CRDs from external operators to dep-crds directory"

.PHONY: kueuectl-docs
10 changes: 6 additions & 4 deletions Makefile-test.mk
@@ -37,12 +37,12 @@ ENVTEST_K8S_VERSION ?= 1.31
# Number of processes to use during integration tests to run specs within a
# suite in parallel. Suites still run sequentially. User may set this value to 1
# to run without parallelism.
INTEGRATION_NPROCS ?= 4
INTEGRATION_NPROCS ?= 3
# Folder where the integration tests are located.
INTEGRATION_TARGET ?= ./test/integration/...
# Verbosity level for apiserver logging.
# The logging is disabled if 0.
INTEGRATION_API_LOG_LEVEL ?= 0
INTEGRATION_API_LOG_LEVEL ?= 1
# Integration filters
INTEGRATION_RUN_ALL?=true
ifneq ($(INTEGRATION_RUN_ALL),true)
@@ -70,6 +70,7 @@ IMAGE_TAG ?= $(IMAGE_REPO):$(GIT_TAG)
JOBSET_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" sigs.k8s.io/jobset)
KUBEFLOW_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" github.com/kubeflow/training-operator)
KUBEFLOW_MPI_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" github.com/kubeflow/mpi-operator)
KUBERAY_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" github.com/ray-project/kuberay/ray-operator)
Comment (Contributor Author):
So far it's the version without managedBy.
We don't have a ray-operator image that supports managedBy, that would have to be a custom one...
latest -> 1.2.2, which is the last release


##@ Tests

@@ -83,7 +84,7 @@ test-integration: gomod-download envtest ginkgo dep-crds kueuectl ginkgo-top ##
KUEUE_BIN=$(PROJECT_DIR)/bin \
ENVTEST_K8S_VERSION=$(ENVTEST_K8S_VERSION) \
API_LOG_LEVEL=$(INTEGRATION_API_LOG_LEVEL) \
$(GINKGO) $(INTEGRATION_FILTERS) $(GINKGO_ARGS) -procs=$(INTEGRATION_NPROCS) --race --junit-report=junit.xml --json-report=integration.json --output-dir=$(ARTIFACTS) -v $(INTEGRATION_TARGET)
$(GINKGO) $(INTEGRATION_FILTERS) $(GINKGO_ARGS) -procs=$(INTEGRATION_NPROCS) --output-interceptor-mode=none --race --junit-report=junit.xml --json-report=integration.json --output-dir=$(ARTIFACTS) -v $(INTEGRATION_TARGET)
$(PROJECT_DIR)/bin/ginkgo-top -i $(ARTIFACTS)/integration.json > $(ARTIFACTS)/integration-top.yaml

CREATE_KIND_CLUSTER ?= true
@@ -118,7 +119,8 @@ run-test-multikueue-e2e-%: FORCE
@echo Running multikueue e2e for k8s ${K8S_VERSION}
E2E_KIND_VERSION="kindest/node:v$(K8S_VERSION)" KIND_CLUSTER_NAME=$(KIND_CLUSTER_NAME) CREATE_KIND_CLUSTER=$(CREATE_KIND_CLUSTER) \
ARTIFACTS="$(ARTIFACTS)/$@" IMAGE_TAG=$(IMAGE_TAG) GINKGO_ARGS="$(GINKGO_ARGS)" \
JOBSET_VERSION=$(JOBSET_VERSION) KUBEFLOW_VERSION=$(KUBEFLOW_VERSION) KUBEFLOW_MPI_VERSION=$(KUBEFLOW_MPI_VERSION) \
JOBSET_VERSION=$(JOBSET_VERSION) KUBEFLOW_VERSION=$(KUBEFLOW_VERSION) \
KUBEFLOW_MPI_VERSION=$(KUBEFLOW_MPI_VERSION) KUBERAY_VERSION=$(KUBERAY_VERSION) \
./hack/multikueue-e2e-test.sh
$(PROJECT_DIR)/bin/ginkgo-top -i $(ARTIFACTS)/$@/e2e.json > $(ARTIFACTS)/$@/e2e-top.yaml

7 changes: 7 additions & 0 deletions charts/kueue/templates/rbac/role.yaml
@@ -298,9 +298,16 @@ rules:
- rayclusters/finalizers
- rayclusters/status
- rayjobs/finalizers
verbs:
- get
- update
- apiGroups:
- ray.io
resources:
- rayjobs/status
verbs:
- get
- patch
- update
- apiGroups:
- scheduling.k8s.io
7 changes: 7 additions & 0 deletions config/components/rbac/role.yaml
@@ -297,9 +297,16 @@ rules:
- rayclusters/finalizers
- rayclusters/status
- rayjobs/finalizers
verbs:
- get
- update
- apiGroups:
- ray.io
resources:
- rayjobs/status
verbs:
- get
- patch
- update
- apiGroups:
- scheduling.k8s.io
35 changes: 35 additions & 0 deletions hack/e2e-common.sh
@@ -40,6 +40,14 @@ if [[ -n ${KUBEFLOW_MPI_VERSION:-} ]]; then
export KUBEFLOW_MPI_IMAGE=mpioperator/mpi-operator:${KUBEFLOW_MPI_VERSION/#v}
fi

if [[ -n ${KUBERAY_VERSION:-} ]]; then
export KUBERAY_MANIFEST="${ROOT_DIR}/dep-crds/ray-operator/default/"
export KUBERAY_IMAGE=bitnami/kuberay-operator:${KUBERAY_VERSION/#v}
export KUBERAY_RAY_IMAGE=rayproject/ray:2.9.0
export KUBERAY_RAY_IMAGE_ARM=rayproject/ray:2.9.0-aarch64
Comment (Contributor Author):
this one is for those of us working on macOS; it's vital for development, not so much for prod - I think it should stay

export KUBERAY_CRDS=${ROOT_DIR}/dep-crds/ray-operator/crd/bases
fi

# sleep image to use for testing.
export E2E_TEST_SLEEP_IMAGE_OLD=gcr.io/k8s-staging-perf-tests/sleep:v0.0.3@sha256:00ae8e01dd4439edfb7eb9f1960ac28eba16e952956320cce7f2ac08e3446e6b
E2E_TEST_SLEEP_IMAGE_OLD_WITHOUT_SHA=${E2E_TEST_SLEEP_IMAGE_OLD%%@*}
@@ -89,6 +97,17 @@ function prepare_docker_images {
if [[ -n ${KUBEFLOW_MPI_VERSION:-} ]]; then
docker pull "${KUBEFLOW_MPI_IMAGE}"
fi
if [[ -n ${KUBERAY_VERSION:-} ]]; then
docker pull "${KUBERAY_IMAGE}"

# Extra e2e images required for Kuberay
unamestr=$(uname)
if [[ "$unamestr" == 'Linux' ]]; then
docker pull "${KUBERAY_RAY_IMAGE}"
elif [[ "$unamestr" == 'Darwin' ]]; then
docker pull "${KUBERAY_RAY_IMAGE_ARM}"
fi
fi
}

# $1 cluster
@@ -136,6 +155,22 @@ function install_mpi {
kubectl apply --server-side -f "${KUBEFLOW_MPI_MANIFEST}"
}

# $1 - cluster name
function install_kuberay {
# Extra e2e images required for Kuberay
unamestr=$(uname)
if [[ "$unamestr" == 'Linux' ]]; then
cluster_kind_load_image "${1}" "${KUBERAY_RAY_IMAGE}"
elif [[ "$unamestr" == 'Darwin' ]]; then
cluster_kind_load_image "${1}" "${KUBERAY_RAY_IMAGE_ARM}"
fi

cluster_kind_load_image "${1}" "${KUBERAY_IMAGE}"
kubectl config use-context "kind-${1}"
# create used instead of apply - https://github.com/ray-project/kuberay/issues/504
kubectl create -k "${KUBERAY_MANIFEST}"
}

INITIAL_IMAGE=$($YQ '.images[] | select(.name == "controller") | [.newName, .newTag] | join(":")' config/components/manager/kustomization.yaml)
export INITIAL_IMAGE

9 changes: 8 additions & 1 deletion hack/multikueue-e2e-test.sh
@@ -100,10 +100,17 @@ function kind_load {
install_kubeflow "$WORKER1_KIND_CLUSTER_NAME"
install_kubeflow "$WORKER2_KIND_CLUSTER_NAME"

## MPI
install_mpi "$MANAGER_KIND_CLUSTER_NAME"
install_mpi "$WORKER1_KIND_CLUSTER_NAME"
install_mpi "$WORKER2_KIND_CLUSTER_NAME"

## KUBERAY
kubectl config use-context "kind-${MANAGER_KIND_CLUSTER_NAME}"
kubectl apply --server-side -f "${KUBERAY_CRDS}"

install_kuberay "$WORKER1_KIND_CLUSTER_NAME"
install_kuberay "$WORKER2_KIND_CLUSTER_NAME"
}

function kueue_deploy {
4 changes: 3 additions & 1 deletion pkg/controller/jobframework/validation.go
@@ -23,6 +23,7 @@ import (

kfmpi "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
batchv1 "k8s.io/api/batch/v1"
apivalidation "k8s.io/apimachinery/pkg/api/validation"
"k8s.io/apimachinery/pkg/util/sets"
@@ -47,7 +48,8 @@ var (
kftraining.SchemeGroupVersion.WithKind(kftraining.PaddleJobKind).String(),
kftraining.SchemeGroupVersion.WithKind(kftraining.PyTorchJobKind).String(),
kftraining.SchemeGroupVersion.WithKind(kftraining.XGBoostJobKind).String(),
kfmpi.SchemeGroupVersion.WithKind(kfmpi.Kind).String())
kfmpi.SchemeGroupVersion.WithKind(kfmpi.Kind).String(),
rayv1.SchemeGroupVersion.WithKind("RayJob").String())
)

// ValidateJobOnCreate encapsulates all GenericJob validations that must be performed on a Create operation
10 changes: 8 additions & 2 deletions pkg/controller/jobs/rayjob/rayjob_controller.go
@@ -26,6 +26,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -55,12 +56,13 @@ func init() {
JobType: &rayv1.RayJob{},
AddToScheme: rayv1.AddToScheme,
IsManagingObjectsOwner: isRayJob,
MultiKueueAdapter: &multikueueAdapter{},
}))
}

// +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update
// +kubebuilder:rbac:groups=ray.io,resources=rayjobs,verbs=get;list;watch;update;patch
// +kubebuilder:rbac:groups=ray.io,resources=rayjobs/status,verbs=get;update
// +kubebuilder:rbac:groups=ray.io,resources=rayjobs/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=ray.io,resources=rayjobs/finalizers,verbs=get;update
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/status,verbs=get;update;patch
@@ -82,12 +84,16 @@ func (j *RayJob) Object() client.Object {
return (*rayv1.RayJob)(j)
}

func fromObject(obj runtime.Object) *RayJob {
return (*RayJob)(obj.(*rayv1.RayJob))
}

func (j *RayJob) IsSuspended() bool {
return j.Spec.Suspend
}

func (j *RayJob) IsActive() bool {
return j.Status.JobDeploymentStatus != rayv1.JobDeploymentStatusSuspended
return (j.Status.JobDeploymentStatus != rayv1.JobDeploymentStatusSuspended) && (j.Status.JobDeploymentStatus != rayv1.JobDeploymentStatusNew)
Comment (Contributor):
Why is this change needed? Maybe this is a fix for regular RayJobs too? In that case we should do it in a separate PR so that it can be cherry-picked.

Comment (Contributor Author):
Yes, I believe it's a needed fix.
Otherwise the RayJob gets stuck in handleJobWithNoWorkload() with "Job is suspended but still has active pods, waiting", and the workload is never created.
I can create a separate PR.

Comment (Contributor):
Interesting, maybe it is a race condition? Please open a separate PR, we may consider cherry-picking the fix then.

}

func (j *RayJob) Suspend() {
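Following up on the thread above about IsActive: a minimal sketch (assuming only the rayv1 types from the kuberay ray-operator module) of why the extra JobDeploymentStatusNew check matters. With the old check, a freshly created RayJob that is still in JobDeploymentStatusNew reports itself as active even though no pods exist yet, so the reconciler waits in handleJobWithNoWorkload() instead of creating the Workload.

// Minimal standalone sketch; the real method is (*RayJob).IsActive in the diff above.
package rayjob

import rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"

// isActive reports whether the RayJob still has running components. A job that
// was never started (JobDeploymentStatusNew) must not count as active, otherwise
// a suspended job with no pods is treated as "active" and Workload creation stalls.
func isActive(status rayv1.RayJobStatus) bool {
	return status.JobDeploymentStatus != rayv1.JobDeploymentStatusSuspended &&
		status.JobDeploymentStatus != rayv1.JobDeploymentStatusNew
}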
123 changes: 123 additions & 0 deletions pkg/controller/jobs/rayjob/rayjob_multikueue_adapter.go
@@ -0,0 +1,123 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package rayjob

import (
"context"
"errors"
"fmt"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/util/api"
clientutil "sigs.k8s.io/kueue/pkg/util/client"
)

type multikueueAdapter struct{}

var _ jobframework.MultiKueueAdapter = (*multikueueAdapter)(nil)

func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error {
log := ctrl.LoggerFrom(ctx)

localJob := rayv1.RayJob{}
err := localClient.Get(ctx, key, &localJob)
if err != nil {
return err
}

remoteJob := rayv1.RayJob{}
err = remoteClient.Get(ctx, key, &remoteJob)
if client.IgnoreNotFound(err) != nil {
return err
}

// if the remote exists, just copy the status
if err == nil {
if fromObject(&localJob).IsSuspended() {
// Ensure the job is unsuspended before updating its status; otherwise, it will fail when patching the spec.
log.V(2).Info("Skipping the sync since the local job is still suspended")
return nil
}
return clientutil.PatchStatus(ctx, localClient, &localJob, func() (bool, error) {
localJob.Status = remoteJob.Status
return true, nil
})
}

remoteJob = rayv1.RayJob{
ObjectMeta: api.CloneObjectMetaForCreation(&localJob.ObjectMeta),
Spec: *localJob.Spec.DeepCopy(),
}

// add the prebuilt workload
if remoteJob.Labels == nil {
remoteJob.Labels = make(map[string]string, 2)
}
remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName
remoteJob.Labels[kueue.MultiKueueOriginLabel] = origin

return remoteClient.Create(ctx, &remoteJob)
}

func (b *multikueueAdapter) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error {
job := rayv1.RayJob{}
job.SetName(key.Name)
job.SetNamespace(key.Namespace)
return client.IgnoreNotFound(remoteClient.Delete(ctx, &job))
}

func (b *multikueueAdapter) KeepAdmissionCheckPending() bool {
return false
}

func (b *multikueueAdapter) IsJobManagedByKueue(ctx context.Context, c client.Client, key types.NamespacedName) (bool, string, error) {
return true, "", nil
}

func (b *multikueueAdapter) GVK() schema.GroupVersionKind {
return gvk
}

var _ jobframework.MultiKueueWatcher = (*multikueueAdapter)(nil)

func (*multikueueAdapter) GetEmptyList() client.ObjectList {
return &rayv1.RayJobList{}
}

func (*multikueueAdapter) WorkloadKeyFor(o runtime.Object) (types.NamespacedName, error) {
job, isJob := o.(*rayv1.RayJob)
if !isJob {
return types.NamespacedName{}, errors.New("not a rayjob")
}

prebuiltWl, hasPrebuiltWorkload := job.Labels[constants.PrebuiltWorkloadLabel]
if !hasPrebuiltWorkload {
return types.NamespacedName{}, fmt.Errorf("no prebuilt workload found for rayjob: %s", klog.KObj(job))
}

return types.NamespacedName{Name: prebuiltWl, Namespace: job.Namespace}, nil
}
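As noted in the author's comment on KUBERAY_VERSION above, the ray-operator release used here (1.2.2) does not support managedBy, which is why IsJobManagedByKueue unconditionally returns true. A hypothetical sketch, not part of this PR, of what the check could look like once a KubeRay release exposes a spec.managedBy field (the field name is assumed, mirroring the batch/v1 Job adapter; imports are the ones already present in this file):

// Hypothetical sketch only: rayv1.RayJob has no spec.managedBy in KubeRay 1.2.2,
// so the field access below is an assumption about a future API.
func (b *multikueueAdapter) isJobManagedByKueueWithManagedBy(ctx context.Context, c client.Client, key types.NamespacedName) (bool, string, error) {
	job := rayv1.RayJob{}
	if err := c.Get(ctx, key, &job); err != nil {
		return false, "", err
	}
	managedBy := job.Spec.ManagedBy // assumed field
	if managedBy == nil || *managedBy == kueue.MultiKueueControllerName {
		return true, "", nil
	}
	return false, fmt.Sprintf("Expecting spec.managedBy to be %q, not %q", kueue.MultiKueueControllerName, *managedBy), nil
}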