From bd1b989ceef4f4e5d115c0d8d40c8267b4754676 Mon Sep 17 00:00:00 2001 From: TusharMohapatra07 Date: Mon, 9 Dec 2024 21:05:03 +0530 Subject: [PATCH 1/9] use clock inside pod_controller --- pkg/controller/jobs/pod/pod_controller.go | 7 +++++-- pkg/controller/jobs/pod/pod_controller_test.go | 7 +++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/pkg/controller/jobs/pod/pod_controller.go b/pkg/controller/jobs/pod/pod_controller.go index 67e6231470..fd109ae885 100644 --- a/pkg/controller/jobs/pod/pod_controller.go +++ b/pkg/controller/jobs/pod/pod_controller.go @@ -40,6 +40,7 @@ import ( "k8s.io/apimachinery/pkg/util/validation" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" + "k8s.io/utils/clock" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -90,6 +91,7 @@ var ( errIncorrectReconcileRequest = errors.New("event handler error: got a single pod reconcile request for a pod group") errPendingOps = jobframework.UnretryableError("waiting to observe previous operations on pods") errPodGroupLabelsMismatch = errors.New("constructing workload: pods have different label values") + realClock = clock.RealClock{} ) func init() { @@ -118,7 +120,7 @@ type Reconciler struct { } func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - return r.ReconcileGenericJob(ctx, req, &Pod{excessPodExpectations: r.expectationsStore}) + return r.ReconcileGenericJob(ctx, req, &Pod{excessPodExpectations: r.expectationsStore, clock: realClock}) } func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { @@ -155,6 +157,7 @@ type Pod struct { absentPods int excessPodExpectations *expectations.Store satisfiedExcessPods bool + clock clock.Clock } var ( @@ -445,7 +448,7 @@ func (p *Pod) Stop(ctx context.Context, c client.Client, _ []podset.PodSetInfo, Type: ConditionTypeTerminationTarget, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Time{ - Time: time.Now(), + Time: p.clock.Now(), }, Reason: string(stopReason), Message: eventMsg, diff --git a/pkg/controller/jobs/pod/pod_controller_test.go b/pkg/controller/jobs/pod/pod_controller_test.go index e1dfc8f94d..72e53ebbea 100644 --- a/pkg/controller/jobs/pod/pod_controller_test.go +++ b/pkg/controller/jobs/pod/pod_controller_test.go @@ -5194,6 +5194,8 @@ func TestReconciler(t *testing.T) { func TestReconciler_ErrorFinalizingPod(t *testing.T) { ctx, _ := utiltesting.ContextWithLog(t) + now := time.Now() + fakeClock := testingclock.NewFakeClock(now) clientBuilder := utiltesting.NewClientBuilder() if err := SetupIndexes(ctx, utiltesting.AsIndexer(clientBuilder)); err != nil { @@ -5254,7 +5256,7 @@ func TestReconciler_ErrorFinalizingPod(t *testing.T) { } recorder := record.NewBroadcaster().NewRecorder(kClient.Scheme(), corev1.EventSource{Component: "test"}) - reconciler := NewReconciler(kClient, recorder) + reconciler := NewReconciler(kClient, recorder, jobframework.WithClock(t, fakeClock)) podKey := client.ObjectKeyFromObject(&pod) _, err := reconciler.Reconcile(ctx, reconcile.Request{ @@ -5388,6 +5390,7 @@ func TestGetWorkloadNameForPod(t *testing.T) { func TestReconciler_DeletePodAfterTransientErrorsOnUpdateOrDeleteOps(t *testing.T) { now := time.Now() + fakeClock := testingclock.NewFakeClock(now) connRefusedErrMock := fmt.Errorf("connection refused: %w", syscall.ECONNREFUSED) ctx, _ := utiltesting.ContextWithLog(t) var triggerUpdateErr, triggerDeleteErr bool @@ -5475,7 +5478,7 @@ func TestReconciler_DeletePodAfterTransientErrorsOnUpdateOrDeleteOps(t *testing. } recorder := record.NewBroadcaster().NewRecorder(kClient.Scheme(), corev1.EventSource{Component: "test"}) - reconciler := NewReconciler(kClient, recorder) + reconciler := NewReconciler(kClient, recorder, jobframework.WithClock(t, fakeClock)) reconcileRequest := reconcileRequestForPod(&pods[0]) // Reconcile for the first time. It'll try to remove the finalizers but fail From f28726d1810dcafff8ef4abf11e69812fa8fb0cc Mon Sep 17 00:00:00 2001 From: TusharMohapatra07 Date: Wed, 11 Dec 2024 15:48:58 +0530 Subject: [PATCH 2/9] add Clock clock to Pod --- pkg/controller/jobs/pod/pod_controller.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pkg/controller/jobs/pod/pod_controller.go b/pkg/controller/jobs/pod/pod_controller.go index fd109ae885..35c4e489a4 100644 --- a/pkg/controller/jobs/pod/pod_controller.go +++ b/pkg/controller/jobs/pod/pod_controller.go @@ -209,6 +209,13 @@ func (p *Pod) isUnretriableGroup() bool { return false } +func (p *Pod) Clock() clock.Clock { + if p.clock != nil { + return p.clock + } + return realClock +} + // IsSuspended returns whether the job is suspended or not. func (p *Pod) IsSuspended() bool { if !p.isGroup { From 522f45dc67953d8441037c1f00f7d7fdc50ac887 Mon Sep 17 00:00:00 2001 From: TusharMohapatra07 Date: Wed, 11 Dec 2024 19:09:00 +0530 Subject: [PATCH 3/9] change function name --- pkg/controller/jobs/pod/pod_controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/jobs/pod/pod_controller.go b/pkg/controller/jobs/pod/pod_controller.go index 35c4e489a4..84951cad49 100644 --- a/pkg/controller/jobs/pod/pod_controller.go +++ b/pkg/controller/jobs/pod/pod_controller.go @@ -209,7 +209,7 @@ func (p *Pod) isUnretriableGroup() bool { return false } -func (p *Pod) Clock() clock.Clock { +func (p *Pod) getClock() clock.Clock { if p.clock != nil { return p.clock } From c11d423c913818093ab2dc9b782ed6cfa3e262ef Mon Sep 17 00:00:00 2001 From: TusharMohapatra07 Date: Wed, 11 Dec 2024 19:36:41 +0530 Subject: [PATCH 4/9] use p.getClock() function instead of p.clock --- pkg/controller/jobs/pod/pod_controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/jobs/pod/pod_controller.go b/pkg/controller/jobs/pod/pod_controller.go index 84951cad49..23099074ef 100644 --- a/pkg/controller/jobs/pod/pod_controller.go +++ b/pkg/controller/jobs/pod/pod_controller.go @@ -455,7 +455,7 @@ func (p *Pod) Stop(ctx context.Context, c client.Client, _ []podset.PodSetInfo, Type: ConditionTypeTerminationTarget, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Time{ - Time: p.clock.Now(), + Time: p.getClock().Now(), }, Reason: string(stopReason), Message: eventMsg, From 0a97c7e7d795b62b4fad6de5d2c318962a07bca2 Mon Sep 17 00:00:00 2001 From: TusharMohapatra07 Date: Wed, 11 Dec 2024 20:00:11 +0530 Subject: [PATCH 5/9] use NewTime instead of Now --- pkg/controller/jobs/pod/pod_controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/jobs/pod/pod_controller.go b/pkg/controller/jobs/pod/pod_controller.go index 23099074ef..a924430ce0 100644 --- a/pkg/controller/jobs/pod/pod_controller.go +++ b/pkg/controller/jobs/pod/pod_controller.go @@ -1335,7 +1335,7 @@ func (p *Pod) waitingForReplacementPodsCondition(wl *kueue.Workload) (*metav1.Co if updated { replCond.ObservedGeneration = wl.Generation - replCond.LastTransitionTime = metav1.Now() + replCond.LastTransitionTime = metav1.NewTime(p.getClock().Now()) } return replCond, updated From 0bedb972723f11959b22430e995bc7d59a1da572 Mon Sep 17 00:00:00 2001 From: TusharMohapatra07 Date: Wed, 11 Dec 2024 20:10:29 +0530 Subject: [PATCH 6/9] remove Now() dependency in lastActiveTime --- pkg/controller/jobs/pod/pod_controller.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/pkg/controller/jobs/pod/pod_controller.go b/pkg/controller/jobs/pod/pod_controller.go index a924430ce0..e3d72a71ec 100644 --- a/pkg/controller/jobs/pod/pod_controller.go +++ b/pkg/controller/jobs/pod/pod_controller.go @@ -782,20 +782,17 @@ func isPodRunnableOrSucceeded(p *corev1.Pod) bool { // - the time the pod was declared Failed // - the deletion time func lastActiveTime(p *corev1.Pod) time.Time { - lastTransition := metav1.Now() + var lastTransition time.Time for _, c := range p.Status.Conditions { - if c.Type == corev1.ContainersReady { - if c.Status == corev1.ConditionFalse && c.Reason == string(corev1.PodFailed) { - lastTransition = c.LastTransitionTime - } + if c.Type == corev1.ContainersReady && c.Status == corev1.ConditionFalse && c.Reason == string(corev1.PodFailed) { + lastTransition = c.LastTransitionTime.Time break } } - deletionTime := ptr.Deref(p.DeletionTimestamp, metav1.Now()) - if lastTransition.Before(&deletionTime) { - return lastTransition.Time + if p.DeletionTimestamp != nil && p.DeletionTimestamp.Time.After(lastTransition) { + return p.DeletionTimestamp.Time } - return deletionTime.Time + return lastTransition } // sortInactivePods sorts the provided pods slice based on: From 7310d272b4c0b6097a8598b96977ab2f810a8ec3 Mon Sep 17 00:00:00 2001 From: TusharMohapatra07 Date: Thu, 12 Dec 2024 00:32:55 +0530 Subject: [PATCH 7/9] add arguments to sortInactivePods and lastActiveTime --- pkg/controller/jobs/pod/pod_controller.go | 24 ++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/pkg/controller/jobs/pod/pod_controller.go b/pkg/controller/jobs/pod/pod_controller.go index e3d72a71ec..33f9774e68 100644 --- a/pkg/controller/jobs/pod/pod_controller.go +++ b/pkg/controller/jobs/pod/pod_controller.go @@ -781,17 +781,19 @@ func isPodRunnableOrSucceeded(p *corev1.Pod) bool { // lastActiveTime returns the last timestamp on which the pod was observed active: // - the time the pod was declared Failed // - the deletion time -func lastActiveTime(p *corev1.Pod) time.Time { - var lastTransition time.Time +func lastActiveTime(clock clock.Clock, p *corev1.Pod) time.Time { + if !p.DeletionTimestamp.IsZero() { + return p.DeletionTimestamp.Time + } + lastTransition := clock.Now() for _, c := range p.Status.Conditions { - if c.Type == corev1.ContainersReady && c.Status == corev1.ConditionFalse && c.Reason == string(corev1.PodFailed) { - lastTransition = c.LastTransitionTime.Time + if c.Type == corev1.ContainersReady { + if c.Status == corev1.ConditionFalse && c.Reason == string(corev1.PodFailed) { + lastTransition = c.LastTransitionTime.Time + } break } } - if p.DeletionTimestamp != nil && p.DeletionTimestamp.Time.After(lastTransition) { - return p.DeletionTimestamp.Time - } return lastTransition } @@ -799,7 +801,7 @@ func lastActiveTime(p *corev1.Pod) time.Time { // - finalizer state (pods with finalizers are first) // - lastActiveTime (pods that were active last are first) // - creation timestamp (newer pods are first) -func sortInactivePods(inactivePods []corev1.Pod) { +func sortInactivePods(clock clock.Clock, inactivePods []corev1.Pod) { sort.Slice(inactivePods, func(i, j int) bool { pi := &inactivePods[i] pj := &inactivePods[j] @@ -809,8 +811,8 @@ func sortInactivePods(inactivePods []corev1.Pod) { return iFin } - iLastActive := lastActiveTime(pi) - jLastActive := lastActiveTime(pj) + iLastActive := lastActiveTime(clock, pi) + jLastActive := lastActiveTime(clock, pj) if iLastActive.Equal(jLastActive) { return pi.CreationTimestamp.Before(&pj.CreationTimestamp) @@ -1157,7 +1159,7 @@ func (p *Pod) FindMatchingWorkloads(ctx context.Context, c client.Client, r reco } if finalizeablePodsCount := min(len(roleInactivePods), len(roleInactivePods)+len(roleActivePods)-int(ps.Count)); finalizeablePodsCount > 0 { - sortInactivePods(roleInactivePods) + sortInactivePods(p.getClock(), roleInactivePods) replacedInactivePods = append(replacedInactivePods, roleInactivePods[len(roleInactivePods)-finalizeablePodsCount:]...) keptPods = append(keptPods, roleInactivePods[:len(roleInactivePods)-finalizeablePodsCount]...) } else { From c50ad1eeecbcad58a0a395c4e9eb3aca0a35700e Mon Sep 17 00:00:00 2001 From: TusharMohapatra07 Date: Thu, 12 Dec 2024 00:48:20 +0530 Subject: [PATCH 8/9] remove fakeClock option --- pkg/controller/jobs/pod/pod_controller_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/controller/jobs/pod/pod_controller_test.go b/pkg/controller/jobs/pod/pod_controller_test.go index 72e53ebbea..9c135ab2a4 100644 --- a/pkg/controller/jobs/pod/pod_controller_test.go +++ b/pkg/controller/jobs/pod/pod_controller_test.go @@ -5194,8 +5194,8 @@ func TestReconciler(t *testing.T) { func TestReconciler_ErrorFinalizingPod(t *testing.T) { ctx, _ := utiltesting.ContextWithLog(t) - now := time.Now() - fakeClock := testingclock.NewFakeClock(now) + // now := time.Now() + // fakeClock := testingclock.NewFakeClock(now) clientBuilder := utiltesting.NewClientBuilder() if err := SetupIndexes(ctx, utiltesting.AsIndexer(clientBuilder)); err != nil { @@ -5256,7 +5256,7 @@ func TestReconciler_ErrorFinalizingPod(t *testing.T) { } recorder := record.NewBroadcaster().NewRecorder(kClient.Scheme(), corev1.EventSource{Component: "test"}) - reconciler := NewReconciler(kClient, recorder, jobframework.WithClock(t, fakeClock)) + reconciler := NewReconciler(kClient, recorder) podKey := client.ObjectKeyFromObject(&pod) _, err := reconciler.Reconcile(ctx, reconcile.Request{ From 4aee186c9e602c7038d6d0332cedd4c52879e04e Mon Sep 17 00:00:00 2001 From: TusharMohapatra07 Date: Thu, 12 Dec 2024 00:50:41 +0530 Subject: [PATCH 9/9] remove comments --- pkg/controller/jobs/pod/pod_controller_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/controller/jobs/pod/pod_controller_test.go b/pkg/controller/jobs/pod/pod_controller_test.go index 9c135ab2a4..8d98bf9979 100644 --- a/pkg/controller/jobs/pod/pod_controller_test.go +++ b/pkg/controller/jobs/pod/pod_controller_test.go @@ -5194,8 +5194,6 @@ func TestReconciler(t *testing.T) { func TestReconciler_ErrorFinalizingPod(t *testing.T) { ctx, _ := utiltesting.ContextWithLog(t) - // now := time.Now() - // fakeClock := testingclock.NewFakeClock(now) clientBuilder := utiltesting.NewClientBuilder() if err := SetupIndexes(ctx, utiltesting.AsIndexer(clientBuilder)); err != nil {