Spark / SPARK-5297

JavaStreamingContext.fileStream won't work because type info isn't propagated


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.2.0
    • Fix Version/s: 1.3.0
    • Component/s: DStreams
    • Labels: None

    Description

      The following code:

      stream_context.<K, V, SequenceFileInputFormat<K, V>>fileStream(directory)
          .foreachRDD(new Function<JavaPairRDD<K, V>, Void>() {
              @Override
              public Void call(JavaPairRDD<K, V> rdd) throws Exception {
                  for (Tuple2<K, V> x : rdd.collect())
                      System.out.println("# " + x._1 + " " + x._2);
                  return null;
              }
          });
      stream_context.start();
      stream_context.awaitTermination();
      

      compiles fine for custom (serializable) classes K and V, but fails at runtime
      when I drop a new Hadoop sequence file into the directory:

      15/01/17 09:13:59 ERROR scheduler.JobScheduler: Error generating jobs for time 1421507639000 ms
      java.lang.ClassCastException: java.lang.Object cannot be cast to org.apache.hadoop.mapreduce.InputFormat
      at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:91)
      at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
      at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
      at scala.Option.getOrElse(Option.scala:120)
      at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
      at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:236)
      at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:234)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
      at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
      at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
      at scala.collection.AbstractTraversable.map(Traversable.scala:105)
      at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:234)
      at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:128)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:296)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
      at scala.Option.orElse(Option.scala:257)
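
      For reference, dropping a file like the following into the monitored directory
      is enough to trigger the error. This is a minimal sketch using Hadoop 2.x's
      SequenceFile.Writer options API, with Text/IntWritable standing in for the
      custom K and V:

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.SequenceFile;
      import org.apache.hadoop.io.Text;

      // Write a small sequence file into the directory watched by fileStream.
      Configuration conf = new Configuration();
      SequenceFile.Writer writer = SequenceFile.createWriter(conf,
          SequenceFile.Writer.file(new Path(directory, "part-00000")),
          SequenceFile.Writer.keyClass(Text.class),
          SequenceFile.Writer.valueClass(IntWritable.class));
      writer.append(new Text("key"), new IntWritable(42));
      writer.close();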

      The same classes K and V work fine for non-streaming Spark:

      spark_context.newAPIHadoopFile(path, SequenceFileInputFormat.class, K.class, V.class, conf)
      

      Streaming also works fine with TextInputFormat.

      The issue is that the Java fileStream wrapper fabricates the ClassTags for K, V,
      and F by casting ClassTag[AnyRef], so their runtime class is java.lang.Object.
      Those tags are relied on downstream when SparkContext builds the NewHadoopRDD
      that backs each batch of the file stream, and instantiating Object where an
      InputFormat is expected produces the ClassCastException above.

      https://github.com/apache/spark/blob/v1.2.0/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala#L263
      https://github.com/apache/spark/blob/v1.2.0/core/src/main/scala/org/apache/spark/SparkContext.scala#L753
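
      As of 1.3.0 there is a fileStream overload that takes the Class objects
      explicitly, which sidesteps the fake ClassTags. A sketch of the call, again
      with Text/IntWritable standing in for the custom K and V:

      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
      import org.apache.spark.streaming.api.java.JavaPairInputDStream;

      // Passing the classes explicitly lets the wrapper build ClassTags from them
      // instead of falling back to ClassTag[Object].
      JavaPairInputDStream<Text, IntWritable> stream = stream_context.fileStream(
          directory, Text.class, IntWritable.class, SequenceFileInputFormat.class);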


            People

              Assignee: Saisai Shao (jerryshao)
              Reporter: Leonidas Fegaras (fegaras)
              Votes: 0
              Watchers: 4
