Commit 7f71477

v1
1 parent c0f1ffb commit 7f71477

10 files changed: +170 -42 lines changed


paimon-api/src/main/java/org/apache/paimon/Snapshot.java

Lines changed: 2 additions & 2 deletions

@@ -496,13 +496,13 @@ public boolean equals(Object o) {
     /** Type of changes in this snapshot. */
     public enum CommitKind {

-        /** Changes flushed from the mem table. */
+        /** New data is appended to the table and no data is removed or deleted. */
         APPEND,

         /** Changes by compacting existing data files. */
         COMPACT,

-        /** Changes that clear up the whole partition and then add new records. */
+        /** New data is added to overwrite existing data or just delete existing data. */
         OVERWRITE,

         /** Collect statistics. */
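
The reworded javadoc above describes how each kind should be read from snapshot metadata. As a minimal sketch of that reading path (the CommitKindProbe class and its method name are made up for illustration; latestSnapshot() and commitKind() are the accessors exercised by the Scala tests further down, assumed here to be reachable from FileStoreTable):

    import java.util.Optional;

    import org.apache.paimon.Snapshot;
    import org.apache.paimon.table.FileStoreTable;

    final class CommitKindProbe {

        /** Describes how the latest snapshot of the given table was produced. */
        static String describeLatestCommit(FileStoreTable table) {
            Optional<Snapshot> latest = table.latestSnapshot();
            if (!latest.isPresent()) {
                return "table has no snapshot yet";
            }
            switch (latest.get().commitKind()) {
                case APPEND:
                    // only new data files were added; nothing was removed or deleted
                    return "append commit";
                case OVERWRITE:
                    // existing data was overwritten or deleted, e.g. a row-level DELETE on a non-pk table
                    return "overwrite commit";
                case COMPACT:
                    // data files were rewritten by compaction, e.g. dv maintenance on a pk dv table
                    return "compact commit";
                default:
                    return "other commit kind: " + latest.get().commitKind();
            }
        }
    }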

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java

Lines changed: 55 additions & 37 deletions

@@ -87,7 +87,6 @@

 import static java.util.Collections.emptyList;
 import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
-import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
 import static org.apache.paimon.manifest.ManifestEntry.recordCount;
 import static org.apache.paimon.manifest.ManifestEntry.recordCountAdd;
 import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete;
@@ -297,22 +296,22 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
         List<ManifestEntry> appendChangelog = new ArrayList<>();
         List<ManifestEntry> compactTableFiles = new ArrayList<>();
         List<ManifestEntry> compactChangelog = new ArrayList<>();
-        List<IndexManifestEntry> appendHashIndexFiles = new ArrayList<>();
+        List<IndexManifestEntry> appendIndexFiles = new ArrayList<>();
         List<IndexManifestEntry> compactDvIndexFiles = new ArrayList<>();
         collectChanges(
                 committable.fileCommittables(),
                 appendTableFiles,
                 appendChangelog,
                 compactTableFiles,
                 compactChangelog,
-                appendHashIndexFiles,
+                appendIndexFiles,
                 compactDvIndexFiles);
         try {
             List<SimpleFileEntry> appendSimpleEntries = SimpleFileEntry.from(appendTableFiles);
             if (!ignoreEmptyCommit
                     || !appendTableFiles.isEmpty()
                     || !appendChangelog.isEmpty()
-                    || !appendHashIndexFiles.isEmpty()) {
+                    || !appendIndexFiles.isEmpty()) {
                 // Optimization for common path.
                 // Step 1:
                 // Read manifest entries from changed partitions here and check for conflicts.
@@ -321,6 +320,18 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
                 // This optimization is mainly used to decrease the number of times we read from
                 // files.
                 latestSnapshot = snapshotManager.latestSnapshot();
+                boolean hasDelete =
+                        appendSimpleEntries.stream()
+                                        .anyMatch(entry -> entry.kind().equals(FileKind.DELETE))
+                                || appendIndexFiles.stream()
+                                        .anyMatch(
+                                                entry ->
+                                                        entry.indexFile()
+                                                                .indexType()
+                                                                .equals(DELETION_VECTORS_INDEX));
+                Snapshot.CommitKind commitKind =
+                        hasDelete ? Snapshot.CommitKind.OVERWRITE : Snapshot.CommitKind.APPEND;
+
                 if (latestSnapshot != null && checkAppendFiles) {
                     // it is possible that some partitions only have compact changes,
                     // so we need to contain all changes
@@ -331,20 +342,20 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
                             latestSnapshot.commitUser(),
                             baseEntries,
                             appendSimpleEntries,
-                            Snapshot.CommitKind.APPEND);
+                            commitKind);
                     safeLatestSnapshotId = latestSnapshot.id();
                 }

                 attempts +=
                         tryCommit(
                                 appendTableFiles,
                                 appendChangelog,
-                                appendHashIndexFiles,
+                                appendIndexFiles,
                                 committable.identifier(),
                                 committable.watermark(),
                                 committable.logOffsets(),
                                 committable.properties(),
-                                Snapshot.CommitKind.APPEND,
+                                commitKind,
                                 noConflictCheck(),
                                 null);
                 generatedSnapshot += 1;
@@ -446,15 +457,15 @@ public int overwrite(
         List<ManifestEntry> appendChangelog = new ArrayList<>();
         List<ManifestEntry> compactTableFiles = new ArrayList<>();
         List<ManifestEntry> compactChangelog = new ArrayList<>();
-        List<IndexManifestEntry> appendHashIndexFiles = new ArrayList<>();
+        List<IndexManifestEntry> appendIndexFiles = new ArrayList<>();
         List<IndexManifestEntry> compactDvIndexFiles = new ArrayList<>();
         collectChanges(
                 committable.fileCommittables(),
                 appendTableFiles,
                 appendChangelog,
                 compactTableFiles,
                 compactChangelog,
-                appendHashIndexFiles,
+                appendIndexFiles,
                 compactDvIndexFiles);

         if (!appendChangelog.isEmpty() || !compactChangelog.isEmpty()) {
@@ -515,7 +526,7 @@ public int overwrite(
                 tryOverwrite(
                         partitionFilter,
                         appendTableFiles,
-                        appendHashIndexFiles,
+                        appendIndexFiles,
                         committable.identifier(),
                         committable.watermark(),
                         committable.logOffsets(),
@@ -658,16 +669,26 @@ public FileIO fileIO() {
         return fileIO;
     }

+    /**
+     * @param appendIndexFiles Index file changes which include hash index files or deletion vector
+     *     indexes that are not generated by compaction, such as performing row-level changes on a
+     *     non-pk dv table.
+     * @param compactDvIndexFiles Deletion vector indexes that are generated by compaction, such as
+     *     the dv generation by pk dv table.
+     */
     private void collectChanges(
             List<CommitMessage> commitMessages,
             List<ManifestEntry> appendTableFiles,
             List<ManifestEntry> appendChangelog,
             List<ManifestEntry> compactTableFiles,
             List<ManifestEntry> compactChangelog,
-            List<IndexManifestEntry> appendHashIndexFiles,
+            List<IndexManifestEntry> appendIndexFiles,
             List<IndexManifestEntry> compactDvIndexFiles) {
         for (CommitMessage message : commitMessages) {
             CommitMessageImpl commitMessage = (CommitMessageImpl) message;
+            boolean isCompact =
+                    !commitMessage.compactIncrement().compactBefore().isEmpty()
+                            || !commitMessage.compactIncrement().compactAfter().isEmpty();
             commitMessage
                     .newFilesIncrement()
                     .newFiles()
@@ -703,44 +724,41 @@ private void collectChanges(
                     .newIndexFiles()
                     .forEach(
                             f -> {
-                                switch (f.indexType()) {
-                                    case HASH_INDEX:
-                                        appendHashIndexFiles.add(
-                                                new IndexManifestEntry(
-                                                        FileKind.ADD,
-                                                        commitMessage.partition(),
-                                                        commitMessage.bucket(),
-                                                        f));
-                                        break;
-                                    case DELETION_VECTORS_INDEX:
-                                        compactDvIndexFiles.add(
-                                                new IndexManifestEntry(
-                                                        FileKind.ADD,
-                                                        commitMessage.partition(),
-                                                        commitMessage.bucket(),
-                                                        f));
-                                        break;
-                                    default:
-                                        throw new RuntimeException(
-                                                "Unknown index type: " + f.indexType());
+                                if (isCompact && f.indexType().equals(DELETION_VECTORS_INDEX)) {
+                                    compactDvIndexFiles.add(
+                                            new IndexManifestEntry(
+                                                    FileKind.ADD,
+                                                    commitMessage.partition(),
+                                                    commitMessage.bucket(),
+                                                    f));
+                                } else {
+                                    appendIndexFiles.add(
+                                            new IndexManifestEntry(
+                                                    FileKind.ADD,
+                                                    commitMessage.partition(),
+                                                    commitMessage.bucket(),
+                                                    f));
                                 }
                             });
             commitMessage
                     .indexIncrement()
                     .deletedIndexFiles()
                     .forEach(
                             f -> {
-                                if (f.indexType().equals(DELETION_VECTORS_INDEX)) {
+                                if (isCompact && f.indexType().equals(DELETION_VECTORS_INDEX)) {
                                     compactDvIndexFiles.add(
                                             new IndexManifestEntry(
                                                     FileKind.DELETE,
                                                     commitMessage.partition(),
                                                     commitMessage.bucket(),
                                                     f));
                                 } else {
-                                    throw new RuntimeException(
-                                            "This index type is not supported to delete: "
-                                                    + f.indexType());
+                                    appendIndexFiles.add(
+                                            new IndexManifestEntry(
+                                                    FileKind.DELETE,
+                                                    commitMessage.partition(),
+                                                    commitMessage.bucket(),
+                                                    f));
                                 }
                             });
         }
@@ -758,8 +776,8 @@ private void collectChanges(
         if (!compactChangelog.isEmpty()) {
             msg.add(compactChangelog.size() + " compact Changelogs");
         }
-        if (!appendHashIndexFiles.isEmpty()) {
-            msg.add(appendHashIndexFiles.size() + " append hash index files");
+        if (!appendIndexFiles.isEmpty()) {
+            msg.add(appendIndexFiles.size() + " append index files");
         }
         if (!compactDvIndexFiles.isEmpty()) {
             msg.add(compactDvIndexFiles.size() + " compact dv index files");
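
Condensed from the hunks above, the new rule for choosing the commit kind of a write can be read as a small helper. This is only a sketch: the real code computes the same boolean inline inside commit(), and the import paths for SimpleFileEntry, IndexManifestEntry and FileKind are assumed from the surrounding file rather than confirmed here.

    import java.util.List;

    import org.apache.paimon.Snapshot;
    import org.apache.paimon.manifest.FileKind;
    import org.apache.paimon.manifest.IndexManifestEntry;
    import org.apache.paimon.manifest.SimpleFileEntry;

    import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;

    final class CommitKindRule {

        private CommitKindRule() {}

        /**
         * A write is recorded as OVERWRITE when it removes data files or touches deletion-vector
         * indexes outside of compaction; otherwise it stays APPEND. Compaction-produced changes
         * still go through the separate COMPACT path and are not decided here.
         */
        static Snapshot.CommitKind commitKindFor(
                List<SimpleFileEntry> appendSimpleEntries,
                List<IndexManifestEntry> appendIndexFiles) {
            boolean hasDelete =
                    appendSimpleEntries.stream()
                                    .anyMatch(entry -> entry.kind().equals(FileKind.DELETE))
                            || appendIndexFiles.stream()
                                    .anyMatch(
                                            entry ->
                                                    entry.indexFile()
                                                            .indexType()
                                                            .equals(DELETION_VECTORS_INDEX));
            return hasDelete ? Snapshot.CommitKind.OVERWRITE : Snapshot.CommitKind.APPEND;
        }
    }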

paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala

Lines changed: 2 additions & 0 deletions

@@ -32,8 +32,10 @@ class MergeIntoPrimaryKeyNonBucketTableTest

 class MergeIntoAppendBucketedTableTest
   extends MergeIntoTableTestBase
+  with MergeIntoAppendTableTest
   with PaimonAppendBucketedTableTest {}

 class MergeIntoAppendNonBucketedTableTest
   extends MergeIntoTableTestBase
+  with MergeIntoAppendTableTest
   with PaimonAppendNonBucketTableTest {}

paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala

Lines changed: 2 additions & 0 deletions

@@ -32,8 +32,10 @@ class MergeIntoPrimaryKeyNonBucketTableTest

 class MergeIntoAppendBucketedTableTest
   extends MergeIntoTableTestBase
+  with MergeIntoAppendTableTest
   with PaimonAppendBucketedTableTest {}

 class MergeIntoAppendNonBucketedTableTest
   extends MergeIntoTableTestBase
+  with MergeIntoAppendTableTest
   with PaimonAppendNonBucketTableTest {}

paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala

Lines changed: 2 additions & 0 deletions

@@ -34,10 +34,12 @@ class MergeIntoPrimaryKeyNonBucketTableTest

 class MergeIntoAppendBucketedTableTest
   extends MergeIntoTableTestBase
+  with MergeIntoAppendTableTest
   with MergeIntoNotMatchedBySourceTest
   with PaimonAppendBucketedTableTest {}

 class MergeIntoAppendNonBucketedTableTest
   extends MergeIntoTableTestBase
+  with MergeIntoAppendTableTest
   with MergeIntoNotMatchedBySourceTest
   with PaimonAppendNonBucketTableTest {}

paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala

Lines changed: 2 additions & 0 deletions

@@ -34,10 +34,12 @@ class MergeIntoPrimaryKeyNonBucketTableTest

 class MergeIntoAppendBucketedTableTest
   extends MergeIntoTableTestBase
+  with MergeIntoAppendTableTest
   with MergeIntoNotMatchedBySourceTest
   with PaimonAppendBucketedTableTest {}

 class MergeIntoAppendNonBucketedTableTest
   extends MergeIntoTableTestBase
+  with MergeIntoAppendTableTest
   with MergeIntoNotMatchedBySourceTest
   with PaimonAppendNonBucketTableTest {}

paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala

Lines changed: 2 additions & 0 deletions

@@ -34,10 +34,12 @@ class MergeIntoPrimaryKeyNonBucketTableTest

 class MergeIntoAppendBucketedTableTest
   extends MergeIntoTableTestBase
+  with MergeIntoAppendTableTest
   with MergeIntoNotMatchedBySourceTest
   with PaimonAppendBucketedTableTest {}

 class MergeIntoAppendNonBucketedTableTest
   extends MergeIntoTableTestBase
+  with MergeIntoAppendTableTest
   with MergeIntoNotMatchedBySourceTest
   with PaimonAppendNonBucketTableTest {}

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala

Lines changed: 37 additions & 1 deletion

@@ -18,7 +18,7 @@

 package org.apache.paimon.spark.sql

-import org.apache.paimon.CoreOptions
+import org.apache.paimon.{CoreOptions, Snapshot}
 import org.apache.paimon.CoreOptions.MergeEngine
 import org.apache.paimon.spark.PaimonSparkTestBase
@@ -435,4 +435,40 @@ abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
       }
     }
   }
+
+  test("Paimon delete: non pk table commit kind") {
+    for (dvEnabled <- Seq(true, false)) {
+      withTable("t") {
+        sql(
+          s"CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('deletion-vectors.enabled' = '$dvEnabled')")
+        sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM range(1, 4)")
+
+        sql("DELETE FROM t WHERE id = 1")
+        checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(Row(2, 2), Row(3, 3)))
+        val table = loadTable("t")
+        var latestSnapshot = table.latestSnapshot().get()
+        assert(latestSnapshot.id == 2)
+        assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.OVERWRITE))
+
+        sql("DELETE FROM t WHERE id = 2")
+        checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(Row(3, 3)))
+        latestSnapshot = table.latestSnapshot().get()
+        assert(latestSnapshot.id == 3)
+        assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.OVERWRITE))
+      }
+    }
+  }
+
+  test("Paimon delete: pk dv table commit kind") {
+    withTable("t") {
+      sql(
+        s"CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('deletion-vectors.enabled' = 'true', 'primary-key' = 'id')")
+      sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM range(1, 4)")
+      sql("DELETE FROM t WHERE id = 1")
+      val table = loadTable("t")
+      val latestSnapshot = table.latestSnapshot().get()
+      assert(latestSnapshot.id == 4)
+      assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.COMPACT))
+    }
+  }
 }

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala

Lines changed: 29 additions & 1 deletion

@@ -18,7 +18,8 @@

 package org.apache.paimon.spark.sql

-import org.apache.paimon.spark.{PaimonPrimaryKeyTable, PaimonSparkTestBase, PaimonTableTest}
+import org.apache.paimon.Snapshot
+import org.apache.paimon.spark.{PaimonAppendTable, PaimonPrimaryKeyTable, PaimonSparkTestBase, PaimonTableTest}

 import org.apache.spark.sql.Row
@@ -649,3 +650,30 @@ trait MergeIntoPrimaryKeyTableTest extends PaimonSparkTestBase with PaimonPrimar
     }
   }
 }
+
+trait MergeIntoAppendTableTest extends PaimonSparkTestBase with PaimonAppendTable {
+
+  test("Paimon MergeInto: non pk table commit kind") {
+    withTable("s", "t") {
+      createTable("s", "id INT, b INT, c INT", Seq("id"))
+      sql("INSERT INTO s VALUES (1, 1, 1)")
+
+      createTable("t", "id INT, b INT, c INT", Seq("id"))
+      sql("INSERT INTO t VALUES (2, 2, 2)")
+
+      sql("""
+            |MERGE INTO t
+            |USING s
+            |ON t.id = s.id
+            |WHEN NOT MATCHED THEN
+            |INSERT (id, b, c) VALUES (s.id, s.b, s.c);
+            |""".stripMargin)
+
+      val table = loadTable("t")
+      val latestSnapshot = table.latestSnapshot().get()
+      assert(latestSnapshot.id == 2)
+      // no old data is deleted, so the commit kind is APPEND
+      assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.APPEND))
+    }
+  }
+}
