Spark / SPARK-17853

Kafka OffsetOutOfRangeException on DStreams union from separate Kafka clusters with identical topic names.

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.0.0
    • Fix Version/s: 2.0.2, 2.1.0
    • Component/s: DStreams
    • Labels:
      None

      Description

      During migration from Spark 1.6 to 2.0, I observed an OffsetOutOfRangeException reported by the Kafka client. In our scenario we create a single DStream as a union of multiple DStreams, one DStream per Kafka cluster (a multi-DC setup). Both Kafka clusters have the same topics and the same number of partitions.

      After a quick investigation, I found that the class DirectKafkaInputDStream keeps offset state per topic and partition, but it is not aware of different Kafka clusters.

      For every topic, a single DStream is created as a union of the streams from all configured Kafka clusters.

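      import org.apache.spark.streaming.StreamingContext
      import org.apache.spark.streaming.dstream.DStream
      import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

      class KafkaDStreamSource(configs: Iterable[Map[String, String]]) {
        // Builds one direct stream per Kafka cluster config and unions them into a single DStream.
        def createSource(ssc: StreamingContext, topic: String): DStream[(String, Array[Byte])] = {
          val streams = configs.map { config =>
            val kafkaParams = config
            val kafkaTopics = Set(topic)

            KafkaUtils.createDirectStream[String, Array[Byte]](
              ssc,
              LocationStrategies.PreferConsistent,
              ConsumerStrategies.Subscribe[String, Array[Byte]](kafkaTopics, kafkaParams)
            ).map { record =>
              (record.key, record.value)
            }
          }

          ssc.union(streams.toSeq)
        }
      }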
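
      For illustration, the configs passed to KafkaDStreamSource in a two-cluster setup might look like the sketch below. The broker addresses, the group id, and the object name TwoClusterConfigs are hypothetical; the report does not show the actual configuration, and in particular it does not say whether both clusters shared a group.id. The sketch only builds the maps and does not need a running Spark or Kafka.

```scala
object TwoClusterConfigs {
  // Settings assumed to be shared by both clusters.
  // The deserializer class names are the standard Kafka client ones.
  private val common: Map[String, String] = Map(
    "key.deserializer"   -> "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
    "group.id"           -> "example-group" // hypothetical, not taken from the report
  )

  // Hypothetical broker addresses, one per data center.
  val dc1: Map[String, String] = common + ("bootstrap.servers" -> "kafka-dc1.example.com:9092")
  val dc2: Map[String, String] = common + ("bootstrap.servers" -> "kafka-dc2.example.com:9092")

  // One config per Kafka cluster, in the shape KafkaDStreamSource expects.
  val configs: Seq[Map[String, String]] = Seq(dc1, dc2)
}
```

      With such configs, new KafkaDStreamSource(TwoClusterConfigs.configs).createSource(ssc, topic) would create two direct streams that subscribe to the same topic name on different clusters.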
      

      As a result, offsets from one Kafka cluster overwrite offsets from the other. Fortunately, an OffsetOutOfRangeException was thrown, because the offsets in the two Kafka clusters differ significantly.
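
      The overwrite described above can be shown with a minimal sketch. This is not the DirectKafkaInputDStream code; it is a simplified stand-in that assumes offset state is a map keyed by topic and partition only. All names (TopicPartitionKey, OffsetCollisionSketch, the cluster labels, and the offsets) are hypothetical. The second function adds a cluster label to the key purely for contrast and is not meant to describe how the issue was resolved.

```scala
// Simplified stand-in for a (topic, partition) key; hypothetical, not a Kafka class.
final case class TopicPartitionKey(topic: String, partition: Int)

object OffsetCollisionSketch {
  // Each update is (cluster label, topic-partition, latest offset).
  type Update = (String, TopicPartitionKey, Long)

  // State keyed by topic and partition only: the cluster label is ignored,
  // so a later update for the same key replaces the earlier one.
  def trackOffsets(updates: Seq[Update]): Map[TopicPartitionKey, Long] =
    updates.foldLeft(Map.empty[TopicPartitionKey, Long]) {
      case (state, (_, key, offset)) => state.updated(key, offset)
    }

  // Same bookkeeping, but the cluster label is part of the key,
  // so entries from different clusters no longer collide.
  def trackOffsetsPerCluster(updates: Seq[Update]): Map[(String, TopicPartitionKey), Long] =
    updates.foldLeft(Map.empty[(String, TopicPartitionKey), Long]) {
      case (state, (cluster, key, offset)) => state.updated((cluster, key), offset)
    }

  // Two clusters, same topic name and partition, very different offsets.
  val events: TopicPartitionKey = TopicPartitionKey("events", 0)
  val updates: Seq[Update] = Seq(("dc1", events, 1000000L), ("dc2", events, 42L))

  val shared: Map[TopicPartitionKey, Long] = trackOffsets(updates)
  val perCluster: Map[(String, TopicPartitionKey), Long] = trackOffsetsPerCluster(updates)
}
```

      In this sketch, shared ends up with a single entry holding the offset from the second cluster, while perCluster keeps one entry per cluster. An offset valid in one cluster and then used against the other is the kind of mismatch that would surface as an out-of-range offset.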

            People

            • Assignee: Cody Koeninger (cody@koeninger.org)
            • Reporter: Marcin Kuthan (marcin.kuthan)
            • Votes: 1
            • Watchers: 5

              Dates

              • Created:
                Updated:
                Resolved: