Introduce a new conf for vacuum parallel listing #886


Open
wants to merge 18 commits into base: master
Conversation

Contributor
@JassAbidi JassAbidi commented Jan 9, 2022

Currently, vacuum uses spark.sql.sources.parallelPartitionDiscovery.parallelism to control the parallelism of file and directory listing, and the default value of that conf is 10000.
This PR introduces a new Delta SQL conf, DELTA_VACUUM_FILE_LISTING_PARALLELISM, to control the file listing parallelism of the vacuum command. The default value for this conf is 200.

Closes #859

.doc("Sets the number of partitions to use for file listing")
.intConf
.checkValue(_ > 0, "fileListing.parallelism must be positive")
.createWithDefault(200)
Collaborator

Should we set a default here? Or do you think defaulting to spark.sql.shuffle.partitions is more ideal?

Contributor Author

Good point. I even think that defaulting to spark.default.parallelism would be more ideal.

Contributor Author

@rahulsmahadev @vkorukanti any comments on this PR?

@JassAbidi JassAbidi requested a review from rahulsmahadev March 8, 2022 07:51
Member

@zsxwing zsxwing left a comment


To minimize the surprises of this change, can we use the following logic to decide the parallelism?

  • If spark.databricks.delta.vacuum.fileListing.parallelism is set, use it. Otherwise,
  • If spark.sql.sources.parallelPartitionDiscovery.parallelism is set, use it. Otherwise,
  • Use 200.

Then someone who has set spark.sql.sources.parallelPartitionDiscovery.parallelism today can upgrade to the new version without any change in behavior.
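The precedence above can be sketched in plain Scala, with a Map of explicitly set entries standing in for the real SQLConf; the object name and the Map-based lookup are illustrative only, not the actual Delta implementation:

```scala
// Sketch of the proposed precedence, using a plain Map in place of the
// real Spark SQLConf. `conf` holds only entries a user has explicitly set.
object VacuumParallelismSketch {
  val DeltaKey = "spark.databricks.delta.vacuum.fileListing.parallelism"
  val SparkKey = "spark.sql.sources.parallelPartitionDiscovery.parallelism"

  def fileListingParallelism(conf: Map[String, String]): Int =
    conf.get(DeltaKey).map(_.toInt)             // 1. Delta conf, if set
      .orElse(conf.get(SparkKey).map(_.toInt))  // 2. Spark conf, if set
      .getOrElse(200)                           // 3. hard-coded default
}
```

With neither key set this yields 200; setting only the Spark key keeps today's behavior, and the Delta key wins when both are present.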

@JassAbidi JassAbidi requested a review from zsxwing April 3, 2022 10:29
Comment on lines +135 to +139
val fileListingParallelism =
  spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_FILE_LISTING_PARALLELISM)
    .getOrElse(
      Option(spark.sessionState.conf.parallelPartitionDiscoveryParallelism).getOrElse(200)
    )
Member

Could you add unit tests for this config behavior? spark.sessionState.conf.parallelPartitionDiscoveryParallelism will never return null, if I recall correctly.

Contributor Author

spark.sessionState.conf.parallelPartitionDiscoveryParallelism is a Spark conf, and I'm not sure how to check whether it has been set. I could compare the value to its default of 10000, but if the default ever changes in Spark, we would always use Spark's default instead of 200.

Member

Yep, this is a good point. I found that the following code should work:

try {
  spark.getConfString(SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM.key)
  Some(spark.sessionState.conf.parallelPartitionDiscoveryParallelism)
} catch {
  case _: NoSuchElementException => None
}
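The distinction this try/catch relies on can be modeled in plain Scala. The classes and names below are a simplified stand-in for SQLConf, not its real API: the string accessor throws NoSuchElementException for keys the user never set, while the typed accessor silently falls back to the built-in default.

```scala
// Simplified model of the "is it explicitly set?" check discussed above.
object ExplicitConfSketch {
  class ConfModel(explicit: Map[String, String], defaults: Map[String, Int]) {
    // Throws for keys never explicitly set, like getConfString in the quote.
    def getConfString(key: String): String =
      explicit.getOrElse(key, throw new NoSuchElementException(key))
    // Falls back to the built-in default, like the typed conf accessor.
    def typedValue(key: String): Int =
      explicit.get(key).map(_.toInt).getOrElse(defaults(key))
  }

  def explicitlySet(conf: ConfModel, key: String): Option[Int] =
    try {
      conf.getConfString(key) // throws if the key was never explicitly set
      Some(conf.typedValue(key))
    } catch {
      case _: NoSuchElementException => None
    }
}
```

This returns None when only the default exists, which is exactly what comparing the typed value against 10000 cannot distinguish.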

@@ -253,6 +253,13 @@ trait DeltaSQLConfBase {
.checkValue(_ > 0, "parallelDelete.parallelism must be positive")
.createOptional

val DELTA_VACUUM_FILE_LISTING_PARALLELISM =
buildConf("vacuum.fileListing.parallelism")
.doc("Sets the number of partitions to use for file listing")
Member

Sets the parallelism to use for listing files recursively during a vacuum command. If not set, it defaults to 'spark.sql.sources.parallelPartitionDiscovery.parallelism'. Set this value to prevent file listing from generating too many tasks.

@zsxwing
Member

zsxwing commented Sep 14, 2022

@JassAbidi Could you take a look at my comments when you have time? Thanks!

Successfully merging this pull request may close these issues.

Streaming to Delta Sink, Sharp Increase in Batch Time after ~36h Using Delta-1.0.0