Spark / SPARK-17886

Spark Streaming deletes checkpointed RDD then tries to load it after restart


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.5.1
    • Fix Version/s: None
    • Component/s: DStreams
    • Labels: None

    Description

      The issue is that the Spark driver checkpoints an RDD, then deletes that checkpoint; the job restarts, and the new driver tries to load the deleted checkpointed RDD.

      The application runs on YARN, which attempts to restart it a number of times (100 in our case); every attempt fails because the deleted RDD is missing.
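      To illustrate the setup in which this happens, here is a minimal sketch of such a driver (the application name, input source, and checkpoint path below are placeholders, not taken from our job). The key point is that every YARN restart goes through StreamingContext.getOrCreate, which re-reads the checkpoint written by the previous driver:

      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.{Seconds, StreamingContext}

      object CheckpointedJob {
        // Placeholder path; the real one looks like hdfs://proc-job/checkpoint/<uuid>
        val checkpointDir = "hdfs://proc-job/checkpoint/example"

        // Builds the streaming graph from scratch; used only when no checkpoint exists yet.
        def createContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("checkpointed-streaming-job")
          val ssc = new StreamingContext(conf, Seconds(30))    // 30 s microbatch interval
          ssc.checkpoint(checkpointDir)                        // enable checkpointing to HDFS
          val lines = ssc.socketTextStream("localhost", 9999)  // placeholder input source
          lines.count().print()
          ssc
        }

        def main(args: Array[String]): Unit = {
          // Every YARN restart goes through here: the new driver attempt recovers the
          // previous driver's state from checkpointDir, which is where the already-deleted
          // rdd-NNNNN checkpoint directory gets looked up again.
          val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
          ssc.start()
          ssc.awaitTermination()
        }
      }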

      Here is a Splunk log which shows the inconsistency in checkpoint behaviour:

      2016-10-09 02:48:43,533 [streaming-job-executor-0] INFO org.apache.spark.rdd.ReliableRDDCheckpointData - Done checkpointing RDD 73847 to hdfs://proc-job/checkpoint/cadf8dcf-ebc2-4366-a2e1-0939976c6ce1/rdd-73847, new parent is RDD 73872
      host = ip-10-1-1-13.ec2.internal
      2016-10-09 02:53:14,696 [JobGenerator] INFO org.apache.spark.streaming.dstream.DStreamCheckpointData - Deleted checkpoint file 'hdfs://proc-job/checkpoint/cadf8dcf-ebc2-4366-a2e1-0939976c6ce1/rdd-73847' for time 1475981310000 ms
      host = ip-10-1-1-13.ec2.internal
      The job restarts here; note the driver host change from ip-10-1-1-13.ec2.internal to ip-10-1-1-25.ec2.internal.
      2016-10-09 02:53:30,175 [Driver] INFO org.apache.spark.streaming.dstream.DStreamCheckpointData - Restoring checkpointed RDD for time 1475981310000 ms from file 'hdfs://proc-job/checkpoint/cadf8dcf-ebc2-4366-a2e1-0939976c6ce1/rdd-73847'
      host = ip-10-1-1-25.ec2.internal
      2016-10-09 02:53:30,491 [Driver] ERROR org.apache.spark.deploy.yarn.ApplicationMaster - User class threw exception: java.lang.IllegalArgumentException: requirement failed: Checkpoint directory does not exist: hdfs://proc-job/checkpoint/cadf8dcf-ebc2-4366-a2e1-0939976c6ce1/rdd-73847
      java.lang.IllegalArgumentException: requirement failed: Checkpoint directory does not exist: hdfs://proc-job/checkpoint/cadf8dcf-ebc2-4366-a2e1-0939976c6ce1/rdd-73847
      host = ip-10-1-1-25.ec2.internal

      Spark Streaming is configured with a microbatch interval of 30 seconds, a checkpoint interval of 120 seconds, and spark.cleaner.ttl of 28800 (8 hours), but as far as I can tell, this TTL only affects the metadata cleanup interval. Checkpointed RDDs seem to be deleted 4-5 minutes after being written.
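      For reference, a REPL-style sketch of where each of those settings is applied (the application name, checkpoint path, and input DStream are placeholders; spark.cleaner.ttl is the Spark 1.x property name):

      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      import org.apache.spark.streaming.dstream.DStream

      // Sketch only: names, the path, and the input DStream are placeholders.
      def configuredContext(buildInput: StreamingContext => DStream[String]): StreamingContext = {
        val conf = new SparkConf()
          .setAppName("proc-job")                    // hypothetical application name
          .set("spark.cleaner.ttl", "28800")         // 8 h TTL; appears to cover metadata cleanup only
        val ssc = new StreamingContext(conf, Seconds(30))   // 30 s microbatch interval
        ssc.checkpoint("hdfs://proc-job/checkpoint")        // checkpoint root directory (placeholder)
        buildInput(ssc).checkpoint(Seconds(120))            // 120 s per-DStream checkpoint interval
        ssc
      }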

      Running on top of Spark 1.5.1.

      There are at least two possible issues here:

      • In case of a driver restart, the new driver tries to load checkpointed RDDs that the previous driver had just deleted (a possible defensive workaround is sketched after this list);
      • Spark loads stale checkpointed data: the logs show that the deleted RDD was initially checkpointed 4 minutes and 31 seconds before deletion, and 4 minutes and 47 seconds before the new driver tries to load it. Given that the checkpoint interval is 120 seconds, it makes no sense to load data older than that.
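      As a purely hypothetical workaround for the first issue (not something Spark provides out of the box), the recovery call could fall back to a fresh context when restoring from the checkpoint fails, accepting the loss of checkpointed state instead of letting YARN burn through its 100 restart attempts. checkpointDir and createContext are the placeholder names from the sketch above:

      import org.apache.spark.streaming.StreamingContext
      import scala.util.{Failure, Success, Try}

      def recoverOrStartFresh(checkpointDir: String,
                              createContext: () => StreamingContext): StreamingContext =
        Try(StreamingContext.getOrCreate(checkpointDir, createContext)) match {
          case Success(ctx) => ctx
          case Failure(_: IllegalArgumentException) =>
            // Recovery referenced an rdd-NNNNN directory the previous driver had already
            // cleaned up ("Checkpoint directory does not exist"); give up on the checkpoint
            // and rebuild the streaming graph from scratch.
            createContext()
          case Failure(other) => throw other
        }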

      P.S. Looking at the source code for the event loop that handles checkpoint updates and cleanup, nothing seems to have changed in more recent versions of Spark, so the bug is likely present in 2.0.1 as well.


          People

            Assignee: Unassigned
            Reporter: Cosmin Ciobanu (cosminci)
            Votes: 0
            Watchers: 0
