Skip to content

Commit 2839e3b

Browse files
committed
io metrics
1 parent 701cf88 commit 2839e3b

31 files changed

+452
-12
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -996,6 +996,12 @@
996996
<td>Boolean</td>
997997
<td>Whether to read the changes from overwrite in streaming mode. Cannot be set to true when changelog producer is full-compaction or lookup because it will read duplicated changes.</td>
998998
</tr>
999+
<tr>
1000+
<td><h5>metrics.file-io.enabled</h5></td>
1001+
<td style="word-wrap: break-word;">false</td>
1002+
<td>Boolean</td>
1003+
<td>Whether to enable MetricsFileIO metrics statistics.</td>
1004+
</tr>
9991005
<tr>
10001006
<td><h5>streaming.read.snapshot.delay</h5></td>
10011007
<td style="word-wrap: break-word;">(none)</td>

paimon-common/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,12 @@ under the License.
198198
<scope>test</scope>
199199
</dependency>
200200

201+
<dependency>
202+
<groupId>org.apache.commons</groupId>
203+
<artifactId>commons-math3</artifactId>
204+
<version>3.6.1</version>
205+
</dependency>
206+
201207
<!-- for the HDFS mini cluster test suite -->
202208
<dependency>
203209
<groupId>org.apache.hadoop</groupId>

paimon-common/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -796,6 +796,12 @@ public class CoreOptions implements Serializable {
796796
+ "Note: Scale-up this parameter will increase memory usage while scanning manifest files. "
797797
+ "We can consider downsize it when we encounter an out of memory exception while scanning");
798798

799+
// Boolean option "metrics.file-io.enabled"; defaults to false when not configured.
public static final ConfigOption<Boolean> METRICS_FILE_IO_ENABLED =
800+
key("metrics.file-io.enabled")
801+
.booleanType()
802+
.defaultValue(false)
803+
.withDescription("Whether to enable MetricsFileIO metrics statistics.");
804+
799805
public static final ConfigOption<Duration> STREAMING_READ_SNAPSHOT_DELAY =
800806
key("streaming.read.snapshot.delay")
801807
.durationType()
@@ -2372,6 +2378,10 @@ public Integer scanManifestParallelism() {
23722378
return options.get(SCAN_MANIFEST_PARALLELISM);
23732379
}
23742380

2381+
/** Returns the value configured for {@link #METRICS_FILE_IO_ENABLED}. */
public boolean isMetricsFileIOEnabled() {
2382+
return options.get(METRICS_FILE_IO_ENABLED);
2383+
}
2384+
23752385
public Duration streamingReadDelay() {
23762386
return options.get(STREAMING_READ_SNAPSHOT_DELAY);
23772387
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.fs;
20+
21+
import org.apache.paimon.catalog.CatalogContext;
22+
import org.apache.paimon.fs.metrics.InputMetrics;
23+
import org.apache.paimon.fs.metrics.OutputMetrics;
24+
25+
import java.io.IOException;
26+
27+
/**
28+
* FileIOWrapper is a wrapper class for the {@link FileIO}.
29+
*
30+
* <p>It allows users to monitor and track file I/O operations (e.g., read, write, delete, rename).
31+
*/
32+
public class MetricsFileIO implements FileIO {
33+
34+
protected final FileIO fileIO;
35+
protected InputMetrics inputMetrics = null;
36+
protected OutputMetrics outputMetrics = null;
37+
38+
public MetricsFileIO(FileIO fileIO) {
39+
this.fileIO = fileIO;
40+
}
41+
42+
public MetricsFileIO withMetrics(InputMetrics inputMetrics, OutputMetrics outputMetrics) {
43+
this.inputMetrics = inputMetrics;
44+
this.outputMetrics = outputMetrics;
45+
return this;
46+
}
47+
48+
@Override
49+
public boolean isObjectStore() {
50+
return fileIO.isObjectStore();
51+
}
52+
53+
@Override
54+
public void configure(CatalogContext context) {
55+
fileIO.configure(context);
56+
}
57+
58+
@Override
59+
public SeekableInputStream newInputStream(Path path) throws IOException {
60+
SeekableInputStream inputStream = fileIO.newInputStream(path);
61+
return new SeekableInputStreamIOWrapper(inputStream, this.inputMetrics);
62+
}
63+
64+
@Override
65+
public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws IOException {
66+
PositionOutputStream outputStream = fileIO.newOutputStream(path, overwrite);
67+
return new PositionOutputStreamIOWrapper(outputStream, outputMetrics);
68+
}
69+
70+
@Override
71+
public FileStatus getFileStatus(Path path) throws IOException {
72+
return fileIO.getFileStatus(path);
73+
}
74+
75+
@Override
76+
public FileStatus[] listStatus(Path path) throws IOException {
77+
return fileIO.listStatus(path);
78+
}
79+
80+
@Override
81+
public boolean exists(Path path) throws IOException {
82+
return fileIO.exists(path);
83+
}
84+
85+
@Override
86+
public boolean delete(Path path, boolean recursive) throws IOException {
87+
return fileIO.delete(path, recursive);
88+
}
89+
90+
@Override
91+
public boolean mkdirs(Path path) throws IOException {
92+
return fileIO.mkdirs(path);
93+
}
94+
95+
@Override
96+
public boolean rename(Path src, Path dst) throws IOException {
97+
return fileIO.rename(src, dst);
98+
}
99+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.fs;
20+
21+
import org.apache.paimon.fs.metrics.OutputMetrics;
22+
23+
import java.io.IOException;
24+
25+
/** Wrap a {@link PositionOutputStream}. */
26+
public class PositionOutputStreamIOWrapper extends PositionOutputStream {
27+
28+
protected final PositionOutputStream out;
29+
private OutputMetrics metrics;
30+
31+
public PositionOutputStreamIOWrapper(PositionOutputStream out, OutputMetrics outputMetrics) {
32+
this.out = out;
33+
this.metrics = outputMetrics;
34+
}
35+
36+
@Override
37+
public long getPos() throws IOException {
38+
return out.getPos();
39+
}
40+
41+
@Override
42+
public void write(int b) throws IOException {
43+
try {
44+
out.write(b);
45+
} finally {
46+
if (metrics != null) {
47+
metrics.recordWriteEvent(1);
48+
}
49+
}
50+
}
51+
52+
@Override
53+
public void write(byte[] b) throws IOException {
54+
try {
55+
out.write(b);
56+
} finally {
57+
if (metrics != null) {
58+
metrics.recordWriteEvent(b.length);
59+
}
60+
}
61+
}
62+
63+
@Override
64+
public void write(byte[] b, int off, int len) throws IOException {
65+
try {
66+
out.write(b, off, len);
67+
} finally {
68+
if (metrics != null) {
69+
metrics.recordWriteEvent(len);
70+
}
71+
}
72+
}
73+
74+
@Override
75+
public void flush() throws IOException {
76+
out.flush();
77+
}
78+
79+
@Override
80+
public void close() throws IOException {
81+
out.close();
82+
}
83+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.fs;
20+
21+
import org.apache.paimon.fs.metrics.InputMetrics;
22+
23+
import java.io.IOException;
24+
25+
/** Wrap a {@link SeekableInputStream}. */
26+
public class SeekableInputStreamIOWrapper extends SeekableInputStream {
27+
28+
protected final SeekableInputStream in;
29+
private InputMetrics metrics;
30+
31+
public SeekableInputStreamIOWrapper(SeekableInputStream in, InputMetrics metrics) {
32+
this.in = in;
33+
this.metrics = metrics;
34+
}
35+
36+
@Override
37+
public void seek(long desired) throws IOException {
38+
in.seek(desired);
39+
}
40+
41+
@Override
42+
public long getPos() throws IOException {
43+
return in.getPos();
44+
}
45+
46+
@Override
47+
public int read() throws IOException {
48+
int bytesRead = 0;
49+
try {
50+
bytesRead = in.read();
51+
} finally {
52+
if (metrics != null && bytesRead != -1) {
53+
metrics.recordReadEvent(bytesRead);
54+
}
55+
}
56+
return bytesRead;
57+
}
58+
59+
@Override
60+
public int read(byte[] b, int off, int len) throws IOException {
61+
int bytesRead = 0;
62+
try {
63+
bytesRead = in.read(b, off, len);
64+
} finally {
65+
if (metrics != null && bytesRead != -1) {
66+
metrics.recordReadEvent(bytesRead);
67+
}
68+
}
69+
return bytesRead;
70+
}
71+
72+
@Override
73+
public void close() throws IOException {
74+
in.close();
75+
}
76+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.fs.metrics;
20+
21+
import org.apache.paimon.metrics.MetricGroup;
22+
import org.apache.paimon.metrics.MetricRegistry;
23+
24+
import java.util.concurrent.atomic.AtomicLong;
25+
26+
/** Collects and monitors input stream metrics. */
27+
public class InputMetrics {
28+
29+
public static final String GROUP_NAME = "source";
30+
private final MetricGroup metricGroup;
31+
32+
public static final String READ_BYTES = "write.bytes";
33+
public static final String READ_OPERATIONS = "write.operations";
34+
35+
private final AtomicLong readBytes = new AtomicLong(0);
36+
private final AtomicLong readOperations = new AtomicLong(0);
37+
38+
public InputMetrics(MetricRegistry registry, String tableName) {
39+
metricGroup = registry.createTableMetricGroup(GROUP_NAME, tableName);
40+
registerMetrics();
41+
}
42+
43+
private void registerMetrics() {
44+
metricGroup.gauge(READ_BYTES, this::getReadBytes);
45+
metricGroup.gauge(READ_OPERATIONS, this::getReadOperations);
46+
}
47+
48+
public void recordReadEvent(long bytes) {
49+
readBytes.addAndGet(bytes);
50+
readOperations.incrementAndGet();
51+
}
52+
53+
public long getReadBytes() {
54+
return readBytes.get();
55+
}
56+
57+
public long getReadOperations() {
58+
return readOperations.get();
59+
}
60+
}

0 commit comments

Comments
 (0)