Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-12591

NullPointerException using checkpointed mapWithState with KryoSerializer

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.6.0
    • Fix Version/s: 1.6.1, 2.0.0
    • Component/s: DStreams
    • Labels:
      None
    • Environment:

      MacOSX
      Java(TM) SE Runtime Environment (build 1.8.0_20-ea-b17)

      Description

      Issue occured after upgrading to the RC4 of Spark (streaming) 1.6.0 to (re)test the new mapWithState API, after previously reporting issue SPARK-11932 (https://issues.apache.org/jira/browse/SPARK-11932).

      For initial report, see http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-streaming-1-6-0-RC4-NullPointerException-using-mapWithState-tt15830.html

      Narrowed it down to an issue unrelated to Kafka directstream, but, after observing very unpredictable behavior as a result of changes to the Kafka messages format, it seems to be related to KryoSerialization in specific.

      For test case, see my modified version of the StatefulNetworkWordCount example: https://gist.github.com/juyttenh/9b4a4103699a7d5f698f

      To reproduce, use RC4 of Spark-1.6.0 and

      • start nc:
        nc -lk 9999
      • execute the supplied test case:
        bin/spark-submit --class org.apache.spark.examples.streaming.StatefulNetworkWordCount --master local[2] file:///some-assembly-jar localhost 9999

      Error scenario:

      • put some text in the nc console with the job running, and observe correct functioning of the word count
      • kill the spark job
      • add some more text in the nc console (with the job not running)
      • restart the spark job and observe the NPE
        (you might need to repeat this a couple of times to trigger the exception)

      Here's the stacktrace:

      15/12/31 11:43:47 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 5)
      java.lang.NullPointerException
      	at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:103)
      	at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:111)
      	at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:111)
      	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:56)
      	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
      	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
      	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
      	at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
      	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
      	at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:148)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
      	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
      	at org.apache.spark.scheduler.Task.run(Task.scala:89)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      15/12/31 11:43:47 INFO TaskSetManager: Starting task 1.0 in stage 4.0 (TID 6, localhost, partition 1,NODE_LOCAL, 2239 bytes)
      15/12/31 11:43:47 INFO Executor: Running task 1.0 in stage 4.0 (TID 6)
      15/12/31 11:43:47 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 5, localhost): java.lang.NullPointerException
      	at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:103)
      	at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:111)
      	at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:111)
      	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:56)
      	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
      	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
      	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
      	at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
      	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
      	at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:148)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
      	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
      	at org.apache.spark.scheduler.Task.run(Task.scala:89)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      
      15/12/31 11:43:47 ERROR TaskSetManager: Task 0 in stage 4.0 failed 1 times; aborting job
      15/12/31 11:43:47 INFO TaskSchedulerImpl: Cancelling stage 4
      15/12/31 11:43:47 INFO Executor: Executor is trying to kill task 1.0 in stage 4.0 (TID 6)
      15/12/31 11:43:47 INFO TaskSchedulerImpl: Stage 4 was cancelled
      15/12/31 11:43:47 INFO DAGScheduler: ResultStage 4 (foreachPartition at StatefulNetworkCountKryo.scala:72) failed in 0.100 s
      15/12/31 11:43:47 INFO DAGScheduler: Job 1 failed: foreachPartition at StatefulNetworkCountKryo.scala:72, took 0.823996 s
      15/12/31 11:43:47 INFO JobScheduler: Finished job streaming job 1451558610000 ms.0 from job set of time 1451558610000 ms
      15/12/31 11:43:47 INFO JobScheduler: Total delay: 17.994 s for time 1451558610000 ms (execution: 0.857 s)
      15/12/31 11:43:47 INFO JobScheduler: Starting job streaming job 1451558620000 ms.0 from job set of time 1451558620000 ms
      15/12/31 11:43:47 ERROR JobScheduler: Error running job streaming job 1451558610000 ms.0
      org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 5, localhost): java.lang.NullPointerException
      	at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:103)
      	at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:111)
      	at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:111)
      	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:56)
      	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
      	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
      	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
      	at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
      	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
      	at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:148)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
      	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
      	at org.apache.spark.scheduler.Task.run(Task.scala:89)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      
      Driver stacktrace:
      	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
      	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
      	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
      	at scala.Option.foreach(Option.scala:236)
      	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
      	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
      	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
      	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
      	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
      	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
      	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
      	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
      	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
      	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
      	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:920)
      	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
      	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
      	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
      	at org.apache.spark.examples.streaming.StatefulNetworkWordCountKryo$$anonfun$createContext$2.apply(StatefulNetworkCountKryo.scala:72)
      	at org.apache.spark.examples.streaming.StatefulNetworkWordCountKryo$$anonfun$createContext$2.apply(StatefulNetworkCountKryo.scala:71)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
      	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
      	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
      	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
      	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
      	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
      	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
      	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
      	at scala.util.Try$.apply(Try.scala:161)
      	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
      	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
      	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
      	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
      	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
      	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.NullPointerException
      	at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:103)
      	at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:111)
      	at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.get(StateMap.scala:111)
      	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:56)
      	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
      	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
      	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
      	at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
      	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
      	at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:148)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
      	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
      	at org.apache.spark.scheduler.Task.run(Task.scala:89)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
      	... 3 more
      

        Attachments

          Activity

            People

            • Assignee:
              zsxwing Shixiong Zhu
              Reporter:
              juyttenh Jan Uyttenhove
            • Votes:
              0 Vote for this issue
              Watchers:
              7 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: