Introduce a new conf for vacuum parallel listing #886

Open · wants to merge 18 commits into base: master
@@ -132,6 +132,12 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
     val relativizeIgnoreError =
       spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_RELATIVIZE_IGNORE_ERROR)

+    val fileListingParallelism =
+      spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_FILE_LISTING_PARALLELISM)
+        .getOrElse(
+          Option(spark.sessionState.conf.parallelPartitionDiscoveryParallelism).getOrElse(200)
+        )
Comment on lines +135 to +139
Member

Could you add unit tests for this config behavior? spark.sessionState.conf.parallelPartitionDiscoveryParallelism will never return null if I recall correctly.

Contributor Author


spark.sessionState.conf.parallelPartitionDiscoveryParallelism is a Spark conf. I'm not sure how I can check whether it is set or not. I could compare the value against its default of 10000, but if that default ever changes in Spark, we would always use the Spark default instead of 200.

Member


Yep, this is a good point. I found that the following code should work:

try {
  spark.getConfString(SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM.key)
  Some(spark.sessionState.conf.parallelPartitionDiscoveryParallelism)
} catch {
  case _: NoSuchElementException => None
}
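To make the fallback precedence under discussion concrete, here is a self-contained sketch that models the session conf as a plain `Map` (the object and method names are illustrative only, not Delta's actual API): use the new Delta conf if set, otherwise the Spark conf only when the user set it explicitly, otherwise a hardcoded 200.

```scala
// Sketch of the intended resolution order, with a Map standing in for SQLConf.
// Modeling the conf as a Map makes "explicitly set" the same as "key present",
// which is what the NoSuchElementException check above aims to detect.
object ListingParallelism {
  val DeltaConfKey = "spark.databricks.delta.vacuum.fileListing.parallelism"
  val SparkConfKey = "spark.sql.sources.parallelPartitionDiscovery.parallelism"
  val HardcodedDefault = 200

  // Precedence: Delta conf > explicitly set Spark conf > 200.
  def resolve(conf: Map[String, Int]): Int =
    conf.get(DeltaConfKey)
      .orElse(conf.get(SparkConfKey))
      .getOrElse(HardcodedDefault)
}
```

With this shape, a Spark-side default change cannot leak into vacuum's listing parallelism, because an unset Spark conf simply resolves to `None`.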


     val validFiles = snapshot.stateDS
       .mapPartitions { actions =>
         val reservoirBase = new Path(basePath)
@@ -168,14 +174,13 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
       }.toDF("path")

     val partitionColumns = snapshot.metadata.partitionSchema.fieldNames
-    val parallelism = spark.sessionState.conf.parallelPartitionDiscoveryParallelism

     val allFilesAndDirs = DeltaFileOperations.recursiveListDirs(
         spark,
         Seq(basePath),
         hadoopConf,
         hiddenFileNameFilter = DeltaTableUtils.isHiddenDirectory(partitionColumns, _),
-        fileListingParallelism = Option(parallelism)
+        fileListingParallelism = Option(fileListingParallelism)
       )
       .groupByKey(x => x.path)
       .mapGroups { (k, v) =>
@@ -253,6 +253,13 @@ trait DeltaSQLConfBase {
       .checkValue(_ > 0, "parallelDelete.parallelism must be positive")
       .createOptional

+  val DELTA_VACUUM_FILE_LISTING_PARALLELISM =
+    buildConf("vacuum.fileListing.parallelism")
+      .doc("Sets the number of partitions to use for file listing")
Member


Suggestion: "Sets the parallelism to use for recursive file listing during a vacuum command. If not set, it defaults to 'spark.sql.sources.parallelPartitionDiscovery.parallelism'. Set this to prevent file listing from generating too many tasks."

+      .intConf
+      .checkValue(_ > 0, "fileListing.parallelism must be positive")
+      .createOptional

   val DELTA_SCHEMA_AUTO_MIGRATE =
     buildConf("schema.autoMerge.enabled")
       .doc("If true, enables schema merging on appends and on overwrites.")
@@ -623,6 +623,6 @@ trait DeltaVacuumSuiteBase extends QueryTest
 class DeltaVacuumSuite
   extends DeltaVacuumSuiteBase with DeltaSQLCommandTest {
   override def sparkConf: SparkConf = {
-    super.sparkConf.set("spark.sql.sources.parallelPartitionDiscovery.parallelism", "2")
+    super.sparkConf.set("spark.databricks.delta.vacuum.fileListing.parallelism", "2")
   }
 }
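For context, once this change lands, the new conf would be set like any other Spark SQL conf before running vacuum. A sketch (the table path is a placeholder, and this assumes a SparkSession with Delta Lake on the classpath; it is not runnable standalone):

```scala
// Sketch only: set the proposed conf, then run VACUUM on a Delta table.
// "/path/to/table" is a placeholder, not a real path.
spark.conf.set("spark.databricks.delta.vacuum.fileListing.parallelism", "100")
spark.sql("VACUUM delta.`/path/to/table` RETAIN 168 HOURS")
```

Because the conf is `createOptional` with a `checkValue(_ > 0, ...)` guard, leaving it unset falls back to the Spark listing parallelism, and non-positive values are rejected at set time.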