Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 3.2.0, 3.2.1
Description
We tried to use Spark's InProcessLauncher to submit Spark jobs on Kubernetes. The goal was to launch several jobs from within the same Java process. The first job ran successfully, but the second one failed with:
2022-05-06 10:00:16.867 [spark-app-2: '...parkEngineDriver'] WARN o.a.s.launcher.InProcessAppHandle - Application failed with exception.
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT at: https://0.0.0.0:37811/api/v1/namespaces/connectivity/configmaps/spark-drv-22fc5d8099ab53dc-conf-map. Message: ConfigMap "spark-drv-22fc5d8099ab53dc-conf-map" is invalid: data: Forbidden: field is immutable when `immutable` is set. Received status: Status(apiVersion=v1, code=422, details=StatusDetails(causes=[StatusCause(field=data, message=Forbidden: field is immutable when `immutable` is set, reason=FieldValueForbidden, additionalProperties={})], group=null, kind=ConfigMap, name=spark-drv-22fc5d8099ab53dc-conf-map, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=ConfigMap "spark-drv-22fc5d8099ab53dc-conf-map" is invalid: data: Forbidden: field is immutable when `immutable` is set, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, status=Failure, additionalProperties={}).
    at io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:639)
    at io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:578)
    at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:543)
    at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:504)
    at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleUpdate(OperationSupport.java:330)
    at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleUpdate(OperationSupport.java:310)
    at io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleUpdate(BaseOperation.java:898)
    at io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.lambda$replace$0(HasMetadataOperation.java:132)
    at io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.replace(HasMetadataOperation.java:137)
    at io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.replace(HasMetadataOperation.java:97)
    at io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.replace(HasMetadataOperation.java:38)
    at io.fabric8.kubernetes.client.handlers.core.v1.ConfigMapHandler.replace(ConfigMapHandler.java:44)
    at io.fabric8.kubernetes.client.handlers.core.v1.ConfigMapHandler.replace(ConfigMapHandler.java:25)
    at io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper.lambda$createOrReplaceItem$1(CreateOrReplaceHelper.java:78)
    at io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper.replace(CreateOrReplaceHelper.java:96)
    at io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper.createOrReplace(CreateOrReplaceHelper.java:69)
    at io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper.createOrReplaceItem(CreateOrReplaceHelper.java:91)
    at io.fabric8.kubernetes.client.dsl.internal.NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.createOrReplaceOrDeleteExisting(NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java:454)
    at io.fabric8.kubernetes.client.dsl.internal.NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.createOrReplace(NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java:297)
    at io.fabric8.kubernetes.client.dsl.internal.NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.createOrReplace(NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java:66)
    at org.apache.spark.deploy.k8s.submit.Client.run(KubernetesClientApplication.scala:150)
    at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.$anonfun$run$4(KubernetesClientApplication.scala:220)
    at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.$anonfun$run$4$adapted(KubernetesClientApplication.scala:214)
    at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2713)
    at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.run(KubernetesClientApplication.scala:214)
    at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.start(KubernetesClientApplication.scala:186)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.InProcessSparkSubmit$.main(SparkSubmit.scala:984)
    at org.apache.spark.deploy.InProcessSparkSubmit.main(SparkSubmit.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.apache.spark.launcher.InProcessAppHandle.lambda$start$0(InProcessAppHandle.java:72)
    at java.base/java.lang.Thread.run(Thread.java:833)
Attaching code example:
import org.apache.spark.launcher.InProcessLauncher

// Placeholders in braces ({...}) must be replaced with real values.
val launcher = new InProcessLauncher()
  .setMaster("k8s://https://0.0.0.0:37811")
  .setDeployMode("cluster")
  .setAppResource("{location_to_pi_example}")
  .setMainClass("org.apache.spark.examples.SparkPi")
  .setConf("spark.kubernetes.container.image", "{spark_driver_image}")
  .setConf("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
  .setConf("spark.kubernetes.namespace", "default")
  .setConf("spark.kubernetes.executor.disableConfigMap", "true")
  .setConf("spark.kubernetes.container.image.pullPolicy", "Always")
launcher.setPropertiesFile(getClass.getResource("/spark-defaults.conf").getPath)
After some investigation, I found these lines in org.apache.spark.deploy.k8s.submit.KubernetesClientUtils (KubernetesClientUtils.scala):
val configMapNameExecutor: String = configMapName(s"spark-exec-${KubernetesUtils.uniqueID()}")
val configMapNameDriver: String = configMapName(s"spark-drv-${KubernetesUtils.uniqueID()}")
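Switching these from `val` to `def` solves the issue. As `val`s, the names appear to be computed only once per JVM, so the second in-process submission reuses the first job's ConfigMap name and tries to replace a ConfigMap that is already marked immutable. As `def`s, a new name is generated for each submission.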
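To illustrate the difference outside of Spark, here is a minimal sketch of how a `val` and a `def` behave inside a Scala `object`. `IdGen`, `NamesWithVal`, and `NamesWithDef` are hypothetical names made up for this example; `IdGen.uniqueID()` is only a counter-based stand-in for `KubernetesUtils.uniqueID()`, not Spark's actual implementation.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for KubernetesUtils.uniqueID(): returns a new
// 16-character hex string on every call (assumption, not Spark's code).
object IdGen {
  private val counter = new AtomicLong(0L)
  def uniqueID(): String = f"${counter.incrementAndGet()}%016x"
}

object NamesWithVal {
  // Evaluated once, when the object is first initialized in this JVM.
  // Every later read returns the same string.
  val configMapNameDriver: String = s"spark-drv-${IdGen.uniqueID()}-conf-map"
}

object NamesWithDef {
  // Re-evaluated on every access, so each caller gets a fresh name.
  def configMapNameDriver: String = s"spark-drv-${IdGen.uniqueID()}-conf-map"
}

object ValVsDefDemo {
  def main(args: Array[String]): Unit = {
    // Simulates two submissions from the same JVM.
    val firstVal = NamesWithVal.configMapNameDriver
    val secondVal = NamesWithVal.configMapNameDriver
    println(s"val: $firstVal / $secondVal, same name = ${firstVal == secondVal}")

    val firstDef = NamesWithDef.configMapNameDriver
    val secondDef = NamesWithDef.configMapNameDriver
    println(s"def: $firstDef / $secondDef, same name = ${firstDef == secondDef}")
  }
}
```

With the `val` variant both reads yield the same name, which matches the symptom above: the second job attempts a PUT on the ConfigMap created by the first one. With the `def` variant the two names differ.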