Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 2.4.4
None
None
Environment: emr-5.28.0
Description
Hi,
We are getting NPEs with Spark Structured Streaming from time to time. Here's the stack trace:
java.lang.NullPointerException
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore$$anonfun$iterator$1.apply(HDFSBackedStateStoreProvider.scala:164)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore$$anonfun$iterator$1.apply(HDFSBackedStateStoreProvider.scala:163)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:220)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.kinesis.KinesisWriteTask.execute(KinesisWriteTask.scala:50)
    at org.apache.spark.sql.kinesis.KinesisWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply$mcV$sp(KinesisWriter.scala:40)
    at org.apache.spark.sql.kinesis.KinesisWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KinesisWriter.scala:40)
    at org.apache.spark.sql.kinesis.KinesisWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KinesisWriter.scala:40)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.sql.kinesis.KinesisWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KinesisWriter.scala:40)
    at org.apache.spark.sql.kinesis.KinesisWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KinesisWriter.scala:38)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
As you can see, we are using https://github.com/qubole/kinesis-sql/ for writing to the Kinesis queue; the state backend is HDFS.