Skip to content

Commit 0ba4b95

Browse files
authored
[core] Support watermark partition markdone mode (#5284)
1 parent 73cf2aa commit 0ba4b95

File tree

5 files changed

+313
-9
lines changed

5 files changed

+313
-9
lines changed

docs/layouts/shortcodes/generated/flink_connector_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,12 @@
8686
<td>Duration</td>
8787
<td>Set a time duration when a partition has no new data after this time duration, mark the done status to indicate that the data is ready.</td>
8888
</tr>
89+
<tr>
90+
<td><h5>partition.mark-done-action.mode</h5></td>
91+
<td style="word-wrap: break-word;">process-time</td>
92+
<td><p>Enum</p></td>
93+
<td>How to trigger partition mark done action.<br /><br />Possible values:<ul><li>"process-time": Based on the time of the machine, mark the partition done once the processing time passes period time plus delay.</li><li>"watermark": Based on the watermark of the input, mark the partition done once the watermark passes period time plus delay.</li></ul></td>
94+
</tr>
8995
<tr>
9096
<td><h5>partition.time-interval</h5></td>
9197
<td style="word-wrap: break-word;">(none)</td>

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,12 @@ public class FlinkConnectorOptions {
356356
.withDescription(
357357
"Allow sink committer and writer operator to be chained together");
358358

359+
public static final ConfigOption<PartitionMarkDoneActionMode> PARTITION_MARK_DONE_MODE =
360+
key("partition.mark-done-action.mode")
361+
.enumType(PartitionMarkDoneActionMode.class)
362+
.defaultValue(PartitionMarkDoneActionMode.PROCESS_TIME)
363+
.withDescription("How to trigger partition mark done action.");
364+
359365
public static final ConfigOption<Duration> PARTITION_IDLE_TIME_TO_DONE =
360366
key("partition.idle-time-to-done")
361367
.durationType()
@@ -544,4 +550,32 @@ public InlineElement getDescription() {
544550
return text(description);
545551
}
546552
}
553+
554+
/** The mode for partition mark done. */
555+
public enum PartitionMarkDoneActionMode implements DescribedEnum {
556+
PROCESS_TIME(
557+
"process-time",
558+
"Based on the time of the machine, mark the partition done once the processing time passes period time plus delay."),
559+
WATERMARK(
560+
"watermark",
561+
"Based on the watermark of the input, mark the partition done once the watermark passes period time plus delay.");
562+
563+
private final String value;
564+
private final String description;
565+
566+
PartitionMarkDoneActionMode(String value, String description) {
567+
this.value = value;
568+
this.description = description;
569+
}
570+
571+
@Override
572+
public String toString() {
573+
return value;
574+
}
575+
576+
@Override
577+
public InlineElement getDescription() {
578+
return text(description);
579+
}
580+
}
547581
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.paimon.CoreOptions;
2222
import org.apache.paimon.CoreOptions.MergeEngine;
2323
import org.apache.paimon.data.BinaryRow;
24+
import org.apache.paimon.flink.FlinkConnectorOptions.PartitionMarkDoneActionMode;
2425
import org.apache.paimon.manifest.ManifestCommittable;
2526
import org.apache.paimon.options.Options;
2627
import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
@@ -32,24 +33,33 @@
3233
import org.apache.paimon.utils.PartitionPathUtils;
3334

3435
import org.apache.flink.api.common.state.OperatorStateStore;
36+
import org.apache.flink.api.java.tuple.Tuple2;
37+
import org.slf4j.Logger;
38+
import org.slf4j.LoggerFactory;
3539

3640
import java.io.IOException;
3741
import java.time.Duration;
42+
import java.util.HashMap;
3843
import java.util.HashSet;
3944
import java.util.List;
45+
import java.util.Map;
4046
import java.util.Optional;
4147
import java.util.Set;
4248

4349
import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
4450
import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE;
51+
import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_MODE;
4552

4653
/** Mark partition done. */
4754
public class PartitionMarkDone implements PartitionListener {
4855

56+
private static final Logger LOG = LoggerFactory.getLogger(PartitionMarkDone.class);
57+
4958
private final InternalRowPartitionComputer partitionComputer;
5059
private final PartitionMarkDoneTrigger trigger;
5160
private final List<PartitionMarkDoneAction> actions;
5261
private final boolean waitCompaction;
62+
private final PartitionMarkDoneActionMode partitionMarkDoneActionMode;
5363

5464
public static Optional<PartitionMarkDone> create(
5565
ClassLoader cl,
@@ -86,7 +96,12 @@ public static Optional<PartitionMarkDone> create(
8696
|| coreOptions.mergeEngine() == MergeEngine.FIRST_ROW);
8797

8898
return Optional.of(
89-
new PartitionMarkDone(partitionComputer, trigger, actions, waitCompaction));
99+
new PartitionMarkDone(
100+
partitionComputer,
101+
trigger,
102+
actions,
103+
waitCompaction,
104+
options.get(PARTITION_MARK_DONE_MODE)));
90105
}
91106

92107
private static boolean disablePartitionMarkDone(
@@ -108,15 +123,25 @@ public PartitionMarkDone(
108123
InternalRowPartitionComputer partitionComputer,
109124
PartitionMarkDoneTrigger trigger,
110125
List<PartitionMarkDoneAction> actions,
111-
boolean waitCompaction) {
126+
boolean waitCompaction,
127+
PartitionMarkDoneActionMode partitionMarkDoneActionMode) {
112128
this.partitionComputer = partitionComputer;
113129
this.trigger = trigger;
114130
this.actions = actions;
115131
this.waitCompaction = waitCompaction;
132+
this.partitionMarkDoneActionMode = partitionMarkDoneActionMode;
116133
}
117134

118135
@Override
119136
public void notifyCommittable(List<ManifestCommittable> committables) {
137+
if (partitionMarkDoneActionMode == PartitionMarkDoneActionMode.WATERMARK) {
138+
markDoneByWatermark(committables);
139+
} else {
140+
markDoneByProcessTime(committables);
141+
}
142+
}
143+
144+
private void markDoneByProcessTime(List<ManifestCommittable> committables) {
120145
Set<BinaryRow> partitions = new HashSet<>();
121146
boolean endInput = false;
122147
for (ManifestCommittable committable : committables) {
@@ -141,6 +166,58 @@ public void notifyCommittable(List<ManifestCommittable> committables) {
141166
markDone(trigger.donePartitions(endInput), actions);
142167
}
143168

169+
private void markDoneByWatermark(List<ManifestCommittable> committables) {
170+
// extract watermarks from committables and update partition watermarks
171+
Tuple2<Map<BinaryRow, Long>, Boolean> extractedWatermarks =
172+
extractPartitionWatermarks(committables);
173+
Map<BinaryRow, Long> partitionWatermarks = extractedWatermarks.f0;
174+
boolean endInput = extractedWatermarks.f1;
175+
Optional<Long> latestWatermark = partitionWatermarks.values().stream().max(Long::compareTo);
176+
177+
if (!latestWatermark.isPresent()) {
178+
LOG.warn("No watermark found in this batch of committables, skip partition mark done.");
179+
return;
180+
}
181+
182+
partitionWatermarks.forEach(
183+
(row, value) -> {
184+
String partition =
185+
PartitionPathUtils.generatePartitionPath(
186+
partitionComputer.generatePartValues(row));
187+
trigger.notifyPartition(partition, value);
188+
});
189+
190+
markDone(trigger.donePartitions(endInput, latestWatermark.get(), true), actions);
191+
}
192+
193+
private Tuple2<Map<BinaryRow, Long>, Boolean> extractPartitionWatermarks(
194+
List<ManifestCommittable> committables) {
195+
boolean endInput = false;
196+
Map<BinaryRow, Long> partitionWatermarks = new HashMap<>();
197+
for (ManifestCommittable committable : committables) {
198+
Long watermark = committable.watermark();
199+
if (watermark != null) {
200+
for (CommitMessage commitMessage : committable.fileCommittables()) {
201+
CommitMessageImpl message = (CommitMessageImpl) commitMessage;
202+
if (waitCompaction
203+
|| !message.indexIncrement().isEmpty()
204+
|| !message.newFilesIncrement().isEmpty()) {
205+
partitionWatermarks.compute(
206+
message.partition(),
207+
(partition, old) ->
208+
old == null ? watermark : Math.max(old, watermark));
209+
}
210+
}
211+
}
212+
213+
if (committable.identifier() == Long.MAX_VALUE) {
214+
endInput = true;
215+
}
216+
}
217+
218+
return Tuple2.of(partitionWatermarks, endInput);
219+
}
220+
144221
public static void markDone(List<String> partitions, List<PartitionMarkDoneAction> actions) {
145222
for (String partition : partitions) {
146223
try {

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -111,11 +111,16 @@ void notifyPartition(String partition, long currentTimeMillis) {
111111
}
112112

113113
public List<String> donePartitions(boolean endInput) {
114-
return donePartitions(endInput, System.currentTimeMillis());
114+
return donePartitions(endInput, System.currentTimeMillis(), false);
115115
}
116116

117-
@VisibleForTesting
118117
List<String> donePartitions(boolean endInput, long currentTimeMillis) {
118+
return donePartitions(endInput, currentTimeMillis, false);
119+
}
120+
121+
@VisibleForTesting
122+
List<String> donePartitions(
123+
boolean endInput, long currentTimeMillis, boolean watermarkEnabled) {
119124
if (endInput && markDoneWhenEndInput) {
120125
return new ArrayList<>(pendingPartitions.keySet());
121126
}
@@ -131,11 +136,21 @@ List<String> donePartitions(boolean endInput, long currentTimeMillis) {
131136
String partition = entry.getKey();
132137

133138
long lastUpdateTime = entry.getValue();
134-
long partitionStartTime =
135-
extractDateTime(partition)
136-
.atZone(ZoneId.systemDefault())
137-
.toInstant()
138-
.toEpochMilli();
139+
long partitionStartTime;
140+
if (watermarkEnabled) {
141+
// watermark should be compared as UTC time
142+
partitionStartTime =
143+
extractDateTime(partition)
144+
.atZone(ZoneId.of("UTC"))
145+
.toInstant()
146+
.toEpochMilli();
147+
} else {
148+
partitionStartTime =
149+
extractDateTime(partition)
150+
.atZone(ZoneId.systemDefault())
151+
.toInstant()
152+
.toEpochMilli();
153+
}
139154
long partitionEndTime = partitionStartTime + timeInterval;
140155
lastUpdateTime = Math.max(lastUpdateTime, partitionEndTime);
141156

0 commit comments

Comments
 (0)