From d17ae7abc4c1298c4b1329623d974d882aaf22c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Mon, 16 Oct 2023 16:50:49 +0200 Subject: [PATCH] Refactor stage output data size estimation Extract OutputDataSizeEstimator and model current logic as a series of implementations of it wrapped in CompositeOutputDataSizeEstimator. --- .../io/trino/execution/SqlQueryExecution.java | 9 + .../ByEagerParentOutputDataSizeEstimator.java | 54 ++++ .../BySmallStageOutputDataSizeEstimator.java | 147 +++++++++++ ...ByTaskProgressOutputDataSizeEstimator.java | 76 ++++++ .../CompositeOutputDataSizeEstimator.java | 86 +++++++ ...ventDrivenFaultTolerantQueryScheduler.java | 234 +++--------------- .../OutputDataSizeEstimator.java | 54 ++++ .../OutputDataSizeEstimatorFactory.java | 21 ++ .../io/trino/server/CoordinatorModule.java | 30 +++ 9 files changed, 506 insertions(+), 205 deletions(-) create mode 100644 core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/ByEagerParentOutputDataSizeEstimator.java create mode 100644 core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/BySmallStageOutputDataSizeEstimator.java create mode 100644 core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/ByTaskProgressOutputDataSizeEstimator.java create mode 100644 core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/CompositeOutputDataSizeEstimator.java create mode 100644 core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/OutputDataSizeEstimator.java create mode 100644 core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/OutputDataSizeEstimatorFactory.java diff --git a/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java b/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java index 805322d2a917..b309706b68f6 100644 --- a/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java +++ b/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java @@ -38,6 +38,7 @@ import io.trino.execution.scheduler.faulttolerant.EventDrivenFaultTolerantQueryScheduler; import io.trino.execution.scheduler.faulttolerant.EventDrivenTaskSourceFactory; import io.trino.execution.scheduler.faulttolerant.NodeAllocatorService; +import io.trino.execution.scheduler.faulttolerant.OutputDataSizeEstimatorFactory; import io.trino.execution.scheduler.faulttolerant.PartitionMemoryEstimatorFactory; import io.trino.execution.scheduler.faulttolerant.TaskDescriptorStorage; import io.trino.execution.scheduler.policy.ExecutionPolicy; @@ -112,6 +113,7 @@ public class SqlQueryExecution private final NodeScheduler nodeScheduler; private final NodeAllocatorService nodeAllocatorService; private final PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory; + private final OutputDataSizeEstimatorFactory outputDataSizeEstimatorFactory; private final TaskExecutionStats taskExecutionStats; private final List planOptimizers; private final PlanFragmenter planFragmenter; @@ -150,6 +152,7 @@ private SqlQueryExecution( NodeScheduler nodeScheduler, NodeAllocatorService nodeAllocatorService, PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory, + OutputDataSizeEstimatorFactory outputDataSizeEstimatorFactory, TaskExecutionStats taskExecutionStats, List planOptimizers, PlanFragmenter planFragmenter, @@ -182,6 +185,7 @@ private SqlQueryExecution( this.nodeScheduler = requireNonNull(nodeScheduler, "nodeScheduler is null"); this.nodeAllocatorService = requireNonNull(nodeAllocatorService, "nodeAllocatorService is null"); this.partitionMemoryEstimatorFactory = requireNonNull(partitionMemoryEstimatorFactory, "partitionMemoryEstimatorFactory is null"); + this.outputDataSizeEstimatorFactory = requireNonNull(outputDataSizeEstimatorFactory, "outputDataSizeEstimatorFactory is null"); this.taskExecutionStats = requireNonNull(taskExecutionStats, "taskExecutionStats is null"); this.planOptimizers = requireNonNull(planOptimizers, "planOptimizers is null"); this.planFragmenter = requireNonNull(planFragmenter, "planFragmenter is null"); @@ -552,6 +556,7 @@ private void planDistribution(PlanRoot plan) tracer, schedulerStats, partitionMemoryEstimatorFactory, + outputDataSizeEstimatorFactory, nodePartitioningManager, exchangeManagerRegistry.getExchangeManager(), nodeAllocatorService, @@ -747,6 +752,7 @@ public static class SqlQueryExecutionFactory private final NodeScheduler nodeScheduler; private final NodeAllocatorService nodeAllocatorService; private final PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory; + private final OutputDataSizeEstimatorFactory outputDataSizeEstimatorFactory; private final TaskExecutionStats taskExecutionStats; private final List planOptimizers; private final PlanFragmenter planFragmenter; @@ -777,6 +783,7 @@ public static class SqlQueryExecutionFactory NodeScheduler nodeScheduler, NodeAllocatorService nodeAllocatorService, PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory, + OutputDataSizeEstimatorFactory outputDataSizeEstimatorFactory, TaskExecutionStats taskExecutionStats, PlanOptimizersFactory planOptimizersFactory, PlanFragmenter planFragmenter, @@ -807,6 +814,7 @@ public static class SqlQueryExecutionFactory this.nodeScheduler = requireNonNull(nodeScheduler, "nodeScheduler is null"); this.nodeAllocatorService = requireNonNull(nodeAllocatorService, "nodeAllocatorService is null"); this.partitionMemoryEstimatorFactory = requireNonNull(partitionMemoryEstimatorFactory, "partitionMemoryEstimatorFactory is null"); + this.outputDataSizeEstimatorFactory = requireNonNull(outputDataSizeEstimatorFactory, "outputDataSizeEstimatorFactory is null"); this.taskExecutionStats = requireNonNull(taskExecutionStats, "taskExecutionStats is null"); this.planFragmenter = requireNonNull(planFragmenter, "planFragmenter is null"); this.remoteTaskFactory = requireNonNull(remoteTaskFactory, "remoteTaskFactory is null"); @@ -851,6 +859,7 @@ public QueryExecution createQueryExecution( nodeScheduler, nodeAllocatorService, partitionMemoryEstimatorFactory, + outputDataSizeEstimatorFactory, taskExecutionStats, planOptimizers, planFragmenter, diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/ByEagerParentOutputDataSizeEstimator.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/ByEagerParentOutputDataSizeEstimator.java new file mode 100644 index 000000000000..5e8bf8b03574 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/ByEagerParentOutputDataSizeEstimator.java @@ -0,0 +1,54 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.execution.scheduler.faulttolerant; + +import com.google.common.primitives.ImmutableLongArray; +import io.trino.Session; +import io.trino.execution.StageId; +import io.trino.execution.scheduler.faulttolerant.EventDrivenFaultTolerantQueryScheduler.StageExecution; + +import java.util.Optional; +import java.util.function.Function; + +public class ByEagerParentOutputDataSizeEstimator + implements OutputDataSizeEstimator +{ + public static class Factory + implements OutputDataSizeEstimatorFactory + { + @Override + public OutputDataSizeEstimator create(Session session) + { + return new ByEagerParentOutputDataSizeEstimator(); + } + } + + @Override + public Optional getEstimatedOutputDataSize(StageExecution stageExecution, Function stageExecutionLookup, boolean parentEager) + { + if (!parentEager) { + return Optional.empty(); + } + + // use empty estimate as fallback for eager parents. It matches current logic of assessing if node should be processed eagerly or not. + // Currently, we use eager task exectuion only for stages with small FINAL LIMIT which implies small input from child stages (child stages will + // enforce small input via PARTIAL LIMIT) + int outputPartitionsCount = stageExecution.getSinkPartitioningScheme().getPartitionCount(); + ImmutableLongArray.Builder estimateBuilder = ImmutableLongArray.builder(outputPartitionsCount); + for (int i = 0; i < outputPartitionsCount; ++i) { + estimateBuilder.add(0); + } + return Optional.of(new OutputDataSizeEstimateResult(estimateBuilder.build(), OutputDataSizeEstimateStatus.ESTIMATED_FOR_EAGER_PARENT)); + } +} diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/BySmallStageOutputDataSizeEstimator.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/BySmallStageOutputDataSizeEstimator.java new file mode 100644 index 000000000000..8a1a03556b0a --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/BySmallStageOutputDataSizeEstimator.java @@ -0,0 +1,147 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.execution.scheduler.faulttolerant; + +import com.google.common.primitives.ImmutableLongArray; +import io.airlift.units.DataSize; +import io.trino.Session; +import io.trino.execution.StageId; +import io.trino.execution.scheduler.faulttolerant.EventDrivenFaultTolerantQueryScheduler.StageExecution; +import io.trino.spi.QueryId; +import io.trino.sql.planner.PlanFragment; +import io.trino.sql.planner.plan.PlanFragmentId; +import io.trino.sql.planner.plan.RemoteSourceNode; + +import java.util.List; +import java.util.Optional; +import java.util.function.Function; + +import static io.trino.SystemSessionProperties.getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMin; +import static io.trino.SystemSessionProperties.getFaultTolerantExecutionSmallStageEstimationThreshold; +import static io.trino.SystemSessionProperties.getFaultTolerantExecutionSmallStageSourceSizeMultiplier; +import static io.trino.SystemSessionProperties.isFaultTolerantExecutionSmallStageEstimationEnabled; +import static io.trino.SystemSessionProperties.isFaultTolerantExecutionSmallStageRequireNoMorePartitions; +import static java.util.Objects.requireNonNull; + +public class BySmallStageOutputDataSizeEstimator + implements OutputDataSizeEstimator +{ + public static class Factory + implements OutputDataSizeEstimatorFactory + { + @Override + public OutputDataSizeEstimator create(Session session) + { + return new BySmallStageOutputDataSizeEstimator( + session.getQueryId(), + isFaultTolerantExecutionSmallStageEstimationEnabled(session), + getFaultTolerantExecutionSmallStageEstimationThreshold(session), + getFaultTolerantExecutionSmallStageSourceSizeMultiplier(session), + getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMin(session), + isFaultTolerantExecutionSmallStageRequireNoMorePartitions(session)); + } + } + + private final QueryId queryId; + private final boolean smallStageEstimationEnabled; + private final DataSize smallStageEstimationThreshold; + private final double smallStageSourceSizeMultiplier; + private final DataSize smallSizePartitionSizeEstimate; + private final boolean smallStageRequireNoMorePartitions; + + private BySmallStageOutputDataSizeEstimator( + QueryId queryId, + boolean smallStageEstimationEnabled, + DataSize smallStageEstimationThreshold, + double smallStageSourceSizeMultiplier, + DataSize smallSizePartitionSizeEstimate, + boolean smallStageRequireNoMorePartitions) + { + this.queryId = requireNonNull(queryId, "queryId is null"); + this.smallStageEstimationEnabled = smallStageEstimationEnabled; + this.smallStageEstimationThreshold = requireNonNull(smallStageEstimationThreshold, "smallStageEstimationThreshold is null"); + this.smallStageSourceSizeMultiplier = smallStageSourceSizeMultiplier; + this.smallSizePartitionSizeEstimate = requireNonNull(smallSizePartitionSizeEstimate, "smallSizePartitionSizeEstimate is null"); + this.smallStageRequireNoMorePartitions = smallStageRequireNoMorePartitions; + } + + @Override + public Optional getEstimatedOutputDataSize(StageExecution stageExecution, Function stageExecutionLookup, boolean parentEager) + { + if (!smallStageEstimationEnabled) { + return Optional.empty(); + } + + if (smallStageRequireNoMorePartitions && !stageExecution.isNoMorePartitions()) { + return Optional.empty(); + } + + long[] currentOutputDataSize = stageExecution.currentOutputDataSize(); + long totaleOutputDataSize = 0; + for (long partitionOutputDataSize : currentOutputDataSize) { + totaleOutputDataSize += partitionOutputDataSize; + } + if (totaleOutputDataSize > smallStageEstimationThreshold.toBytes()) { + // our output is too big already + return Optional.empty(); + } + + PlanFragment planFragment = stageExecution.getStageInfo().getPlan(); + boolean hasPartitionedSources = planFragment.getPartitionedSources().size() > 0; + List remoteSourceNodes = planFragment.getRemoteSourceNodes(); + + long partitionedInputSizeEstimate = 0; + if (hasPartitionedSources) { + if (!stageExecution.isNoMorePartitions()) { + // stage is reading directly from table + // for leaf stages require all tasks to be enumerated + return Optional.empty(); + } + // estimate partitioned input based on number of task partitions + partitionedInputSizeEstimate += stageExecution.getPartitionsCount() * smallSizePartitionSizeEstimate.toBytes(); + } + + long remoteInputSizeEstimate = 0; + for (RemoteSourceNode remoteSourceNode : remoteSourceNodes) { + for (PlanFragmentId sourceFragmentId : remoteSourceNode.getSourceFragmentIds()) { + StageId sourceStageId = StageId.create(queryId, sourceFragmentId); + + StageExecution sourceStage = stageExecutionLookup.apply(sourceStageId); + requireNonNull(sourceStage, "sourceStage is null"); + Optional sourceStageOutputDataSize = sourceStage.getOutputDataSize(stageExecutionLookup, false); + + if (sourceStageOutputDataSize.isEmpty()) { + // cant estimate size of one of sources; should not happen in practice + return Optional.empty(); + } + + remoteInputSizeEstimate += sourceStageOutputDataSize.orElseThrow().outputDataSizeEstimate().getTotalSizeInBytes(); + } + } + + long inputSizeEstimate = (long) ((partitionedInputSizeEstimate + remoteInputSizeEstimate) * smallStageSourceSizeMultiplier); + if (inputSizeEstimate > smallStageEstimationThreshold.toBytes()) { + return Optional.empty(); + } + + int outputPartitionsCount = stageExecution.getSinkPartitioningScheme().getPartitionCount(); + ImmutableLongArray.Builder estimateBuilder = ImmutableLongArray.builder(outputPartitionsCount); + for (int i = 0; i < outputPartitionsCount; ++i) { + // assume uniform distribution + // TODO; should we use distribution as in this.outputDataSize if we have some data there already? + estimateBuilder.add(inputSizeEstimate / outputPartitionsCount); + } + return Optional.of(new OutputDataSizeEstimateResult(estimateBuilder.build(), OutputDataSizeEstimateStatus.ESTIMATED_BY_SMALL_INPUT)); + } +} diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/ByTaskProgressOutputDataSizeEstimator.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/ByTaskProgressOutputDataSizeEstimator.java new file mode 100644 index 000000000000..4b3e42eb6c40 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/ByTaskProgressOutputDataSizeEstimator.java @@ -0,0 +1,76 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.execution.scheduler.faulttolerant; + +import com.google.common.primitives.ImmutableLongArray; +import io.trino.Session; +import io.trino.execution.StageId; +import io.trino.execution.scheduler.OutputDataSizeEstimate; +import io.trino.execution.scheduler.faulttolerant.EventDrivenFaultTolerantQueryScheduler.StageExecution; + +import java.util.Optional; +import java.util.function.Function; + +import static io.trino.SystemSessionProperties.getFaultTolerantExecutionMinSourceStageProgress; + +public class ByTaskProgressOutputDataSizeEstimator + implements OutputDataSizeEstimator +{ + public static class Factory + implements OutputDataSizeEstimatorFactory + { + @Override + public OutputDataSizeEstimator create(Session session) + { + return new ByTaskProgressOutputDataSizeEstimator(getFaultTolerantExecutionMinSourceStageProgress(session)); + } + } + + private final double minSourceStageProgress; + + private ByTaskProgressOutputDataSizeEstimator(double minSourceStageProgress) + { + this.minSourceStageProgress = minSourceStageProgress; + } + + @Override + public Optional getEstimatedOutputDataSize(StageExecution stageExecution, Function stageExecutionLookup, boolean parentEager) + { + if (!stageExecution.isNoMorePartitions()) { + return Optional.empty(); + } + + int allPartitionsCount = stageExecution.getPartitionsCount(); + int remainingPartitionsCount = stageExecution.getRemainingPartitionsCount(); + + if (remainingPartitionsCount == allPartitionsCount) { + return Optional.empty(); + } + + double progress = (double) (allPartitionsCount - remainingPartitionsCount) / allPartitionsCount; + + if (progress < minSourceStageProgress) { + return Optional.empty(); + } + + long[] currentOutputDataSize = stageExecution.currentOutputDataSize(); + + ImmutableLongArray.Builder estimateBuilder = ImmutableLongArray.builder(currentOutputDataSize.length); + + for (long partitionSize : currentOutputDataSize) { + estimateBuilder.add((long) (partitionSize / progress)); + } + return Optional.of(new OutputDataSizeEstimateResult(new OutputDataSizeEstimate(estimateBuilder.build()), OutputDataSizeEstimateStatus.ESTIMATED_BY_PROGRESS)); + } +} diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/CompositeOutputDataSizeEstimator.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/CompositeOutputDataSizeEstimator.java new file mode 100644 index 000000000000..021a2cc8cd63 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/CompositeOutputDataSizeEstimator.java @@ -0,0 +1,86 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.execution.scheduler.faulttolerant; + +import com.google.common.collect.ImmutableList; +import com.google.inject.BindingAnnotation; +import com.google.inject.Inject; +import io.trino.Session; +import io.trino.execution.StageId; +import io.trino.execution.scheduler.faulttolerant.EventDrivenFaultTolerantQueryScheduler.StageExecution; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +public class CompositeOutputDataSizeEstimator + implements OutputDataSizeEstimator +{ + public static class Factory + implements OutputDataSizeEstimatorFactory + { + private final List delegateFactories; + + @Inject + public Factory(@ForCompositeOutputDataSizeEstimator List delegateFactories) + { + checkArgument(!delegateFactories.isEmpty(), "Got empty list of delegates"); + this.delegateFactories = ImmutableList.copyOf(delegateFactories); + } + + @Override + public OutputDataSizeEstimator create(Session session) + { + List estimators = delegateFactories.stream().map(factory -> factory.create(session)) + .collect(toImmutableList()); + return new CompositeOutputDataSizeEstimator(estimators); + } + } + + @Retention(RUNTIME) + @Target({FIELD, PARAMETER, METHOD}) + @BindingAnnotation + public @interface ForCompositeOutputDataSizeEstimator {} + + private final List estimators; + + private CompositeOutputDataSizeEstimator(List estimators) + { + this.estimators = ImmutableList.copyOf(estimators); + } + + @Override + public Optional getEstimatedOutputDataSize( + StageExecution stageExecution, + Function stageExecutionLookup, + boolean parentEager) + { + for (OutputDataSizeEstimator estimator : estimators) { + Optional result = estimator.getEstimatedOutputDataSize(stageExecution, stageExecutionLookup, parentEager); + if (result.isPresent()) { + return result; + } + } + return Optional.empty(); + } +} diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java index c2914d0bc1a0..463628290dfe 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java @@ -63,6 +63,8 @@ import io.trino.execution.scheduler.SplitSchedulerStats; import io.trino.execution.scheduler.TaskExecutionStats; import io.trino.execution.scheduler.faulttolerant.NodeAllocator.NodeLease; +import io.trino.execution.scheduler.faulttolerant.OutputDataSizeEstimator.OutputDataSizeEstimateResult; +import io.trino.execution.scheduler.faulttolerant.OutputDataSizeEstimator.OutputDataSizeEstimateStatus; import io.trino.execution.scheduler.faulttolerant.PartitionMemoryEstimator.MemoryRequirements; import io.trino.execution.scheduler.faulttolerant.SplitAssigner.AssignmentResult; import io.trino.execution.scheduler.faulttolerant.SplitAssigner.Partition; @@ -133,13 +135,9 @@ import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.airlift.units.DataSize.succinctBytes; -import static io.trino.SystemSessionProperties.getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMin; import static io.trino.SystemSessionProperties.getFaultTolerantExecutionMaxPartitionCount; -import static io.trino.SystemSessionProperties.getFaultTolerantExecutionMinSourceStageProgress; import static io.trino.SystemSessionProperties.getFaultTolerantExecutionRuntimeAdaptivePartitioningMaxTaskSize; import static io.trino.SystemSessionProperties.getFaultTolerantExecutionRuntimeAdaptivePartitioningPartitionCount; -import static io.trino.SystemSessionProperties.getFaultTolerantExecutionSmallStageEstimationThreshold; -import static io.trino.SystemSessionProperties.getFaultTolerantExecutionSmallStageSourceSizeMultiplier; import static io.trino.SystemSessionProperties.getMaxTasksWaitingForExecutionPerQuery; import static io.trino.SystemSessionProperties.getMaxTasksWaitingForNodePerStage; import static io.trino.SystemSessionProperties.getRetryDelayScaleFactor; @@ -148,8 +146,6 @@ import static io.trino.SystemSessionProperties.getRetryPolicy; import static io.trino.SystemSessionProperties.getTaskRetryAttemptsPerTask; import static io.trino.SystemSessionProperties.isFaultTolerantExecutionRuntimeAdaptivePartitioningEnabled; -import static io.trino.SystemSessionProperties.isFaultTolerantExecutionSmallStageEstimationEnabled; -import static io.trino.SystemSessionProperties.isFaultTolerantExecutionSmallStageRequireNoMorePartitions; import static io.trino.SystemSessionProperties.isFaultTolerantExecutionStageEstimationForEagerParentEnabled; import static io.trino.execution.BasicStageStats.aggregateBasicStageStats; import static io.trino.execution.StageState.ABORTED; @@ -205,6 +201,7 @@ public class EventDrivenFaultTolerantQueryScheduler private final Tracer tracer; private final SplitSchedulerStats schedulerStats; private final PartitionMemoryEstimatorFactory memoryEstimatorFactory; + private final OutputDataSizeEstimatorFactory outputDataSizeEstimatorFactory; private final NodePartitioningManager nodePartitioningManager; private final ExchangeManager exchangeManager; private final NodeAllocatorService nodeAllocatorService; @@ -212,12 +209,6 @@ public class EventDrivenFaultTolerantQueryScheduler private final DynamicFilterService dynamicFilterService; private final TaskExecutionStats taskExecutionStats; private final SubPlan originalPlan; - private final double minSourceStageProgress; - private final boolean smallStageEstimationEnabled; - private final DataSize smallStageEstimationThreshold; - private final double smallStageSourceSizeMultiplier; - private final DataSize smallSizePartitionSizeEstimate; - private final boolean smallStageRequireNoMorePartitions; private final boolean stageEstimationForEagerParentEnabled; private final StageRegistry stageRegistry; @@ -240,6 +231,7 @@ public EventDrivenFaultTolerantQueryScheduler( Tracer tracer, SplitSchedulerStats schedulerStats, PartitionMemoryEstimatorFactory memoryEstimatorFactory, + OutputDataSizeEstimatorFactory outputDataSizeEstimatorFactory, NodePartitioningManager nodePartitioningManager, ExchangeManager exchangeManager, NodeAllocatorService nodeAllocatorService, @@ -262,6 +254,7 @@ public EventDrivenFaultTolerantQueryScheduler( this.tracer = requireNonNull(tracer, "tracer is null"); this.schedulerStats = requireNonNull(schedulerStats, "schedulerStats is null"); this.memoryEstimatorFactory = requireNonNull(memoryEstimatorFactory, "memoryEstimatorFactory is null"); + this.outputDataSizeEstimatorFactory = requireNonNull(outputDataSizeEstimatorFactory, "outputDataSizeEstimatorFactory is null"); this.nodePartitioningManager = requireNonNull(nodePartitioningManager, "partitioningSchemeFactory is null"); this.exchangeManager = requireNonNull(exchangeManager, "exchangeManager is null"); this.nodeAllocatorService = requireNonNull(nodeAllocatorService, "nodeAllocatorService is null"); @@ -269,12 +262,7 @@ public EventDrivenFaultTolerantQueryScheduler( this.dynamicFilterService = requireNonNull(dynamicFilterService, "dynamicFilterService is null"); this.taskExecutionStats = requireNonNull(taskExecutionStats, "taskExecutionStats is null"); this.originalPlan = requireNonNull(originalPlan, "originalPlan is null"); - this.minSourceStageProgress = getFaultTolerantExecutionMinSourceStageProgress(queryStateMachine.getSession()); - this.smallStageEstimationEnabled = isFaultTolerantExecutionSmallStageEstimationEnabled(queryStateMachine.getSession()); - this.smallStageEstimationThreshold = getFaultTolerantExecutionSmallStageEstimationThreshold(queryStateMachine.getSession()); - this.smallStageSourceSizeMultiplier = getFaultTolerantExecutionSmallStageSourceSizeMultiplier(queryStateMachine.getSession()); - this.smallSizePartitionSizeEstimate = getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMin(queryStateMachine.getSession()); - this.smallStageRequireNoMorePartitions = isFaultTolerantExecutionSmallStageRequireNoMorePartitions(queryStateMachine.getSession()); + this.stageEstimationForEagerParentEnabled = isFaultTolerantExecutionStageEstimationForEagerParentEnabled(queryStateMachine.getSession()); stageRegistry = new StageRegistry(queryStateMachine, originalPlan); @@ -335,6 +323,7 @@ public synchronized void start() tracer, schedulerStats, memoryEstimatorFactory, + outputDataSizeEstimatorFactory.create(session), partitioningSchemeFactory, exchangeManager, getTaskRetryAttemptsPerTask(session) + 1, @@ -355,13 +344,7 @@ public synchronized void start() isFaultTolerantExecutionRuntimeAdaptivePartitioningEnabled(session), getFaultTolerantExecutionRuntimeAdaptivePartitioningPartitionCount(session), getFaultTolerantExecutionRuntimeAdaptivePartitioningMaxTaskSize(session), - minSourceStageProgress, - smallStageEstimationEnabled, - smallStageEstimationThreshold, - smallStageSourceSizeMultiplier, - smallSizePartitionSizeEstimate, - stageEstimationForEagerParentEnabled, - smallStageRequireNoMorePartitions); + stageEstimationForEagerParentEnabled); queryExecutor.submit(scheduler::run); } catch (Throwable t) { @@ -527,6 +510,7 @@ private static class Scheduler private final Tracer tracer; private final SplitSchedulerStats schedulerStats; private final PartitionMemoryEstimatorFactory memoryEstimatorFactory; + private final OutputDataSizeEstimator outputDataSizeEstimator; private final FaultTolerantPartitioningSchemeFactory partitioningSchemeFactory; private final ExchangeManager exchangeManager; private final int maxTaskExecutionAttempts; @@ -541,12 +525,6 @@ private static class Scheduler private final boolean runtimeAdaptivePartitioningEnabled; private final int runtimeAdaptivePartitioningPartitionCount; private final long runtimeAdaptivePartitioningMaxTaskSizeInBytes; - private final double minSourceStageProgress; - private final boolean smallStageEstimationEnabled; - private final DataSize smallStageEstimationThreshold; - private final double smallStageSourceSizeMultiplier; - private final DataSize smallSizePartitionSizeEstimate; - private final boolean smallStageRequireNoMorePartitions; private final boolean stageEstimationForEagerParentEnabled; private final BlockingQueue eventQueue = new LinkedBlockingQueue<>(); @@ -583,6 +561,7 @@ public Scheduler( Tracer tracer, SplitSchedulerStats schedulerStats, PartitionMemoryEstimatorFactory memoryEstimatorFactory, + OutputDataSizeEstimator outputDataSizeEstimator, FaultTolerantPartitioningSchemeFactory partitioningSchemeFactory, ExchangeManager exchangeManager, int maxTaskExecutionAttempts, @@ -599,13 +578,7 @@ public Scheduler( boolean runtimeAdaptivePartitioningEnabled, int runtimeAdaptivePartitioningPartitionCount, DataSize runtimeAdaptivePartitioningMaxTaskSize, - double minSourceStageProgress, - boolean smallStageEstimationEnabled, - DataSize smallStageEstimationThreshold, - double smallStageSourceSizeMultiplier, - DataSize smallSizePartitionSizeEstimate, - boolean stageEstimationForEagerParentEnabled, - boolean smallStageRequireNoMorePartitions) + boolean stageEstimationForEagerParentEnabled) { this.queryStateMachine = requireNonNull(queryStateMachine, "queryStateMachine is null"); this.metadata = requireNonNull(metadata, "metadata is null"); @@ -619,6 +592,7 @@ public Scheduler( this.tracer = requireNonNull(tracer, "tracer is null"); this.schedulerStats = requireNonNull(schedulerStats, "schedulerStats is null"); this.memoryEstimatorFactory = requireNonNull(memoryEstimatorFactory, "memoryEstimatorFactory is null"); + this.outputDataSizeEstimator = requireNonNull(outputDataSizeEstimator, "outputDataSizeEstimator is null"); this.partitioningSchemeFactory = requireNonNull(partitioningSchemeFactory, "partitioningSchemeFactory is null"); this.exchangeManager = requireNonNull(exchangeManager, "exchangeManager is null"); checkArgument(maxTaskExecutionAttempts > 0, "maxTaskExecutionAttempts must be greater than zero: %s", maxTaskExecutionAttempts); @@ -636,13 +610,7 @@ public Scheduler( this.runtimeAdaptivePartitioningEnabled = runtimeAdaptivePartitioningEnabled; this.runtimeAdaptivePartitioningPartitionCount = runtimeAdaptivePartitioningPartitionCount; this.runtimeAdaptivePartitioningMaxTaskSizeInBytes = requireNonNull(runtimeAdaptivePartitioningMaxTaskSize, "runtimeAdaptivePartitioningMaxTaskSize is null").toBytes(); - this.minSourceStageProgress = minSourceStageProgress; - this.smallStageEstimationEnabled = smallStageEstimationEnabled; - this.smallStageEstimationThreshold = requireNonNull(smallStageEstimationThreshold, "smallStageEstimationThreshold is null"); - this.smallStageSourceSizeMultiplier = smallStageSourceSizeMultiplier; - this.smallSizePartitionSizeEstimate = requireNonNull(smallSizePartitionSizeEstimate, "smallSizePartitionSizeEstimate is null"); this.stageEstimationForEagerParentEnabled = stageEstimationForEagerParentEnabled; - this.smallStageRequireNoMorePartitions = smallStageRequireNoMorePartitions; planInTopologicalOrder = sortPlanInTopologicalOrder(plan); } @@ -1203,24 +1171,18 @@ private void createStageExecution(SubPlan subPlan, boolean rootFragment, Map getOutputDataSize(Function result = getEstimatedOutputDataSize().or(() -> getEstimatedSmallStageOutputDataSize(stageExecutionLookup)); - if (result.isEmpty() && parentEager) { - result = getEstimatedStageOutputSizeForEagerParent(); - } - return result; + return outputDataSizeEstimator.getEstimatedOutputDataSize(this, stageExecutionLookup, parentEager); } public boolean isSomeProgressMade() @@ -2149,113 +2088,6 @@ public boolean isSomeProgressMade() return partitions.size() > 0 && remainingPartitions.size() < partitions.size(); } - private Optional getEstimatedOutputDataSize() - { - if (!isNoMorePartitions()) { - return Optional.empty(); - } - - int allPartitionsCount = getPartitionsCount(); - int remainingPartitionsCount = getRemainingPartitionsCount(); - - if (remainingPartitionsCount == allPartitionsCount) { - return Optional.empty(); - } - - double progress = (double) (allPartitionsCount - remainingPartitionsCount) / allPartitionsCount; - - if (progress < minSourceStageProgress) { - return Optional.empty(); - } - - ImmutableLongArray.Builder estimateBuilder = ImmutableLongArray.builder(outputDataSize.length); - - for (long partitionSize : outputDataSize) { - estimateBuilder.add((long) (partitionSize / progress)); - } - return Optional.of(new OutputDataSizeEstimateResult(new OutputDataSizeEstimate(estimateBuilder.build()), OutputDataSizeEstimateStatus.ESTIMATED_BY_PROGRESS)); - } - - private Optional getEstimatedSmallStageOutputDataSize(Function stageExecutionLookup) - { - if (!smallStageEstimationEnabled) { - return Optional.empty(); - } - - if (smallStageRequireNoMorePartitions && !isNoMorePartitions()) { - return Optional.empty(); - } - - long currentOutputDataSize = 0; - for (long partitionOutputDataSize : outputDataSize) { - currentOutputDataSize += partitionOutputDataSize; - } - if (currentOutputDataSize > smallStageEstimationThreshold.toBytes()) { - // our output is too big already - return Optional.empty(); - } - - PlanFragment planFragment = this.getStageInfo().getPlan(); - boolean hasPartitionedSources = planFragment.getPartitionedSources().size() > 0; - List remoteSourceNodes = planFragment.getRemoteSourceNodes(); - - long partitionedInputSizeEstimate = 0; - if (hasPartitionedSources) { - if (!isNoMorePartitions()) { - // stage is reading directly from table - // for leaf stages require all tasks to be enumerated - return Optional.empty(); - } - // estimate partitioned input based on number of task partitions - partitionedInputSizeEstimate += this.getPartitionsCount() * smallSizePartitionSizeEstimate.toBytes(); - } - - long remoteInputSizeEstimate = 0; - for (RemoteSourceNode remoteSourceNode : remoteSourceNodes) { - for (PlanFragmentId sourceFragmentId : remoteSourceNode.getSourceFragmentIds()) { - StageId sourceStageId = StageId.create(queryStateMachine.getQueryId(), sourceFragmentId); - - StageExecution sourceStage = stageExecutionLookup.apply(sourceStageId); - requireNonNull(sourceStage, "sourceStage is null"); - Optional sourceStageOutputDataSize = sourceStage.getOutputDataSize(stageExecutionLookup, false); - - if (sourceStageOutputDataSize.isEmpty()) { - // cant estimate size of one of sources; should not happen in practice - return Optional.empty(); - } - - remoteInputSizeEstimate += sourceStageOutputDataSize.orElseThrow().outputDataSizeEstimate().getTotalSizeInBytes(); - } - } - - long inputSizeEstimate = (long) ((partitionedInputSizeEstimate + remoteInputSizeEstimate) * smallStageSourceSizeMultiplier); - if (inputSizeEstimate > smallStageEstimationThreshold.toBytes()) { - return Optional.empty(); - } - - int outputPartitionsCount = sinkPartitioningScheme.getPartitionCount(); - ImmutableLongArray.Builder estimateBuilder = ImmutableLongArray.builder(outputPartitionsCount); - for (int i = 0; i < outputPartitionsCount; ++i) { - // assume uniform distribution - // TODO; should we use distribution as in this.outputDataSize if we have some data there already? - estimateBuilder.add(inputSizeEstimate / outputPartitionsCount); - } - return Optional.of(new OutputDataSizeEstimateResult(estimateBuilder.build(), OutputDataSizeEstimateStatus.ESTIMATED_BY_SMALL_INPUT)); - } - - private Optional getEstimatedStageOutputSizeForEagerParent() - { - // use empty estimate as fallback for eager parents. It matches current logic of assessing if node should be processed eagerly or not. - // Currently, we use eager task exectuion only for stages with small FINAL LIMIT which implies small input from child stages (child stages will - // enforce small input via PARTIAL LIMIT) - int outputPartitionsCount = sinkPartitioningScheme.getPartitionCount(); - ImmutableLongArray.Builder estimateBuilder = ImmutableLongArray.builder(outputPartitionsCount); - for (int i = 0; i < outputPartitionsCount; ++i) { - estimateBuilder.add(0); - } - return Optional.of(new OutputDataSizeEstimateResult(estimateBuilder.build(), OutputDataSizeEstimateStatus.ESTIMATED_FOR_EAGER_PARENT)); - } - public ExchangeSourceOutputSelector getSinkOutputSelector() { if (finalSinkOutputSelector != null) { @@ -2326,28 +2158,20 @@ private StagePartition getStagePartition(int partitionId) checkState(partition != null, "partition with id %s does not exist in stage %s", partitionId, stage.getStageId()); return partition; } - } - - private enum OutputDataSizeEstimateStatus { - FINISHED, - ESTIMATED_BY_PROGRESS, - ESTIMATED_BY_SMALL_INPUT, - ESTIMATED_FOR_EAGER_PARENT - } - private record OutputDataSizeEstimateResult( - OutputDataSizeEstimate outputDataSizeEstimate, - OutputDataSizeEstimateStatus status) - { - OutputDataSizeEstimateResult(ImmutableLongArray partitionDataSizes, OutputDataSizeEstimateStatus status) + /** + * This returns current output data size as captured on internal long[] field. + * Returning internal mutable field is done due to performance reasons. + * It is not allowed for the caller to mutate contents of returned array. + */ + public long[] currentOutputDataSize() { - this(new OutputDataSizeEstimate(partitionDataSizes), status); + return outputDataSize; } - OutputDataSizeEstimateResult + public FaultTolerantPartitioningScheme getSinkPartitioningScheme() { - requireNonNull(outputDataSizeEstimate, "outputDataSizeEstimate is null"); - requireNonNull(status, "status is null"); + return sinkPartitioningScheme; } } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/OutputDataSizeEstimator.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/OutputDataSizeEstimator.java new file mode 100644 index 000000000000..ffa4aba127a2 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/OutputDataSizeEstimator.java @@ -0,0 +1,54 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.execution.scheduler.faulttolerant; + +import com.google.common.primitives.ImmutableLongArray; +import io.trino.execution.StageId; +import io.trino.execution.scheduler.OutputDataSizeEstimate; + +import java.util.Optional; +import java.util.function.Function; + +import static java.util.Objects.requireNonNull; + +public interface OutputDataSizeEstimator +{ + Optional getEstimatedOutputDataSize( + EventDrivenFaultTolerantQueryScheduler.StageExecution stageExecution, + Function stageExecutionLookup, + boolean parentEager); + + enum OutputDataSizeEstimateStatus { + FINISHED, + ESTIMATED_BY_PROGRESS, + ESTIMATED_BY_SMALL_INPUT, + ESTIMATED_FOR_EAGER_PARENT + } + + record OutputDataSizeEstimateResult( + OutputDataSizeEstimate outputDataSizeEstimate, + OutputDataSizeEstimateStatus status) + { + OutputDataSizeEstimateResult(ImmutableLongArray partitionDataSizes, OutputDataSizeEstimateStatus status) + { + this(new OutputDataSizeEstimate(partitionDataSizes), status); + } + + public OutputDataSizeEstimateResult + { + requireNonNull(outputDataSizeEstimate, "outputDataSizeEstimate is null"); + requireNonNull(status, "status is null"); + } + } +} diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/OutputDataSizeEstimatorFactory.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/OutputDataSizeEstimatorFactory.java new file mode 100644 index 000000000000..7c806026a536 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/OutputDataSizeEstimatorFactory.java @@ -0,0 +1,21 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.execution.scheduler.faulttolerant; + +import io.trino.Session; + +public interface OutputDataSizeEstimatorFactory +{ + OutputDataSizeEstimator create(Session session); +} diff --git a/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java b/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java index 844ac20135f8..94b506e3ec46 100644 --- a/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java +++ b/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java @@ -66,11 +66,16 @@ import io.trino.execution.scheduler.SplitSchedulerStats; import io.trino.execution.scheduler.TaskExecutionStats; import io.trino.execution.scheduler.faulttolerant.BinPackingNodeAllocatorService; +import io.trino.execution.scheduler.faulttolerant.ByEagerParentOutputDataSizeEstimator; +import io.trino.execution.scheduler.faulttolerant.BySmallStageOutputDataSizeEstimator; +import io.trino.execution.scheduler.faulttolerant.ByTaskProgressOutputDataSizeEstimator; +import io.trino.execution.scheduler.faulttolerant.CompositeOutputDataSizeEstimator; import io.trino.execution.scheduler.faulttolerant.EventDrivenTaskSourceFactory; import io.trino.execution.scheduler.faulttolerant.ExponentialGrowthPartitionMemoryEstimator; import io.trino.execution.scheduler.faulttolerant.NoMemoryAwarePartitionMemoryEstimator; import io.trino.execution.scheduler.faulttolerant.NoMemoryAwarePartitionMemoryEstimator.ForNoMemoryAwarePartitionMemoryEstimator; import io.trino.execution.scheduler.faulttolerant.NodeAllocatorService; +import io.trino.execution.scheduler.faulttolerant.OutputDataSizeEstimatorFactory; import io.trino.execution.scheduler.faulttolerant.PartitionMemoryEstimatorFactory; import io.trino.execution.scheduler.faulttolerant.TaskDescriptorStorage; import io.trino.execution.scheduler.policy.AllAtOnceExecutionPolicy; @@ -228,6 +233,31 @@ protected void setup(Binder binder) .annotatedWith(ForNoMemoryAwarePartitionMemoryEstimator.class) .to(ExponentialGrowthPartitionMemoryEstimator.Factory.class).in(Scopes.SINGLETON); + // output data size estimator + binder.bind(OutputDataSizeEstimatorFactory.class) + .to(CompositeOutputDataSizeEstimator.Factory.class) + .in(Scopes.SINGLETON); + binder.bind(ByTaskProgressOutputDataSizeEstimator.Factory.class).in(Scopes.SINGLETON); + binder.bind(BySmallStageOutputDataSizeEstimator.Factory.class).in(Scopes.SINGLETON); + binder.bind(ByEagerParentOutputDataSizeEstimator.Factory.class).in(Scopes.SINGLETON); + // use provider method returning list to ensure ordering + // OutputDataSizeEstimator factories are ordered starting from most accurate + install(new AbstractConfigurationAwareModule() { + @Override + protected void setup(Binder binder) {} + + @Provides + @Singleton + @CompositeOutputDataSizeEstimator.ForCompositeOutputDataSizeEstimator + List getCompositeOutputDataSizeEstimatorDelegateFactories( + ByTaskProgressOutputDataSizeEstimator.Factory byTaskProgressOutputDataSizeEstimatorFactory, + BySmallStageOutputDataSizeEstimator.Factory bySmallStageOutputDataSizeEstimatorFactory, + ByEagerParentOutputDataSizeEstimator.Factory byEagerParentOutputDataSizeEstimatorFactoryy) + { + return ImmutableList.of(byTaskProgressOutputDataSizeEstimatorFactory, bySmallStageOutputDataSizeEstimatorFactory, byEagerParentOutputDataSizeEstimatorFactoryy); + } + }); + // node monitor binder.bind(ClusterSizeMonitor.class).in(Scopes.SINGLETON); newExporter(binder).export(ClusterSizeMonitor.class).withGeneratedName();