Fix ConflictChecker for READ-APPEND after OPTIMIZE #1305
Conversation
Signed-off-by: Eunjin Song <[email protected]>
Can you elaborate on the issue? Which operation throws ConcurrentAppendException, Optimize or Append? And can you give an example error and stack trace? Also, can you explain the semantics of why this change is correct and maintains serializability?
@tdas Append operation throws ConcurrentAppendException.
delta/core/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala Lines 177 to 188 in 2041c3b
This change makes optimized files not conflict with the read side of an Append operation. Strictly speaking, the conflict is not with the APPEND itself but with the scan. This is the repro:

// session 1 - performing OPTIMIZE
(1 to 10).foreach { i =>
  println(spark.sql(s"OPTIMIZE delta.`$dataPath`").collect.toSeq)
}

// session 2 - performing APPEND
(1 to 10).foreach { i =>
  spark.read.format("delta").load(dataPath).limit(10)
    .write.mode("append").format("delta").partitionBy("colC").save(dataPath)
}
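For completeness, here is a hypothetical setup for the repro (not part of the original comment; the table path, schema, and file counts are assumptions chosen so that OPTIMIZE has small files to compact):

// Hypothetical setup: a Delta table at dataPath, partitioned by colC,
// written as many small files per partition.
import org.apache.spark.sql.functions.col

val dataPath = "/tmp/delta/optimize-append-repro" // illustrative path
spark.range(0, 10000)
  .withColumn("colC", col("id") % 10)
  .repartition(50) // force many small files
  .write.format("delta").partitionBy("colC").save(dataPath)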
@sezruby Thank you for the explanation and the repro. So it's a problem not with blind appends but with read-then-append. I have to think about whether this change has unintended consequences in other kinds of workloads. Actually, can you provide a logical argument for why this change will not produce unintended consequences in other combinations of operations, like DELETE/UPDATE/MERGE + OPTIMIZE? I know that you have added some tests with delete, but tests can have coverage gaps, so it would be good to have a convincing logical argument that this change is safe no matter what operations are involved.
@tdas I found that there can still be a conflict if a concurrent transaction reads a RemoveFile from OPTIMIZE.
io.delta.exceptions.ConcurrentDeleteReadException: This transaction attempted to read one or more files that were deleted (for example colC=0/part-00000-cf1e16ab-27b2-4c9c-b5e1-bccfb0e79a58.c000.snappy.parquet in partition [colC=0]) by a concurrent update. Please try the operation again.
delta/core/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala Lines 226 to 235 in d2785aa
I think we can add a check for this case. For other types of operations which may have RemoveFile actions (DELETE, UPDATE, MERGE, OPTIMIZE), it will fail with the following check:
delta/core/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala Lines 250 to 266 in d2785aa
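For intuition, here is a simplified model of what an OPTIMIZE commit looks like compared to a DELETE commit. These are illustrative case classes, not Delta's actual org.apache.spark.sql.delta.actions types, but OPTIMIZE does commit its actions with dataChange=false:

// Simplified stand-ins for Delta's commit actions (illustrative only).
case class AddFile(path: String, dataChange: Boolean)
case class RemoveFile(path: String, dataChange: Boolean)

// OPTIMIZE rewrites f1, f2 into f3 without changing any rows:
val optimizeActions = Seq(
  RemoveFile("f1", dataChange = false),
  RemoveFile("f2", dataChange = false),
  AddFile("f3", dataChange = false)
)

// A DELETE that rewrites f1 (dropping rows) does change data:
val deleteActions = Seq(
  RemoveFile("f1", dataChange = true),
  AddFile("f1-rewritten", dataChange = true)
)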
Signed-off-by: Eunjin Song <[email protected]>
Force-pushed from 273553d to e9b429b
Signed-off-by: Eunjin Song <[email protected]>
Force-pushed from e9b429b to b80bea2
Wouldn't this PR #1262 help with this?
@scottsand-db It seems that PR allows switching the isolation level to WriteSerializable. I'm not sure any other changes will be delivered, but with the current code the issue still exists:
delta/core/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala Lines 181 to 188 in d2785aa
It still checks removedFiles without considering dataChange. Also, with this change, we could allow concurrent read-append & optimize at the Serializable level.
@tdas Ready-For-Review
Seems the PR fixes #326
Looks great! Thanks for this change! Tests look good.
Can you please update the scaladoc for the method checkForDeletedFilesAgainstCurrentTxnDeletedFiles, explaining that we check for conflicts of remove files regardless of dataChange status?
@@ -96,8 +96,9 @@ private[delta] class WinningCommitSummary(val actions: Seq[Action], val commitVe
     val changedDataAddedFiles: Seq[AddFile] = if (isBlindAppendOption.getOrElse(false)) {
       Seq()
     } else {
-      addedFiles
+      addedFiles.filter(_.dataChange)
This is worth an inline comment explaining why we want this. Of course, with the variable name being changedDataAddedFiles, this seems obvious. But obviously it wasn't :) which is why you are fixing this now :)
So if you could add an inline comment explaining why this is necessary, perhaps even with an example, that would be great.
     }
+    val changedDataRemovedFiles: Seq[RemoveFile] = removedFiles.filter(_.dataChange)
same here. inline comment.
@@ -231,16 +232,17 @@ private[delta] class ConflictChecker(
     // Fail if files have been deleted that the txn read.
     val readFilePaths = currentTransactionInfo.readFiles.map(
       f => f.path -> f.partitionValues).toMap
-    val deleteReadOverlap = winningCommitSummary.removedFiles
+    val deleteReadOverlap = winningCommitSummary.changedDataRemovedFiles
Update the scaladoc for this method: we aren't just checking RemoveFile actions, we are checking RemoveFile actions that actually change data (e.g. ignoring OPTIMIZE).
@tdas I've taken a first pass. Can you also please take a look?
Ignoring the dataChange=false files can cause serializability issues. A txn that is trying to commit relies on the compacted files, and so it indirectly relies on the new files created by OPTIMIZE.
Consider the following scenario. Suppose the table has 2 files initially: f1, f2.
t0 ------------- TXN-1 starts: reads f1 and wants to write f11
t1 --- OPTIMIZE-1 starts and commits: compacts f1, f2 into f3
t2 ------------- TXN-1 commits: tries to commit [AddFile-f11, RemoveFile-f1] (this would have failed on master with a ConcurrentDeleteReadException around file f1, but commits successfully with the new changes)
At time t2, when TXN-1 tries to commit, it depends on file f1, and f1 shouldn't change concurrently. In this case f1 was just rewritten into f3 by the concurrent OPTIMIZE, so we have a transitive dependency on f3 which we are missing here.
If we allow such a TXN-1 to commit, we could have issues like this. Suppose the table has 2 files initially: f1, f2.
t0 ------------- TXN-1 starts: reads f1 and wants to write f11
t1 --- OPTIMIZE-1 starts and commits: compacts f1, f2 into f3
t1.5 ------ TXN-2 starts and commits: it deletes f3, i.e. it commits RemoveFile-f3
t2 ------------- TXN-1 commits: tries to commit f11, and it works because it doesn't conflict with OPTIMIZE (with the new logic) and doesn't conflict with TXN-2 (TXN-2 just deleted f3, which TXN-1 was not reading directly).
Here at time t2 we would allow TXN-1 to commit successfully (after the proposed changes), although TXN-1 cannot be serialized after TXN-2. So ideally TXN-1 should have failed.
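To make the anomaly concrete, here is a minimal, self-contained simulation of the timeline above. This is plain Scala, not Delta code; the commit model and all names are illustrative:

object SerializabilityCounterExample extends App {
  case class Commit(adds: Set[String], removes: Set[String], dataChange: Boolean)

  def applyCommit(files: Set[String], c: Commit): Set[String] =
    files -- c.removes ++ c.adds

  val initial   = Set("f1", "f2")
  val txn1Reads = Set("f1")                                              // t0
  val optimize  = Commit(Set("f3"), Set("f1", "f2"), dataChange = false) // t1
  val txn2      = Commit(Set.empty, Set("f3"), dataChange = true)        // t1.5

  // With the proposed change, only dataChange=true removes are conflict-checked.
  // Neither OPTIMIZE's removes (filtered out) nor TXN-2's remove of f3
  // overlaps TXN-1's read set {f1}, so no conflict is detected:
  val checkedRemoves = Seq(optimize, txn2).filter(_.dataChange).flatMap(_.removes)
  println(checkedRemoves.toSet.intersect(txn1Reads)) // Set() -> no conflict seen

  // Yet by t2 the data TXN-1 read is gone (f1 was compacted into f3, and f3
  // was deleted by TXN-2), so TXN-1 cannot be serialized after TXN-2:
  println(applyCommit(applyCommit(initial, optimize), txn2)) // Set()
}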
@prakharjain09 - this makes sense to me. thanks for reviewing this PR and lending a hand.
@sezruby do you understand + agree with this counter-example?
Yes, I understand the case. But OPTIMIZE-APPEND is a common case for many users. If there are frequent append jobs to a partition 24/7, we can never run OPTIMIZE for that partition.
Can we add an additional check for this case? We have readPredicates, used in checkForAddedFilesThatShouldHaveBeenReadByCurrentTxn, and based on it we can detect whether there is a data-change commit in each partition.
I think partition-level detection could be feasible. (I was considering tracking optimized files with a Bloom filter, but checking the partition would be simple enough.)
What do you think about this approach? Keep the current change + partition-level dataChange=true detection, as sketched below.
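A hedged sketch of the partition-level idea above. The types are simplified stand-ins, not Delta's actual classes; the intent is only to show the shape of the check:

// For each partition touched by the winning commit, record whether any action
// in it carried dataChange = true; a concurrent txn then conflicts only with
// partitions that saw a real data change (so OPTIMIZE-only partitions pass).
case class Action(partitionValues: Map[String, String], dataChange: Boolean)

def partitionsWithDataChange(winningActions: Seq[Action]): Set[Map[String, String]] =
  winningActions.filter(_.dataChange).map(_.partitionValues).toSet

// A reading txn that scanned readPartitions conflicts only if one of them
// overlaps a partition with a real data change:
def conflicts(readPartitions: Set[Map[String, String]],
              winningActions: Seq[Action]): Boolean =
  partitionsWithDataChange(winningActions).exists(readPartitions.contains)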
@scottsand-db @prakharjain09 I updated the PR with an additional check & test for this case. Could you have a look at it?
(I haven't addressed the comment-related changes yet.)
@sezruby Could you give some example scenarios (like the one in this comment) which show the behavior before and after your changes?
I think the main issue you are trying to fix is: when OPTIMIZE commits, it affects a concurrent Append/Update/Delete/Merge query that is in progress and wants to commit, due to the checkForAddedFilesThatShouldHaveBeenReadByCurrentTxn check. Is my understanding correct?
I'd like to support the OPTIMIZE - APPEND scenario only. Other operations should fail, since we cannot track modified/removed rows from optimized files, and that's acceptable. Other operations with RemoveFile actions will conflict with the OPTIMIZE commit in checkForDeletedFilesAgainstCurrentTxnDeletedFiles and fail.
Suppose the table has 2 files initially: f1, f2.
t0 ------------- TXN-1 starts: reads f1 and wants to write f11
t1 --- OPTIMIZE-1 starts and commits: compacts f1, f2 into f3
t1.5 ------ TXN-2 starts and commits: it deletes f3, i.e. it commits RemoveFile-f3
t2 ------------- TXN-1 commits: tries to commit f11, but it fails in checkForDeletedFilesAgainstCurrentTxnReadFiles, as winningCommitSummary.changedDataRemovedFiles filtered by the read predicates is not empty (RemoveFile-f3).
In the current implementation, checkForAddedFilesThatShouldHaveBeenReadByCurrentTxn checks newly added files using the read predicates, while checkForDeletedFilesAgainstCurrentTxnReadFiles checks newly removed files only against the readFiles of the current transaction. I added a read-predicate check to checkForDeletedFilesAgainstCurrentTxnReadFiles so that we can detect removed files other than readFiles (see the sketch below).
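A hedged sketch of the added read-predicate check. This is illustrative only: Delta's real check evaluates Catalyst expressions against the file index, which is elided here behind a caller-supplied matchesReadPredicates function over a file's partitionValues:

// Simplified stand-in for a winning commit's RemoveFile action.
case class RemovedFile(path: String, partitionValues: Map[String, String], dataChange: Boolean)

def deletedFilesConflict(readFilePaths: Set[String],
                         matchesReadPredicates: Map[String, String] => Boolean,
                         winningRemoves: Seq[RemovedFile]): Boolean =
  winningRemoves.exists { r =>
    // direct overlap with a file the txn read, or ...
    readFilePaths.contains(r.path) ||
      // ... a real (dataChange=true) delete falling under the txn's read predicates
      (r.dataChange && matchesReadPredicates(r.partitionValues))
  }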
I think this (APPEND_start / OPTIMIZE - DELETE / APPEND_end) is the only case where we need to check RemoveFile actions other than readFiles.
With a data filter, there could be other files not in the readFiles list (but in the same partition), so the additional check could incur unnecessary conflicts (e.g. fileA only contains id=1 and fileB only contains id=2; with the data filter id=1, fileB is not in readFiles). The snippet below illustrates this.
Do you have any other ideas for how we could support OPTIMIZE-APPEND? Can we collect (all optimized files matching the readPredicate) as optimizedReadFiles?
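A toy illustration of the false-conflict concern above (all names and values are hypothetical): with the data filter id = 1, only fileA lands in readFiles, yet fileB sits in the same partition, so a partition-level check would flag a delete of fileB as a conflict even though the txn never read it.

// ids contained in each file of the same partition (hypothetical)
val filesInPartition = Map(
  "fileA" -> Set(1L),
  "fileB" -> Set(2L)
)
val readPredicate: Long => Boolean = _ == 1L
val readFiles = filesInPartition.collect {
  case (file, ids) if ids.exists(readPredicate) => file
}.toSet
println(readFiles) // Set(fileA): fileB is never read, but shares the partition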
Hi @sezruby - things are getting a little complicated here :)
Can we move this to a more high-level discussion on the PR (not on this code), explaining the higher-level reasoning for this PR?
Inside checkForDeletedFilesAgainstCurrentTxnReadFiles you seem to be checking the read predicates against a winning commit's deleted files.
@prakharjain09 is this valid?
Signed-off-by: Eunjin Song <[email protected]>
Hi, you can cherry-pick this PR to support OPTIMIZE - APPEND.
Thanks @sezruby! Yeah, I figured I could do that, but I was hoping to simplify my build by using an existing version artifact. rolls up sleeves Oh well...
Description
Allow concurrent transactions to read newly added files with dataChange=false and to ignore removed files with dataChange=false. dataChange=false files are created by the OPTIMIZE operation, so the data content is unchanged.
We address the issue with two filtered fields that check both cases: changedDataAddedFiles, which checks the dataChange flag of AddFile actions, and changedDataRemovedFiles, which does the same for RemoveFile actions.
How was this patch tested?
Unit tests
Does this PR introduce any user-facing changes?
No; this is a bug fix for when an OPTIMIZE transaction commits while concurrent reads are in progress.