
Apply filter pushdown to source rows for the right outer join of matched only case #438


Open · wants to merge 4 commits into master

Conversation

@LantaoJin (Contributor) commented May 28, 2020

The reason for this optimization is similar to #432.

In the matched-only case, we use a right outer join between source and target to write the changes.
Because of the non-deterministic UDF makeMetricUpdateUDF, predicate pushdown for source rows is not applied. This PR manually adds a filter before the Projection that contains the non-deterministic UDF, which triggers filter pushdown (see the sketch below).
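To make the mechanism concrete, here is a minimal, self-contained sketch of the pushdown behavior. It is not Delta code: the toy DataFrame is illustrative, and countRow merely stands in for makeMetricUpdateUDF (which also increments a SQLMetric as a side effect). Catalyst does not push a deterministic filter through a non-deterministic expression, so applying the predicate before the UDF lets it reach the scan:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object PushdownSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("pushdown-sketch").getOrCreate()
    import spark.implicits._

    val source = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "v")

    // Stand-in for makeMetricUpdateUDF: non-deterministic, always returns true.
    val countRow = udf(() => true).asNondeterministic()

    // Filter placed after the non-deterministic UDF: the optimizer cannot push
    // `id > 1` through it, so the scan reads every source file.
    source.filter(countRow()).filter(col("id") > 1).explain(true)

    // This PR's idea: apply the deterministic predicate first, so it is
    // pushed all the way down into the source scan.
    source.filter(col("id") > 1).filter(countRow()).explain(true)

    spark.stop()
  }
}
```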

Besides the performance improvement from filter pushdown, without this change the Spark Driver can easily run into frequent full GCs if the source table contains a massive number of files:
(In our internal version we use target left outer join source instead of source right outer join target, so the right side in the graphs below is the source table.)
[screenshot-1]
[screenshot-2]

From the class histogram taken during a Spark Driver full GC, we can see 3.8 million SerializableFileStatus instances, which roughly matches the file count of the source table in the graphs above. This memory cannot be garbage collected while the join is running.

2020-05-24T13:11:40.996-0700: [Full GC (Allocation Failure) 49G->39G(50G), 49.8580399 secs]
[Eden: 0.0B(2528.0M)->0.0B(2560.0M) Survivors: 32.0M->0.0B Heap: 49.4G(50.0G)->39.9G(50.0G)], [Metaspace: 201466K->200916K(239616K)]
2020-05-24T13:12:30.854-0700: [Class Histogram (after full gc):
 num     #instances         #bytes  class name
----------------------------------------------
   1:     148517177    27413443128  [C
   2:     148514960     4752478720  java.lang.String
   3:      22946326     3304270944  java.net.URI
   4:      20371798     2118666992  org.apache.hadoop.fs.LocatedFileStatus
   5:      20383330      978399840  org.apache.hadoop.fs.permission.FsPermission
   6:      22945656      550695744  org.apache.hadoop.fs.Path
   7:      20371798      488923152  [Lorg.apache.hadoop.fs.BlockLocation;
   8:         51688      344773816  [B
   9:       3876863      279134136  org.apache.spark.sql.execution.datasources.PartitionedFile
  10:       3806959      274101048  org.apache.spark.sql.execution.datasources.InMemoryFileIndex$SerializableFileStatus
  11:       2561048      245860608  org.apache.hadoop.fs.FileStatus
  12:        647901      186946488  [Lscala.collection.mutable.HashEntry;

After applying this optimization, the frequent full GC problem in this scenario was gone, and the performance of this outer join was greatly improved.
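For illustration only, here is a rough sketch of the shape of such a change, not the PR's actual diff. It assumes Catalyst's PredicateHelper for splitConjunctivePredicates; the parameters sourcePlan, condition, and incrSourceRowCountExpr mirror names in Delta's MergeIntoCommand but are assumptions here, and the package declaration is only needed because Dataset.ofRows is private[sql]:

```scala
package org.apache.spark.sql.delta

import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object SourcePredicateSketch extends PredicateHelper {
  // Extract the conjuncts of the merge condition that reference only source
  // columns and apply them before the non-deterministic metric UDF, so the
  // deterministic part can be pushed into the file scan.
  def preFilteredSource(
      spark: SparkSession,
      sourcePlan: LogicalPlan,            // assumed: the MERGE source plan
      condition: Expression,              // assumed: the ON condition of the MERGE
      incrSourceRowCountExpr: Expression  // assumed: Delta's metric UDF expression
  ): DataFrame = {
    val sourceOnly = splitConjunctivePredicates(condition)
      .filter(_.references.subsetOf(sourcePlan.outputSet))
      .reduceOption(And)
      .getOrElse(Literal.TrueLiteral)
    Dataset.ofRows(spark, sourcePlan)
      .filter(new Column(sourceOnly))             // deterministic: pushed down
      .filter(new Column(incrSourceRowCountExpr)) // non-deterministic: blocks pushdown
  }
}
```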

@LantaoJin (Contributor, Author) commented Jun 17, 2020

Gentle ping @tdas @zsxwing @jose-torres @brkyvz

@LantaoJin (Contributor, Author) commented Jul 16, 2020

Any comments? Gentle ping @tdas @brkyvz @zsxwing @gatorsmile

@GrigorievNick (Contributor) commented

Any reason why this was not merged or at least reviewed?
I checked this branch with my production job and got a big performance and resource usage boost.

@jaceklaskowski (Contributor) left a comment

LGTM

@@ -214,7 +214,7 @@ case class MergeIntoCommand(
   private def isMatchedOnly: Boolean = notMatchedClauses.isEmpty && matchedClauses.nonEmpty

   override lazy val metrics = Map[String, SQLMetric](
-    "numSourceRows" -> createMetric(sc, "number of source rows"),
+    "numSourceRows" -> createMetric(sc, "number of source rows participated in merge"),

nit: s/participated in merge/involved
