Reject task updates addressed to previous failed node #21744
Conversation
One of my favorite all-time bug fixes.
Force-pushed from d0c0010 to ded32f1
TaskUpdateRequest taskUpdateRequest,
        @Context UriInfo uriInfo,
        @Suspended AsyncResponse asyncResponse)
{
    requireNonNull(taskUpdateRequest, "taskUpdateRequest is null");
    if (!nodeIdentifier.equals(requestNodeId)) {
        asyncResponse.resume(Response.status(Status.BAD_REQUEST)
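A hedged sketch of the overall shape of such a check in a JAX-RS async resource method; the helper name, message text, and early-return structure are illustrative assumptions, not the PR's exact code:

import jakarta.ws.rs.container.AsyncResponse;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.Status;

// Reject updates addressed to a different node instance, e.g. when this node
// replaced a failed one at the same IP address (hypothetical helper).
private static boolean rejectIfWrongNode(String localNodeId, String requestNodeId, AsyncResponse asyncResponse)
{
    if (!localNodeId.equals(requestNodeId)) {
        asyncResponse.resume(Response.status(Status.BAD_REQUEST)
                .entity("Task update addressed to node %s, but this is node %s".formatted(requestNodeId, localNodeId))
                .build());
        return true;
    }
    return false;
}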
Would it make sense to put this in an HTTP filter?
Would it require updating all the comms requests?
Should be able to filter by a common path if one exists, IIRC.
if (path.startsWith("/v1/task")) {
    // filter logic
}
I somehow prefer the explicit approach, at least for now, while not all communication channels are updated.
I think a filter-based approach is a reasonable follow-up once the approach is proven and we just want to reduce boilerplate and ensure it's applied uniformly. WDYT?
An HTTP filter on the receiving side would be easy. An HTTP client request filter wouldn't be possible, as the filter wouldn't have access to the expected announcement ID for the target.
What happens when a node fails and restarts with the same IP and node ID? This is a legitimate case, as the node ID is expected to be stable for a given node across restarts (especially when the node may have persistent data: Raptor, etc.).
That's definitely not handled by this PR. I was initially thinking about introducing something like an "incarnation ID" (e.g. a UUID randomly chosen at node start). I switched over to node ID on the assumption that (1) very quick node restarts with IP reuse are most likely to happen in a Kubernetes environment, and (2) the node ID can be generated there (it's generated when not set, right?). cc @elonazoulay
That sounds like a good idea! It looks like (1) and (2) are correct. This is the behavior if node ID is unset.
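A minimal sketch of the "incarnation ID" idea mentioned above: a random identifier chosen at node start (essentially what Airlift's instance ID already provides), letting the coordinator tell two incarnations of the same node ID apart. The type and method names here are hypothetical, not code from this PR:

import java.util.UUID;

// Hypothetical holder for a node's identity as seen by the coordinator.
record NodeIdentity(String nodeId, String incarnationId)
{
    static NodeIdentity forNewIncarnation(String nodeId)
    {
        // A fresh random value on every process start distinguishes restarts
        // even when the node ID and IP address are reused.
        return new NodeIdentity(nodeId, UUID.randomUUID().toString());
    }

    boolean isRestartOf(NodeIdentity previous)
    {
        return nodeId.equals(previous.nodeId()) && !incarnationId.equals(previous.incarnationId());
    }
}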
This is using node ID, which is expected to be stable across restarts. For the Docker image, we set this to the hostname. The node identifier uniquely identifies the deployment, which may have persistent cached data, etc.
Instead, this should use the instance ID via NodeInfo.getInstanceId(), which will be unique on every restart. Unfortunately, we don't provide this in discovery announcements. I think the easiest approach is to modify the code in Server.updateConnectorIds() to add the instance ID as a property.
I also like @jklamer's idea of using an HTTP request filter. Instead of modifying the request paths, we can add a header, X-Trino-Instance-Id, that is validated in a filter. This makes it easy to add validation to more resources by adding the header in the request.
I think the problem of node restarts is more complicated than it appears. We probably should track the instance ID and node ID for remote tasks in the coordinator and proactively fail them once we detect that the node has restarted (same node ID with a different instance ID, or different IDs for the same task host/port). @losipiuk How does this affect FTE?
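A minimal sketch of the header-validation idea, assuming a JAX-RS container filter on the worker side. The header name follows the X-Trino-Instance-Id suggestion above; the class name, constructor wiring, and filter registration are assumptions and would differ in the actual codebase:

import jakarta.ws.rs.container.ContainerRequestContext;
import jakarta.ws.rs.container.ContainerRequestFilter;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.ext.Provider;

@Provider
public class InstanceIdRequestFilter
        implements ContainerRequestFilter
{
    private final String localInstanceId;

    public InstanceIdRequestFilter(String localInstanceId)
    {
        this.localInstanceId = localInstanceId;
    }

    @Override
    public void filter(ContainerRequestContext request)
    {
        String expected = request.getHeaderString("X-Trino-Instance-Id");
        // Only validate when the caller supplied the header, so clients that
        // don't know the instance ID yet are unaffected.
        if (expected != null && !expected.equals(localInstanceId)) {
            request.abortWith(Response.status(Response.Status.GONE)
                    .entity("Request addressed to instance " + expected + ", but this is " + localInstanceId)
                    .build());
        }
    }
}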
Force-pushed from ded32f1 to eb0b960
This might be needed, but it may also turn out to be unnecessary. The task ID identifies the task a node was doing. If a node restarts, it won't know that task ID, so it won't update the coordinator on its progress. This should result in task failure.
Indeed. It seems it was added in 8d15b14.
Force-pushed from eb0b960 to 22405b9
Done
Not done at this point. It is easy to inject validation when the header is present, but it's harder to inject validation that requires the header to be present. For instance, I have a problem finding …
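A name-binding annotation is one standard JAX-RS way to require the header on selected resources only; wiring that into Trino's resource registration may well be the difficulty referred to above, so treat this as a hedged sketch rather than the PR's approach. The annotation and filter names are illustrative:

import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import jakarta.ws.rs.NameBinding;
import jakarta.ws.rs.container.ContainerRequestContext;
import jakarta.ws.rs.container.ContainerRequestFilter;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.ext.Provider;

// Applied to resource classes/methods that must always carry the header.
@NameBinding
@Retention(RetentionPolicy.RUNTIME)
@interface RequiresInstanceId {}

@Provider
@RequiresInstanceId
class RequireInstanceIdFilter
        implements ContainerRequestFilter
{
    @Override
    public void filter(ContainerRequestContext request)
    {
        // Unlike the optional check, reject outright when the header is missing.
        if (request.getHeaderString("X-Trino-Instance-Id") == null) {
            request.abortWith(Response.status(Response.Status.BAD_REQUEST)
                    .entity("Missing X-Trino-Instance-Id header")
                    .build());
        }
    }
}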
Force-pushed from 39d6298 to 3d62a27
Added a test.
Force-pushed from 3d62a27 to 95115f7
testing/trino-tests/src/test/java/io/trino/tests/TestRejectTaskOnNodeRestart.java
    queryRunner.restartWorker(worker);
}

assertThatThrownBy(future::get)
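For context, assertThatThrownBy(future::get) is the AssertJ idiom for asserting that resolving the query future fails, i.e. the query submitted around the restart ends in an error. A hedged sketch of the full idiom; the chained assertion is an assumption and the test's actual expectations may be more specific:

import java.util.concurrent.ExecutionException;

import static org.assertj.core.api.Assertions.assertThatThrownBy;

// Future.get() rethrows the query failure wrapped in an ExecutionException,
// so this asserts that the submitted query ultimately failed.
assertThatThrownBy(future::get)
        .isInstanceOf(ExecutionException.class);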
Why do you expect an error if nodeRestartBeforeQueryStart == true?
Because the coordinator's copy of services isn't updated yet, and the coordinator tries to schedule a new task on an old instance of a worker. This is exactly the kind of production problem that sparked this PR: a worker was restarted after a crash, and the coordinator was trying to use the new worker before it was initialized (had catalogs), because it thought it was the previously announced instance.
In my local testing, testNodeRestartBeforeQueryStart passed 50 of 50.
To your point, there is no strong guarantee, as there is no synchronization / delayed registration for a newly started worker. Moreover, the worker startup is synchronous.
OK, so the assumption here is that we are scheduling the query quickly enough that the coordinator state is stale, right? Worth a comment?
BTW, this indicates a bigger problem: the worker crash and restart has a more prolonged effect on query execution than it should. I.e. it affects currently executing queries, but also new queries. A user can retry a query after it failed due to a worker crash, and the retried query can still fail because of that very same worker crash.
I ran the test after removing the instance_id-related changes (both the checks in TaskResource and the discovery propagation).
The results for testRestartDuringQuery were better before the changes in this PR in terms of how quickly the cluster heals itself. After the worker is started, the next query succeeds. But that doesn't mean a retried query is guaranteed to succeed: it won't succeed if the retry happens before the worker is fully restarted.
The results for testRestartBeforeQuery were much worse before the changes in this PR. The worker crash pretty deterministically leads to the next query hanging indefinitely. This also means that in the testRestartDuringQuery case, if the retried query was sent too quickly (before the worker fully restarted), it would hang instead of failing.
Thus, I consider this PR an improvement and want to continue on it and get it merged. But we may want to iterate further and heal from crashes faster.
Regarding "The results for testRestartBeforeQuery were much worse before the changes in this PR. The worker crash pretty deterministically leads to the next query hanging indefinitely": that was just an observation mistake. My test query was simply never completing (it is very long), but it was executing just fine.
Force-pushed from 7bc02a7 to f8d2826
Force-pushed from 7719662 to af33656
Propagate Airlift's `NodeInfo.instanceId` to Trino's `InternalNode`. The instance ID makes it possible to identify node restarts.
In case of a worker node failure, a new node can be started with the same IP address. Once the new node is done initializing and registers with the discovery service, the coordinator will eventually become aware of it and use it for processing queries. Before then, and before the coordinator is aware of the node failure, it may try to reach the old, failed node. Since the old and the new node may have the same IP address, the coordinator may effectively talk to an uninitialized worker node as if it were ready to service queries. This may lead to failures such as "unknown catalog" or, rather, "unknown handle id" when decoding task update requests. Prevent such hard-to-understand failures by passing the node ID in task update requests. Provided that the new worker is started with a different node ID, these requests will be cleanly rejected without trying to service them.
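For illustration, a hedged sketch of the coordinator-side half of that idea: attaching the expected target node ID when building the task update URI, so the receiving worker can compare it against its own identity. The helper and query parameter names are assumptions, not the PR's actual implementation (the PR may carry the ID differently, e.g. in the request path or a header):

import java.net.URI;
import jakarta.ws.rs.core.UriBuilder;

// Hypothetical helper: append the node ID the coordinator believes it is
// talking to, so a replacement worker at the same address can reject the call.
static URI taskUpdateUri(URI workerTaskUri, String expectedNodeId)
{
    return UriBuilder.fromUri(workerTaskUri)
            .queryParam("expectedNodeId", expectedNodeId)
            .build();
}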
Force-pushed from af33656 to 4242f87
#21921 is a simpler alternative that avoids the problem described here: #21744 (comment)
Per #21735 (comment) fixes #21735