SPARK-39115

Can't submit second Spark job on K8S using InProcessLauncher


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.2.0, 3.2.1
    • Fix Version/s: None
    • Component/s: Kubernetes
    • Labels: None

    Description

      We tried to use Spark's InProcessLauncher to submit Spark jobs on Kubernetes, the goal being to launch several jobs from within the same Java process. The first job runs successfully, but the second one fails with:

      2022-05-06 10:00:16.867 [spark-app-2: '...parkEngineDriver'] WARN  o.a.s.launcher.InProcessAppHandle - Application failed with exception.
      io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT at: https://0.0.0.0:37811/api/v1/namespaces/connectivity/configmaps/spark-drv-22fc5d8099ab53dc-conf-map. Message: ConfigMap "spark-drv-22fc5d8099ab53dc-conf-map" is invalid: data: Forbidden: field is immutable when `immutable` is set. Received status: Status(apiVersion=v1, code=422, details=StatusDetails(causes=[StatusCause(field=data, message=Forbidden: field is immutable when `immutable` is set, reason=FieldValueForbidden, additionalProperties={})], group=null, kind=ConfigMap, name=spark-drv-22fc5d8099ab53dc-conf-map, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=ConfigMap "spark-drv-22fc5d8099ab53dc-conf-map" is invalid: data: Forbidden: field is immutable when `immutable` is set, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, status=Failure, additionalProperties={}).
          at io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:639)
          at io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:578)
          at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:543)
          at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:504)
          at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleUpdate(OperationSupport.java:330)
          at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleUpdate(OperationSupport.java:310)
          at io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleUpdate(BaseOperation.java:898)
          at io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.lambda$replace$0(HasMetadataOperation.java:132)
          at io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.replace(HasMetadataOperation.java:137)
          at io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.replace(HasMetadataOperation.java:97)
          at io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.replace(HasMetadataOperation.java:38)
          at io.fabric8.kubernetes.client.handlers.core.v1.ConfigMapHandler.replace(ConfigMapHandler.java:44)
          at io.fabric8.kubernetes.client.handlers.core.v1.ConfigMapHandler.replace(ConfigMapHandler.java:25)
          at io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper.lambda$createOrReplaceItem$1(CreateOrReplaceHelper.java:78)
          at io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper.replace(CreateOrReplaceHelper.java:96)
          at io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper.createOrReplace(CreateOrReplaceHelper.java:69)
          at io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper.createOrReplaceItem(CreateOrReplaceHelper.java:91)
          at io.fabric8.kubernetes.client.dsl.internal.NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.createOrReplaceOrDeleteExisting(NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java:454)
          at io.fabric8.kubernetes.client.dsl.internal.NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.createOrReplace(NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java:297)
          at io.fabric8.kubernetes.client.dsl.internal.NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.createOrReplace(NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java:66)
          at org.apache.spark.deploy.k8s.submit.Client.run(KubernetesClientApplication.scala:150)
          at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.$anonfun$run$4(KubernetesClientApplication.scala:220)
          at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.$anonfun$run$4$adapted(KubernetesClientApplication.scala:214)
          at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2713)
          at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.run(KubernetesClientApplication.scala:214)
          at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.start(KubernetesClientApplication.scala:186)
          at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
          at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
          at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
          at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
          at org.apache.spark.deploy.InProcessSparkSubmit$.main(SparkSubmit.scala:984)
          at org.apache.spark.deploy.InProcessSparkSubmit.main(SparkSubmit.scala)
          at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
          at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.base/java.lang.reflect.Method.invoke(Method.java:568)
          at org.apache.spark.launcher.InProcessAppHandle.lambda$start$0(InProcessAppHandle.java:72)
          at java.base/java.lang.Thread.run(Thread.java:833) 
      

      Code example:

       val launcher = new InProcessLauncher()
         .setMaster("k8s://https://0.0.0.0:37811")
         .setDeployMode("cluster")
         .setAppResource("{location_to_pi_example}")
         .setMainClass("org.apache.spark.examples.SparkPi")
         .setConf("spark.kubernetes.container.image", "{spark_driver_image}")
         .setConf("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
         .setConf("spark.kubernetes.namespace", "default")
         .setConf("spark.kubernetes.executor.disableConfigMap", "true")
         .setConf("spark.kubernetes.container.image.pullPolicy", "Always")

       launcher.setPropertiesFile(getClass.getResource("/spark-defaults.conf").getPath)
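      The snippet above only builds one launcher; the failure shows up when a second submission is made from the same JVM. A minimal sketch of that reproduction (illustrative only, since it needs a live cluster; the `ReproduceTwice` wrapper is ours and the placeholders are kept as placeholders):

```scala
import org.apache.spark.launcher.InProcessLauncher

object ReproduceTwice {
  // Hypothetical reproduction sketch: submit SparkPi twice from the same
  // JVM. On 3.2.x the second call fails with the 422 ConfigMap error,
  // because the driver ConfigMap name is computed once per JVM.
  def submitPi(): Unit = {
    val handle = new InProcessLauncher()
      .setMaster("k8s://https://0.0.0.0:37811")
      .setDeployMode("cluster")
      .setAppResource("{location_to_pi_example}")
      .setMainClass("org.apache.spark.examples.SparkPi")
      .setConf("spark.kubernetes.container.image", "{spark_driver_image}")
      .setConf("spark.kubernetes.namespace", "default")
      .startApplication()
    // Block until this submission reaches a terminal state (sketch only).
    while (!handle.getState.isFinal) Thread.sleep(500)
  }

  def main(args: Array[String]): Unit = {
    submitPi() // first submission: succeeds
    submitPi() // second submission: fails with the KubernetesClientException above
  }
}
```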
      

      After some research, I found these lines in org.apache.spark.deploy.k8s.submit.KubernetesClientUtils.scala:

        val configMapNameExecutor: String = configMapName(s"spark-exec-${KubernetesUtils.uniqueID()}")
        val configMapNameDriver: String = configMapName(s"spark-drv-${KubernetesUtils.uniqueID()}")

      Switching these from `val` to `def` solves the issue: the unique ID is then regenerated on each submission instead of being fixed once per JVM.
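      The difference can be shown without Spark: a `val` in an object is evaluated once at initialization, while a `def` is re-evaluated on every access. A self-contained sketch, with a hypothetical stand-in for `KubernetesUtils.uniqueID()`:

```scala
import java.util.UUID

object ConfigMapNaming {
  // Hypothetical stand-in for KubernetesUtils.uniqueID().
  def uniqueID(): String = UUID.randomUUID().toString.take(8)

  // As in 3.2.x: evaluated once at object initialization, so every
  // submission from this JVM reuses the same driver ConfigMap name.
  val configMapNameDriverVal: String = s"spark-drv-${uniqueID()}-conf-map"

  // Proposed fix: evaluated on every access, so each in-process
  // submission gets a fresh ConfigMap name.
  def configMapNameDriverDef: String = s"spark-drv-${uniqueID()}-conf-map"
}
```

      Reading `configMapNameDriverVal` twice yields the same name, while two reads of `configMapNameDriverDef` yield different names — which is why the second in-process submission no longer collides with the first driver's immutable ConfigMap.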


          People

            Assignee: Unassigned
            Reporter: Zhyshko Yury
            Votes: 0
            Watchers: 2
