Skip to content

[SPARK-52024][SQL] Support cancel ShuffleQueryStage when propagate empty relations #50814

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

summaryzb
Copy link
Contributor

@summaryzb summaryzb commented May 7, 2025

What changes were proposed in this pull request?

a.This pr introduce cancel queryStage mechanism.

  1. Cancel shuffle queryStage only When it's not reused nor reusing others
  2. Mark the cancellable queryStage when it's not completed yet in reOptimize stage
  3. When cancellable queryStage is determined to be cancelled by costEvaluator(cancellable queryStage in EmptyPropagate should always be cancelled), the queryStage has no relation with current logical plan nor physical plan any more
  4. Cancel the queryStage after the corresponding job is completed, will effect nothing
  5. Cancel the queryStage when the corresponding job is running, the job will be cancelled
  6. Cancel the queryStage before he corresponding job is started, the job corresponding this queryStage will still be submitted, this case should be optimized in future commit, however it's not worse than before this pr

b.Apply this mechanism to AQEPropagateEmptyRelation, cancel the running queryStages which are unnecessary since propagate empty relations.

Why are the changes needed?

  1. Cancel queryStage mechanism can make AQE more flexible, we can add more CBO feature by using this mechanism after this pr merged.
  2. Tasks corresponding to unnecessary running queryStages occupy the executor cores, thus wasting compute resource

Does this PR introduce any user-facing change?

Yes, user will see stage failure because of optimized stage cancellation , but this failure takes no effect to the query result

How was this patch tested?

Manual test, since we can not guarantee the completion order of query stages, it's not reliable to put it in unit test

./bin/spark-shell --master local[4]

scala> case class TestData(key: Int, value: String)
defined class TestData

scala> case class TestData2(a: Int, b: Int)
defined class TestData2

scala> spark.sparkContext.parallelize(Seq.empty[Int].map(i => TestData(i, i.toString))).toDF().createOrReplaceTempView("emptyTestData")

scala> spark.sparkContext.parallelize((1 to 100).map(i => TestData(i, i.toString))).toDF().createOrReplaceTempView("testData")

scala> spark.sparkContext.parallelize(TestData2(1, 1) ::TestData2(1, 2) ::TestData2(2, 1) ::TestData2(2, 2) ::TestData2(3, 1) ::TestData2(3, 2) :: Nil,2).toDF().createOrReplaceTempView("testData2")

scala>     spark.udf.register("fake_udf", (input: Int) => {
     |       Thread.sleep(100)
     |       input
     |     })

scala> spark.sql("SELECT t.key1 FROM emptyTestData join (SELECT testData.key as key1 FROM testData join testData2 ON fake_udf(testData.key)=fake_udf(testData2.a) ) t on t.key1 = emptyTestData.key union SELECT testData.key FROM testData join testData2 ON testData.key=testData2.a ").collect

before this pr
image
after this pr
image

Was this patch authored or co-authored using generative AI tooling?

No.

@summaryzb
Copy link
Contributor Author

@cloud-fan @LuciferYang @panbingkun PTAL

@LuciferYang
Copy link
Contributor

We should make GA green first

@summaryzb summaryzb force-pushed the SPARK-52024 branch 2 times, most recently from ece5812 to 51708f6 Compare May 16, 2025 10:15
summaryzb added 2 commits May 29, 2025 14:25
@github-actions github-actions bot added the BUILD label May 29, 2025
@cloud-fan
Copy link
Contributor

what if the shuffle stage is being reused by other places? I don't think we can cancel it without a ref counting to prove it's not used anywhere.

@summaryzb
Copy link
Contributor Author

what if the shuffle stage is being reused by other places? I don't think we can cancel it without a ref counting to prove it's not used anywhere.

Appreciate your time to review, could you help to explain more detail about 'reused by other places', currently i use AdaptiveExecutionContext#stageReuse to record the reuse cases, which similar to a ref counting.

@cloud-fan
Copy link
Contributor

I think it's more complicated than what you implemented now: what if the reuse stage is completed before we want to cancel the original stage? What if the reuse stage has not started yet? It's really a complicate algorithm with shuffle reuse being considered, can we explain it in detail in the PR description?

@summaryzb
Copy link
Contributor Author

Fine, update the PR description.

 1. Cancel shuffle queryStage only When it's not reused nor reusing others
 2. Mark the cancellable queryStage when it's not completed yet in reOptimize stage
 3. When cancellable queryStage is determined to be cancelled by costEvaluator(cancellable queryStage in `EmptyPropagate` should always be cancelled), the queryStage has no relation with current logical plan nor physical plan any more
 4. Cancel the queryStage after the corresponding job is completed, will effect nothing
 5. Cancel the queryStage when the corresponding job is running, the job will be cancelled
 6. Cancel the queryStage before he corresponding job is started, the job will still be submitted, this case should be optimized in future commit, however it's not worse than before this pr

@LuciferYang
Copy link
Contributor

Is there a possibility for this PR to progress further?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants