87
87
88
88
import static java .util .Collections .emptyList ;
89
89
import static org .apache .paimon .deletionvectors .DeletionVectorsIndexFile .DELETION_VECTORS_INDEX ;
90
- import static org .apache .paimon .index .HashIndexFile .HASH_INDEX ;
91
90
import static org .apache .paimon .manifest .ManifestEntry .recordCount ;
92
91
import static org .apache .paimon .manifest .ManifestEntry .recordCountAdd ;
93
92
import static org .apache .paimon .manifest .ManifestEntry .recordCountDelete ;
@@ -297,22 +296,22 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
297
296
List <ManifestEntry > appendChangelog = new ArrayList <>();
298
297
List <ManifestEntry > compactTableFiles = new ArrayList <>();
299
298
List <ManifestEntry > compactChangelog = new ArrayList <>();
300
- List <IndexManifestEntry > appendHashIndexFiles = new ArrayList <>();
299
+ List <IndexManifestEntry > appendIndexFiles = new ArrayList <>();
301
300
List <IndexManifestEntry > compactDvIndexFiles = new ArrayList <>();
302
301
collectChanges (
303
302
committable .fileCommittables (),
304
303
appendTableFiles ,
305
304
appendChangelog ,
306
305
compactTableFiles ,
307
306
compactChangelog ,
308
- appendHashIndexFiles ,
307
+ appendIndexFiles ,
309
308
compactDvIndexFiles );
310
309
try {
311
310
List <SimpleFileEntry > appendSimpleEntries = SimpleFileEntry .from (appendTableFiles );
312
311
if (!ignoreEmptyCommit
313
312
|| !appendTableFiles .isEmpty ()
314
313
|| !appendChangelog .isEmpty ()
315
- || !appendHashIndexFiles .isEmpty ()) {
314
+ || !appendIndexFiles .isEmpty ()) {
316
315
// Optimization for common path.
317
316
// Step 1:
318
317
// Read manifest entries from changed partitions here and check for conflicts.
@@ -321,6 +320,18 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
321
320
// This optimization is mainly used to decrease the number of times we read from
322
321
// files.
323
322
latestSnapshot = snapshotManager .latestSnapshot ();
323
+ boolean hasDelete =
324
+ appendSimpleEntries .stream ()
325
+ .anyMatch (entry -> entry .kind ().equals (FileKind .DELETE ))
326
+ || appendIndexFiles .stream ()
327
+ .anyMatch (
328
+ entry ->
329
+ entry .indexFile ()
330
+ .indexType ()
331
+ .equals (DELETION_VECTORS_INDEX ));
332
+ Snapshot .CommitKind commitKind =
333
+ hasDelete ? Snapshot .CommitKind .OVERWRITE : Snapshot .CommitKind .APPEND ;
334
+
324
335
if (latestSnapshot != null && checkAppendFiles ) {
325
336
// it is possible that some partitions only have compact changes,
326
337
// so we need to contain all changes
@@ -331,20 +342,20 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
331
342
latestSnapshot .commitUser (),
332
343
baseEntries ,
333
344
appendSimpleEntries ,
334
- Snapshot . CommitKind . APPEND );
345
+ commitKind );
335
346
safeLatestSnapshotId = latestSnapshot .id ();
336
347
}
337
348
338
349
attempts +=
339
350
tryCommit (
340
351
appendTableFiles ,
341
352
appendChangelog ,
342
- appendHashIndexFiles ,
353
+ appendIndexFiles ,
343
354
committable .identifier (),
344
355
committable .watermark (),
345
356
committable .logOffsets (),
346
357
committable .properties (),
347
- Snapshot . CommitKind . APPEND ,
358
+ commitKind ,
348
359
noConflictCheck (),
349
360
null );
350
361
generatedSnapshot += 1 ;
@@ -446,15 +457,15 @@ public int overwrite(
446
457
List <ManifestEntry > appendChangelog = new ArrayList <>();
447
458
List <ManifestEntry > compactTableFiles = new ArrayList <>();
448
459
List <ManifestEntry > compactChangelog = new ArrayList <>();
449
- List <IndexManifestEntry > appendHashIndexFiles = new ArrayList <>();
460
+ List <IndexManifestEntry > appendIndexFiles = new ArrayList <>();
450
461
List <IndexManifestEntry > compactDvIndexFiles = new ArrayList <>();
451
462
collectChanges (
452
463
committable .fileCommittables (),
453
464
appendTableFiles ,
454
465
appendChangelog ,
455
466
compactTableFiles ,
456
467
compactChangelog ,
457
- appendHashIndexFiles ,
468
+ appendIndexFiles ,
458
469
compactDvIndexFiles );
459
470
460
471
if (!appendChangelog .isEmpty () || !compactChangelog .isEmpty ()) {
@@ -515,7 +526,7 @@ public int overwrite(
515
526
tryOverwrite (
516
527
partitionFilter ,
517
528
appendTableFiles ,
518
- appendHashIndexFiles ,
529
+ appendIndexFiles ,
519
530
committable .identifier (),
520
531
committable .watermark (),
521
532
committable .logOffsets (),
@@ -658,16 +669,26 @@ public FileIO fileIO() {
658
669
return fileIO ;
659
670
}
660
671
672
+ /**
673
+ * @param appendIndexFiles Index file changes other than deletion vectors generated by
674
+ * compaction: hash index files, and deletion vectors written outside compaction (for
675
+ * example, row level modification on a non-pk dv table).
676
+ * @param compactDvIndexFiles Deletion vector index files generated by compaction, as happens
677
+ * on a pk dv table.
678
+ */
661
679
private void collectChanges (
662
680
List <CommitMessage > commitMessages ,
663
681
List <ManifestEntry > appendTableFiles ,
664
682
List <ManifestEntry > appendChangelog ,
665
683
List <ManifestEntry > compactTableFiles ,
666
684
List <ManifestEntry > compactChangelog ,
667
- List <IndexManifestEntry > appendHashIndexFiles ,
685
+ List <IndexManifestEntry > appendIndexFiles ,
668
686
List <IndexManifestEntry > compactDvIndexFiles ) {
669
687
for (CommitMessage message : commitMessages ) {
670
688
CommitMessageImpl commitMessage = (CommitMessageImpl ) message ;
689
+ boolean isCompact =
690
+ !commitMessage .compactIncrement ().compactBefore ().isEmpty ()
691
+ || !commitMessage .compactIncrement ().compactAfter ().isEmpty ();
671
692
commitMessage
672
693
.newFilesIncrement ()
673
694
.newFiles ()
@@ -703,44 +724,41 @@ private void collectChanges(
703
724
.newIndexFiles ()
704
725
.forEach (
705
726
f -> {
706
- switch (f .indexType ()) {
707
- case HASH_INDEX :
708
- appendHashIndexFiles .add (
709
- new IndexManifestEntry (
710
- FileKind .ADD ,
711
- commitMessage .partition (),
712
- commitMessage .bucket (),
713
- f ));
714
- break ;
715
- case DELETION_VECTORS_INDEX :
716
- compactDvIndexFiles .add (
717
- new IndexManifestEntry (
718
- FileKind .ADD ,
719
- commitMessage .partition (),
720
- commitMessage .bucket (),
721
- f ));
722
- break ;
723
- default :
724
- throw new RuntimeException (
725
- "Unknown index type: " + f .indexType ());
727
+ if (isCompact && f .indexType ().equals (DELETION_VECTORS_INDEX )) {
728
+ compactDvIndexFiles .add (
729
+ new IndexManifestEntry (
730
+ FileKind .ADD ,
731
+ commitMessage .partition (),
732
+ commitMessage .bucket (),
733
+ f ));
734
+ } else {
735
+ appendIndexFiles .add (
736
+ new IndexManifestEntry (
737
+ FileKind .ADD ,
738
+ commitMessage .partition (),
739
+ commitMessage .bucket (),
740
+ f ));
726
741
}
727
742
});
728
743
commitMessage
729
744
.indexIncrement ()
730
745
.deletedIndexFiles ()
731
746
.forEach (
732
747
f -> {
733
- if (f .indexType ().equals (DELETION_VECTORS_INDEX )) {
748
+ if (isCompact && f .indexType ().equals (DELETION_VECTORS_INDEX )) {
734
749
compactDvIndexFiles .add (
735
750
new IndexManifestEntry (
736
751
FileKind .DELETE ,
737
752
commitMessage .partition (),
738
753
commitMessage .bucket (),
739
754
f ));
740
755
} else {
741
- throw new RuntimeException (
742
- "This index type is not supported to delete: "
743
- + f .indexType ());
756
+ appendIndexFiles .add (
757
+ new IndexManifestEntry (
758
+ FileKind .DELETE ,
759
+ commitMessage .partition (),
760
+ commitMessage .bucket (),
761
+ f ));
744
762
}
745
763
});
746
764
}
@@ -758,8 +776,8 @@ private void collectChanges(
758
776
if (!compactChangelog .isEmpty ()) {
759
777
msg .add (compactChangelog .size () + " compact Changelogs" );
760
778
}
761
- if (!appendHashIndexFiles .isEmpty ()) {
762
- msg .add (appendHashIndexFiles .size () + " append hash index files" );
779
+ if (!appendIndexFiles .isEmpty ()) {
780
+ msg .add (appendIndexFiles .size () + " append index files" );
763
781
}
764
782
if (!compactDvIndexFiles .isEmpty ()) {
765
783
msg .add (compactDvIndexFiles .size () + " compact dv index files" );
0 commit comments