Skip to content

[core] Fix the commit kind when performing row-level changes on non-pk table #6025

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions paimon-api/src/main/java/org/apache/paimon/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -496,13 +496,16 @@ public boolean equals(Object o) {
/** Type of changes in this snapshot. */
public enum CommitKind {

/** Changes flushed from the mem table. */
/** New data files are appended to the table and no data file is deleted. */
APPEND,

/** Changes by compacting existing data files. */
COMPACT,

/** Changes that clear up the whole partition and then add new records. */
/**
* New data files are added to overwrite existing data files or just delete existing data
* files.
*/
OVERWRITE,

/** Collect statistics. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@

import static java.util.Collections.emptyList;
import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
import static org.apache.paimon.manifest.ManifestEntry.recordCount;
import static org.apache.paimon.manifest.ManifestEntry.recordCountAdd;
import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete;
Expand Down Expand Up @@ -297,22 +296,22 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
List<ManifestEntry> appendChangelog = new ArrayList<>();
List<ManifestEntry> compactTableFiles = new ArrayList<>();
List<ManifestEntry> compactChangelog = new ArrayList<>();
List<IndexManifestEntry> appendHashIndexFiles = new ArrayList<>();
List<IndexManifestEntry> appendIndexFiles = new ArrayList<>();
List<IndexManifestEntry> compactDvIndexFiles = new ArrayList<>();
collectChanges(
committable.fileCommittables(),
appendTableFiles,
appendChangelog,
compactTableFiles,
compactChangelog,
appendHashIndexFiles,
appendIndexFiles,
compactDvIndexFiles);
try {
List<SimpleFileEntry> appendSimpleEntries = SimpleFileEntry.from(appendTableFiles);
if (!ignoreEmptyCommit
|| !appendTableFiles.isEmpty()
|| !appendChangelog.isEmpty()
|| !appendHashIndexFiles.isEmpty()) {
|| !appendIndexFiles.isEmpty()) {
// Optimization for common path.
// Step 1:
// Read manifest entries from changed partitions here and check for conflicts.
Expand All @@ -321,6 +320,10 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
// This optimization is mainly used to decrease the number of times we read from
// files.
latestSnapshot = snapshotManager.latestSnapshot();
boolean hasDelete = hasDelete(appendSimpleEntries, appendIndexFiles);
Snapshot.CommitKind commitKind =
hasDelete ? Snapshot.CommitKind.OVERWRITE : Snapshot.CommitKind.APPEND;

if (latestSnapshot != null && checkAppendFiles) {
// it is possible that some partitions only have compact changes,
// so we need to contain all changes
Expand All @@ -331,20 +334,20 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
latestSnapshot.commitUser(),
baseEntries,
appendSimpleEntries,
Snapshot.CommitKind.APPEND);
commitKind);
safeLatestSnapshotId = latestSnapshot.id();
}

attempts +=
tryCommit(
appendTableFiles,
appendChangelog,
appendHashIndexFiles,
appendIndexFiles,
committable.identifier(),
committable.watermark(),
committable.logOffsets(),
committable.properties(),
Snapshot.CommitKind.APPEND,
commitKind,
noConflictCheck(),
null);
generatedSnapshot += 1;
Expand Down Expand Up @@ -422,6 +425,21 @@ private void reportCommit(
commitMetrics.reportCommit(commitStats);
}

private boolean hasDelete(
List<SimpleFileEntry> appendSimpleEntries, List<IndexManifestEntry> appendIndexFiles) {
for (SimpleFileEntry appendSimpleEntry : appendSimpleEntries) {
if (appendSimpleEntry.kind().equals(FileKind.DELETE)) {
return true;
}
}
for (IndexManifestEntry appendIndexFile : appendIndexFiles) {
if (appendIndexFile.indexFile().indexType().equals(DELETION_VECTORS_INDEX)) {
return true;
}
}
return false;
}

@Override
public int overwrite(
Map<String, String> partition,
Expand All @@ -446,15 +464,15 @@ public int overwrite(
List<ManifestEntry> appendChangelog = new ArrayList<>();
List<ManifestEntry> compactTableFiles = new ArrayList<>();
List<ManifestEntry> compactChangelog = new ArrayList<>();
List<IndexManifestEntry> appendHashIndexFiles = new ArrayList<>();
List<IndexManifestEntry> appendIndexFiles = new ArrayList<>();
List<IndexManifestEntry> compactDvIndexFiles = new ArrayList<>();
collectChanges(
committable.fileCommittables(),
appendTableFiles,
appendChangelog,
compactTableFiles,
compactChangelog,
appendHashIndexFiles,
appendIndexFiles,
compactDvIndexFiles);

if (!appendChangelog.isEmpty() || !compactChangelog.isEmpty()) {
Expand Down Expand Up @@ -515,7 +533,7 @@ public int overwrite(
tryOverwrite(
partitionFilter,
appendTableFiles,
appendHashIndexFiles,
appendIndexFiles,
committable.identifier(),
committable.watermark(),
committable.logOffsets(),
Expand Down Expand Up @@ -658,14 +676,22 @@ public FileIO fileIO() {
return fileIO;
}

/**
* @param appendIndexFiles Index file changes which include hash index files or deletion vector
* indexes that are not generated by compaction, such as performing row-level changes on a
* non-pk dv table.
* @param compactDvIndexFiles Deletion vector indexes that are generated by compaction, such as
* the dv generation by pk dv table.
*/
private void collectChanges(
List<CommitMessage> commitMessages,
List<ManifestEntry> appendTableFiles,
List<ManifestEntry> appendChangelog,
List<ManifestEntry> compactTableFiles,
List<ManifestEntry> compactChangelog,
List<IndexManifestEntry> appendHashIndexFiles,
List<IndexManifestEntry> appendIndexFiles,
List<IndexManifestEntry> compactDvIndexFiles) {
boolean isCompact = isCompact(commitMessages);
for (CommitMessage message : commitMessages) {
CommitMessageImpl commitMessage = (CommitMessageImpl) message;
commitMessage
Expand Down Expand Up @@ -703,44 +729,41 @@ private void collectChanges(
.newIndexFiles()
.forEach(
f -> {
switch (f.indexType()) {
case HASH_INDEX:
appendHashIndexFiles.add(
new IndexManifestEntry(
FileKind.ADD,
commitMessage.partition(),
commitMessage.bucket(),
f));
break;
case DELETION_VECTORS_INDEX:
compactDvIndexFiles.add(
new IndexManifestEntry(
FileKind.ADD,
commitMessage.partition(),
commitMessage.bucket(),
f));
break;
default:
throw new RuntimeException(
"Unknown index type: " + f.indexType());
if (isCompact && f.indexType().equals(DELETION_VECTORS_INDEX)) {
compactDvIndexFiles.add(
new IndexManifestEntry(
FileKind.ADD,
commitMessage.partition(),
commitMessage.bucket(),
f));
} else {
appendIndexFiles.add(
new IndexManifestEntry(
FileKind.ADD,
commitMessage.partition(),
commitMessage.bucket(),
f));
}
});
commitMessage
.indexIncrement()
.deletedIndexFiles()
.forEach(
f -> {
if (f.indexType().equals(DELETION_VECTORS_INDEX)) {
if (isCompact && f.indexType().equals(DELETION_VECTORS_INDEX)) {
compactDvIndexFiles.add(
new IndexManifestEntry(
FileKind.DELETE,
commitMessage.partition(),
commitMessage.bucket(),
f));
} else {
throw new RuntimeException(
"This index type is not supported to delete: "
+ f.indexType());
appendIndexFiles.add(
new IndexManifestEntry(
FileKind.DELETE,
commitMessage.partition(),
commitMessage.bucket(),
f));
}
});
}
Expand All @@ -758,8 +781,8 @@ private void collectChanges(
if (!compactChangelog.isEmpty()) {
msg.add(compactChangelog.size() + " compact Changelogs");
}
if (!appendHashIndexFiles.isEmpty()) {
msg.add(appendHashIndexFiles.size() + " append hash index files");
if (!appendIndexFiles.isEmpty()) {
msg.add(appendIndexFiles.size() + " append index files");
}
if (!compactDvIndexFiles.isEmpty()) {
msg.add(compactDvIndexFiles.size() + " compact dv index files");
Expand All @@ -768,6 +791,17 @@ private void collectChanges(
}
}

private boolean isCompact(List<CommitMessage> commitMessages) {
for (CommitMessage message : commitMessages) {
CommitMessageImpl commitMessage = (CommitMessageImpl) message;
if (!commitMessage.compactIncrement().compactBefore().isEmpty()
|| !commitMessage.compactIncrement().compactAfter().isEmpty()) {
return true;
}
}
return false;
}

private ManifestEntry makeEntry(FileKind kind, CommitMessage commitMessage, DataFileMeta file) {
Integer totalBuckets = commitMessage.totalBuckets();
if (totalBuckets == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ class MergeIntoPrimaryKeyNonBucketTableTest

class MergeIntoAppendBucketedTableTest
extends MergeIntoTableTestBase
with MergeIntoAppendTableTest
with PaimonAppendBucketedTableTest {}

class MergeIntoAppendNonBucketedTableTest
extends MergeIntoTableTestBase
with MergeIntoAppendTableTest
with PaimonAppendNonBucketTableTest {}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ class MergeIntoPrimaryKeyNonBucketTableTest

class MergeIntoAppendBucketedTableTest
extends MergeIntoTableTestBase
with MergeIntoAppendTableTest
with PaimonAppendBucketedTableTest {}

class MergeIntoAppendNonBucketedTableTest
extends MergeIntoTableTestBase
with MergeIntoAppendTableTest
with PaimonAppendNonBucketTableTest {}
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ class MergeIntoPrimaryKeyNonBucketTableTest

class MergeIntoAppendBucketedTableTest
extends MergeIntoTableTestBase
with MergeIntoAppendTableTest
with MergeIntoNotMatchedBySourceTest
with PaimonAppendBucketedTableTest {}

class MergeIntoAppendNonBucketedTableTest
extends MergeIntoTableTestBase
with MergeIntoAppendTableTest
with MergeIntoNotMatchedBySourceTest
with PaimonAppendNonBucketTableTest {}
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ class MergeIntoPrimaryKeyNonBucketTableTest

class MergeIntoAppendBucketedTableTest
extends MergeIntoTableTestBase
with MergeIntoAppendTableTest
with MergeIntoNotMatchedBySourceTest
with PaimonAppendBucketedTableTest {}

class MergeIntoAppendNonBucketedTableTest
extends MergeIntoTableTestBase
with MergeIntoAppendTableTest
with MergeIntoNotMatchedBySourceTest
with PaimonAppendNonBucketTableTest {}
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ class MergeIntoPrimaryKeyNonBucketTableTest

class MergeIntoAppendBucketedTableTest
extends MergeIntoTableTestBase
with MergeIntoAppendTableTest
with MergeIntoNotMatchedBySourceTest
with PaimonAppendBucketedTableTest {}

class MergeIntoAppendNonBucketedTableTest
extends MergeIntoTableTestBase
with MergeIntoAppendTableTest
with MergeIntoNotMatchedBySourceTest
with PaimonAppendNonBucketTableTest {}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.paimon.spark.sql

import org.apache.paimon.CoreOptions
import org.apache.paimon.{CoreOptions, Snapshot}
import org.apache.paimon.CoreOptions.MergeEngine
import org.apache.paimon.spark.PaimonSparkTestBase

Expand Down Expand Up @@ -435,4 +435,40 @@ abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
}
}
}

test("Paimon delete: non pk table commit kind") {
for (dvEnabled <- Seq(true, false)) {
withTable("t") {
sql(
s"CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('deletion-vectors.enabled' = '$dvEnabled')")
sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM range(1, 4)")

sql("DELETE FROM t WHERE id = 1")
checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(Row(2, 2), Row(3, 3)))
val table = loadTable("t")
var latestSnapshot = table.latestSnapshot().get()
assert(latestSnapshot.id == 2)
assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.OVERWRITE))

sql("DELETE FROM t WHERE id = 2")
checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(Row(3, 3)))
latestSnapshot = table.latestSnapshot().get()
assert(latestSnapshot.id == 3)
assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.OVERWRITE))
}
}
}

test("Paimon delete: pk dv table commit kind") {
withTable("t") {
sql(
s"CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('deletion-vectors.enabled' = 'true', 'primary-key' = 'id')")
sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM range(1, 4)")
sql("DELETE FROM t WHERE id = 1")
val table = loadTable("t")
val latestSnapshot = table.latestSnapshot().get()
assert(latestSnapshot.id == 4)
assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.COMPACT))
}
}
}
Loading
Loading