Skip to content

Commit d023cf2

Browse files
authored
[Bug][Zeta] SeaTunnelClient can not exit with error (#9281)
1 parent ecff2e8 commit d023cf2

File tree

6 files changed

+139
-2
lines changed

6 files changed

+139
-2
lines changed

pom.xml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@
6666
<maven.compiler.source>${java.version}</maven.compiler.source>
6767
<maven.compiler.target>${java.version}</maven.compiler.target>
6868

69+
<system-rules.version>1.2.1</system-rules.version>
70+
<powermock.version>2.0.9</powermock.version>
6971
<slf4j.version>1.7.36</slf4j.version>
7072
<log4j2.version>2.17.1</log4j2.version>
7173
<log4j2-disruptor.version>3.4.4</log4j2-disruptor.version>
@@ -574,6 +576,24 @@
574576
<version>${mockito.version}</version>
575577
<scope>test</scope>
576578
</dependency>
579+
<dependency>
580+
<groupId>com.github.stefanbirkner</groupId>
581+
<artifactId>system-lambda</artifactId>
582+
<version>${system-rules.version}</version>
583+
<scope>test</scope>
584+
</dependency>
585+
<dependency>
586+
<groupId>org.powermock</groupId>
587+
<artifactId>powermock-module-junit4</artifactId>
588+
<version>${powermock.version}</version>
589+
<scope>test</scope>
590+
</dependency>
591+
<dependency>
592+
<groupId>org.powermock</groupId>
593+
<artifactId>powermock-api-mockito2</artifactId>
594+
<version>${powermock.version}</version>
595+
<scope>test</scope>
596+
</dependency>
577597

578598
<!-- The prometheus simpleclient -->
579599
<dependency>

seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/SeaTunnel.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ private static void showConfigError(Throwable throwable) {
5353
String errorMsg = throwable.getMessage();
5454
log.error("Config Error:\n");
5555
log.error("Reason: {} \n", errorMsg);
56+
log.error("Exception StackTrace:{} ", ExceptionUtils.getStackTrace(throwable));
5657
log.error(
5758
"\n===============================================================================\n\n\n");
5859
}

seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelClient.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@
2323
import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
2424
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
2525

26+
import org.apache.commons.lang3.exception.ExceptionUtils;
27+
28+
import lombok.extern.slf4j.Slf4j;
29+
30+
@Slf4j
2631
public class SeaTunnelClient {
2732
public static void main(String[] args) throws CommandException {
2833
ClientCommandArgs clientCommandArgs =
@@ -31,6 +36,11 @@ public static void main(String[] args) throws CommandException {
3136
new ClientCommandArgs(),
3237
EngineType.SEATUNNEL.getStarterShellName(),
3338
true);
34-
SeaTunnel.run(clientCommandArgs.buildCommand());
39+
try {
40+
SeaTunnel.run(clientCommandArgs.buildCommand());
41+
} catch (Error e) {
42+
log.error("Exception StackTrace: {}", ExceptionUtils.getStackTrace(e));
43+
System.exit(1);
44+
}
3545
}
3646
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.core.starter.seatunnel;
19+
20+
import org.apache.seatunnel.core.starter.SeaTunnel;
21+
import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
22+
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
23+
24+
import org.junit.jupiter.api.Assertions;
25+
import org.junit.jupiter.api.Test;
26+
import org.mockito.MockedStatic;
27+
import org.mockito.Mockito;
28+
29+
import lombok.extern.slf4j.Slf4j;
30+
31+
import static com.github.stefanbirkner.systemlambda.SystemLambda.catchSystemExit;
32+
33+
@Slf4j
34+
public class SeaTunnelClientOOMTest {
35+
36+
@Test
37+
public void testHazelcastOOMExitBehavior() throws Exception {
38+
// Prepare command line arguments
39+
String[] args = {"--config", "fake_config.conf"};
40+
ClientCommandArgs clientCommandArgs = new ClientCommandArgs();
41+
42+
// Mock CommandLineUtils.parse to return our clientCommandArgs
43+
try (MockedStatic<CommandLineUtils> mockedCommandLineUtils =
44+
Mockito.mockStatic(CommandLineUtils.class)) {
45+
mockedCommandLineUtils
46+
.when(
47+
() ->
48+
CommandLineUtils.parse(
49+
Mockito.any(String[].class),
50+
Mockito.any(ClientCommandArgs.class),
51+
Mockito.anyString(),
52+
Mockito.anyBoolean()))
53+
.thenReturn(clientCommandArgs);
54+
55+
// Mock SeaTunnel.run to throw OutOfMemoryError
56+
try (MockedStatic<SeaTunnel> mockedSeaTunnel = Mockito.mockStatic(SeaTunnel.class)) {
57+
// Simulate Hazelcast thread allocation OOM
58+
OutOfMemoryError oomError =
59+
new OutOfMemoryError("Java heap space during Hazelcast thread allocation");
60+
61+
// Mock run to throw OOM
62+
mockedSeaTunnel.when(() -> SeaTunnel.run(Mockito.any())).thenThrow(oomError);
63+
64+
// Test that System.exit(1) is called
65+
int statusCode =
66+
catchSystemExit(
67+
() -> {
68+
SeaTunnelClient.main(args);
69+
});
70+
71+
// Verify exit code is 1
72+
Assertions.assertEquals(1, statusCode);
73+
}
74+
}
75+
}
76+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
{
2+
"env": {
3+
"parallelism": 1,
4+
"job.mode": "BATCH"
5+
},
6+
"source": [
7+
{
8+
"plugin_name": "FakeSource",
9+
"plugin_output": "fake_oom_test",
10+
"row.num": 100,
11+
"split.num": 5,
12+
"schema": {
13+
"fields": {
14+
"name": "string",
15+
"age": "int"
16+
}
17+
},
18+
"parallelism": 1
19+
}
20+
],
21+
"transform": [
22+
],
23+
"sink": [
24+
{
25+
"plugin_name": "InMemory",
26+
"plugin_input": "fake_oom_test",
27+
"throw_out_of_memory": true
28+
}
29+
]
30+
}

seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,4 @@ sink {
5959
}
6060
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
6161
# please go to https://seatunnel.apache.org/docs/connector-v2/sink
62-
}
62+
}

0 commit comments

Comments
 (0)