Commit 79f30d5

[ZEPPELIN-6086] Remove Spark Shims and unofficial support for Spark 4.0
### What is this PR for?
Add support for Spark 4 by removing the Spark shims. There is only one version of the shims now, and it is compatible with the current Spark 4.0 preview releases, so the shim concept can simply be removed.

### What type of PR is it?
Improvement

### Todos
* [x] - Disable 4.0 tests in CI and re-enable the supported-version check

### What is the Jira issue?
ZEPPELIN-6086

### How should this be tested?
A CI stage is added for Spark 4.0 tests, but these will need to be disabled until 4.0 is officially supported.

### Screenshots (if appropriate)

### Questions:
* Do the license files need to be updated? No
* Are there breaking changes for older versions? No
* Does this need documentation? Maybe

Closes #4825 from Kimahriman/spark-4.0.

Signed-off-by: Cheng Pan <[email protected]>
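For orientation before the per-file diffs: every SparkShims call site below now goes through a single concrete helper in the new spark-common module. The following is a minimal Scala sketch of that surface, inferred only from the constructor arguments and method calls visible in this commit; the stub bodies and exact signatures are approximations, not the actual implementation.

import java.util.Properties

import org.apache.spark.sql.SparkSession
import org.apache.zeppelin.interpreter.InterpreterContext

// Sketch only: stub bodies that illustrate the shape used by the call sites in this diff.
class SparkUtils(properties: Properties, sparkSession: SparkSession) {

  // Registers a Spark listener so job progress/URLs are reported back to the Zeppelin frontend.
  def setupSparkListener(master: String, sparkWebUrl: String, context: InterpreterContext): Unit = ()

  // Renders a DataFrame (or any object) as a Zeppelin table result, capped at maxResult rows.
  def showDataFrame(obj: Any, maxResult: Int, context: InterpreterContext): String = ""

  // Rebuilds a DataFrame from a previously rendered table string (used by z.getAsDataFrame).
  def getAsDataFrame(value: String): AnyRef = null

  // Publishes the Spark UI job URL for a paragraph run; behavior differs for "local" vs "yarn" masters.
  def buildSparkJobUrl(master: String, sparkWebUrl: String, jobId: Int,
                       jobProperties: Properties, context: InterpreterContext): Unit = ()
}

object SparkUtils {
  // True when the running YARN version contains the YARN-6615 fix that changed the job page URL layout.
  def supportYarn6615(version: String): Boolean = false
}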
1 parent 383ae3b commit 79f30d5

16 files changed: +181 -300 lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
@@ -142,3 +142,6 @@ tramp
 
 # pyenv file
 .python-version
+
+# python venv
+.venv

spark/README.md

Lines changed: 2 additions & 4 deletions
@@ -16,9 +16,7 @@ Spark interpreter is the first and most important interpreter of Zeppelin. It su
   - Scala module for Scala 2.12
 * scala-2.13
   - Scala module for Scala 2.13
-* spark-shims
-  - Parent module for each Spark module
-* spark3-shims
-  - Shims module for Spark3
+* spark-common
+  - Common utils for all Scala versions

spark/interpreter/pom.xml

Lines changed: 51 additions & 3 deletions
@@ -59,12 +59,14 @@
     <!-- settings -->
     <pyspark.test.exclude>**/PySparkInterpreterMatplotlibTest.java</pyspark.test.exclude>
     <pyspark.test.include>**/*Test.*</pyspark.test.include>
+
+    <extraJavaTestArgs></extraJavaTestArgs>
   </properties>

   <dependencies>
     <dependency>
       <groupId>org.apache.zeppelin</groupId>
-      <artifactId>spark3-shims</artifactId>
+      <artifactId>spark-common</artifactId>
       <version>${project.version}</version>
     </dependency>

@@ -162,6 +164,13 @@
       <artifactId>spark-core_${spark.scala.binary.version}</artifactId>
       <version>${spark.version}</version>
       <scope>provided</scope>
+      <exclusions>
+        <!-- Leads to conflicting Jackson versions in tests -->
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>

     <dependency>
@@ -313,7 +322,7 @@
               <target>
                 <delete dir="../../interpreter/spark/pyspark" />
                 <copy file="${project.build.directory}/${spark.archive}/python/lib/py4j-${py4j.version}-src.zip" todir="${project.build.directory}/../../../interpreter/spark/pyspark" />
-                <zip basedir="${project.build.directory}/${spark.archive}/python" destfile="${project.build.directory}/../../../interpreter/spark/pyspark/pyspark.zip" includes="pyspark/*.py,pyspark/**/*.py" />
+                <zip basedir="${project.build.directory}/${spark.archive}/python" destfile="${project.build.directory}/../../../interpreter/spark/pyspark/pyspark.zip" includes="pyspark/" />
               </target>
             </configuration>
           </execution>
@@ -359,7 +368,7 @@
         <configuration>
           <forkCount>1</forkCount>
           <reuseForks>false</reuseForks>
-          <argLine>-Xmx3072m -XX:MaxMetaspaceSize=256m</argLine>
+          <argLine>-Xmx3072m -XX:MaxMetaspaceSize=256m ${extraJavaTestArgs}</argLine>
          <excludes>
            <exclude>${pyspark.test.exclude}</exclude>
            <exclude>${tests.to.exclude}</exclude>
@@ -513,6 +522,35 @@

   <profiles>

+    <profile>
+      <id>java-17</id>
+      <activation>
+        <jdk>[17,)</jdk>
+      </activation>
+      <properties>
+        <extraJavaTestArgs>
+          -XX:+IgnoreUnrecognizedVMOptions
+          --add-modules=jdk.incubator.vector
+          --add-opens=java.base/java.lang=ALL-UNNAMED
+          --add-opens=java.base/java.lang.invoke=ALL-UNNAMED
+          --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
+          --add-opens=java.base/java.io=ALL-UNNAMED
+          --add-opens=java.base/java.net=ALL-UNNAMED
+          --add-opens=java.base/java.nio=ALL-UNNAMED
+          --add-opens=java.base/java.util=ALL-UNNAMED
+          --add-opens=java.base/java.util.concurrent=ALL-UNNAMED
+          --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
+          --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED
+          --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
+          --add-opens=java.base/sun.nio.cs=ALL-UNNAMED
+          --add-opens=java.base/sun.security.action=ALL-UNNAMED
+          --add-opens=java.base/sun.util.calendar=ALL-UNNAMED
+          -Djdk.reflect.useDirectMethodHandle=false
+          -Dio.netty.tryReflectionSetAccessible=true
+        </extraJavaTestArgs>
+      </properties>
+    </profile>
+
     <!-- profile spark-scala-x only affect the unit test in spark/interpreter module -->

     <profile>
@@ -534,6 +572,16 @@
       </properties>
     </profile>

+    <profile>
+      <id>spark-4.0</id>
+      <properties>
+        <spark.version>4.0.0-preview2</spark.version>
+        <protobuf.version>3.21.12</protobuf.version>
+        <py4j.version>0.10.9.7</py4j.version>
+        <libthrift.version>0.16.0</libthrift.version>
+      </properties>
+    </profile>
+
     <!-- profile spark-x only affect spark version used in test -->
     <profile>
       <id>spark-3.5</id>
spark/interpreter/src/main/resources/python/zeppelin_pyspark.py

Lines changed: 2 additions & 2 deletions
@@ -19,10 +19,10 @@

 from py4j.java_gateway import java_import
 from pyspark.conf import SparkConf
-from pyspark.context import SparkContext
+from pyspark import SparkContext

 # for back compatibility
-from pyspark.sql import SQLContext, Row
+from pyspark.sql import SQLContext

 intp = gateway.entry_point

spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java

Lines changed: 0 additions & 1 deletion
@@ -592,7 +592,6 @@ public void tearDown() throws InterpreterException {
     if (this.interpreter != null) {
       this.interpreter.close();
     }
-    SparkShims.reset();
   }

   private InterpreterContext getInterpreterContext() {

spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java renamed to spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkUtilsTest.java

Lines changed: 7 additions & 28 deletions
@@ -37,7 +37,7 @@
 import org.junit.jupiter.params.provider.CsvSource;
 import org.mockito.ArgumentCaptor;

-class SparkShimsTest {
+class SparkUtilsTest {

   @ParameterizedTest
   @CsvSource({"2.6.0, false",
@@ -64,29 +64,12 @@ class SparkShimsTest {
               "3.0.0-alpha4, true", // The latest fixed version
               "3.0.1, true"}) // Future versions
   void checkYarnVersionTest(String version, boolean expected) {
-    SparkShims sparkShims =
-        new SparkShims(new Properties()) {
-          @Override
-          public void setupSparkListener(String master,
-                                         String sparkWebUrl,
-                                         InterpreterContext context) {}
-
-          @Override
-          public String showDataFrame(Object obj, int maxResult, InterpreterContext context) {
-            return null;
-          }
-
-          @Override
-          public Object getAsDataFrame(String value) {
-            return null;
-          }
-        };
-    assertEquals(expected, sparkShims.supportYarn6615(version));
+    assertEquals(expected, SparkUtils.supportYarn6615(version));
   }

   @Nested
   class SingleTests {
-    SparkShims sparkShims;
+    SparkUtils sparkUtils;
     InterpreterContext mockContext;
     RemoteInterpreterEventClient mockIntpEventClient;

@@ -96,18 +79,14 @@ public void setUp() {
     mockIntpEventClient = mock(RemoteInterpreterEventClient.class);
     when(mockContext.getIntpEventClient()).thenReturn(mockIntpEventClient);

-    try {
-      sparkShims = SparkShims.getInstance(SparkVersion.SPARK_3_3_0.toString(), new Properties(), null);
-    } catch (Throwable e1) {
-      throw new RuntimeException("All SparkShims are tried, but no one can be created.");
-    }
+    sparkUtils = new SparkUtils(new Properties(), null);
   }

   @Test
   void runUnderLocalTest() {
     Properties properties = new Properties();
     properties.setProperty("spark.jobGroup.id", "zeppelin|user1|noteId|paragraphId");
-    sparkShims.buildSparkJobUrl("local", "http://sparkurl", 0, properties, mockContext);
+    sparkUtils.buildSparkJobUrl("local", "http://sparkurl", 0, properties, mockContext);
     @SuppressWarnings("unchecked")
     ArgumentCaptor<Map<String, String>> argument = ArgumentCaptor.forClass(HashMap.class);
     verify(mockIntpEventClient).onParaInfosReceived(argument.capture());
@@ -120,14 +99,14 @@ void runUnderLocalTest() {
   void runUnderYarnTest() {
     Properties properties = new Properties();
     properties.setProperty("spark.jobGroup.id", "zeppelin|user1|noteId|paragraphId");
-    sparkShims.buildSparkJobUrl("yarn", "http://sparkurl", 0, properties, mockContext);
+    sparkUtils.buildSparkJobUrl("yarn", "http://sparkurl", 0, properties, mockContext);
     @SuppressWarnings("unchecked")
     ArgumentCaptor<Map<String, String>> argument = ArgumentCaptor.forClass(HashMap.class);
     verify(mockIntpEventClient).onParaInfosReceived(argument.capture());
     Map<String, String> mapValue = argument.getValue();
     assertTrue(mapValue.keySet().contains("jobUrl"));

-    if (sparkShims.supportYarn6615(VersionInfo.getVersion())) {
+    if (sparkUtils.supportYarn6615(VersionInfo.getVersion())) {
       assertTrue(mapValue.get("jobUrl").contains("/jobs/job?id="));
     } else {
       assertFalse(mapValue.get("jobUrl").contains("/jobs/job?id="));

spark/pom.xml

Lines changed: 1 addition & 2 deletions
@@ -55,8 +55,7 @@
     <module>spark-scala-parent</module>
     <module>scala-2.12</module>
     <module>scala-2.13</module>
-    <module>spark-shims</module>
-    <module>spark3-shims</module>
+    <module>spark-common</module>
   </modules>

   <build>

spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala

Lines changed: 3 additions & 3 deletions
@@ -198,9 +198,9 @@ class SparkScala212Interpreter(conf: SparkConf,
   }

   override def createZeppelinContext(): Unit = {
-    val sparkShims = SparkShims.getInstance(sc.version, properties, sparkSession)
-    sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get)
-    z = new SparkZeppelinContext(sc, sparkShims,
+    val sparkUtils = new SparkUtils(properties, sparkSession)
+    sparkUtils.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get)
+    z = new SparkZeppelinContext(sc, sparkUtils,
       interpreterGroup.getInterpreterHookRegistry,
       properties.getProperty("zeppelin.spark.maxResult", "1000").toInt)
     bind("z", z.getClass.getCanonicalName, z, List("""@transient"""))

spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala

Lines changed: 3 additions & 3 deletions
@@ -33,7 +33,7 @@ import scala.collection.JavaConverters._
   * ZeppelinContext for Spark
   */
 class SparkZeppelinContext(val sc: SparkContext,
-                           val sparkShims: SparkShims,
+                           val sparkUtils: SparkUtils,
                            val hooks2: InterpreterHookRegistry,
                            val maxResult2: Int) extends ZeppelinContext(hooks2, maxResult2) {

@@ -65,7 +65,7 @@ class SparkZeppelinContext(val sc: SparkContext,

   override def getInterpreterClassMap: util.Map[String, String] = interpreterClassMap.asJava

-  override def showData(obj: Any, maxResult: Int): String = sparkShims.showDataFrame(obj, maxResult, interpreterContext)
+  override def showData(obj: Any, maxResult: Int): String = sparkUtils.showDataFrame(obj, maxResult, interpreterContext)

   /**
     * create paragraph level of dynamic form of Select with no item selected.
@@ -237,6 +237,6 @@ class SparkZeppelinContext(val sc: SparkContext,
   }

   def getAsDataFrame(name: String): Object = {
-    sparkShims.getAsDataFrame(get(name).toString)
+    sparkUtils.getAsDataFrame(get(name).toString)
   }
 }

spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala

Lines changed: 3 additions & 3 deletions
@@ -200,9 +200,9 @@ class SparkScala213Interpreter(conf: SparkConf,
   }

   override def createZeppelinContext(): Unit = {
-    val sparkShims = SparkShims.getInstance(sc.version, properties, sparkSession)
-    sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get)
-    z = new SparkZeppelinContext(sc, sparkShims,
+    val sparkUtils = new SparkUtils(properties, sparkSession)
+    sparkUtils.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get)
+    z = new SparkZeppelinContext(sc, sparkUtils,
       interpreterGroup.getInterpreterHookRegistry,
       properties.getProperty("zeppelin.spark.maxResult", "1000").toInt)
     bind("z", z.getClass.getCanonicalName, z, List("""@transient"""))

spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala

Lines changed: 3 additions & 3 deletions
@@ -33,7 +33,7 @@ import scala.collection.JavaConverters._
   * ZeppelinContext for Spark
   */
 class SparkZeppelinContext(val sc: SparkContext,
-                           val sparkShims: SparkShims,
+                           val sparkUtils: SparkUtils,
                            val hooks2: InterpreterHookRegistry,
                            val maxResult2: Int) extends ZeppelinContext(hooks2, maxResult2) {

@@ -65,7 +65,7 @@ class SparkZeppelinContext(val sc: SparkContext,

   override def getInterpreterClassMap: util.Map[String, String] = interpreterClassMap.asJava

-  override def showData(obj: Any, maxResult: Int): String = sparkShims.showDataFrame(obj, maxResult, interpreterContext)
+  override def showData(obj: Any, maxResult: Int): String = sparkUtils.showDataFrame(obj, maxResult, interpreterContext)

   /**
     * create paragraph level of dynamic form of Select with no item selected.
@@ -237,6 +237,6 @@ class SparkZeppelinContext(val sc: SparkContext,
   }

   def getAsDataFrame(name: String): Object = {
-    sparkShims.getAsDataFrame(get(name).toString)
+    sparkUtils.getAsDataFrame(get(name).toString)
   }
 }

spark/spark3-shims/pom.xml renamed to spark/spark-common/pom.xml

Lines changed: 10 additions & 11 deletions
@@ -18,6 +18,7 @@

 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+
   <parent>
     <artifactId>spark-parent</artifactId>
     <groupId>org.apache.zeppelin</groupId>
@@ -26,21 +27,19 @@
   </parent>

   <modelVersion>4.0.0</modelVersion>
-  <artifactId>spark3-shims</artifactId>
+  <artifactId>spark-common</artifactId>
   <packaging>jar</packaging>
-  <name>Zeppelin: Spark3 Shims</name>
-
-  <properties>
-    <scala.binary.version>2.12</scala.binary.version>
-    <spark.version>3.4.1</spark.version>
-  </properties>
+  <name>Zeppelin: Spark Common</name>

   <dependencies>
-
+    <!--
+      This is for ZEPPELIN-2221 using VersionInfo for check the version of Yarn.
+      It's checked that VersionInfo is compatible at least 2.2.0 to the latest one.
+    -->
     <dependency>
-      <groupId>org.apache.zeppelin</groupId>
-      <artifactId>spark-shims</artifactId>
-      <version>${project.version}</version>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client-api</artifactId>
+      <scope>provided</scope>
     </dependency>

     <dependency>
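The comment added in this hunk explains that hadoop-client-api is pulled in only for org.apache.hadoop.util.VersionInfo, which reports the Hadoop/YARN version that SparkUtils.supportYarn6615 inspects at runtime (see SparkUtilsTest above). Below is a minimal sketch of that lookup, assuming only that hadoop-client-api is on the classpath; the object name is hypothetical and not part of this commit.

import org.apache.hadoop.util.VersionInfo

// Hypothetical probe (not in this commit): prints the Hadoop/YARN version string
// that SparkUtils.supportYarn6615 checks when deciding which job URL format to emit.
object YarnVersionProbe {
  def main(args: Array[String]): Unit = {
    // VersionInfo.getVersion returns the client's Hadoop version, e.g. "3.3.6".
    println(s"Hadoop/YARN client version: ${VersionInfo.getVersion}")
  }
}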
