Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Affects Version/s: 3.3.1
Fix Version/s: None
Description
We found that the offsetLog recorded an empty offset `{}` for the KafkaSource.
It occurs only once, but this empty offset causes data duplication: when the empty offset is later used as a starting point, every partition looks newly discovered, so records that were already processed can be read again.
Root Cause:
The root cause is that the Kafka consumer may see an empty set of assigned partitions in extreme cases, e.g. when fetching partitions while the Kafka cluster is reassigning them.
```scala
// org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer
private def partitionsAssignedToConsumer(
    body: ju.Set[TopicPartition] => Map[TopicPartition, Long],
    fetchingEarliestOffset: Boolean = false)
  : Map[TopicPartition, Long] = uninterruptibleThreadRunner.runUninterruptibly {

  withRetriesWithoutInterrupt {
    // Poll to get the latest assigned partitions
    consumer.poll(0)
    val partitions = consumer.assignment() // partitions may be empty

    if (!fetchingEarliestOffset) {
      // Call `position` to wait until the potential offset request triggered by `poll(0)` is
      // done. This is a workaround for KAFKA-7703, which an async `seekToBeginning` triggered by
      // `poll(0)` may reset offsets that should have been set by another request.
      partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})
    }

    consumer.pause(partitions)
    logDebug(s"Partitions assigned to consumer: $partitions.")
    body(partitions)
  }
}
```
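To make the failure mode concrete, here is a minimal, self-contained sketch. The names (`assignment`, `fetchLatestOffsets`, the `rebalancing` flag) are hypothetical stand-ins for the consumer call chain above, not the actual Spark API; it only shows how an assignment that is momentarily empty during a rebalance becomes an empty offset map, which is then serialized to the offset log as `{}`:

```scala
object EmptyAssignmentRepro {
  // Stand-in for consumer.assignment(): empty while a rebalance is in flight.
  def assignment(rebalancing: Boolean): Set[String] =
    if (rebalancing) Set.empty else Set("topic-0", "topic-1")

  // Stand-in for partitionsAssignedToConsumer: maps each assigned partition
  // to an offset. An empty assignment silently yields an empty offset map.
  def fetchLatestOffsets(rebalancing: Boolean): Map[String, Long] =
    assignment(rebalancing).map(tp => tp -> 42L).toMap

  def main(args: Array[String]): Unit = {
    println(fetchLatestOffsets(rebalancing = false)) // Map(topic-0 -> 42, topic-1 -> 42)
    println(fetchLatestOffsets(rebalancing = true))  // Map() -> logged as `{}`
  }
}
```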
Solution:
Add an offset filter for `latestOffset` so that an empty offset is never written to the offset log.
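A minimal sketch of what such a filter could look like. The `previous`/`fetched` parameters are illustrative assumptions rather than the actual `latestOffset` signature or the merged patch; the point is only that an empty fetch result should not reach the offset log:

```scala
// Sketch: guard latestOffset against an empty partition->offset map.
def latestOffset(previous: Map[String, Long],
                 fetched: Map[String, Long]): Map[String, Long] = {
  if (fetched.isEmpty && previous.nonEmpty) {
    // Empty assignment observed (e.g. mid-rebalance): keep the last known
    // offsets instead of recording an empty offset `{}` in the log.
    previous
  } else {
    fetched
  }
}
```

Falling back to the previously known offsets simply makes that micro-batch empty, which is safe; recording `{}` is what allows already-processed ranges to be re-read later.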