  1. Spark
  2. SPARK-25810

Spark structured streaming logs auto.offset.reset=earliest even though startingOffsets is set to latest


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Trivial
    • Resolution: Incomplete
    • Affects Version/s: 2.3.1
    • Fix Version/s: None
    • Component/s: Structured Streaming

    Description

      I have an issue when reading data from Kafka using Spark Structured Streaming.

      Versions: spark-core_2.11: 2.3.1, spark-sql_2.11: 2.3.1, spark-sql-kafka-0-10_2.11: 2.3.1, kafka-client: 0.11.0.0

      The Spark job always logs auto.offset.reset = earliest at application startup, even though the code sets startingOffsets to latest.

      Code to reproduce: 

      package com.informatica.exec

      import org.apache.spark.sql.SparkSession

      object kafkaLatestOffset {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession
            .builder()
            .appName("Spark Offset basic example")
            .master("local[*]")
            .getOrCreate()

          // Subscribe to topic1; a newly started query begins at the latest offsets.
          val df = spark
            .readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "topic1")
            .option("startingOffsets", "latest")
            .load()

          // "append" is used here because "complete" mode requires a streaming aggregation.
          val query = df.writeStream
            .outputMode("append")
            .format("console")
            .start()

          // Block until the streaming query stops or fails.
          query.awaitTermination()
        }
      }
      

       

      According to the Structured Streaming documentation, the source option startingOffsets must be used in place of the Kafka consumer setting auto.offset.reset:

      https://spark.apache.org/docs/2.3.1/structured-streaming-kafka-integration.html

      • auto.offset.reset: Set the source option startingOffsets to specify where to start instead. Structured Streaming manages which offsets are consumed internally, rather than rely on the kafka Consumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed. Note that startingOffsets only applies when a new streaming query is started, and that resuming will always pick up from where the query left off.

      At runtime, Kafka messages are read from the latest offset, so the functionality works as expected. Only the log output is misleading, because it shows auto.offset.reset = earliest.
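
      The behaviour described above can be sketched with a simplified model. This is not Spark's source code: the object OffsetOptionModel and its methods are hypothetical names introduced only for illustration. The sketch assumes, as I understand the Kafka source, that the driver-side consumer is always configured with auto.offset.reset = earliest (which is the value that shows up in the ConsumerConfig log), while the actual starting position comes from the startingOffsets option. The JSON per-partition form of startingOffsets is left out.

```scala
// Simplified, hypothetical model of how offset-related options are resolved.
// It only illustrates why the logged consumer config and the effective
// starting position can differ; it is not Spark's implementation.
object OffsetOptionModel {
  sealed trait StartingOffsets
  case object Earliest extends StartingOffsets
  case object Latest extends StartingOffsets

  // Resolve the user-facing source option. For streaming queries the
  // documented default is "latest".
  def startingOffsets(options: Map[String, String]): StartingOffsets =
    options.get("startingOffsets").map(_.trim.toLowerCase) match {
      case Some("earliest")      => Earliest
      case Some("latest") | None => Latest
      case Some(other) =>
        throw new IllegalArgumentException(s"Not handled in this sketch: $other")
    }

  // Build the config that the driver-side Kafka consumer would log.
  // Assumption: auto.offset.reset is fixed to "earliest" no matter what
  // startingOffsets says, and the user may not set it directly.
  def driverConsumerParams(options: Map[String, String]): Map[String, String] = {
    require(
      !options.contains("kafka.auto.offset.reset"),
      "Use the source option startingOffsets instead of kafka.auto.offset.reset"
    )
    // Options prefixed with "kafka." are passed to the consumer without the prefix.
    val passthrough = options.collect {
      case (key, value) if key.startsWith("kafka.") => key.stripPrefix("kafka.") -> value
    }
    passthrough + ("auto.offset.reset" -> "earliest")
  }
}
```

      With the options from the reproduction code, startingOffsets resolves to Latest while the modelled consumer config still contains auto.offset.reset = earliest, which matches the observation that only the log line is misleading.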

          People

            Assignee: Unassigned
            Reporter: ANUJA BANTHIYA (abanthiy)
            Votes: 0
            Watchers: 2
