Commit 95e5316

Author: Sun Cao (committed)
Commit message: code change
1 parent: 71c9920

File tree: 6 files changed (+208 additions, -16 deletions)

.github/workflows/build-and-test.yml

Lines changed: 28 additions & 1 deletion

@@ -1,5 +1,5 @@
  name: Build and Test
- on: [push, pull_request, workflow_dispatch]
+ on: [pull_request, workflow_dispatch]
  jobs:
    build-and-test-server:
      runs-on: ubuntu-24.04
@@ -55,6 +55,33 @@ jobs:
            java-version: '17'
        - run: ./build/sbt client/test spark/test

+   build-and-test-client-scala-2.12:
+     runs-on: ubuntu-24.04
+     env:
+       SPARK_LOCAL_IP: localhost
+       AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
+       AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
+       AZURE_TEST_ACCOUNT_KEY: ${{ secrets.AZURE_TEST_ACCOUNT_KEY }}
+       GOOGLE_APPLICATION_CREDENTIALS: /tmp/google_service_account_key.json
+       GOOGLE_SERVICE_ACCOUNT_KEY: ${{ secrets.GOOGLE_SERVICE_ACCOUNT_KEY }}
+     steps:
+       - name: Checkout repository
+         uses: actions/checkout@v2
+       - name: Cache Scala, SBT
+         uses: actions/cache@v4
+         with:
+           path: |
+             ~/.sbt
+             ~/.ivy2
+             ~/.cache/coursier
+           key: build-and-test-client
+       - name: Install Java 8
+         uses: actions/setup-java@v3
+         with:
+           distribution: 'zulu'
+           java-version: '8'
+       - run: ./build/sbt ++2.12.10 client/test
+
    python:
      runs-on: ubuntu-24.04
      strategy:
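The new job runs the client tests against Scala 2.12 by invoking sbt's cross-version switch, `./build/sbt ++2.12.10 client/test`. The switch only takes effect for subprojects whose crossScalaVersions list a matching version, which is exactly what the build.sbt change below adds. A minimal sketch of that interaction (names and versions mirror the diff, but the fragment itself is illustrative and not part of the commit):

// Illustrative sbt fragment (not part of the commit): a module that can be
// switched to Scala 2.12 with `sbt "++2.12.10 client/test"`.
lazy val client = (project in file("client"))
  .settings(
    name := "delta-sharing-client",
    scalaVersion := "2.13.13",                       // default build
    crossScalaVersions := Seq("2.12.10", "2.13.13")  // makes ++2.12.10 apply to this module
  )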

build.sbt

Lines changed: 36 additions & 15 deletions

@@ -18,10 +18,17 @@ import sbt.ExclusionRule

  ThisBuild / parallelExecution := false

- val sparkVersion = "4.0.0"
+ val previousSparkVersion = "3.5.3"
+ val latestSparkVersion = "4.0.0"
  val scala212 = "2.12.10"
  val scala213 = "2.13.13"

+ def sparkVersionFor(scalaVer: String): String = scalaVer match {
+   case v if v.startsWith("2.12") => previousSparkVersion
+   case v if v.startsWith("2.13") => latestSparkVersion
+   case _ => sys.error(s"Unsupported Scala version: $scalaVer")
+ }
+
  lazy val commonSettings = Seq(
    organization := "io.delta",
    fork := true,
@@ -66,20 +73,24 @@ lazy val root = (project in file(".")).aggregate(client, spark, server)
  lazy val client = (project in file("client")) settings(
    name := "delta-sharing-client",
    scalaVersion := scala213,
-   crossScalaVersions := Seq(scala213),
+   crossScalaVersions := Seq(scala212, scala213),
    commonSettings,
    java17Settings,
    scalaStyleSettings,
    releaseSettings,
-   libraryDependencies ++= Seq(
-     "org.apache.httpcomponents" % "httpclient" % "4.5.14",
-     "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
-     "org.apache.spark" %% "spark-catalyst" % sparkVersion % "test" classifier "tests",
-     "org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests",
-     "org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests",
-     "org.scalatest" %% "scalatest" % "3.2.3" % "test",
-     "org.scalatestplus" %% "mockito-4-11" % "3.2.18.0" % "test"
-   ),
+   libraryDependencies ++= {
+     val sv = scalaVersion.value
+     val sparkVer = sparkVersionFor(sv)
+     Seq(
+       "org.apache.httpcomponents" % "httpclient" % "4.5.14",
+       "org.apache.spark" %% "spark-sql" % sparkVer % "provided",
+       "org.apache.spark" %% "spark-catalyst" % sparkVer % "test" classifier "tests",
+       "org.apache.spark" %% "spark-core" % sparkVer % "test" classifier "tests",
+       "org.apache.spark" %% "spark-sql" % sparkVer % "test" classifier "tests",
+       "org.scalatest" %% "scalatest" % "3.2.3" % "test",
+       "org.scalatestplus" %% "mockito-4-11" % "3.2.18.0" % "test"
+     )
+   },
    Compile / sourceGenerators += Def.task {
      val file = (Compile / sourceManaged).value / "io" / "delta" / "sharing" / "client" / "package.scala"
      IO.write(file,
@@ -90,6 +101,16 @@ lazy val client = (project in file("client")) settings(
        |}
        |""".stripMargin)
      Seq(file)
+   },
+   // Use scala-2.12 and scala-2.13 folders for version-specific overrides
+   Compile / unmanagedSourceDirectories ++= {
+     val sv = scalaVersion.value
+     val base = (Compile / sourceDirectory).value
+     sv match {
+       case v if v.startsWith("2.12") => Seq(base / "scala-2.12")
+       case v if v.startsWith("2.13") => Seq(base / "scala-2.13")
+       case _ => Seq.empty
+     }
    }
  )

@@ -102,10 +123,10 @@ lazy val spark = (project in file("spark")) dependsOn(client) settings(
    scalaStyleSettings,
    releaseSettings,
    libraryDependencies ++= Seq(
-     "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
-     "org.apache.spark" %% "spark-catalyst" % sparkVersion % "test" classifier "tests",
-     "org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests",
-     "org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests",
+     "org.apache.spark" %% "spark-sql" % latestSparkVersion % "provided",
+     "org.apache.spark" %% "spark-catalyst" % latestSparkVersion % "test" classifier "tests",
+     "org.apache.spark" %% "spark-core" % latestSparkVersion % "test" classifier "tests",
+     "org.apache.spark" %% "spark-sql" % latestSparkVersion % "test" classifier "tests",
      "org.scalatest" %% "scalatest" % "3.2.3" % "test"
    ),
    Compile / sourceGenerators += Def.task {
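The build.sbt change decouples the client module's Spark dependency from the Scala version: a Scala 2.12 build pulls Spark 3.5.3 (previousSparkVersion) while 2.13 stays on Spark 4.0.0 (latestSparkVersion), and the spark module itself remains pinned to the latest version. A standalone sketch of the mapping introduced above (runnable outside sbt; the object wrapper exists only to make the sketch self-contained):

// Standalone sketch of the Scala-version-to-Spark-version mapping from build.sbt.
object SparkVersionMappingSketch {
  val previousSparkVersion = "3.5.3"
  val latestSparkVersion = "4.0.0"

  def sparkVersionFor(scalaVer: String): String = scalaVer match {
    case v if v.startsWith("2.12") => previousSparkVersion
    case v if v.startsWith("2.13") => latestSparkVersion
    case _ => sys.error(s"Unsupported Scala version: $scalaVer")
  }

  def main(args: Array[String]): Unit = {
    // The cross-build job added in the workflow exercises the first case.
    assert(sparkVersionFor("2.12.10") == "3.5.3")
    assert(sparkVersionFor("2.13.13") == "4.0.0")
    println("mapping checks passed")
  }
}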
New file (path not shown in this extract), package io.delta.sharing.spark.perf

Lines changed: 110 additions & 0 deletions

@@ -0,0 +1,110 @@
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.delta.sharing.spark.perf

import scala.reflect.runtime.universe.{termNames, typeOf, typeTag}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.IntegerLiteral
import org.apache.spark.sql.catalyst.plans.logical.{LocalLimit, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.sources.BaseRelation

import io.delta.sharing.client.util.ConfUtils
import io.delta.sharing.spark.RemoteDeltaSnapshotFileIndex

object DeltaSharingLimitPushDown extends Rule[LogicalPlan] {

  def setup(spark: SparkSession): Unit = synchronized {
    if (!spark.experimental.extraOptimizations.contains(DeltaSharingLimitPushDown)) {
      spark.experimental.extraOptimizations ++= Seq(DeltaSharingLimitPushDown)
    }
  }

  def apply(p: LogicalPlan): LogicalPlan = {
    if (ConfUtils.limitPushdownEnabled(p.conf)) {
      p transform {
        case localLimit @ LocalLimit(
            literalExpr @ IntegerLiteral(limit),
            l @ LogicalRelationWithTable(
              r @ HadoopFsRelation(remoteIndex: RemoteDeltaSnapshotFileIndex, _, _, _, _, _),
              _)
            ) =>
          if (remoteIndex.limitHint.isEmpty) {
            val spark = SparkSession.active
            LocalLimit(literalExpr,
              LogicalRelationShim.copyWithNewRelation(
                l,
                r.copy(
                  location = remoteIndex.copy(limitHint = Some(limit)))(spark))
            )
          } else {
            localLimit
          }
      }
    } else {
      p
    }
  }
}

/**
 * Extract the [[BaseRelation]] and [[CatalogTable]] from [[LogicalRelation]]. You can also
 * retrieve the instance of LogicalRelation like following:
 *
 *   case l @ LogicalRelationWithTable(relation, catalogTable) => ...
 *
 * NOTE: This is copied from Spark 4.0 codebase - license: Apache-2.0.
 */
object LogicalRelationWithTable {
  def unapply(plan: LogicalRelation): Option[(BaseRelation, Option[CatalogTable])] = {
    Some(plan.relation, plan.catalogTable)
  }
}

/**
 * This class helps the codebase to address the differences among multiple Spark versions.
 */
object LogicalRelationShim {
  /**
   * This method provides the ability of copying LogicalRelation instance across Spark versions,
   * when the caller only wants to replace the relation in the LogicalRelation.
   */
  def copyWithNewRelation(src: LogicalRelation, newRelation: BaseRelation): LogicalRelation = {
    // We assume Spark would not change the order of the existing parameter, but it's even safe
    // as long as the first parameter is reserved to the `relation`.
    val paramsForPrimaryConstructor = src.productIterator.toArray
    paramsForPrimaryConstructor(0) = newRelation

    val constructor = typeOf[LogicalRelation]
      .decl(termNames.CONSTRUCTOR)
      // Getting all the constructors
      .alternatives
      .map(_.asMethod)
      // Picking the primary constructor
      .find(_.isPrimaryConstructor)
      // A class must always have a primary constructor, so this is safe
      .get
    val constructorMirror = typeTag[LogicalRelation].mirror
      .reflectClass(typeOf[LogicalRelation].typeSymbol.asClass)
      .reflectConstructor(constructor)

    constructorMirror.apply(paramsForPrimaryConstructor: _*).asInstanceOf[LogicalRelation]
  }
}
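DeltaSharingLimitPushDown is an opt-in optimizer rule: callers register it once per SparkSession through setup, and when limit pushdown is enabled in the conf, a LIMIT over a Delta Sharing HadoopFsRelation is rewritten so the limit travels into RemoteDeltaSnapshotFileIndex as a limitHint. A hedged usage sketch follows; the local SparkSession and the share path are placeholders, not part of the commit:

// Hypothetical registration and use of the rule (placeholder session and table path).
import org.apache.spark.sql.SparkSession
import io.delta.sharing.spark.perf.DeltaSharingLimitPushDown

object LimitPushDownUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")                 // assumed local session for illustration
      .appName("limit-pushdown-sketch")
      .getOrCreate()

    // Adds the rule to spark.experimental.extraOptimizations (idempotent).
    DeltaSharingLimitPushDown.setup(spark)

    // With the rule installed and limit pushdown enabled in the conf, the
    // LIMIT below can be propagated as a limitHint to the remote file index.
    // "profile.share#schema.table" is a placeholder Delta Sharing table path.
    val df = spark.read
      .format("deltaSharing")
      .load("profile.share#schema.table")
      .limit(10)
    df.show()
  }
}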
New file (path not shown in this extract), package org.apache.spark.sql

Lines changed: 34 additions & 0 deletions

@@ -0,0 +1,34 @@
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.LogicalRelation

object DeltaSharingScanUtils {
  // A wrapper to expose the Dataset.ofRows function.
  // This is needed because the Dataset object is in private[sql] scope.
  def ofRows(spark: SparkSession, plan: LogicalRelation): DataFrame = {
    Dataset.ofRows(spark, plan)
  }

  // A wrapper to expose the Column.apply(expr: Expression) function.
  // This is needed because the Column object is in private[sql] scope.
  def toColumn(expr: Expression): Column = {
    Column(expr)
  }
}
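DeltaSharingScanUtils sits in the org.apache.spark.sql package only so it can re-expose two private[sql] entry points to the connector. A brief sketch of how calling code might use it; the plan and expression values are illustrative stand-ins, since the connector's real call sites are not part of this diff:

// Illustrative call sites (the LogicalRelation `plan` is assumed to come from
// the connector's own planning code; it is not constructed in this commit).
import org.apache.spark.sql.{Column, DataFrame, DeltaSharingScanUtils, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.execution.datasources.LogicalRelation

object ScanUtilsUsageSketch {
  def wrapAsDataFrame(spark: SparkSession, plan: LogicalRelation): DataFrame =
    DeltaSharingScanUtils.ofRows(spark, plan)   // wraps Dataset.ofRows, per the file's comment

  def constantColumn(): Column =
    DeltaSharingScanUtils.toColumn(Literal(1))  // wraps Column(expr), per the file's comment
}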
