  Spark / SPARK-4196

Streaming + checkpointing + saveAsNewAPIHadoopFiles = NotSerializableException for Hadoop Configuration


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.1.0
    • Fix Version/s: 1.1.2, 1.2.0
    • Component/s: DStreams
    • Labels: None

    Description

      I am reasonably sure, though not 100%, that there is a real issue here in Streaming and that I'm not missing something basic. I'm filing this JIRA to track it, since it has come up a few times before without resolution, and right now I can't get checkpointing to work at all.

      When Spark Streaming checkpointing is enabled, I see a NotSerializableException thrown for a Hadoop Configuration object, and it does not appear to be one from my user code.

      Before I get to my particular instance, see http://mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/%3C1408135046777-12202.post@n3.nabble.com%3E for another occurrence of the same error.

      I was also on a customer site last week debugging an identical checkpointing issue in a Scala-based program; they likewise could not enable checkpointing without hitting exactly this error.

      The essence of my code is:

          final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

          JavaStreamingContextFactory streamingContextFactory = new JavaStreamingContextFactory() {
            @Override
            public JavaStreamingContext create() {
              return new JavaStreamingContext(sparkContext, new Duration(batchDurationMS));
            }
          };

          streamingContext = JavaStreamingContext.getOrCreate(
              checkpointDirString, sparkContext.hadoopConfiguration(),
              streamingContextFactory, false);
          streamingContext.checkpoint(checkpointDirString);
      

      It yields:

      2014-10-31 14:29:00,211 ERROR OneForOneStrategy:66
      org.apache.hadoop.conf.Configuration
      - field (class "org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9", name: "conf$2", type: "class org.apache.hadoop.conf.Configuration")
      - object (class "org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9", <function2>)
      - field (class "org.apache.spark.streaming.dstream.ForEachDStream", name: "org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc", type: "interface scala.Function2")
      - object (class "org.apache.spark.streaming.dstream.ForEachDStream", org.apache.spark.streaming.dstream.ForEachDStream@cb8016a)
      ...
      

      This looks like it's due to PairDStreamFunctions.saveAsNewAPIHadoopFiles, since this saveFunc appears to be org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9:

      def saveAsNewAPIHadoopFiles(
          prefix: String,
          suffix: String,
          keyClass: Class[_],
          valueClass: Class[_],
          outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
          conf: Configuration = new Configuration
        ) {
        val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
          val file = rddToFileName(prefix, suffix, time)
          rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
        }
        self.foreachRDD(saveFunc)
      }
      

      Isn't that a problem? Then again, if it were, I don't see how this method could ever work in Spark, and I also don't see why it only becomes an issue when checkpointing is enabled.
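      My working theory, illustrated with a standalone sketch below (plain Java serialization, no Spark involved): checkpointing Java-serializes the DStream graph, including ForEachDStream's foreachFunc, so the Configuration captured by saveFunc has to be java.io.Serializable, and it is only a Writable. The SaveFuncLike class and its conf field here are hypothetical stand-ins for the generated $$anonfun$9 and its captured conf$2.

          import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
          import org.apache.hadoop.conf.Configuration

          // Stand-in for the generated anonfun: a Serializable function-like object
          // that holds a Configuration field, the way saveFunc holds conf$2.
          case class SaveFuncLike(conf: Configuration)

          object CheckpointWriteSketch {
            def main(args: Array[String]): Unit = {
              val out = new ObjectOutputStream(new ByteArrayOutputStream())
              try {
                // The checkpoint write does essentially this to the DStream graph:
                // everything reachable from foreachFunc must be Serializable, and
                // Configuration is Writable but not java.io.Serializable.
                out.writeObject(SaveFuncLike(new Configuration()))
              } catch {
                case e: NotSerializableException =>
                  println("NotSerializableException: " + e.getMessage)
              }
            }
          }

      If that theory is right, it would explain why the same code runs fine with checkpointing disabled: without a checkpoint, the foreachFunc closure stays on the driver and is never serialized.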

      It's a long shot, but I wonder whether this is related to closure issues like https://issues.apache.org/jira/browse/SPARK-1866.
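      In the meantime, a user-level workaround seems possible by skipping the built-in saveAsNewAPIHadoopFiles and doing the per-batch save in foreachRDD, capturing only a serializable wrapper around the Configuration. This is just a sketch under the assumption that org.apache.spark.SerializableWritable is usable here (Configuration is a Writable); the object and method names are made up for illustration, and the file-naming line is only a stand-in for rddToFileName.

          import org.apache.hadoop.io.{LongWritable, Text}
          import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
          import org.apache.spark.SerializableWritable
          import org.apache.spark.SparkContext._
          import org.apache.spark.streaming.dstream.DStream

          object SaveWorkaroundSketch {
            def save(pairs: DStream[(LongWritable, Text)], prefix: String, suffix: String) {
              // Wrap the Configuration so the foreachRDD closure captures something
              // that survives Java serialization during the checkpoint write.
              val wrappedConf = new SerializableWritable(pairs.context.sparkContext.hadoopConfiguration)
              pairs.foreachRDD { (rdd, time) =>
                val file = prefix + "-" + time.milliseconds + "." + suffix
                rdd.saveAsNewAPIHadoopFile(file, classOf[LongWritable], classOf[Text],
                  classOf[TextOutputFormat[LongWritable, Text]], wrappedConf.value)
              }
            }
          }

      If that reasoning holds, wrapping conf the same way inside PairDStreamFunctions itself would presumably fix this at the source, but I'll leave that to whoever picks this up.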


          People

            Assignee: Tathagata Das (tdas)
            Reporter: Sean R. Owen (srowen)
            Votes: 0
            Watchers: 5
