-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adaptive planning framework in FTE #20276
Conversation
2694490
to
27234a6
Compare
b654a9c
to
d1c836f
Compare
d1c836f
to
6e69010
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I put some comments but I am a bit lost on main logic
core/trino-main/src/main/java/io/trino/cost/StatsCalculator.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/execution/buffer/SpoolingOutputStats.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/execution/buffer/SpoolingOutputStats.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/execution/scheduler/OutputDataSizeEstimate.java
Outdated
Show resolved
Hide resolved
public boolean isStageFinished(PlanFragmentId planFragmentId) | ||
{ | ||
return runtimeOutputStats.containsKey(planFragmentId); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this accurate? We can get output stats for stages which are still running and perform adaptive optimization based on those.
I did not get to that commit yet but it looks you can put stuff in runtimeOutputStats
for stages which are running and stats are estimated via ESTIMATED_BY_PROGRESS
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
discussed offline, add comment
.../java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java
Show resolved
Hide resolved
.filter(planFragmentId -> stageExecutions.containsKey(getStageId(planFragmentId))) | ||
.collect(toImmutableSet())); | ||
if (isReadyForExecutionResult.isReadyForExecution()) { | ||
return adaptivePlanner.optimize(plan, createStageRuntimeInfoProvider()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why return here after we replan single stage. What about the others?
core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAdaptivePartitioning.java
Show resolved
Hide resolved
return root; | ||
} | ||
|
||
PlanNode adaptivePlan = addAdaptivePlanNode(idAllocator, initialPlan, optimizedPlan); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have hard time understanding this - can you explain why we need extra plan node here?
Maybe code comment?
A bit more clearer after I read PR description (shoul have done that at the beginning I guess). Will get back to reading in a while. |
core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AdaptivePartitioning.java
Outdated
Show resolved
Hide resolved
72fecaa
to
74ab6f2
Compare
Taking a look at failed CI |
@gaurav8297 Could you extract prefix commits as separate PRs? They should land much quicker |
6859c62
to
8b6513b
Compare
fbb8325
to
b0d1c4d
Compare
CI Issue: #16315 |
core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AdaptivePlanOptimizer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AdaptivePlanOptimizer.java
Show resolved
Hide resolved
|
||
import static java.util.Objects.requireNonNull; | ||
|
||
public class AdaptivePlanNode |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There can be many Visitors in the code which would not support this one.
E.g. UnaliasSymbolReferences.Visitor - but it is just a random find. How are we sure no important ones are missing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
UnaliasSymbolReferences
-> I don't think we need to handle AdaptivePlanNode
in this visitor since it is used during static planning. The same goes for other similar kind of visitors. However, for visitors that are used during execution time, we need to handle AdaptivePlanNode
there. @losipiuk
5a49930
to
9b04207
Compare
9b04207
to
b73c746
Compare
core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java
Outdated
Show resolved
Hide resolved
@@ -508,13 +559,28 @@ else if (exchange.getType() == ExchangeNode.Type.REPARTITION) { | |||
isWorkerCoordinatorBoundary(context.get(), childrenProperties.build()) ? getRetryPolicy(session) : RetryPolicy.NONE); | |||
} | |||
|
|||
private SubPlan buildSubPlan(PlanNode node, FragmentProperties properties, RewriteContext<FragmentProperties> context) | |||
private SubPlan buildSubPlan(PlanNode node, FragmentIdentifier fragmentIdentifier, FragmentProperties properties, RewriteContext<FragmentProperties> context) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This dihotomy of FragmentIdentifier
and PlanFragmentId
is confusing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also why do we need to rebuild fragment for "unchanged" ones. Would it be more logical to cache whole unchanged SubPlan
and return here instead of rebuilding?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added caching
core/trino-main/src/main/java/io/trino/sql/planner/AdaptivePlanner.java
Outdated
Show resolved
Hide resolved
b73c746
to
2ada6a7
Compare
This rule is only applicable for FTE where we have exchange runtime statistics. We will use this rule in adaptive re-planning.
We will require this while re-planning if there are more exchanges added to the existing stage plan.
bea3729
to
0c4fdb1
Compare
0c4fdb1
to
69f01b3
Compare
LGTM. Thanks |
Does this need a release note? Or is it non-user impacting at current moment due to just being a framework? |
It does not i think- it refactors a feature which was handcoded to use the new framework added by this PR - but from user perspective it is mostly not visible. |
Description
Adaptive planning is part of FTE, wherein the engine can modify the plan at runtime based on exchange-level statistics. For example, reordering of join or mitigation of skewness. It will significantly impact cost and performance if the plan chosen by the static optimiser isn’t the best due to the underestimation of statistics or lack of statistics.
Included in PR:
High Level Design
Available Runtime Stats (at exchange level)
RemoteSourceNode Stats Rule:
Replanning Steps:
PlanFragmentIds
. The re-fragmentation will only happen if the old plan and the new plan have some differences. To check these differences, we rely on PlanOptimizer#optimizeAndReturnChanges API which also returns changes in plan ids.Note: We do not change the fragment ids which have no changes and are not downstream of the changed plan nodes. This optimization is done to avoid unnecessary stage restarts due to speculative execution.
Explain Analyze:
An example of adaptive partitioning, where the partition count changes from 50 to 1000.
Additional context and related issues
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text: