Spark / SPARK-17853

Kafka OffsetOutOfRangeException on DStreams union from separate Kafka clusters with identical topic names.


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.0.0
    • Fix Version/s: 2.0.2, 2.1.0
    • Component/s: DStreams
    • Labels: None

    Description

      While migrating from Spark 1.6 to 2.0, I observed an OffsetOutOfRangeException reported by the Kafka client. In our scenario we create a single DStream as a union of multiple DStreams, one DStream per Kafka cluster (a multi-datacenter setup). Both Kafka clusters have the same topics and the same number of partitions.

      After a quick investigation, I found that the DirectKafkaInputDStream class keeps offset state per topic and partition, but it is not aware of different Kafka clusters.

      For every topic, a single DStream is created as the union of the streams from all configured Kafka clusters:

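      import org.apache.spark.streaming.StreamingContext
      import org.apache.spark.streaming.dstream.DStream
      import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

      // One config (Kafka consumer parameters) per Kafka cluster.
      class KafkaDStreamSource(configs: Iterable[Map[String, String]]) {

        // Builds one direct stream per cluster for the given topic and unions them.
        def createSource(ssc: StreamingContext, topic: String): DStream[(String, Array[Byte])] = {
          val streams = configs.map { kafkaParams =>
            KafkaUtils.createDirectStream[String, Array[Byte]](
              ssc,
              LocationStrategies.PreferConsistent,
              ConsumerStrategies.Subscribe[String, Array[Byte]](Set(topic), kafkaParams)
            ).map { record =>
              (record.key, record.value)
            }
          }

          ssc.union(streams.toSeq)
        }
      }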
      

      As a result, offsets from one Kafka cluster overwrite offsets from the other. Fortunately, OffsetOutOfRangeException was thrown because the offsets in the two Kafka clusters differ significantly; otherwise the problem could have gone unnoticed.
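
      The overwrite described above can be illustrated with a minimal, self-contained sketch. It does not use Spark's actual internals; `OffsetCollisionSketch`, `trackByTopicPartition`, `trackByCluster`, the cluster names `dc1` and `dc2`, the topic `events`, and the offset values are all hypothetical and exist only to show why state keyed by topic and partition alone cannot distinguish two clusters.

```scala
// Hypothetical illustration only; these are NOT Spark's real data structures.
object OffsetCollisionSketch {

  // Stand-in for a Kafka topic-partition identifier.
  final case class TopicPartition(topic: String, partition: Int)

  // One offset update: (cluster name, topic-partition, offset).
  type Update = (String, TopicPartition, Long)

  // State keyed by topic and partition only: the cluster name is ignored,
  // so a later update for the same topic-partition replaces the earlier one.
  def trackByTopicPartition(updates: Seq[Update]): Map[TopicPartition, Long] =
    updates.foldLeft(Map.empty[TopicPartition, Long]) {
      case (state, (_, tp, offset)) => state.updated(tp, offset)
    }

  // State keyed by cluster as well: each cluster keeps its own offset.
  def trackByCluster(updates: Seq[Update]): Map[(String, TopicPartition), Long] =
    updates.foldLeft(Map.empty[(String, TopicPartition), Long]) {
      case (state, (cluster, tp, offset)) => state.updated((cluster, tp), offset)
    }

  // Same topic and partition in two clusters, with very different offsets (assumed values).
  val updates: Seq[Update] = Seq(
    ("dc1", TopicPartition("events", 0), 1200L),
    ("dc2", TopicPartition("events", 0), 987654L)
  )
}
```

      With the sample updates, `trackByTopicPartition` ends up with a single entry holding the second cluster's offset, while `trackByCluster` keeps one entry per cluster. An offset from one cluster applied to the other falls outside that cluster's valid range, which is consistent with the OffsetOutOfRangeException observed here.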



          People

            Assignee: Cody Koeninger (koeninger)
            Reporter: Marcin Kuthan (marcin.kuthan)
