Details
- Type: Bug
- Status: Open
- Priority: Major
- Resolution: Unresolved
- Affects Version: 0.7.0
- Labels: Patch, Important
Description
I have created 8 messages with the Kafka console producer, so that when I execute the console consumer
./kafka-console-consumer.sh --bootstrap-server vrxhdpkfknod.eastus.cloudapp.azure.com:6667 --topic spark-streaming --from-beginning
all 8 messages are displayed:
^CProcessed a total of 8 messages
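For reference, the test messages were produced with the standard console producer; a minimal sketch, assuming the same broker and topic (the actual message contents are not shown here):
./kafka-console-producer.sh --broker-list vrxhdpkfknod.eastus.cloudapp.azure.com:6667 --topic spark-streaming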
When I execute the spark 2 code in Zeppelin,
%spark2
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming._

sc.setLogLevel("ERROR") // prevent INFO logging from polluting output

// creating the StreamingContext with 5 seconds interval
val ssc = StreamingContext.getActiveOrCreate(() => new StreamingContext(sc, Seconds(5)))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "vrxhdpkfknod.eastus.cloudapp.azure.com:6667",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "kafka-streaming-example",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("spark-streaming")
val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

messages.foreachRDD { rdd =>
  System.out.println("--- New RDD with " + rdd.partitions.size + " partitions and " + rdd.count + " records")
  rdd.foreach { record => System.out.print(record.value()) }
}

ssc.start()
I get
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming._
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@377213ce
kafkaParams: scala.collection.immutable.Map[String,Object] = Map(key.deserializer -> class org.apache.kafka.common.serialization.StringDeserializer, auto.offset.reset -> earliest, group.id -> kafka-streaming-example, bootstrap.servers -> vrxhdpkfknod.eastus.cloudapp.azure.com:6667, enable.auto.commit -> false, value.deserializer -> class org.apache.kafka.common.serialization.StringDeserializer)
topics: Array[String] = Array(spark-streaming)
messages: org.apache.spark.streaming.dstream.InputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]] = org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@328d6f9a
There are no error messages, but the output from the foreachRDD block is never displayed in the notebook. The same code works just fine when run in the Spark shell.
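One thing that may matter here: the per-record System.out.print inside rdd.foreach runs on the executors, so outside of local mode its output lands in the executor logs rather than in the notebook; only the println with the partition and record counts runs on the driver. A minimal sketch of a variant foreachRDD body that prints the values on the driver instead (not part of the original report; collect() is only reasonable for a tiny test topic like this one):

messages.foreachRDD { rdd =>
  println("--- New RDD with " + rdd.partitions.size + " partitions and " + rdd.count + " records")
  // Extract just the String values on the executors, then collect them to the
  // driver so they are printed in the notebook output instead of executor logs.
  rdd.map(_.value()).collect().foreach(println)
}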