diff --git a/core/trino-main/src/main/java/io/trino/server/TaskResource.java b/core/trino-main/src/main/java/io/trino/server/TaskResource.java index c8e3d5660c29..e714769025f0 100644 --- a/core/trino-main/src/main/java/io/trino/server/TaskResource.java +++ b/core/trino-main/src/main/java/io/trino/server/TaskResource.java @@ -96,6 +96,7 @@ public class TaskResource private static final Duration ADDITIONAL_WAIT_TIME = new Duration(5, SECONDS); private static final Duration DEFAULT_MAX_WAIT_TIME = new Duration(2, SECONDS); + private final StartupStatus startupStatus; private final SqlTaskManager taskManager; private final SessionPropertyManager sessionPropertyManager; private final Executor responseExecutor; @@ -106,12 +107,14 @@ public class TaskResource @Inject public TaskResource( + StartupStatus startupStatus, SqlTaskManager taskManager, SessionPropertyManager sessionPropertyManager, @ForAsyncHttp BoundedExecutor responseExecutor, @ForAsyncHttp ScheduledExecutorService timeoutExecutor, FailureInjector failureInjector) { + this.startupStatus = requireNonNull(startupStatus, "startupStatus is null"); this.taskManager = requireNonNull(taskManager, "taskManager is null"); this.sessionPropertyManager = requireNonNull(sessionPropertyManager, "sessionPropertyManager is null"); this.responseExecutor = requireNonNull(responseExecutor, "responseExecutor is null"); @@ -143,6 +146,9 @@ public void createOrUpdateTask( @Suspended AsyncResponse asyncResponse) { requireNonNull(taskUpdateRequest, "taskUpdateRequest is null"); + if (failRequestIfInvalid(asyncResponse)) { + return; + } Session session = taskUpdateRequest.session().toSession(sessionPropertyManager, taskUpdateRequest.extraCredentials(), taskUpdateRequest.exchangeEncryptionKey()); @@ -179,6 +185,9 @@ public void getTaskInfo( @Suspended AsyncResponse asyncResponse) { requireNonNull(taskId, "taskId is null"); + if (failRequestIfInvalid(asyncResponse)) { + return; + } if (injectFailure(taskManager.getTraceToken(taskId), taskId, RequestType.GET_TASK_INFO, asyncResponse)) { return; @@ -224,6 +233,9 @@ public void getTaskStatus( @Suspended AsyncResponse asyncResponse) { requireNonNull(taskId, "taskId is null"); + if (failRequestIfInvalid(asyncResponse)) { + return; + } if (injectFailure(taskManager.getTraceToken(taskId), taskId, RequestType.GET_TASK_STATUS, asyncResponse)) { return; @@ -265,6 +277,9 @@ public void acknowledgeAndGetNewDynamicFilterDomains( { requireNonNull(taskId, "taskId is null"); requireNonNull(currentDynamicFiltersVersion, "currentDynamicFiltersVersion is null"); + if (failRequestIfInvalid(asyncResponse)) { + return; + } if (injectFailure(taskManager.getTraceToken(taskId), taskId, RequestType.ACKNOWLEDGE_AND_GET_NEW_DYNAMIC_FILTER_DOMAINS, asyncResponse)) { return; @@ -397,6 +412,22 @@ public void pruneCatalogs(Set catalogHandles) taskManager.pruneCatalogs(catalogHandles); } + private boolean failRequestIfInvalid(AsyncResponse asyncResponse) + { + if (!startupStatus.isStartupComplete()) { + // When worker node is restarted after a crash, coordinator may be still unaware of the situation and may attempt to schedule tasks on it. + // Ideally the coordinator should not schedule tasks on worker that is not ready, but in pipelined execution there is currently no way to move a task. + // Accepting a request too early will likely lead to some failure and HTTP 500 (INTERNAL_SERVER_ERROR) response. The coordinator won't retry on this. + // Send 503 (SERVICE_UNAVAILABLE) so that request is retried. + asyncResponse.resume(Response.status(Status.SERVICE_UNAVAILABLE) + .type(MediaType.TEXT_PLAIN_TYPE) + .entity("The server is not fully started yet ") + .build()); + return true; + } + return false; + } + private boolean injectFailure( Optional traceToken, TaskId taskId, diff --git a/testing/trino-tests/src/test/java/io/trino/tests/TestWorkerRestart.java b/testing/trino-tests/src/test/java/io/trino/tests/TestWorkerRestart.java index 3f3962bc8f04..ffdbccad86c1 100644 --- a/testing/trino-tests/src/test/java/io/trino/tests/TestWorkerRestart.java +++ b/testing/trino-tests/src/test/java/io/trino/tests/TestWorkerRestart.java @@ -133,9 +133,7 @@ public void testStartDuringQuery() Future future = executor.submit(() -> queryRunner.execute("SELECT count(*) FROM tpch.tiny.lineitem -- " + randomUUID())); // the worker is shut down already, but restartWorker() will reuse its address queryRunner.restartWorker(worker); - assertThatThrownBy(future::get) - .isInstanceOf(ExecutionException.class) - .hasStackTraceContaining("Error loading catalogs on worker"); + future.get(); // query should succeed // Ensure that the restarted worker is able to serve queries. assertThat((long) queryRunner.execute("SELECT count(*) FROM tpch.tiny.lineitem").getOnlyValue())