SPARK-23663: Spark Streaming Kafka 010 fails with "java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access"


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.2.0
    • Fix Version/s: 2.4.0
    • Component/s: DStreams
    • Labels: None
    • Environment: Spark 2.2.0, Spark streaming kafka 010

    Description

      Test being tried:

      10 Kafka topics are created and streamed with Avro data from Kafka producers.

      org.apache.spark.streaming.kafka010 is used to create a direct stream to Kafka.

      A single direct stream is created for all ten topics.

      On each RDD (batch of 50 seconds), the key of each Kafka consumer record is checked and separate RDDs are created for the separate topics (see the sketch after this description). Each topic has records whose key is the topic name and whose value is an Avro message.

      Finally, the ten RDDs are converted to data frames and registered as separate temp tables.

      Once all the temp tables are registered, a few SQL queries are run on these temp tables.
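
      A minimal sketch of the per-topic splitting described above (not the reporter's exact code; the String key/value types and the splitByTopic helper name are assumptions for illustration, and only two of the ten topic names from the report are shown):

      // Sketch: split one batch RDD into per-topic RDDs by record key.
      import org.apache.kafka.clients.consumer.ConsumerRecord
      import org.apache.spark.rdd.RDD

      def splitByTopic(rdd: RDD[ConsumerRecord[String, String]],
                       topics: Seq[String]): Map[String, RDD[String]] =
        topics.map { topic =>
          // Keep only records whose key equals the topic name, then take their values.
          topic -> rdd.filter(_.key() == topic).map(_.value())
        }.toMap

      Filtering the RDD keeps the split on the executors, instead of accumulating record values into driver-side arrays inside rdd.foreach as the snippet further down does.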

       

      Exception seen:

      18/03/12 11:58:34 WARN TaskSetManager: Lost task 23.0 in stage 4.0 (TID 269, 192.168.24.145, executor 7): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
      at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
      at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
      at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
      at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:80)
      at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
      at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
      at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
      at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
      at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:108)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)

      18/03/12 11:58:34 INFO TaskSetManager: Starting task 23.1 in stage 4.0 (TID 828, 192.168.24.145, executor 7, partition 23, PROCESS_LOCAL, 4758 bytes)
      18/03/12 11:58:34 INFO TaskSetManager: Lost task 23.1 in stage 4.0 (TID 828) on 192.168.24.145, executor 7: java.util.ConcurrentModificationException (KafkaConsumer is not safe for multi-threaded access) [duplicate 1]
      18/03/12 11:58:34 INFO TaskSetManager: Starting task 23.2 in stage 4.0 (TID 829, 192.168.24.145, executor 7, partition 23, PROCESS_LOCAL, 4758 bytes)
      18/03/12 11:58:40 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 30, 192.168.24.147, executor 6): java.lang.IllegalStateException: This consumer has already been closed.
      at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1417)
      at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1428)
      at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
      at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
      at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
      at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
      at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
      at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
      at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
      at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:108)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)

      18/03/12 11:58:40 INFO TaskSetManager: Starting task 0.1 in stage 1.0 (TID 830, 192.168.24.147, executor 6, partition 0, PROCESS_LOCAL, 4758 bytes)
      18/03/12 11:58:45 WARN TaskSetManager: Lost task 0.1 in stage 4.0 (TID 296, 192.168.24.147, executor 6): java.lang.IllegalStateException: This consumer has already been closed.
      at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1417)
      at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1428)
      at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
      at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
      at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
      at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
      at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
      at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
      at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
      at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:108)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)

       

      Code snippet:

      val stream = KafkaUtils.createDirectStream[Object, Object](
        ssc,
        PreferConsistent,
        Subscribe[Object, Object](topicsArr, kafkaParams)
      )
      val tbl = topicsArr(0).toString
      stream.foreachRDD(rdd => {
        var ca = new Array[String](0)
        var ss = new Array[String](0)
        if (!rdd.isEmpty()) {
          val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
          import sqlContext.implicits._
          rdd.foreach(record => {
            record.key() match {
              case "customer_address" => ca = Array.concat(ca, Array(record.value().toString))
              case "store_sales"      => ss = Array.concat(ss, Array(record.value().toString))
              case _                  => println("Invalid Key")
            }
          })
          //val topicValueStrings = rdd.map(record => (record.value()).toString)
          val df_ca = sqlContext.read.json(spark.sparkContext.parallelize(ca))
          val df_ss = sqlContext.read.json(spark.sparkContext.parallelize(ss))
          try {
            df_ca.registerTempTable("customer_address")
            df_ss.registerTempTable("store_sales")
          } catch {
            case e: Throwable => println(e.getStackTrace())
          }
          try {
            //spark.sql("show tables")
            println("======New Batch=======")
            spark.sql(s"select count(1) as cnt,'customer_address' as tableName from customer_address").show()
            spark.sql(s"select count(1) as cnt,'store_sales' as tableName from store_sales").show()
          } catch {
            case e: Throwable => println(e.getStackTrace())
          }
        }
      })
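
      The snippet references topicsArr, kafkaParams and ssc without showing how they are defined (spark is shown below under "Spark session"). A minimal, hypothetical sketch of that setup follows; the broker address, group id and String deserializers are placeholders, not values from the report (the snippet itself uses Object key/value types for the Avro data):

      // Hypothetical setup for the identifiers used above; all values are placeholders.
      import org.apache.kafka.common.serialization.StringDeserializer
      import org.apache.spark.streaming.{Seconds, StreamingContext}

      // Two of the ten topics are named in the report; the rest are elided.
      val topicsArr = Array("customer_address", "store_sales")
      val kafkaParams = Map[String, Object](
        "bootstrap.servers"  -> "broker-host:9092",           // placeholder broker
        "key.deserializer"   -> classOf[StringDeserializer],  // report uses Avro/Object types
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id"           -> "example-group",               // placeholder group id
        "auto.offset.reset"  -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )
      // 50-second batches, matching the description above.
      val ssc = new StreamingContext(spark.sparkContext, Seconds(50))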

       

      The Spark session is created with the following confs:

      val spark = SparkSession.builder()
      .appName(appname)
      .config("hive.metastore.uris", hivemeta)
      .enableHiveSupport()
      .config("hive.exec.dynamic.partition", "true")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .config("spark.driver.allowMultipleContexts", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.kryoserializer.buffer.mb", "64")
      .config("spark.sql.tungsten.enabled", "true")
      .config("spark.app.id", appname)
      .config("spark.speculation","false")
      .config("spark.sql.parquet.mergeSchema", "false")
      .getOrCreate()

      Note: spark.streaming.kafka.consumer.cache.enabled is not set to false.
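
      As a diagnostic step only (not a confirmed fix for this report), the per-executor Kafka consumer cache can be disabled through that same property when building the session, or with --conf spark.streaming.kafka.consumer.cache.enabled=false on spark-submit:

      // Possible diagnostic for this class of errors with the Spark 2.x
      // streaming-kafka-0-10 integration: disable the cached Kafka consumer.
      // Whether this avoids the exception for this particular job is untested here.
      import org.apache.spark.sql.SparkSession

      val spark = SparkSession.builder()
        .appName(appname)
        .config("spark.streaming.kafka.consumer.cache.enabled", "false")
        .getOrCreate()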

       

      People

        Assignee: Unassigned
        Reporter: kaushik srinivas
        Votes: 1
        Watchers: 3
