Details
-
Bug
-
Status: In Progress
-
Minor
-
Resolution: Unresolved
-
3.1.0, 3.2.0, 3.3.0
-
None
Description
If we use the Spark Launcher to launch our spark apps in k8s:
val sparkLauncher = new InProcessLauncher() .setMaster(k8sMaster) .setDeployMode(deployMode) .setAppName(appName) .setVerbose(true) sparkLauncher.startApplication(new SparkAppHandle.Listener { ...
We have an issue when we launch another spark driver in the same namespace where other spark app was running:
kp -n audit-exporter-eee5073aac -w NAME READY STATUS RESTARTS AGE audit-exporter-71489e843d8085c0-driver 1/1 Running 0 9m54s audit-exporter-7e6b8b843d80b9e6-exec-1 1/1 Running 0 9m40s data-io-120204843d899567-driver 0/1 Terminating 0 1s data-io-120204843d899567-driver 0/1 Terminating 0 2s data-io-120204843d899567-driver 0/1 Terminating 0 3s data-io-120204843d899567-driver 0/1 Terminating 0 3s
The error is:
{"time":"2022-11-03T12:49:45.626Z","lvl":"WARN","logger":"o.a.s.l.InProcessAppHandle","thread":"spark-app-38: 'data-io'","msg":"Application failed with exception.","stack_trace":"io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT at: https://kubernetes.default/api/v1/namespaces/audit-exporter-eee5073aac/configmaps/spark-drv-d19c37843d80350c-conf-map. Message: ConfigMap \"spark-drv-d19c37843d80350c-conf-map\" is invalid: data: Forbidden: field is immutable when `immutable` is set. Received status: Status(apiVersion=v1, code=422, details=StatusDetails(causes=[StatusCause(field=data, message=Forbidden: field is immutable when `immutable` is set, reason=FieldValueForbidden, additionalProperties={})], group=null, kind=ConfigMap, name=spark-drv-d19c37843d80350c-conf-map, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=ConfigMap \"spark-drv-d19c37843d80350c-conf-map\" is invalid: data: Forbidden: field is immutable when `immutable` is set, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, status=Failure, additionalProperties={}).\n\tat io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:682)\n\tat io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:661)\n\tat io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:612)\n\tat io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:555)\n\tat io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:518)\n\tat io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleUpdate(OperationSupport.java:342)\n\tat io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleUpdate(OperationSupport.java:322)\n\tat io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleUpdate(BaseOperation.java:649)\n\tat io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.lambda$replace$1(HasMetadataOperation.java:195)\n\tat io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation$$Lambda$5360/000000000000000000.apply(Unknown Source)\n\tat io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.replace(HasMetadataOperation.java:200)\n\tat io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.replace(HasMetadataOperation.java:141)\n\tat io.fabric8.kubernetes.client.dsl.base.BaseOperation$$Lambda$4618/000000000000000000.apply(Unknown Source)\n\tat io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper.replace(CreateOrReplaceHelper.java:69)\n\tat io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper.createOrReplace(CreateOrReplaceHelper.java:61)\n\tat io.fabric8.kubernetes.client.dsl.base.BaseOperation.createOrReplace(BaseOperation.java:318)\n\tat io.fabric8.kubernetes.client.dsl.base.BaseOperation.createOrReplace(BaseOperation.java:83)\n\tat io.fabric8.kubernetes.client.dsl.internal.NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableImpl.createOrReplace(NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableImpl.java:105)\n\tat io.fabric8.kubernetes.client.dsl.internal.NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.lambda$createOrReplace$7(NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java:174)\n\tat io.fabric8.kubernetes.client.dsl.internal.NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl$$Lambda$5012/000000000000000000.apply(Unknown Source)\n\tat java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)\n\tat java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(Unknown Source)\n\tat java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)\n\tat java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)\n\tat java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown Source)\n\tat java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)\n\tat java.base/java.util.stream.ReferencePipeline.collect(Unknown Source)\n\tat io.fabric8.kubernetes.client.dsl.internal.NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.createOrReplace(NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java:176)\n\tat io.fabric8.kubernetes.client.dsl.internal.NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.createOrReplace(NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java:54)\n\tat org.apache.spark.deploy.k8s.submit.Client.run(KubernetesClientApplication.scala:175)\n\tat org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.$anonfun$run$5(KubernetesClientApplication.scala:248)\n\tat org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.$anonfun$run$5$adapted(KubernetesClientApplication.scala:242)\n\tat org.apache.spark.deploy.k8s.submit.KubernetesClientApplication$$Lambda$4885/000000000000000000.apply(Unknown Source)\n\tat org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2764)\n\tat org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.run(KubernetesClientApplication.scala:242)\n\tat org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.start(KubernetesClientApplication.scala:214)\n\tat org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)\n\tat org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)\n\tat org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)\n\tat org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)\n\tat org.apache.spark.deploy.InProcessSparkSubmit$.main(SparkSubmit.scala:987)\n\tat org.apache.spark.deploy.InProcessSparkSubmit.main(SparkSubmit.scala)\n\tat jdk.internal.reflect.GeneratedMethodAccessor432.invoke(Unknown Source)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)\n\tat java.base/java.lang.reflect.Method.invoke(Unknown Source)\n\tat org.apache.spark.launcher.InProcessAppHandle.lambda$start$0(InProcessAppHandle.java:72)\n\tat org.apache.spark.launcher.InProcessAppHandle$$Lambda$4658/000000000000000000.run(Unknown Source)\n\tat java.base/java.lang.Thread.run(Unknown Source)\n"
When launching the second algorithm (data-io) in the same namespace, the name of the configmap is the same as the one created for the previous running algorithm (spark-drv-d19c37843d80350c-conf-map), and since it is immutable, it fails.
The solution is pretty straightforward, simply change here:
https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala#L46
From val to def, so the uniqueID is always new when we call to configMapNameDriver:
def configMapNameDriver: String = configMapName(s"spark-drv-${KubernetesUtils.uniqueID()}")
We have tested and it works in our case. I could do the pull request if you think it is a bug