Description
The following code:
stream_context.<K, V, SequenceFileInputFormat<K, V>>fileStream(directory)
    .foreachRDD(new Function<JavaPairRDD<K, V>, Void>() {
        public Void call(JavaPairRDD<K, V> rdd) throws Exception {
            for (Tuple2<K, V> x : rdd.collect())
                System.out.println("# " + x._1 + " " + x._2);
            return null;
        }
    });
stream_context.start();
stream_context.awaitTermination();
for custom (serializable) classes K and V compiles fine, but it throws an error when I drop a new Hadoop sequence file into the directory:
15/01/17 09:13:59 ERROR scheduler.JobScheduler: Error generating jobs for time 1421507639000 ms
java.lang.ClassCastException: java.lang.Object cannot be cast to org.apache.hadoop.mapreduce.InputFormat
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:91)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:236)
at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:234)
at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:128)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:296)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
at scala.Option.orElse(Option.scala:257)
The same classes K and V work fine in non-streaming Spark:
spark_context.newAPIHadoopFile(path, SequenceFileInputFormat.class, K.class, V.class, conf)
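For concreteness, a minimal sketch of the working batch-mode call; MyKey and MyValue are hypothetical stand-ins for the custom classes K and V:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class BatchRepro {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        JavaSparkContext spark_context =
            new JavaSparkContext(new SparkConf().setAppName("batch-repro"));
        Configuration conf = new Configuration();

        // Works: the InputFormat, key, and value classes are passed in
        // explicitly as Class objects, so nothing relies on erased generics.
        // MyKey and MyValue are hypothetical custom (serializable) classes.
        JavaPairRDD<MyKey, MyValue> rdd = spark_context.newAPIHadoopFile(
            args[0], SequenceFileInputFormat.class, MyKey.class, MyValue.class, conf);

        for (Tuple2<MyKey, MyValue> x : rdd.collect())
            System.out.println("# " + x._1 + " " + x._2);
        spark_context.stop();
    }
}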
Streaming also works fine for TextInputFormat.
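The text case reduces to the following fragment (reusing stream_context from the snippet above); it works because textFileStream fixes the key, value, and input-format types internally rather than deriving them from erased tags:

import org.apache.spark.streaming.api.java.JavaDStream;

// Works: under the hood this is fileStream[LongWritable, Text,
// TextInputFormat] with real Scala ClassTags, mapped to the line text.
JavaDStream<String> lines = stream_context.textFileStream(directory);
lines.print();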
The issue is that the Java fileStream wrapper fabricates its ClassTags for K, V, and F as ClassTag[Object] (the Java API has no real manifests to pass along), while newAPIHadoopFile later reads the InputFormat, key, and value classes back out of those tags when building the Hadoop RDD behind each batch. The runtime class it recovers is java.lang.Object, hence the failed cast to org.apache.hadoop.mapreduce.InputFormat. See the two places in the 1.2.0 source below; a hedged workaround sketch follows them.
https://github.com/apache/spark/blob/v1.2.0/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala#L263
https://github.com/apache/spark/blob/v1.2.0/core/src/main/scala/org/apache/spark/SparkContext.scala#L753
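Until the wrapper passes real classes through, one possible workaround (an untested sketch, again using the hypothetical MyKey/MyValue) is to bypass the Java wrapper and call the underlying Scala StreamingContext.fileStream with explicit ClassTags built from the concrete classes:

import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.dstream.InputDStream;
import scala.Tuple2;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

// Real ClassTags carrying the concrete runtime classes, instead of the
// ClassTag[Object] fakes the Java fileStream wrapper fabricates.
// MyKey and MyValue are hypothetical custom key/value classes.
ClassTag<MyKey> kTag = ClassTag$.MODULE$.apply(MyKey.class);
ClassTag<MyValue> vTag = ClassTag$.MODULE$.apply(MyValue.class);
ClassTag<SequenceFileInputFormat<MyKey, MyValue>> fTag =
    ClassTag$.MODULE$.apply(SequenceFileInputFormat.class);

// stream_context.ssc() exposes the underlying Scala StreamingContext,
// whose fileStream takes the ClassTags as trailing (implicit) parameters.
InputDStream<Tuple2<MyKey, MyValue>> stream =
    stream_context.ssc().fileStream(directory, kTag, vTag, fTag);
JavaPairInputDStream<MyKey, MyValue> pairs =
    JavaPairInputDStream.fromInputDStream(stream, kTag, vTag);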
Issue Links
- is duplicated by SPARK-3754: Spark Streaming fileSystem API is not callable from Java (Closed)