Introduce a new conf for vacuum parallel listing #886
base: master
Conversation
Commits:
- [SC-24892] Add typesafe bintray repo for sbt-mima-plugin
- update fork
- catch up to master
- update master
- update with master
- update fork branch
- fork update
- …ol file listing for the vacuum command
```scala
      .doc("Sets the number of partitions to use for file listing")
      .intConf
      .checkValue(_ > 0, "fileListing.parallelism must be positive")
      .createWithDefault(200)
```
Should we set a default here? Or do you think defaulting to `spark.sql.shuffle.partitions` is more ideal?
Good point. I think that defaulting to `spark.default.parallelism` would be even more ideal.
@rahulsmahadev @vkorukanti any comments on this PR?
To minimize the surprises of this change, can we use the following logic to decide the parallelism?
- If `spark.databricks.delta.vacuum.fileListing.parallelism` is set, use it.
- Otherwise, if `spark.sql.sources.parallelPartitionDiscovery.parallelism` is set, use it.
- Otherwise, use 200.
Then if someone has set `spark.sql.sources.parallelPartitionDiscovery.parallelism` today, they can upgrade to the new version without any change.
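The precedence proposed above can be sketched independently of Spark. In this sketch, `resolveParallelism` is a hypothetical helper, and the two `Option[Int]` parameters stand in for the Delta conf and the Spark conf lookups (the actual implementation would read them from the session state):

```scala
// Sketch of the proposed fallback chain, with Option-valued stand-ins
// for the two confs (not the actual Spark API).
object ParallelismResolution {
  def resolveParallelism(
      deltaConf: Option[Int],          // vacuum.fileListing.parallelism, if set
      sparkDiscoveryConf: Option[Int]  // parallelPartitionDiscovery.parallelism, if set
  ): Int =
    deltaConf.orElse(sparkDiscoveryConf).getOrElse(200)
}
```

`Option.orElse` encodes the "if set, use it; otherwise fall through" chain directly, so the precedence order is visible in a single expression.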
core/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
… parallel listing.
```scala
val fileListingParallelism =
  spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_FILE_LISTING_PARALLELISM)
    .getOrElse(
      Option(spark.sessionState.conf.parallelPartitionDiscoveryParallelism).getOrElse(200)
    )
```
Could you add unit tests for this config behavior? `spark.sessionState.conf.parallelPartitionDiscoveryParallelism` will never return null, if I recall correctly.
`spark.sessionState.conf.parallelPartitionDiscoveryParallelism` is a Spark conf. I'm not sure how I can check whether it is set or not. I can compare the value to its default of 10000, but if the default changes in Spark, we will always use the Spark default instead of 200.
Yep, this is a good point. I found the following code should work:

```scala
try {
  spark.getConfString(SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM.key)
  Some(spark.sessionState.conf.parallelPartitionDiscoveryParallelism)
} catch {
  case _: NoSuchElementException => None
}
```
```diff
@@ -253,6 +253,13 @@ trait DeltaSQLConfBase {
       .checkValue(_ > 0, "parallelDelete.parallelism must be positive")
       .createOptional

+  val DELTA_VACUUM_FILE_LISTING_PARALLELISM =
+    buildConf("vacuum.fileListing.parallelism")
+      .doc("Sets the number of partitions to use for file listing")
```
Suggested doc text: "Sets the parallelism to use for listing files recursively during a vacuum command. If not set, defaults to 'spark.sql.sources.parallelPartitionDiscovery.parallelism'. Set this value to prevent file listing from generating too many tasks."
@JassAbidi Could you take a look at my comments when you have time? Thanks!
Currently, vacuum uses `spark.sql.sources.parallelPartitionDiscovery.parallelism` to control the parallelism of listing files and directories, and its default value is 10000. This PR introduces a new Delta SQL conf, `DELTA_VACUUM_FILE_LISTING_PARALLELISM`, to control the file listing parallelism for the vacuum command. The default value for this conf is 200.

Closes #859
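As a usage sketch (assuming a running Spark session with Delta Lake, and a hypothetical table path), the new conf would be set like any other session conf before running vacuum:

```scala
// Hypothetical usage: override the vacuum file-listing parallelism
// for the current session, then run VACUUM on a Delta table.
spark.conf.set("spark.databricks.delta.vacuum.fileListing.parallelism", "1000")
spark.sql("VACUUM delta.`/path/to/table`")
```

If the conf is left unset, listing falls back to `spark.sql.sources.parallelPartitionDiscovery.parallelism` when that is explicitly set, and otherwise to 200.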