Fix ConflictChecker for READ-APPEND after OPTIMIZE #1305

Open · wants to merge 5 commits into master

Conversation

@sezruby (Contributor) commented Aug 1, 2022

Description

Allow concurrent transactions to read files that were newly added with dataChange=false, and to overlap with files removed with dataChange=false. dataChange=false files are created by the OPTIMIZE operation, so their content is unchanged.

The following functions check the two cases:

  • checkForAddedFilesThatShouldHaveBeenReadByCurrentTxn
  • checkForDeletedFilesAgainstCurrentTxnReadFiles

We can address the issue by (see the sketch below):

  • fixing changedDataAddedFiles to check the dataChange flag of each AddFile
  • adding changedDataRemovedFiles for RemoveFile actions
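A minimal sketch of the idea (OPTIMIZE writes its AddFile/RemoveFile actions with dataChange=false, so they drop out of the conflict sets):

    // In WinningCommitSummary: only dataChange=true actions participate in
    // conflict detection; files merely rewritten by OPTIMIZE are ignored.
    val changedDataAddedFiles: Seq[AddFile] = if (isBlindAppendOption.getOrElse(false)) {
      Seq()
    } else {
      addedFiles.filter(_.dataChange)
    }
    val changedDataRemovedFiles: Seq[RemoveFile] = removedFiles.filter(_.dataChange)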

How was this patch tested?

Unit tests

Does this PR introduce any user-facing changes?

No. This is a bug fix for the case where an OPTIMIZE transaction commits while concurrent reads are in progress.

Signed-off-by: Eunjin Song <[email protected]>
@tdas (Contributor) commented Aug 9, 2022

Can you elaborate on the issue? Which operation throws ConcurrentAppendException, OPTIMIZE or the append? Can you give an example error and stack trace? Also, can you explain the semantics of why this change is correct and maintains serializability?

@sezruby (Contributor, Author) commented Sep 8, 2022

@tdas The append operation throws ConcurrentAppendException:

io.delta.exceptions.ConcurrentAppendException: Files were added to partition [colC=1] by a concurrent update. Please try the operation again.
Conflicting commit: {"timestamp":1662599111087,"operation":"OPTIMIZE", ...

/**
 * Check if the new files added by the already committed transactions should have been read by
 * the current transaction.
 */
protected def checkForAddedFilesThatShouldHaveBeenReadByCurrentTxn(): Unit = {
  recordTime("checked-appends") {
    // Fail if new files have been added that the txn should have read.
    val addedFilesToCheckForConflicts = isolationLevel match {
      case WriteSerializable if !currentTransactionInfo.metadataChanged =>
        winningCommitSummary.changedDataAddedFiles // don't conflict with blind appends
      case Serializable | WriteSerializable =>
        winningCommitSummary.changedDataAddedFiles ++ winningCommitSummary.blindAppendAddedFiles

This change makes files written by OPTIMIZE not conflict with the read portion of the append job. Strictly speaking, the conflict is not with the APPEND itself but with its scan.

This is the repro:

// session 1 - performing OPTIMIZE
(1 to 10).foreach { i =>
  println(spark.sql(s"OPTIMIZE delta.`$dataPath`").collect.toSeq)
}

// session 2 - performing APPEND
(1 to 10).foreach { i =>
  spark.read.format("delta").load(dataPath).limit(10)
    .write.mode("append").format("delta").partitionBy("colC").save(dataPath)
}

@sezruby sezruby changed the title Fix ConflictChecker for Append after Optimize conflict Fix ConflictChecker for Read after Optimize Sep 8, 2022
@tdas tdas self-requested a review September 8, 2022 18:17
@tdas (Contributor) commented Sep 12, 2022

@sezruby Thank you for the explanation and the repro. So it's a problem not with blind appends but with read-then-append.

I have to think about whether this change has unintended consequences in other kinds of workloads. Can you provide a logical argument for why this change will not produce unintended consequences in other combinations of operations, like DELETE/UPDATE/MERGE + OPTIMIZE? I know you have added some tests with DELETE, but tests can have coverage gaps, so it would be good to have a convincing logical argument that this change is safe no matter what the operation is.

@sezruby (Contributor, Author) commented Sep 13, 2022

@tdas I found that there can still be a conflict if a concurrent transaction reads a RemoveFile written by OPTIMIZE:

io.delta.exceptions.ConcurrentDeleteReadException: This transaction attempted to read one or more files that were deleted (for example colC=0/part-00000-cf1e16ab-27b2-4c9c-b5e1-bccfb0e79a58.c000.snappy.parquet in partition [colC=0]) by a concurrent update. Please try the operation again.
Conflicting commit: {"timestamp":1663052151972,"operation":"OPTIMIZE"

/**
 * Check if [[RemoveFile]] actions added by already committed transactions conflicts with files
 * read by the current transaction.
 */
protected def checkForDeletedFilesAgainstCurrentTxnReadFiles(): Unit = {
  recordTime("checked-deletes") {
    // Fail if files have been deleted that the txn read.
    val readFilePaths = currentTransactionInfo.readFiles.map(
      f => f.path -> f.partitionValues).toMap
    val deleteReadOverlap = winningCommitSummary.removedFiles
      .find(r => readFilePaths.contains(r.path))

I think we can add changedDataRemovedFiles, analogous to changedDataAddedFiles, and use it in this check function.
We can allow reads of the version preceding the OPTIMIZE without any serialization issue, as long as the transaction doesn't update existing data. With the fix, we could support concurrent OPTIMIZE while performing append/insert-only operations that read existing data to append new rows.
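Roughly, the check would then use the filtered set (a sketch; the exact change is in this PR's diff):

    // Only dataChange=true removes can conflict with the txn's read files;
    // RemoveFile actions written by OPTIMIZE (dataChange=false) are ignored.
    val deleteReadOverlap = winningCommitSummary.changedDataRemovedFiles // was: removedFiles
      .find(r => readFilePaths.contains(r.path))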

Other types of operations that may include RemoveFile actions (DELETE, UPDATE, MERGE, OPTIMIZE) will still fail via the following check:

/**
 * Check if [[RemoveFile]] actions added by already committed transactions conflicts with
 * [[RemoveFile]] actions this transaction is trying to add.
 */
protected def checkForDeletedFilesAgainstCurrentTxnDeletedFiles(): Unit = {
  recordTime("checked-2x-deletes") {
    // Fail if a file is deleted twice.
    val txnDeletes = currentTransactionInfo.actions
      .collect { case r: RemoveFile => r }
      .map(_.path).toSet
    val deleteOverlap = winningCommitSummary.removedFiles.map(_.path).toSet intersect txnDeletes
    if (deleteOverlap.nonEmpty) {
      throw DeltaErrors.concurrentDeleteDeleteException(
        winningCommitSummary.commitInfo, deleteOverlap.head)
    }
  }
}

Signed-off-by: Eunjin Song <[email protected]>
Signed-off-by: Eunjin Song <[email protected]>
@scottsand-db (Collaborator) commented:

Wouldn't PR #1262 help with this?

@sezruby (Contributor, Author) commented Sep 15, 2022

@scottsand-db That PR seems to allow switching the isolation level to WriteSerializable. I'm not sure what other changes will be delivered, but with the current code the issue still exists:

protected def checkForAddedFilesThatShouldHaveBeenReadByCurrentTxn(): Unit = {
  recordTime("checked-appends") {
    // Fail if new files have been added that the txn should have read.
    val addedFilesToCheckForConflicts = isolationLevel match {
      case WriteSerializable if !currentTransactionInfo.metadataChanged =>
        winningCommitSummary.changedDataAddedFiles // don't conflict with blind appends
      case Serializable | WriteSerializable =>
        winningCommitSummary.changedDataAddedFiles ++ winningCommitSummary.blindAppendAddedFiles

It still checks changedDataAddedFiles under WriteSerializable.

Also, with this change we could allow concurrent read-append and OPTIMIZE even at the Serializable level.

@sezruby sezruby changed the title Fix ConflictChecker for Read after Optimize Fix ConflictChecker for READ-APPEND after OPTIMIZE Sep 15, 2022
@sezruby (Contributor, Author) commented Sep 21, 2022

@tdas Ready for review.

@sezruby (Contributor, Author) commented Sep 30, 2022

It seems this PR also fixes #326.

@scottsand-db (Collaborator) left a comment:

Looks great! Thanks for this change! Tests look good.

Could you please update the scaladoc for checkForDeletedFilesAgainstCurrentTxnDeletedFiles to explain that we check for conflicting RemoveFile actions regardless of their dataChange status?

@@ -96,8 +96,9 @@ private[delta] class WinningCommitSummary(val actions: Seq[Action], val commitVe
   val changedDataAddedFiles: Seq[AddFile] = if (isBlindAppendOption.getOrElse(false)) {
     Seq()
   } else {
-    addedFiles
+    addedFiles.filter(_.dataChange)
Collaborator:

This is worth an inline comment explaining why we want this. Of course, with the variable name being changedDataAddedFiles, this seems obvious. But obviously it wasn't :) that's why you are fixing this now :)

So if you could add an inline comment explaining why this is necessary, perhaps even with an example, that would be great.

   }
+  val changedDataRemovedFiles: Seq[RemoveFile] = removedFiles.filter(_.dataChange)
Collaborator:

Same here: an inline comment, please.

@@ -231,16 +232,17 @@ private[delta] class ConflictChecker(
       // Fail if files have been deleted that the txn read.
       val readFilePaths = currentTransactionInfo.readFiles.map(
         f => f.path -> f.partitionValues).toMap
-      val deleteReadOverlap = winningCommitSummary.removedFiles
+      val deleteReadOverlap = winningCommitSummary.changedDataRemovedFiles
Collaborator:

Update the scaladoc for this method: we aren't just checking RemoveFile actions, we are checking RemoveFile actions that actually change data (e.g. ignoring OPTIMIZE).

@scottsand-db (Collaborator) commented:

@tdas I've taken a first pass. Can you also please take a look?

@@ -231,16 +232,17 @@ private[delta] class ConflictChecker(
       // Fail if files have been deleted that the txn read.
       val readFilePaths = currentTransactionInfo.readFiles.map(
         f => f.path -> f.partitionValues).toMap
-      val deleteReadOverlap = winningCommitSummary.removedFiles
+      val deleteReadOverlap = winningCommitSummary.changedDataRemovedFiles
@prakharjain09 (Collaborator):

Ignoring the dataChange=false files can cause serializability issues. A txn that is trying to commit relies on the compacted files, and so it indirectly relies on the new files created by OPTIMIZE.

Consider the following scenario:

Suppose the table has 2 files initially: f1, f2.

t0   --- TXN-1 starts:                    reads F1 and wants to write F11
t1   --- OPTIMIZE-1 starts and commits:   compacts F1, F2 into F3
t2   --- TXN-1 commits:                   tries to commit [AddFile-F11, RemoveFile-F1]
         (this would have failed on master with a ConcurrentDeleteReadException around
          file F1; with the new changes it commits successfully)

At time t2, when TXN-1 tries to commit, it depends on file F1, and F1 shouldn't change concurrently. In this case F1 was just rewritten into F3 by the concurrent OPTIMIZE, so we have a transitive dependency on F3 which we are missing here.

If we allow such a TXN-1 to commit here, we could have issues like this:

Suppose the table has 2 files initially: f1, f2.

t0   --- TXN-1 starts:                    reads F1 and wants to write F11
t1   --- OPTIMIZE-1 starts and commits:   compacts F1, F2 into F3
t1.5 --- TXN-2 starts and commits:        deletes F3, i.e. it commits RemoveFile-F3
t2   --- TXN-1 commits:                   tries to commit F11, and it works: it doesn't
         conflict with OPTIMIZE (with the new logic), and it doesn't conflict with TXN-2
         (TXN-2 just deleted F3, which TXN-1 was not reading directly)

Here at time t2 we will allow TXN-1 to commit successfully (after the proposed changes), although TXN-1 can't be serialized after TXN-2. So ideally TXN-1 should have failed.

Collaborator:

@prakharjain09 - this makes sense to me. Thanks for reviewing this PR and lending a hand.

@sezruby do you understand and agree with this counter-example?

@sezruby (Author):

Yes, I understand the case. But OPTIMIZE-APPEND is a common case for many users:
if there are frequent append jobs writing to a partition 24/7, we can never run OPTIMIZE on that partition.
Can we add an additional check for this case?

We have readPredicates, used in checkForAddedFilesThatShouldHaveBeenReadByCurrentTxn, and based on it we can detect whether there is a dataChange=true commit in each partition.
I think partition-level detection is feasible. (I was thinking of tracking optimized files using a Bloom filter, but checking the partition should be simple and sufficient.)

What do you think about this approach? Keep the current change + partition-level dataChange=true detection. (A sketch of what I mean is below.)
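A sketch of the partition-level idea, using only fields visible in the code quoted earlier (the exact shape is hypothetical, not this PR's actual code):

    // Hypothetical sketch: collect the partitions this txn read, then check
    // whether the winning commit removed data (dataChange=true) in any of them.
    val readPartitions: Set[Map[String, String]] =
      currentTransactionInfo.readFiles.map(_.partitionValues).toSet
    val dataChangedInReadPartitions = winningCommitSummary.changedDataRemovedFiles
      .exists(r => readPartitions.contains(r.partitionValues))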

@sezruby (Author):

@scottsand-db @prakharjain09 I updated the PR with an additional check and a test for this case. Could you have a look at it?
(I haven't addressed the comment-related changes yet.)

Collaborator:

@sezruby Could you give some example scenarios (like the one in this comment) that show the behavior before and after your changes?

I think the main issue you are trying to fix is: when OPTIMIZE commits, it fails a concurrent Append/Update/Delete/Merge query that is in progress and wants to commit, because of the checkForAddedFilesThatShouldHaveBeenReadByCurrentTxn check. Is my understanding correct?

@sezruby (Author) commented Oct 6, 2022:

I'd like to support the OPTIMIZE-APPEND scenario only.
Other operations should fail, since we cannot track modified/removed rows through optimized files, and that is acceptable.
Operations with RemoveFile actions will conflict with the OPTIMIZE commit in checkForDeletedFilesAgainstCurrentTxnDeletedFiles and fail.

Suppose the table has 2 files initially: f1, f2.

t0   --- TXN-1 starts:                    reads F1 and wants to write F11
t1   --- OPTIMIZE-1 starts and commits:   compacts F1, F2 into F3
t1.5 --- TXN-2 starts and commits:        deletes F3, i.e. it commits RemoveFile-F3
t2   --- TXN-1 commits:                   tries to commit F11, but it fails in
         checkForDeletedFilesAgainstCurrentTxnReadFiles, because
         winningCommitSummary.changedDataRemovedFiles filtered by the read predicates
         is not empty (it contains RemoveFile-F3)

In the current implementation:
checkForAddedFilesThatShouldHaveBeenReadByCurrentTxn checks newly added files using the read predicates;
checkForDeletedFilesAgainstCurrentTxnReadFiles checks newly removed files only against the readFiles of the current transaction.

I added a read-predicate check to checkForDeletedFilesAgainstCurrentTxnReadFiles so that we can detect removed files beyond readFiles (see the sketch below).
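A rough sketch of the extra check, assuming a hypothetical helper matchesReadPredicates for the partition-predicate evaluation (the exact code is in the PR):

    // Sketch only: flag dataChange=true RemoveFile actions from the winning commit
    // whose partition values match this txn's read predicates, even when the txn
    // never listed those files in readFiles.
    val removedMatchingReads = winningCommitSummary.changedDataRemovedFiles
      .filter(r => matchesReadPredicates(currentTransactionInfo.readPredicates, r.partitionValues))
    if (removedMatchingReads.nonEmpty) {
      throw DeltaErrors.concurrentDeleteReadException(
        winningCommitSummary.commitInfo, removedMatchingReads.head.path)
    }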

@sezruby (Author) commented Oct 6, 2022:

I think this sequence (APPEND starts / OPTIMIZE commits, then DELETE commits / APPEND commits) is the only case where we need to check RemoveFile actions beyond readFiles.
With a data filter, there can be files in the same partition that are not in the readFiles list, so the additional check can incur unnecessary conflicts.
(e.g. fileA only contains id=1 and fileB only contains id=2; with the data filter id=1, fileB is not in readFiles)

Do you have any other ideas for how we could support OPTIMIZE-APPEND?
Could we collect all optimized files matching the readPredicate as optimizedReadFiles?

Collaborator:

Hi @sezruby - things are getting a little complicated here :)

Can we move this to a higher-level discussion on the PR (not on this code), explaining the overall reasoning behind this PR?

Inside checkForDeletedFilesAgainstCurrentTxnReadFiles you seem to be checking the read predicates ... against a winning commit's deleted files ....

@prakharjain09 is this valid?

Signed-off-by: Eunjin Song <[email protected]>
@zmeir commented Aug 14, 2023

@sezruby Any news on this PR? We're running into this issue a lot: we have a streaming job that writes to the same partition 24/7, so we're having a hard time running OPTIMIZE on that partition.

Also, did you see #626?

@sezruby (Contributor, Author) commented Aug 14, 2023

> @sezruby Any news on this PR? We're running into this issue a lot, having a streaming job that writes to the same partition 24/7, and so we are having a hard time running OPTIMIZE on that partition.
>
> Also, did you see #626?

Hi, you can cherry-pick this PR to support OPTIMIZE-APPEND.
This PR contains two parts:

  • changedDataAddedFiles in the newly-added-files check
    • With this, the APPEND operation succeeds, since files newly added by OPTIMIZE have dataChange=false.
  • a removedFiles check using readPredicates
    • The master version only checks for newly removed files among the transaction's own readFiles,
    • so a file deleted after an OPTIMIZE cannot be detected when the APPEND commits (#1305 (comment)).
    • To prevent this, I added an additional check using readPredicates, to see if there are other newly removed files matching the read predicates.
    • This may cause unnecessary conflicts between DELETE and APPEND, but I think that is very rare:
      • t0 txn0 APPEND: reads file2 using a data filter
      • t1 txn1 APPEND: adds file1 - commit
      • t2 txn2 DELETE: deletes file1 - commit
      • t3 txn0 APPEND: commit => no conflict on master, but with this PR the removed file1 can cause a conflict

@zmeir commented Aug 14, 2023

Thanks @sezruby!

Yeah, I figured I could do that, but I was hoping to simplify my build by using an existing release artifact.

*rolls up sleeves* Oh well...
