Skip to content

Commit a431454

Browse files
committed
[core] Automatically detect changes to the expiration policy.
1 parent 005c0de commit a431454

File tree

7 files changed

+54
-20
lines changed

7 files changed

+54
-20
lines changed

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,12 @@ public InlineElement getDescription() {
443443
.withDescription(
444444
"The maximum number of snapshots allowed to expire at a time.");
445445

446+
public static final ConfigOption<Boolean> DETECT_EXPIRATION_SETTING_ENABLED =
447+
key("detect-expiration-setting.enabled")
448+
.booleanType()
449+
.defaultValue(false)
450+
.withDescription("Whether detect changes to expiration settings.");
451+
446452
public static final ConfigOption<Boolean> SNAPSHOT_CLEAN_EMPTY_DIRECTORIES =
447453
key("snapshot.clean-empty-directories")
448454
.booleanType()
@@ -2202,6 +2208,10 @@ public int snapshotExpireLimit() {
22022208
return options.get(SNAPSHOT_EXPIRE_LIMIT);
22032209
}
22042210

2211+
public boolean detectExpirationSettingEnabled() {
2212+
return options.get(DETECT_EXPIRATION_SETTING_ENABLED);
2213+
}
2214+
22052215
public boolean cleanEmptyDirectories() {
22062216
return options.get(SNAPSHOT_CLEAN_EMPTY_DIRECTORIES);
22072217
}

paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,8 @@ public ExpireSnapshots newExpireSnapshots() {
431431
changelogManager(),
432432
store().newSnapshotDeletion(),
433433
store().newTagManager(),
434-
schemaManager());
434+
schemaManager(),
435+
coreOptions().detectExpirationSettingEnabled());
435436
}
436437

437438
@Override
@@ -441,7 +442,8 @@ public ExpireSnapshots newExpireChangelog() {
441442
changelogManager(),
442443
tagManager(),
443444
store().newChangelogDeletion(),
444-
schemaManager());
445+
schemaManager(),
446+
coreOptions().detectExpirationSettingEnabled());
445447
}
446448

447449
@Override

paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public class ExpireChangelogImpl implements ExpireSnapshots {
5555
private final ChangelogDeletion changelogDeletion;
5656
private final TagManager tagManager;
5757
private final SchemaManager schemaManager;
58+
private final boolean detectExpirationSettingEnabled;
5859

5960
private ExpireConfig expireConfig;
6061
private long latestSchemaId;
@@ -64,7 +65,8 @@ public ExpireChangelogImpl(
6465
ChangelogManager changelogManager,
6566
TagManager tagManager,
6667
ChangelogDeletion changelogDeletion,
67-
SchemaManager schemaManager) {
68+
SchemaManager schemaManager,
69+
boolean detectExpirationSettingEnabled) {
6870
this.snapshotManager = snapshotManager;
6971
this.changelogManager = changelogManager;
7072
this.tagManager = tagManager;
@@ -76,7 +78,10 @@ public ExpireChangelogImpl(
7678
this.changelogDeletion = changelogDeletion;
7779
this.expireConfig = ExpireConfig.builder().build();
7880
this.schemaManager = schemaManager;
79-
this.latestSchemaId = this.schemaManager.latest().get().id();
81+
this.detectExpirationSettingEnabled = detectExpirationSettingEnabled;
82+
if (this.detectExpirationSettingEnabled) {
83+
this.latestSchemaId = this.schemaManager.latest().get().id();
84+
}
8085
}
8186

8287
@Override
@@ -87,10 +92,12 @@ public ExpireSnapshots config(ExpireConfig expireConfig) {
8792

8893
@Override
8994
public int expire() {
90-
TableSchema latestTableSchema = this.schemaManager.latest().get();
91-
if (this.latestSchemaId != latestTableSchema.id()) {
92-
this.expireConfig = CoreOptions.fromMap(latestTableSchema.options()).expireConfig();
93-
this.latestSchemaId = latestTableSchema.id();
95+
if (this.detectExpirationSettingEnabled) {
96+
TableSchema latestTableSchema = this.schemaManager.latest().get();
97+
if (this.latestSchemaId != latestTableSchema.id()) {
98+
this.expireConfig = CoreOptions.fromMap(latestTableSchema.options()).expireConfig();
99+
this.latestSchemaId = latestTableSchema.id();
100+
}
94101
}
95102

96103
int retainMax = expireConfig.getChangelogRetainMax();

paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
5959
private final SnapshotDeletion snapshotDeletion;
6060
private final TagManager tagManager;
6161
private final SchemaManager schemaManager;
62+
private final boolean detectExpirationSettingEnabled;
6263

6364
private ExpireConfig expireConfig;
6465
private long latestSchemaId;
@@ -68,7 +69,8 @@ public ExpireSnapshotsImpl(
6869
ChangelogManager changelogManager,
6970
SnapshotDeletion snapshotDeletion,
7071
TagManager tagManager,
71-
SchemaManager schemaManager) {
72+
SchemaManager schemaManager,
73+
boolean detectExpirationSettingEnabled) {
7274
this.snapshotManager = snapshotManager;
7375
this.changelogManager = changelogManager;
7476
this.consumerManager =
@@ -80,7 +82,10 @@ public ExpireSnapshotsImpl(
8082
this.tagManager = tagManager;
8183
this.expireConfig = ExpireConfig.builder().build();
8284
this.schemaManager = schemaManager;
83-
this.latestSchemaId = this.schemaManager.latest().get().id();
85+
this.detectExpirationSettingEnabled = detectExpirationSettingEnabled;
86+
if (this.detectExpirationSettingEnabled) {
87+
this.latestSchemaId = this.schemaManager.latest().get().id();
88+
}
8489
}
8590

8691
@Override
@@ -91,10 +96,12 @@ public ExpireSnapshots config(ExpireConfig expireConfig) {
9196

9297
@Override
9398
public int expire() {
94-
TableSchema latestTableSchema = this.schemaManager.latest().get();
95-
if (this.latestSchemaId != latestTableSchema.id()) {
96-
this.expireConfig = CoreOptions.fromMap(latestTableSchema.options()).expireConfig();
97-
this.latestSchemaId = latestTableSchema.id();
99+
if (this.detectExpirationSettingEnabled) {
100+
TableSchema latestTableSchema = this.schemaManager.latest().get();
101+
if (this.latestSchemaId != latestTableSchema.id()) {
102+
this.expireConfig = CoreOptions.fromMap(latestTableSchema.options()).expireConfig();
103+
this.latestSchemaId = latestTableSchema.id();
104+
}
98105
}
99106

100107
snapshotDeletion.setChangelogDecoupled(expireConfig.isChangelogDecoupled());

paimon-core/src/test/java/org/apache/paimon/TestFileStore.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,8 @@ public ExpireSnapshots newExpire(int numRetainedMin, int numRetainedMax, long mi
177177
changelogManager(),
178178
newSnapshotDeletion(),
179179
new TagManager(fileIO, options.path()),
180-
schemaManager)
180+
schemaManager,
181+
options.detectExpirationSettingEnabled())
181182
.config(
182183
ExpireConfig.builder()
183184
.snapshotRetainMax(numRetainedMax)
@@ -192,7 +193,8 @@ public ExpireSnapshots newExpire(ExpireConfig expireConfig) {
192193
changelogManager(),
193194
newSnapshotDeletion(),
194195
new TagManager(fileIO, options.path()),
195-
schemaManager)
196+
schemaManager,
197+
options.detectExpirationSettingEnabled())
196198
.config(expireConfig);
197199
}
198200

@@ -203,7 +205,8 @@ public ExpireSnapshots newChangelogExpire(ExpireConfig config) {
203205
changelogManager(),
204206
new TagManager(fileIO, options.path()),
205207
newChangelogDeletion(),
206-
schemaManager);
208+
schemaManager,
209+
options.detectExpirationSettingEnabled());
207210
impl.config(config);
208211
return impl;
209212
}

paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -662,6 +662,8 @@ public void testManifestFileSkippingSetFileNotFoundException() throws Exception
662662

663663
@Test
664664
public void testExpireSnapshotsWithChangedConfig() throws Exception {
665+
store.options().toConfiguration().set(CoreOptions.DETECT_EXPIRATION_SETTING_ENABLED, true);
666+
665667
ExpireConfig.Builder builder = ExpireConfig.builder();
666668
builder.snapshotRetainMin(10)
667669
.snapshotRetainMax(Integer.MAX_VALUE)

paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -687,7 +687,8 @@ public void testExpireWithDeletingTags() throws Exception {
687687
changelogManager,
688688
store.newSnapshotDeletion(),
689689
tagManager,
690-
new SchemaManager(fileIO, store.options().path()));
690+
new SchemaManager(fileIO, store.options().path()),
691+
false);
691692
expireSnapshots
692693
.config(
693694
ExpireConfig.builder()
@@ -763,7 +764,8 @@ public void testDataFileSkippingSetException() throws Exception {
763764
changelogManager,
764765
snapshotDeletion,
765766
tagManager,
766-
new SchemaManager(fileIO, store.options().path()));
767+
new SchemaManager(fileIO, store.options().path()),
768+
false);
767769
snapshotDeletion.readMergedDataFilesThrowException = true;
768770
expireSnapshots
769771
.config(
@@ -831,7 +833,8 @@ public void testManifestFileSkippingSetException() throws Exception {
831833
changelogManager,
832834
snapshotDeletion,
833835
tagManager,
834-
new SchemaManager(fileIO, store.options().path()));
836+
new SchemaManager(fileIO, store.options().path()),
837+
false);
835838
snapshotDeletion.manifestSkippingSetThrowException = true;
836839
expireSnapshots
837840
.config(

0 commit comments

Comments
 (0)