[core] Support MULTI_WRITE mode for KeyValueFileStoreWrite when multiple jobs are writing to one table simultaneously… #5301

Open · wants to merge 1 commit into base: master
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -1098,6 +1098,12 @@
<td>Integer</td>
<td>Write batch size for any file format if it supports.</td>
</tr>
<tr>
<td><h5>write.mode</h5></td>
<td style="word-wrap: break-word;">SINGLE_WRITE</td>
<td><p>Enum</p></td>
<td>Specify the write mode for table. When multiple jobs are writing to one table, the MULTI_WRITE mode should be used for the write-and-compact job. In this case, the CompactManager can scan files generated by other users. Notice: This option does not work for dedicated compaction job.<br /><br />Possible values:<ul><li>"SINGLE_WRITE"</li><li>"MULTI_WRITE"</li></ul></td>
Contributor:
Based on my understanding of this PR:
1. "Specify the write mode for table." -> "Specifies the write mode for the current write job."
2. The MULTI_WRITE mode should be used for the write-and-compact job (write-only=false), while the other write jobs should set write-only=true. (A configuration sketch along these lines follows this file's diff.)

</tr>
<tr>
<td><h5>zorder.var-length-contribution</h5></td>
<td style="word-wrap: break-word;">8</td>
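Taking the new option description and the review comment above together, here is a minimal sketch of how the two kinds of jobs might be configured. Only the option keys come from this PR ("write.mode") and the existing "write-only" option; the class name is hypothetical, and how the maps are actually applied (table properties vs. per-job dynamic options) is engine-specific and left as an assumption.

import java.util.HashMap;
import java.util.Map;

// Illustrative option sets for the multi-writer setup described in this PR.
public class MultiWriteOptionsSketch {

    public static void main(String[] args) {
        // The single write-and-compact job: it compacts, so its CompactManager must
        // also see files committed by the other writers (MULTI_WRITE, write-only=false).
        Map<String, String> writeAndCompactJob = new HashMap<>();
        writeAndCompactJob.put("write.mode", "MULTI_WRITE");
        writeAndCompactJob.put("write-only", "false");

        // Every other concurrent writer: appends new files only, no compaction.
        Map<String, String> writeOnlyJob = new HashMap<>();
        writeOnlyJob.put("write.mode", "SINGLE_WRITE"); // the default
        writeOnlyJob.put("write-only", "true");

        System.out.println("write-and-compact job: " + writeAndCompactJob);
        System.out.println("write-only jobs:       " + writeOnlyJob);
    }
}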
23 changes: 23 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -444,6 +444,16 @@ public class CoreOptions implements Serializable {
"If set to true, compactions and snapshot expiration will be skipped. "
+ "This option is used along with dedicated compact jobs.");

public static final ConfigOption<WriteMode> WRITE_MODE =
key("write.mode")
.enumType(WriteMode.class)
.defaultValue(WriteMode.SINGLE_WRITE)
.withDescription(
"Specify the write mode for table. When multiple jobs are writing to "
+ "one table, the MULTI_WRITE mode should be used for the write-and-compact job. "
+ "In this case, the CompactManager can scan files generated by other users. "
+ "Notice: This option does not work for dedicated compaction job.");

public static final ConfigOption<MemorySize> SOURCE_SPLIT_TARGET_SIZE =
key("source.split.target-size")
.memoryType()
@@ -2140,6 +2150,10 @@ public Optional<Integer> compactionMaxFileNum() {
return options.getOptional(COMPACTION_MAX_FILE_NUM);
}

public boolean multiWriteModeEnabled() {
return WriteMode.MULTI_WRITE.equals(options.get(WRITE_MODE));
}

public long dynamicBucketTargetRowNum() {
return options.get(DYNAMIC_BUCKET_TARGET_ROW_NUM);
}
@@ -3364,6 +3378,15 @@ public enum LookupCompactMode {
GENTLE
}

/** The write mode. */
public enum WriteMode {
/** Only one write is creating new files for current table. */
SINGLE_WRITE,

/** Multiple writes are creating new files for current table. */
MULTI_WRITE
}
Contributor:
Suggestion: SINGLE_WRITER / MULTI_WRITER would be better names.


/** Partition strategy for unaware bucket partitioned append only table. */
public enum PartitionSinkStrategy {
NONE,
43 changes: 41 additions & 2 deletions paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -30,6 +30,8 @@
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.operation.AbstractFileStoreWrite;
import org.apache.paimon.operation.BucketSelectConverter;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.KeyValueFileStoreScan;
import org.apache.paimon.operation.KeyValueFileStoreWrite;
import org.apache.paimon.operation.MergeFileSplitRead;
@@ -41,6 +43,10 @@
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.source.OrphanFilesScan;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.KeyComparatorSupplier;
@@ -54,6 +60,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

import static org.apache.paimon.predicate.PredicateBuilder.and;
@@ -70,6 +77,8 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
private final RowType valueType;
private final KeyValueFieldsExtractor keyValueFieldsExtractor;
private final Supplier<Comparator<InternalRow>> keyComparatorSupplier;
private final Supplier<SplitGenerator> splitGeneratorSupplier;
private final Supplier<BiConsumer<FileStoreScan, Predicate>> nonPartitionFilterConsumerSupplier;
private final Supplier<RecordEqualiser> logDedupEqualSupplier;
private final MergeFunctionFactory<KeyValue> mfFactory;

@@ -86,7 +95,9 @@ public KeyValueFileStore(
KeyValueFieldsExtractor keyValueFieldsExtractor,
MergeFunctionFactory<KeyValue> mfFactory,
String tableName,
CatalogEnvironment catalogEnvironment) {
CatalogEnvironment catalogEnvironment,
Supplier<SplitGenerator> splitGeneratorSupplier,
Supplier<BiConsumer<FileStoreScan, Predicate>> nonPartitionFilterConsumerSupplier) {
super(fileIO, schemaManager, schema, tableName, options, partitionType, catalogEnvironment);
this.crossPartitionUpdate = crossPartitionUpdate;
this.bucketKeyType = bucketKeyType;
@@ -95,6 +106,8 @@
this.keyValueFieldsExtractor = keyValueFieldsExtractor;
this.mfFactory = mfFactory;
this.keyComparatorSupplier = new KeyComparatorSupplier(keyType);
this.splitGeneratorSupplier = splitGeneratorSupplier;
this.nonPartitionFilterConsumerSupplier = nonPartitionFilterConsumerSupplier;
List<String> logDedupIgnoreFields = options.changelogRowDeduplicateIgnoreFields();
this.logDedupEqualSupplier =
options.changelogRowDeduplicate()
@@ -184,6 +197,31 @@ public AbstractFileStoreWrite<KeyValue> newWrite(
options,
tableName);
} else {
OrphanFilesScan orphanFilesScan = null;
if (options.multiWriteModeEnabled()) {
SnapshotReader snapshotReader =
new SnapshotReaderImpl(
newScan(),
schema,
options,
snapshotManager(),
changelogManager(),
splitGeneratorSupplier.get(),
nonPartitionFilterConsumerSupplier.get(),
DefaultValueAssigner.create(schema),
pathFactory(),
tableName,
newIndexFileHandler());

orphanFilesScan =
new OrphanFilesScan.Builder()
.withOptions(options)
.withCurrentUser(commitUser)
.withSnapshotReader(snapshotReader)
.withSnapshotManager(snapshotManager())
.build();
}

return new KeyValueFileStoreWrite(
fileIO,
schemaManager,
@@ -204,7 +242,8 @@ public AbstractFileStoreWrite<KeyValue> newWrite(
deletionVectorsMaintainerFactory,
options,
keyValueFieldsExtractor,
tableName);
tableName,
orphanFilesScan);
}
}

@@ -65,6 +65,7 @@ public class MergeTreeCompactManager extends CompactFutureManager {
@Nullable private final DeletionVectorsMaintainer dvMaintainer;
private final boolean lazyGenDeletionFile;
private final boolean needLookup;
private final Supplier<Optional<List<DataFileMeta>>> orphanFilesSupplier;

@Nullable private final RecordLevelExpire recordLevelExpire;

@@ -81,6 +82,36 @@ public MergeTreeCompactManager(
boolean lazyGenDeletionFile,
boolean needLookup,
@Nullable RecordLevelExpire recordLevelExpire) {
this(
executor,
levels,
strategy,
keyComparator,
compactionFileSize,
numSortedRunStopTrigger,
rewriter,
metricsReporter,
dvMaintainer,
lazyGenDeletionFile,
needLookup,
recordLevelExpire,
null);
}

public MergeTreeCompactManager(
ExecutorService executor,
Levels levels,
CompactStrategy strategy,
Comparator<InternalRow> keyComparator,
long compactionFileSize,
int numSortedRunStopTrigger,
CompactRewriter rewriter,
@Nullable CompactionMetrics.Reporter metricsReporter,
@Nullable DeletionVectorsMaintainer dvMaintainer,
boolean lazyGenDeletionFile,
boolean needLookup,
@Nullable RecordLevelExpire recordLevelExpire,
@Nullable Supplier<Optional<List<DataFileMeta>>> orphanFilesSupplier) {
this.executor = executor;
this.levels = levels;
this.strategy = strategy;
@@ -93,6 +124,7 @@
this.lazyGenDeletionFile = lazyGenDeletionFile;
this.recordLevelExpire = recordLevelExpire;
this.needLookup = needLookup;
this.orphanFilesSupplier = orphanFilesSupplier;

MetricUtils.safeCall(this::reportMetrics, LOG);
}
@@ -121,6 +153,9 @@ public List<DataFileMeta> allFiles() {

@Override
public void triggerCompaction(boolean fullCompaction) {
if (orphanFilesSupplier != null) {
syncOrphanFiles();
}
Optional<CompactUnit> optionalUnit;
List<LevelSortedRun> runs = levels.levelSortedRuns();
if (fullCompaction) {
@@ -185,6 +220,23 @@ public void triggerCompaction(boolean fullCompaction) {
});
}

@VisibleForTesting
protected void syncOrphanFiles() {
Optional<List<DataFileMeta>> orphanFiles = orphanFilesSupplier.get();

if (!orphanFiles.isPresent()) {
return;
}

if (LOG.isDebugEnabled()) {
LOG.debug(
"Sync {} new orphan files committed by other users.", orphanFiles.get().size());
}

// Add orphan files to level 0
orphanFiles.get().forEach(this::addNewFile);
}

@VisibleForTesting
public Levels levels() {
return levels;
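For readers skimming the MergeTreeCompactManager changes above, the following is a condensed, standalone sketch of the new hook: before a compaction is triggered, an optional supplier is asked for files committed by other writers, and any files it returns are added to level 0. All classes below are simplified stand-ins, not the real Paimon types.

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

// Standalone sketch of the supplier-based hook added in this PR.
class MultiWriterCompactionSketch {

    static class DataFile {
        final String name;
        DataFile(String name) {
            this.name = name;
        }
    }

    private final List<DataFile> level0 = new ArrayList<>();
    // Null in SINGLE_WRITE mode, mirroring the nullable constructor argument in the PR.
    private final Supplier<Optional<List<DataFile>>> orphanFilesSupplier;

    MultiWriterCompactionSketch(Supplier<Optional<List<DataFile>>> orphanFilesSupplier) {
        this.orphanFilesSupplier = orphanFilesSupplier;
    }

    void triggerCompaction() {
        if (orphanFilesSupplier != null) {
            // Pull in files committed by other writers so they take part in this compaction.
            orphanFilesSupplier.get().ifPresent(level0::addAll);
        }
        // ... choose a compact unit from the levels and submit it, as the real manager does.
        System.out.println("compacting " + level0.size() + " level-0 file(s)");
    }

    public static void main(String[] args) {
        MultiWriterCompactionSketch manager =
                new MultiWriterCompactionSketch(
                        () -> Optional.of(List.of(new DataFile("other-writer-data-0.orc"))));
        manager.triggerCompaction(); // prints: compacting 1 level-0 file(s)
    }
}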
@@ -65,6 +65,9 @@
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.OrphanFilesScan;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.FieldsComparator;
@@ -79,9 +82,12 @@

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -109,6 +115,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
private final RowType partitionType;
private final String commitUser;
@Nullable private final RecordLevelExpire recordLevelExpire;
@Nullable private final OrphanFilesScan orphanFilesScan;
@Nullable private Cache<String, LookupFile> lookupFileCache;

public KeyValueFileStoreWrite(
@@ -131,7 +138,8 @@ public KeyValueFileStoreWrite(
@Nullable DeletionVectorsMaintainer.Factory deletionVectorsMaintainerFactory,
CoreOptions options,
KeyValueFieldsExtractor extractor,
String tableName) {
String tableName,
@Nullable OrphanFilesScan orphanFilesScan) {
super(
snapshotManager,
scan,
Expand Down Expand Up @@ -172,6 +180,7 @@ public KeyValueFileStoreWrite(
this.logDedupEqualSupplier = logDedupEqualSupplier;
this.mfFactory = mfFactory;
this.options = options;
this.orphanFilesScan = orphanFilesScan;
}

@Override
@@ -268,22 +277,92 @@ private CompactManager createCompactManager(
userDefinedSeqComparator,
levels,
dvMaintainer);
return new MergeTreeCompactManager(
compactExecutor,
levels,
compactStrategy,
keyComparator,
options.compactionFileSize(true),
options.numSortedRunStopTrigger(),
rewriter,
compactionMetrics == null
? null
: compactionMetrics.createReporter(partition, bucket),
dvMaintainer,
options.prepareCommitWaitCompaction(),
options.needLookup(),
recordLevelExpire);
if (orphanFilesScan != null) {
Supplier<Optional<List<DataFileMeta>>> orphanFilesSupplier =
() -> scanOrphanFiles(orphanFilesScan, snapshotManager, bucket, partition);
return new MergeTreeCompactManager(
compactExecutor,
levels,
compactStrategy,
keyComparator,
options.compactionFileSize(true),
options.numSortedRunStopTrigger(),
rewriter,
compactionMetrics == null
? null
: compactionMetrics.createReporter(partition, bucket),
dvMaintainer,
options.prepareCommitWaitCompaction(),
options.needLookup(),
recordLevelExpire,
orphanFilesSupplier);
} else {
return new MergeTreeCompactManager(
compactExecutor,
levels,
compactStrategy,
keyComparator,
options.compactionFileSize(true),
options.numSortedRunStopTrigger(),
rewriter,
compactionMetrics == null
? null
: compactionMetrics.createReporter(partition, bucket),
dvMaintainer,
options.prepareCommitWaitCompaction(),
options.needLookup(),
recordLevelExpire);
}
}
}

private Optional<List<DataFileMeta>> scanOrphanFiles(
OrphanFilesScan scan,
SnapshotManager snapshotManager,
int bucket,
BinaryRow partition) {
if (scan.checkpoint() == null) {
// scan not initialized
Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
if (earliestSnapshotId == null) {
return Optional.empty();
}

// restore scan with the earliest snapshot id
scan.restore(earliestSnapshotId);
}

// set bucket, partition and level scan filter. For pk table, new files committed by
// other users should only be placed in level 0
scan.withBucket(bucket);
scan.withPartitionFilter(Collections.singletonList(partition));
scan.withLevelFilter(level -> level == 0);

List<DataFileMeta> orphanFiles = new ArrayList<>();
while (true) {
// generate scan plan
List<Split> splits = scan.plan().splits();
if (!splits.isEmpty()) {
splits.stream()
.flatMap(split -> ((DataSplit) split).dataFiles().stream())
.forEach(orphanFiles::add);
}

Long nextSnapshotId = scan.checkpoint();
Long latestSnapshotId = snapshotManager.latestSnapshotId();

if (nextSnapshotId == null || latestSnapshotId == null) {
LOG.debug("There is currently no snapshot. Wait for the snapshot generation.");
break;
}

if (nextSnapshotId > latestSnapshotId) {
LOG.debug("NextSnapshotId greater than the latest snapshot. Wait for the next.");
break;
}
}

return Optional.of(orphanFiles);
}

private MergeTreeCompactRewriter createRewriter(