From 69f01b30c7c31df516a7a299ac3a16022b112f39 Mon Sep 17 00:00:00 2001 From: Gaurav Sehgal Date: Tue, 9 Jan 2024 13:42:46 +0530 Subject: [PATCH] Use CachedTableStatsProvider across query --- .../io/trino/execution/SqlQueryExecution.java | 44 ++++++++++--------- .../io/trino/sql/analyzer/QueryExplainer.java | 4 +- .../io/trino/sql/planner/AdaptivePlanner.java | 10 ++--- .../io/trino/sql/planner/LogicalPlanner.java | 12 ++--- .../java/io/trino/testing/PlanTester.java | 7 ++- 5 files changed, 44 insertions(+), 33 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java b/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java index f03ec6c9d0fd..d924f73eaebf 100644 --- a/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java +++ b/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java @@ -13,6 +13,7 @@ */ package io.trino.execution; +import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; import com.google.errorprone.annotations.ThreadSafe; import com.google.inject.Inject; @@ -24,6 +25,7 @@ import io.opentelemetry.context.Context; import io.trino.Session; import io.trino.SystemSessionProperties; +import io.trino.cost.CachingTableStatsProvider; import io.trino.cost.CostCalculator; import io.trino.cost.StatsCalculator; import io.trino.exchange.ExchangeManagerRegistry; @@ -119,7 +121,7 @@ public class SqlQueryExecution private final OutputStatsEstimatorFactory outputStatsEstimatorFactory; private final TaskExecutionStats taskExecutionStats; private final List planOptimizers; - private final AdaptivePlanner adaptivePlanner; + private final List adaptivePlanOptimizers; private final PlanFragmenter planFragmenter; private final RemoteTaskFactory remoteTaskFactory; private final int scheduleSplitBatchSize; @@ -213,17 +215,8 @@ private SqlQueryExecution( // analyze query this.analysis = analyze(preparedQuery, stateMachine, warningCollector, planOptimizersStatsCollector, analyzerFactory); - // create adaptive planner - requireNonNull(adaptivePlanOptimizers, "adaptivePlanOptimizers is null"); - this.adaptivePlanner = new AdaptivePlanner( - stateMachine.getSession(), - plannerContext, - adaptivePlanOptimizers, - planFragmenter, - DISTRIBUTED_PLAN_SANITY_CHECKER, - typeAnalyzer, - warningCollector, - planOptimizersStatsCollector); + // for adaptive planner + this.adaptivePlanOptimizers = ImmutableList.copyOf(requireNonNull(adaptivePlanOptimizers, "adaptivePlanOptimizers is null")); stateMachine.addStateChangeListener(state -> { if (!state.isDone()) { @@ -418,11 +411,12 @@ public void start() }, directExecutor()); try { - PlanRoot plan = planQuery(); + CachingTableStatsProvider tableStatsProvider = new CachingTableStatsProvider(plannerContext.getMetadata(), getSession()); + PlanRoot plan = planQuery(tableStatsProvider); // DynamicFilterService needs plan for query to be registered. // Query should be registered before dynamic filter suppliers are requested in distribution planning. registerDynamicFilteringQuery(plan); - planDistribution(plan); + planDistribution(plan, tableStatsProvider); } finally { synchronized (this) { @@ -474,20 +468,20 @@ public void addFinalQueryInfoListener(StateChangeListener stateChange stateMachine.addQueryInfoStateChangeListener(stateChangeListener); } - private PlanRoot planQuery() + private PlanRoot planQuery(CachingTableStatsProvider tableStatsProvider) { Span span = tracer.spanBuilder("planner") .setParent(Context.current().with(getSession().getQuerySpan())) .startSpan(); try (var ignored = scopedSpan(span)) { - return doPlanQuery(); + return doPlanQuery(tableStatsProvider); } catch (StackOverflowError e) { throw new TrinoException(STACK_OVERFLOW, "statement is too large (stack overflow during analysis)", e); } } - private PlanRoot doPlanQuery() + private PlanRoot doPlanQuery(CachingTableStatsProvider tableStatsProvider) { // plan query PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator(); @@ -499,7 +493,8 @@ private PlanRoot doPlanQuery() statsCalculator, costCalculator, stateMachine.getWarningCollector(), - planOptimizersStatsCollector); + planOptimizersStatsCollector, + tableStatsProvider); Plan plan = logicalPlanner.plan(analysis); queryPlan.set(plan); @@ -520,7 +515,7 @@ private PlanRoot doPlanQuery() return new PlanRoot(fragmentedPlan, !explainAnalyze); } - private void planDistribution(PlanRoot plan) + private void planDistribution(PlanRoot plan, CachingTableStatsProvider tableStatsProvider) { // if query was canceled, skip creating scheduler if (stateMachine.isDone()) { @@ -580,7 +575,16 @@ private void planDistribution(PlanRoot plan) failureDetector, dynamicFilterService, taskExecutionStats, - adaptivePlanner, + new AdaptivePlanner( + stateMachine.getSession(), + plannerContext, + adaptivePlanOptimizers, + planFragmenter, + DISTRIBUTED_PLAN_SANITY_CHECKER, + typeAnalyzer, + stateMachine.getWarningCollector(), + planOptimizersStatsCollector, + tableStatsProvider), plan.getRoot()); break; default: diff --git a/core/trino-main/src/main/java/io/trino/sql/analyzer/QueryExplainer.java b/core/trino-main/src/main/java/io/trino/sql/analyzer/QueryExplainer.java index 31052c723310..23dcfa0beb30 100644 --- a/core/trino-main/src/main/java/io/trino/sql/analyzer/QueryExplainer.java +++ b/core/trino-main/src/main/java/io/trino/sql/analyzer/QueryExplainer.java @@ -15,6 +15,7 @@ import io.trino.Session; import io.trino.client.NodeVersion; +import io.trino.cost.CachingTableStatsProvider; import io.trino.cost.CostCalculator; import io.trino.cost.StatsCalculator; import io.trino.execution.querystats.PlanOptimizersStatsCollector; @@ -172,7 +173,8 @@ public Plan getLogicalPlan(Session session, Statement statement, List= OPTIMIZED.ordinal()) { try (var ignored = scopedSpan(plannerContext.getTracer(), "optimizer")) { for (PlanOptimizer optimizer : planOptimizers) { diff --git a/core/trino-main/src/main/java/io/trino/testing/PlanTester.java b/core/trino-main/src/main/java/io/trino/testing/PlanTester.java index 19287c9e135b..f91f7fcb2d45 100644 --- a/core/trino-main/src/main/java/io/trino/testing/PlanTester.java +++ b/core/trino-main/src/main/java/io/trino/testing/PlanTester.java @@ -45,6 +45,7 @@ import io.trino.connector.system.TableCommentSystemTable; import io.trino.connector.system.TablePropertiesSystemTable; import io.trino.connector.system.TransactionsSystemTable; +import io.trino.cost.CachingTableStatsProvider; import io.trino.cost.ComposableStatsCalculator; import io.trino.cost.CostCalculator; import io.trino.cost.CostCalculatorUsingExchanges; @@ -864,7 +865,8 @@ public Plan createPlan(Session session, @Language("SQL") String sql, List