Skip to content

Commit 39316de

Browse files
committed
adjust
1 parent c4b47a2 commit 39316de

File tree

8 files changed

+56
-255
lines changed

8 files changed

+56
-255
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -996,6 +996,12 @@
996996
<td>Boolean</td>
997997
<td>Whether to read the changes from overwrite in streaming mode. Cannot be set to true when changelog producer is full-compaction or lookup because it will read duplicated changes.</td>
998998
</tr>
999+
<tr>
1000+
<td><h5>metrics.file-io.enabled</h5></td>
1001+
<td style="word-wrap: break-word;">false</td>
1002+
<td>Boolean</td>
1003+
<td>Whether to enable MetricsFileIO metrics statistics.</td>
1004+
</tr>
9991005
<tr>
10001006
<td><h5>streaming.read.snapshot.delay</h5></td>
10011007
<td style="word-wrap: break-word;">(none)</td>

paimon-common/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -796,6 +796,12 @@ public class CoreOptions implements Serializable {
796796
+ "Note: Scale-up this parameter will increase memory usage while scanning manifest files. "
797797
+ "We can consider downsize it when we encounter an out of memory exception while scanning");
798798

799+
public static final ConfigOption<Boolean> METRICS_FILE_IO_ENABLED =
800+
key("metrics.file-io.enabled")
801+
.booleanType()
802+
.defaultValue(false)
803+
.withDescription("Whether to enable MetricsFileIO metrics statistics.");
804+
799805
public static final ConfigOption<Duration> STREAMING_READ_SNAPSHOT_DELAY =
800806
key("streaming.read.snapshot.delay")
801807
.durationType()
@@ -2372,6 +2378,10 @@ public Integer scanManifestParallelism() {
23722378
return options.get(SCAN_MANIFEST_PARALLELISM);
23732379
}
23742380

2381+
public boolean isMetricsFileIOEnabled() {
2382+
return options.get(METRICS_FILE_IO_ENABLED);
2383+
}
2384+
23752385
public Duration streamingReadDelay() {
23762386
return options.get(STREAMING_READ_SNAPSHOT_DELAY);
23772387
}

paimon-common/src/main/java/org/apache/paimon/fs/FileIOWrapper.java renamed to paimon-common/src/main/java/org/apache/paimon/fs/MetricsFileIO.java

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,17 @@
2929
*
3030
* <p>It allows users to monitor and track file I/O operations (e.g., read, write, delete, rename).
3131
*/
32-
public class FileIOWrapper implements FileIO {
32+
public class MetricsFileIO implements FileIO {
3333

3434
protected final FileIO fileIO;
3535
protected InputMetrics inputMetrics = null;
3636
protected OutputMetrics outputMetrics = null;
3737

38-
public FileIOWrapper(FileIO fileIO) {
38+
public MetricsFileIO(FileIO fileIO) {
3939
this.fileIO = fileIO;
4040
}
4141

42-
public FileIOWrapper withMetrics(InputMetrics inputMetrics, OutputMetrics outputMetrics) {
42+
public MetricsFileIO withMetrics(InputMetrics inputMetrics, OutputMetrics outputMetrics) {
4343
this.inputMetrics = inputMetrics;
4444
this.outputMetrics = outputMetrics;
4545
return this;
@@ -69,49 +69,31 @@ public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws
6969

7070
@Override
7171
public FileStatus getFileStatus(Path path) throws IOException {
72-
if (inputMetrics != null) {
73-
inputMetrics.incrementGetFileStatusOperationCount();
74-
}
7572
return fileIO.getFileStatus(path);
7673
}
7774

7875
@Override
7976
public FileStatus[] listStatus(Path path) throws IOException {
80-
if (inputMetrics != null) {
81-
inputMetrics.incrementListStatusOperationCount();
82-
}
8377
return fileIO.listStatus(path);
8478
}
8579

8680
@Override
8781
public boolean exists(Path path) throws IOException {
88-
if (inputMetrics != null) {
89-
inputMetrics.incrementExistsOperationCount();
90-
}
9182
return fileIO.exists(path);
9283
}
9384

9485
@Override
9586
public boolean delete(Path path, boolean recursive) throws IOException {
96-
if (outputMetrics != null) {
97-
outputMetrics.incrementDeleteOperationCount();
98-
}
9987
return fileIO.delete(path, recursive);
10088
}
10189

10290
@Override
10391
public boolean mkdirs(Path path) throws IOException {
104-
if (outputMetrics != null) {
105-
outputMetrics.incrementMkdirsOperationCount();
106-
}
10792
return fileIO.mkdirs(path);
10893
}
10994

11095
@Override
11196
public boolean rename(Path src, Path dst) throws IOException {
112-
if (outputMetrics != null) {
113-
outputMetrics.incrementRenameOperationCount();
114-
}
11597
return fileIO.rename(src, dst);
11698
}
11799
}

paimon-common/src/main/java/org/apache/paimon/fs/PositionOutputStreamIOWrapper.java

Lines changed: 5 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -40,66 +40,44 @@ public long getPos() throws IOException {
4040

4141
@Override
4242
public void write(int b) throws IOException {
43-
long startTime = System.nanoTime();
4443
try {
4544
out.write(b);
4645
} finally {
47-
long endTime = System.nanoTime();
4846
if (metrics != null) {
49-
metrics.recordWriteEvent(1, (endTime - startTime) / 1_000_000);
47+
metrics.recordWriteEvent(1);
5048
}
5149
}
5250
}
5351

5452
@Override
5553
public void write(byte[] b) throws IOException {
56-
long startTime = System.nanoTime();
5754
try {
5855
out.write(b);
5956
} finally {
60-
long endTime = System.nanoTime();
6157
if (metrics != null) {
62-
metrics.recordWriteEvent(b.length, (endTime - startTime) / 1_000_000);
58+
metrics.recordWriteEvent(b.length);
6359
}
6460
}
6561
}
6662

6763
@Override
6864
public void write(byte[] b, int off, int len) throws IOException {
69-
long startTime = System.nanoTime();
7065
try {
7166
out.write(b, off, len);
7267
} finally {
73-
long endTime = System.nanoTime();
7468
if (metrics != null) {
75-
metrics.recordWriteEvent(len, (endTime - startTime) / 1_000_000);
69+
metrics.recordWriteEvent(len);
7670
}
7771
}
7872
}
7973

8074
@Override
8175
public void flush() throws IOException {
82-
long startTime = System.nanoTime();
83-
try {
84-
out.flush();
85-
} finally {
86-
long endTime = System.nanoTime();
87-
if (metrics != null) {
88-
metrics.recordWriteEvent(0, (endTime - startTime) / 1_000_000);
89-
}
90-
}
76+
out.flush();
9177
}
9278

9379
@Override
9480
public void close() throws IOException {
95-
long startTime = System.nanoTime();
96-
try {
97-
out.close();
98-
} finally {
99-
long endTime = System.nanoTime();
100-
if (metrics != null) {
101-
metrics.recordWriteEvent(0, (endTime - startTime) / 1_000_000);
102-
}
103-
}
81+
out.close();
10482
}
10583
}

paimon-common/src/main/java/org/apache/paimon/fs/SeekableInputStreamIOWrapper.java

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,11 @@ public long getPos() throws IOException {
4646
@Override
4747
public int read() throws IOException {
4848
int bytesRead = 0;
49-
long startTime = System.nanoTime();
5049
try {
5150
bytesRead = in.read();
5251
} finally {
53-
long endTime = System.nanoTime();
5452
if (metrics != null && bytesRead != -1) {
55-
metrics.recordReadEvent(bytesRead, (endTime - startTime) / 1_000_000);
53+
metrics.recordReadEvent(bytesRead);
5654
}
5755
}
5856
return bytesRead;
@@ -61,28 +59,18 @@ public int read() throws IOException {
6159
@Override
6260
public int read(byte[] b, int off, int len) throws IOException {
6361
int bytesRead = 0;
64-
long startTime = System.nanoTime();
6562
try {
6663
bytesRead = in.read(b, off, len);
6764
} finally {
68-
long endTime = System.nanoTime();
6965
if (metrics != null && bytesRead != -1) {
70-
metrics.recordReadEvent(bytesRead, (endTime - startTime) / 1_000_000);
66+
metrics.recordReadEvent(bytesRead);
7167
}
7268
}
7369
return bytesRead;
7470
}
7571

7672
@Override
7773
public void close() throws IOException {
78-
long startTime = System.nanoTime();
79-
try {
80-
in.close();
81-
} finally {
82-
long endTime = System.nanoTime();
83-
if (metrics != null) {
84-
metrics.recordReadEvent(0, (endTime - startTime) / 1_000_000);
85-
}
86-
}
74+
in.close();
8775
}
8876
}

paimon-common/src/main/java/org/apache/paimon/fs/metrics/InputMetrics.java

Lines changed: 10 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.paimon.metrics.MetricGroup;
2222
import org.apache.paimon.metrics.MetricRegistry;
2323

24-
import java.util.concurrent.ConcurrentLinkedQueue;
2524
import java.util.concurrent.atomic.AtomicLong;
2625

2726
/** Collects and monitors input stream metrics. */
@@ -30,113 +29,32 @@ public class InputMetrics {
3029
public static final String GROUP_NAME = "source";
3130
private final MetricGroup metricGroup;
3231

33-
private final AtomicLong readBytes = new AtomicLong(0);
34-
private final AtomicLong totalReadTime = new AtomicLong(0);
35-
private final AtomicLong getFileStatusOperationCount = new AtomicLong(0);
36-
private final AtomicLong listStatusOperationCount = new AtomicLong(0);
37-
private final AtomicLong existsOperationCount = new AtomicLong(0);
32+
public static final String READ_BYTES = "write.bytes";
33+
public static final String READ_OPERATIONS = "write.operations";
3834

39-
private final ConcurrentLinkedQueue<ReadEvent> recentEvents = new ConcurrentLinkedQueue<>();
40-
private static final long WINDOW_SIZE_MS = 60 * 1000;
35+
private final AtomicLong readBytes = new AtomicLong(0);
36+
private final AtomicLong readOperations = new AtomicLong(0);
4137

4238
public InputMetrics(MetricRegistry registry, String tableName) {
4339
metricGroup = registry.tableMetricGroup(GROUP_NAME, tableName);
4440
registerMetrics();
4541
}
4642

4743
private void registerMetrics() {
48-
metricGroup.gauge("readBytes", this::getReadBytes);
49-
metricGroup.gauge("totalReadTime", this::getTotalReadTime);
50-
metricGroup.gauge("getFileStatusOperationCount", this::getGetFileStatusOperationCount);
51-
metricGroup.gauge("listStatusOperationCount", this::getListStatusOperationCount);
52-
metricGroup.gauge("existsOperationCount", this::getExistsOperationCount);
53-
metricGroup.gauge("recentMinuteReadThroughput", this::getRecentMinuteReadThroughput);
54-
metricGroup.gauge("recentMinuteReadTime", this::getRecentMinuteReadTime);
55-
}
56-
57-
public void recordReadEvent(long bytes, long time) {
58-
addReadEvent(bytes, time);
59-
updateReadStatistics(bytes, time);
60-
cleanOldEvents();
44+
metricGroup.gauge(READ_BYTES, this::getReadBytes);
45+
metricGroup.gauge(READ_OPERATIONS, this::getReadOperations);
6146
}
6247

63-
private void updateReadStatistics(long bytes, long time) {
48+
public void recordReadEvent(long bytes) {
6449
readBytes.addAndGet(bytes);
65-
totalReadTime.addAndGet(time);
66-
}
67-
68-
private void addReadEvent(long bytes, long time) {
69-
long currentTime = System.currentTimeMillis();
70-
recentEvents.add(new ReadEvent(currentTime, bytes, time));
71-
}
72-
73-
public void incrementGetFileStatusOperationCount() {
74-
getFileStatusOperationCount.incrementAndGet();
75-
}
76-
77-
public void incrementListStatusOperationCount() {
78-
listStatusOperationCount.incrementAndGet();
79-
}
80-
81-
public void incrementExistsOperationCount() {
82-
existsOperationCount.incrementAndGet();
50+
readOperations.incrementAndGet();
8351
}
8452

8553
public long getReadBytes() {
8654
return readBytes.get();
8755
}
8856

89-
public long getTotalReadTime() {
90-
return totalReadTime.get();
91-
}
92-
93-
public long getGetFileStatusOperationCount() {
94-
return getFileStatusOperationCount.get();
95-
}
96-
97-
public long getListStatusOperationCount() {
98-
return listStatusOperationCount.get();
99-
}
100-
101-
public long getExistsOperationCount() {
102-
return existsOperationCount.get();
103-
}
104-
105-
private void cleanOldEvents() {
106-
long currentTime = System.currentTimeMillis();
107-
while (!recentEvents.isEmpty()
108-
&& (currentTime - recentEvents.peek().timestamp) > WINDOW_SIZE_MS) {
109-
recentEvents.poll();
110-
}
111-
}
112-
113-
public long getRecentMinuteReadThroughput() {
114-
cleanOldEvents();
115-
long totalBytes = 0;
116-
for (ReadEvent event : recentEvents) {
117-
totalBytes += event.bytes;
118-
}
119-
return totalBytes;
120-
}
121-
122-
public long getRecentMinuteReadTime() {
123-
cleanOldEvents();
124-
long totalTime = 0;
125-
for (ReadEvent event : recentEvents) {
126-
totalTime += event.time;
127-
}
128-
return totalTime;
129-
}
130-
131-
static class ReadEvent {
132-
long timestamp;
133-
long bytes;
134-
long time;
135-
136-
ReadEvent(long timestamp, long bytes, long time) {
137-
this.timestamp = timestamp;
138-
this.bytes = bytes;
139-
this.time = time;
140-
}
57+
public long getReadOperations() {
58+
return readOperations.get();
14159
}
14260
}

0 commit comments

Comments
 (0)