SPARK-35284

Kubernetes fabric8 client exception with Scala programs in Spark 3.x


    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.0.0
    • Fix Version/s: None
    • Component/s: Kubernetes
    • Labels:
      None
    • Environment:

      Docker Desktop v3.2 on Windows 10. Kubernetes v1.19.7.

      Apps are launched with the latest Spark Operator. Kafka is deployed with Confluent Platform 6.0.

      Description

      An exception occurs when running a small Scala app on Spark 3.x on Kubernetes. Python programs work fine. The applications are launched using the Spark Operator.

      The app uses Spark Structured Streaming to read JSON data from a Kafka topic and write it back to Kafka. This happens during development, so only 5-10 small records are written and the app doesn't run for more than 3-4 minutes.
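
      For reference, an app of the shape described above might look roughly like the sketch below. This is not the reporter's code. The object name, broker address (`kafka:9092`), topic names, JSON schema, and checkpoint path are all placeholders assumed for illustration, and the `spark-sql-kafka-0-10` connector is assumed to be on the classpath.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json, struct, to_json}
import org.apache.spark.sql.types.{StringType, StructType}

// Hypothetical minimal app: read JSON from one Kafka topic, write JSON to another.
object KafkaJsonRoundTrip {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-json-round-trip")
      .getOrCreate()

    // Assumed record layout; the real schema is not given in the report.
    val schema = new StructType()
      .add("id", StringType)
      .add("payload", StringType)

    // Source: Kafka values arrive as bytes, so cast to string before parsing.
    val records = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092") // placeholder address
      .option("subscribe", "input-topic")              // placeholder topic
      .option("startingOffsets", "earliest")
      .load()
      .select(from_json(col("value").cast("string"), schema).as("record"))
      .select("record.*")

    // Sink: the Kafka writer expects a string or binary column named "value".
    val query = records
      .select(to_json(struct(col("*"))).as("value"))
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")           // placeholder address
      .option("topic", "output-topic")                           // placeholder topic
      .option("checkpointLocation", "/tmp/checkpoints/roundtrip") // placeholder path
      .start()

    query.awaitTermination()
  }
}
```

      Swapping the sink for `format("console")` or `format("memory")` (the latter also needs a `queryName`) gives the Console and Memory variants mentioned among the failure scenarios.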

      The error is somewhat unpredictable and shows up as different failure scenarios, which makes Scala apps very unstable:

      • Kafka read succeeds but Kafka write fails.
      • Writes to Console or Memory don't work at all; no output is produced.
      • Reading from a file stream and writing to Kafka usually works.

      21/04/30 10:24:13 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 10.1.14.118, executor 1, partition 0, PROCESS_LOCAL, 8414 bytes)
      21/04/30 10:24:19 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.1.14.118:43469 (size: 9.5 KiB, free: 117.0 MiB)
      21/04/30 10:24:53 ERROR Utils: Uncaught exception in thread kubernetes-executor-pod-polling-sync
      io.fabric8.kubernetes.client.KubernetesClientException: Operation: [list] for kind: [Pod] with name: [null] in namespace: [spark-app] failed.
      at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
      at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72)
      at io.fabric8.kubernetes.client.dsl.base.BaseOperation.listRequestHelper(BaseOperation.java:155)
      at io.fabric8.kubernetes.client.dsl.base.BaseOperation.list(BaseOperation.java:621)
      at io.fabric8.kubernetes.client.dsl.base.BaseOperation.list(BaseOperation.java:70)
      at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsPollingSnapshotSource$PollRunnable.$anonfun$run$1(ExecutorPodsPollingSnapshotSource.scala:61)
      at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1357)
      at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsPollingSnapshotSource$PollRunnable.run(ExecutorPodsPollingSnapshotSource.scala:56)
      at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
      at java.base/java.util.concurrent.FutureTask.runAndReset(Unknown Source)
      at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
      at java.base/java.lang.Thread.run(Unknown Source)
      Caused by: java.net.SocketTimeoutException: timeout
      at okio.Okio$4.newTimeoutException(Okio.java:232)
      at okio.AsyncTimeout.exit(AsyncTimeout.java:285)
      at okio.AsyncTimeout$2.read(AsyncTimeout.java:241)
      at okio.RealBufferedSource.indexOf(RealBufferedSource.java:354)
      at okio.RealBufferedSource.readUtf8LineStrict(RealBufferedSource.java:226)
      at okhttp3.internal.http1.Http1Codec.readHeaderLine(Http1Codec.java:215)
      at okhttp3.internal.http1.Http1Codec.readResponseHeaders(Http1Codec.java:189)
      at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:88)
      at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
      at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
      at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
      at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
      at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
      at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
      at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
      at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
      at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
      at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:127)
      at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
      at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
      at io.fabric8.kubernetes.client.utils.BackwardsCompatibilityInterceptor.intercept(BackwardsCompatibilityInterceptor.java:134)
      at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
      at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
      at io.fabric8.kubernetes.client.utils.ImpersonatorInterceptor.intercept(ImpersonatorInterceptor.java:68)
      at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
      at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
      at io.fabric8.kubernetes.client.utils.HttpClientUtils.lambda$createHttpClient$3(HttpClientUtils.java:109)
      at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
      at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
      at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:257)
      at okhttp3.RealCall.execute(RealCall.java:93)
      at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:469)
      at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:430)
      at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:412)
      at io.fabric8.kubernetes.client.dsl.base.BaseOperation.listRequestHelper(BaseOperation.java:151)
      ... 11 more
      Caused by: java.net.SocketTimeoutException: Read timed out
      at java.base/java.net.SocketInputStream.socketRead0(Native Method)
      at java.base/java.net.SocketInputStream.socketRead(Unknown Source)
      at java.base/java.net.SocketInputStream.read(Unknown Source)
      at java.base/java.net.SocketInputStream.read(Unknown Source)
      at java.base/sun.security.ssl.SSLSocketInputRecord.read(Unknown Source)
      at java.base/sun.security.ssl.SSLSocketInputRecord.readHeader(Unknown Source)
      at java.base/sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(Unknown Source)
      at java.base/sun.security.ssl.SSLSocketImpl.readApplicationRecord(Unknown Source)
      at java.base/sun.security.ssl.SSLSocketImpl$AppInputStream.read(Unknown Source)
      at okio.Okio$2.read(Okio.java:140)
      at okio.AsyncTimeout$2.read(AsyncTimeout.java:237)
      ... 43 more

    People

    • Assignee: Unassigned
    • Reporter: kdoshi Ketan Doshi
    • Votes: 0
    • Watchers: 3
