From 4c44ca662f24010abbb991684716f50835db57f3 Mon Sep 17 00:00:00 2001
From: Alex Jo
Date: Fri, 8 Dec 2023 13:34:46 -0500
Subject: [PATCH] Support vended S3 credentials in the Iceberg REST catalog

---
 .../io/trino/filesystem/s3/S3Context.java     |  22 +-
 .../io/trino/filesystem/s3/S3FileSystem.java  |   4 +
 .../filesystem/s3/S3FileSystemFactory.java    |  16 +-
 .../io/trino/filesystem/s3/S3InputFile.java   |   4 +
 .../trino/filesystem/s3/S3OutputStream.java   |   7 +
 .../filesystem/s3/S3FileSystemConstants.java  |  23 ++
 plugin/trino-iceberg/pom.xml                  |  36 ++
 .../iceberg/IcebergFileSystemFactory.java     |  24 ++
 .../trino/plugin/iceberg/IcebergMetadata.java |  38 +-
 .../iceberg/IcebergMetadataFactory.java       |   5 +-
 .../trino/plugin/iceberg/IcebergModule.java   |   3 +
 .../iceberg/IcebergPageSinkProvider.java      |  11 +-
 .../iceberg/IcebergPageSourceProvider.java    |  17 +-
 .../io/trino/plugin/iceberg/IcebergSplit.java |  16 +-
 .../plugin/iceberg/IcebergSplitManager.java   |   6 +-
 .../plugin/iceberg/IcebergSplitSource.java    |  13 +-
 .../iceberg/IcebergWritableTableHandle.java   |  11 +-
 .../rest/DefaultIcebergFileSystemFactory.java |  42 +++
 .../rest/IcebergRestCatalogConfig.java        |  14 +
 .../IcebergRestCatalogFileSystemFactory.java  |  71 ++++
 .../rest/IcebergRestCatalogModule.java        |  12 +
 .../rest/TrinoIcebergRestCatalogFactory.java  |  15 +-
 .../iceberg/fileio/ForwardingFileIo.java      |  21 ++
 .../TableChangesFunctionProcessor.java        |   1 +
 .../tablechanges/TableChangesSplit.java       |   9 +-
 .../tablechanges/TableChangesSplitSource.java |   6 +-
 .../procedure/IcebergTableExecuteHandle.java  |  16 +-
 ...stIcebergNodeLocalDynamicSplitPruning.java |  13 +-
 .../plugin/iceberg/TestIcebergPlugin.java     |  19 +
 .../iceberg/TestIcebergSplitSource.java       |   4 +-
 .../iceberg/catalog/BaseTrinoCatalogTest.java |   2 +-
 .../catalog/glue/TestTrinoGlueCatalog.java    |   2 +-
 .../nessie/TestTrinoNessieCatalog.java        |   2 +-
 .../rest/TestIcebergRestCatalogConfig.java    |   7 +-
 ...ergTrinoRestCatalogConnectorSmokeTest.java |   3 +-
 ...gVendingRestCatalogConnectorSmokeTest.java | 355 ++++++++++++++++++
 .../catalog/rest/TestTrinoRestCatalog.java    |   2 +-
 .../IcebergRestCatalogBackendContainer.java   |  57 +++
 38 files changed, 859 insertions(+), 70 deletions(-)
 create mode 100644 lib/trino-filesystem/src/main/java/io/trino/filesystem/s3/S3FileSystemConstants.java
 create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileSystemFactory.java
 create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/DefaultIcebergFileSystemFactory.java
 create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogFileSystemFactory.java
 create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java
 create mode 100644 testing/trino-testing-containers/src/main/java/io/trino/testing/containers/IcebergRestCatalogBackendContainer.java

diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3Context.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3Context.java
index 026acde55240..8a1456effe7b 100644
--- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3Context.java
+++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3Context.java
@@ -14,12 +14,16 @@
 package io.trino.filesystem.s3;
 
 import io.trino.filesystem.s3.S3FileSystemConfig.S3SseType;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
 import software.amazon.awssdk.services.s3.model.RequestPayer;
 
+import java.util.Optional;
+
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 
-record S3Context(int partSize, boolean requesterPays, S3SseType sseType, String sseKmsKeyId)
+record S3Context(int partSize, boolean requesterPays, S3SseType sseType, String sseKmsKeyId, Optional<AwsCredentialsProvider> credentialsProviderOverride)
 {
     private static final int MIN_PART_SIZE = 5 * 1024 * 1024; // S3 requirement
 
@@ -28,10 +32,26 @@ record S3Context(int partSize, boolean requesterPays, S3SseType sseType, String
         checkArgument(partSize >= MIN_PART_SIZE, "partSize must be at least %s bytes", MIN_PART_SIZE);
         requireNonNull(sseType, "sseType is null");
         checkArgument((sseType != S3SseType.KMS) || (sseKmsKeyId != null), "sseKmsKeyId is null for SSE-KMS");
+        requireNonNull(credentialsProviderOverride, "credentialsProviderOverride is null");
     }
 
     public RequestPayer requestPayer()
     {
         return requesterPays ? RequestPayer.REQUESTER : null;
     }
+
+    public S3Context withCredentialsProviderOverride(AwsCredentialsProvider credentialsProviderOverride)
+    {
+        return new S3Context(
+                partSize,
+                requesterPays,
+                sseType,
+                sseKmsKeyId,
+                Optional.of(credentialsProviderOverride));
+    }
+
+    public void applyCredentialProviderOverride(AwsRequestOverrideConfiguration.Builder builder)
+    {
+        credentialsProviderOverride.ifPresent(builder::credentialsProvider);
+    }
 }
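The override that S3Context now carries builds on the AWS SDK v2 per-request credentials hook: a provider attached through AwsRequestOverrideConfiguration takes precedence over the provider the client was built with. A minimal sketch of that mechanism in isolation (not part of the patch; bucket, key, and credential values are placeholders):

    import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
    import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
    import software.amazon.awssdk.core.ResponseInputStream;
    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.s3.model.GetObjectRequest;
    import software.amazon.awssdk.services.s3.model.GetObjectResponse;

    class VendedCredentialsSketch
    {
        static ResponseInputStream<GetObjectResponse> readWithVendedCredentials(S3Client client)
        {
            // A client built with the catalog's long-lived credentials can still issue
            // individual requests under short-lived vended session credentials.
            StaticCredentialsProvider vended = StaticCredentialsProvider.create(
                    AwsSessionCredentials.create("accessKey", "secretKey", "sessionToken"));
            GetObjectRequest request = GetObjectRequest.builder()
                    .overrideConfiguration(override -> override.credentialsProvider(vended))
                    .bucket("example-bucket")
                    .key("example/object")
                    .build();
            return client.getObject(request);
        }
    }

This is exactly what applyCredentialProviderOverride() arranges for every S3 request below, as a no-op when no override is present.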
diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java
index e9b296349871..8228c6d45c58 100644
--- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java
+++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java
@@ -87,6 +87,7 @@ public void deleteFile(Location location)
         location.verifyValidFileLocation();
         S3Location s3Location = new S3Location(location);
 
         DeleteObjectRequest request = DeleteObjectRequest.builder()
+                .overrideConfiguration(context::applyCredentialProviderOverride)
                 .requestPayer(requestPayer)
                 .key(s3Location.key())
                 .bucket(s3Location.bucket())
                 .build();
@@ -136,6 +137,7 @@ public void deleteFiles(Collection<Location> locations)
                     .toList();
 
             DeleteObjectsRequest request = DeleteObjectsRequest.builder()
+                    .overrideConfiguration(context::applyCredentialProviderOverride)
                     .requestPayer(requestPayer)
                     .bucket(bucket)
                     .delete(builder -> builder.objects(objects).quiet(true))
                     .build();
@@ -177,6 +179,7 @@ public FileIterator listFiles(Location location)
         }
 
         ListObjectsV2Request request = ListObjectsV2Request.builder()
+                .overrideConfiguration(context::applyCredentialProviderOverride)
                 .bucket(s3Location.bucket())
                 .prefix(key)
                 .build();
@@ -230,6 +233,7 @@ public Set<Location> listDirectories(Location location)
         }
 
         ListObjectsV2Request request = ListObjectsV2Request.builder()
+                .overrideConfiguration(context::applyCredentialProviderOverride)
                 .bucket(s3Location.bucket())
                 .prefix(key)
                 .delimiter("/")
diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemFactory.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemFactory.java
index 81dac4930095..8beada1d1b46 100644
--- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemFactory.java
+++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystemFactory.java
@@ -21,6 +21,8 @@
 import io.trino.spi.security.ConnectorIdentity;
 import jakarta.annotation.PreDestroy;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
 import software.amazon.awssdk.http.apache.ApacheHttpClient;
@@ -35,6 +37,9 @@
 import java.net.URI;
 import java.util.Optional;
 
+import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY;
+import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY;
+import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY;
 import static java.lang.Math.toIntExact;
 
 public final class S3FileSystemFactory
@@ -100,7 +105,8 @@ public S3FileSystemFactory(OpenTelemetry openTelemetry, S3FileSystemConfig confi
                 toIntExact(config.getStreamingPartSize().toBytes()),
                 config.isRequesterPays(),
                 config.getSseType(),
-                config.getSseKmsKeyId());
+                config.getSseKmsKeyId(),
+                Optional.empty());
     }
 
     @PreDestroy
@@ -112,6 +118,14 @@ public void destroy()
     @Override
     public TrinoFileSystem create(ConnectorIdentity identity)
     {
+        if (identity.getExtraCredentials().containsKey(EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY)) {
+            AwsCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(AwsSessionCredentials.create(
+                    identity.getExtraCredentials().get(EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY),
+                    identity.getExtraCredentials().get(EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY),
+                    identity.getExtraCredentials().get(EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY)));
+            return new S3FileSystem(client, context.withCredentialsProviderOverride(credentialsProvider));
+        }
+
         return new S3FileSystem(client, context);
     }
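The factory switches to vended session credentials whenever the identity carries the internal extra-credential keys. A sketch of how a caller would populate such an identity (the values are placeholders; the Iceberg REST catalog code later in this patch does this for real):

    import com.google.common.collect.ImmutableMap;
    import io.trino.spi.security.ConnectorIdentity;

    import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY;
    import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY;
    import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY;

    class ExtraCredentialsSketch
    {
        static ConnectorIdentity withVendedCredentials(String user)
        {
            // Placeholder values; in the real flow these come from the REST
            // catalog's table-level FileIO configuration.
            return ConnectorIdentity.forUser(user)
                    .withExtraCredentials(ImmutableMap.of(
                            EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY, "accessKey",
                            EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY, "secretKey",
                            EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY, "sessionToken"))
                    .build();
        }
    }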
diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputFile.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputFile.java
index 9df5d5169854..9fa15c6003bd 100644
--- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputFile.java
+++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputFile.java
@@ -36,6 +36,7 @@ final class S3InputFile
 {
     private final S3Client client;
     private final S3Location location;
+    private final S3Context context;
     private final RequestPayer requestPayer;
     private Long length;
     private Instant lastModified;
@@ -44,6 +45,7 @@ public S3InputFile(S3Client client, S3Context context, S3Location location, Long
     {
         this.client = requireNonNull(client, "client is null");
         this.location = requireNonNull(location, "location is null");
+        this.context = requireNonNull(context, "context is null");
         this.requestPayer = context.requestPayer();
         this.length = length;
         location.location().verifyValidFileLocation();
@@ -97,6 +99,7 @@ public Location location()
     private GetObjectRequest newGetObjectRequest()
     {
         return GetObjectRequest.builder()
+                .overrideConfiguration(context::applyCredentialProviderOverride)
                 .requestPayer(requestPayer)
                 .bucket(location.bucket())
                 .key(location.key())
@@ -107,6 +110,7 @@ private boolean headObject()
             throws IOException
     {
         HeadObjectRequest request = HeadObjectRequest.builder()
+                .overrideConfiguration(context::applyCredentialProviderOverride)
                 .requestPayer(requestPayer)
                 .bucket(location.bucket())
                 .key(location.key())
diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputStream.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputStream.java
index 731a56752cc7..c4f83d5dbc81 100644
--- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputStream.java
+++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputStream.java
@@ -55,6 +55,7 @@ final class S3OutputStream
     private final LocalMemoryContext memoryContext;
     private final S3Client client;
     private final S3Location location;
+    private final S3Context context;
     private final int partSize;
     private final RequestPayer requestPayer;
     private final S3SseType sseType;
@@ -80,6 +81,7 @@ public S3OutputStream(AggregatedMemoryContext memoryContext, S3Client client, S3
         this.memoryContext = memoryContext.newLocalMemoryContext(S3OutputStream.class.getSimpleName());
         this.client = requireNonNull(client, "client is null");
         this.location = requireNonNull(location, "location is null");
+        this.context = requireNonNull(context, "context is null");
         this.partSize = context.partSize();
         this.requestPayer = context.requestPayer();
         this.sseType = context.sseType();
@@ -192,6 +194,7 @@ private void flushBuffer(boolean finished)
         // skip multipart upload if there would only be one part
         if (finished && !multipartUploadStarted) {
             PutObjectRequest request = PutObjectRequest.builder()
+                    .overrideConfiguration(context::applyCredentialProviderOverride)
                     .requestPayer(requestPayer)
                     .bucket(location.bucket())
                     .key(location.key())
@@ -268,6 +271,7 @@ private CompletedPart uploadPage(byte[] data, int length)
     {
         if (uploadId.isEmpty()) {
             CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder()
+                    .overrideConfiguration(context::applyCredentialProviderOverride)
                     .requestPayer(requestPayer)
                     .bucket(location.bucket())
                     .key(location.key())
@@ -285,6 +289,7 @@ private CompletedPart uploadPage(byte[] data, int length)
         currentPartNumber++;
 
         UploadPartRequest request = UploadPartRequest.builder()
+                .overrideConfiguration(context::applyCredentialProviderOverride)
                 .requestPayer(requestPayer)
                 .bucket(location.bucket())
                 .key(location.key())
@@ -309,6 +314,7 @@ private CompletedPart uploadPage(byte[] data, int length)
     private void finishUpload(String uploadId)
     {
         CompleteMultipartUploadRequest request = CompleteMultipartUploadRequest.builder()
+                .overrideConfiguration(context::applyCredentialProviderOverride)
                 .requestPayer(requestPayer)
                 .bucket(location.bucket())
                 .key(location.key())
@@ -322,6 +328,7 @@ private void finishUpload(String uploadId)
     private void abortUpload()
     {
         uploadId.map(id -> AbortMultipartUploadRequest.builder()
+                .overrideConfiguration(context::applyCredentialProviderOverride)
                 .requestPayer(requestPayer)
                 .bucket(location.bucket())
                 .key(location.key())
diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/s3/S3FileSystemConstants.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/s3/S3FileSystemConstants.java
new file mode 100644
index 000000000000..1155026292a6
--- /dev/null
+++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/s3/S3FileSystemConstants.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.s3;
+
+public final class S3FileSystemConstants
+{
+    public static final String EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY = "internal$s3_aws_access_key";
+    public static final String EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY = "internal$s3_aws_secret_key";
+    public static final String EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY = "internal$s3_aws_session_token";
+
+    private S3FileSystemConstants() {}
+}
diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml
index 70f003c2bdb8..157cf9ac6c28 100644
--- a/plugin/trino-iceberg/pom.xml
+++ b/plugin/trino-iceberg/pom.xml
@@ -341,6 +341,12 @@
             <scope>runtime</scope>
         </dependency>
 
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-filesystem-s3</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.httpcomponents.client5</groupId>
             <artifactId>httpclient5</artifactId>
@@ -374,6 +380,36 @@
             <scope>runtime</scope>
         </dependency>
 
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>auth</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>aws-core</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>regions</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>sdk-core</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>sts</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+
         <dependency>
             <groupId>io.airlift</groupId>
             <artifactId>http-server</artifactId>
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileSystemFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileSystemFactory.java
new file mode 100644
index 000000000000..a8bd36cce40c
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileSystemFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg;
+
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.spi.security.ConnectorIdentity;
+
+import java.util.Map;
+
+public interface IcebergFileSystemFactory
+{
+    TrinoFileSystem create(ConnectorIdentity identity, Map<String, String> fileIoProperties);
+}
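The new interface mirrors TrinoFileSystemFactory but threads the table-level FileIO properties through every call site. A hypothetical helper showing the calling convention used throughout the rest of this patch (assumes an open Iceberg Table and that the helper lives next to the interface):

    import io.trino.filesystem.TrinoFileSystem;
    import io.trino.spi.connector.ConnectorSession;
    import org.apache.iceberg.Table;

    class FileSystemFactoryUsageSketch
    {
        // Call sites that previously used fileSystemFactory.create(session) now
        // also pass the table's FileIO properties, so a catalog-specific
        // implementation can react to vended credentials.
        static TrinoFileSystem fileSystemFor(IcebergFileSystemFactory factory, ConnectorSession session, Table icebergTable)
        {
            return factory.create(session.getIdentity(), icebergTable.io().properties());
        }
    }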
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
index 597b04df280b..b7c450a0623a 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
@@ -32,7 +32,6 @@
 import io.trino.filesystem.FileIterator;
 import io.trino.filesystem.Location;
 import io.trino.filesystem.TrinoFileSystem;
-import io.trino.filesystem.TrinoFileSystemFactory;
 import io.trino.plugin.base.classloader.ClassLoaderSafeSystemTable;
 import io.trino.plugin.base.filter.UtcConstraintExtractor;
 import io.trino.plugin.base.projection.ApplyProjectionUtil;
@@ -328,7 +327,7 @@ public class IcebergMetadata
     private final CatalogHandle trinoCatalogHandle;
     private final JsonCodec<CommitTaskData> commitTaskCodec;
     private final TrinoCatalog catalog;
-    private final TrinoFileSystemFactory fileSystemFactory;
+    private final IcebergFileSystemFactory fileSystemFactory;
     private final TableStatisticsWriter tableStatisticsWriter;
 
     private final Map tableStatisticsCache = new ConcurrentHashMap<>();
@@ -340,7 +339,7 @@ public IcebergMetadata(
             CatalogHandle trinoCatalogHandle,
             JsonCodec<CommitTaskData> commitTaskCodec,
             TrinoCatalog catalog,
-            TrinoFileSystemFactory fileSystemFactory,
+            IcebergFileSystemFactory fileSystemFactory,
             TableStatisticsWriter tableStatisticsWriter)
     {
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
@@ -961,7 +960,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
         }
         transaction = newCreateTableTransaction(catalog, tableMetadata, session, replace, tableLocation);
         Location location = Location.of(transaction.table().location());
-        TrinoFileSystem fileSystem = fileSystemFactory.create(session);
+        TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity(), transaction.table().io().properties());
         try {
             if (!replace && fileSystem.listFiles(location).hasNext()) {
                 throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, format("" +
@@ -1054,7 +1053,8 @@ private IcebergWritableTableHandle newWritableTableHandle(SchemaTableName name,
                 table.location(),
                 getFileFormat(table),
                 table.properties(),
-                retryMode);
+                retryMode,
+                table.io().properties());
     }
 
     private static List getSupportedSortFields(Schema schema, SortOrder sortOrder)
@@ -1168,7 +1168,7 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
 
     private void cleanExtraOutputFiles(ConnectorSession session, Set<String> writtenFiles)
     {
-        TrinoFileSystem fileSystem = fileSystemFactory.create(session);
+        TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity(), transaction.table().io().properties());
         Set<String> locations = getOutputFilesLocations(writtenFiles);
         Set<String> fileNames = getOutputFilesFileNames(writtenFiles);
         for (String location : locations) {
@@ -1302,7 +1302,8 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForOptimize(
                         tableHandle.getStorageProperties(),
                         maxScannedFileSize,
                         retryMode != NO_RETRIES),
-                tableHandle.getTableLocation()));
+                tableHandle.getTableLocation(),
+                icebergTable.io().properties()));
     }
 
     private Optional<ConnectorTableExecuteHandle> getTableHandleForDropExtendedStats(ConnectorSession session, IcebergTableHandle tableHandle)
@@ -1313,7 +1314,8 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForDropExtendedStats
                 tableHandle.getSchemaTableName(),
                 DROP_EXTENDED_STATS,
                 new IcebergDropExtendedStatsHandle(),
-                icebergTable.location()));
+                icebergTable.location(),
+                icebergTable.io().properties()));
     }
 
     private Optional<ConnectorTableExecuteHandle> getTableHandleForExpireSnapshots(ConnectorSession session, IcebergTableHandle tableHandle, Map<String, Object> executeProperties)
@@ -1325,7 +1327,8 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForExpireSnapshots(C
                 tableHandle.getSchemaTableName(),
                 EXPIRE_SNAPSHOTS,
                 new IcebergExpireSnapshotsHandle(retentionThreshold),
-                icebergTable.location()));
+                icebergTable.location(),
+                icebergTable.io().properties()));
     }
 
     private Optional<ConnectorTableExecuteHandle> getTableHandleForRemoveOrphanFiles(ConnectorSession session, IcebergTableHandle tableHandle, Map<String, Object> executeProperties)
@@ -1337,7 +1340,8 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForRemoveOrphanFiles
                 tableHandle.getSchemaTableName(),
                 REMOVE_ORPHAN_FILES,
                 new IcebergRemoveOrphanFilesHandle(retentionThreshold),
-                icebergTable.location()));
+                icebergTable.location(),
+                icebergTable.io().properties()));
     }
 
     @Override
@@ -1543,7 +1547,7 @@ private void executeExpireSnapshots(ConnectorSession session, IcebergTableExecut
                 IcebergSessionProperties.EXPIRE_SNAPSHOTS_MIN_RETENTION);
 
         long expireTimestampMillis = session.getStart().toEpochMilli() - retention.toMillis();
-        TrinoFileSystem fileSystem = fileSystemFactory.create(session);
+        TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity(), table.io().properties());
         List<Location> pathsToDelete = new ArrayList<>();
         // deleteFunction is not accessed from multiple threads unless .executeDeleteWith() is used
         Consumer<String> deleteFunction = path -> {
@@ -1629,10 +1633,10 @@ public void executeRemoveOrphanFiles(ConnectorSession session, IcebergTableExecu
         }
 
         Instant expiration = session.getStart().minusMillis(retention.toMillis());
-        removeOrphanFiles(table, session, executeHandle.getSchemaTableName(), expiration);
+        removeOrphanFiles(table, session, executeHandle.getSchemaTableName(), expiration, executeHandle.getFileIoProperties());
     }
 
-    private void removeOrphanFiles(Table table, ConnectorSession session, SchemaTableName schemaTableName, Instant expiration)
+    private void removeOrphanFiles(Table table, ConnectorSession session, SchemaTableName schemaTableName, Instant expiration, Map<String, String> fileIoProperties)
     {
         Set<String> processedManifestFilePaths = new HashSet<>();
         // Similarly to issues like https://github.com/trinodb/trino/issues/13759, equivalent paths may have different String
@@ -1674,8 +1678,8 @@ private void removeOrphanFiles(Table table, ConnectorSession
 
         validMetadataFileNames.add("version-hint.text");
 
-        scanAndDeleteInvalidFiles(table, session, schemaTableName, expiration, validDataFileNames.build(), "data");
-        scanAndDeleteInvalidFiles(table, session, schemaTableName, expiration, validMetadataFileNames.build(), "metadata");
+        scanAndDeleteInvalidFiles(table, session, schemaTableName, expiration, validDataFileNames.build(), "data", fileIoProperties);
+        scanAndDeleteInvalidFiles(table, session, schemaTableName, expiration, validMetadataFileNames.build(), "metadata", fileIoProperties);
     }
 
     private static ManifestReader<? extends ContentFile<?>> readerForManifest(Table table, ManifestFile manifest)
@@ -1686,11 +1690,11 @@ private static ManifestReader<? extends ContentFile<?>> readerForManifest(Table
         };
     }
 
-    private void scanAndDeleteInvalidFiles(Table table, ConnectorSession session, SchemaTableName schemaTableName, Instant expiration, Set<String> validFiles, String subfolder)
+    private void scanAndDeleteInvalidFiles(Table table, ConnectorSession session, SchemaTableName schemaTableName, Instant expiration, Set<String> validFiles, String subfolder, Map<String, String> fileIoProperties)
     {
         try {
             List<Location> filesToDelete = new ArrayList<>();
-            TrinoFileSystem fileSystem = fileSystemFactory.create(session);
+            TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity(), fileIoProperties);
             FileIterator allFiles = fileSystem.listFiles(Location.of(table.location()).appendPath(subfolder));
             while (allFiles.hasNext()) {
                 FileEntry entry = allFiles.next();
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java
index 36163eadc864..c9259a57ee89 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java
@@ -15,7 +15,6 @@
 
 import com.google.inject.Inject;
 import io.airlift.json.JsonCodec;
-import io.trino.filesystem.TrinoFileSystemFactory;
 import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
 import io.trino.spi.connector.CatalogHandle;
 import io.trino.spi.security.ConnectorIdentity;
@@ -29,7 +28,7 @@ public class IcebergMetadataFactory
     private final CatalogHandle trinoCatalogHandle;
     private final JsonCodec<CommitTaskData> commitTaskCodec;
     private final TrinoCatalogFactory catalogFactory;
-    private final TrinoFileSystemFactory fileSystemFactory;
+    private final IcebergFileSystemFactory fileSystemFactory;
     private final TableStatisticsWriter tableStatisticsWriter;
 
     @Inject
@@ -38,7 +37,7 @@ public IcebergMetadataFactory(
             CatalogHandle trinoCatalogHandle,
             JsonCodec<CommitTaskData> commitTaskCodec,
             TrinoCatalogFactory catalogFactory,
-            TrinoFileSystemFactory fileSystemFactory,
+            IcebergFileSystemFactory fileSystemFactory,
             TableStatisticsWriter tableStatisticsWriter)
     {
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java
index 0d16cb035572..f994235610fa 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java
@@ -26,6 +26,7 @@
 import io.trino.plugin.hive.orc.OrcWriterConfig;
 import io.trino.plugin.hive.parquet.ParquetReaderConfig;
 import io.trino.plugin.hive.parquet.ParquetWriterConfig;
+import io.trino.plugin.iceberg.catalog.rest.DefaultIcebergFileSystemFactory;
 import io.trino.plugin.iceberg.functions.IcebergFunctionProvider;
 import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProcessorProvider;
 import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProvider;
@@ -105,5 +106,7 @@ public void configure(Binder binder)
         newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(TableChangesFunctionProvider.class).in(Scopes.SINGLETON);
         binder.bind(FunctionProvider.class).to(IcebergFunctionProvider.class).in(Scopes.SINGLETON);
         binder.bind(TableChangesFunctionProcessorProvider.class).in(Scopes.SINGLETON);
+
+        newOptionalBinder(binder, IcebergFileSystemFactory.class).setDefault().to(DefaultIcebergFileSystemFactory.class).in(Scopes.SINGLETON);
     }
 }
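The module wires the default implementation through a Guice OptionalBinder so that a catalog-specific module can replace the binding without touching this file. A condensed sketch of the pattern, reusing the interface and class names introduced by this patch (assume the same package for brevity):

    import com.google.inject.AbstractModule;
    import com.google.inject.Scopes;

    import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;

    class BindingSketch extends AbstractModule
    {
        @Override
        protected void configure()
        {
            // Base module: this binding is used unless another module overrides it.
            newOptionalBinder(binder(), IcebergFileSystemFactory.class)
                    .setDefault().to(DefaultIcebergFileSystemFactory.class).in(Scopes.SINGLETON);

            // A catalog-specific module can then replace it with:
            // newOptionalBinder(binder(), IcebergFileSystemFactory.class)
            //         .setBinding().to(IcebergRestCatalogFileSystemFactory.class).in(Scopes.SINGLETON);
        }
    }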
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java
index 1242c9dd4c19..60846bd7264d 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java
@@ -16,7 +16,6 @@
 import com.google.inject.Inject;
 import io.airlift.json.JsonCodec;
 import io.airlift.units.DataSize;
-import io.trino.filesystem.TrinoFileSystemFactory;
 import io.trino.plugin.hive.SortingFileWriterConfig;
 import io.trino.plugin.iceberg.procedure.IcebergOptimizeHandle;
 import io.trino.plugin.iceberg.procedure.IcebergTableExecuteHandle;
@@ -48,7 +47,7 @@ public class IcebergPageSinkProvider
         implements ConnectorPageSinkProvider
 {
-    private final TrinoFileSystemFactory fileSystemFactory;
+    private final IcebergFileSystemFactory fileSystemFactory;
     private final JsonCodec<CommitTaskData> jsonCodec;
     private final IcebergFileWriterFactory fileWriterFactory;
     private final PageIndexerFactory pageIndexerFactory;
@@ -60,7 +59,7 @@ public class IcebergPageSinkProvider
 
     @Inject
     public IcebergPageSinkProvider(
-            TrinoFileSystemFactory fileSystemFactory,
+            IcebergFileSystemFactory fileSystemFactory,
             JsonCodec<CommitTaskData> jsonCodec,
             IcebergFileWriterFactory fileWriterFactory,
             PageIndexerFactory pageIndexerFactory,
@@ -104,7 +103,7 @@ private ConnectorPageSink createPageSink(ConnectorSession session, IcebergWritab
                 locationProvider,
                 fileWriterFactory,
                 pageIndexerFactory,
-                fileSystemFactory.create(session),
+                fileSystemFactory.create(session.getIdentity(), tableHandle.getFileIoProperties()),
                 tableHandle.getInputColumns(),
                 jsonCodec,
                 session,
@@ -135,7 +134,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
                 locationProvider,
                 fileWriterFactory,
                 pageIndexerFactory,
-                fileSystemFactory.create(session),
+                fileSystemFactory.create(session.getIdentity(), executeHandle.getFileIoProperties()),
                 optimizeHandle.getTableColumns(),
                 jsonCodec,
                 session,
@@ -168,7 +167,7 @@ public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transaction
         return new IcebergMergeSink(
                 locationProvider,
                 fileWriterFactory,
-                fileSystemFactory.create(session),
+                fileSystemFactory.create(session.getIdentity(), tableHandle.getFileIoProperties()),
                 jsonCodec,
                 session,
                 tableHandle.getFileFormat(),
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
index c0bec12b9aa6..7c3d50c15ec7 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
@@ -26,7 +26,6 @@
 import io.trino.annotation.NotThreadSafe;
 import io.trino.filesystem.Location;
 import io.trino.filesystem.TrinoFileSystem;
-import io.trino.filesystem.TrinoFileSystemFactory;
 import io.trino.filesystem.TrinoInputFile;
 import io.trino.memory.context.AggregatedMemoryContext;
 import io.trino.orc.OrcColumn;
@@ -222,7 +221,7 @@ public class IcebergPageSourceProvider
     // TODO (https://github.com/trinodb/trino/issues/16824) allow connector to return pages of arbitrary row count and handle this gracefully in engine
     private static final int MAX_RLE_PAGE_SIZE = DEFAULT_MAX_PAGE_SIZE_IN_BYTES / SIZE_OF_LONG;
 
-    private final TrinoFileSystemFactory fileSystemFactory;
+    private final IcebergFileSystemFactory fileSystemFactory;
     private final FileFormatDataSourceStats fileFormatDataSourceStats;
     private final OrcReaderOptions orcReaderOptions;
     private final ParquetReaderOptions parquetReaderOptions;
@@ -230,7 +229,7 @@ public class IcebergPageSourceProvider
 
     @Inject
     public IcebergPageSourceProvider(
-            TrinoFileSystemFactory fileSystemFactory,
+            IcebergFileSystemFactory fileSystemFactory,
             FileFormatDataSourceStats fileFormatDataSourceStats,
             OrcReaderConfig orcReaderConfig,
             ParquetReaderConfig parquetReaderConfig,
@@ -279,6 +278,7 @@ public ConnectorPageSource createPageSource(
                 split.getFileRecordCount(),
                 split.getPartitionDataJson(),
                 split.getFileFormat(),
+                split.getFileIoProperties(),
                 tableHandle.getNameMappingJson().map(NameMappingParser::fromJson));
     }
 
@@ -298,6 +298,7 @@ public ConnectorPageSource createPageSource(
             long fileRecordCount,
             String partitionDataJson,
             IcebergFileFormat fileFormat,
+            Map<String, String> fileIoProperties,
            Optional<NameMapping> nameMapping)
     {
         Set<IcebergColumnHandle> deleteFilterRequiredColumns = requiredColumnsForDeletes(tableSchema, deletes);
@@ -347,7 +348,7 @@ else if (identity.getId() == TRINO_MERGE_PARTITION_DATA) {
             return new EmptyPageSource();
         }
 
-        TrinoFileSystem fileSystem = fileSystemFactory.create(session);
+        TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity(), fileIoProperties);
         TrinoInputFile inputfile = isUseFileSizeFromMetadata(session)
                 ? fileSystem.newInputFile(Location.of(path), fileSize)
                 : fileSystem.newInputFile(Location.of(path));
@@ -397,6 +398,7 @@ else if (identity.getId() == TRINO_MERGE_PARTITION_DATA) {
         Supplier<Optional<RowPredicate>> deletePredicate = memoize(() -> {
             List<DeleteFilter> deleteFilters = readDeletes(
                     session,
+                    fileSystem,
                     tableSchema,
                     readColumns,
                     path,
@@ -457,6 +459,7 @@ else if (deleteFile.content() == EQUALITY_DELETES) {
 
     private List<DeleteFilter> readDeletes(
             ConnectorSession session,
+            TrinoFileSystem fileSystem,
             Schema schema,
             List<IcebergColumnHandle> readColumns,
             String dataFilePath,
@@ -498,7 +501,7 @@ private List<DeleteFilter> readDeletes(
                 }
             }
 
-            try (ConnectorPageSource pageSource = openDeletes(session, delete, deleteColumns, deleteDomain)) {
+            try (ConnectorPageSource pageSource = openDeletes(session, fileSystem, delete, deleteColumns, deleteDomain)) {
                 readPositionDeletes(pageSource, targetPath, deletedRows);
             }
             catch (IOException e) {
@@ -515,7 +518,7 @@ else if (delete.content() == EQUALITY_DELETES) {
                 EqualityDeleteSet equalityDeleteSet = deletesSetByFieldIds.computeIfAbsent(fieldIds, key -> new EqualityDeleteSet(deleteSchema, schemaFromHandles(readColumns)));
 
-                try (ConnectorPageSource pageSource = openDeletes(session, delete, columns, TupleDomain.all())) {
+                try (ConnectorPageSource pageSource = openDeletes(session, fileSystem, delete, columns, TupleDomain.all())) {
                     readEqualityDeletes(pageSource, columns, equalityDeleteSet::add);
                 }
                 catch (IOException e) {
@@ -540,11 +543,11 @@ else if (delete.content() == EQUALITY_DELETES) {
 
     private ConnectorPageSource openDeletes(
             ConnectorSession session,
+            TrinoFileSystem fileSystem,
             DeleteFile delete,
             List<IcebergColumnHandle> columns,
             TupleDomain<IcebergColumnHandle> tupleDomain)
     {
-        TrinoFileSystem fileSystem = fileSystemFactory.create(session);
         return createDataPageSource(
                 session,
                 fileSystem.newInputFile(Location.of(delete.path()), delete.fileSizeInBytes()),
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java
index d14178c28689..134ae3a87a42 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java
@@ -18,11 +18,13 @@
 import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import io.airlift.slice.SizeOf;
 import io.trino.plugin.iceberg.delete.DeleteFile;
 import io.trino.spi.SplitWeight;
 import io.trino.spi.connector.ConnectorSplit;
 
 import java.util.List;
+import java.util.Map;
 
 import static com.google.common.base.MoreObjects.toStringHelper;
 import static io.airlift.slice.SizeOf.estimatedSizeOf;
@@ -44,6 +46,7 @@ public class IcebergSplit
     private final String partitionDataJson;
     private final List<DeleteFile> deletes;
     private final SplitWeight splitWeight;
+    private final Map<String, String> fileIoProperties;
 
     @JsonCreator
     public IcebergSplit(
@@ -56,7 +59,8 @@ public IcebergSplit(
             @JsonProperty("partitionSpecJson") String partitionSpecJson,
             @JsonProperty("partitionDataJson") String partitionDataJson,
             @JsonProperty("deletes") List<DeleteFile> deletes,
-            @JsonProperty("splitWeight") SplitWeight splitWeight)
+            @JsonProperty("splitWeight") SplitWeight splitWeight,
+            @JsonProperty("fileIoProperties") Map<String, String> fileIoProperties)
     {
         this.path = requireNonNull(path, "path is null");
         this.start = start;
@@ -68,6 +72,7 @@ public IcebergSplit(
         this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null");
         this.deletes = ImmutableList.copyOf(requireNonNull(deletes, "deletes is null"));
         this.splitWeight = requireNonNull(splitWeight, "splitWeight is null");
+        this.fileIoProperties = ImmutableMap.copyOf(requireNonNull(fileIoProperties, "fileIoProperties is null"));
     }
 
     @JsonProperty
@@ -131,6 +136,12 @@ public SplitWeight getSplitWeight()
         return splitWeight;
     }
 
+    @JsonProperty
+    public Map<String, String> getFileIoProperties()
+    {
+        return fileIoProperties;
+    }
+
     @Override
     public Object getInfo()
     {
@@ -149,7 +160,8 @@ public long getRetainedSizeInBytes()
                 + estimatedSizeOf(partitionSpecJson)
                 + estimatedSizeOf(partitionDataJson)
                 + estimatedSizeOf(deletes, DeleteFile::getRetainedSizeInBytes)
-                + splitWeight.getRetainedSizeInBytes();
+                + splitWeight.getRetainedSizeInBytes()
+                + estimatedSizeOf(fileIoProperties, SizeOf::estimatedSizeOf, SizeOf::estimatedSizeOf);
     }
 
     @Override
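Since the split is serialized and held on the coordinator, the new map is included in retained-size accounting via airlift's map overload of SizeOf.estimatedSizeOf, which takes one size function per key and per value. A standalone sketch of the same idiom (the map contents are illustrative):

    import io.airlift.slice.SizeOf;

    import java.util.Map;

    import static io.airlift.slice.SizeOf.estimatedSizeOf;

    class RetainedSizeSketch
    {
        // Estimates the heap footprint of a String-to-String map: the map's own
        // overhead plus the estimated size of every key and value.
        static long retainedSizeOf(Map<String, String> properties)
        {
            return estimatedSizeOf(properties, SizeOf::estimatedSizeOf, SizeOf::estimatedSizeOf);
        }
    }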
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java
index 5d7404644fcb..d4d5023b1a59 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java
@@ -16,7 +16,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.inject.Inject;
 import io.airlift.units.Duration;
-import io.trino.filesystem.TrinoFileSystemFactory;
 import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitSource;
 import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionHandle;
 import io.trino.plugin.iceberg.functions.tablechanges.TableChangesSplitSource;
@@ -46,14 +45,14 @@ public class IcebergSplitManager
 
     private final IcebergTransactionManager transactionManager;
     private final TypeManager typeManager;
-    private final TrinoFileSystemFactory fileSystemFactory;
+    private final IcebergFileSystemFactory fileSystemFactory;
     private final boolean asyncIcebergSplitProducer;
 
     @Inject
     public IcebergSplitManager(
             IcebergTransactionManager transactionManager,
             TypeManager typeManager,
-            TrinoFileSystemFactory fileSystemFactory,
+            IcebergFileSystemFactory fileSystemFactory,
             @AsyncIcebergSplitProducer boolean asyncIcebergSplitProducer)
     {
         this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
@@ -91,6 +90,7 @@ public ConnectorSplitSource getSplits(
                 fileSystemFactory,
                 session,
                 table,
+                icebergTable.io().properties(),
                 tableScan,
                 table.getMaxScannedFileSize(),
                 dynamicFilter,
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
index 2356dc4b8fb3..bdc552329893 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
@@ -25,7 +25,6 @@
 import io.airlift.units.Duration;
 import io.trino.cache.NonEvictableCache;
 import io.trino.filesystem.Location;
-import io.trino.filesystem.TrinoFileSystemFactory;
 import io.trino.filesystem.TrinoInputFile;
 import io.trino.plugin.iceberg.delete.DeleteFile;
 import io.trino.plugin.iceberg.util.DataFileWithDeleteFiles;
@@ -108,9 +107,10 @@ public class IcebergSplitSource
     private static final ConnectorSplitBatch EMPTY_BATCH = new ConnectorSplitBatch(ImmutableList.of(), false);
     private static final ConnectorSplitBatch NO_MORE_SPLITS_BATCH = new ConnectorSplitBatch(ImmutableList.of(), true);
 
-    private final TrinoFileSystemFactory fileSystemFactory;
+    private final IcebergFileSystemFactory fileSystemFactory;
     private final ConnectorSession session;
     private final IcebergTableHandle tableHandle;
+    private final Map<String, String> fileIoProperties;
     private final TableScan tableScan;
     private final Optional<Long> maxScannedFileSizeInBytes;
     private final Map<Integer, Type.PrimitiveType> fieldIdToType;
@@ -138,9 +138,10 @@ public class IcebergSplitSource
     private long outputRowsLowerBound;
 
     public IcebergSplitSource(
-            TrinoFileSystemFactory fileSystemFactory,
+            IcebergFileSystemFactory fileSystemFactory,
             ConnectorSession session,
             IcebergTableHandle tableHandle,
+            Map<String, String> fileIoProperties,
             TableScan tableScan,
             Optional<DataSize> maxScannedFileSize,
             DynamicFilter dynamicFilter,
@@ -153,6 +154,7 @@ public IcebergSplitSource(
         this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
         this.session = requireNonNull(session, "session is null");
         this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");
+        this.fileIoProperties = requireNonNull(fileIoProperties, "fileIoProperties is null");
         this.tableScan = requireNonNull(tableScan, "tableScan is null");
         this.maxScannedFileSizeInBytes = maxScannedFileSize.map(DataSize::toBytes);
         this.fieldIdToType = primitiveFieldTypes(tableScan.schema());
@@ -332,7 +334,7 @@ private boolean noDataColumnsProjected(FileScanTask fileScanTask)
     private long getModificationTime(String path)
     {
         try {
-            TrinoInputFile inputFile = fileSystemFactory.create(session).newInputFile(Location.of(path));
+            TrinoInputFile inputFile = fileSystemFactory.create(session.getIdentity(), fileIoProperties).newInputFile(Location.of(path));
             return inputFile.lastModified().toEpochMilli();
         }
         catch (IOException e) {
@@ -514,7 +516,8 @@ private IcebergSplit toIcebergSplit(FileScanTask task)
                 task.deletes().stream()
                         .map(DeleteFile::fromIceberg)
                         .collect(toImmutableList()),
-                SplitWeight.fromProportion(clamp((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight, 1.0)));
+                SplitWeight.fromProportion(clamp((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight, 1.0)),
+                fileIoProperties);
     }
 
     private static Domain getPathDomain(TupleDomain<IcebergColumnHandle> effectivePredicate)
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java
@@ -41,6 +41,7 @@ public class IcebergWritableTableHandle
     private final IcebergFileFormat fileFormat;
     private final Map<String, String> storageProperties;
     private final RetryMode retryMode;
+    private final Map<String, String> fileIoProperties;
 
     @JsonCreator
     public IcebergWritableTableHandle(
@@ -53,7 +54,8 @@ public IcebergWritableTableHandle(
             @JsonProperty("outputPath") String outputPath,
             @JsonProperty("fileFormat") IcebergFileFormat fileFormat,
             @JsonProperty("properties") Map<String, String> storageProperties,
-            @JsonProperty("retryMode") RetryMode retryMode)
+            @JsonProperty("retryMode") RetryMode retryMode,
+            @JsonProperty("fileIoProperties") Map<String, String> fileIoProperties)
     {
         this.name = requireNonNull(name, "name is null");
         this.schemaAsJson = requireNonNull(schemaAsJson, "schemaAsJson is null");
@@ -66,6 +68,7 @@ public IcebergWritableTableHandle(
         this.storageProperties = ImmutableMap.copyOf(requireNonNull(storageProperties, "storageProperties is null"));
         this.retryMode = requireNonNull(retryMode, "retryMode is null");
         checkArgument(partitionsSpecsAsJson.containsKey(partitionSpecId), "partitionSpecId missing from partitionSpecs");
+        this.fileIoProperties = ImmutableMap.copyOf(requireNonNull(fileIoProperties, "fileIoProperties is null"));
     }
 
     @JsonProperty
@@ -128,6 +131,12 @@ public RetryMode getRetryMode()
         return retryMode;
     }
 
+    @JsonProperty
+    public Map<String, String> getFileIoProperties()
+    {
+        return fileIoProperties;
+    }
+
     @Override
     public String toString()
     {
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/DefaultIcebergFileSystemFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/DefaultIcebergFileSystemFactory.java
new file mode 100644
index 000000000000..e61601e71ea8
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/DefaultIcebergFileSystemFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.catalog.rest;
+
+import com.google.inject.Inject;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.plugin.iceberg.IcebergFileSystemFactory;
+import io.trino.spi.security.ConnectorIdentity;
+
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+public class DefaultIcebergFileSystemFactory
+        implements IcebergFileSystemFactory
+{
+    private final TrinoFileSystemFactory fileSystemFactory;
+
+    @Inject
+    public DefaultIcebergFileSystemFactory(TrinoFileSystemFactory fileSystemFactory)
+    {
+        this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
+    }
+
+    @Override
+    public TrinoFileSystem create(ConnectorIdentity identity, Map<String, String> fileIoProperties)
+    {
+        return fileSystemFactory.create(identity);
+    }
+}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogConfig.java
index af8e3a5fe0b4..122ffc6d8e51 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogConfig.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogConfig.java
@@ -38,6 +38,7 @@ public enum SessionType
     private Optional<String> warehouse = Optional.empty();
     private Security security = Security.NONE;
     private SessionType sessionType = SessionType.NONE;
+    private boolean vendedCredentialsEnabled;
 
     @NotNull
     public URI getBaseUri()
@@ -95,4 +96,17 @@ public IcebergRestCatalogConfig setWarehouse(String warehouse)
         this.warehouse = Optional.ofNullable(warehouse);
         return this;
     }
+
+    public boolean isVendedCredentialsEnabled()
+    {
+        return vendedCredentialsEnabled;
+    }
+
+    @Config("iceberg.rest-catalog.vended-credentials-enabled")
+    @ConfigDescription("Use credentials provided by the REST backend for file system access")
+    public IcebergRestCatalogConfig setVendedCredentialsEnabled(boolean vendedCredentialsEnabled)
+    {
+        this.vendedCredentialsEnabled = vendedCredentialsEnabled;
+        return this;
+    }
 }
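With the new configuration property in place, an Iceberg catalog backed by a credential-vending REST service would be configured roughly like this (a sketch; the URI and warehouse values are placeholders):

    connector.name=iceberg
    iceberg.catalog.type=rest
    iceberg.rest-catalog.uri=https://rest-catalog.example.com
    iceberg.rest-catalog.warehouse=s3://example-warehouse/
    iceberg.rest-catalog.vended-credentials-enabled=true

When the flag is off (the default), the connector keeps using the statically configured file system credentials.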
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogFileSystemFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogFileSystemFactory.java
new file mode 100644
index 000000000000..b5e777c7ee15
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogFileSystemFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.catalog.rest;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.plugin.iceberg.IcebergFileSystemFactory;
+import io.trino.spi.security.ConnectorIdentity;
+
+import java.util.Map;
+
+import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY;
+import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY;
+import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY;
+import static java.util.Objects.requireNonNull;
+
+public class IcebergRestCatalogFileSystemFactory
+        implements IcebergFileSystemFactory
+{
+    private static final String VENDED_S3_ACCESS_KEY = "s3.access-key-id";
+    private static final String VENDED_S3_SECRET_KEY = "s3.secret-access-key";
+    private static final String VENDED_S3_SESSION_TOKEN = "s3.session-token";
+
+    private final TrinoFileSystemFactory fileSystemFactory;
+    private final boolean vendedCredentialsEnabled;
+
+    @Inject
+    public IcebergRestCatalogFileSystemFactory(TrinoFileSystemFactory fileSystemFactory, IcebergRestCatalogConfig config)
+    {
+        this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
+        this.vendedCredentialsEnabled = config.isVendedCredentialsEnabled();
+    }
+
+    @Override
+    public TrinoFileSystem create(ConnectorIdentity identity, Map<String, String> fileIoProperties)
+    {
+        if (vendedCredentialsEnabled &&
+                fileIoProperties.containsKey(VENDED_S3_ACCESS_KEY) &&
+                fileIoProperties.containsKey(VENDED_S3_SECRET_KEY) &&
+                fileIoProperties.containsKey(VENDED_S3_SESSION_TOKEN)) {
+            // Do not include original credentials as they should not be used in vended mode
+            ConnectorIdentity identityWithExtraCredentials = ConnectorIdentity.forUser(identity.getUser())
+                    .withGroups(identity.getGroups())
+                    .withPrincipal(identity.getPrincipal())
+                    .withEnabledSystemRoles(identity.getEnabledSystemRoles())
+                    .withConnectorRole(identity.getConnectorRole())
+                    .withExtraCredentials(ImmutableMap.<String, String>builder()
+                            .put(EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY, fileIoProperties.get(VENDED_S3_ACCESS_KEY))
+                            .put(EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY, fileIoProperties.get(VENDED_S3_SECRET_KEY))
+                            .put(EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY, fileIoProperties.get(VENDED_S3_SESSION_TOKEN))
+                            .buildOrThrow())
+                    .build();
+
+            return fileSystemFactory.create(identityWithExtraCredentials);
+        }
+
+        return fileSystemFactory.create(identity);
+    }
+}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogModule.java
index 8950212bbbaf..4753b5c8cb12 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogModule.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogModule.java
@@ -16,11 +16,16 @@
 import com.google.inject.Binder;
 import com.google.inject.Scopes;
 import io.airlift.configuration.AbstractConfigurationAwareModule;
+import io.trino.plugin.iceberg.IcebergConfig;
+import io.trino.plugin.iceberg.IcebergFileSystemFactory;
 import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
 import io.trino.plugin.iceberg.catalog.rest.IcebergRestCatalogConfig.Security;
+import io.trino.spi.TrinoException;
 
+import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
 import static io.airlift.configuration.ConditionalModule.conditionalModule;
 import static io.airlift.configuration.ConfigBinder.configBinder;
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
 
 public class IcebergRestCatalogModule
         extends AbstractConfigurationAwareModule
@@ -36,5 +41,12 @@ protected void setup(Binder binder)
                 new NoneSecurityModule()));
 
         binder.bind(TrinoCatalogFactory.class).to(TrinoIcebergRestCatalogFactory.class).in(Scopes.SINGLETON);
+        newOptionalBinder(binder, IcebergFileSystemFactory.class).setBinding().to(IcebergRestCatalogFileSystemFactory.class).in(Scopes.SINGLETON);
+
+        IcebergConfig icebergConfig = buildConfigObject(IcebergConfig.class);
+        IcebergRestCatalogConfig restCatalogConfig = buildConfigObject(IcebergRestCatalogConfig.class);
+        if (restCatalogConfig.isVendedCredentialsEnabled() && icebergConfig.isRegisterTableProcedureEnabled()) {
+            throw new TrinoException(NOT_SUPPORTED, "Using the `register_table` procedure with vended credentials is currently not supported");
+        }
     }
 }
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java
index e883cc3beb4b..3f1c483dc652 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java
@@ -16,10 +16,10 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import com.google.inject.Inject;
-import io.trino.filesystem.TrinoFileSystemFactory;
 import io.trino.plugin.base.CatalogName;
 import io.trino.plugin.hive.NodeVersion;
 import io.trino.plugin.iceberg.IcebergConfig;
+import io.trino.plugin.iceberg.IcebergFileSystemFactory;
 import io.trino.plugin.iceberg.catalog.TrinoCatalog;
 import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
 import io.trino.plugin.iceberg.catalog.rest.IcebergRestCatalogConfig.SessionType;
@@ -37,12 +37,13 @@ public class TrinoIcebergRestCatalogFactory
         implements TrinoCatalogFactory
 {
-    private final TrinoFileSystemFactory fileSystemFactory;
+    private final IcebergFileSystemFactory fileSystemFactory;
     private final CatalogName catalogName;
     private final String trinoVersion;
     private final URI serverUri;
     private final Optional<String> warehouse;
     private final SessionType sessionType;
+    private final boolean vendedCredentialsEnabled;
     private final SecurityProperties securityProperties;
     private final boolean uniqueTableLocation;
 
@@ -51,7 +52,7 @@ public class TrinoIcebergRestCatalogFactory
 
     @Inject
     public TrinoIcebergRestCatalogFactory(
-            TrinoFileSystemFactory fileSystemFactory,
+            IcebergFileSystemFactory fileSystemFactory,
             CatalogName catalogName,
             IcebergRestCatalogConfig restConfig,
             SecurityProperties securityProperties,
@@ -65,6 +66,7 @@ public TrinoIcebergRestCatalogFactory(
         this.serverUri = restConfig.getBaseUri();
         this.warehouse = restConfig.getWarehouse();
         this.sessionType = restConfig.getSessionType();
+        this.vendedCredentialsEnabled = restConfig.isVendedCredentialsEnabled();
         this.securityProperties = requireNonNull(securityProperties, "securityProperties is null");
         requireNonNull(icebergConfig, "icebergConfig is null");
         this.uniqueTableLocation = icebergConfig.isUniqueTableLocation();
@@ -81,13 +83,18 @@ public synchronized TrinoCatalog create(ConnectorIdentity identity)
             warehouse.ifPresent(location -> properties.put(CatalogProperties.WAREHOUSE_LOCATION, location));
             properties.put("trino-version", trinoVersion);
             properties.putAll(securityProperties.get());
+
+            if (vendedCredentialsEnabled) {
+                properties.put("header.x-tabular-s3-access", "vended_credentials");
+            }
+
             RESTSessionCatalog icebergCatalogInstance = new RESTSessionCatalog(
                     config -> HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build(),
                     (context, config) -> {
                         ConnectorIdentity currentIdentity = (context.wrappedIdentity() != null)
                                 ? ((ConnectorIdentity) context.wrappedIdentity())
                                 : ConnectorIdentity.ofUser("fake");
-                        return new ForwardingFileIo(fileSystemFactory.create(currentIdentity));
+                        return new ForwardingFileIo(fileSystemFactory.create(currentIdentity, config), config);
                     });
 
             icebergCatalogInstance.initialize(catalogName.toString(), properties.buildOrThrow());
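The `header.` prefix is the Iceberg REST client's convention for turning catalog properties into HTTP headers on every request; here it asks a Tabular-style backend to include `s3.access-key-id`, `s3.secret-access-key`, and `s3.session-token` in each table's FileIO configuration, which the FileIO factory lambda then receives as `config`. A sketch of the same mechanism against a bare RESTSessionCatalog (the endpoint is a placeholder; the header name and value are the ones this patch uses):

    import com.google.common.collect.ImmutableMap;
    import org.apache.iceberg.rest.RESTSessionCatalog;

    class RestHeaderSketch
    {
        static void initialize(RESTSessionCatalog catalog)
        {
            // Every property prefixed with "header." is sent as an HTTP header,
            // signalling the REST backend to vend per-table S3 credentials.
            catalog.initialize("iceberg", ImmutableMap.of(
                    "uri", "https://rest-catalog.example.com",
                    "header.x-tabular-s3-access", "vended_credentials"));
        }
    }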
index cd4718ff6b92..bf1ae0e85ff1 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesSplit.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesSplit.java
@@ -19,6 +19,8 @@
 import io.trino.spi.SplitWeight;
 import io.trino.spi.connector.ConnectorSplit;

+import java.util.Map;
+
 import static com.google.common.base.MoreObjects.toStringHelper;
 import static io.airlift.slice.SizeOf.estimatedSizeOf;
 import static java.util.Objects.requireNonNull;
@@ -36,7 +38,8 @@ public record TableChangesSplit(
         IcebergFileFormat fileFormat,
         String partitionSpecJson,
         String partitionDataJson,
-        SplitWeight splitWeight) implements ConnectorSplit
+        SplitWeight splitWeight,
+        Map<String, String> fileIoProperties) implements ConnectorSplit
 {
     private static final int INSTANCE_SIZE = SizeOf.instanceSize(TableChangesSplit.class);
@@ -48,6 +51,7 @@ public record TableChangesSplit(
         requireNonNull(partitionSpecJson, "partitionSpecJson is null");
         requireNonNull(partitionDataJson, "partitionDataJson is null");
         requireNonNull(splitWeight, "splitWeight is null");
+        fileIoProperties = ImmutableMap.copyOf(requireNonNull(fileIoProperties, "fileIoProperties is null"));
     }

     @Override
@@ -73,7 +77,8 @@ public long getRetainedSizeInBytes()
                 + estimatedSizeOf(path)
                 + estimatedSizeOf(partitionSpecJson)
                 + estimatedSizeOf(partitionDataJson)
-                + splitWeight.getRetainedSizeInBytes();
+                + splitWeight.getRetainedSizeInBytes()
+                + estimatedSizeOf(fileIoProperties, SizeOf::estimatedSizeOf, SizeOf::estimatedSizeOf);
     }

     @Override
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesSplitSource.java
index edd60e8b9ba2..eff137667061 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesSplitSource.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesSplitSource.java
@@ -153,7 +153,8 @@ private TableChangesSplit toSplit(AddedRowsScanTask task)
                 IcebergFileFormat.fromIceberg(task.file().format()),
                 PartitionSpecParser.toJson(task.spec()),
                 PartitionData.toJson(task.file().partition()),
-                SplitWeight.standard());
+                SplitWeight.standard(),
+                icebergTable.io().properties());
     }

     private TableChangesSplit toSplit(DeletedDataFileScanTask task)
@@ -171,6 +172,7 @@ private TableChangesSplit toSplit(DeletedDataFileScanTask task)
                 IcebergFileFormat.fromIceberg(task.file().format()),
                 PartitionSpecParser.toJson(task.spec()),
                 PartitionData.toJson(task.file().partition()),
-                SplitWeight.standard());
+                SplitWeight.standard(),
+                icebergTable.io().properties());
     }
 }
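Carrying icebergTable.io().properties() on each split is what lets workers, which never talk to the REST catalog themselves, rebuild a file system that holds the catalog-vended credentials. A consumer would plausibly look like this (a sketch under the assumed IcebergFileSystemFactory shape above, not code from this patch):

// Hypothetical worker-side helper: the split, not the session, carries the FileIO
// properties needed to reach the data files it points at
private TrinoFileSystem fileSystemFor(ConnectorSession session, TableChangesSplit split)
{
    return fileSystemFactory.create(session.getIdentity(), split.fileIoProperties());
}

diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableExecuteHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableExecuteHandle.java
index 655bf7b02324..3657b5dd3a18 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableExecuteHandle.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableExecuteHandle.java
@@ -18,6 +18,8 @@
 import io.trino.spi.connector.ConnectorTableExecuteHandle;
 import io.trino.spi.connector.SchemaTableName;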

+import java.util.Map;
+
 import static java.util.Objects.requireNonNull;

 public class IcebergTableExecuteHandle
@@ -27,18 +29,21 @@ public class IcebergTableExecuteHandle
     private final IcebergTableProcedureId procedureId;
     private final IcebergProcedureHandle procedureHandle;
     private final String tableLocation;
+    private final Map<String, String> fileIoProperties;

     @JsonCreator
     public IcebergTableExecuteHandle(
             SchemaTableName schemaTableName,
             IcebergTableProcedureId procedureId,
             IcebergProcedureHandle procedureHandle,
-            String tableLocation)
+            String tableLocation,
+            Map<String, String> fileIoProperties)
     {
         this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
         this.procedureId = requireNonNull(procedureId, "procedureId is null");
         this.procedureHandle = requireNonNull(procedureHandle, "procedureHandle is null");
         this.tableLocation = requireNonNull(tableLocation, "tableLocation is null");
+        this.fileIoProperties = requireNonNull(fileIoProperties, "fileIoProperties is null");
     }

     @JsonProperty
@@ -65,13 +70,20 @@ public String getTableLocation()
         return tableLocation;
     }

+    @JsonProperty
+    public Map<String, String> getFileIoProperties()
+    {
+        return fileIoProperties;
+    }
+
     public IcebergTableExecuteHandle withProcedureHandle(IcebergProcedureHandle procedureHandle)
     {
         return new IcebergTableExecuteHandle(
                 schemaTableName,
                 procedureId,
                 procedureHandle,
-                tableLocation);
+                tableLocation,
+                fileIoProperties);
     }

     @Override
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java
index 27e911fb3565..530a6271c490 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java
@@ -34,6 +34,7 @@
 import io.trino.plugin.hive.orc.OrcWriterConfig;
 import io.trino.plugin.hive.parquet.ParquetReaderConfig;
 import io.trino.plugin.hive.parquet.ParquetWriterConfig;
+import io.trino.plugin.iceberg.catalog.rest.DefaultIcebergFileSystemFactory;
 import io.trino.spi.Page;
 import io.trino.spi.SplitWeight;
 import io.trino.spi.block.BlockBuilder;
@@ -145,7 +146,8 @@ public void testDynamicSplitPruningOnUnpartitionedTable()
                 PartitionSpecParser.toJson(PartitionSpec.unpartitioned()),
                 PartitionData.toJson(new PartitionData(new Object[] {})),
                 ImmutableList.of(),
-                SplitWeight.standard());
+                SplitWeight.standard(),
+                ImmutableMap.of());

         String tablePath = inputFile.location().fileName();
         TableHandle tableHandle = new TableHandle(
@@ -260,7 +262,8 @@ public void testDynamicSplitPruningWithExplicitPartitionFilter()
                 PartitionSpecParser.toJson(partitionSpec),
                 PartitionData.toJson(new PartitionData(new Object[] {dateColumnValue})),
                 ImmutableList.of(),
-                SplitWeight.standard());
+                SplitWeight.standard(),
+                ImmutableMap.of());

         String tablePath = inputFile.location().fileName();
         TableHandle tableHandle = new TableHandle(
@@ -367,7 +370,6 @@ public void testDynamicSplitPruningWithExplicitPartitionFilterPartitionEvolution
         PartitionSpec partitionSpec = PartitionSpec.builderFor(tableSchema)
                 .identity(yearColumnName)
                 .build();
-        IcebergConfig icebergConfig = new IcebergConfig();
         HiveTransactionHandle transaction = new HiveTransactionHandle(false);

         try (TempFile file = new TempFile()) {
@@ -410,7 +412,8 @@ public void testDynamicSplitPruningWithExplicitPartitionFilterPartitionEvolution
PartitionSpecParser.toJson(partitionSpec), PartitionData.toJson(new PartitionData(new Object[] {yearColumnValue})), ImmutableList.of(), - SplitWeight.standard()); + SplitWeight.standard(), + ImmutableMap.of()); String tablePath = inputFile.location().fileName(); // Simulate the situation where `month` column is added at a later phase as partitioning column @@ -510,7 +513,7 @@ private static ConnectorPageSource createTestingPageSource( { FileFormatDataSourceStats stats = new FileFormatDataSourceStats(); IcebergPageSourceProvider provider = new IcebergPageSourceProvider( - new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS), + new DefaultIcebergFileSystemFactory(new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS)), stats, ORC_READER_CONFIG, PARQUET_READER_CONFIG, diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java index 0f055ebed57d..a739e85094a2 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java @@ -237,6 +237,25 @@ public void testRestCatalog() .shutdown(); } + @Test + public void testRestCatalogValidations() + { + ConnectorFactory factory = getConnectorFactory(); + + assertThatThrownBy(() -> factory.create( + "test", + Map.of( + "iceberg.catalog.type", "rest", + "iceberg.register-table-procedure.enabled", "true", + "iceberg.rest-catalog.uri", "https://foo:1234", + "iceberg.rest-catalog.vended-credentials-enabled", "true", + "bootstrap.quiet", "true"), + new TestingConnectorContext()) + .shutdown()) + .isInstanceOf(ApplicationConfigurationException.class) + .hasMessageContaining("Using the `register_table` procedure with vended credentials is currently not supported"); + } + @Test public void testJdbcCatalog() { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java index f98ffc0a7728..35bb09fe73e7 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java @@ -30,6 +30,7 @@ import io.trino.plugin.iceberg.catalog.TrinoCatalog; import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider; import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog; +import io.trino.plugin.iceberg.catalog.rest.DefaultIcebergFileSystemFactory; import io.trino.spi.connector.CatalogHandle; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ConnectorSession; @@ -166,9 +167,10 @@ public void testIncompleteDynamicFilterTimeout() Optional.of(false)); try (IcebergSplitSource splitSource = new IcebergSplitSource( - fileSystemFactory, + new DefaultIcebergFileSystemFactory(fileSystemFactory), SESSION, tableHandle, + ImmutableMap.of(), nationTable.newScan(), Optional.empty(), new DynamicFilter() diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java index 8ed61c8af764..1094b3e0c2e0 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java @@ 
-111,7 +111,7 @@ public void testNonLowercaseNamespace() CatalogHandle.fromId("iceberg:NORMAL:v12345"), jsonCodec(CommitTaskData.class), catalog, - connectorIdentity -> { + (connectorIdentity, fileIoProperties) -> { throw new UnsupportedOperationException(); }, new TableStatisticsWriter(new NodeVersion("test-version"))); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java index e22ca627521e..7ed0bedca63e 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java @@ -116,7 +116,7 @@ public void testNonLowercaseGlueDatabase() CatalogHandle.fromId("iceberg:NORMAL:v12345"), jsonCodec(CommitTaskData.class), catalog, - connectorIdentity -> { + (connectorIdentity, fileIoProperties) -> { throw new UnsupportedOperationException(); }, new TableStatisticsWriter(new NodeVersion("test-version"))); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java index 8424be77c89d..b9a64fe3eba8 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java @@ -182,7 +182,7 @@ public void testNonLowercaseNamespace() CatalogHandle.fromId("iceberg:NORMAL:v12345"), jsonCodec(CommitTaskData.class), catalog, - connectorIdentity -> { + (connectorIdentity, fileIoProperties) -> { throw new UnsupportedOperationException(); }, new TableStatisticsWriter(new NodeVersion("test-version"))); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConfig.java index d544286fcd2a..8d7f53158ff7 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConfig.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConfig.java @@ -31,7 +31,8 @@ public void testDefaults() .setBaseUri(null) .setWarehouse(null) .setSessionType(IcebergRestCatalogConfig.SessionType.NONE) - .setSecurity(IcebergRestCatalogConfig.Security.NONE)); + .setSecurity(IcebergRestCatalogConfig.Security.NONE) + .setVendedCredentialsEnabled(false)); } @Test @@ -42,13 +43,15 @@ public void testExplicitPropertyMappings() .put("iceberg.rest-catalog.warehouse", "test_warehouse_identifier") .put("iceberg.rest-catalog.security", "OAUTH2") .put("iceberg.rest-catalog.session", "USER") + .put("iceberg.rest-catalog.vended-credentials-enabled", "true") .buildOrThrow(); IcebergRestCatalogConfig expected = new IcebergRestCatalogConfig() .setBaseUri("http://localhost:1234") .setWarehouse("test_warehouse_identifier") .setSessionType(IcebergRestCatalogConfig.SessionType.USER) - .setSecurity(IcebergRestCatalogConfig.Security.OAUTH2); + .setSecurity(IcebergRestCatalogConfig.Security.OAUTH2) + .setVendedCredentialsEnabled(true); assertFullMapping(properties, expected); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergTrinoRestCatalogConnectorSmokeTest.java 
b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergTrinoRestCatalogConnectorSmokeTest.java index 6a1f7559c199..f3ef88a38046 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergTrinoRestCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergTrinoRestCatalogConnectorSmokeTest.java @@ -43,7 +43,6 @@ import static io.trino.plugin.iceberg.IcebergTestUtils.checkOrcFileSorting; import static io.trino.plugin.iceberg.IcebergTestUtils.checkParquetFileSorting; import static io.trino.plugin.iceberg.catalog.rest.RestCatalogTestUtils.backendCatalog; -import static java.lang.String.format; import static org.apache.iceberg.FileFormat.PARQUET; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; @@ -153,7 +152,7 @@ protected String getMetadataLocation(String tableName) @Override protected String schemaPath() { - return format("%s/%s", warehouseLocation, getSession().getSchema()); + return String.format("%s/%s", warehouseLocation, getSession().getSchema()); } @Override diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java new file mode 100644 index 000000000000..c72f14bb9df5 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java @@ -0,0 +1,355 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.catalog.rest; + +import com.google.common.collect.ImmutableMap; +import io.opentelemetry.api.OpenTelemetry; +import io.trino.filesystem.Location; +import io.trino.filesystem.s3.S3FileSystemConfig; +import io.trino.filesystem.s3.S3FileSystemFactory; +import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest; +import io.trino.plugin.iceberg.IcebergConfig; +import io.trino.plugin.iceberg.IcebergQueryRunner; +import io.trino.testing.QueryRunner; +import io.trino.testing.TestingConnectorBehavior; +import io.trino.testing.containers.IcebergRestCatalogBackendContainer; +import io.trino.testing.containers.Minio; +import io.trino.testing.minio.MinioClient; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.catalog.SessionCatalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.RESTSessionCatalog; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Network; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; +import software.amazon.awssdk.services.sts.model.AssumeRoleResponse; + +import java.io.IOException; +import java.net.URI; +import java.nio.file.Path; +import java.util.Optional; + +import static io.trino.plugin.iceberg.IcebergTestUtils.checkOrcFileSorting; +import static io.trino.plugin.iceberg.IcebergTestUtils.checkParquetFileSorting; +import static io.trino.testing.TestingConnectorSession.SESSION; +import static io.trino.testing.TestingNames.randomNameSuffix; +import static io.trino.testing.containers.Minio.MINIO_ACCESS_KEY; +import static io.trino.testing.containers.Minio.MINIO_REGION; +import static io.trino.testing.containers.Minio.MINIO_SECRET_KEY; +import static java.lang.String.format; +import static org.apache.iceberg.FileFormat.PARQUET; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class TestIcebergVendingRestCatalogConnectorSmokeTest + extends BaseIcebergConnectorSmokeTest +{ + private final String bucketName; + private String warehouseLocation; + private IcebergRestCatalogBackendContainer restCatalogBackendContainer; + private Minio minio; + + public TestIcebergVendingRestCatalogConnectorSmokeTest() + { + super(new IcebergConfig().getFileFormat().toIceberg()); + this.bucketName = "test-iceberg-vending-rest-connector-smoke-test-" + randomNameSuffix(); + } + + @Override + protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) + { + return switch (connectorBehavior) { + case SUPPORTS_COMMENT_ON_VIEW, + SUPPORTS_COMMENT_ON_VIEW_COLUMN, + SUPPORTS_CREATE_MATERIALIZED_VIEW, + SUPPORTS_CREATE_VIEW, + SUPPORTS_RENAME_MATERIALIZED_VIEW, + SUPPORTS_RENAME_SCHEMA -> false; + default -> super.hasBehavior(connectorBehavior); + }; + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + Network network = Network.newNetwork(); + minio = closeAfterClass(Minio.builder().withNetwork(network).build()); + minio.start(); + minio.createBucket(bucketName); + + this.warehouseLocation = "s3://%s/default/".formatted(bucketName); + + AwsCredentials 
credentials = AwsBasicCredentials.create(MINIO_ACCESS_KEY, MINIO_SECRET_KEY);
+        StsClient stsClient = StsClient.builder()
+                .endpointOverride(URI.create(minio.getMinioAddress()))
+                .credentialsProvider(StaticCredentialsProvider.create(credentials))
+                .region(Region.US_EAST_1)
+                .build();
+
+        AssumeRoleResponse assumeRoleResponse = stsClient.assumeRole(AssumeRoleRequest.builder().build());
+        restCatalogBackendContainer = closeAfterClass(new IcebergRestCatalogBackendContainer(
+                Optional.of(network),
+                warehouseLocation,
+                assumeRoleResponse.credentials().accessKeyId(),
+                assumeRoleResponse.credentials().secretAccessKey(),
+                assumeRoleResponse.credentials().sessionToken()));
+        restCatalogBackendContainer.start();
+
+        return IcebergQueryRunner.builder()
+                .setIcebergProperties(
+                        ImmutableMap.<String, String>builder()
+                                .put("iceberg.file-format", format.name())
+                                .put("iceberg.catalog.type", "rest")
+                                .put("iceberg.rest-catalog.uri", "http://" + restCatalogBackendContainer.getRestCatalogEndpoint())
+                                .put("iceberg.rest-catalog.vended-credentials-enabled", "true")
+                                .put("iceberg.writer-sort-buffer-size", "1MB")
+                                .put("fs.hadoop.enabled", "false")
+                                .put("fs.native-s3.enabled", "true")
+                                .put("s3.region", MINIO_REGION)
+                                .put("s3.endpoint", minio.getMinioAddress())
+                                .put("s3.path-style-access", "true")
+                                .buildOrThrow())
+                .setInitialTables(REQUIRED_TPCH_TABLES)
+                .build();
+    }
+
+    @Override
+    @BeforeAll
+    public void initFileSystem()
+    {
+        this.fileSystem = new S3FileSystemFactory(OpenTelemetry.noop(), new S3FileSystemConfig()
+                .setRegion(MINIO_REGION)
+                .setEndpoint(minio.getMinioAddress())
+                .setPathStyleAccess(true)
+                .setAwsAccessKey(MINIO_ACCESS_KEY)
+                .setAwsSecretKey(MINIO_SECRET_KEY)
+        ).create(SESSION);
+    }
+
+    @Test
+    @Override
+    public void testView()
+    {
+        assertThatThrownBy(super::testView)
+                .hasMessageContaining("createView is not supported for Iceberg REST catalog");
+    }
+
+    @Test
+    @Override
+    public void testMaterializedView()
+    {
+        assertThatThrownBy(super::testMaterializedView)
+                .hasMessageContaining("createMaterializedView is not supported for Iceberg REST catalog");
+    }
+
+    @Test
+    @Override
+    public void testRenameSchema()
+    {
+        assertThatThrownBy(super::testRenameSchema)
+                .hasMessageContaining("renameNamespace is not supported for Iceberg REST catalog");
+    }
+
+    @Override
+    protected void dropTableFromMetastore(String tableName)
+    {
+        // TODO: Get register table tests working
+    }
+
+    @Override
+    protected String getMetadataLocation(String tableName)
+    {
+        try (RESTSessionCatalog catalog = new RESTSessionCatalog()) {
+            catalog.initialize("rest-catalog", ImmutableMap.of(CatalogProperties.URI, "http://" + restCatalogBackendContainer.getRestCatalogEndpoint()));
+            SessionCatalog.SessionContext context = new SessionCatalog.SessionContext(
+                    "user-default",
+                    "user",
+                    ImmutableMap.of(),
+                    ImmutableMap.of(),
+                    SESSION.getIdentity());
+            return ((BaseTable) catalog.loadTable(context, toIdentifier(tableName))).operations().current().metadataFileLocation();
+        }
+        catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    protected String schemaPath()
+    {
+        return format("%s%s", warehouseLocation, getSession().getSchema().orElseThrow());
+    }
+
+    @Override
+    protected boolean locationExists(String location)
+    {
+        return java.nio.file.Files.exists(Path.of(location));
+    }
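// A hypothetical spot-check that could accompany this class (not part of this patch):
// the Iceberg properties configured in createQueryRunner() set no s3.aws-access-key or
// s3.aws-secret-key, so a successful read implies the REST catalog vended usable
// credentials for the table location.
@Test
public void testReadUsesVendedCredentials()
{
    assertThat(computeScalar("SELECT count(*) FROM nation")).isEqualTo(25L);
}
+
+    @Test
+    @Override
+    public void testRegisterTableWithTableLocation()
+    {
+        assertThatThrownBy(super::testRegisterTableWithTableLocation)
+                .hasMessageContaining("register_table procedure is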
disabled"); + } + + @Test + @Override + public void testRegisterTableWithComments() + { + assertThatThrownBy(super::testRegisterTableWithComments) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Test + @Override + public void testRegisterTableWithShowCreateTable() + { + assertThatThrownBy(super::testRegisterTableWithShowCreateTable) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Test + @Override + public void testRegisterTableWithReInsert() + { + assertThatThrownBy(super::testRegisterTableWithReInsert) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Test + @Override + public void testRegisterTableWithDroppedTable() + { + assertThatThrownBy(super::testRegisterTableWithDroppedTable) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Test + @Override + public void testRegisterTableWithDifferentTableName() + { + assertThatThrownBy(super::testRegisterTableWithDifferentTableName) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Test + @Override + public void testRegisterTableWithMetadataFile() + { + assertThatThrownBy(super::testRegisterTableWithMetadataFile) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Test + @Override + public void testRegisterTableWithTrailingSpaceInLocation() + { + assertThatThrownBy(super::testRegisterTableWithTrailingSpaceInLocation) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Test + @Override + public void testUnregisterTable() + { + assertThatThrownBy(super::testUnregisterTable) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Test + @Override + public void testRepeatUnregisterTable() + { + assertThatThrownBy(super::testRepeatUnregisterTable) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Test + @Override + public void testDropTableWithMissingMetadataFile() + { + assertThatThrownBy(super::testDropTableWithMissingMetadataFile) + .hasMessageMatching("Failed to load table: (.*)"); + } + + @Test + @Override + public void testDropTableWithMissingSnapshotFile() + { + assertThatThrownBy(super::testDropTableWithMissingSnapshotFile) + .hasMessageMatching("Server error: NoSuchKeyException:.*"); + } + + @Test + @Override + public void testDropTableWithMissingManifestListFile() + { + assertThatThrownBy(super::testDropTableWithMissingManifestListFile) + .hasMessageContaining("Table location should not exist"); + } + + @Test + @Override + public void testDropTableWithMissingDataFile() + { + assertThatThrownBy(super::testDropTableWithMissingDataFile) + .hasMessageContaining("Table location should not exist"); + } + + @Test + @Override + public void testDropTableWithNonExistentTableLocation() + { + assertThatThrownBy(super::testDropTableWithNonExistentTableLocation) + .hasMessageMatching("Failed to load table: (.*)"); + } + + @Override + protected boolean isFileSorted(Location path, String sortColumnName) + { + if (format == PARQUET) { + return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName); + } + return checkOrcFileSorting(fileSystem, path, sortColumnName); + } + + @Override + protected void deleteDirectory(String location) + { + try (MinioClient minioClient = minio.createMinioClient()) { + String prefix = "s3://" + bucketName + "/"; + String key = location.substring(prefix.length()); + + for (String file : minioClient.listObjects(bucketName, key)) { + minioClient.removeObject(bucketName, file); + } + 
assertThat(minioClient.listObjects(bucketName, key)).isEmpty();
+        }
+    }
+
+    private TableIdentifier toIdentifier(String tableName)
+    {
+        return TableIdentifier.of(getSession().getSchema().orElseThrow(), tableName);
+    }
+}
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java
index e1aff1ff371f..58aa0842b96a 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java
@@ -96,7 +96,7 @@ public void testNonLowercaseNamespace()
                 CatalogHandle.fromId("iceberg:NORMAL:v12345"),
                 jsonCodec(CommitTaskData.class),
                 catalog,
-                connectorIdentity -> {
+                (connectorIdentity, fileIoProperties) -> {
                     throw new UnsupportedOperationException();
                 },
                 new TableStatisticsWriter(new NodeVersion("test-version")));
diff --git a/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/IcebergRestCatalogBackendContainer.java b/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/IcebergRestCatalogBackendContainer.java
new file mode 100644
index 000000000000..a66e676a33df
--- /dev/null
+++ b/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/IcebergRestCatalogBackendContainer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.testing.containers;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.testcontainers.containers.Network;
+
+import java.util.Optional;
+
+import static io.trino.testing.containers.Minio.MINIO_REGION;
+
+public class IcebergRestCatalogBackendContainer
+        extends BaseTestContainer
+{
+    public IcebergRestCatalogBackendContainer(
+            Optional<Network> network,
+            String warehouseLocation,
+            String minioAccessKey,
+            String minioSecretKey,
+            String minioSessionToken)
+    {
+        super(
+                "tabulario/iceberg-rest:0.12.0",
+                "iceberg-rest",
+                ImmutableSet.of(8181),
+                ImmutableMap.of(),
+                ImmutableMap.of(
+                        "CATALOG_INCLUDE__CREDENTIALS", "true",
+                        "CATALOG_WAREHOUSE", warehouseLocation,
+                        "CATALOG_IO__IMPL", "org.apache.iceberg.aws.s3.S3FileIO",
+                        "AWS_REGION", MINIO_REGION,
+                        "CATALOG_S3_ACCESS__KEY__ID", minioAccessKey,
+                        "CATALOG_S3_SECRET__ACCESS__KEY", minioSecretKey,
+                        "CATALOG_S3_SESSION__TOKEN", minioSessionToken,
+                        "CATALOG_S3_ENDPOINT", "http://minio:4566",
+                        "CATALOG_S3_PATH__STYLE__ACCESS", "true"),
+                network,
+                5);
+    }
+
+    public String getRestCatalogEndpoint()
+    {
+        return getMappedHostAndPortForExposedPort(8181).toString();
+    }
+}
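A note on the environment block above: the tabulario/iceberg-rest image derives catalog properties from CATALOG_-prefixed variables. By the image's documented convention (assumed here; worth verifying against the 0.12.0 tag pinned above), names are lowercased, a single underscore becomes a dot, and a double underscore becomes a dash. For example:

// CATALOG_WAREHOUSE              -> warehouse
// CATALOG_IO__IMPL               -> io-impl
// CATALOG_INCLUDE__CREDENTIALS   -> include-credentials  (enables credential vending)
// CATALOG_S3_ACCESS__KEY__ID     -> s3.access-key-id
// CATALOG_S3_PATH__STYLE__ACCESS -> s3.path-style-access

CATALOG_S3_ENDPOINT points at http://minio:4566 because Trino's Minio test container exposes the MinIO API on container port 4566 inside the shared Docker network.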