Skip to content

Commit

Permalink
Remove predicate pushdowns in the Faker connector
Browse files Browse the repository at this point in the history
Predicate pushdown in the Faker connector violates SQL semantics:
when predicates are applied to separate columns independently, correlation
between columns is not preserved, and the returned results are not
deterministic. The `min`, `max`, and `options` column properties should be
used instead.
  • Loading branch information
nineinchnick committed Dec 26, 2024
1 parent 71e855e commit cc741a5
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 642 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableVersion;
import io.trino.spi.connector.ConnectorViewDefinition;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.ConstraintApplicationResult;
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.RelationColumnsMetadata;
import io.trino.spi.connector.RetryMode;
Expand All @@ -45,7 +43,6 @@
import io.trino.spi.function.FunctionMetadata;
import io.trino.spi.function.SchemaFunctionName;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.security.TrinoPrincipal;
import io.trino.spi.statistics.ComputedStatistics;
import io.trino.spi.type.BigintType;
Expand Down Expand Up @@ -152,7 +149,7 @@ public synchronized ConnectorTableHandle getTableHandle(ConnectorSession session
}
long schemaLimit = (long) schema.properties().getOrDefault(SchemaInfo.DEFAULT_LIMIT_PROPERTY, defaultLimit);
long tableLimit = (long) tables.get(tableName).properties().getOrDefault(TableInfo.DEFAULT_LIMIT_PROPERTY, schemaLimit);
return new FakerTableHandle(tableName, TupleDomain.all(), tableLimit);
return new FakerTableHandle(tableName, tableLimit);
}

@Override
Expand Down Expand Up @@ -522,62 +519,6 @@ public Optional<LimitApplicationResult<ConnectorTableHandle>> applyLimit(
true));
}

@Override
public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(
ConnectorSession session,
ConnectorTableHandle table,
Constraint constraint)
{
FakerTableHandle fakerTable = (FakerTableHandle) table;

TupleDomain<ColumnHandle> summary = constraint.getSummary();
if (summary.isAll()) {
return Optional.empty();
}
// the only reason not to use isNone is so the linter doesn't complain about not checking an Optional
if (summary.getDomains().isEmpty()) {
throw new IllegalArgumentException("summary cannot be none");
}

TupleDomain<ColumnHandle> currentConstraint = fakerTable.constraint();
if (currentConstraint.getDomains().isEmpty()) {
throw new IllegalArgumentException("currentConstraint is none but should be all!");
}

// push down everything, unsupported constraints will throw an exception during data generation
boolean anyUpdated = false;
for (Map.Entry<ColumnHandle, Domain> entry : summary.getDomains().get().entrySet()) {
FakerColumnHandle column = (FakerColumnHandle) entry.getKey();
Domain domain = entry.getValue();

if (currentConstraint.getDomains().get().containsKey(column)) {
Domain currentDomain = currentConstraint.getDomains().get().get(column);
// it is important to avoid processing same constraint multiple times
// so that planner doesn't get stuck in a loop
if (currentDomain.equals(domain)) {
continue;
}
// TODO write test cases for this, it doesn't seem to work with IS NULL
currentDomain.union(domain);
}
else {
Map<ColumnHandle, Domain> domains = new HashMap<>(currentConstraint.getDomains().get());
domains.put(column, domain);
currentConstraint = TupleDomain.withColumnDomains(domains);
}
anyUpdated = true;
}
if (!anyUpdated) {
return Optional.empty();
}

return Optional.of(new ConstraintApplicationResult<>(
fakerTable.withConstraint(currentConstraint),
TupleDomain.all(),
constraint.getExpression(),
true));
}

@Override
public Collection<FunctionMetadata> listFunctions(ConnectorSession session, String schemaName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@
import io.trino.spi.PageBuilder;
import io.trino.spi.TrinoException;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.CharType;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Decimals;
Expand Down Expand Up @@ -126,7 +123,6 @@ class FakerPageSource
Faker faker,
Random random,
List<FakerColumnHandle> columns,
TupleDomain<ColumnHandle> constraint,
long offset,
long limit)
{
Expand All @@ -136,19 +132,17 @@ class FakerPageSource
.stream()
.map(FakerColumnHandle::type)
.collect(toImmutableList());
requireNonNull(constraint, "constraint is null");
this.limit = limit;

this.generators = columns
.stream()
.map(column -> getGenerator(column, constraint, offset))
.map(column -> getGenerator(column, offset))
.collect(toImmutableList());
this.pageBuilder = new PageBuilder(types);
}

private Generator getGenerator(
FakerColumnHandle column,
TupleDomain<ColumnHandle> constraint,
long offset)
{
if (ROW_ID_COLUMN_NAME.equals(column.name())) {
Expand All @@ -164,9 +158,7 @@ public void accept(BlockBuilder blockBuilder)
};
}

return constraintedValueGenerator(
column,
constraint.getDomains().get().getOrDefault(column, Domain.all(column.type())));
return constraintedValueGenerator(column);
}

@Override
Expand Down Expand Up @@ -230,24 +222,14 @@ public void close()
closed = true;
}

private Generator constraintedValueGenerator(FakerColumnHandle handle, Domain domain)
private Generator constraintedValueGenerator(FakerColumnHandle handle)
{
if (domain.isSingleValue()) {
ObjectWriter singleValueWriter = objectWriter(handle.type());
return (blockBuilder) -> singleValueWriter.accept(blockBuilder, domain.getSingleValue());
}
if (domain.getValues().isDiscreteSet() || handle.domain().getValues().isDiscreteSet()) {
List<Object> values = domain.getValues().isDiscreteSet()
? domain.getValues().getDiscreteSet()
: handle.domain().getValues().getDiscreteSet();
if (handle.domain().getValues().isDiscreteSet()) {
List<Object> values = handle.domain().getValues().getDiscreteSet();
ObjectWriter singleValueWriter = objectWriter(handle.type());
return (blockBuilder) -> singleValueWriter.accept(blockBuilder, values.get(random.nextInt(values.size())));
}
if (domain.getValues().getRanges().getRangeCount() > 1) {
// this would require calculating weights for each range to retain uniform distribution
throw new TrinoException(INVALID_ROW_FILTER, "Generating random values from more than one range is not supported");
}
Generator generator = randomValueGenerator(handle, domain.getValues().getRanges().getSpan());
Generator generator = randomValueGenerator(handle);
if (handle.nullProbability() == 0) {
return generator;
}
Expand All @@ -261,10 +243,9 @@ private Generator constraintedValueGenerator(FakerColumnHandle handle, Domain do
};
}

private Generator randomValueGenerator(FakerColumnHandle handle, Range predicateRange)
private Generator randomValueGenerator(FakerColumnHandle handle)
{
Range range = predicateRange.intersect(handle.domain().getValues().getRanges().getSpan())
.orElseThrow(() -> new TrinoException(INVALID_ROW_FILTER, "Predicates do not overlap with column min and max properties"));
Range range = handle.domain().getValues().getRanges().getSpan();
if (handle.generator() != null) {
if (!range.isAll()) {
throw new TrinoException(INVALID_ROW_FILTER, "Predicates for columns with a generator expression are not supported");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,9 @@ public ConnectorPageSource createPageSource(
.map(FakerColumnHandle.class::cast)
.collect(toImmutableList());

FakerTableHandle fakerTable = (FakerTableHandle) table;
FakerSplit fakerSplit = (FakerSplit) split;
Random random = random(fakerSplit.splitNumber());
return new FakerPageSource(new Faker(locale, random), random, handles, fakerTable.constraint(), fakerSplit.rowsOffset(), fakerSplit.rowsCount());
return new FakerPageSource(new Faker(locale, random), random, handles, fakerSplit.rowsOffset(), fakerSplit.rowsCount());
}

private Random random(long index)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,21 @@

package io.trino.plugin.faker;

import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.predicate.TupleDomain;

import static java.util.Objects.requireNonNull;

public record FakerTableHandle(SchemaTableName schemaTableName, TupleDomain<ColumnHandle> constraint, long limit)
public record FakerTableHandle(SchemaTableName schemaTableName, long limit)
implements ConnectorTableHandle
{
public FakerTableHandle
{
requireNonNull(schemaTableName, "schemaTableName is null");
requireNonNull(constraint, "constraint is null");
}

public FakerTableHandle withConstraint(TupleDomain<ColumnHandle> constraint)
{
return new FakerTableHandle(schemaTableName, constraint, limit);
}

public FakerTableHandle withLimit(long limit)
{
return new FakerTableHandle(schemaTableName, constraint, limit);
return new FakerTableHandle(schemaTableName, limit);
}
}
Loading

0 comments on commit cc741a5

Please sign in to comment.