diff --git a/pkg/controller/jobs/pod/pod_controller.go b/pkg/controller/jobs/pod/pod_controller.go index 67e6231470..33f9774e68 100644 --- a/pkg/controller/jobs/pod/pod_controller.go +++ b/pkg/controller/jobs/pod/pod_controller.go @@ -40,6 +40,7 @@ import ( "k8s.io/apimachinery/pkg/util/validation" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" + "k8s.io/utils/clock" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -90,6 +91,7 @@ var ( errIncorrectReconcileRequest = errors.New("event handler error: got a single pod reconcile request for a pod group") errPendingOps = jobframework.UnretryableError("waiting to observe previous operations on pods") errPodGroupLabelsMismatch = errors.New("constructing workload: pods have different label values") + realClock = clock.RealClock{} ) func init() { @@ -118,7 +120,7 @@ type Reconciler struct { } func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - return r.ReconcileGenericJob(ctx, req, &Pod{excessPodExpectations: r.expectationsStore}) + return r.ReconcileGenericJob(ctx, req, &Pod{excessPodExpectations: r.expectationsStore, clock: realClock}) } func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { @@ -155,6 +157,7 @@ type Pod struct { absentPods int excessPodExpectations *expectations.Store satisfiedExcessPods bool + clock clock.Clock } var ( @@ -206,6 +209,13 @@ func (p *Pod) isUnretriableGroup() bool { return false } +func (p *Pod) getClock() clock.Clock { + if p.clock != nil { + return p.clock + } + return realClock +} + // IsSuspended returns whether the job is suspended or not. func (p *Pod) IsSuspended() bool { if !p.isGroup { @@ -445,7 +455,7 @@ func (p *Pod) Stop(ctx context.Context, c client.Client, _ []podset.PodSetInfo, Type: ConditionTypeTerminationTarget, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Time{ - Time: time.Now(), + Time: p.getClock().Now(), }, Reason: string(stopReason), Message: eventMsg, @@ -771,28 +781,27 @@ func isPodRunnableOrSucceeded(p *corev1.Pod) bool { // lastActiveTime returns the last timestamp on which the pod was observed active: // - the time the pod was declared Failed // - the deletion time -func lastActiveTime(p *corev1.Pod) time.Time { - lastTransition := metav1.Now() +func lastActiveTime(clock clock.Clock, p *corev1.Pod) time.Time { + if !p.DeletionTimestamp.IsZero() { + return p.DeletionTimestamp.Time + } + lastTransition := clock.Now() for _, c := range p.Status.Conditions { if c.Type == corev1.ContainersReady { if c.Status == corev1.ConditionFalse && c.Reason == string(corev1.PodFailed) { - lastTransition = c.LastTransitionTime + lastTransition = c.LastTransitionTime.Time } break } } - deletionTime := ptr.Deref(p.DeletionTimestamp, metav1.Now()) - if lastTransition.Before(&deletionTime) { - return lastTransition.Time - } - return deletionTime.Time + return lastTransition } // sortInactivePods sorts the provided pods slice based on: // - finalizer state (pods with finalizers are first) // - lastActiveTime (pods that were active last are first) // - creation timestamp (newer pods are first) -func sortInactivePods(inactivePods []corev1.Pod) { +func sortInactivePods(clock clock.Clock, inactivePods []corev1.Pod) { sort.Slice(inactivePods, func(i, j int) bool { pi := &inactivePods[i] pj := &inactivePods[j] @@ -802,8 +811,8 @@ func sortInactivePods(inactivePods []corev1.Pod) { return iFin } - iLastActive := lastActiveTime(pi) - jLastActive := lastActiveTime(pj) + iLastActive := lastActiveTime(clock, pi) + jLastActive := lastActiveTime(clock, pj) if iLastActive.Equal(jLastActive) { return pi.CreationTimestamp.Before(&pj.CreationTimestamp) @@ -1150,7 +1159,7 @@ func (p *Pod) FindMatchingWorkloads(ctx context.Context, c client.Client, r reco } if finalizeablePodsCount := min(len(roleInactivePods), len(roleInactivePods)+len(roleActivePods)-int(ps.Count)); finalizeablePodsCount > 0 { - sortInactivePods(roleInactivePods) + sortInactivePods(p.getClock(), roleInactivePods) replacedInactivePods = append(replacedInactivePods, roleInactivePods[len(roleInactivePods)-finalizeablePodsCount:]...) keptPods = append(keptPods, roleInactivePods[:len(roleInactivePods)-finalizeablePodsCount]...) } else { @@ -1325,7 +1334,7 @@ func (p *Pod) waitingForReplacementPodsCondition(wl *kueue.Workload) (*metav1.Co if updated { replCond.ObservedGeneration = wl.Generation - replCond.LastTransitionTime = metav1.Now() + replCond.LastTransitionTime = metav1.NewTime(p.getClock().Now()) } return replCond, updated diff --git a/pkg/controller/jobs/pod/pod_controller_test.go b/pkg/controller/jobs/pod/pod_controller_test.go index e1dfc8f94d..8d98bf9979 100644 --- a/pkg/controller/jobs/pod/pod_controller_test.go +++ b/pkg/controller/jobs/pod/pod_controller_test.go @@ -5388,6 +5388,7 @@ func TestGetWorkloadNameForPod(t *testing.T) { func TestReconciler_DeletePodAfterTransientErrorsOnUpdateOrDeleteOps(t *testing.T) { now := time.Now() + fakeClock := testingclock.NewFakeClock(now) connRefusedErrMock := fmt.Errorf("connection refused: %w", syscall.ECONNREFUSED) ctx, _ := utiltesting.ContextWithLog(t) var triggerUpdateErr, triggerDeleteErr bool @@ -5475,7 +5476,7 @@ func TestReconciler_DeletePodAfterTransientErrorsOnUpdateOrDeleteOps(t *testing. } recorder := record.NewBroadcaster().NewRecorder(kClient.Scheme(), corev1.EventSource{Component: "test"}) - reconciler := NewReconciler(kClient, recorder) + reconciler := NewReconciler(kClient, recorder, jobframework.WithClock(t, fakeClock)) reconcileRequest := reconcileRequestForPod(&pods[0]) // Reconcile for the first time. It'll try to remove the finalizers but fail