Skip to content

Commit

Permalink
Include deletion vector when filtering active add entries in Delta
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Dec 19, 2024
1 parent d7aba15 commit bb9d648
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ public record DeletionVectorEntry(String storageType, String pathOrInlineDv, Opt
requireNonNull(offset, "offset is null");
}

// Builds the identifier that distinguishes deletion vectors, mirroring Delta's
// DeletionVectorDescriptor#uniqueId:
// https://github.com/delta-io/delta/blob/34f02d8/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/DeletionVectorDescriptor.java#L167-L174
public String uniqueId()
{
    // storageType + path identifies the DV storage; when an offset is present it
    // disambiguates multiple vectors stored in the same location.
    String uniqueFileId = storageType + pathOrInlineDv;
    if (offset.isPresent()) {
        // Append the numeric offset value. Concatenating the Optional object itself
        // (the previous code) embeds "OptionalInt[...]" wrapper text in the id rather
        // than the value, diverging from the referenced Delta kernel implementation.
        return uniqueFileId + "@" + offset.getAsInt();
    }
    return uniqueFileId;
}

public long getRetainedSizeInBytes()
{
return INSTANCE_SIZE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,21 +429,23 @@ public static ImmutableList<DeltaLakeColumnMetadata> columnsWithStats(List<Delta

// Computes the still-active AddFileEntry set: checkpoint entries with later JSON-log
// removes filtered out and later JSON-log adds taking precedence. Per the commit title,
// entries are now keyed by FileEntryKey (path + deletion-vector unique id) instead of
// bare path, so an add and a remove for the same path but different deletion vectors
// no longer cancel each other.
// NOTE(review): this span is a GitHub unified-diff rendering without +/- markers —
// removed (pre-change, String-keyed) lines appear interleaved with added
// (post-change, FileEntryKey-keyed) lines, and a collapsed region is hidden at the
// "@@ -456,11 +458,16 @@" hunk header below. Do not compile this text as-is.
private Stream<AddFileEntry> activeAddEntries(Stream<DeltaLakeTransactionLogEntry> checkpointEntries, List<Transaction> transactions)
{
// NOTE(review): next two lines are pre-change remnants, superseded by the
// FileEntryKey-keyed declarations that follow.
Map<String, AddFileEntry> activeJsonEntries = new LinkedHashMap<>();
HashSet<String> removedFiles = new HashSet<>();
// Post-change: running state across transactions, keyed by path + DV id.
Map<FileEntryKey, AddFileEntry> activeJsonEntries = new LinkedHashMap<>();
HashSet<FileEntryKey> removedFiles = new HashSet<>();

// The json entries containing the last few entries in the log need to be applied on top of the parquet snapshot:
// - Any files which have been removed need to be excluded
// - Any files with newer add actions need to be updated with the most recent metadata
transactions.forEach(transaction -> {
// NOTE(review): pre-change remnants (String-keyed per-transaction maps):
Map<String, AddFileEntry> addFilesInTransaction = new LinkedHashMap<>();
Set<String> removedFilesInTransaction = new HashSet<>();
// Post-change per-transaction accumulators:
Map<FileEntryKey, AddFileEntry> addFilesInTransaction = new LinkedHashMap<>();
Set<FileEntryKey> removedFilesInTransaction = new HashSet<>();
transaction.transactionEntries().forEach(deltaLakeTransactionLogEntry -> {
if (deltaLakeTransactionLogEntry.getAdd() != null) {
// NOTE(review): pre-change remnant (path-only key):
addFilesInTransaction.put(deltaLakeTransactionLogEntry.getAdd().getPath(), deltaLakeTransactionLogEntry.getAdd());
// Post-change: key the add by path + its deletion vector's unique id (if any).
AddFileEntry add = deltaLakeTransactionLogEntry.getAdd();
addFilesInTransaction.put(new FileEntryKey(add.getPath(), add.getDeletionVector().map(DeletionVectorEntry::uniqueId)), add);
}
else if (deltaLakeTransactionLogEntry.getRemove() != null) {
// NOTE(review): pre-change remnant (path-only key):
removedFilesInTransaction.add(deltaLakeTransactionLogEntry.getRemove().path());
// Post-change: removes are likewise keyed by path + DV unique id.
RemoveFileEntry remove = deltaLakeTransactionLogEntry.getRemove();
removedFilesInTransaction.add(new FileEntryKey(remove.path(), remove.deletionVector().map(DeletionVectorEntry::uniqueId)));
}
});

// NOTE(review): collapsed diff region — the code that merges the per-transaction
// accumulators into activeJsonEntries/removedFiles is hidden behind this hunk header.
Expand All @@ -456,11 +458,16 @@ else if (deltaLakeTransactionLogEntry.getRemove() != null) {
// Checkpoint adds survive only if no later JSON entry removed or re-added the same key.
Stream<AddFileEntry> filteredCheckpointEntries = checkpointEntries
.map(DeltaLakeTransactionLogEntry::getAdd)
.filter(Objects::nonNull)
// NOTE(review): pre-change remnant (path-only membership test):
.filter(addEntry -> !removedFiles.contains(addEntry.getPath()) && !activeJsonEntries.containsKey(addEntry.getPath()));
// Post-change: membership tests use the composite key.
.filter(addEntry -> {
FileEntryKey key = new FileEntryKey(addEntry.getPath(), addEntry.getDeletionVector().map(DeletionVectorEntry::uniqueId));
return !removedFiles.contains(key) && !activeJsonEntries.containsKey(key);
});

// JSON-log entries are appended after the filtered checkpoint entries; LinkedHashMap
// preserves their first-seen order.
return Stream.concat(filteredCheckpointEntries, activeJsonEntries.values().stream());
}

private record FileEntryKey(String path, Optional<String> deletionVectorId) {}

public Stream<RemoveFileEntry> getRemoveEntries(ConnectorSession session, TableSnapshot tableSnapshot)
{
return getEntries(
Expand Down

0 comments on commit bb9d648

Please sign in to comment.