[SPARK-52505][K8S] Allow to create executor kubernetes service #51203

Open · wants to merge 2 commits into base: master
6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -1499,6 +1499,12 @@
],
"sqlState" : "42702"
},
"EXECUTOR_KUBERNETES_SERVICE_REQUIRES_BLOCK_MANAGER_PORT" : {
"message" : [
"Enabling the executor Kubernetes service requires <blockManagerPortConfigKey> to be set to a positive number, for instance <defaultShuffleServicePort>."
],
"sqlState" : "42000"
},
"EXEC_IMMEDIATE_DUPLICATE_ARGUMENT_ALIASES" : {
"message" : [
"The USING clause of this EXECUTE IMMEDIATE command contained multiple arguments with same alias (<aliases>), which is invalid; please update the command to specify unique aliases and then try it again."
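For reference, the new error condition is raised from the ExecutorServiceFeatureStep further down via SparkException.require, which substitutes the message parameters and surfaces the failure as a SparkIllegalArgumentException (the builder suite below asserts exactly that). A minimal sketch, with the resolved parameter values shown in comments; blockManagerPort is assumed to be in scope:

import org.apache.spark.SparkException
import org.apache.spark.internal.config.{BLOCK_MANAGER_PORT, SHUFFLE_SERVICE_PORT}

SparkException.require(blockManagerPort > 0,
  "EXECUTOR_KUBERNETES_SERVICE_REQUIRES_BLOCK_MANAGER_PORT",
  Map(
    "blockManagerPortConfigKey" -> BLOCK_MANAGER_PORT.key,                          // "spark.blockManager.port"
    "defaultShuffleServicePort" -> SHUFFLE_SERVICE_PORT.defaultValue.get.toString)) // "7337"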
@@ -416,6 +416,20 @@ private[spark] object Config extends Logging {
.toSequence
.createWithDefault(Nil)

val KUBERNETES_EXECUTOR_ENABLE_SERVICE =
ConfigBuilder("spark.kubernetes.executor.enableService")
.doc("If a Kubernetes service is created for the executor. " +
"A Kubernetes service is created for the executor pod that allows to connect to executor " +
"ports via the Kubernetes service instead of the pod host IP. Once the executor got " +
"decommissioned, connecting to such ports instantly fails with 'connection refused'. " +
"Connection to the port via the pod host IP instead fails with a 'connection timeout' " +
"after NETWORK_TIMEOUT, which defaults to 2 minutes. " +
"The executor kubernetes service provides access to the executor's block manager, so " +
"BLOCK_MANAGER_PORT has to be given a value greater than zero.")
.version("4.1.0")
.booleanConf
.createWithDefault(false)

val KUBERNETES_EXECUTOR_DECOMMISSION_LABEL =
ConfigBuilder("spark.kubernetes.executor.decommissionLabel")
.doc("Label to apply to a pod which is being decommissioned." +
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features

import scala.jdk.CollectionConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, ServiceBuilder}

import org.apache.spark.SparkException
import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod}
import org.apache.spark.internal.config.{BLOCK_MANAGER_PORT, SHUFFLE_SERVICE_PORT}

class ExecutorServiceFeatureStep(conf: KubernetesExecutorConf) extends KubernetesFeatureConfigStep {
private val spark_app_selector_label = "spark-app-selector"
private val spark_exec_id_label = "spark-exec-id"
private val service_selector_labels = Set(spark_app_selector_label, spark_exec_id_label)
private lazy val selector = conf.labels
.filter { case (key, _) => service_selector_labels.contains(key) }

private lazy val sparkAppSelector = getLabel(spark_app_selector_label)
private lazy val sparkExecId = getLabel(spark_exec_id_label)
// name length is 8 + 38 + 6 + 10 = 62
// which fits in KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH = 63
private lazy val serviceName = s"svc-$sparkAppSelector-exec-$sparkExecId"

// The executor Kubernetes service requires BLOCK_MANAGER_PORT to be set
private val blockManagerPortName = "spark-block-manager"
private val blockManagerPort = conf.sparkConf.get(BLOCK_MANAGER_PORT)
SparkException.require(blockManagerPort > 0,
"EXECUTOR_KUBERNETES_SERVICE_REQUIRES_BLOCK_MANAGER_PORT",
Map(
"blockManagerPortConfigKey" -> BLOCK_MANAGER_PORT.key,
"defaultShuffleServicePort" -> SHUFFLE_SERVICE_PORT.defaultValue.get.toString));

private def getLabel(label: String): String = {
val value = conf.labels.get(label)
value.getOrElse(
throw new SparkException(s"This feature step requires label $label")
)
}

override def configurePod(pod: SparkPod): SparkPod = {
SparkPod(
pod.pod,
// tell the executor entry point its Kubernetes service name
new ContainerBuilder(pod.container)
.addNewEnv()
.withName("EXECUTOR_SERVICE_NAME")
.withValue(serviceName)
.endEnv()
.build())
}

override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
val service = new ServiceBuilder()
.withNewMetadata()
.withName(serviceName)
.endMetadata()
.withNewSpec()
.withSelector(selector.asJava)
.addNewPort()
.withName(blockManagerPortName)
.withPort(blockManagerPort)
.withNewTargetPort(blockManagerPort)
.endPort()
.endSpec()
.build()

Seq(service)
}
}
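As a worked example (the label values below are hypothetical): the service name is derived from the two selector labels, so the created Service selects only pods carrying both labels and exposes only the block manager port.

// hypothetical label values, for illustration only
val labels = Map(
  "spark-app-selector" -> "spark-0123456789abcdef0123456789abcdef",
  "spark-exec-id" -> "7")
val serviceName = s"svc-${labels("spark-app-selector")}-exec-${labels("spark-exec-id")}"
// "svc-spark-0123456789abcdef0123456789abcdef-exec-7" (49 characters, within the 63-character limit)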
@@ -447,6 +447,7 @@ class ExecutorPodsAllocator(
kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create()
try {
addOwnerReference(createdExecutorPod, resources)
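// Apply the additional executor resources (e.g. the new per-executor Service) server-side;
// forceConflicts() takes ownership of conflicting fields instead of failing the apply.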
kubernetesClient.resourceList(resources: _*).forceConflicts().serverSideApply()
resources
.filter(_.getKind == "PersistentVolumeClaim")
.foreach { resource =>
@@ -65,14 +65,20 @@ private[spark] class KubernetesExecutorBuilder {
}
}

val optionalFeatures = Seq(
Some(conf.get(Config.KUBERNETES_EXECUTOR_ENABLE_SERVICE))
.filter(enabled => enabled)
.map(_ => new ExecutorServiceFeatureStep(conf))
).flatten

val features = Seq(
new BasicExecutorFeatureStep(conf, secMgr, resourceProfile),
new ExecutorKubernetesCredentialsFeatureStep(conf),
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new MountVolumesFeatureStep(conf),
new HadoopConfExecutorFeatureStep(conf),
new LocalDirsFeatureStep(conf)) ++ userFeatures
new LocalDirsFeatureStep(conf)) ++ optionalFeatures ++ userFeatures

val spec = KubernetesExecutorSpec(
initialPod,
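Minor, optional: the Option/filter/map combination above could also be written as a plain conditional; an equivalent sketch, not part of this diff:

val optionalFeatures =
  if (conf.get(Config.KUBERNETES_EXECUTOR_ENABLE_SERVICE)) {
    Seq(new ExecutorServiceFeatureStep(conf))
  } else {
    Seq.empty
  }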
@@ -25,14 +25,15 @@ import scala.jdk.CollectionConverters._

import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException}
import io.fabric8.kubernetes.client.dsl.PodResource
import io.fabric8.kubernetes.client.dsl.{NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource, ServerSideApplicable}
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.ArgumentMatchers.{any, anyString, eq => meq}
import org.mockito.Mockito.{never, times, verify, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.BeforeAndAfter
import org.scalatest.PrivateMethodTester._
import org.scalatestplus.mockito.MockitoSugar.mock

import org.apache.spark.{SecurityManager, SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesExecutorSpec}
@@ -142,6 +143,11 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
when(schedulerBackend.getExecutorIds()).thenReturn(Seq.empty)
podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
val apl = mock[NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[HasMetadata]]
val ssa = mock[ServerSideApplicable[java.util.List[HasMetadata]]]
when(apl.forceConflicts()).thenReturn(ssa)
when(kubernetesClient.resourceList()).thenReturn(apl)
when(kubernetesClient.resourceList(any[HasMetadata]())).thenReturn(apl)
when(kubernetesClient.persistentVolumeClaims()).thenReturn(persistentVolumeClaims)
when(persistentVolumeClaims.inNamespace("default")).thenReturn(pvcWithNamespace)
when(pvcWithNamespace.withLabel(any(), any())).thenReturn(labeledPersistentVolumeClaims)
@@ -16,12 +16,16 @@
*/
package org.apache.spark.scheduler.cluster.k8s

import scala.jdk.CollectionConverters.IterableHasAsScala

import io.fabric8.kubernetes.api.model.Service
import io.fabric8.kubernetes.client.KubernetesClient
import org.mockito.Mockito.mock

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.{SecurityManager, SparkConf, SparkIllegalArgumentException}
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.features.KubernetesExecutorCustomFeatureConfigStep
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.internal.config.{BLOCK_MANAGER_PORT, ConfigEntry}
import org.apache.spark.resource.ResourceProfile

class KubernetesExecutorBuilderSuite extends PodBuilderSuite {
@@ -61,6 +65,57 @@ class KubernetesExecutorBuilderSuite extends PodBuilderSuite {
val defaultProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client, defaultProfile).pod
}

test("SPARK-XXXXX: check executor kubernetes spec with service disabled by default") {
val sparkConf = baseConf
val conf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf)
val secMgr = new SecurityManager(sparkConf)
val client = mock(classOf[KubernetesClient])
val profile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
val spec = new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client, profile)

val containerEnvs = spec.pod.container.getEnv.asScala
assert(!containerEnvs.exists(_.getName === "EXECUTOR_SERVICE_NAME"))

assert(spec.executorKubernetesResources.size === 0)
}

test("SPARK-XXXXX: check executor kubernetes spec with service enabled") {
val sparkConf = baseConf.clone
.set(Config.KUBERNETES_EXECUTOR_ENABLE_SERVICE, true)
.set(BLOCK_MANAGER_PORT, 1234)
val conf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf)
val secMgr = new SecurityManager(sparkConf)
val client = mock(classOf[KubernetesClient])
val profile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
val spec = new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client, profile)

val containerEnvs = spec.pod.container.getEnv.asScala
assert(containerEnvs.exists(_.getName === "EXECUTOR_SERVICE_NAME"))
val containerEnv = containerEnvs.filter(_.getName === "EXECUTOR_SERVICE_NAME").head
assert(containerEnv.getValue === "svc-appId-exec-1")

assert(spec.executorKubernetesResources.size === 1)
val resource = spec.executorKubernetesResources.head
assert(resource.getKind === "Service")
val service = resource.asInstanceOf[Service]
assert(service.getMetadata.getName === "svc-appId-exec-1")
assert(service.getSpec.getPorts.size() === 1)
val port = service.getSpec.getPorts.get(0)
assert(port.getName === "spark-block-manager")
assert(port.getPort === 1234)
}

test("SPARK-XXXXX: check executor kubernetes service requires block manager port") {
val sparkConf = baseConf.clone.set(Config.KUBERNETES_EXECUTOR_ENABLE_SERVICE, true)
val conf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf)
val secMgr = new SecurityManager(sparkConf)
val client = mock(classOf[KubernetesClient])
val profile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
assertThrows[SparkIllegalArgumentException] {
new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client, profile)
}
}
}

/**
@@ -102,7 +102,8 @@ case "$1" in
--executor-id $SPARK_EXECUTOR_ID
--cores $SPARK_EXECUTOR_CORES
--app-id $SPARK_APPLICATION_ID
--hostname $SPARK_EXECUTOR_POD_IP
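# When EXECUTOR_SERVICE_NAME is set, bind to the pod IP but advertise the Kubernetes
# service name as the executor hostname; otherwise fall back to the pod IP for both.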
${EXECUTOR_SERVICE_NAME:+--bind-address $SPARK_EXECUTOR_POD_IP}
--hostname ${EXECUTOR_SERVICE_NAME:-$SPARK_EXECUTOR_POD_IP}
--resourceProfileId $SPARK_RESOURCE_PROFILE_ID
--podName $SPARK_EXECUTOR_POD_NAME
)