Add concurrent writes reconciliation for blind append INSERT
in Delta Lake
#18506
/test-with-secrets sha=bf19346564de038c46513dccb0ef1630bbe4f5f6
The CI workflow run with tests that require additional secrets has been started: https://github.com/trinodb/trino/actions/runs/6084460202
@ebyhr please run the PR again with secrets. I've discovered a small issue in how my code identifies transaction conflicts.
writeCommitted = true;
writeCheckpointIfNeeded(session, handle.getTableName(), handle.getLocation(), checkpointInterval, commitVersion);
writeCheckpointIfNeeded(session, handle.getTableName(), handle.getLocation(), handle.getMetadataEntry().getCheckpointInterval(), commitVersion);

if (isCollectExtendedStatisticsColumnStatisticsOnWrite(session) && !computedStatistics.isEmpty() && !dataFileInfos.isEmpty()) {
The current mechanism also prevents concurrent writes to extended statistics. With this change, a race condition becomes possible when writing them.
There should be only one checkpoint per version written - if the process manages to write the x.log transaction log, there's no other process trying to create a checkpoint for the same version.
Comment relates to #16088
This could probably be done on a mechanism similar to what we have on https://github.com/trinodb/trino/blob/675defb40e12734f5d067848a14857ff66c80019/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/TransactionLogSynchronizer.java implementations. Doesn't make sense because we currently update the same stats file, not always a new one.
Looking good besides a few nits
@@ -2263,6 +2269,63 @@ public void testPartitionFilterIncluded()
}
}

@Test
I'd run this a few times
@Disabled
public void testConcurrentInsertsReconciliationForBlindInserts()
{
// testConcurrentInsertsReconciliation requires safe writes capability to avoid test flakiness |
Since all the writes are happening in Trino, this should work. The writes on s3 are unsafe if you're also writing from Spark.
> Since all the writes are happening in Trino, this should work.
It works most of the time, but sometimes it fails. It happened in the CI, which is why I decided to disable the test.
IIRC it was linked to Transaction log locked... messages.
When multiple Trino queries retry flushing a transaction log file, multiple locks may end up existing for the same transaction log file.
Add the ability to analyze the dependencies of an INSERT statement. Specifically, a connector can use this to determine whether concurrent INSERT operations that add data to the same table they select from collide with each other.
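As a rough illustration of the dependency analysis described above, the sketch below treats an INSERT as a blind append when the target table is not among the tables the statement reads. The class name, method name, and the use of plain strings for table names are hypothetical and are not the SPI method added by this PR.

```java
import java.util.Set;

// Hypothetical helper, not part of trino-spi.
final class BlindAppendDetector
{
    private BlindAppendDetector() {}

    // An INSERT that never reads its own target table is a blind append:
    // it only adds files and does not depend on the table's current content.
    static boolean isBlindAppend(String targetTable, Set<String> sourceTables)
    {
        return !sourceTables.contains(targetTable);
    }
}
```

Under this assumption, `INSERT INTO t SELECT * FROM other` would be a blind append, while `INSERT INTO t SELECT * FROM t` would not.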
Rebasing on
|
optionalCommitInfo = Optional.of(transactionLogEntry.getCommitInfo());
}
else if (transactionLogEntry.getAdd() != null) {
addedFilesBuilder.add(transactionLogEntry.getAdd()); |
If you don't need these saved in memory yet, let's not keep them around. You'll need them for updates/deletes but we're not there yet. Same with remove entries. The ones that are singletons not lists are fine with me.
@@ -2446,6 +2543,26 @@ private CommitInfoEntry getCommitInfoEntry(
long createdTime,
String operation,
long readVersion)
{ |
I would probably just inline this one, rather than have two helper methods which just call a constructor
Make sure to get a run with secrets done
/test-with-secrets sha=e3c91e5238fbe099145d20924dcb13804f645ac9
The CI workflow run with tests that require additional secrets finished as failure: https://github.com/trinodb/trino/actions/runs/7893679486
Allow committing blind append INSERT operations in a concurrent context by placing these operations right after any other previously concurrently completed write operations.
Disallow committing blind insert operations in any of the following cases:
- table schema change has been committed in the meantime
- table protocol change has been committed in the meantime
INSERT operations that contain subqueries reading the same table are subject to concurrent write failures.
/test-with-secrets sha=efe610257a489242ee65cb62b54d8919adfa217f
The CI workflow run with tests that require additional secrets finished as failure: https://github.com/trinodb/trino/actions/runs/7925295984
Description
Allow committing blind append INSERT operations in a concurrent context by
placing these operations right after any other previously concurrently
completed write operations.
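Disallow committing blind insert operations in any of the following cases:
- table schema change has been committed in the meantime
- table protocol change has been committed in the meantime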
INSERT operations that contain subqueries reading the same table are
subject to concurrent write failures.
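The reconciliation rule can be sketched roughly as follows: given the version the INSERT read and a summary of the commits that landed since, a blind append is placed right after the latest of them, unless one of them changed the table schema or protocol. `CommitSummary` and `BlindAppendReconciler` are hypothetical names for illustration only, not the classes used in DeltaLakeMetadata.

```java
import java.util.List;

// Hypothetical, simplified view of one concurrently committed log entry.
final class CommitSummary
{
    final long version;
    final boolean schemaChanged;
    final boolean protocolChanged;

    CommitSummary(long version, boolean schemaChanged, boolean protocolChanged)
    {
        this.version = version;
        this.schemaChanged = schemaChanged;
        this.protocolChanged = protocolChanged;
    }
}

final class BlindAppendReconciler
{
    private BlindAppendReconciler() {}

    // Returns the version at which the blind append may be committed.
    // Throws if a commit after readVersion changed the schema or the protocol.
    static long reconcile(long readVersion, List<CommitSummary> commitsSinceRead)
    {
        long latestVersion = readVersion;
        for (CommitSummary commit : commitsSinceRead) {
            if (commit.version <= readVersion) {
                continue; // already visible when the INSERT started
            }
            if (commit.schemaChanged) {
                throw new IllegalStateException("Conflicting schema change in version " + commit.version);
            }
            if (commit.protocolChanged) {
                throw new IllegalStateException("Conflicting protocol change in version " + commit.version);
            }
            latestVersion = Math.max(latestVersion, commit.version);
        }
        // Place the blind append right after the last concurrent commit
        return latestVersion + 1;
    }
}
```

For example, an INSERT that read version 5 while versions 6 and 7 were committed by other writers would, under these assumptions, be committed as version 8.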
Relates to #16985
This change primarily affects storage systems that offer "put-if-absent" consistency guarantees (ABFS, GCS).
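To make the "put-if-absent" guarantee concrete, here is a minimal sketch that uses the local file system as a stand-in for object storage (an assumption made purely for illustration). `StandardOpenOption.CREATE_NEW` makes the existence check and the creation atomic, so only one writer can create a given log entry. `PutIfAbsentLogWriter` is a hypothetical name, not a Trino class.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper; the local file system stands in for ABFS/GCS.
final class PutIfAbsentLogWriter
{
    private PutIfAbsentLogWriter() {}

    // Returns true if this caller created the log entry for the version,
    // false if another writer had already created it.
    static boolean tryWrite(Path logDirectory, long version, byte[] content)
            throws IOException
    {
        // Delta log entries are named with a zero-padded 20-digit version
        Path entry = logDirectory.resolve(String.format("%020d.json", version));
        try {
            Files.write(entry, content, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
            return true;
        }
        catch (FileAlreadyExistsException e) {
            return false;
        }
    }
}
```

A writer that gets `false` back knows another commit claimed that version and can run its conflict check before trying the next one.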
For AWS S3, the change attempts to offer a "best effort" alternative to a full-blown locking mechanism by retrying, with jitter, the write of the transaction log file corresponding to the transaction.
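A retry loop with jitter could look roughly like the sketch below. The attempt callback, the delay formula, and the choice to target the next version after a failed attempt are all assumptions made for illustration; they are not taken from the PR's implementation.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.LongPredicate;

// Hypothetical helper, not a Trino class.
final class JitteredCommitRetry
{
    private JitteredCommitRetry() {}

    // attemptCommit returns true when the log entry for the given version was written.
    // Returns the version that was finally committed.
    static long commitWithRetry(long firstVersion, int maxAttempts, long baseDelayMillis, LongPredicate attemptCommit)
    {
        long version = firstVersion;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (attemptCommit.test(version)) {
                return version;
            }
            // Assumption: a failed attempt means another writer took this version
            version++;
            if (attempt < maxAttempts) {
                // Random delay in [0, baseDelayMillis * attempt] spreads out competing writers
                long delay = ThreadLocalRandom.current().nextLong(baseDelayMillis * attempt + 1);
                try {
                    Thread.sleep(delay);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while retrying commit", e);
                }
            }
        }
        throw new IllegalStateException("Failed to commit after " + maxAttempts + " attempts");
    }
}
```

The randomized delay reduces the chance that writers which collided once collide again on the next attempt, which is the point of adding jitter instead of a fixed backoff.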
https://docs.databricks.com/en/optimizations/isolation-level.html#isolation-levels
Additional context and related issues
https://docs.delta.io/latest/concurrency-control.html
https://docs.databricks.com/en/optimizations/isolation-level.html#isolation-levels
Delta Lake OSS Source Code
https://github.com/delta-io/delta/blob/28881f5710e7b0b0bf2d959e36a5b6ebd75c62ae/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala
Delta Lake OSS source code explained
https://books.japila.pl/delta-lake-internals/OptimisticTransactionImpl/#checkForConflicts
This PR introduces a new trino-spi method in ConnectorMetadata.
This is done in order to determine whether or not the INSERT is a blind append operation.
Release notes
( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(x) Release notes are required, with the following suggested text: