Skip to content

Commit

Permalink
Extend WaitForPodsReady API with RecoveryTimeout field
Browse files Browse the repository at this point in the history
  • Loading branch information
yaroslava-serdiuk committed Dec 19, 2024
1 parent 88d83ff commit ffaf1aa
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 11 deletions.
10 changes: 10 additions & 0 deletions apis/config/v1beta1/configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,16 @@ type WaitForPodsReady struct {
// +optional
Timeout *metav1.Duration `json:"timeout,omitempty"`

// RecoveryTimeout defines an optional timeout, measured since the
// last transition to the PodsReady=false condition after a Workload is Admitted and running.
// Such a transition may happen when a Pod failed and the replacement Pod
// is awaited to be scheduled.
// After exceeding the timeout the corresponding job gets suspended again
// and requeued after the backoff delay. The timeout is enforced only if waitForPodsReady.enable=true.
// Defaults to 3 mins.
// +optional
RecoveryTimeout *metav1.Duration `json:"recoveryTimeout,omitempty"`

// BlockAdmission when true, cluster queue will block admissions for all
// subsequent jobs until the jobs reach the PodsReady=true condition.
// This setting is only honored when `Enable` is set to true.
Expand Down
4 changes: 4 additions & 0 deletions apis/config/v1beta1/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
DefaultClientConnectionQPS float32 = 20.0
DefaultClientConnectionBurst int32 = 30
defaultPodsReadyTimeout = 5 * time.Minute
defaultPodsRecoveryTimeout = 3 * time.Minute
DefaultQueueVisibilityUpdateIntervalSeconds int32 = 5
DefaultClusterQueuesMaxCount int32 = 10
defaultJobFrameworkName = "batch/job"
Expand Down Expand Up @@ -119,6 +120,9 @@ func SetDefaults_Configuration(cfg *Configuration) {
if cfg.WaitForPodsReady.Timeout == nil {
cfg.WaitForPodsReady.Timeout = &metav1.Duration{Duration: defaultPodsReadyTimeout}
}
if cfg.WaitForPodsReady.RecoveryTimeout == nil {
cfg.WaitForPodsReady.RecoveryTimeout = &metav1.Duration{Duration: defaultPodsRecoveryTimeout}
}
if cfg.WaitForPodsReady.BlockAdmission == nil {
defaultBlockAdmission := true
if !cfg.WaitForPodsReady.Enable {
Expand Down
24 changes: 14 additions & 10 deletions apis/config/v1beta1/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ func TestSetDefaults_Configuration(t *testing.T) {
WorkerLostTimeout: &metav1.Duration{Duration: DefaultMultiKueueWorkerLostTimeout},
}

podsReadyTimeoutTimeout := metav1.Duration{Duration: defaultPodsReadyTimeout}
podsReadyTimeout := metav1.Duration{Duration: defaultPodsReadyTimeout}
podsReadyRecoveryTimeout := metav1.Duration{Duration: defaultPodsRecoveryTimeout}
podsReadyTimeoutOverwrite := metav1.Duration{Duration: time.Minute}

testCases := map[string]struct {
Expand Down Expand Up @@ -388,9 +389,10 @@ func TestSetDefaults_Configuration(t *testing.T) {
},
want: &Configuration{
WaitForPodsReady: &WaitForPodsReady{
Enable: true,
BlockAdmission: ptr.To(true),
Timeout: &podsReadyTimeoutTimeout,
Enable: true,
BlockAdmission: ptr.To(true),
Timeout: &podsReadyTimeout,
RecoveryTimeout: &podsReadyRecoveryTimeout,
RequeuingStrategy: &RequeuingStrategy{
Timestamp: ptr.To(EvictionTimestamp),
BackoffBaseSeconds: ptr.To[int32](DefaultRequeuingBackoffBaseSeconds),
Expand Down Expand Up @@ -420,9 +422,10 @@ func TestSetDefaults_Configuration(t *testing.T) {
},
want: &Configuration{
WaitForPodsReady: &WaitForPodsReady{
Enable: false,
BlockAdmission: ptr.To(false),
Timeout: &podsReadyTimeoutTimeout,
Enable: false,
BlockAdmission: ptr.To(false),
Timeout: &podsReadyTimeout,
RecoveryTimeout: &podsReadyRecoveryTimeout,
RequeuingStrategy: &RequeuingStrategy{
Timestamp: ptr.To(EvictionTimestamp),
BackoffBaseSeconds: ptr.To[int32](DefaultRequeuingBackoffBaseSeconds),
Expand Down Expand Up @@ -458,9 +461,10 @@ func TestSetDefaults_Configuration(t *testing.T) {
},
want: &Configuration{
WaitForPodsReady: &WaitForPodsReady{
Enable: true,
BlockAdmission: ptr.To(true),
Timeout: &podsReadyTimeoutOverwrite,
Enable: true,
BlockAdmission: ptr.To(true),
Timeout: &podsReadyTimeoutOverwrite,
RecoveryTimeout: &podsReadyRecoveryTimeout,
RequeuingStrategy: &RequeuingStrategy{
Timestamp: ptr.To(CreationTimestamp),
BackoffBaseSeconds: ptr.To[int32](63),
Expand Down
5 changes: 5 additions & 0 deletions apis/config/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,7 @@ webhook:
Enable: true,
BlockAdmission: ptr.To(false),
Timeout: &metav1.Duration{Duration: 50 * time.Second},
RecoveryTimeout: &metav1.Duration{Duration: 3 * time.Minute},
RequeuingStrategy: &configapi.RequeuingStrategy{
Timestamp: ptr.To(configapi.CreationTimestamp),
BackoffLimitCount: ptr.To[int32](10),
Expand Down
4 changes: 4 additions & 0 deletions pkg/config/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ func validateWaitForPodsReady(c *configapi.Configuration) field.ErrorList {
allErrs = append(allErrs, field.Invalid(waitForPodsReadyPath.Child("timeout"),
c.WaitForPodsReady.Timeout, apimachineryvalidation.IsNegativeErrorMsg))
}
if c.WaitForPodsReady.RecoveryTimeout != nil && c.WaitForPodsReady.RecoveryTimeout.Duration < 0 {
allErrs = append(allErrs, field.Invalid(waitForPodsReadyPath.Child("recoveryTimeout"),
c.WaitForPodsReady.RecoveryTimeout, apimachineryvalidation.IsNegativeErrorMsg))
}
if strategy := c.WaitForPodsReady.RequeuingStrategy; strategy != nil {
if strategy.Timestamp != nil &&
*strategy.Timestamp != configapi.CreationTimestamp && *strategy.Timestamp != configapi.EvictionTimestamp {
Expand Down
20 changes: 20 additions & 0 deletions pkg/config/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,23 @@ func TestValidate(t *testing.T) {
},
},
},
"negative waitForPodsReady.recoveryTimeout": {
cfg: &configapi.Configuration{
Integrations: defaultIntegrations,
WaitForPodsReady: &configapi.WaitForPodsReady{
Enable: true,
RecoveryTimeout: &metav1.Duration{
Duration: -1,
},
},
},
wantErr: field.ErrorList{
&field.Error{
Type: field.ErrorTypeInvalid,
Field: "waitForPodsReady.recoveryTimeout",
},
},
},
"valid waitForPodsReady": {
cfg: &configapi.Configuration{
Integrations: defaultIntegrations,
Expand All @@ -437,6 +454,9 @@ func TestValidate(t *testing.T) {
Timeout: &metav1.Duration{
Duration: 50,
},
RecoveryTimeout: &metav1.Duration{
Duration: 5,
},
BlockAdmission: ptr.To(false),
RequeuingStrategy: &configapi.RequeuingStrategy{
Timestamp: ptr.To(configapi.CreationTimestamp),
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ func waitForPodsReady(cfg *configapi.WaitForPodsReady) *waitForPodsReadyConfig {
return nil
}
result := waitForPodsReadyConfig{
timeout: cfg.Timeout.Duration,
timeout: cfg.Timeout.Duration,
recoveryTimeout: cfg.RecoveryTimeout.Duration,
}
if cfg.RequeuingStrategy != nil {
result.requeuingBackoffBaseSeconds = *cfg.RequeuingStrategy.BackoffBaseSeconds
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/core/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ var (

type waitForPodsReadyConfig struct {
timeout time.Duration
recoveryTimeout time.Duration
requeuingBackoffLimitCount *int32
requeuingBackoffBaseSeconds int32
requeuingBackoffMaxDuration time.Duration
Expand Down
13 changes: 13 additions & 0 deletions site/content/en/docs/reference/kueue-config.v1beta1.md
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,19 @@ evicted and requeued in the same cluster queue.
Defaults to 5min.</p>
</td>
</tr>
<tr><td><code>recoveryTimeout</code><br/>
<a href="https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#duration-v1-meta"><code>k8s.io/apimachinery/pkg/apis/meta/v1.Duration</code></a>
</td>
<td>
<p>RecoveryTimeout defines an optional timeout, measured since the
last transition to the PodsReady=false condition after a Workload is Admitted and running.
Such a transition may happen when a Pod failed and the replacement Pod
is awaited to be scheduled.
After exceeding the timeout the corresponding job gets suspended again
and requeued after the backoff delay. The timeout is enforced only if waitForPodsReady.enable=true.
Defaults to 3 mins.</p>
</td>
</tr>
<tr><td><code>blockAdmission</code> <B>[Required]</B><br/>
<code>bool</code>
</td>
Expand Down

0 comments on commit ffaf1aa

Please sign in to comment.