SPARK-41375

Avoid empty latest KafkaSourceOffset


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.3.1
    • Fix Version/s: 3.3.2, 3.4.0
    • Component/s: Structured Streaming
    • Labels: None

    Description

      We found that the offsetLog recorded an empty offset `{}` for the KafkaSource.

      This occurred only once, but the empty offset causes data duplication.
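
A minimal sketch of why a single empty offset can lead to duplicated data. It assumes that, when a batch is planned, any partition missing from the start offset is treated as newly discovered and read from its earliest available offset. The names `plannedRanges` and `EmptyOffsetDuplication`, the topic `events`, and the local `TopicPartition` stand-in are hypothetical and are not Spark's or Kafka's own code.

```scala
object EmptyOffsetDuplication {
  // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
  final case class TopicPartition(topic: String, partition: Int)

  type Offsets = Map[TopicPartition, Long]

  // Assumed planning rule: a partition that is absent from `start` is handled
  // as a new partition and is read from its earliest available offset.
  def plannedRanges(
      start: Offsets,
      end: Offsets,
      earliest: Offsets): Map[TopicPartition, (Long, Long)] =
    end.map { case (tp, until) =>
      val from = start.getOrElse(tp, earliest(tp))
      (tp, (from, until))
    }

  def main(args: Array[String]): Unit = {
    val tp = TopicPartition("events", 0)
    val earliest: Offsets = Map(tp -> 0L)

    // Normal case: the previous batch ended at offset 100.
    val normal = plannedRanges(Map(tp -> 100L), Map(tp -> 120L), earliest)

    // Faulty case: the previous batch recorded the empty offset `{}`.
    val afterEmpty = plannedRanges(Map.empty, Map(tp -> 120L), earliest)

    println(s"normal range:      ${normal(tp)}")
    println(s"after empty `{}`:  ${afterEmpty(tp)}")
  }
}
```

Under this assumption, the batch that follows the empty offset starts again from the earliest offset instead of from offset 100, so records that were already processed are read a second time.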

      Root Cause:

      The Kafka consumer may be assigned an empty set of partitions in rare cases, for example when it fetches its assignment while the Kafka cluster is reassigning partitions.

      // org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer
        private def partitionsAssignedToConsumer(
            body: ju.Set[TopicPartition] => Map[TopicPartition, Long],
            fetchingEarliestOffset: Boolean = false)
          : Map[TopicPartition, Long] = uninterruptibleThreadRunner.runUninterruptibly {
      
          withRetriesWithoutInterrupt {
            // Poll to get the latest assigned partitions
            consumer.poll(0)
            val partitions = consumer.assignment() // partitions may be empty
      
            if (!fetchingEarliestOffset) {
              // Call `position` to wait until the potential offset request triggered by `poll(0)` is
              // done. This is a workaround for KAFKA-7703, which an async `seekToBeginning` triggered by
              // `poll(0)` may reset offsets that should have been set by another request.
              partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})
            }
      
            consumer.pause(partitions)
            logDebug(s"Partitions assigned to consumer: $partitions.")
            body(partitions)
          }
        }
      

      Solution:
      Filter out empty offsets in latestOffset, so that an empty offset is never recorded as the latest KafkaSourceOffset.
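
The idea of such a filter can be sketched as follows. This is not the actual patch: `LatestOffsetFilter`, `filterLatest`, and the local `TopicPartition` stand-in are hypothetical names, and the sketch assumes that keeping the previously recorded offset is an acceptable fallback when the fetch returns no partitions.

```scala
object LatestOffsetFilter {
  // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
  final case class TopicPartition(topic: String, partition: Int)

  type Offsets = Map[TopicPartition, Long]

  // Accept the freshly fetched offsets only when they are non-empty.
  // Otherwise fall back to the previously recorded offsets (assumption),
  // so that `{}` is never written as the latest offset.
  def filterLatest(fetched: Offsets, previous: Option[Offsets]): Option[Offsets] =
    if (fetched.nonEmpty) Some(fetched) else previous

  def main(args: Array[String]): Unit = {
    val tp = TopicPartition("events", 0)
    val previous: Option[Offsets] = Some(Map(tp -> 100L))

    // A normal fetch advances the latest offset.
    println(filterLatest(Map(tp -> 120L), previous))

    // An empty fetch, e.g. during partition reassignment, leaves it unchanged.
    println(filterLatest(Map.empty, previous))
  }
}
```

With the filter in place, an empty partition assignment results in a batch with no new data rather than in a recorded offset that loses the position of every partition.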

          People

            Assignee: wechar Wechar
            Reporter: wechar Wechar