Skip to content

Commit

Permalink
Support vended s3 credentials in the Iceberg REST catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
alexjo2144 committed Feb 2, 2024
1 parent 763d00c commit 4c44ca6
Show file tree
Hide file tree
Showing 38 changed files with 859 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@
package io.trino.filesystem.s3;

import io.trino.filesystem.s3.S3FileSystemConfig.S3SseType;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.services.s3.model.RequestPayer;

import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

record S3Context(int partSize, boolean requesterPays, S3SseType sseType, String sseKmsKeyId)
record S3Context(int partSize, boolean requesterPays, S3SseType sseType, String sseKmsKeyId, Optional<AwsCredentialsProvider> credentialsProviderOverride)
{
private static final int MIN_PART_SIZE = 5 * 1024 * 1024; // S3 requirement

Expand All @@ -28,10 +32,26 @@ record S3Context(int partSize, boolean requesterPays, S3SseType sseType, String
checkArgument(partSize >= MIN_PART_SIZE, "partSize must be at least %s bytes", MIN_PART_SIZE);
requireNonNull(sseType, "sseType is null");
checkArgument((sseType != S3SseType.KMS) || (sseKmsKeyId != null), "sseKmsKeyId is null for SSE-KMS");
requireNonNull(credentialsProviderOverride, "credentialsProviderOverride is null");
}

public RequestPayer requestPayer()
{
return requesterPays ? RequestPayer.REQUESTER : null;
}

public S3Context withCredentialsProviderOverride(AwsCredentialsProvider credentialsProviderOverride)
{
return new S3Context(
partSize,
requesterPays,
sseType,
sseKmsKeyId,
Optional.of(credentialsProviderOverride));
}

public void applyCredentialProviderOverride(AwsRequestOverrideConfiguration.Builder builder)
{
credentialsProviderOverride.ifPresent(builder::credentialsProvider);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public void deleteFile(Location location)
location.verifyValidFileLocation();
S3Location s3Location = new S3Location(location);
DeleteObjectRequest request = DeleteObjectRequest.builder()
.overrideConfiguration(context::applyCredentialProviderOverride)
.requestPayer(requestPayer)
.key(s3Location.key())
.bucket(s3Location.bucket())
Expand Down Expand Up @@ -136,6 +137,7 @@ public void deleteFiles(Collection<Location> locations)
.toList();

DeleteObjectsRequest request = DeleteObjectsRequest.builder()
.overrideConfiguration(context::applyCredentialProviderOverride)
.requestPayer(requestPayer)
.bucket(bucket)
.delete(builder -> builder.objects(objects).quiet(true))
Expand Down Expand Up @@ -177,6 +179,7 @@ public FileIterator listFiles(Location location)
}

ListObjectsV2Request request = ListObjectsV2Request.builder()
.overrideConfiguration(context::applyCredentialProviderOverride)
.bucket(s3Location.bucket())
.prefix(key)
.build();
Expand Down Expand Up @@ -230,6 +233,7 @@ public Set<Location> listDirectories(Location location)
}

ListObjectsV2Request request = ListObjectsV2Request.builder()
.overrideConfiguration(context::applyCredentialProviderOverride)
.bucket(s3Location.bucket())
.prefix(key)
.delimiter("/")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.trino.spi.security.ConnectorIdentity;
import jakarta.annotation.PreDestroy;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
Expand All @@ -35,6 +37,9 @@
import java.net.URI;
import java.util.Optional;

import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY;
import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY;
import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY;
import static java.lang.Math.toIntExact;

public final class S3FileSystemFactory
Expand Down Expand Up @@ -100,7 +105,8 @@ public S3FileSystemFactory(OpenTelemetry openTelemetry, S3FileSystemConfig confi
toIntExact(config.getStreamingPartSize().toBytes()),
config.isRequesterPays(),
config.getSseType(),
config.getSseKmsKeyId());
config.getSseKmsKeyId(),
Optional.empty());
}

@PreDestroy
Expand All @@ -112,6 +118,14 @@ public void destroy()
@Override
public TrinoFileSystem create(ConnectorIdentity identity)
{
if (identity.getExtraCredentials().containsKey(EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY)) {
AwsCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(AwsSessionCredentials.create(
identity.getExtraCredentials().get(EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY),
identity.getExtraCredentials().get(EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY),
identity.getExtraCredentials().get(EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY)));
return new S3FileSystem(client, context.withCredentialsProviderOverride(credentialsProvider));
}

return new S3FileSystem(client, context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ final class S3InputFile
{
private final S3Client client;
private final S3Location location;
private final S3Context context;
private final RequestPayer requestPayer;
private Long length;
private Instant lastModified;
Expand All @@ -44,6 +45,7 @@ public S3InputFile(S3Client client, S3Context context, S3Location location, Long
{
this.client = requireNonNull(client, "client is null");
this.location = requireNonNull(location, "location is null");
this.context = requireNonNull(context, "context is null");
this.requestPayer = context.requestPayer();
this.length = length;
location.location().verifyValidFileLocation();
Expand Down Expand Up @@ -97,6 +99,7 @@ public Location location()
private GetObjectRequest newGetObjectRequest()
{
return GetObjectRequest.builder()
.overrideConfiguration(context::applyCredentialProviderOverride)
.requestPayer(requestPayer)
.bucket(location.bucket())
.key(location.key())
Expand All @@ -107,6 +110,7 @@ private boolean headObject()
throws IOException
{
HeadObjectRequest request = HeadObjectRequest.builder()
.overrideConfiguration(context::applyCredentialProviderOverride)
.requestPayer(requestPayer)
.bucket(location.bucket())
.key(location.key())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ final class S3OutputStream
private final LocalMemoryContext memoryContext;
private final S3Client client;
private final S3Location location;
private final S3Context context;
private final int partSize;
private final RequestPayer requestPayer;
private final S3SseType sseType;
Expand All @@ -80,6 +81,7 @@ public S3OutputStream(AggregatedMemoryContext memoryContext, S3Client client, S3
this.memoryContext = memoryContext.newLocalMemoryContext(S3OutputStream.class.getSimpleName());
this.client = requireNonNull(client, "client is null");
this.location = requireNonNull(location, "location is null");
this.context = requireNonNull(context, "context is null");
this.partSize = context.partSize();
this.requestPayer = context.requestPayer();
this.sseType = context.sseType();
Expand Down Expand Up @@ -192,6 +194,7 @@ private void flushBuffer(boolean finished)
// skip multipart upload if there would only be one part
if (finished && !multipartUploadStarted) {
PutObjectRequest request = PutObjectRequest.builder()
.overrideConfiguration(context::applyCredentialProviderOverride)
.requestPayer(requestPayer)
.bucket(location.bucket())
.key(location.key())
Expand Down Expand Up @@ -268,6 +271,7 @@ private CompletedPart uploadPage(byte[] data, int length)
{
if (uploadId.isEmpty()) {
CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder()
.overrideConfiguration(context::applyCredentialProviderOverride)
.requestPayer(requestPayer)
.bucket(location.bucket())
.key(location.key())
Expand All @@ -285,6 +289,7 @@ private CompletedPart uploadPage(byte[] data, int length)

currentPartNumber++;
UploadPartRequest request = UploadPartRequest.builder()
.overrideConfiguration(context::applyCredentialProviderOverride)
.requestPayer(requestPayer)
.bucket(location.bucket())
.key(location.key())
Expand All @@ -309,6 +314,7 @@ private CompletedPart uploadPage(byte[] data, int length)
private void finishUpload(String uploadId)
{
CompleteMultipartUploadRequest request = CompleteMultipartUploadRequest.builder()
.overrideConfiguration(context::applyCredentialProviderOverride)
.requestPayer(requestPayer)
.bucket(location.bucket())
.key(location.key())
Expand All @@ -322,6 +328,7 @@ private void finishUpload(String uploadId)
private void abortUpload()
{
uploadId.map(id -> AbortMultipartUploadRequest.builder()
.overrideConfiguration(context::applyCredentialProviderOverride)
.requestPayer(requestPayer)
.bucket(location.bucket())
.key(location.key())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.filesystem.s3;

public final class S3FileSystemConstants
{
public static final String EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY = "internal$s3_aws_access_key";
public static final String EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY = "internal$s3_aws_secret_key";
public static final String EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY = "internal$s3_aws_session_token";

private S3FileSystemConstants() {}
}
36 changes: 36 additions & 0 deletions plugin/trino-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,12 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-filesystem-s3</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
Expand Down Expand Up @@ -374,6 +380,36 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>aws-core</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>regions</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sdk-core</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>http-server</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import io.trino.filesystem.TrinoFileSystem;
import io.trino.spi.security.ConnectorIdentity;

import java.util.Map;

public interface IcebergFileSystemFactory
{
TrinoFileSystem create(ConnectorIdentity identity, Map<String, String> fileIoProperties);
}
Loading

0 comments on commit 4c44ca6

Please sign in to comment.