From bb9d6483aebe7182ff201356d522be089ad9a146 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Thu, 19 Dec 2024 08:17:10 +0900 Subject: [PATCH] Include deletion vector when filtering active add entries in Delta --- .../transactionlog/DeletionVectorEntry.java | 10 +++++++++ .../transactionlog/TransactionLogAccess.java | 21 ++++++++++++------- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeletionVectorEntry.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeletionVectorEntry.java index cf23159250f1..b1c0c2805787 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeletionVectorEntry.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeletionVectorEntry.java @@ -34,6 +34,16 @@ public record DeletionVectorEntry(String storageType, String pathOrInlineDv, Opt requireNonNull(offset, "offset is null"); } + // https://github.com/delta-io/delta/blob/34f02d8/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/DeletionVectorDescriptor.java#L167-L174 + public String uniqueId() + { + String uniqueFileId = storageType + pathOrInlineDv; + if (offset.isPresent()) { + return uniqueFileId + "@" + offset; + } + return uniqueFileId; + } + public long getRetainedSizeInBytes() { return INSTANCE_SIZE diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java index 27ecdbe7cb58..a2b76fa3e4dc 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java @@ -429,21 +429,23 @@ public static ImmutableList columnsWithStats(List activeAddEntries(Stream checkpointEntries, List transactions) { - Map activeJsonEntries = new LinkedHashMap<>(); - HashSet removedFiles = new HashSet<>(); + Map activeJsonEntries = new LinkedHashMap<>(); + HashSet removedFiles = new HashSet<>(); // The json entries containing the last few entries in the log need to be applied on top of the parquet snapshot: // - Any files which have been removed need to be excluded // - Any files with newer add actions need to be updated with the most recent metadata transactions.forEach(transaction -> { - Map addFilesInTransaction = new LinkedHashMap<>(); - Set removedFilesInTransaction = new HashSet<>(); + Map addFilesInTransaction = new LinkedHashMap<>(); + Set removedFilesInTransaction = new HashSet<>(); transaction.transactionEntries().forEach(deltaLakeTransactionLogEntry -> { if (deltaLakeTransactionLogEntry.getAdd() != null) { - addFilesInTransaction.put(deltaLakeTransactionLogEntry.getAdd().getPath(), deltaLakeTransactionLogEntry.getAdd()); + AddFileEntry add = deltaLakeTransactionLogEntry.getAdd(); + addFilesInTransaction.put(new FileEntryKey(add.getPath(), add.getDeletionVector().map(DeletionVectorEntry::uniqueId)), add); } else if (deltaLakeTransactionLogEntry.getRemove() != null) { - removedFilesInTransaction.add(deltaLakeTransactionLogEntry.getRemove().path()); + RemoveFileEntry remove = deltaLakeTransactionLogEntry.getRemove(); + removedFilesInTransaction.add(new FileEntryKey(remove.path(), remove.deletionVector().map(DeletionVectorEntry::uniqueId))); } }); @@ -456,11 +458,16 @@ else if (deltaLakeTransactionLogEntry.getRemove() != null) { Stream filteredCheckpointEntries = checkpointEntries .map(DeltaLakeTransactionLogEntry::getAdd) .filter(Objects::nonNull) - .filter(addEntry -> !removedFiles.contains(addEntry.getPath()) && !activeJsonEntries.containsKey(addEntry.getPath())); + .filter(addEntry -> { + FileEntryKey key = new FileEntryKey(addEntry.getPath(), addEntry.getDeletionVector().map(DeletionVectorEntry::uniqueId)); + return !removedFiles.contains(key) && !activeJsonEntries.containsKey(key); + }); return Stream.concat(filteredCheckpointEntries, activeJsonEntries.values().stream()); } + private record FileEntryKey(String path, Optional deletionVectorId) {} + public Stream getRemoveEntries(ConnectorSession session, TableSnapshot tableSnapshot) { return getEntries(