
Commit 2f90c2b

remove SparkConf
1 parent c3f9264 commit 2f90c2b

3 files changed (+17, -59 lines)

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 0 additions & 12 deletions
@@ -2846,16 +2846,4 @@ package object config {
       .checkValues(Set("connect", "classic"))
       .createWithDefault(
         if (sys.env.get("SPARK_CONNECT_MODE").contains("1")) "connect" else "classic")
-
-  private[spark] val FILE_DATA_SOURCE_ZSTANDARD_ENABLED =
-    ConfigBuilder("spark.io.zStandard.enabled")
-      .internal()
-      .doc("Hadoop library used in Spark has not been compiled with ZSTD support. " +
-        "This conf enables the use of ZStandard codec available within SparkCompressionCodec " +
-        "for file data sources. This conf is not a SQLConf because SQLConf is not accessible " +
-        "from WholeTextFileRecordReader. This is a temporary workaround until Spark's Hadoop " +
-        "library has support for ZSTD.")
-      .version("4.1.0")
-      .booleanConf
-      .createWithDefault(true)
 }
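For context: entries like the removed FILE_DATA_SOURCE_ZSTANDARD_ENABLED are defined through Spark's internal ConfigBuilder DSL and read back with the typed SparkConf.get. Below is a minimal sketch of that pattern, not code from this commit; the key spark.example.flag and the object name are hypothetical, and since ConfigBuilder and the typed get are private[spark], a real definition has to live under the org.apache.spark package.

package org.apache.spark.example

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.ConfigBuilder

object ExampleFlags {
  // Hypothetical entry mirroring the shape of the removed one:
  // internal, boolean, defaulting to true.
  private[spark] val EXAMPLE_FLAG =
    ConfigBuilder("spark.example.flag")
      .internal()
      .booleanConf
      .createWithDefault(true)

  // Typed read; returns the declared default when the key is unset.
  def isEnabled(conf: SparkConf): Boolean = conf.get(EXAMPLE_FLAG)
}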

core/src/main/scala/org/apache/spark/io/HadoopCodecStreams.scala

Lines changed: 3 additions & 15 deletions
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.compress._
 
 import org.apache.spark.{SparkConf, SparkEnv}
-import org.apache.spark.internal.config
 import org.apache.spark.io.{CompressionCodec => SparkCompressionCodec}
 
 /**
@@ -37,9 +36,6 @@ import org.apache.spark.io.{CompressionCodec => SparkCompressionCodec}
  * non-standard file extensions like `.zstd` and `.gzip` for Zstandard and Gzip codecs.
  */
 object HadoopCodecStreams {
-  private lazy val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)
-  private lazy val isSparkZstdCodecEnabled =
-    sparkConf.get(config.FILE_DATA_SOURCE_ZSTANDARD_ENABLED)
   private val ZSTD_EXTENSIONS = Seq(".zstd", ".zst")
 
   // get codec based on file name extension
@@ -62,16 +58,10 @@ object HadoopCodecStreams {
   def createZstdInputStream(
       file: Path,
       inputStream: InputStream): Option[InputStream] = {
+    val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)
     val fileName = file.getName.toLowerCase(Locale.ROOT)
 
-    // FOR DEBUGGING GITHUB ACTION FAILURE ONLY -- START
-    assert(ZSTD_EXTENSIONS.exists(fileName.endsWith),
-      s"File $fileName does not have a recognized Zstandard extension:" +
-      s"${ZSTD_EXTENSIONS.mkString(", ")}")
-    assert(isSparkZstdCodecEnabled, "Spark Zstandard codec is not enabled.")
-    // FOR DEBUGGING GITHUB ACTION FAILURE ONLY -- END
-
-    val isOpt = if (ZSTD_EXTENSIONS.exists(fileName.endsWith) && isSparkZstdCodecEnabled) {
+    val isOpt = if (ZSTD_EXTENSIONS.exists(fileName.endsWith)) {
       Some(
         SparkCompressionCodec
           .createCodec(sparkConf, SparkCompressionCodec.ZSTD)
@@ -80,8 +70,6 @@ object HadoopCodecStreams {
     } else {
       None
     }
-    assert(isOpt.isDefined,
-      s"Failed to create Zstandard input stream for file: $fileName")
     isOpt
   }
 
@@ -98,7 +86,7 @@ object HadoopCodecStreams {
     } catch {
       case e: RuntimeException =>
         // createInputStream may fail for ZSTD if hadoop is not already compiled with ZSTD
-        // support. In that case, we try to use Spark's Zstandard codec if enabled.
+        // support. In that case, we try to use Spark's Zstandard codec.
         createZstdInputStream(file, inputStream).getOrElse(throw e)
     }
   }.getOrElse(inputStream)
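The fallback that survives this change hinges on one call chain: SparkCompressionCodec.createCodec(conf, SparkCompressionCodec.ZSTD).compressedInputStream(in). A minimal sketch of that usage, assuming the caller runs inside Spark (createCodec is private[spark], so the file must sit under the org.apache.spark package); the object and method names here are illustrative, not part of HadoopCodecStreams:

package org.apache.spark.example

import java.io.InputStream

import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.io.{CompressionCodec => SparkCompressionCodec}

object ZstdFallbackSketch {
  // Resolve the conf at call time, as the patched createZstdInputStream does:
  // prefer the running SparkEnv's conf, fall back to a fresh SparkConf.
  def zstdStream(in: InputStream): InputStream = {
    val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)
    SparkCompressionCodec
      .createCodec(conf, SparkCompressionCodec.ZSTD)
      .compressedInputStream(in)
  }
}

Inlining the conf lookup instead of caching it in a lazy object-level field means every call observes whatever SparkEnv exists at that moment, which is friendlier to test suites that start and stop SparkContexts; presumably that is why the lazy field was dropped along with the flag.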

core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala

Lines changed: 14 additions & 32 deletions
@@ -25,8 +25,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.io.compress.{CompressionCodecFactory, GzipCodec}
 
-import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite}
-import org.apache.spark.internal.config
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.io.ZStdCompressionCodec
 
 /**
@@ -112,43 +111,26 @@ class WholeTextFileRecordReaderSuite extends SparkFunSuite {
           createNativeFile(dir, filename, contents, compressionType)
         }
 
-        if (!sc.conf.get(config.FILE_DATA_SOURCE_ZSTANDARD_ENABLED) &&
-          (compressionType == CompressionType.ZSTD || compressionType == CompressionType.ZST)) {
-          val e = intercept[SparkException] {
-            sc.wholeTextFiles(dir.toString, 3).collect()
-          }
-          assert(e.getCause.isInstanceOf[RuntimeException])
-          assert(e.getCause.getMessage === "native zStandard library not available: " +
-            "this version of libhadoop was built without zstd support.")
-        } else {
-          val res = sc.wholeTextFiles(dir.toString, 3).collect()
-
-          assert(res.length === WholeTextFileRecordReaderSuite.fileNames.length,
-            "Number of files read out does not fit with the actual value.")
-
-          for ((filename, contents) <- res) {
-            val shortName = compressionType match {
-              case CompressionType.NONE => filename.split('/').last
-              case _ => filename.split('/').last.split('.').head
-            }
-            assert(WholeTextFileRecordReaderSuite.fileNames.contains(shortName),
-              s"Missing file name $filename.")
-            assert(contents === new Text(WholeTextFileRecordReaderSuite.files(shortName)).toString,
-              s"file $filename contents can not match.")
+        val res = sc.wholeTextFiles(dir.toString, 3).collect()
+
+        assert(res.length === WholeTextFileRecordReaderSuite.fileNames.length,
+          "Number of files read out does not fit with the actual value.")
+
+        for ((filename, contents) <- res) {
+          val shortName = compressionType match {
+            case CompressionType.NONE => filename.split('/').last
+            case _ => filename.split('/').last.split('.').head
           }
+          assert(WholeTextFileRecordReaderSuite.fileNames.contains(shortName),
+            s"Missing file name $filename.")
+          assert(contents === new Text(WholeTextFileRecordReaderSuite.files(shortName)).toString,
+            s"file $filename contents can not match.")
         }
       }
     }
   }
 }
 
-class WholeTextFileRecordReaderZStandardDisabledSuite extends WholeTextFileRecordReaderSuite {
-
-  override def getSparkConf(): SparkConf = {
-    super.getSparkConf().set(config.FILE_DATA_SOURCE_ZSTANDARD_ENABLED, false)
-  }
-}
-
 /**
  * Files to be tested are defined here.
  */
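With the disabled-suite subclass gone, every compression type goes through the same read path the suite now tests. A hedged sketch of exercising that path end to end from a driver program; the directory /tmp/whole-text-fixtures is a placeholder assumed to contain plain and compressed (.gz, .zst) text files, and the object name is illustrative:

import org.apache.spark.{SparkConf, SparkContext}

object WholeTextFilesDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("whole-text-files-demo"))
    try {
      // Each file becomes one (path, contents) record; per the code touched in
      // this commit, compressed files are decoded by the record reader, with
      // Spark's own ZSTD codec as the fallback when libhadoop lacks zstd.
      val res = sc.wholeTextFiles("/tmp/whole-text-fixtures", 3).collect()
      res.foreach { case (path, contents) =>
        println(s"$path -> ${contents.length} chars")
      }
    } finally {
      sc.stop()
    }
  }
}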
