
[SPARK-52482][SQL][CORE] ZStandard support for file data source reader #51182


Open
wants to merge 12 commits into master

Conversation

sandip-db
Contributor

@sandip-db sandip-db commented Jun 15, 2025

What changes were proposed in this pull request?

The current Hadoop version in Spark hasn't been compiled with ZSTD support, so ZSTD-compressed JSON, CSV, text, and XML files cannot be read. Spark has a built-in ZStdCompressionCodec, which is used for compressing shuffle data and other use cases.

This PR adds support for ZStandard decompression in file data source readers using the built-in ZStdCompressionCodec. It also adds support for some non-standard extensions like .gzip and .zstd.

The first commit in this PR is a vanilla copy of Hadoop's LineRecordReader. It was needed to add support for non-Hadoop-based codecs. Reviewers need not review this commit.

The following configs have been added to enable/disable the changes in this PR (a usage sketch follows the list):

  • spark.sql.execution.datasources.hadoopLineRecordReader.enabled: Setting this SQLConf to false falls back to Hadoop's LineRecordReader; otherwise the inlined HadoopLineRecordReader is used by default.
  • spark.io.zStandard.enabled: Enables Spark's ZStdCompressionCodec for the file data source reader. Default: true
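
A minimal usage sketch, assuming the defaults described above (the file path is hypothetical; spark.io.zStandard.enabled is a core conf, typically set at launch):

// Hedged usage sketch: spark.io.zStandard.enabled is left at its default (true),
// e.g. passed at submit time via --conf spark.io.zStandard.enabled=true.
spark.conf.set("spark.sql.execution.datasources.hadoopLineRecordReader.enabled", "true")
// events.jsonl.zst is a hypothetical ZSTD-compressed JSON Lines file.
val df = spark.read.json("/data/events.jsonl.zst")
df.show()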

Why are the changes needed?

Same as above.

Does this PR introduce any user-facing change?

Yes. It adds support for ZSTD decompression in file data source readers.

How was this patch tested?

Added new unit tests

Was this patch authored or co-authored using generative AI tooling?

No

@sandip-db sandip-db changed the title [SPARK-52482][SQL] ZStandard support for file data source reader [SPARK-52482][SQL][CORE] ZStandard support for file data source reader Jun 15, 2025
@pan3793
Member

pan3793 commented Jun 16, 2025

Hadoop has a built-in org.apache.hadoop.io.compress.ZStandardCodec, but it requires compilation with the native library. I think the right direction is to migrate it to zstd-jni, like the other codecs; see HADOOP-17292 (lz4), HADOOP-17125 (snappy), HADOOP-17825 (gzip).

Even if you don't want to touch the Hadoop code, this PR's approach looks like overkill. Hadoop provides the io.compression.codecs configuration for registering custom CompressionCodec implementations; implementing an org.apache.spark.xxx.SparkZstdCompressionCodec and configuring io.compression.codecs should work.
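
A minimal sketch of what such a registration might look like (the com.example codec class name is hypothetical, not an existing class):

// Hypothetical sketch: register a Hadoop-compatible codec class via
// io.compression.codecs so CompressionCodecFactory can resolve it by extension.
spark.sparkContext.hadoopConfiguration.set(
  "io.compression.codecs",
  "org.apache.hadoop.io.compress.GzipCodec,com.example.SparkZstdHadoopCodec")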

Comment on lines 74 to 90
try {
  codec.createInputStream(inputStream)
} catch {
  case e: Exception =>
    // createInputStream can throw if Hadoop was not built with ZSTD support;
    // fall back to Spark's ZStdCompressionCodec, rethrowing if that is unavailable.
    createZstdInputStream(file, inputStream).getOrElse(throw e)
}
Contributor


This logic is subtle, why is this fallback required?

Contributor Author


createInputStream may fail with an Exception for ZSTD if Hadoop has not been compiled with ZSTD support. In that case, we try to use Spark's ZStandard codec if it is enabled. Added a comment to clarify this.

@sandip-db
Contributor Author

sandip-db commented Jun 17, 2025

Even if you don't want to touch the Hadoop code, this PR's approach looks like overkill. Hadoop provides the io.compression.codecs configuration for registering custom CompressionCodec implementations; implementing an org.apache.spark.xxx.SparkZstdCompressionCodec and configuring io.compression.codecs should work.

@pan3793 Thanks for your comment. The PR adds fewer than 50 lines of code to reuse Spark's ZStdCompressionCodec for the file data source. The inlining of Hadoop's LineRecordReader would be needed regardless of ZSTD support because of a follow-up change I am working on, which would allow users to specify the compression type using a file data source option. There are users who have files with either non-standard extensions or no extensions at all. Hadoop's way of determining the codec based on the file name extension forces them to rename their files.

Implementing the CompressionCodec and Decompressor interfaces to add ZStdCompressionCodec support seems unnecessary at this point, and it will not address the extension issues noted above. At some point, I expect Spark will upgrade to a Hadoop build compiled with native ZSTD support, and then the code will automatically start to use that instead of Spark's ZStdCompressionCodec.

@sandip-db sandip-db requested a review from jackierwzhang June 17, 2025 05:41
@cloud-fan
Contributor

The inlining of Hadoop's LineRecordReader would be needed regardless of ZSTD support because of a follow-up change I am working on, which would allow users to specify the compression type using a file data source option.

@pan3793 does Hadoop's LineRecordReader allow us to specify the compression at the session level without forking the code?

@@ -3686,6 +3687,96 @@ abstract class CSVSuite
parameters = Map("columnName" -> "`v`", "columnType" -> "\"VARIANT\"", "format" -> "CSV")
)
}

private def createTestFiles(dir: File, fileFormatWriter: Boolean,
Contributor


shall we put the test in FileBasedDataSourceSuite, so that it's easier to share code and test different formats?

Contributor Author


I thought about this. However, each format has some special cases that require generating the files slightly differently:

  • TextFileFormat has a wholeText option that reads the whole file
  • CSV has a header option
  • JSON has single-line and multiline options

@sandip-db
Contributor Author

sandip-db commented Jun 18, 2025

does Hadoop's LineRecordReader allow us to specify the compression at the session level without forking the code?

@cloud-fan It's possible to pass different codecs via the io.compression.codecs Hadoop conf, but Hadoop will still use the file name extension to choose the codec from that list. Users' files may not have a standard extension to begin with.
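
For illustration, Hadoop's CompressionCodecFactory resolves codecs purely by the filename suffix (the paths here are hypothetical):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory

// Extension-based lookup: getCodec returns null when the filename has no
// recognized suffix, no matter which codecs are registered via io.compression.codecs.
val factory = new CompressionCodecFactory(spark.sparkContext.hadoopConfiguration)
factory.getCodec(new Path("/data/events.json.gz"))  // resolves to GzipCodec
factory.getCodec(new Path("/data/events"))          // null: no extension to match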

@sandip-db sandip-db force-pushed the zstd-file-source-support branch from 2f90c2b to 4b7cf40 on June 18, 2025 00:41
@sandip-db sandip-db requested a review from cloud-fan June 18, 2025 04:50
@pan3793
Member

pan3793 commented Jun 18, 2025

does Hadoop's LineRecordReader allow us to specify the compression at the session level without forking the code?

@cloud-fan as @sandip-db said, Hadoop's LineRecordReader relies on the filename suffix to choose the decompressor. I think this is well-known behavior for processing text files because, unlike binary formats (e.g. Parquet/ORC) that carry metadata in the footer marking the codec used by each page/chunk, compression is applied to the whole text file.

How do you define the behavior of "specify the compression at the session level"? Always respect the session conf and ignore the filename suffix, or fall back to the codec suggested by the session conf when something goes wrong?

Also, please be careful that a Hadoop codec may behave differently from the Spark/Unix tool codec; for example, HADOOP-12990 (lz4).

@pan3793
Member

pan3793 commented Jun 18, 2025

There are users who have files with either non-standard extensions or no extensions at all.

@sandip-db to handle "non-standard extensions", you just need to register another Hadoop codec, for example

new org.apache.hadoop.io.compress.GzipCodec {
  // CompressionCodecFactory matches files by this suffix; Hadoop's default for
  // GzipCodec is ".gz", so include the leading dot for the non-standard ".gzip".
  override def getDefaultExtension: String = ".gzip"
}

for "no extensions" compressed text files, I'm not sure if this is a valid use case(see my last comment)

@sandip-db
Contributor Author

for "no extensions" compressed text files, I'm not sure if this is a valid use case(see my last comment)

@pan3793 While uncommon, we do come across users who have compressed files without any extension. Most recently, a user had to rename a billion gzip-compressed files in S3 because they had no extension.

@sandip-db
Contributor Author

Also, please be careful that a Hadoop codec may behave differently from the Spark/Unix tool codec; for example, HADOOP-12990 (lz4).

Thanks for bringing this to my attention. We are not adding support for ZSTD compression using the Spark codec yet. This PR just adds decompression support.

@cloud-fan @pan3793 Do you have any further concerns with this PR? Can it be merged?

@pan3793
Member

pan3793 commented Jun 19, 2025

@sandip-db TBH, the current approach (control flow based on catching exceptions) seems too hacky, and I'd like to see a more detailed design of your next steps. (I don't think the question below has been answered.)

How do you define the behavior of "specify the compression at the session level"? Always respect the session conf and ignore the filename suffix, or fall back to the codec suggested by the session conf when something goes wrong?

This PR just adds decompression support.

You still need to ensure that Spark's zstd codec is compatible with Hadoop's implementation. I have seen that using the AirCompressor LZO codec to decompress files written via hadoop-lzo can randomly produce corrupt content with no errors.

@pan3793
Member

pan3793 commented Jun 19, 2025

Instead of relying on the filename suffix or a Spark session conf to choose the codec, I wonder whether the magic number was considered? For example, the file command can correctly recognize the file codec even without a standard filename extension:

$ file unit-tests.log
unit-tests.log: ASCII text, with very long lines (388)
$ file unit-tests.log.gz
unit-tests.log.gz: gzip compressed data, was "unit-tests.log", last modified: Mon Apr 21 13:03:04 2025, from Unix, original size modulo 2^32 11024393
$ file unit-tests.log.zst
unit-tests.log.zst: Zstandard compressed data (v0.8+), Dictionary ID: None
$ cp unit-tests.log.gz unit-tests.log.gz.foo
$ file unit-tests.log.gz.foo
unit-tests.log.gz.foo: gzip compressed data, was "unit-tests.log", last modified: Mon Apr 21 13:03:04 2025, from Unix, original size modulo 2^32 11024393
$ cp unit-tests.log.zst unit-tests.log.zst.bar
$ file unit-tests.log.zst.bar
unit-tests.log.zst.bar: Zstandard compressed data (v0.8+), Dictionary ID: None

@sandip-db
Contributor Author

@pan3793 Thanks for your input.

How do you define the behavior of "specify the compression at the session level"? Always respect the session conf and ignore the filename suffix, or fall back to the codec suggested by the session conf when something goes wrong?

Users should be able to specify the compression type in their query using a data source reader option. For example:
spark.read.option("compression", "gzip").json(path)

If this option is specified, Spark will always use the compression type specified by the user. There are some other alternatives available, like first using the file path extension or using the magic number (as the file utility does) to determine the codec type.

Instead of relying on the filename suffix or a Spark session conf to choose the codec, I wonder whether the magic number was considered? For example, the file command can correctly recognize the file codec even without a standard filename extension.

I agree, and I wonder why Hadoop didn't do this in the first place. There will be some performance penalty for examining the magic number and reopening the file input stream with the appropriate codec.

Having said that, we can move the discussion of the compression option and the use of the magic number to my next PR.

The current PR is about adding ZSTD decompression support. I would appreciate it if we could close this first.

The try-catch logic picks the codec in the following order (see the sketch after this list):

  • A user-provided ZSTD codec registered via the io.compression.codecs Hadoop conf
  • Hadoop's default ZSTD codec
  • Spark's ZStdCompressionCodec
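
A rough sketch of that ordering (the helper names here are assumptions, not the exact code in this PR):

import java.io.InputStream
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory

// Hedged sketch of the fallback: the factory returns either a user-registered
// codec or Hadoop's default ZSTD codec; if creating its input stream fails
// (e.g. no native ZSTD), fall back to a Spark ZStdCompressionCodec-based stream.
def open(factory: CompressionCodecFactory, file: Path, in: InputStream,
    sparkZstd: (Path, InputStream) => Option[InputStream]): InputStream = {
  Option(factory.getCodec(file)) match {
    case Some(codec) =>
      try codec.createInputStream(in)
      catch { case e: Exception => sparkZstd(file, in).getOrElse(throw e) }
    case None => in
  }
}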

For compatibility checks, I have added some more tests that read files compressed with ZSTD on Ubuntu (zstd version: 1.4.4+dfsg-3ubuntu0.1).

@pan3793
Member

pan3793 commented Jun 19, 2025

OK, it's fair enough to fork Hadoop's LineRecordReader and use the try-catch logic for codec fallback, given your flexible design.

There will be some performance penalty for examining the magic number and reopening the file input stream with the appropriate codec.

Having a wrapper codec that looks ahead at the magic number and then hands the InputStream to the concrete codec should eliminate the re-open cost; anyway, this is an implementation detail and can be discussed later. A rough sketch of the idea follows.
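
A standalone illustration of that look-ahead (not tied to any Hadoop API; the magic numbers come from the gzip and zstd formats):

import java.io.{BufferedInputStream, InputStream}

// Peek at the leading bytes with mark/reset so the stream need not be reopened,
// then report which codec the magic number suggests.
def sniffCodec(raw: InputStream): (String, InputStream) = {
  val in = new BufferedInputStream(raw)
  in.mark(4)
  val header = new Array[Byte](4)
  val n = in.read(header)
  in.reset()
  val codec =
    if (n >= 2 && header(0) == 0x1f.toByte && header(1) == 0x8b.toByte) "gzip"
    else if (n >= 4 && header(0) == 0x28.toByte && header(1) == 0xb5.toByte &&
      header(2) == 0x2f.toByte && header(3) == 0xfd.toByte) "zstd"
    else "uncompressed"
  (codec, in)
}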

For compatibility checks, I have added some more tests that read files compressed with ZSTD on Ubuntu (zstd version: 1.4.4+dfsg-3ubuntu0.1).

I think you should at least test reading zstd text files written by Hadoop.
