Skip to content

Commit 005c0de

Browse files
committed
[core] Automatically detect changes to the expiration policy.
1 parent bd2618b commit 005c0de

File tree

6 files changed

+97
-10
lines changed

6 files changed

+97
-10
lines changed

paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,8 @@ public ExpireSnapshots newExpireSnapshots() {
430430
snapshotManager(),
431431
changelogManager(),
432432
store().newSnapshotDeletion(),
433-
store().newTagManager());
433+
store().newTagManager(),
434+
schemaManager());
434435
}
435436

436437
@Override
@@ -439,7 +440,8 @@ public ExpireSnapshots newExpireChangelog() {
439440
snapshotManager(),
440441
changelogManager(),
441442
tagManager(),
442-
store().newChangelogDeletion());
443+
store().newChangelogDeletion(),
444+
schemaManager());
443445
}
444446

445447
@Override

paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,14 @@
1919
package org.apache.paimon.table;
2020

2121
import org.apache.paimon.Changelog;
22+
import org.apache.paimon.CoreOptions;
2223
import org.apache.paimon.Snapshot;
2324
import org.apache.paimon.consumer.ConsumerManager;
2425
import org.apache.paimon.manifest.ExpireFileEntry;
2526
import org.apache.paimon.operation.ChangelogDeletion;
2627
import org.apache.paimon.options.ExpireConfig;
28+
import org.apache.paimon.schema.SchemaManager;
29+
import org.apache.paimon.schema.TableSchema;
2730
import org.apache.paimon.utils.ChangelogManager;
2831
import org.apache.paimon.utils.Preconditions;
2932
import org.apache.paimon.utils.SnapshotManager;
@@ -51,14 +54,17 @@ public class ExpireChangelogImpl implements ExpireSnapshots {
5154
private final ConsumerManager consumerManager;
5255
private final ChangelogDeletion changelogDeletion;
5356
private final TagManager tagManager;
57+
private final SchemaManager schemaManager;
5458

5559
private ExpireConfig expireConfig;
60+
private long latestSchemaId;
5661

5762
public ExpireChangelogImpl(
5863
SnapshotManager snapshotManager,
5964
ChangelogManager changelogManager,
6065
TagManager tagManager,
61-
ChangelogDeletion changelogDeletion) {
66+
ChangelogDeletion changelogDeletion,
67+
SchemaManager schemaManager) {
6268
this.snapshotManager = snapshotManager;
6369
this.changelogManager = changelogManager;
6470
this.tagManager = tagManager;
@@ -69,6 +75,8 @@ public ExpireChangelogImpl(
6975
snapshotManager.branch());
7076
this.changelogDeletion = changelogDeletion;
7177
this.expireConfig = ExpireConfig.builder().build();
78+
this.schemaManager = schemaManager;
79+
this.latestSchemaId = this.schemaManager.latest().get().id();
7280
}
7381

7482
@Override
@@ -79,6 +87,12 @@ public ExpireSnapshots config(ExpireConfig expireConfig) {
7987

8088
@Override
8189
public int expire() {
90+
TableSchema latestTableSchema = this.schemaManager.latest().get();
91+
if (this.latestSchemaId != latestTableSchema.id()) {
92+
this.expireConfig = CoreOptions.fromMap(latestTableSchema.options()).expireConfig();
93+
this.latestSchemaId = latestTableSchema.id();
94+
}
95+
8296
int retainMax = expireConfig.getChangelogRetainMax();
8397
int retainMin = expireConfig.getChangelogRetainMin();
8498
int maxDeletes = expireConfig.getChangelogMaxDeletes();

paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,15 @@
1919
package org.apache.paimon.table;
2020

2121
import org.apache.paimon.Changelog;
22+
import org.apache.paimon.CoreOptions;
2223
import org.apache.paimon.Snapshot;
2324
import org.apache.paimon.annotation.VisibleForTesting;
2425
import org.apache.paimon.consumer.ConsumerManager;
2526
import org.apache.paimon.manifest.ExpireFileEntry;
2627
import org.apache.paimon.operation.SnapshotDeletion;
2728
import org.apache.paimon.options.ExpireConfig;
29+
import org.apache.paimon.schema.SchemaManager;
30+
import org.apache.paimon.schema.TableSchema;
2831
import org.apache.paimon.utils.ChangelogManager;
2932
import org.apache.paimon.utils.Preconditions;
3033
import org.apache.paimon.utils.SnapshotManager;
@@ -55,14 +58,17 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
5558
private final ConsumerManager consumerManager;
5659
private final SnapshotDeletion snapshotDeletion;
5760
private final TagManager tagManager;
61+
private final SchemaManager schemaManager;
5862

5963
private ExpireConfig expireConfig;
64+
private long latestSchemaId;
6065

6166
public ExpireSnapshotsImpl(
6267
SnapshotManager snapshotManager,
6368
ChangelogManager changelogManager,
6469
SnapshotDeletion snapshotDeletion,
65-
TagManager tagManager) {
70+
TagManager tagManager,
71+
SchemaManager schemaManager) {
6672
this.snapshotManager = snapshotManager;
6773
this.changelogManager = changelogManager;
6874
this.consumerManager =
@@ -73,6 +79,8 @@ public ExpireSnapshotsImpl(
7379
this.snapshotDeletion = snapshotDeletion;
7480
this.tagManager = tagManager;
7581
this.expireConfig = ExpireConfig.builder().build();
82+
this.schemaManager = schemaManager;
83+
this.latestSchemaId = this.schemaManager.latest().get().id();
7684
}
7785

7886
@Override
@@ -83,6 +91,12 @@ public ExpireSnapshots config(ExpireConfig expireConfig) {
8391

8492
@Override
8593
public int expire() {
94+
TableSchema latestTableSchema = this.schemaManager.latest().get();
95+
if (this.latestSchemaId != latestTableSchema.id()) {
96+
this.expireConfig = CoreOptions.fromMap(latestTableSchema.options()).expireConfig();
97+
this.latestSchemaId = latestTableSchema.id();
98+
}
99+
86100
snapshotDeletion.setChangelogDecoupled(expireConfig.isChangelogDecoupled());
87101
int retainMax = expireConfig.getSnapshotRetainMax();
88102
int retainMin = expireConfig.getSnapshotRetainMin();

paimon-core/src/test/java/org/apache/paimon/TestFileStore.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,8 @@ public ExpireSnapshots newExpire(int numRetainedMin, int numRetainedMax, long mi
176176
snapshotManager(),
177177
changelogManager(),
178178
newSnapshotDeletion(),
179-
new TagManager(fileIO, options.path()))
179+
new TagManager(fileIO, options.path()),
180+
schemaManager)
180181
.config(
181182
ExpireConfig.builder()
182183
.snapshotRetainMax(numRetainedMax)
@@ -190,7 +191,8 @@ public ExpireSnapshots newExpire(ExpireConfig expireConfig) {
190191
snapshotManager(),
191192
changelogManager(),
192193
newSnapshotDeletion(),
193-
new TagManager(fileIO, options.path()))
194+
new TagManager(fileIO, options.path()),
195+
schemaManager)
194196
.config(expireConfig);
195197
}
196198

@@ -200,7 +202,8 @@ public ExpireSnapshots newChangelogExpire(ExpireConfig config) {
200202
snapshotManager(),
201203
changelogManager(),
202204
new TagManager(fileIO, options.path()),
203-
newChangelogDeletion());
205+
newChangelogDeletion(),
206+
schemaManager);
204207
impl.config(config);
205208
return impl;
206209
}

paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
3939
import org.apache.paimon.options.ExpireConfig;
4040
import org.apache.paimon.schema.Schema;
41+
import org.apache.paimon.schema.SchemaChange;
4142
import org.apache.paimon.schema.SchemaManager;
4243
import org.apache.paimon.table.ExpireSnapshots;
4344
import org.apache.paimon.table.ExpireSnapshotsImpl;
@@ -68,6 +69,8 @@
6869
import java.util.stream.Collectors;
6970

7071
import static java.util.Objects.requireNonNull;
72+
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
73+
import static org.apache.paimon.CoreOptions.SNAPSHOT_TIME_RETAINED;
7174
import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
7275
import static org.apache.paimon.utils.HintFileUtils.EARLIEST;
7376
import static org.assertj.core.api.Assertions.assertThat;
@@ -657,6 +660,45 @@ public void testManifestFileSkippingSetFileNotFoundException() throws Exception
657660
assertSnapshot(latestSnapshotId, allData, snapshotPositions);
658661
}
659662

663+
@Test
664+
public void testExpireSnapshotsWithChangedConfig() throws Exception {
665+
ExpireConfig.Builder builder = ExpireConfig.builder();
666+
builder.snapshotRetainMin(10)
667+
.snapshotRetainMax(Integer.MAX_VALUE)
668+
.snapshotTimeRetain(Duration.ofMillis(60 * 1000));
669+
ExpireSnapshots expire = store.newExpire(builder.build());
670+
671+
List<KeyValue> allData = new ArrayList<>();
672+
List<Integer> snapshotPositions = new ArrayList<>();
673+
commit(20, allData, snapshotPositions);
674+
675+
expire.expire();
676+
int latestSnapshotId = requireNonNull(snapshotManager.latestSnapshotId()).intValue();
677+
for (int i = 1; i <= latestSnapshotId; i++) {
678+
assertThat((snapshotManager.snapshotExists(i))).isTrue();
679+
}
680+
681+
int maxRetained = 15;
682+
SchemaManager schemaManager = new SchemaManager(fileIO, new Path(tempDir.toUri()));
683+
schemaManager.commitChanges(
684+
SchemaChange.setOption(
685+
SNAPSHOT_NUM_RETAINED_MAX.key(), String.valueOf(maxRetained)));
686+
687+
expire.expire();
688+
for (int i = 1; i <= latestSnapshotId - maxRetained; i++) {
689+
assertThat((snapshotManager.snapshotExists(i))).isFalse();
690+
}
691+
692+
schemaManager.commitChanges(SchemaChange.setOption(SNAPSHOT_TIME_RETAINED.key(), "1s"));
693+
Thread.sleep(1500);
694+
expire.expire();
695+
for (int i = 1; i <= latestSnapshotId - 10; i++) {
696+
assertThat((snapshotManager.snapshotExists(i))).isFalse();
697+
}
698+
699+
store.assertCleaned();
700+
}
701+
660702
private TestFileStore createStore() {
661703
ThreadLocalRandom random = ThreadLocalRandom.current();
662704

paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -683,7 +683,11 @@ public void testExpireWithDeletingTags() throws Exception {
683683
// result: exist A & B (because of tag2)
684684
ExpireSnapshots expireSnapshots =
685685
new ExpireSnapshotsImpl(
686-
snapshotManager, changelogManager, store.newSnapshotDeletion(), tagManager);
686+
snapshotManager,
687+
changelogManager,
688+
store.newSnapshotDeletion(),
689+
tagManager,
690+
new SchemaManager(fileIO, store.options().path()));
687691
expireSnapshots
688692
.config(
689693
ExpireConfig.builder()
@@ -755,7 +759,11 @@ public void testDataFileSkippingSetException() throws Exception {
755759

756760
ExpireSnapshots expireSnapshots =
757761
new ExpireSnapshotsImpl(
758-
snapshotManager, changelogManager, snapshotDeletion, tagManager);
762+
snapshotManager,
763+
changelogManager,
764+
snapshotDeletion,
765+
tagManager,
766+
new SchemaManager(fileIO, store.options().path()));
759767
snapshotDeletion.readMergedDataFilesThrowException = true;
760768
expireSnapshots
761769
.config(
@@ -819,7 +827,11 @@ public void testManifestFileSkippingSetException() throws Exception {
819827
store.options().deleteFileThreadNum());
820828
ExpireSnapshots expireSnapshots =
821829
new ExpireSnapshotsImpl(
822-
snapshotManager, changelogManager, snapshotDeletion, tagManager);
830+
snapshotManager,
831+
changelogManager,
832+
snapshotDeletion,
833+
tagManager,
834+
new SchemaManager(fileIO, store.options().path()));
823835
snapshotDeletion.manifestSkippingSetThrowException = true;
824836
expireSnapshots
825837
.config(

0 commit comments

Comments
 (0)