Minor cleanups #24610

Merged
merged 10 commits on Jan 2, 2025
@@ -143,15 +143,6 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
}
}

@Override
public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(ConnectorSession session, ConnectorTableHandle tableHandle, String procedureName, Map<String, Object> executeProperties, RetryMode retryMode)
{
Span span = startSpan("getTableHandleForExecute", tableHandle);
try (var _ = scopedSpan(span)) {
return delegate.getTableHandleForExecute(session, tableHandle, procedureName, executeProperties, retryMode);
}
}

@Override
public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(ConnectorSession session, ConnectorAccessControl accessControl, ConnectorTableHandle tableHandle, String procedureName, Map<String, Object> executeProperties, RetryMode retryMode)
{
@@ -761,15 +752,6 @@ public Optional<ConnectorPartitioningHandle> getUpdateLayout(ConnectorSession se
}
}

@Override
public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, RetryMode retryMode)
{
Span span = startSpan("beginMerge", tableHandle);
try (var _ = scopedSpan(span)) {
return delegate.beginMerge(session, tableHandle, retryMode);
}
}

@Override
public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, Map<Integer, Collection<ColumnHandle>> updateCaseColumns, RetryMode retryMode)
{
@@ -321,7 +321,7 @@ public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel
@Override
public ConnectorMetadata getMetadata(ConnectorSession session, ConnectorTransactionHandle transaction)
{
return metadataWrapper.apply(new MockConnectorMetadata(allowSplittingReadIntoMultipleSubQueries));
return metadataWrapper.apply(new MockConnectorMetadata());
}

@Override
@@ -448,13 +441,6 @@ public Set<ConnectorCapabilities> getCapabilities()
private class MockConnectorMetadata
implements ConnectorMetadata
{
private final boolean allowSplittingReadIntoMultipleSubQueries;

public MockConnectorMetadata(boolean allowSplittingReadIntoMultipleSubQueries)
{
this.allowSplittingReadIntoMultipleSubQueries = allowSplittingReadIntoMultipleSubQueries;
}

@Override
public boolean schemaExists(ConnectorSession session, String schemaName)
{
@@ -876,7 +869,7 @@ public ColumnHandle getMergeRowIdColumnHandle(ConnectorSession session, Connecto
}

@Override
public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, RetryMode retryMode)
public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, Map<Integer, Collection<ColumnHandle>> updateCaseColumns, RetryMode retryMode)
{
return new MockConnectorMergeTableHandle((MockConnectorTableHandle) tableHandle);
}
@@ -116,29 +116,6 @@ default ConnectorTableHandle getTableHandle(
throw new TrinoException(GENERIC_INTERNAL_ERROR, "ConnectorMetadata getTableHandle() is not implemented");
}

/**
* Create initial handle for execution of table procedure. The handle will be used through planning process. It will be converted to final
* handle used for execution via @{link {@link ConnectorMetadata#beginTableExecute}
* <p>
* If connector does not support execution with retries, the method should throw:
* <pre>
* new TrinoException(NOT_SUPPORTED, "This connector does not support query retries")
* </pre>
* unless {@code retryMode} is set to {@code NO_RETRIES}.
*
* @deprecated {Use {@link #getTableHandleForExecute(ConnectorSession, ConnectorAccessControl, ConnectorTableHandle, String, Map, RetryMode)}}
*/
@Deprecated
default Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
ConnectorSession session,
ConnectorTableHandle tableHandle,
String procedureName,
Map<String, Object> executeProperties,
RetryMode retryMode)
{
throw new TrinoException(NOT_SUPPORTED, "This connector does not support table procedures");
}

/**
* Create initial handle for execution of table procedure. The handle will be used through planning process. It will be converted to final
* handle used for execution via {@link ConnectorMetadata#beginTableExecute}.
@@ -157,7 +134,7 @@ default Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
Map<String, Object> executeProperties,
RetryMode retryMode)
{
return getTableHandleForExecute(session, tableHandle, procedureName, executeProperties, retryMode);
throw new TrinoException(NOT_SUPPORTED, "This connector does not support table procedures");
}

default Optional<ConnectorTableLayout> getLayoutForTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle)
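With the deprecated overload removed, connectors implement only the access-control-aware variant. The following is a minimal sketch, not code from this PR: ExampleTableHandle and ExampleTableExecuteHandle are hypothetical classes, and the retry check follows the contract quoted in the deleted Javadoc (throw NOT_SUPPORTED unless retryMode is NO_RETRIES).

```java
// Hypothetical connector metadata class; only the override signature is real SPI.
@Override
public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
        ConnectorSession session,
        ConnectorAccessControl accessControl,
        ConnectorTableHandle tableHandle,
        String procedureName,
        Map<String, Object> executeProperties,
        RetryMode retryMode)
{
    // Contract from the Javadoc: reject retries unless this connector supports them.
    if (retryMode != RetryMode.NO_RETRIES) {
        throw new TrinoException(NOT_SUPPORTED, "This connector does not support query retries");
    }
    if (!"optimize".equalsIgnoreCase(procedureName)) {
        throw new TrinoException(NOT_SUPPORTED, "Unknown table procedure: " + procedureName);
    }
    // Hypothetical handle that travels through planning until beginTableExecute
    // converts it into the final execution handle.
    ExampleTableHandle table = (ExampleTableHandle) tableHandle;
    return Optional.of(new ExampleTableExecuteHandle(table.schemaTableName(), executeProperties));
}
```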
@@ -885,25 +862,13 @@ default Optional<ConnectorPartitioningHandle> getUpdateLayout(ConnectorSession s
return Optional.empty();
}

/**
* Do whatever is necessary to start an MERGE query, returning the {@link ConnectorMergeTableHandle}
* instance that will be passed to the PageSink, and to the {@link #finishMerge} method.
*
* @deprecated {Use {@link #beginMerge(ConnectorSession, ConnectorTableHandle, Map, RetryMode)}}
*/
@Deprecated
default ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, RetryMode retryMode)
{
throw new TrinoException(NOT_SUPPORTED, MODIFYING_ROWS_MESSAGE);
}

/**
* Do whatever is necessary to start a MERGE query, returning the {@link ConnectorMergeTableHandle}
* instance that will be passed to the PageSink, and to the {@link #finishMerge} method.
*/
default ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, Map<Integer, Collection<ColumnHandle>> updateCaseColumns, RetryMode retryMode)
{
return beginMerge(session, tableHandle, retryMode);
throw new TrinoException(NOT_SUPPORTED, MODIFYING_ROWS_MESSAGE);
}

/**
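The Map<Integer, Collection<ColumnHandle>> parameter appears to carry, per MERGE WHEN case (keyed by case number), the columns that the UPDATE branch assigns; that reading is inferred from the signature, not stated in this diff. A minimal sketch of an override, with ExampleTableHandle and ExampleMergeTableHandle as hypothetical classes:

```java
@Override
public ConnectorMergeTableHandle beginMerge(
        ConnectorSession session,
        ConnectorTableHandle tableHandle,
        Map<Integer, Collection<ColumnHandle>> updateCaseColumns,
        RetryMode retryMode)
{
    if (retryMode != RetryMode.NO_RETRIES) {
        throw new TrinoException(NOT_SUPPORTED, "This connector does not support query retries");
    }
    ExampleTableHandle table = (ExampleTableHandle) tableHandle;
    // Keep the per-case updated columns on the merge handle so the page sink and
    // finishMerge can see which columns each WHEN branch touches.
    return new ExampleMergeTableHandle(table, updateCaseColumns);
}
```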
@@ -201,14 +201,6 @@ public List<String> listSchemaNames(ConnectorSession session)
}
}

@Override
public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(ConnectorSession session, ConnectorTableHandle tableHandle, String procedureName, Map<String, Object> executeProperties, RetryMode retryMode)
{
try (ThreadContextClassLoader _ = new ThreadContextClassLoader(classLoader)) {
return delegate.getTableHandleForExecute(session, tableHandle, procedureName, executeProperties, retryMode);
}
}

@Override
public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(ConnectorSession session, ConnectorAccessControl accessControl, ConnectorTableHandle tableHandle, String procedureName, Map<String, Object> executeProperties, RetryMode retryMode)
{
@@ -1206,14 +1198,6 @@ public Optional<ConnectorPartitioningHandle> getUpdateLayout(ConnectorSession se
return delegate.getUpdateLayout(session, tableHandle);
}

@Override
public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, RetryMode retryMode)
{
try (ThreadContextClassLoader _ = new ThreadContextClassLoader(classLoader)) {
return delegate.beginMerge(session, tableHandle, retryMode);
}
}

@Override
public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, Map<Integer, Collection<ColumnHandle>> updateCaseColumns, RetryMode retryMode)
{
@@ -401,7 +401,7 @@ public ColumnHandle getMergeRowIdColumnHandle(ConnectorSession session, Connecto
}

@Override
public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, RetryMode retryMode)
public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, Map<Integer, Collection<ColumnHandle>> updateCaseColumns, RetryMode retryMode)
{
return new BlackHoleMergeTableHandle((BlackHoleTableHandle) tableHandle);
}
@@ -125,7 +125,6 @@
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TableColumnsMetadata;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.connector.TableScanRedirectApplicationResult;
import io.trino.spi.connector.ViewNotFoundException;
import io.trino.spi.connector.WriterScalingOptions;
import io.trino.spi.expression.ConnectorExpression;
@@ -438,7 +437,6 @@ public class DeltaLakeMetadata
private final String nodeVersion;
private final String nodeId;
private final AtomicReference<Runnable> rollbackAction = new AtomicReference<>();
private final DeltaLakeRedirectionsProvider deltaLakeRedirectionsProvider;
private final CachingExtendedStatisticsAccess statisticsAccess;
private final boolean deleteSchemaLocationsFallback;
private final boolean useUniqueTableLocation;
@@ -474,7 +472,6 @@ public DeltaLakeMetadata(
CheckpointWriterManager checkpointWriterManager,
long defaultCheckpointInterval,
boolean deleteSchemaLocationsFallback,
DeltaLakeRedirectionsProvider deltaLakeRedirectionsProvider,
CachingExtendedStatisticsAccess statisticsAccess,
DeltaLakeTableMetadataScheduler metadataScheduler,
boolean useUniqueTableLocation,
@@ -497,7 +494,6 @@
this.nodeId = nodeManager.getCurrentNode().getNodeIdentifier();
this.checkpointWriterManager = requireNonNull(checkpointWriterManager, "checkpointWriterManager is null");
this.defaultCheckpointInterval = defaultCheckpointInterval;
this.deltaLakeRedirectionsProvider = requireNonNull(deltaLakeRedirectionsProvider, "deltaLakeRedirectionsProvider is null");
this.statisticsAccess = requireNonNull(statisticsAccess, "statisticsAccess is null");
this.deleteSchemaLocationsFallback = deleteSchemaLocationsFallback;
this.metadataScheduler = requireNonNull(metadataScheduler, "metadataScheduler is null");
@@ -2436,7 +2432,7 @@ public Optional<ConnectorPartitioningHandle> getUpdateLayout(ConnectorSession se
}

@Override
public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, RetryMode retryMode)
public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, Map<Integer, Collection<ColumnHandle>> updateCaseColumns, RetryMode retryMode)
{
DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle;
if (isAppendOnly(handle.getMetadataEntry(), handle.getProtocolEntry())) {
@@ -3576,12 +3572,6 @@ public void validateScan(ConnectorSession session, ConnectorTableHandle handle)
}
}

@Override
public Optional<TableScanRedirectApplicationResult> applyTableScanRedirect(ConnectorSession session, ConnectorTableHandle tableHandle)
{
return deltaLakeRedirectionsProvider.getTableScanRedirection(session, (DeltaLakeTableHandle) tableHandle);
}

@Override
public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, Map<String, Object> analyzeProperties)
{
@@ -53,7 +53,6 @@ public class DeltaLakeMetadataFactory
private final TransactionLogWriterFactory transactionLogWriterFactory;
private final NodeManager nodeManager;
private final CheckpointWriterManager checkpointWriterManager;
private final DeltaLakeRedirectionsProvider deltaLakeRedirectionsProvider;
private final CachingExtendedStatisticsAccess statisticsAccess;
private final int domainCompactionThreshold;
private final boolean unsafeWritesEnabled;
@@ -79,7 +78,6 @@ public DeltaLakeMetadataFactory(
TransactionLogWriterFactory transactionLogWriterFactory,
NodeManager nodeManager,
CheckpointWriterManager checkpointWriterManager,
DeltaLakeRedirectionsProvider deltaLakeRedirectionsProvider,
CachingExtendedStatisticsAccess statisticsAccess,
@AllowDeltaLakeManagedTableRename boolean allowManagedTableRename,
NodeVersion nodeVersion,
@@ -96,7 +94,6 @@
this.transactionLogWriterFactory = requireNonNull(transactionLogWriterFactory, "transactionLogWriterFactory is null");
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.checkpointWriterManager = requireNonNull(checkpointWriterManager, "checkpointWriterManager is null");
this.deltaLakeRedirectionsProvider = requireNonNull(deltaLakeRedirectionsProvider, "deltaLakeRedirectionsProvider is null");
this.statisticsAccess = requireNonNull(statisticsAccess, "statisticsAccess is null");
this.domainCompactionThreshold = deltaLakeConfig.getDomainCompactionThreshold();
this.unsafeWritesEnabled = deltaLakeConfig.getUnsafeWritesEnabled();
@@ -148,7 +145,6 @@ public DeltaLakeMetadata create(ConnectorIdentity identity)
checkpointWriterManager,
checkpointWritingInterval,
deleteSchemaLocationsFallback,
deltaLakeRedirectionsProvider,
statisticsAccess,
metadataScheduler,
useUniqueTableLocation,
@@ -127,9 +127,6 @@ public void setup(Binder binder)
binder.bind(NoIsolationSynchronizer.class).in(Scopes.SINGLETON);
newMapBinder(binder, String.class, TransactionLogSynchronizer.class);

newOptionalBinder(binder, DeltaLakeRedirectionsProvider.class)
.setDefault().toInstance(DeltaLakeRedirectionsProvider.NOOP);

jsonCodecBinder(binder).bindJsonCodec(DataFileInfo.class);
jsonCodecBinder(binder).bindJsonCodec(DeltaLakeMergeResult.class);
binder.bind(DeltaLakeWriterStats.class).in(Scopes.SINGLETON);
@@ -20,7 +20,5 @@

public interface DeltaLakeRedirectionsProvider
{
DeltaLakeRedirectionsProvider NOOP = (session, tableHandle) -> Optional.empty();

Optional<TableScanRedirectApplicationResult> getTableScanRedirection(ConnectorSession session, DeltaLakeTableHandle tableHandle);
}
@@ -15,7 +15,6 @@

import com.google.common.collect.ImmutableMap;
import io.airlift.bootstrap.ApplicationConfigurationException;
import io.trino.plugin.hive.HiveConfig;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorFactory;
import io.trino.testing.TestingConnectorContext;
@@ -165,7 +164,7 @@ public void testHiveConfigIsNotBound()
ImmutableMap.of(
"hive.metastore.uri", "thrift://foo:1234",
// Try setting any property provided by HiveConfig class
HiveConfig.CONFIGURATION_HIVE_PARTITION_PROJECTION_ENABLED, "true",
"hive.partition-projection-enabled", "true",
"bootstrap.quiet", "true"),
new TestingConnectorContext()))
.hasMessageContaining("Error: Configuration property 'hive.partition-projection-enabled' was not used");
@@ -230,7 +230,6 @@ public Stream<AddFileEntry> getActiveFiles(
new TransactionLogSynchronizerManager(ImmutableMap.of(), new NoIsolationSynchronizer(hdfsFileSystemFactory))),
new TestingNodeManager(),
checkpointWriterManager,
DeltaLakeRedirectionsProvider.NOOP,
new CachingExtendedStatisticsAccess(new MetaDirStatisticsAccess(HDFS_FILE_SYSTEM_FACTORY, new JsonCodecFactory().jsonCodec(ExtendedStatistics.class))),
true,
new NodeVersion("test_version"),
@@ -66,8 +66,6 @@
})
public class HiveConfig
{
public static final String CONFIGURATION_HIVE_PARTITION_PROJECTION_ENABLED = "hive.partition-projection-enabled";

private boolean singleStatementWritesOnly;

private DataSize maxSplitSize = DataSize.of(64, MEGABYTE);
@@ -1249,7 +1247,7 @@ public boolean isPartitionProjectionEnabled()
return partitionProjectionEnabled;
}

@Config(CONFIGURATION_HIVE_PARTITION_PROJECTION_ENABLED)
@Config("hive.partition-projection-enabled")
@ConfigDescription("Enables AWS Athena partition projection")
public HiveConfig setPartitionProjectionEnabled(boolean enabledAthenaPartitionProjection)
{