Commit fe37f96
[core] Support MULTI_WRITE mode for KeyValueFileStoreWrite when multiple jobs are writing to one table simultaneously
1 parent 0ba4b95 commit fe37f96

File tree

12 files changed: +711 -54 lines

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 6 additions & 0 deletions

@@ -1098,6 +1098,12 @@
         <td>Integer</td>
         <td>Write batch size for any file format if it supports.</td>
     </tr>
+    <tr>
+        <td><h5>write.mode</h5></td>
+        <td style="word-wrap: break-word;">SINGLE_WRITE</td>
+        <td><p>Enum</p></td>
+        <td>Specify the write mode for the table. When multiple jobs are writing to one table, the MULTI_WRITE mode should be used for the write-and-compact job. In this case, the CompactManager can scan files generated by other users. Note: this option does not take effect for dedicated compaction jobs.<br /><br />Possible values:<ul><li>"SINGLE_WRITE"</li><li>"MULTI_WRITE"</li></ul></td>
+    </tr>
     <tr>
         <td><h5>zorder.var-length-contribution</h5></td>
         <td style="word-wrap: break-word;">8</td>
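
For orientation, a table could opt into the documented option at creation time. The following is a minimal sketch using Paimon's Schema builder; the column layout and class name are invented for illustration, and only the "write.mode" key comes from this commit.

import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

public class WriteModeSchemaExample {
    // Build a primary-key table schema that opts into MULTI_WRITE.
    static Schema schemaWithMultiWrite() {
        return Schema.newBuilder()
                .column("k", DataTypes.INT())
                .column("v", DataTypes.STRING())
                .primaryKey("k")
                .option("write.mode", "MULTI_WRITE") // the option documented above
                .build();
    }
}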

paimon-common/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 23 additions & 0 deletions

@@ -444,6 +444,16 @@ public class CoreOptions implements Serializable {
                     "If set to true, compactions and snapshot expiration will be skipped. "
                             + "This option is used along with dedicated compact jobs.");

+    public static final ConfigOption<WriteMode> WRITE_MODE =
+            key("write.mode")
+                    .enumType(WriteMode.class)
+                    .defaultValue(WriteMode.SINGLE_WRITE)
+                    .withDescription(
+                            "Specify the write mode for the table. When multiple jobs are writing to "
+                                    + "one table, the MULTI_WRITE mode should be used for the write-and-compact job. "
+                                    + "In this case, the CompactManager can scan files generated by other users. "
+                                    + "Note: this option does not take effect for dedicated compaction jobs.");
+
     public static final ConfigOption<MemorySize> SOURCE_SPLIT_TARGET_SIZE =
             key("source.split.target-size")
                     .memoryType()

@@ -2140,6 +2150,10 @@ public Optional<Integer> compactionMaxFileNum() {
         return options.getOptional(COMPACTION_MAX_FILE_NUM);
     }

+    public boolean multiWriteModeEnabled() {
+        return WriteMode.MULTI_WRITE.equals(options.get(WRITE_MODE));
+    }
+
     public long dynamicBucketTargetRowNum() {
         return options.get(DYNAMIC_BUCKET_TARGET_ROW_NUM);
     }

@@ -3364,6 +3378,15 @@ public enum LookupCompactMode {
        GENTLE
    }

+    /** The write mode. */
+    public enum WriteMode {
+        /** Only one writer is creating new files for the current table. */
+        SINGLE_WRITE,
+
+        /** Multiple writers are creating new files for the current table. */
+        MULTI_WRITE
+    }
+
    /** Partition strategy for unaware bucket partitioned append only table. */
    public enum PartitionSinkStrategy {
        NONE,
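
To see the new pieces of CoreOptions working together, here is a hedged sketch that sets the enum programmatically and checks the helper added above. The class name and main method are invented; Options and the CoreOptions(Options) constructor are standard Paimon API.

import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.Options;

public class MultiWriteOptionExample {
    public static void main(String[] args) {
        Options options = new Options();
        // WRITE_MODE and WriteMode are the additions shown in the diff above.
        options.set(CoreOptions.WRITE_MODE, CoreOptions.WriteMode.MULTI_WRITE);

        CoreOptions coreOptions = new CoreOptions(options);
        System.out.println(coreOptions.multiWriteModeEnabled()); // prints: true
    }
}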

paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java

Lines changed: 41 additions & 2 deletions

@@ -30,6 +30,8 @@
 import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
 import org.apache.paimon.operation.AbstractFileStoreWrite;
 import org.apache.paimon.operation.BucketSelectConverter;
+import org.apache.paimon.operation.DefaultValueAssigner;
+import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.KeyValueFileStoreScan;
 import org.apache.paimon.operation.KeyValueFileStoreWrite;
 import org.apache.paimon.operation.MergeFileSplitRead;

@@ -41,6 +43,10 @@
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.source.OrphanFilesScan;
+import org.apache.paimon.table.source.SplitGenerator;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.KeyComparatorSupplier;

@@ -54,6 +60,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.BiConsumer;
 import java.util.function.Supplier;

 import static org.apache.paimon.predicate.PredicateBuilder.and;

@@ -70,6 +77,8 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
     private final RowType valueType;
     private final KeyValueFieldsExtractor keyValueFieldsExtractor;
     private final Supplier<Comparator<InternalRow>> keyComparatorSupplier;
+    private final Supplier<SplitGenerator> splitGeneratorSupplier;
+    private final Supplier<BiConsumer<FileStoreScan, Predicate>> nonPartitionFilterConsumerSupplier;
     private final Supplier<RecordEqualiser> logDedupEqualSupplier;
     private final MergeFunctionFactory<KeyValue> mfFactory;

@@ -86,7 +95,9 @@ public KeyValueFileStore(
             KeyValueFieldsExtractor keyValueFieldsExtractor,
             MergeFunctionFactory<KeyValue> mfFactory,
             String tableName,
-            CatalogEnvironment catalogEnvironment) {
+            CatalogEnvironment catalogEnvironment,
+            Supplier<SplitGenerator> splitGeneratorSupplier,
+            Supplier<BiConsumer<FileStoreScan, Predicate>> nonPartitionFilterConsumerSupplier) {
         super(fileIO, schemaManager, schema, tableName, options, partitionType, catalogEnvironment);
         this.crossPartitionUpdate = crossPartitionUpdate;
         this.bucketKeyType = bucketKeyType;

@@ -95,6 +106,8 @@ public KeyValueFileStore(
         this.keyValueFieldsExtractor = keyValueFieldsExtractor;
         this.mfFactory = mfFactory;
         this.keyComparatorSupplier = new KeyComparatorSupplier(keyType);
+        this.splitGeneratorSupplier = splitGeneratorSupplier;
+        this.nonPartitionFilterConsumerSupplier = nonPartitionFilterConsumerSupplier;
         List<String> logDedupIgnoreFields = options.changelogRowDeduplicateIgnoreFields();
         this.logDedupEqualSupplier =
                 options.changelogRowDeduplicate()

@@ -184,6 +197,31 @@ public AbstractFileStoreWrite<KeyValue> newWrite(
                     options,
                     tableName);
         } else {
+            OrphanFilesScan orphanFilesScan = null;
+            if (options.multiWriteModeEnabled()) {
+                SnapshotReader snapshotReader =
+                        new SnapshotReaderImpl(
+                                newScan(),
+                                schema,
+                                options,
+                                snapshotManager(),
+                                changelogManager(),
+                                splitGeneratorSupplier.get(),
+                                nonPartitionFilterConsumerSupplier.get(),
+                                DefaultValueAssigner.create(schema),
+                                pathFactory(),
+                                tableName,
+                                newIndexFileHandler());
+
+                orphanFilesScan =
+                        new OrphanFilesScan.Builder()
+                                .withOptions(options)
+                                .withCurrentUser(commitUser)
+                                .withSnapshotReader(snapshotReader)
+                                .withSnapshotManager(snapshotManager())
+                                .build();
+            }
+
             return new KeyValueFileStoreWrite(
                     fileIO,
                     schemaManager,

@@ -204,7 +242,8 @@ public AbstractFileStoreWrite<KeyValue> newWrite(
                     deletionVectorsMaintainerFactory,
                     options,
                     keyValueFieldsExtractor,
-                    tableName);
+                    tableName,
+                    orphanFilesScan);
         }
     }
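
In practice a caller reaches the new branch by loading the table with the option switched on. A hedged sketch using the public Table API (Table.copy with dynamic options is standard Paimon; the helper class is invented):

import java.util.Collections;

import org.apache.paimon.table.Table;

public class MultiWriteTableExample {
    // Returns a copy of the table whose writers will take the
    // multiWriteModeEnabled() branch in KeyValueFileStore.newWrite().
    static Table enableMultiWrite(Table table) {
        return table.copy(Collections.singletonMap("write.mode", "MULTI_WRITE"));
    }
}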

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java

Lines changed: 52 additions & 0 deletions

@@ -65,6 +65,7 @@ public class MergeTreeCompactManager extends CompactFutureManager {
     @Nullable private final DeletionVectorsMaintainer dvMaintainer;
     private final boolean lazyGenDeletionFile;
     private final boolean needLookup;
+    private final Supplier<Optional<List<DataFileMeta>>> orphanFilesSupplier;

     @Nullable private final RecordLevelExpire recordLevelExpire;

@@ -81,6 +82,36 @@ public MergeTreeCompactManager(
             boolean lazyGenDeletionFile,
             boolean needLookup,
             @Nullable RecordLevelExpire recordLevelExpire) {
+        this(
+                executor,
+                levels,
+                strategy,
+                keyComparator,
+                compactionFileSize,
+                numSortedRunStopTrigger,
+                rewriter,
+                metricsReporter,
+                dvMaintainer,
+                lazyGenDeletionFile,
+                needLookup,
+                recordLevelExpire,
+                null);
+    }
+
+    public MergeTreeCompactManager(
+            ExecutorService executor,
+            Levels levels,
+            CompactStrategy strategy,
+            Comparator<InternalRow> keyComparator,
+            long compactionFileSize,
+            int numSortedRunStopTrigger,
+            CompactRewriter rewriter,
+            @Nullable CompactionMetrics.Reporter metricsReporter,
+            @Nullable DeletionVectorsMaintainer dvMaintainer,
+            boolean lazyGenDeletionFile,
+            boolean needLookup,
+            @Nullable RecordLevelExpire recordLevelExpire,
+            @Nullable Supplier<Optional<List<DataFileMeta>>> orphanFilesSupplier) {
         this.executor = executor;
         this.levels = levels;
         this.strategy = strategy;

@@ -93,6 +124,7 @@ public MergeTreeCompactManager(
         this.lazyGenDeletionFile = lazyGenDeletionFile;
         this.recordLevelExpire = recordLevelExpire;
         this.needLookup = needLookup;
+        this.orphanFilesSupplier = orphanFilesSupplier;

         MetricUtils.safeCall(this::reportMetrics, LOG);
     }

@@ -121,6 +153,9 @@ public List<DataFileMeta> allFiles() {

     @Override
     public void triggerCompaction(boolean fullCompaction) {
+        if (orphanFilesSupplier != null) {
+            syncOrphanFiles();
+        }
         Optional<CompactUnit> optionalUnit;
         List<LevelSortedRun> runs = levels.levelSortedRuns();
         if (fullCompaction) {

@@ -185,6 +220,23 @@ public void triggerCompaction(boolean fullCompaction) {
                 });
     }

+    @VisibleForTesting
+    protected void syncOrphanFiles() {
+        Optional<List<DataFileMeta>> orphanFiles = orphanFilesSupplier.get();
+
+        if (!orphanFiles.isPresent()) {
+            return;
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                    "Sync {} new orphan files committed by other users.", orphanFiles.get().size());
+        }
+
+        // Add orphan files to level 0
+        orphanFiles.get().forEach(this::addNewFile);
+    }
+
     @VisibleForTesting
     public Levels levels() {
         return levels;
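
The supplier hook keeps MergeTreeCompactManager decoupled from how orphan files are discovered. Below is a toy model of just the control flow, with DataFileMeta reduced to String and everything else invented:

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

public class ToyCompactManager {
    private final List<String> level0Files = new ArrayList<>();
    private final Supplier<Optional<List<String>>> orphanFilesSupplier; // may be null

    public ToyCompactManager(Supplier<Optional<List<String>>> orphanFilesSupplier) {
        this.orphanFilesSupplier = orphanFilesSupplier;
    }

    public void triggerCompaction() {
        // Mirrors the hook above: drain externally committed files into level 0
        // before choosing a compact unit.
        if (orphanFilesSupplier != null) {
            orphanFilesSupplier.get().ifPresent(level0Files::addAll);
        }
        // ... pick sorted runs from level 0 and higher levels, then submit ...
    }
}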

paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java

Lines changed: 95 additions & 16 deletions

@@ -65,6 +65,9 @@
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.OrphanFilesScan;
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommitIncrement;
 import org.apache.paimon.utils.FieldsComparator;

@@ -79,9 +82,12 @@

 import javax.annotation.Nullable;

+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 import java.util.function.Supplier;

@@ -109,6 +115,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
     private final RowType partitionType;
     private final String commitUser;
     @Nullable private final RecordLevelExpire recordLevelExpire;
+    @Nullable private final OrphanFilesScan orphanFilesScan;
     @Nullable private Cache<String, LookupFile> lookupFileCache;

     public KeyValueFileStoreWrite(

@@ -131,7 +138,8 @@ public KeyValueFileStoreWrite(
             @Nullable DeletionVectorsMaintainer.Factory deletionVectorsMaintainerFactory,
             CoreOptions options,
             KeyValueFieldsExtractor extractor,
-            String tableName) {
+            String tableName,
+            @Nullable OrphanFilesScan orphanFilesScan) {
         super(
                 snapshotManager,
                 scan,

@@ -172,6 +180,7 @@ public KeyValueFileStoreWrite(
         this.logDedupEqualSupplier = logDedupEqualSupplier;
         this.mfFactory = mfFactory;
         this.options = options;
+        this.orphanFilesScan = orphanFilesScan;
     }

     @Override

@@ -268,22 +277,92 @@ private CompactManager createCompactManager(
                             userDefinedSeqComparator,
                             levels,
                             dvMaintainer);
-            return new MergeTreeCompactManager(
-                    compactExecutor,
-                    levels,
-                    compactStrategy,
-                    keyComparator,
-                    options.compactionFileSize(true),
-                    options.numSortedRunStopTrigger(),
-                    rewriter,
-                    compactionMetrics == null
-                            ? null
-                            : compactionMetrics.createReporter(partition, bucket),
-                    dvMaintainer,
-                    options.prepareCommitWaitCompaction(),
-                    options.needLookup(),
-                    recordLevelExpire);
+            if (orphanFilesScan != null) {
+                Supplier<Optional<List<DataFileMeta>>> orphanFilesSupplier =
+                        () -> scanOrphanFiles(orphanFilesScan, snapshotManager, bucket, partition);
+                return new MergeTreeCompactManager(
+                        compactExecutor,
+                        levels,
+                        compactStrategy,
+                        keyComparator,
+                        options.compactionFileSize(true),
+                        options.numSortedRunStopTrigger(),
+                        rewriter,
+                        compactionMetrics == null
+                                ? null
+                                : compactionMetrics.createReporter(partition, bucket),
+                        dvMaintainer,
+                        options.prepareCommitWaitCompaction(),
+                        options.needLookup(),
+                        recordLevelExpire,
+                        orphanFilesSupplier);
+            } else {
+                return new MergeTreeCompactManager(
+                        compactExecutor,
+                        levels,
+                        compactStrategy,
+                        keyComparator,
+                        options.compactionFileSize(true),
+                        options.numSortedRunStopTrigger(),
+                        rewriter,
+                        compactionMetrics == null
+                                ? null
+                                : compactionMetrics.createReporter(partition, bucket),
+                        dvMaintainer,
+                        options.prepareCommitWaitCompaction(),
+                        options.needLookup(),
+                        recordLevelExpire);
+            }
+        }
+    }
+
+    private Optional<List<DataFileMeta>> scanOrphanFiles(
+            OrphanFilesScan scan,
+            SnapshotManager snapshotManager,
+            int bucket,
+            BinaryRow partition) {
+        if (scan.checkpoint() == null) {
+            // scan not initialized
+            Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
+            if (earliestSnapshotId == null) {
+                return Optional.empty();
+            }
+
+            // restore scan with the earliest snapshot id
+            scan.restore(earliestSnapshotId);
+        }
+
+        // set bucket, partition and level scan filter. For pk table, new files committed by
+        // other users should only be placed in level 0
+        scan.withBucket(bucket);
+        scan.withPartitionFilter(Collections.singletonList(partition));
+        scan.withLevelFilter(level -> level == 0);
+
+        List<DataFileMeta> orphanFiles = new ArrayList<>();
+        while (true) {
+            // generate scan plan
+            List<Split> splits = scan.plan().splits();
+            if (!splits.isEmpty()) {
+                splits.stream()
+                        .flatMap(split -> ((DataSplit) split).dataFiles().stream())
+                        .forEach(orphanFiles::add);
+            }
+
+            Long nextSnapshotId = scan.checkpoint();
+            Long latestSnapshotId = snapshotManager.latestSnapshotId();
+
+            if (nextSnapshotId == null || latestSnapshotId == null) {
+                LOG.debug("There is currently no snapshot. Wait for the snapshot generation.");
+                break;
+            }
+
+            if (nextSnapshotId > latestSnapshotId) {
+                LOG.debug("NextSnapshotId is greater than the latest snapshot. Wait for the next.");
+                break;
+            }
+        }
+
+        return Optional.of(orphanFiles);
     }

     private MergeTreeCompactRewriter createRewriter(
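
The scan loop above is an incremental, checkpointed catch-up: restore from the earliest snapshot on first use, then plan batch after batch until the scan's checkpoint passes the latest snapshot. A toy model of just that loop (snapshot ids as plain Longs; ToyScan is invented and only mirrors the calls used in the diff):

import java.util.ArrayList;
import java.util.List;

public class OrphanScanLoop {
    interface ToyScan {
        Long checkpoint();              // next snapshot id to read, null if uninitialized
        void restore(long snapshotId);  // start reading from this snapshot
        List<String> planFiles();       // files discovered by one planning step
    }

    static List<String> drain(ToyScan scan, Long earliestSnapshotId, Long latestSnapshotId) {
        List<String> files = new ArrayList<>();
        if (scan.checkpoint() == null) {
            if (earliestSnapshotId == null) {
                return files; // no snapshot yet, nothing to sync
            }
            scan.restore(earliestSnapshotId);
        }
        while (true) {
            files.addAll(scan.planFiles());
            Long next = scan.checkpoint();
            // Stop once we have caught up with (or passed) the latest snapshot.
            if (next == null || latestSnapshotId == null || next > latestSnapshotId) {
                break;
            }
        }
        return files;
    }
}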
