Spark / SPARK-23663

Spark Streaming Kafka 010 fails with "java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access"


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.2.0
    • Fix Version/s: 2.4.0
    • Component/s: DStreams
    • Labels: None
    • Environment: Spark 2.2.0, Spark streaming kafka 010

    Description

      Test being tried:

      10 Kafka topics are created and streamed with Avro data from Kafka producers.

      org.apache.spark.streaming.kafka010 is used to create a direct stream to Kafka.

      A single direct stream is created for all ten topics.

      On each RDD (batch interval of 50 seconds), the key of each Kafka consumer record is checked and separate RDDs are created for the separate topics.

      Each topic has records whose key is the topic name and whose value is an Avro message.

      Finally, the ten RDDs are converted to DataFrames and registered as separate temp tables.

      Once all the temp tables are registered, a few SQL queries are run on these temp tables.
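
      For reference, a minimal sketch of the per-topic split and temp-table registration described above (topic and variable names here are illustrative only; the actual job is under "Code snippet" below):

      val topicsArr = Array("customer_address", "store_sales" /* ... remaining topics ... */)
      val stream = KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent,
        Subscribe[String, String](topicsArr, kafkaParams)
      )
      stream.foreachRDD { rdd =>
        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
        topicsArr.foreach { topic =>
          // records carry the topic name as the key, so filter by key to get one RDD per topic
          val topicValues = rdd.filter(_.key() == topic).map(_.value().toString)
          // values are read as JSON strings into a DataFrame and exposed as a temp table
          sqlContext.read.json(topicValues).registerTempTable(topic)
        }
      }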

       

      Exception seen:

      18/03/12 11:58:34 WARN TaskSetManager: Lost task 23.0 in stage 4.0 (TID 269, 192.168.24.145, executor 7): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
      at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
      at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
      at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
      at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:80)
      at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
      at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
      at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
      at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
      at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:108)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)

      18/03/12 11:58:34 INFO TaskSetManager: Starting task 23.1 in stage 4.0 (TID 828, 192.168.24.145, executor 7, partition 23, PROCESS_LOCAL, 4758 bytes)
      18/03/12 11:58:34 INFO TaskSetManager: Lost task 23.1 in stage 4.0 (TID 828) on 192.168.24.145, executor 7: java.util.ConcurrentModificationException (KafkaConsumer is not safe for multi-threaded access) [duplicate 1]
      18/03/12 11:58:34 INFO TaskSetManager: Starting task 23.2 in stage 4.0 (TID 829, 192.168.24.145, executor 7, partition 23, PROCESS_LOCAL, 4758 bytes)
      18/03/12 11:58:40 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 30, 192.168.24.147, executor 6): java.lang.IllegalStateException: This consumer has already been closed.
      at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1417)
      at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1428)
      at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
      at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
      at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
      at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
      at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
      at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
      at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
      at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:108)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)

      18/03/12 11:58:40 INFO TaskSetManager: Starting task 0.1 in stage 1.0 (TID 830, 192.168.24.147, executor 6, partition 0, PROCESS_LOCAL, 4758 bytes)
      18/03/12 11:58:45 WARN TaskSetManager: Lost task 0.1 in stage 4.0 (TID 296, 192.168.24.147, executor 6): java.lang.IllegalStateException: This consumer has already been closed.
      at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1417)
      at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1428)
      at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
      at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
      at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
      at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
      at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
      at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
      at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
      at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:108)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)

       

      Code snippet:

      val stream = KafkaUtils.createDirectStream[Object, Object](
        ssc,
        PreferConsistent,
        Subscribe[Object, Object](topicsArr, kafkaParams)
      )
      val tbl = topicsArr(0).toString
      stream.foreachRDD { rdd =>
        var ca = new Array[String](0)
        var ss = new Array[String](0)
        if (!rdd.isEmpty()) {
          val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
          import sqlContext.implicits._
          // split records by key (key == topic name) into per-topic value arrays
          rdd.foreach { record =>
            record.key() match {
              case "customer_address" => ca = Array.concat(ca, Array(record.value().toString))
              case "store_sales"      => ss = Array.concat(ss, Array(record.value().toString))
              case _                  => println("Invalid Key")
            }
          }
          //val topicValueStrings = rdd.map(record => (record.value()).toString)
          val df_ca = sqlContext.read.json(spark.sparkContext.parallelize(ca))
          val df_ss = sqlContext.read.json(spark.sparkContext.parallelize(ss))
          try {
            df_ca.registerTempTable("customer_address")
            df_ss.registerTempTable("store_sales")
          } catch {
            case e: Throwable => println(e.getStackTrace())
          }
          try {
            //spark.sql("show tables")
            println("======New Batch=======")
            spark.sql(s"select count(1) as cnt, 'customer_address' as tableName from customer_address").show()
            spark.sql(s"select count(1) as cnt, 'store_sales' as tableName from store_sales").show()
          } catch {
            case e: Throwable => println(e.getStackTrace())
          }
        }
      }

       

      Spark session is created with the below configs:

      val spark = SparkSession.builder()
        .appName(appname)
        .config("hive.metastore.uris", hivemeta)
        .enableHiveSupport()
        .config("hive.exec.dynamic.partition", "true")
        .config("hive.exec.dynamic.partition.mode", "nonstrict")
        .config("spark.driver.allowMultipleContexts", "true")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.kryoserializer.buffer.mb", "64")
        .config("spark.sql.tungsten.enabled", "true")
        .config("spark.app.id", appname)
        .config("spark.speculation", "false")
        .config("spark.sql.parquet.mergeSchema", "false")
        .getOrCreate()

      Note: spark.streaming.kafka.consumer.cache.enabled has not been set to false (the consumer cache is left enabled).
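
      For completeness, a minimal sketch of what disabling the cache would look like (spark.streaming.kafka.consumer.cache.enabled is the property documented in the spark-streaming-kafka-0-10 integration guide; the `sparkNoCache` builder is hypothetical and this was not tried in the test above):

      // Sketch only: same builder as above, with the executor-side Kafka consumer cache disabled.
      val sparkNoCache = SparkSession.builder()
        .appName(appname)
        .config("spark.streaming.kafka.consumer.cache.enabled", "false")
        .getOrCreate()

      The same property could equally be passed on spark-submit via --conf.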

       

          People

            Assignee: Unassigned
            Reporter: kaushik srinivas (kaushik_srinivas)
            Votes: 1
            Watchers: 3
