[SPARK-52482][SQL][CORE] ZStandard support for file data source reader #51182
base: master
Conversation
Hadoop has built-in … Even if you don't want to touch the Hadoop code, this PR approach looks like overkill; Hadoop provides …
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/HadoopLineRecordReader.java
```scala
try {
  codec.createInputStream(inputStream)
} catch {
  case e: Exception =>
    createZstdInputStream(file, inputStream).getOrElse(throw e)
}
```
This logic is subtle; why is this fallback required?
createInputStream may fail with an Exception for ZSTD if Hadoop is not compiled with ZSTD support. In that case, we try to use Spark's Zstandard codec if it is enabled. Added a comment to clarify this.
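For context, a minimal sketch of what such a fallback helper could look like, assuming zstd-jni (which Spark already bundles for its ZStdCompressionCodec) is on the classpath; the signature mirrors the createZstdInputStream call in the diff above, but the body is illustrative, not the PR's actual code:

```scala
import java.io.InputStream
import scala.util.Try
import com.github.luben.zstd.ZstdInputStreamNoFinalizer

// Try to wrap the raw stream with zstd-jni's decompressor when the path looks
// like zstd; None lets the caller rethrow the original Hadoop exception.
def createZstdInputStream(path: String, in: InputStream): Option[InputStream] = {
  if (path.endsWith(".zst") || path.endsWith(".zstd")) {
    Try[InputStream](new ZstdInputStreamNoFinalizer(in)).toOption
  } else None
}
```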
@pan3793 Thanks for your comment. The PR adds fewer than 50 lines of code to reuse Spark's ZStdCompressionCodec. Implementing …
@pan3793 Does Hadoop's …?
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
```diff
@@ -3686,6 +3687,96 @@ abstract class CSVSuite
       parameters = Map("columnName" -> "`v`", "columnType" -> "\"VARIANT\"", "format" -> "CSV")
     )
   }

   private def createTestFiles(dir: File, fileFormatWriter: Boolean,
```
Shall we put the test in FileBasedDataSourceSuite, so that it's easier to share code and test different formats?
I thought about this. However, each format has some special cases that require generating the files slightly differently (see the sketch after this list):
- TextFileFormat has a wholetext option that reads the whole file as a single row
- CSV has a header option
- JSON has single-line and multiLine options
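A rough illustration of those per-format reader options, assuming a SparkSession named spark and placeholder paths:

```scala
// Each format needs its own options when generating/reading test files:
spark.read.option("wholetext", "true").text("/data/doc.txt.zst")      // whole file as one row
spark.read.option("header", "true").csv("/data/events.csv.zst")       // first line is the header
spark.read.option("multiLine", "true").json("/data/records.json.zst") // one JSON doc spanning lines
```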
@cloud-fan It's possible to pass different codecs via …
(Force-pushed from 2f90c2b to 4b7cf40.)
@cloud-fan as @sandip-db said, Hadoop's …
How do you define the behavior of "specify the compression at the session level"? Always respect the session conf and ignore the filename suffix? Or fall back to the codec suggested by the session conf when something goes wrong?
Also, please be careful that a Hadoop codec may behave differently from a Spark/Unix-tool codec; for example, HADOOP-12990 (lz4).
@sandip-db To handle "non-standard extensions", you just need to register another Hadoop codec, for example:
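A hedged sketch of that approach: subclass an existing Hadoop codec so it claims the non-standard suffix, then register it through the Hadoop configuration. The class name GzipExtCodec and the config line are illustrative:

```scala
import org.apache.hadoop.io.compress.GzipCodec

// Hypothetical codec that handles the non-standard ".gzip" suffix by reusing
// GzipCodec's streams and overriding only the extension it claims.
class GzipExtCodec extends GzipCodec {
  override def getDefaultExtension: String = ".gzip"
}

// Registered via the Hadoop conf, e.g.:
//   spark.hadoop.io.compression.codecs=com.example.GzipExtCodec
```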
for "no extensions" compressed text files, I'm not sure if this is a valid use case(see my last comment) |
@pan3793 While uncommon, we come across users who have compressed files without any extension. Most recently, a user had to rename a billion gzip-compressed files in S3 because they didn't have any extension.
Thanks for bringing this to my attention. We are not adding support for ZSTD compression using the Spark codec yet; this PR just adds decompression support. @cloud-fan @pan3793 do you have any further concerns with this PR? Can it be merged?
@sandip-db TBH, the current approach (control flow based on catching exceptions) seems too hacky, and I'd like to see a more detailed design of your next steps. (I don't think the question below got answered.)
You still need to ensure that Spark's zstd codec is compatible with Hadoop's implementation. In my experience, using the AirCompressor LZO codec to decompress files written via hadoop-lzo may randomly produce corrupt content with no errors.
Instead of relying on the filename suffix or the Spark session conf to choose the codec, I wonder whether the magic number was considered? For example, the …
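A hedged sketch of what such magic-number detection could look like; the byte sequences are the published magic numbers (a zstd frame begins with the bytes 28 B5 2F FD, a gzip file with 1F 8B), while the helper name is hypothetical:

```scala
import java.io.BufferedInputStream

// Peek at the first bytes of the stream, then rewind so the chosen codec
// still sees the stream from the start.
def sniffCodec(in: BufferedInputStream): Option[String] = {
  val header = new Array[Byte](4)
  in.mark(4)
  val n = in.read(header)
  in.reset()
  if (n >= 4 && (header(0) & 0xFF) == 0x28 && (header(1) & 0xFF) == 0xB5 &&
      (header(2) & 0xFF) == 0x2F && (header(3) & 0xFF) == 0xFD) Some("zstd")
  else if (n >= 2 && (header(0) & 0xFF) == 0x1F && (header(1) & 0xFF) == 0x8B) Some("gzip")
  else None
}
```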
@pan3793 Thanks for your input.
A user should be able to specify the compression type in their query using a data source reader option, for example as sketched below. If this option is specified, Spark will always use the compression type specified by the user. Some other alternatives are available, like first using the file path extension, or using the magic number (as the file utility does) to determine the codec type.
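A hedged illustration of such an option; the name "compression" and its always-win semantics are assumptions from this discussion, not an implemented API:

```scala
// Hypothetical: force zstd decompression regardless of the file extension.
spark.read
  .option("compression", "zstd")
  .csv("s3://bucket/logs/")
```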
I agree, and I wonder why Hadoop didn't do this in the first place. There will be some performance penalty for examining the magic number and reopening the file input stream with the appropriate codec. Having said that, we can move the discussion of the compression option and the use of the magic number to my next PR; the current PR is about adding ZSTD decompression support, and I would appreciate it if we can close this first. The try-catch logic is used to pick the codec in the following order: Hadoop's codec first, then Spark's ZStdCompressionCodec when the Hadoop codec fails and the Spark codec is enabled.
For a compatibility check, I have added some more tests that read files compressed with zstd on Ubuntu (version 1.4.4+dfsg-3ubuntu0.1).
OK, it's fair enough to fork Hadoop's LineRecordReader.
Having a wrapper codec that looks ahead at the magic number and hands the InputStream to the concrete codec should eliminate the re-open cost. Anyway, this is an implementation detail and can be discussed later.
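A compact sketch of that wrapper idea, again with zstd-jni standing in for the concrete codec; buffering lets us peek at the header and rewind, so the file is opened only once:

```scala
import java.io.{BufferedInputStream, InputStream}
import com.github.luben.zstd.ZstdInputStreamNoFinalizer

// Buffer the raw stream, sniff the zstd magic bytes (28 B5 2F FD), then
// delegate to the matching decompressor without reopening the file.
def openWithSniffing(raw: InputStream): InputStream = {
  val in = new BufferedInputStream(raw)
  val header = new Array[Byte](4)
  in.mark(4); val n = in.read(header); in.reset()
  val isZstd = n >= 4 && (header(0) & 0xFF) == 0x28 && (header(1) & 0xFF) == 0xB5 &&
    (header(2) & 0xFF) == 0x2F && (header(3) & 0xFF) == 0xFD
  if (isZstd) new ZstdInputStreamNoFinalizer(in) else in // pass through otherwise
}
```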
I think you should at least test reading zstd text files written by Hadoop's ZStandardCodec.
What changes were proposed in this pull request?
The Hadoop version currently bundled with Spark is not compiled with Zstd support, so ZSTD-compressed JSON, CSV, text, and XML files cannot be read. Spark has a built-in ZStdCompressionCodec, which is used for compressing shuffle data and other use cases. This PR adds support for ZStandard decompression in file data source readers using the built-in ZStdCompressionCodec. It also adds support for some non-standard extensions like .gzip and .zstd.

The first commit in this PR is a vanilla copy of Hadoop's LineRecordReader. This was needed to add support for non-Hadoop codecs. Reviewers need not review this commit.

The following configs have been added to enable/disable the changes in this PR:
- spark.sql.execution.datasources.hadoopLineRecordReader.enabled: setting this SQLConf to false results in using Hadoop's LineRecordReader; otherwise the inlined HadoopLineRecordReader is used by default.
- spark.io.zStandard.enabled: enables Spark's ZStdCompressionCodec for file data source readers. Default: true.
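For reference, a sketch of how the two flags would be toggled, assuming the SQLConf is runtime-settable; spark.io.zStandard.enabled is a core SparkConf entry, so it is shown as a launch-time flag:

```scala
// Per-session SQLConf added by this PR (true is the default):
spark.conf.set("spark.sql.execution.datasources.hadoopLineRecordReader.enabled", "true")

// Core conf added by this PR, supplied when the application is launched:
//   spark-submit --conf spark.io.zStandard.enabled=true ...
```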
Why are the changes needed?
Same as above.
Does this PR introduce any user-facing change?
Yes. It adds support for ZSTD decompression in file data source readers.
How was this patch tested?
Added new unit tests.
Was this patch authored or co-authored using generative AI tooling?
No