Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow mutating queue name in StatefulSet Webhook. #3520

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion charts/kueue/templates/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ rules:
- apps
resources:
- replicasets
- statefulsets
verbs:
- get
- list
Expand Down
1 change: 0 additions & 1 deletion config/components/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ rules:
- apps
resources:
- replicasets
- statefulsets
verbs:
- get
- list
Expand Down
26 changes: 1 addition & 25 deletions pkg/controller/jobs/pod/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package pod
import (
"cmp"
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"slices"
Expand Down Expand Up @@ -563,29 +561,7 @@ func getRoleHash(p corev1.Pod) (string, error) {
if roleHash, ok := p.Annotations[RoleHashAnnotation]; ok {
return roleHash, nil
}

shape := map[string]interface{}{
"spec": map[string]interface{}{
"initContainers": containersShape(p.Spec.InitContainers),
"containers": containersShape(p.Spec.Containers),
"nodeSelector": p.Spec.NodeSelector,
"affinity": p.Spec.Affinity,
"tolerations": p.Spec.Tolerations,
"runtimeClassName": p.Spec.RuntimeClassName,
"priority": p.Spec.Priority,
"topologySpreadConstraints": p.Spec.TopologySpreadConstraints,
"overhead": p.Spec.Overhead,
"resourceClaims": p.Spec.ResourceClaims,
},
}

shapeJSON, err := json.Marshal(shape)
if err != nil {
return "", err
}

// Trim hash to 8 characters and return
return fmt.Sprintf("%x", sha256.Sum256(shapeJSON))[:8], nil
return utilpod.GenerateShape(p.Spec)
}

// Load loads all pods in the group
Expand Down
13 changes: 0 additions & 13 deletions pkg/controller/jobs/pod/pod_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,19 +119,6 @@ func getPodOptions(integrationOpts map[string]any) (*configapi.PodIntegrationOpt

var _ admission.CustomDefaulter = &PodWebhook{}

func containersShape(containers []corev1.Container) (result []map[string]interface{}) {
for _, c := range containers {
result = append(result, map[string]interface{}{
"resources": map[string]interface{}{
"requests": c.Resources.Requests,
},
"ports": c.Ports,
})
}

return result
}

// addRoleHash calculates the role hash and adds it to the pod's annotations
func (p *Pod) addRoleHash() error {
if p.pod.Annotations == nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/jobs/statefulset/statefulset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const (
func init() {
utilruntime.Must(jobframework.RegisterIntegration(FrameworkName, jobframework.IntegrationCallbacks{
SetupIndexes: SetupIndexes,
NewReconciler: NewReconciler,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this changed? Please don't if not necessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sometimes, a Pod is updated, but the StatefulSet isn't aware of it, causing the reconcile process to not work as expected (e.g., the finalizer isn't removed). Thats why it is better to reconcile Pod instead of StatefulSet.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this case - can you elaborate? For example in the core k8s Job we also have finalizers on Job pods, but the reconciler is at the Job level, reference. I would like to first document such problematic scenarios in some form of tests and change the implementation in a dedicated PR (if needed).

NewReconciler: NewPodReconciler,
SetupWebhook: SetupWebhook,
JobType: &appsv1.StatefulSet{},
AddToScheme: appsv1.AddToScheme,
Expand Down
110 changes: 110 additions & 0 deletions pkg/controller/jobs/statefulset/statefulset_pod_reconciler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package statefulset

import (
"context"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"sigs.k8s.io/kueue/pkg/controller/jobframework"
podcontroller "sigs.k8s.io/kueue/pkg/controller/jobs/pod"
clientutil "sigs.k8s.io/kueue/pkg/util/client"
)

type PodReconciler struct {
client client.Client
}

func NewPodReconciler(client client.Client, _ record.EventRecorder, _ ...jobframework.Option) jobframework.JobReconcilerInterface {
return &PodReconciler{client: client}
}

var _ jobframework.JobReconcilerInterface = (*PodReconciler)(nil)

func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
ctrl.Log.V(3).Info("Setting up Pod reconciler for StatefulSet")
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Pod{}).
Named("statefulset-pod").
WithEventFilter(r).
Complete(r)
}

func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
pod := &corev1.Pod{}
err := r.client.Get(ctx, req.NamespacedName, pod)
if err != nil {
// we'll ignore not-found errors, since there is nothing to do.
return ctrl.Result{}, client.IgnoreNotFound(err)
}

log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(pod))
ctx = ctrl.LoggerInto(ctx, log)
log.V(2).Info("Reconciling StatefulSet Pod")

if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed {
err = client.IgnoreNotFound(clientutil.Patch(ctx, r.client, pod, true, func() (bool, error) {
removed := controllerutil.RemoveFinalizer(pod, podcontroller.PodFinalizer)
if removed {
log.V(3).Info(
"Finalizing statefulset pod in group",
"pod", klog.KObj(pod),
"group", pod.Labels[podcontroller.GroupNameLabel],
)
}
return removed, nil
}))
}

return ctrl.Result{}, err
}

var _ predicate.Predicate = (*PodReconciler)(nil)

func (r *PodReconciler) Generic(event.GenericEvent) bool {
return false
}

func (r *PodReconciler) Create(e event.CreateEvent) bool {
return r.handle(e.Object)
}

func (r *PodReconciler) Update(e event.UpdateEvent) bool {
return r.handle(e.ObjectNew)
}

func (r *PodReconciler) Delete(event.DeleteEvent) bool {
return false
}

func (r *PodReconciler) handle(obj client.Object) bool {
pod, isPod := obj.(*corev1.Pod)
if !isPod {
return false
}
// Handle only statefulset pods.
return pod.Annotations[podcontroller.SuspendedByParentAnnotation] == FrameworkName
}
95 changes: 95 additions & 0 deletions pkg/controller/jobs/statefulset/statefulset_pod_reconciler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package statefulset

import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
testingjobspod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod"
"sigs.k8s.io/kueue/pkg/util/testingjobs/statefulset"
)

var (
baseCmpOpts = []cmp.Option{
cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion"),
}
)

func TestPodReconciler(t *testing.T) {
cases := map[string]struct {
lws *appsv1.StatefulSet
pod *corev1.Pod
wantPod *corev1.Pod
wantErr error
}{
"should finalize succeeded pod": {
lws: statefulset.MakeStatefulSet("sts", "ns").Obj(),
pod: testingjobspod.MakePod("pod", "ns").
StatusPhase(corev1.PodSucceeded).
KueueFinalizer().
Obj(),
wantPod: testingjobspod.MakePod("pod", "ns").
StatusPhase(corev1.PodSucceeded).
Obj(),
},
"should finalize failed pod": {
lws: statefulset.MakeStatefulSet("sts", "ns").Obj(),
pod: testingjobspod.MakePod("pod", "ns").
StatusPhase(corev1.PodFailed).
KueueFinalizer().
Obj(),
wantPod: testingjobspod.MakePod("pod", "ns").
StatusPhase(corev1.PodFailed).
Obj(),
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
ctx, _ := utiltesting.ContextWithLog(t)
clientBuilder := utiltesting.NewClientBuilder()

kClient := clientBuilder.WithObjects(tc.lws, tc.pod).Build()

reconciler := NewPodReconciler(kClient, nil)

podKey := client.ObjectKeyFromObject(tc.pod)
_, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: podKey})
if diff := cmp.Diff(tc.wantErr, err, cmpopts.EquateErrors()); diff != "" {
t.Errorf("Reconcile returned error (-want,+got):\n%s", diff)
}

gotPod := &corev1.Pod{}
if err := kClient.Get(ctx, podKey, gotPod); err != nil {
t.Fatalf("Could not get Pod after reconcile: %v", err)
}

if diff := cmp.Diff(tc.wantPod, gotPod, baseCmpOpts...); diff != "" {
t.Errorf("Pod after reconcile (-want,+got):\n%s", diff)
}
})
}
}
102 changes: 0 additions & 102 deletions pkg/controller/jobs/statefulset/statefulset_reconciler.go

This file was deleted.

Loading