
Commit e8d6f54

pan3793 authored and yaooqinn committed
[SPARK-52426][CORE] Support redirecting stdout/stderr to logging system
### What changes were proposed in this pull request?

Though directly printing messages to stdout/stderr is discouraged, Spark, as a framework, does not forbid users from doing so. This PR adds a `RedirectConsolePlugin` that allows Spark to redirect stdout/stderr to the logging system (Log4j2 via the SLF4J API). Apache Flink has the same capability; see [FLINK-31234](https://issues.apache.org/jira/browse/FLINK-31234).

### Why are the changes needed?

This is especially useful for integrating with external logging services, for example, using the Kafka Log4j appender to send logs to Kafka and then draining them into Elasticsearch.

### Does this PR introduce _any_ user-facing change?

Yes, new feature.

### How was this patch tested?

I didn't write a UT because it would affect other test cases running in the same JVM; I tested it manually.

```
$ cp conf/log4j2.properties.template conf/log4j2.properties
$ cat >> conf/log4j2.properties <<EOF
logger.stdout.name = stdout
logger.stdout.level = info
EOF
$ bin/spark-shell --conf spark.plugins=org.apache.spark.deploy.RedirectConsolePlugin
WARNING: Using incubator modules: jdk.incubator.vector
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 4.1.0-SNAPSHOT
      /_/

Using Scala version 2.13.16 (OpenJDK 64-Bit Server VM, Java 17.0.13)
Type in expressions to have them evaluated.
Type :help for more information.
25/06/13 17:28:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://10.242.159.140:4040
Spark context available as 'sc' (master = local[*], app id = local-1749806903960).
Spark session available as 'spark'.

scala> System.out.println("driver stdout")
     | System.err.println("driver stderr")
     | sc.parallelize(Array(1, 2, 3)).foreach { x =>
     |   System.out.println(s"executor stdout: $x")
     |   System.err.println(s"executor stderr: $x")
     | }
warning: 1 deprecation (since 2.13.0); for details, enable `:setting -deprecation` or `:replay -deprecation`
25/06/13 17:28:50 INFO stdout: driver stdout
25/06/13 17:28:50 ERROR stderr: driver stderr
25/06/13 17:28:50 INFO stdout: executor stdout: 2
25/06/13 17:28:50 ERROR stderr: executor stderr: 2
25/06/13 17:28:50 INFO stdout: executor stdout: 1
25/06/13 17:28:50 ERROR stderr: executor stderr: 1
25/06/13 17:28:50 INFO stdout: executor stdout: 3
25/06/13 17:28:50 ERROR stderr: executor stderr: 3

scala>
```

Console progress bar (without console redirection):

```
$ bin/spark-shell
WARNING: Using incubator modules: jdk.incubator.vector
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 4.1.0-SNAPSHOT
      /_/

Using Scala version 2.13.16 (OpenJDK 64-Bit Server VM, Java 17.0.13)
Type in expressions to have them evaluated.
Type :help for more information.
25/06/13 19:49:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://10.242.159.140:4040
Spark context available as 'sc' (master = local[*], app id = local-1749815345805).
Spark session available as 'spark'.
scala> import org.apache.spark._
     |
     | System.out.println("driver stdout")
     | System.err.println("driver stderr")
     | sc.parallelize(Array(1, 2, 3)).foreach { x =>
     |   Thread.sleep(TaskContext.getPartitionId() * 10000)
     |   System.out.println(s"executor stdout: $x")
     |   System.err.println(s"executor stderr: $x")
     | }
warning: 1 deprecation (since 2.13.0); for details, enable `:setting -deprecation` or `:replay -deprecation`
driver stdout
driver stderr
executor stdout: 1===============================>                (7 + 3) / 10]
executor stderr: 1
executor stdout: 2=====================================>          (8 + 2) / 10]
executor stderr: 2
executor stdout: 3===========================================>    (9 + 1) / 10]
executor stderr: 3
import org.apache.spark._

scala>
```

Console progress bar (with console redirection):

```
$ bin/spark-shell --conf spark.plugins=org.apache.spark.deploy.RedirectConsolePlugin
WARNING: Using incubator modules: jdk.incubator.vector
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 4.1.0-SNAPSHOT
      /_/

Using Scala version 2.13.16 (OpenJDK 64-Bit Server VM, Java 17.0.13)
Type in expressions to have them evaluated.
Type :help for more information.
25/06/13 19:54:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/06/13 19:54:14 WARN DriverRedirectConsolePlugin: Redirect driver's stderr to logging system may affect console progress bar, consider disabling either spark.driver.log.redirectConsole.enabled or spark.ui.showConsoleProgress.
Spark context Web UI available at http://10.242.159.140:4040
Spark context available as 'sc' (master = local[*], app id = local-1749815654458).
Spark session available as 'spark'.

scala> import org.apache.spark._
     |
     | System.out.println("driver stdout")
     | System.err.println("driver stderr")
     | sc.parallelize(Array(1, 2, 3)).foreach { x =>
     |   Thread.sleep(TaskContext.getPartitionId() * 10000)
     |   System.out.println(s"executor stdout: $x")
     |   System.err.println(s"executor stderr: $x")
     | }
warning: 1 deprecation (since 2.13.0); for details, enable `:setting -deprecation` or `:replay -deprecation`
25/06/13 19:54:22 INFO stdout: driver stdout
25/06/13 19:54:22 ERROR stderr: driver stderr
25/06/13 19:54:52 INFO stdout: executor stdout: 1> (7 + 3) / 10]
25/06/13 19:54:52 ERROR stderr: executor stderr: 1
25/06/13 19:55:22 INFO stdout: executor stdout: 2======> (8 + 2) / 10]
25/06/13 19:55:22 ERROR stderr: executor stderr: 2
25/06/13 19:55:52 INFO stdout: executor stdout: 3============> (9 + 1) / 10]
25/06/13 19:55:52 ERROR stderr: executor stderr: 3
import org.apache.spark._

scala>
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51130 from pan3793/SPARK-52426.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
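To make the Kafka integration mentioned under "Why are the changes needed?" concrete, below is a hedged `log4j2.properties` sketch that routes the `stdout`/`stderr` loggers installed by the plugin to Log4j2's Kafka appender. The topic name, broker address, and layout pattern are illustrative assumptions, `kafka-clients` must be on the classpath, and this fragment is not part of the patch.

```properties
# Sketch only: topic, broker, and pattern are illustrative values.
appender.kafka.type = Kafka
appender.kafka.name = kafka
appender.kafka.topic = spark-console-logs
appender.kafka.layout.type = PatternLayout
appender.kafka.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c: %m%n
appender.kafka.property.type = Property
appender.kafka.property.name = bootstrap.servers
appender.kafka.property.value = localhost:9092

# Attach the appender to the loggers created by RedirectConsolePlugin.
logger.stdout.name = stdout
logger.stdout.level = info
logger.stdout.appenderRef.kafka.ref = kafka

logger.stderr.name = stderr
logger.stderr.level = error
logger.stderr.appenderRef.kafka.ref = kafka
```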
1 parent be04876 · commit e8d6f54

File tree

- core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala
- core/src/main/scala/org/apache/spark/internal/config/package.scala
- core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala

3 files changed: +167 -2 lines

core/src/main/scala/org/apache/spark/deploy/RedirectConsolePlugin.scala

Lines changed: 137 additions & 0 deletions (new file)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy

import java.io.{ByteArrayOutputStream, PrintStream}
import java.util.{Collections, Map => JMap}

import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
import org.apache.spark.internal.{Logging, SparkLoggerFactory}
import org.apache.spark.internal.config._

/**
 * A built-in plugin that allows redirecting stdout/stderr to the logging system (SLF4J).
 */
class RedirectConsolePlugin extends SparkPlugin {
  override def driverPlugin(): DriverPlugin = new DriverRedirectConsolePlugin()

  override def executorPlugin(): ExecutorPlugin = new ExecRedirectConsolePlugin()
}

object RedirectConsolePlugin {

  def redirectStdoutToLog(): Unit = {
    val stdoutLogger = SparkLoggerFactory.getLogger("stdout")
    System.setOut(new LoggingPrintStream(stdoutLogger.info))
  }

  def redirectStderrToLog(): Unit = {
    val stderrLogger = SparkLoggerFactory.getLogger("stderr")
    System.setErr(new LoggingPrintStream(stderrLogger.error))
  }
}

class DriverRedirectConsolePlugin extends DriverPlugin with Logging {

  override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
    val outputs = sc.conf.get(DRIVER_REDIRECT_CONSOLE_OUTPUTS)
    if (outputs.contains("stdout")) {
      logInfo("Redirect driver's stdout to logging system.")
      RedirectConsolePlugin.redirectStdoutToLog()
    }
    if (outputs.contains("stderr")) {
      logInfo("Redirect driver's stderr to logging system.")
      RedirectConsolePlugin.redirectStderrToLog()
    }
    Collections.emptyMap
  }
}

class ExecRedirectConsolePlugin extends ExecutorPlugin with Logging {

  override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
    val outputs = ctx.conf.get(EXEC_REDIRECT_CONSOLE_OUTPUTS)
    if (outputs.contains("stdout")) {
      logInfo("Redirect executor's stdout to logging system.")
      RedirectConsolePlugin.redirectStdoutToLog()
    }
    if (outputs.contains("stderr")) {
      logInfo("Redirect executor's stderr to logging system.")
      RedirectConsolePlugin.redirectStderrToLog()
    }
  }
}

private[spark] class LoggingPrintStream(redirect: String => Unit)
  extends PrintStream(new LineBuffer(4 * 1024 * 1024)) {

  override def write(b: Int): Unit = {
    super.write(b)
    tryLogCurrentLine()
  }

  override def write(buf: Array[Byte], off: Int, len: Int): Unit = {
    super.write(buf, off, len)
    tryLogCurrentLine()
  }

  private def tryLogCurrentLine(): Unit = this.synchronized {
    out.asInstanceOf[LineBuffer].tryGenerateContext.foreach { logContext =>
      redirect(logContext)
    }
  }
}

/**
 * Caches the bytes of the current line until the line ends or the buffered size reaches
 * the threshold, at which point the line can be generated.
 */
private[spark] object LineBuffer {
  private val LF_BYTES = System.lineSeparator.getBytes
  private val LF_LENGTH = LF_BYTES.length
}

private[spark] class LineBuffer(lineMaxBytes: Long) extends ByteArrayOutputStream {

  import LineBuffer._

  def tryGenerateContext: Option[String] =
    if (isLineEnded) {
      try Some(new String(buf, 0, count - LF_LENGTH)) finally reset()
    } else if (count >= lineMaxBytes) {
      try Some(new String(buf, 0, count)) finally reset()
    } else {
      None
    }

  private def isLineEnded: Boolean = {
    if (count < LF_LENGTH) return false
    // fast return in UNIX-like OS when LF is single char '\n'
    if (LF_LENGTH == 1) return LF_BYTES(0) == buf(count - 1)

    var i = 0
    do {
      if (LF_BYTES(i) != buf(count - LF_LENGTH + i)) {
        return false
      }
      i = i + 1
    } while (i < LF_LENGTH)
    true
  }
}
```
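For reference, a minimal sketch of enabling the plugin programmatically rather than via `--conf` on the command line. The master, app name, and the choice to redirect only stderr are illustrative, and it assumes a Spark build that contains this patch:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: enable RedirectConsolePlugin and redirect only the driver's stderr,
// leaving stdout untouched. Plugin configs must be set before the context starts.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("redirect-console-demo")
  .config("spark.plugins", "org.apache.spark.deploy.RedirectConsolePlugin")
  .config("spark.driver.log.redirectConsoleOutputs", "stderr")
  .getOrCreate()

System.err.println("routed to the 'stderr' logger")  // handled by LoggingPrintStream
System.out.println("printed to the real stdout")     // not redirected with this config
```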

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 26 additions & 0 deletions

```diff
@@ -2846,4 +2846,30 @@ package object config {
       .checkValues(Set("connect", "classic"))
       .createWithDefault(
         if (sys.env.get("SPARK_CONNECT_MODE").contains("1")) "connect" else "classic")
+
+  private[spark] val DRIVER_REDIRECT_CONSOLE_OUTPUTS =
+    ConfigBuilder("spark.driver.log.redirectConsoleOutputs")
+      .doc("Comma-separated list of the console output kinds for the driver that need to be " +
+        "redirected to the logging system. Supported values are `stdout`, `stderr`. It only " +
+        s"takes effect when `${PLUGINS.key}` is configured with " +
+        "`org.apache.spark.deploy.RedirectConsolePlugin`.")
+      .version("4.1.0")
+      .stringConf
+      .transform(_.toLowerCase(Locale.ROOT))
+      .toSequence
+      .checkValue(v => v.forall(Set("stdout", "stderr").contains),
+        "The value can only be one or more of 'stdout, stderr'.")
+      .createWithDefault(Seq("stdout", "stderr"))
+
+  private[spark] val EXEC_REDIRECT_CONSOLE_OUTPUTS =
+    ConfigBuilder("spark.executor.log.redirectConsoleOutputs")
+      .doc("Comma-separated list of the console output kinds for executors that need to be " +
+        "redirected to the logging system. Supported values are `stdout`, `stderr`. It only " +
+        s"takes effect when `${PLUGINS.key}` is configured with " +
+        "`org.apache.spark.deploy.RedirectConsolePlugin`.")
+      .version("4.1.0")
+      .stringConf
+      .transform(_.toLowerCase(Locale.ROOT))
+      .toSequence
+      .checkValue(v => v.forall(Set("stdout", "stderr").contains),
+        "The value can only be one or more of 'stdout, stderr'.")
+      .createWithDefault(Seq("stdout", "stderr"))
 }
```
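A usage sketch for the two new entries: redirection can be configured asymmetrically per role, for example both streams on the driver but only stdout on executors. The application class and jar below are placeholders, not part of the patch:

```
$ bin/spark-submit \
    --class com.example.MyApp \
    --conf spark.plugins=org.apache.spark.deploy.RedirectConsolePlugin \
    --conf spark.driver.log.redirectConsoleOutputs=stdout,stderr \
    --conf spark.executor.log.redirectConsoleOutputs=stdout \
    my-app.jar
```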

core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala

Lines changed: 4 additions & 2 deletions

```diff
@@ -38,6 +38,8 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
   private val updatePeriodMSec = sc.conf.get(UI_CONSOLE_PROGRESS_UPDATE_INTERVAL)
   // Delay to show up a progress bar, in milliseconds
   private val firstDelayMSec = 500L
+  // Get the stderr (which is the console for spark-shell) before installing RedirectConsolePlugin
+  private val console = System.err

   // The width of terminal
   private val TerminalWidth = sys.env.getOrElse("COLUMNS", "80").toInt
@@ -92,7 +94,7 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
     // only refresh if it's changed OR after 1 minute (or the ssh connection will be closed
     // after idle some time)
     if (bar != lastProgressBar || now - lastUpdateTime > 60 * 1000L) {
-      System.err.print(s"$CR$bar$CR")
+      console.print(s"$CR$bar$CR")
       lastUpdateTime = now
     }
     lastProgressBar = bar
@@ -103,7 +105,7 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
    */
   private def clear(): Unit = {
     if (!lastProgressBar.isEmpty) {
-      System.err.printf(s"$CR${" " * TerminalWidth}$CR")
+      console.printf(s"$CR${" " * TerminalWidth}$CR")
       lastProgressBar = ""
     }
   }
```
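The captured `console = System.err` above matters because the plugin later replaces the global streams. A standalone sketch of that ordering concern, using plain JDK streams rather than Spark API:

```scala
import java.io.{ByteArrayOutputStream, PrintStream}

// Capture the real stream before System.setErr swaps it out, mirroring what
// ConsoleProgressBar now does; later writes via the saved reference still
// reach the original console.
val realStderr = System.err
val sink = new ByteArrayOutputStream()
System.setErr(new PrintStream(sink))  // stand-in for the plugin's LoggingPrintStream

System.err.println("redirected")          // goes to `sink`, i.e. the logging side
realStderr.println("progress bar line")   // still reaches the original console
```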
