[SPARK-40188] Spark Direct Streaming: Reading messages of a certain byte size or count in batches from Kafka is not working.


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.2.1
    • Fix Version/s: None
    • Component/s: DStreams
    • Labels: None
    • Environment: Spark version 3.2.1, Kafka version 3.2.0

    Description

      The Spark Kafka consumer is unable to read messages of a certain byte size or count per batch. I have tried a few approaches as described in the Kafka docs, but with no success. Here is a link to the Stack Overflow question where I asked the same thing and got no response; I think this is a possible bug. The same configuration works fine when the consumer is plain Java code.
      https://stackoverflow.com/questions/73398533/spark-streaming-context-kafka-consumer-read-messages-of-a-certain-byte-size-in
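
      For comparison, a consumer built directly on the Kafka client with these same settings returns at most max.poll.records records per poll. The original Java consumer is not part of this report, so the following is only a minimal Scala sketch of that comparison (object and variable names are illustrative):

      import java.time.Duration
      import java.util.{Collections, Properties}
      import org.apache.kafka.clients.consumer.KafkaConsumer

      object PlainConsumerSketch {
        def main(args: Array[String]): Unit = {
          val props = new Properties()
          props.put("bootstrap.servers", "localhost:9092")
          props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
          props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
          props.put("group.id", "test")
          props.put("fetch.max.bytes", "65536")
          props.put("max.partition.fetch.bytes", "8192")
          props.put("max.poll.records", "100")
          props.put("auto.offset.reset", "latest")
          // The SASL_PLAINTEXT settings from the report would be added here in the same way.

          val consumer = new KafkaConsumer[String, String](props)
          consumer.subscribe(Collections.singletonList("test.topic"))

          // A single poll() never returns more than max.poll.records (100) records,
          // no matter how far behind the consumer group is.
          val records = consumer.poll(Duration.ofSeconds(10))
          println(s"records returned by one poll: ${records.count()}")
          consumer.close()
        }
      }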

      Here is the Spark consumer code that fetches data from Kafka:

      // Imports assumed for this snippet (spark-streaming-kafka-0-10); sparkSession is created elsewhere.
      import org.apache.kafka.common.serialization.StringDeserializer
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
      import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
      import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
      import sparkSession.implicits._

      val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(10))

      // Consumer settings under test: fetch.max.bytes, max.partition.fetch.bytes, max.poll.records.
      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "localhost:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "test",
        "fetch.max.bytes" -> "65536",
        "max.partition.fetch.bytes" -> "8192",
        "max.poll.records" -> "100",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean),
        "sasl.jaas.config" -> "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";",
        "sasl.mechanism" -> "PLAIN",
        "security.protocol" -> "SASL_PLAINTEXT"
      )

      val topics = Array("test.topic")
      val stream = KafkaUtils.createDirectStream[String, String](
        streamingContext,
        PreferConsistent,
        Subscribe[String, String](topics, kafkaParams)
      )

      stream.foreachRDD { rdd =>
        // Offset range covered by this batch, per partition.
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // Note: the outer println prints Unit, which is the "()" line in the output below.
        println(offsetRanges.foreach(a => println(a.topic + ":" + a.partition + ":" + a.fromOffset + ":" + a.untilOffset + ":" + a.count())))

        // `columns` is defined elsewhere in the application.
        val df = rdd.map(a => a.value().split(",")).toDF()
        val selectCols = columns.indices.map(i => $"value"(i))
        val newDF = df.select(selectCols: _*).toDF(columns: _*)

        // Some business operations here, then write back to Kafka.
        newDF.write
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("topic", "topic.ouput")
          .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";")
          .option("kafka.sasl.mechanism", "PLAIN")
          .option("kafka.security.protocol", "SASL_PLAINTEXT")
          .save()

        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

        sparkSession.catalog.clearCache()
      }

      streamingContext.start()
      streamingContext.awaitTermination()
      
      

      Output:

      
      test.topic:6:1345075:4163058:2817983
      test.topic:0:1339456:4144190:2804734
      test.topic:3:1354266:4189336:2835070
      test.topic:7:1353542:4186148:2832606
      test.topic:5:1355140:4189071:2833931
      test.topic:2:1351162:4173375:2822213
      test.topic:1:1352801:4184073:2831272
      test.topic:4:1348558:4166749:2818191
      ()
      test.topic:6:4163058:4163058:0
      test.topic:0:4144190:4144190:0
      test.topic:3:4189336:4189336:0
      test.topic:7:4186148:4186148:0
      test.topic:5:4189071:4189071:0
      test.topic:2:4173375:4173375:0
      test.topic:1:4184073:4184073:0
      test.topic:4:4166749:4166749:0
      

      I tried the following options:

      Option 1:

      Topic partitions: 8
      Streaming context batch interval: 1 sec
      "fetch.max.bytes" -> "65536", // 64 KB
      "max.partition.fetch.bytes" -> "8192", // 8 KB
      "max.poll.records" -> "100"

      DataFrame count read from Kafka in the very first batch: 1200000

      Option 2:

      Topic partitions: 1
      Streaming context batch interval: 1 sec
      "fetch.max.bytes" -> "65536",
      "max.partition.fetch.bytes" -> "8192",
      "max.poll.records" -> "100"

      Kafka lag: 126360469
      DataFrame count read from Kafka in the very first batch: 126360469
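
      For reference, the setting the Spark Kafka direct stream itself documents for limiting how much a batch reads is its per-partition rate limit (spark.streaming.kafka.maxRatePerPartition), not the consumer fetch settings above. A minimal sketch, assuming the 10-second batch interval from the code above:

      import org.apache.spark.SparkConf
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.streaming.{Seconds, StreamingContext}

      // Sketch only: each batch then reads at most
      // 100 records/sec * 10 sec = 1,000 records per Kafka partition.
      val conf = new SparkConf()
        .setAppName("rate-limited-direct-stream")
        .set("spark.streaming.kafka.maxRatePerPartition", "100")
        .set("spark.streaming.backpressure.enabled", "true")

      val sparkSession = SparkSession.builder().config(conf).getOrCreate()
      val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(10))

      With the 8-partition topic from Option 1, that would cap a batch at roughly 8,000 records rather than the counts reported above.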
       

          People

            Assignee: Unassigned
            Reporter: Madhav Madhu (madhav692)
            Votes: 0
            Watchers: 1
