Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Invalid
Affects Version: 2.1.1
Fix Version: None
Description
I am trying to use spark-streaming-kafka-0.10 to pull messages from a Kafka topic (broker version 0.10). I have verified that messages are being produced, and a plain KafkaConsumer pulls them successfully. However, when I use the Spark Streaming API, I get nothing. If I use KafkaUtils.createRDD and specify some offset ranges manually, it works. But when I try createDirectStream, all the RDDs are empty, and when I check the partition offsets it reports that every partition is at 0. Here is what I tried:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val sparkConf = new SparkConf().setAppName("kafkastream")
val ssc = new StreamingContext(sparkConf, Seconds(3))
val topics = Array("my_topic")

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "hostname:6667",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my_group",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (true: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
  // Print the offset range each partition of this batch covers.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
  val rddCount = rdd.count()
  println(s"rdd count: $rddCount")
  // stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

ssc.start()
ssc.awaitTermination()
All partitions show offset ranges from 0 to 0, and all the RDDs are empty. I would like the stream to start from the beginning of each partition and also pick up everything that is subsequently produced to it.
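For contrast, the manual createRDD path mentioned above does return data. Here is a minimal sketch of that batch path; the partition numbers and offset bounds are hypothetical placeholders:

import org.apache.spark.streaming.kafka010.{KafkaUtils, OffsetRange}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import scala.collection.JavaConverters._

// Manually specified ranges (hypothetical values): read offsets 0 until 100
// from partitions 0 and 1 of the topic.
val ranges = Array(
  OffsetRange("my_topic", 0, 0L, 100L),
  OffsetRange("my_topic", 1, 0L, 100L)
)

// createRDD takes a java.util.Map, hence the asJava conversion of the
// same kafkaParams used for the stream above.
val batchRdd = KafkaUtils.createRDD[String, String](
  ssc.sparkContext,
  kafkaParams.asJava,
  ranges,
  PreferConsistent
)
println(s"batch rdd count: ${batchRdd.count()}")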
I have also tried spark-streaming-kafka-0.8 and it does work, so I believe this is specific to the 0.10 integration, since everything else works fine. Thank you!
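For reference, the 0.8-style direct stream I tested was along these lines; this is a minimal sketch against the same broker and topic, with the exact parameters reconstructed as assumptions:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// spark-streaming-kafka-0-8 direct stream; "smallest" is the 0.8
// equivalent of the new consumer's "earliest".
val kafkaParams08 = Map[String, String](
  "metadata.broker.list" -> "hostname:6667",
  "auto.offset.reset" -> "smallest"
)
val stream08 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams08, Set("my_topic")
)
stream08.foreachRDD(rdd => println(s"0.8 rdd count: ${rdd.count()}"))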