Fix procedures to not use HiveTableHandle
This prevents ClassCastException failures in the procedures when the table handle returned by getTableHandle is not a HiveTableHandle.
anusudarsan committed Jan 2, 2025
1 parent 1a4bdbf commit d30d9bd
Showing 2 changed files with 13 additions and 10 deletions.
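Both procedures now follow the same pattern: keep the handle returned by getTableHandle typed as ConnectorTableHandle and derive the partition columns through getColumnHandles, rather than casting the whole handle to HiveTableHandle. Below is a minimal sketch of that pattern; the class and method names (PartitionColumnsSketch, partitionColumnNames) and the simplified error handling are illustrative only and not part of this commit.

// Illustrative sketch, not part of the commit: derive partition column names from a
// ConnectorTableHandle via getColumnHandles(), with no cast to HiveTableHandle.
import static com.google.common.collect.ImmutableList.toImmutableList;

import io.trino.plugin.hive.HiveColumnHandle;
import io.trino.plugin.hive.TransactionalMetadata;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.SchemaTableName;

import java.util.List;
import java.util.Optional;

final class PartitionColumnsSketch
{
    private PartitionColumnsSketch() {}

    static List<String> partitionColumnNames(TransactionalMetadata hiveMetadata, ConnectorSession session, SchemaTableName table)
    {
        // The handle stays typed as the SPI interface; no connector-specific cast is needed here.
        ConnectorTableHandle handle = hiveMetadata.getTableHandle(session, table, Optional.empty(), Optional.empty());
        if (handle == null) {
            // The real procedures throw TrinoException(INVALID_PROCEDURE_ARGUMENT, ...); simplified here.
            throw new IllegalArgumentException("Table does not exist: " + table);
        }
        // Individual column handles are still Hive-specific, so the per-column cast remains.
        return hiveMetadata.getColumnHandles(session, handle).values().stream()
                .map(HiveColumnHandle.class::cast)
                .filter(HiveColumnHandle::isPartitionKey)
                .map(HiveColumnHandle::getName)
                .collect(toImmutableList());
    }
}

Keeping the variable at the ConnectorTableHandle type means the lookup works regardless of which concrete handle the metadata layer returns, which is the class-cast failure the commit message refers to.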
First changed file:
@@ -23,7 +23,6 @@
 import io.trino.plugin.base.util.UncheckedCloseable;
 import io.trino.plugin.hive.HiveColumnHandle;
 import io.trino.plugin.hive.HiveInsertTableHandle;
-import io.trino.plugin.hive.HiveTableHandle;
 import io.trino.plugin.hive.LocationService;
 import io.trino.plugin.hive.LocationService.WriteInfo;
 import io.trino.plugin.hive.PartitionUpdate;
@@ -34,6 +33,7 @@
 import io.trino.spi.classloader.ThreadContextClassLoader;
 import io.trino.spi.connector.ConnectorAccessControl;
 import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableHandle;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.procedure.Procedure;
 import io.trino.spi.procedure.Procedure.Argument;
@@ -112,14 +112,16 @@ private void doCreateEmptyPartition(ConnectorSession session, ConnectorAccessCon
         TransactionalMetadata hiveMetadata = hiveMetadataFactory.create(session.getIdentity(), true);
         hiveMetadata.beginQuery(session);
         try (UncheckedCloseable ignore = () -> hiveMetadata.cleanupQuery(session)) {
-            HiveTableHandle tableHandle = (HiveTableHandle) hiveMetadata.getTableHandle(session, new SchemaTableName(schemaName, tableName), Optional.empty(), Optional.empty());
+            ConnectorTableHandle tableHandle = hiveMetadata.getTableHandle(session, new SchemaTableName(schemaName, tableName), Optional.empty(), Optional.empty());
             if (tableHandle == null) {
                 throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, format("Table '%s' does not exist", new SchemaTableName(schemaName, tableName)));
             }

             accessControl.checkCanInsertIntoTable(null, new SchemaTableName(schemaName, tableName));

-            List<String> actualPartitionColumnNames = tableHandle.getPartitionColumns().stream()
+            List<String> actualPartitionColumnNames = hiveMetadata.getColumnHandles(session, tableHandle).values().stream()
+                    .map(HiveColumnHandle.class::cast)
+                    .filter(HiveColumnHandle::isPartitionKey)
                     .map(HiveColumnHandle::getName)
                     .collect(toImmutableList());

Second changed file:
@@ -21,14 +21,14 @@
 import io.trino.metastore.PartitionStatistics;
 import io.trino.plugin.base.util.UncheckedCloseable;
 import io.trino.plugin.hive.HiveColumnHandle;
-import io.trino.plugin.hive.HiveTableHandle;
 import io.trino.plugin.hive.TransactionalMetadata;
 import io.trino.plugin.hive.TransactionalMetadataFactory;
 import io.trino.spi.TrinoException;
 import io.trino.spi.classloader.ThreadContextClassLoader;
 import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.connector.ConnectorAccessControl;
 import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableHandle;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.connector.TableNotFoundException;
 import io.trino.spi.predicate.TupleDomain;
@@ -107,12 +107,13 @@ private void doDropStats(ConnectorSession session, ConnectorAccessControl access
         TransactionalMetadata hiveMetadata = hiveMetadataFactory.create(session.getIdentity(), true);
         hiveMetadata.beginQuery(session);
         try (UncheckedCloseable ignore = () -> hiveMetadata.cleanupQuery(session)) {
-            HiveTableHandle handle = (HiveTableHandle) hiveMetadata.getTableHandle(session, new SchemaTableName(schema, table), Optional.empty(), Optional.empty());
+            SchemaTableName schemaTableName = new SchemaTableName(schema, table);
+            ConnectorTableHandle handle = hiveMetadata.getTableHandle(session, schemaTableName, Optional.empty(), Optional.empty());
             if (handle == null) {
-                throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, format("Table '%s' does not exist", new SchemaTableName(schema, table)));
+                throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, format("Table '%s' does not exist", schemaTableName));
             }

-            accessControl.checkCanInsertIntoTable(null, new SchemaTableName(schema, table));
+            accessControl.checkCanInsertIntoTable(null, schemaTableName);

             Map<String, ColumnHandle> columns = hiveMetadata.getColumnHandles(session, handle);
             List<String> partitionColumns = columns.values().stream()
@@ -131,7 +132,7 @@ private void doDropStats(ConnectorSession session, ConnectorAccessControl access

                 partitionStringValues.forEach(values -> metastore.updatePartitionStatistics(
                         metastore.getTable(schema, table)
-                                .orElseThrow(() -> new TableNotFoundException(new SchemaTableName(schema, table))),
+                                .orElseThrow(() -> new TableNotFoundException(schemaTableName)),
                         CLEAR_ALL,
                         ImmutableMap.of(
                                 makePartName(partitionColumns, values),
@@ -150,10 +151,10 @@ private void doDropStats(ConnectorSession session, ConnectorAccessControl access
             }
             else {
                 // the table is partitioned; remove stats for every partition
-                hiveMetadata.getMetastore().getPartitionNamesByFilter(handle.getSchemaName(), handle.getTableName(), partitionColumns, TupleDomain.all())
+                hiveMetadata.getMetastore().getPartitionNamesByFilter(schemaTableName.getSchemaName(), schemaTableName.getTableName(), partitionColumns, TupleDomain.all())
                         .ifPresent(partitions -> partitions.forEach(partitionName -> metastore.updatePartitionStatistics(
                                 metastore.getTable(schema, table)
-                                        .orElseThrow(() -> new TableNotFoundException(new SchemaTableName(schema, table))),
+                                        .orElseThrow(() -> new TableNotFoundException(schemaTableName)),
                                 CLEAR_ALL,
                                 ImmutableMap.of(
                                         partitionName,

