[ZEPPELIN-2746] Scala Spark-Streaming with Kafka Integration program does not show output

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.7.0
    • Fix Version/s: 0.7.0
    • Component/s: Interpreters
    • Flags: Patch, Important

      Description

      I have created 8 messages using the Kafka console producer. When I execute the console consumer

      ./kafka-console-consumer.sh --bootstrap-server vrxhdpkfknod.eastus.cloudapp.azure.com:6667 --topic spark-streaming --from-beginning
      I get all 8 messages displayed; on stopping the consumer it reports

      ^CProcessed a total of 8 messages
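
      For reference, the test messages can also be produced programmatically instead of through the console producer. A minimal sketch using the Kafka producer API, assuming the same broker and topic as above (the message bodies here are made up):

      import java.util.Properties
      import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
      import org.apache.kafka.common.serialization.StringSerializer

      // Minimal sketch: send 8 test messages to the spark-streaming topic
      // (equivalent to the console producer step; message contents are illustrative).
      val props = new Properties()
      props.put("bootstrap.servers", "vrxhdpkfknod.eastus.cloudapp.azure.com:6667")
      props.put("key.serializer", classOf[StringSerializer].getName)
      props.put("value.serializer", classOf[StringSerializer].getName)

      val producer = new KafkaProducer[String, String](props)
      (1 to 8).foreach { i =>
        producer.send(new ProducerRecord[String, String]("spark-streaming", s"message-$i"))
      }
      producer.close()
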
      When I execute the following Spark 2 code in a Zeppelin paragraph,

      %spark2
      import org.apache.kafka.clients.consumer.ConsumerRecord
      import org.apache.kafka.common.serialization.StringDeserializer
      import org.apache.spark.streaming.kafka010._
      import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
      import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
      import org.apache.spark.streaming._
      sc.setLogLevel("ERROR")  // prevent INFO logging from polluting output
      val ssc = StreamingContext.getActiveOrCreate(() => new StreamingContext(sc, Seconds(5)))    // create the StreamingContext with a 5-second batch interval
      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "vrxhdpkfknod.eastus.cloudapp.azure.com:6667",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "kafka-streaming-example",
        "auto.offset.reset" -> "earliest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )
      val topics = Array("spark-streaming")
      val messages = KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent,
        Subscribe[String, String](topics, kafkaParams)
      )
      messages.foreachRDD { rdd =>
        System.out.println("--- New RDD with " + rdd.partitions.size + " partitions and " + rdd.count + " records")
        rdd.foreach { record =>
          System.out.print(record.value())
        }
      }
      ssc.start()
      

      I get only the following output:

      import org.apache.kafka.clients.consumer.ConsumerRecord
      import org.apache.kafka.common.serialization.StringDeserializer
      import org.apache.spark.streaming.kafka010._
      import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
      import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
      import org.apache.spark.streaming._
      
      ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@377213ce
      kafkaParams: scala.collection.immutable.Map[String,Object] = Map(key.deserializer -> class org.apache.kafka.common.serialization.StringDeserializer, auto.offset.reset -> earliest, group.id -> kafka-streaming-example, bootstrap.servers -> vrxhdpkfknod.eastus.cloudapp.azure.com:6667, enable.auto.commit -> false, value.deserializer -> class org.apache.kafka.common.serialization.StringDeserializer)
      topics: Array[String] = Array(spark-streaming)
      messages: org.apache.spark.streaming.dstream.InputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]] = org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@328d6f9a
      

      There are no error messages, but none of the output printed inside foreachRDD is ever displayed. The same code works just fine when run in the Spark shell.
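
      For what it's worth, the record-level prints inside rdd.foreach execute on the executors, and output produced after the paragraph has finished running may not be captured by the notebook front end. A minimal diagnostic sketch (not the original reproduction) that prints a sample of records from the driver instead, assuming the same messages stream on a fresh StreamingContext:

      // Diagnostic variant: print from the driver so the output goes to the
      // interpreter's stdout; take(10) is an arbitrary sample size.
      messages.foreachRDD { rdd =>
        println(s"--- New RDD with ${rdd.partitions.size} partitions and ${rdd.count} records")
        rdd.take(10).foreach(record => println(record.value()))
      }
      ssc.start()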

             People

             • Assignee: Unassigned
             • Reporter: Jyoti Biswas
             • Votes: 4
             • Watchers: 6


                Time Tracking

                • Original Estimate: 96h
                • Remaining Estimate: 96h
                • Time Spent: Not Specified