[SPARK-17766] Write ahead log exception on a toy project
    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Duplicate
    • Affects Version/s: 2.0.0
    • Fix Version/s: None
    • Component/s: DStreams
    • Labels: None

      Description

      The write ahead log seems to get corrupted when the application is stopped abruptly (Ctrl-C or kill). On restart, the application refuses to run, failing with this exception:

      2016-10-03 08:03:32,321 ERROR [Executor task launch worker-1] executor.Executor: Exception in task 0.0 in stage 1.0 (TID 1)
      com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
      ...skipping...
              at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
              at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
              at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
              at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
              at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
              at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
              at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
              at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
              at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
              at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
              at org.apache.spark.scheduler.Task.run(Task.scala:85)
              at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
              at java.lang.Thread.run(Thread.java:745)
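
      As far as I can tell, the only way to get the application running again is to delete the checkpoint directory, which loses all accumulated state. A minimal sketch of such a reset, assuming the Hadoop FileSystem API is acceptable for both the local and the S3 case (the object and helper names are mine):

      import org.apache.hadoop.conf.Configuration
      import org.apache.hadoop.fs.Path

      object CheckpointReset {
        // Recursively delete the checkpoint directory so that
        // StreamingContext.getOrCreate falls back to creating a fresh context.
        // WARNING: this discards all state accumulated by updateStateByKey.
        def wipeCheckpoint(dirName: String, hadoopConf: Configuration): Unit = {
          val path = new Path(dirName)
          val fs = path.getFileSystem(hadoopConf)
          if (fs.exists(path)) {
            fs.delete(path, true) // 'true' means recursive
          }
        }
      }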
      

      Code:

      import org.apache.hadoop.conf.Configuration
      import org.apache.spark._
      import org.apache.spark.streaming._
      
      object ProtoDemo {
        def createContext(dirName: String) = {
          val conf = new SparkConf().setAppName("mything").setMaster("local[4]")
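          // Receiver write-ahead log: the feature this report is about.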
          conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
          /*
          conf.set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
          conf.set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
          */
      
          val ssc = new StreamingContext(conf, Seconds(1))
          ssc.checkpoint(dirName)
          val lines = ssc.socketTextStream("127.0.0.1", 9999)
          val words = lines.flatMap(_.split(" "))
          val pairs = words.map(word => (word, 1))
          val wordCounts = pairs.reduceByKey(_ + _)
          val runningCounts = wordCounts.updateStateByKey[Int] {
            (values: Seq[Int], oldValue: Option[Int]) =>
              val s = values.sum
              Some(oldValue.fold(s)(_ + s))
          }

          // Print the first ten elements of each RDD generated in this DStream to the console
          runningCounts.print()
          ssc
        }
      
        def main(args: Array[String]) = {
          val hadoopConf = new Configuration()
          val dirName = "/tmp/chkp"
          val ssc = StreamingContext.getOrCreate(dirName, () => createContext(dirName), hadoopConf)
          ssc.start()
          ssc.awaitTermination()
        }
      }
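
      A possible mitigation, which I have not verified against this bug: ask Spark to stop the streaming context gracefully on JVM shutdown, so that Ctrl-C or a plain kill (SIGTERM) drains in-flight batches instead of cutting the write ahead log mid-write (it cannot help with kill -9):

      // Candidate mitigation, untested for this issue: stop the
      // StreamingContext gracefully when the JVM receives SIGTERM/SIGINT.
      conf.set("spark.streaming.stopGracefullyOnShutdown", "true")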
      

      Steps to reproduce:
      1. I put the code in a repository; clone it: git clone https://github.com/thesamet/spark-issue
      2. In one terminal, run: while true; do nc -l localhost 9999; done
      3. Start a new terminal.
      4. Run "sbt run" (a build.sbt sketch follows this list).
      5. Type a few lines in the netcat terminal.
      6. Kill the streaming application (Ctrl-C).
      7. Go back to step 4 until you see the exception above.
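
      For reference, the build only needs Spark Streaming; a build.sbt along these lines matches the setup (exact versions are an assumption based on the Affects Version field; the linked repository is authoritative):

      name := "spark-issue"

      scalaVersion := "2.11.8"

      libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.0.0"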

      I tried the above with the local filesystem and also with S3, and got the same result.
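
      For the S3 run only the checkpoint location changed, along these lines (bucket name hypothetical; the s3a connector and credentials have to be configured separately):

      val dirName = "s3a://my-bucket/spark-chkp" // instead of "/tmp/chkp"
      ssc.checkpoint(dirName)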

    People

    • Assignee: Unassigned
    • Reporter: thesamet (Nadav Samet)
    • Votes: 1
    • Watchers: 3
