Skip to content

[SPARK-52403][SQL] Add metric to MergeRowExec for rows that do not match condition #51091

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from

Conversation

szehon-ho
Copy link
Contributor

What changes were proposed in this pull request?

MergeRowsExec can record some useful information, like how many rows are not target of any action (doesn't match any condition), which can be emitted as metrics.

Why are the changes needed?

Improve debuggability of MERGE INTO

Does this PR introduce any user-facing change?

No

How was this patch tested?

Add test to MergeIntoTableSuiteBase

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the SQL label Jun 4, 2025
@szehon-ho szehon-ho closed this Jun 5, 2025
@szehon-ho szehon-ho reopened this Jun 5, 2025
@szehon-ho szehon-ho force-pushed the numTargetRowsCopied branch from 41238ff to ee9ff89 Compare June 5, 2025 19:18
case class Keep(
condition: Expression,
output: Seq[Expression],
isSystem: Boolean = false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a small comment to explain what this flag means?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

case _ => false
}

if (instructions.forall(systemKeep)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't even need to evaluate the conditions to update this metric?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, let me take a closer look

Copy link
Contributor Author

@szehon-ho szehon-ho Jun 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea its a mistake, fixed (hopefully), also made a more complex test case

@szehon-ho szehon-ho force-pushed the numTargetRowsCopied branch from f82f6e5 to 42873d8 Compare June 7, 2025 07:42
Comment on lines 202 to 203
val keepCarryoverRowsInstruction = Keep(TrueLiteral, carryoverRowsOutput,
systemPredicate = true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit scalastyle: break the whole Keep to newline instead of breaking at last argument.

Comment on lines 93 to 95
// flag marking that row should be considered not matching
// any user predicate for metric calculations
systemPredicate: Boolean = false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I would just name it copiedTargetRow, to name it what it is.
With the name systemPredicate there are two scenarios:

  • it will never be used for anything else, so better to name it for what it actually is used for.
  • naming it like this encourages it to be used for something else, and then have logic that decides somewhere what systemPredicate actually needs which I think would not be a good pattern.

@@ -233,6 +246,7 @@ case class MergeRowsExec(
}
}

longMetric("numTargetRowsCopied") += 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is incorrect.
E.g. it will make a row from applyInstructions(row, notMatchedInstructions) that then does not match any of the notMatchedInstructions and falls through be counted here.
Could you add a test for this?
All rows that are copied over should be captured by keepCarryoverRowsInstruction at the end of matchedInstructions / notMatchedBySourceInstructions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea for this case i added special handling for that Keep instruction for this, not sure how to make it cleaner. Is that the case you mean?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC the code, this line should be deleted and the change https://github.com/apache/spark/pull/51091/files#diff-a572ff40254b26b4a903f101ee466dd2dff9b8c7954a3b957fe5fc25b87ee10aR234-R236 is already handling all the cases.
I haven't run it but I think if I modify the test

  test("Emit numTargetRowsCopied metrics") {
    withTempView("source") {
      createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
        """{ "pk": 1, "salary": 100, "dep": "hr" }
          |{ "pk": 2, "salary": 200, "dep": "software" }
          |{ "pk": 3, "salary": 300, "dep": "hr" }
          |{ "pk": 4, "salary": 400, "dep": "marketing" }
          |{ "pk": 5, "salary": 500, "dep": "executive" }
          |""".stripMargin)

      val sourceDF = Seq(1, 2, 6, 10).toDF("pk")
      sourceDF.createOrReplaceTempView("source")

      val mergeExec = findMergeExec {
        s"""MERGE INTO $tableNameAsString t
           |USING source s
           |ON t.pk = s.pk
           |WHEN MATCHED AND salary < 200 THEN
           | UPDATE SET salary = 1000
           |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN
           | UPDATE SET salary = -1
           |WHEN NOT MATCHED AND s.pk < 10 THEN
           | INSERT (pk, salary, dep) VALUES (s.pk, -1, "dummy")
           |""".stripMargin
      }

      mergeExec.metrics.get("numTargetRowsCopied") match {
        case Some(metric) => assert(metric.value == 3, "3 rows copied without updates")
        case None => fail("numCopiedRows metric not found")
      }

      checkAnswer(
        sql(s"SELECT * FROM $tableNameAsString"),
        Seq(
          Row(1, 1000, "hr"), // updated
          Row(2, 200, "software"),
          Row(3, 300, "hr"),
          Row(4, 400, "marketing"),
          Row(5, -1, "executive"), // updated
          Row(6, -1, "dummy")) // inserted
    }
  }

will fail assert(metric.value == 3, "3 rows copied without updates") because it will end up incrementing the metric on source row with pk=10.

Copy link
Contributor Author

@szehon-ho szehon-ho Jun 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @juliuszsompolski for the test case! I was able to reproduce the duplicate

I think the duplicate comes from the fact that applyInstructions() handles all three cases:

  • matched
  • not matched
  • not matched by source

I added an extra filter to not have notMatched rows increment. It doesnt make sense as these are source rows only, and metric is about target.

I think we still need the case in the end, because the keepCarryOverRows is a logic only for Group Based merge. In Delta Based merge, the matched instructions do not have that, and the target row falls through to the end if it doesnt match any instruction?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@szehon-ho in Delta Based merge if the target row falls through all the match instructions, it is not copied, it is not rewritten at all. See that in the next line after this you return null, so this row is not passed to WriteDeltaExec, and hence not written. The reason that Delta Based merge does not have this keepCarryOverRows catch-all fallback condition is exactly because it doesn't copy rows. So I think the only thing needed in the previous commit was to delete this one line.

Actually, the tests that you added to MergeIntoTableSuiteBase executed in the suites extending DeltaBasedMergeIntoTableSuiteBase should be showing 0 as number rows copied. I think the tests need to be split between DeltaBasedMergeIntoTableSuiteBase and GroupBasedMergeIntoTableSuite to account for that.

Copy link
Contributor Author

@szehon-ho szehon-ho Jun 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I was interpreting it a bit differently, initially I thought its more like 'rows that are processed but not matching the filters'.

In delta-mode it is indeed not written.

I think you are right, the name of the metric is indeed 'copied', cc @aokolnychyi . Maybe there is some value in my interpretation, as another metric.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@juliuszsompolski @aokolnychyi added two metrics (I feel its more clear this way to capture the nuance of delta vs group based merge), let me know if it looks better?

@szehon-ho szehon-ho force-pushed the numTargetRowsCopied branch from b03dd56 to 99d55f2 Compare June 17, 2025 20:44
@szehon-ho
Copy link
Contributor Author

rebase

Copy link
Contributor

@juliuszsompolski juliuszsompolski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, naming nits only.

Comment on lines 77 to 79
"numTargetRowsUnmatched" -> SQLMetrics.createMetric(sparkContext,
"Number of target rows processed that do not match any condition. " +
"These will be dropped for delta-based merge and retained for group-based merge."))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I think the "Unmatched" in the name is a bit confusing because of overload with the "MATCHED", "NOT MATCHED", "NOT MATCHED BY SOURCE" of the whole MERGE.

When looking at it from the perspective of the whole MERGE:

  • you are already after the target-source join, so these are rows that can be MATCHED (or NOT MATCHED BY SOURCE), but then didn't pass any of the extra conditions of "WHEN MATCHED AND", and "WHEN NOT MATCHED BY SOURCE AND".
  • these are not really all target rows "processed" in the MERGE, because there are more rows scanned from the target that then may have been dropped in the join.

But from the perspective of the MergeRows operator:

  • "processed" is fine, because these are rows processed by the MergeRows operator.
  • I asked my teammates what name could be better than "Unmatched" and I think "Unchanged". Because these are the rows that passed into the MergeRows operator, but then fell through all the instructions.

So I think "numTargetRowsUnchanged". Maybe while at it also have a corresponding "numSourceRowsUnchanged"?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or maybe "Unused" even better? "Unchanged" may also be slightly ambiguous with the rows that are copied.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good, thanks for the thought on this!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, to me unused is a bit wrong because we do use it (we copy it). is unchanged ok? I didnt see the ambiguity.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rows that are incrementing this metric are not copied - MergeRowsExec drops them from output (return null), so in the end they are unused by the WriteDeltaExec (not copied, not used by any of the conditions).

Comment on lines 91 to 92
// A special case of Keep where the row is kept as is.
case class CarryOver(output: Seq[Expression]) extends Instruction {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: since then we name it "copied rows" in other places, maybe name it Copy?

@szehon-ho szehon-ho changed the title [SPARK-52403][SQL] Add numTargetRowsCopied metric to MergeRowExec [SPARK-52403][SQL] Add metric to MergeRowExec for rows that do not match condition Jun 18, 2025
Comment on lines +252 to +255
longMetric("numTargetRowsUnused") += 1
if (sourcePresent) {
longMetric("numSourceRowsUnused") += 1
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm, my idea was to not increment this metric here. Do not count rows that were copied as unused. That's why I proposed the name "unused" and now I see the confusion in your last comment.
My idea, for these metrics is as follows:

  • numTargetRowsCopied - rows that were copied into the output unmodified. This metric is useful to show you the amount of write amplification.
  • num{Source|Target}RowsCopied - rows that have passed through the join in merge, but then were not used at all (MergeRows dropped it from output, returned null) because it didn't pass any of the instruction conditions. This metric is useful because it shows that there may be some filter pushdown potential to drop these rows earlier, in or before the join. For example in Delta, in some situations an OR of all matched contitions is pushed down, so that rows that would not pass any of them are pruned earlier: https://github.com/delta-io/delta/blob/5bbe7c81f65e2b136ba2211bc2d789c9d0206636/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala#L104 - a small optimization that as far as I see currently is not done in DSv2.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants