Reject task on worker still starting up
When a worker node is restarted after a crash, the coordinator may
still be unaware of the situation and may attempt to schedule tasks
on it. Ideally the coordinator should not schedule tasks on a worker
that is not ready, but in pipelined execution there is currently no
way to move a task. Accepting a request too early will likely lead to
a failure and an HTTP 500 (INTERNAL_SERVER_ERROR) response, which the
coordinator does not retry. Send 503 (SERVICE_UNAVAILABLE) instead, so
that the request is retried.
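
As an aside illustrating the last point: a typical coordinator-side HTTP retry policy treats 503 as a transient condition and retries the request, while 500 is surfaced as a hard failure. Below is a minimal, self-contained Java sketch of such a policy using java.net.http; it is not Trino's actual remote-task client code, just an illustration of why the choice of status code matters.

```java
import java.io.IOException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RetryOn503Sketch
{
    private static final int SERVICE_UNAVAILABLE = 503;

    // Retries only on 503 (worker still starting up); any other status,
    // including 500, is returned to the caller immediately.
    public static HttpResponse<String> sendWithRetry(HttpClient client, HttpRequest request, int maxAttempts)
            throws IOException, InterruptedException
    {
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        for (int attempt = 1; attempt < maxAttempts && response.statusCode() == SERVICE_UNAVAILABLE; attempt++) {
            // Back off briefly before retrying; the worker should finish starting up soon
            Thread.sleep(100L * attempt);
            response = client.send(request, HttpResponse.BodyHandlers.ofString());
        }
        return response;
    }
}
```

With a policy like this, a task request sent to a worker that is still starting up is simply retried until the worker is ready, instead of failing the whole query.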
findepi committed May 10, 2024
1 parent b591db1 commit 8fbbd54
Showing 2 changed files with 32 additions and 3 deletions.
31 changes: 31 additions & 0 deletions core/trino-main/src/main/java/io/trino/server/TaskResource.java
@@ -96,6 +96,7 @@ public class TaskResource
private static final Duration ADDITIONAL_WAIT_TIME = new Duration(5, SECONDS);
private static final Duration DEFAULT_MAX_WAIT_TIME = new Duration(2, SECONDS);

private final StartupStatus startupStatus;
private final SqlTaskManager taskManager;
private final SessionPropertyManager sessionPropertyManager;
private final Executor responseExecutor;
@@ -106,12 +107,14 @@ public class TaskResource

@Inject
public TaskResource(
StartupStatus startupStatus,
SqlTaskManager taskManager,
SessionPropertyManager sessionPropertyManager,
@ForAsyncHttp BoundedExecutor responseExecutor,
@ForAsyncHttp ScheduledExecutorService timeoutExecutor,
FailureInjector failureInjector)
{
this.startupStatus = requireNonNull(startupStatus, "startupStatus is null");
this.taskManager = requireNonNull(taskManager, "taskManager is null");
this.sessionPropertyManager = requireNonNull(sessionPropertyManager, "sessionPropertyManager is null");
this.responseExecutor = requireNonNull(responseExecutor, "responseExecutor is null");
@@ -143,6 +146,9 @@ public void createOrUpdateTask(
@Suspended AsyncResponse asyncResponse)
{
requireNonNull(taskUpdateRequest, "taskUpdateRequest is null");
if (failRequestIfInvalid(asyncResponse)) {
return;
}

Session session = taskUpdateRequest.session().toSession(sessionPropertyManager, taskUpdateRequest.extraCredentials(), taskUpdateRequest.exchangeEncryptionKey());

@@ -179,6 +185,9 @@ public void getTaskInfo(
@Suspended AsyncResponse asyncResponse)
{
requireNonNull(taskId, "taskId is null");
if (failRequestIfInvalid(asyncResponse)) {
return;
}

if (injectFailure(taskManager.getTraceToken(taskId), taskId, RequestType.GET_TASK_INFO, asyncResponse)) {
return;
@@ -224,6 +233,9 @@ public void getTaskStatus(
@Suspended AsyncResponse asyncResponse)
{
requireNonNull(taskId, "taskId is null");
if (failRequestIfInvalid(asyncResponse)) {
return;
}

if (injectFailure(taskManager.getTraceToken(taskId), taskId, RequestType.GET_TASK_STATUS, asyncResponse)) {
return;
@@ -265,6 +277,9 @@ public void acknowledgeAndGetNewDynamicFilterDomains(
{
requireNonNull(taskId, "taskId is null");
requireNonNull(currentDynamicFiltersVersion, "currentDynamicFiltersVersion is null");
if (failRequestIfInvalid(asyncResponse)) {
return;
}

if (injectFailure(taskManager.getTraceToken(taskId), taskId, RequestType.ACKNOWLEDGE_AND_GET_NEW_DYNAMIC_FILTER_DOMAINS, asyncResponse)) {
return;
@@ -397,6 +412,22 @@ public void pruneCatalogs(Set<CatalogHandle> catalogHandles)
taskManager.pruneCatalogs(catalogHandles);
}

private boolean failRequestIfInvalid(AsyncResponse asyncResponse)
{
if (!startupStatus.isStartupComplete()) {
// When a worker node is restarted after a crash, the coordinator may still be unaware of the situation and may attempt to schedule tasks on it.
// Ideally the coordinator should not schedule tasks on a worker that is not ready, but in pipelined execution there is currently no way to move a task.
// Accepting a request too early will likely lead to a failure and an HTTP 500 (INTERNAL_SERVER_ERROR) response, which the coordinator does not retry.
// Send 503 (SERVICE_UNAVAILABLE) instead, so that the request is retried.
asyncResponse.resume(Response.status(Status.SERVICE_UNAVAILABLE)
.type(MediaType.TEXT_PLAIN_TYPE)
.entity("The server is not fully started yet")
.build());
return true;
}
return false;
}

private boolean injectFailure(
Optional<String> traceToken,
TaskId taskId,
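The new guard depends on the injected StartupStatus and its isStartupComplete() accessor, both visible in the diff above. As a rough, hypothetical sketch (not Trino's actual StartupStatus implementation), such a flag can be as small as a thread-safe boolean that the server flips once it has finished starting up:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch only, not Trino's StartupStatus. It illustrates the minimal
// contract the guard in TaskResource relies on: a flag that starts out false and is
// flipped exactly once when the server has finished initializing.
public class StartupStatusSketch
{
    private final AtomicBoolean startupComplete = new AtomicBoolean();

    // Called by the server once initialization (e.g. catalog loading) is done
    public void startupComplete()
    {
        startupComplete.set(true);
    }

    // Queried by request handlers such as TaskResource.failRequestIfInvalid()
    public boolean isStartupComplete()
    {
        return startupComplete.get();
    }
}
```

Until that flip happens, failRequestIfInvalid() answers every task request with 503, which the coordinator retries.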
4 changes: 1 addition & 3 deletions (worker restart test; file path not shown in this view)
@@ -133,9 +133,7 @@ public void testStartDuringQuery()
Future<MaterializedResult> future = executor.submit(() -> queryRunner.execute("SELECT count(*) FROM tpch.tiny.lineitem -- " + randomUUID()));
// the worker is shut down already, but restartWorker() will reuse its address
queryRunner.restartWorker(worker);
assertThatThrownBy(future::get)
.isInstanceOf(ExecutionException.class)
.hasStackTraceContaining("Error loading catalogs on worker");
future.get(); // query should succeed

// Ensure that the restarted worker is able to serve queries.
assertThat((long) queryRunner.execute("SELECT count(*) FROM tpch.tiny.lineitem").getOnlyValue())
