Skip to content

Commit

Permalink
Use CachedTableStatsProvider across query
Browse files Browse the repository at this point in the history
  • Loading branch information
gaurav8297 committed Mar 11, 2024
1 parent f838c09 commit 69f01b3
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.execution;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.inject.Inject;
Expand All @@ -24,6 +25,7 @@
import io.opentelemetry.context.Context;
import io.trino.Session;
import io.trino.SystemSessionProperties;
import io.trino.cost.CachingTableStatsProvider;
import io.trino.cost.CostCalculator;
import io.trino.cost.StatsCalculator;
import io.trino.exchange.ExchangeManagerRegistry;
Expand Down Expand Up @@ -119,7 +121,7 @@ public class SqlQueryExecution
private final OutputStatsEstimatorFactory outputStatsEstimatorFactory;
private final TaskExecutionStats taskExecutionStats;
private final List<PlanOptimizer> planOptimizers;
private final AdaptivePlanner adaptivePlanner;
private final List<AdaptivePlanOptimizer> adaptivePlanOptimizers;
private final PlanFragmenter planFragmenter;
private final RemoteTaskFactory remoteTaskFactory;
private final int scheduleSplitBatchSize;
Expand Down Expand Up @@ -213,17 +215,8 @@ private SqlQueryExecution(
// analyze query
this.analysis = analyze(preparedQuery, stateMachine, warningCollector, planOptimizersStatsCollector, analyzerFactory);

// create adaptive planner
requireNonNull(adaptivePlanOptimizers, "adaptivePlanOptimizers is null");
this.adaptivePlanner = new AdaptivePlanner(
stateMachine.getSession(),
plannerContext,
adaptivePlanOptimizers,
planFragmenter,
DISTRIBUTED_PLAN_SANITY_CHECKER,
typeAnalyzer,
warningCollector,
planOptimizersStatsCollector);
// for adaptive planner
this.adaptivePlanOptimizers = ImmutableList.copyOf(requireNonNull(adaptivePlanOptimizers, "adaptivePlanOptimizers is null"));

stateMachine.addStateChangeListener(state -> {
if (!state.isDone()) {
Expand Down Expand Up @@ -418,11 +411,12 @@ public void start()
}, directExecutor());

try {
PlanRoot plan = planQuery();
CachingTableStatsProvider tableStatsProvider = new CachingTableStatsProvider(plannerContext.getMetadata(), getSession());
PlanRoot plan = planQuery(tableStatsProvider);
// DynamicFilterService needs plan for query to be registered.
// Query should be registered before dynamic filter suppliers are requested in distribution planning.
registerDynamicFilteringQuery(plan);
planDistribution(plan);
planDistribution(plan, tableStatsProvider);
}
finally {
synchronized (this) {
Expand Down Expand Up @@ -474,20 +468,20 @@ public void addFinalQueryInfoListener(StateChangeListener<QueryInfo> stateChange
stateMachine.addQueryInfoStateChangeListener(stateChangeListener);
}

private PlanRoot planQuery()
private PlanRoot planQuery(CachingTableStatsProvider tableStatsProvider)
{
Span span = tracer.spanBuilder("planner")
.setParent(Context.current().with(getSession().getQuerySpan()))
.startSpan();
try (var ignored = scopedSpan(span)) {
return doPlanQuery();
return doPlanQuery(tableStatsProvider);
}
catch (StackOverflowError e) {
throw new TrinoException(STACK_OVERFLOW, "statement is too large (stack overflow during analysis)", e);
}
}

private PlanRoot doPlanQuery()
private PlanRoot doPlanQuery(CachingTableStatsProvider tableStatsProvider)
{
// plan query
PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator();
Expand All @@ -499,7 +493,8 @@ private PlanRoot doPlanQuery()
statsCalculator,
costCalculator,
stateMachine.getWarningCollector(),
planOptimizersStatsCollector);
planOptimizersStatsCollector,
tableStatsProvider);
Plan plan = logicalPlanner.plan(analysis);
queryPlan.set(plan);

Expand All @@ -520,7 +515,7 @@ private PlanRoot doPlanQuery()
return new PlanRoot(fragmentedPlan, !explainAnalyze);
}

private void planDistribution(PlanRoot plan)
private void planDistribution(PlanRoot plan, CachingTableStatsProvider tableStatsProvider)
{
// if query was canceled, skip creating scheduler
if (stateMachine.isDone()) {
Expand Down Expand Up @@ -580,7 +575,16 @@ private void planDistribution(PlanRoot plan)
failureDetector,
dynamicFilterService,
taskExecutionStats,
adaptivePlanner,
new AdaptivePlanner(
stateMachine.getSession(),
plannerContext,
adaptivePlanOptimizers,
planFragmenter,
DISTRIBUTED_PLAN_SANITY_CHECKER,
typeAnalyzer,
stateMachine.getWarningCollector(),
planOptimizersStatsCollector,
tableStatsProvider),
plan.getRoot());
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.trino.Session;
import io.trino.client.NodeVersion;
import io.trino.cost.CachingTableStatsProvider;
import io.trino.cost.CostCalculator;
import io.trino.cost.StatsCalculator;
import io.trino.execution.querystats.PlanOptimizersStatsCollector;
Expand Down Expand Up @@ -172,7 +173,8 @@ public Plan getLogicalPlan(Session session, Statement statement, List<Expression
statsCalculator,
costCalculator,
warningCollector,
planOptimizersStatsCollector);
planOptimizersStatsCollector,
new CachingTableStatsProvider(plannerContext.getMetadata(), session));
return logicalPlanner.plan(analysis, OPTIMIZED_AND_VALIDATED, true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.trino.cost.CachingTableStatsProvider;
import io.trino.cost.RuntimeInfoProvider;
import io.trino.cost.StatsAndCosts;
import io.trino.cost.TableStatsProvider;
import io.trino.execution.querystats.PlanOptimizersStatsCollector;
import io.trino.execution.warnings.WarningCollector;
import io.trino.spi.type.Type;
Expand Down Expand Up @@ -94,6 +93,7 @@ public class AdaptivePlanner
private final IrTypeAnalyzer typeAnalyzer;
private final WarningCollector warningCollector;
private final PlanOptimizersStatsCollector planOptimizersStatsCollector;
private final CachingTableStatsProvider tableStatsProvider;

public AdaptivePlanner(
Session session,
Expand All @@ -103,7 +103,8 @@ public AdaptivePlanner(
PlanSanityChecker planSanityChecker,
IrTypeAnalyzer typeAnalyzer,
WarningCollector warningCollector,
PlanOptimizersStatsCollector planOptimizersStatsCollector)
PlanOptimizersStatsCollector planOptimizersStatsCollector,
CachingTableStatsProvider tableStatsProvider)
{
this.session = requireNonNull(session, "session is null");
this.plannerContext = requireNonNull(plannerContext, "plannerContext is null");
Expand All @@ -113,6 +114,7 @@ public AdaptivePlanner(
this.typeAnalyzer = requireNonNull(typeAnalyzer, "typeAnalyzer is null");
this.warningCollector = requireNonNull(warningCollector, "warningCollector is null");
this.planOptimizersStatsCollector = requireNonNull(planOptimizersStatsCollector, "planOptimizersStatsCollector is null");
this.tableStatsProvider = requireNonNull(tableStatsProvider, "tableStatsProvider is null");
}

public SubPlan optimize(SubPlan root, RuntimeInfoProvider runtimeInfoProvider)
Expand Down Expand Up @@ -148,8 +150,7 @@ public SubPlan optimize(SubPlan root, RuntimeInfoProvider runtimeInfoProvider)

// optimize the current plan
PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator(getMaxPlanId(currentPlan) + 1);
TableStatsProvider tableStatsProvider = new CachingTableStatsProvider(plannerContext.getMetadata(), session);
AdaptivePlanOptimizer.Result optimizationResult = optimizePlan(currentPlan, symbolAllocator, tableStatsProvider, runtimeInfoProvider, idAllocator);
AdaptivePlanOptimizer.Result optimizationResult = optimizePlan(currentPlan, symbolAllocator, runtimeInfoProvider, idAllocator);

// Check whether there are some changes in the plan after optimization
if (optimizationResult.changedPlanNodes().isEmpty()) {
Expand Down Expand Up @@ -180,7 +181,6 @@ public SubPlan optimize(SubPlan root, RuntimeInfoProvider runtimeInfoProvider)
private AdaptivePlanOptimizer.Result optimizePlan(
PlanNode plan,
SymbolAllocator symbolAllocator,
TableStatsProvider tableStatsProvider,
RuntimeInfoProvider runtimeInfoProvider,
PlanNodeIdAllocator idAllocator)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ public enum Stage
private final CostCalculator costCalculator;
private final WarningCollector warningCollector;
private final PlanOptimizersStatsCollector planOptimizersStatsCollector;
private final CachingTableStatsProvider tableStatsProvider;

public LogicalPlanner(
Session session,
Expand All @@ -196,9 +197,10 @@ public LogicalPlanner(
StatsCalculator statsCalculator,
CostCalculator costCalculator,
WarningCollector warningCollector,
PlanOptimizersStatsCollector planOptimizersStatsCollector)
PlanOptimizersStatsCollector planOptimizersStatsCollector,
CachingTableStatsProvider tableStatsProvider)
{
this(session, planOptimizers, DISTRIBUTED_PLAN_SANITY_CHECKER, idAllocator, plannerContext, typeAnalyzer, statsCalculator, costCalculator, warningCollector, planOptimizersStatsCollector);
this(session, planOptimizers, DISTRIBUTED_PLAN_SANITY_CHECKER, idAllocator, plannerContext, typeAnalyzer, statsCalculator, costCalculator, warningCollector, planOptimizersStatsCollector, tableStatsProvider);
}

public LogicalPlanner(
Expand All @@ -211,7 +213,8 @@ public LogicalPlanner(
StatsCalculator statsCalculator,
CostCalculator costCalculator,
WarningCollector warningCollector,
PlanOptimizersStatsCollector planOptimizersStatsCollector)
PlanOptimizersStatsCollector planOptimizersStatsCollector,
CachingTableStatsProvider tableStatsProvider)
{
this.session = requireNonNull(session, "session is null");
this.planOptimizers = requireNonNull(planOptimizers, "planOptimizers is null");
Expand All @@ -225,6 +228,7 @@ public LogicalPlanner(
this.costCalculator = requireNonNull(costCalculator, "costCalculator is null");
this.warningCollector = requireNonNull(warningCollector, "warningCollector is null");
this.planOptimizersStatsCollector = requireNonNull(planOptimizersStatsCollector, "planOptimizersStatsCollector is null");
this.tableStatsProvider = requireNonNull(tableStatsProvider, "tableStatsProvider is null");
}

public Plan plan(Analysis analysis)
Expand Down Expand Up @@ -260,8 +264,6 @@ public Plan plan(Analysis analysis, Stage stage, boolean collectPlanStatistics)
planSanityChecker.validateIntermediatePlan(root, session, plannerContext, typeAnalyzer, symbolAllocator.getTypes(), warningCollector);
}

CachingTableStatsProvider tableStatsProvider = new CachingTableStatsProvider(metadata, session);

if (stage.ordinal() >= OPTIMIZED.ordinal()) {
try (var ignored = scopedSpan(plannerContext.getTracer(), "optimizer")) {
for (PlanOptimizer optimizer : planOptimizers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.trino.connector.system.TableCommentSystemTable;
import io.trino.connector.system.TablePropertiesSystemTable;
import io.trino.connector.system.TransactionsSystemTable;
import io.trino.cost.CachingTableStatsProvider;
import io.trino.cost.ComposableStatsCalculator;
import io.trino.cost.CostCalculator;
import io.trino.cost.CostCalculatorUsingExchanges;
Expand Down Expand Up @@ -864,7 +865,8 @@ public Plan createPlan(Session session, @Language("SQL") String sql, List<PlanOp
statsCalculator,
costCalculator,
warningCollector,
planOptimizersStatsCollector);
planOptimizersStatsCollector,
new CachingTableStatsProvider(getPlannerContext().getMetadata(), session));

Analysis analysis = analyzer.analyze(preparedQuery.getStatement());
// make PlanTester always compute plan statistics for test purposes
Expand All @@ -881,7 +883,8 @@ public SubPlan createAdaptivePlan(Session session, SubPlan subPlan, List<Adaptiv
new PlanSanityChecker(false),
new IrTypeAnalyzer(plannerContext),
warningCollector,
planOptimizersStatsCollector);
planOptimizersStatsCollector,
new CachingTableStatsProvider(getPlannerContext().getMetadata(), session));
return adaptivePlanner.optimize(subPlan, runtimeInfoProvider);
}

Expand Down

0 comments on commit 69f01b3

Please sign in to comment.