Details
Type: Bug
Status: Resolved
Priority: Trivial
Resolution: Incomplete
Affects Version/s: 2.3.1
Fix Version/s: None
Description
I have an issue when I'm trying to read data from Kafka using Spark Structured Streaming.
Versions: spark-core_2.11 2.3.1, spark-sql_2.11 2.3.1, spark-sql-kafka-0-10_2.11 2.3.1, kafka-clients 0.11.0.0
The issue I am facing is that, during application startup, the Spark job always logs auto.offset.reset = earliest even though latest is specified in the code.
Code to reproduce:
package com.informatica.exec

import org.apache.spark.sql.SparkSession

object kafkaLatestOffset {
  def main(s: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Spark Offset basic example")
      .master("local[*]")
      .getOrCreate()

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()

    // "append" is used here: "complete" mode is rejected for queries
    // without streaming aggregations.
    val query = df.writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
As mentioned in the Structured Streaming documentation, the source option startingOffsets should be set instead of auto.offset.reset:
https://spark.apache.org/docs/2.3.1/structured-streaming-kafka-integration.html
- auto.offset.reset: Set the source option startingOffsets to specify where to start instead. Structured Streaming manages which offsets are consumed internally, rather than rely on the kafka Consumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed. Note that startingOffsets only applies when a new streaming query is started, and that resuming will always pick up from where the query left off.
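For completeness, the same integration guide notes that startingOffsets also accepts a per-partition JSON spec. A minimal sketch, assuming topic1 has two partitions (the offset values are illustrative; -1 means latest, -2 means earliest):

// Sketch: start partition 0 of topic1 at offset 23 and partition 1 at latest.
val dfExplicit = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-1}}""")
  .load()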
At runtime, Kafka messages are picked up from the latest offset, so functionally it works as expected. Only the log is misleading, since it reports auto.offset.reset = earliest.
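One way to confirm the actual starting position, independently of that log line, is to inspect the query's progress, which reports the offset ranges Structured Streaming itself resolved. A sketch (run it from a separate thread or shell session, since awaitTermination blocks; lastProgress is null until the first micro-batch completes):

// Print the per-source offset ranges actually consumed by the query.
val progress = query.lastProgress
if (progress != null) {
  progress.sources.foreach { s =>
    println(s"startOffset=${s.startOffset} endOffset=${s.endOffset}")
  }
}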