SPARK-23636

[SPARK 2.2] | Kafka Consumer | KafkaUtils.createRDD throws Exception - java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.1.1, 2.2.0
    • Fix Version/s: 2.4.0
    • Component/s: DStreams
    • Labels:

      Description

       

      Summary

       

While using the KafkaUtils.createRDD API, we receive the error listed below, specifically when a single executor connects to a single Kafka topic-partition but has more than one core and fetches an Array of OffsetRanges.

       

I've tagged this issue with "Structured Streaming", as I could not find a more appropriate component.

       


      Error Faced

      java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

       Stack Trace

      Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure: Lost task 5.3 in stage 1.0 (TID 17, host, executor 16): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
      at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1629)
      at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1528)
      at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1508)
      at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.close(CachedKafkaConsumer.scala:59)
      at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.remove(CachedKafkaConsumer.scala:185)
      at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:204)
      at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:181)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
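
The trace suggests the failure happens when CachedKafkaConsumer.remove, called from KafkaRDDIterator's constructor, closes a cached consumer. In the kafka010 integration consumers are cached per group and topic-partition within an executor, so when two tasks on the same executor need the same topic-partition, one task's thread ends up closing or reusing a consumer that another thread still owns, and KafkaConsumer rejects any cross-thread access (including close). A rough Scala paraphrase of the ownership check, a sketch rather than the actual Kafka source:

import java.util.ConcurrentModificationException
import java.util.concurrent.atomic.AtomicLong

// Rough paraphrase (not the actual Kafka source) of the single-thread ownership
// check behind KafkaConsumer.acquire(): only the owning thread may call into
// the consumer, including close(), otherwise the exception above is thrown.
class SingleThreadedConsumerSketch {
  private val NoCurrentThread = -1L
  private val currentThread = new AtomicLong(NoCurrentThread)

  private def acquire(): Unit = {
    val threadId = Thread.currentThread().getId
    if (threadId != currentThread.get() &&
        !currentThread.compareAndSet(NoCurrentThread, threadId))
      throw new ConcurrentModificationException(
        "KafkaConsumer is not safe for multi-threaded access")
  }

  def close(): Unit = acquire() // close() also acquires, which is the call failing in the trace
}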

       


      Config Used to simulate the error

A session with (a minimal config sketch in Scala follows this list):

• Executors - 1
• Cores - 2 or more
• Kafka topic - has only 1 partition
• While fetching - more than one OffsetRange in the array, for example:
Array(OffsetRange("kafka_topic",0,608954201,608954202),
OffsetRange("kafka_topic",0,608954202,608954203)
)
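
For reference, a minimal sketch of such a session; the app name and the instance/core counts are example values, and cluster-manager specifics are omitted:

import org.apache.spark.{SparkConf, SparkContext}

// Minimal repro session sketch: 1 executor with 2 cores (example values).
val conf = new SparkConf()
  .setAppName("kafka-createRDD-repro")
  .set("spark.executor.instances", "1")
  .set("spark.executor.cores", "2")
val sparkContext = new SparkContext(conf)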

       


      Was this approach working before?

       

This was working in Spark 1.6.2.

However, from Spark 2.1 onwards, the approach throws an exception.
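
For comparison, a rough sketch of the Spark 1.6-style call (the old spark-streaming-kafka 0.8 integration); the broker address is a placeholder and the exact parameters may differ:

// Spark 1.6.x style (spark-streaming-kafka, Kafka 0.8 API) - sketch only.
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val kafkaParams = Map("metadata.broker.list" -> "broker-host:9092") // example broker
val offsetRanges = Array(
  OffsetRange("kafka_topic", 0, 608954201L, 608954202L),
  OffsetRange("kafka_topic", 0, 608954202L, 608954203L))

// Returned RDD[(K, V)] and created a consumer per task, so there was no shared consumer cache.
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sparkContext, kafkaParams, offsetRanges)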

       


Why are we fetching from Kafka as described above?

       

This gives us the ability to establish a connection to the Kafka broker for every core of a Spark executor, so each core can fetch and process its own set of messages based on the specified offset ranges.
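
To illustrate: each OffsetRange in the array becomes its own task (its own RDD partition), so a large range can be cut into slices, one per core. A minimal sketch; the splitRange helper and the offsets are illustrative, not part of the Spark API:

import org.apache.spark.streaming.kafka010.OffsetRange

// Illustrative helper (not a Spark API): cut one large offset range into
// contiguous slices so that each executor core gets its own fetch task.
def splitRange(topic: String, partition: Int,
               from: Long, until: Long, slices: Int): Array[OffsetRange] = {
  val step = math.max(1L, (until - from) / slices)
  (from until until by step).map { start =>
    OffsetRange(topic, partition, start, math.min(start + step, until))
  }.toArray
}

// e.g. two slices of the same topic-partition, one per core of a 2-core executor
val ranges = splitRange("kafka_topic", 0, 608954201L, 608954203L, 2)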

       

       


      Sample Code

       

Scala snippet, on Spark 2.2.0 or 2.1.0:

// Imports (only the ones the snippet actually needs)

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.KafkaUtils._

// This forces two connections - from a single executor - to topic-partition <kafka_topic-0>.

// With 2 cores assigned to 1 executor, each core gets a task pulling its own range:
// OffsetRange("kafka_topic",0,1,2) and OffsetRange("kafka_topic",0,2,3)

val parallelizedRanges = Array(
  OffsetRange("kafka_topic", 0, 1, 2), // fetches offset 1 (one record)
  OffsetRange("kafka_topic", 0, 2, 3)  // fetches offset 2 (one record)
)

       

// Initiate Kafka properties

val kafkaParams1: java.util.Map[String, Object] = new java.util.HashMap()

// Add the required consumer properties here (broker list, deserializers, group id, ...).
// Example values only - substitute your own:
kafkaParams1.put("bootstrap.servers", "broker-host:9092")
kafkaParams1.put("key.deserializer", classOf[StringDeserializer])
kafkaParams1.put("value.deserializer", classOf[StringDeserializer])
kafkaParams1.put("group.id", "sample-group")
       

// Create RDD (sparkContext is the active SparkContext, e.g. sc in spark-shell)

val rDDConsumerRec: RDD[ConsumerRecord[String, String]] =
  createRDD[String, String](
    sparkContext, kafkaParams1, parallelizedRanges, LocationStrategies.PreferConsistent)

       

// Map each ConsumerRecord to a Row

val data: RDD[Row] = rDDConsumerRec.map { x =>
  Row(x.topic(), x.partition().toString, x.offset().toString,
      x.timestamp().toString, x.value())
}

       

// Create a DataFrame (the value is a String here, since the RDD was created with String deserializers)

val df = sqlContext.createDataFrame(data, StructType(Seq(
  StructField("topic", StringType),
  StructField("partition", StringType),
  StructField("offset", StringType),
  StructField("timestamp", StringType),
  StructField("value", StringType)
)))

       

df.show() // The action triggers the Kafka fetch - this is where the error above is reported.

       


       

      Similar Issue reported earlier, but on a different API

       

A similar issue was reported for DirectStream:

      https://issues.apache.org/jira/browse/SPARK-19185

       

       

       


What is the impact if a fix is not available for this problem?

       

       

We have a lot of Spark applications running in production that make parallel connections to a single topic-partition from each Spark executor, so parallelism is directly proportional to the number of cores in each executor.

From Spark 2.1 onwards, we can no longer make concurrent connections from one executor to one topic-partition. The only workaround is to start our applications with executor-cores = 1 and dynamic resource allocation enabled.

With that configuration, a new executor is spawned to run the fetch task for every offset range we ask Kafka for.
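
For completeness, the workaround configuration looks roughly like this (all values are examples, and the external shuffle service is assumed to be available for dynamic allocation):

import org.apache.spark.SparkConf

// Workaround sketch: 1 core per executor plus dynamic allocation, so each
// OffsetRange's fetch task lands on its own executor. Example values only.
val workaroundConf = new SparkConf()
  .set("spark.executor.cores", "1")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")      // needed by dynamic allocation
  .set("spark.dynamicAllocation.maxExecutors", "32") // example cap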

Downside of the workaround

The above approach does not allow us to leverage more than one Spark core per executor.

And asking for a separate executor for each offset range is costly in terms of scheduling and allocation.

       

       

       


People

• Assignee: Unassigned
• Reporter: mcdeepak Deepak
• Votes: 1
• Watchers: 5
