diff --git a/core/trino-main/src/main/java/io/trino/server/remotetask/HttpRemoteTask.java b/core/trino-main/src/main/java/io/trino/server/remotetask/HttpRemoteTask.java index af868a76358f..d767cc62505c 100644 --- a/core/trino-main/src/main/java/io/trino/server/remotetask/HttpRemoteTask.java +++ b/core/trino-main/src/main/java/io/trino/server/remotetask/HttpRemoteTask.java @@ -58,6 +58,7 @@ import io.trino.execution.buffer.PipelinedOutputBuffers; import io.trino.execution.buffer.SpoolingOutputStats; import io.trino.metadata.Split; +import io.trino.operator.RetryPolicy; import io.trino.operator.TaskStats; import io.trino.server.DynamicFilterService; import io.trino.server.FailTaskRequest; @@ -110,6 +111,7 @@ import static io.trino.SystemSessionProperties.getMaxUnacknowledgedSplitsPerTask; import static io.trino.SystemSessionProperties.getRemoteTaskGuaranteedSplitsPerRequest; import static io.trino.SystemSessionProperties.getRemoteTaskRequestSizeHeadroom; +import static io.trino.SystemSessionProperties.getRetryPolicy; import static io.trino.SystemSessionProperties.isRemoteTaskAdaptiveUpdateRequestSizeEnabled; import static io.trino.execution.DynamicFiltersCollector.INITIAL_DYNAMIC_FILTERS_VERSION; import static io.trino.execution.TaskInfo.createInitialTask; @@ -343,6 +345,7 @@ public HttpRemoteTask( errorScheduledExecutor, stats); + RetryPolicy retryPolicy = getRetryPolicy(session); this.taskInfoFetcher = new TaskInfoFetcher( this::fatalUnacknowledgedFailure, taskStatusFetcher, @@ -357,7 +360,8 @@ public HttpRemoteTask( updateScheduledExecutor, errorScheduledExecutor, stats, - estimatedMemory); + estimatedMemory, + retryPolicy); taskStatusFetcher.addStateChangeListener(newStatus -> { TaskState state = newStatus.getState(); diff --git a/core/trino-main/src/main/java/io/trino/server/remotetask/TaskInfoFetcher.java b/core/trino-main/src/main/java/io/trino/server/remotetask/TaskInfoFetcher.java index 73b14cade4ba..7f87b2c5c94d 100644 --- a/core/trino-main/src/main/java/io/trino/server/remotetask/TaskInfoFetcher.java +++ b/core/trino-main/src/main/java/io/trino/server/remotetask/TaskInfoFetcher.java @@ -33,6 +33,7 @@ import io.trino.execution.TaskState; import io.trino.execution.TaskStatus; import io.trino.execution.buffer.SpoolingOutputStats; +import io.trino.operator.RetryPolicy; import java.net.URI; import java.util.Optional; @@ -52,6 +53,7 @@ import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom; import static io.airlift.http.client.Request.Builder.prepareGet; import static io.airlift.units.Duration.nanosSince; +import static io.trino.operator.RetryPolicy.TASK; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -81,6 +83,8 @@ public class TaskInfoFetcher private final AtomicReference spoolingOutputStats = new AtomicReference<>(); + private final RetryPolicy retryPolicy; + @GuardedBy("this") private boolean running; @@ -104,7 +108,8 @@ public TaskInfoFetcher( ScheduledExecutorService updateScheduledExecutor, ScheduledExecutorService errorScheduledExecutor, RemoteTaskStats stats, - Optional estimatedMemory) + Optional estimatedMemory, + RetryPolicy retryPolicy) { requireNonNull(initialTask, "initialTask is null"); requireNonNull(errorScheduledExecutor, "errorScheduledExecutor is null"); @@ -127,6 +132,7 @@ public TaskInfoFetcher( this.spanBuilderFactory = requireNonNull(spanBuilderFactory, "spanBuilderFactory is null"); this.stats = requireNonNull(stats, "stats is null"); this.estimatedMemory = requireNonNull(estimatedMemory, "estimatedMemory is null"); + this.retryPolicy = requireNonNull(retryPolicy, "retryPolicy is null"); } public TaskInfo getTaskInfo() @@ -268,7 +274,7 @@ synchronized void updateTaskInfo(TaskInfo newTaskInfo) if (newTaskInfo.getTaskStatus().getState().isDone()) { boolean wasSet = spoolingOutputStats.compareAndSet(null, newTaskInfo.getOutputBuffers().getSpoolingOutputStats().orElse(null)); - if (wasSet && spoolingOutputStats.get() == null) { + if (retryPolicy == TASK && wasSet && spoolingOutputStats.get() == null) { log.debug("Task %s was updated to null spoolingOutputStats. Future calls to retrieveAndDropSpoolingOutputStats will fail.", taskId); } newTaskInfo = newTaskInfo.pruneSpoolingOutputStats();