Description
We have a Kubernetes Cluster for Spark(2.4.6) and Zeppelin(0.9.0). While running below script with dependency (org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.6) get following error.
Had tried running this script with Spark version(2.4.3) and Zeppelin(0.8.1) but got the same issue. While running this script on spark interactive shell, it works perfectly fine.
Please advice, what pending or missing configuration in Zeppelin?
************************Script******************************
%spark
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import spark.implicits._
import java.nio.file.{Paths, Files}
import java.nio.charset.StandardCharsets
val jaas2 = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"*******************\" password=\"**********************\";"
val wschema = new StructType().add("epic", "string").add("ofrClose", "float").add("ofrOpen", "float")
val ds = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "pkc-4ygn6.europe-west3.gcp.confluent.cloud:9092").option("kafka.sasl.jaas.config",jaas2).option("kafka.security.protocol","SASL_SSL").option("kafka.ssl.endpoint.identification.algorithm","https").option("kafka.sasl.mechanism","PLAIN").option("startingOffsets", "earliest").option("subscribe", "raw-data-feed").load()
val ds1 = ds.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
val structuredDS = ds1.select(from_json(col("value"), wschema).as("payload")).select("payload.*")
println("WriteStream")
val query = structuredDS.writeStream.format("console").option("truncate", "false").start()
***********************************Error**********************************
WriteStream Start org.apache.spark.sql.streaming.StreamingQueryException: Query [id = 8ccfb813-8a43-45c2-a730-0de408e0168d, runId = f098899f-2138-4545-ae4d-c497cf0cc0e3] terminated with exception: Writing job aborted. at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193) Caused by: org.apache.spark.SparkException: Writing job aborted. at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247) at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296) at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383) at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2782) at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2782) at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363) at org.apache.spark.sql.Dataset.collect(Dataset.scala:2782) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:540) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281) ... 1 more Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 0.0 failed 4 times, most recent failure: Lost task 2.3 in stage 0.0 (TID 11, 172.17.0.7, executor 1): java.io.InvalidClassException: org.apache.spark.rdd.RDD; local class incompatible: stream classdesc serialVersionUID = -3328732449542231715, local class serialVersionUID = 4416556597546473068 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061) at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:64) ... 35 more Caused by: java.io.InvalidClassException: org.apache.spark.rdd.RDD; local class incompatible: stream classdesc serialVersionUID = -3328732449542231715, local class serialVersionUID = 4416556597546473068 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)