Description
We've been running into ConcurrentModificationExceptions ("KafkaConsumer is not safe for multi-threaded access") with the CachedKafkaConsumer. I've been debugging this issue, and after reading through some of the Spark source code I believe it is a bug.
Our setup is:
Spark 2.0.2, running in Mesos 0.28.0-2 in client mode, using Spark-Streaming-Kafka-010
spark.executor.cores 1
spark.mesos.extra.cores 1
Batch interval: 10s, window interval: 180s, and slide interval: 30s
We would see the exception when two task worker threads in one executor were assigned the same TopicPartition but different offset ranges.
Both would get the same CachedKafkaConsumer, and whichever task thread went first would seek and poll for all of its records, while the second thread's attempt to seek to its own offset would fail because it could not acquire the consumer's lock (a standalone sketch of this race follows the timeline below).
Time0 E0 Task0 - TopicPartition("abc", 0) X to Y
Time0 E0 Task1 - TopicPartition("abc", 0) Y to Z
Time1 E0 Task0 - Seeks and starts to poll
Time1 E0 Task1 - Attempts to seek, but fails
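To make the failure mode concrete outside of Spark, here is a minimal sketch (not the Spark code; the broker address, topic, group id, and sleep timings are placeholders) of two threads sharing one KafkaConsumer the way two task threads end up sharing one CachedKafkaConsumer. Whether the exception actually fires is timing-dependent, but the mechanism is the same: KafkaConsumer's acquire check is a non-blocking ownership test, not a real lock, so the second thread fails immediately instead of waiting.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object SharedConsumerRace {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder broker
    props.put("group.id", "shared-consumer-demo")    // placeholder group id
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    val tp = new TopicPartition("test-topic", 2)
    consumer.assign(Collections.singletonList(tp))

    // Plays "worker-1": seeks, then sits inside a long poll, holding the
    // consumer's single-thread ownership check for the duration.
    val first = new Thread(new Runnable {
      override def run(): Unit = {
        consumer.seek(tp, 0L)
        consumer.poll(10000L)
      }
    })
    // Plays "worker-0": tries to seek to a different offset while the first
    // thread is still polling; KafkaConsumer.acquire detects a second thread
    // and throws ConcurrentModificationException rather than blocking.
    val second = new Thread(new Runnable {
      override def run(): Unit = {
        Thread.sleep(100) // give the first thread time to enter poll()
        consumer.seek(tp, 100L)
      }
    })
    first.start(); second.start()
    first.join(); second.join()
  }
}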
Here are some relevant logs:
17/01/06 03:10:01 Executor task launch worker-1 INFO KafkaRDD: Computing topic test-topic, partition 2 offsets 4394204414 -> 4394238058
17/01/06 03:10:01 Executor task launch worker-0 INFO KafkaRDD: Computing topic test-topic, partition 2 offsets 4394238058 -> 4394257712
17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested 4394204414
17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested 4394238058
17/01/06 03:10:01 Executor task launch worker-0 INFO CachedKafkaConsumer: Initial fetch for spark-executor-consumer test-topic 2 4394238058
17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: Seeking to test-topic-2 4394238058
17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Putting block rdd_199_2 failed due to an exception
17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Block rdd_199_2 could not be removed as it was not found on disk or in memory
17/01/06 03:10:01 Executor task launch worker-0 ERROR Executor: Exception in task 49.0 in stage 45.0 (TID 3201)
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:360)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:951)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Polled [test-topic-2] 8237
17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204415 requested 4394204415
17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204416 requested 4394204416
...
It looks like when WindowedDStream does its getOrCompute call, it computes all of the offset ranges it needs and farms the work out in parallel, so each available task worker thread gets one of the offset ranges to read.
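As I read it, the collision happens because the consumer cache is keyed only by group id, topic, and partition; the offset range plays no part in the key. Here is a toy sketch of that lookup (CacheKey and getConsumer are illustrative names, not the actual CachedKafkaConsumer source), showing why two tasks with different offset ranges for the same partition resolve to the same instance:

// Toy model of an offset-agnostic consumer cache.
case class CacheKey(groupId: String, topic: String, partition: Int)

val cache = scala.collection.mutable.Map.empty[CacheKey, AnyRef]

def getConsumer(groupId: String, topic: String, partition: Int): AnyRef =
  cache.getOrElseUpdate(CacheKey(groupId, topic, partition), new AnyRef)

// Task0 (offsets X -> Y) and Task1 (offsets Y -> Z) ask for the same
// (group, topic, partition) and therefore receive the same instance:
val c0 = getConsumer("spark-executor-consumer", "abc", 0)
val c1 = getConsumer("spark-executor-consumer", "abc", 0)
assert(c0 eq c1)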
After realizing what was going on, I tested four configurations (a sketch of the exception-free configuration follows the list):
- spark.executor.cores 1 and spark.mesos.extra.cores 0 - No exceptions
- spark.executor.cores 1 and spark.mesos.extra.cores 1 - ConcurrentModificationException
- spark.executor.cores 2 and spark.mesos.extra.cores 0 - ConcurrentModificationException
- spark.executor.cores 2 and spark.mesos.extra.cores 1 - ConcurrentModificationException
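Based on the matrix above, the only exception-free configuration caps each executor at a single task thread. A sketch of that mitigation via SparkConf (a workaround that trades parallelism for safety, not a fix for the shared consumer cache):

import org.apache.spark.SparkConf

// Cap each executor at one concurrent task so two tasks can never touch the
// same cached consumer at the same time.
val conf = new SparkConf()
  .set("spark.executor.cores", "1")
  .set("spark.mesos.extra.cores", "0")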
Here is the minimal code I was able to reproduce this with. The streaming batch interval was set to 2 seconds, which increased the rate of exceptions I saw.
// brokers, groupId, schemaRegistryUrl, offset, kafkaTopic, ssc, and logger
// all come from the surrounding application.
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> brokers,
  "key.deserializer" -> classOf[KafkaAvroDeserializer],
  "value.deserializer" -> classOf[KafkaAvroDeserializer],
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "group.id" -> groupId,
  "schema.registry.url" -> schemaRegistryUrl,
  "auto.offset.reset" -> offset
)

val inputStream = KafkaUtils.createDirectStream[Object, Object](
  ssc,
  PreferConsistent,
  Subscribe[Object, Object](kafkaTopic, kafkaParams)
)

// A 180s window sliding every 30s over 2s batches means each window unions
// many KafkaRDDs, so several offset ranges for the same partition are
// computed in parallel within one executor.
val windowStream = inputStream.map(_.toString).window(Seconds(180), Seconds(30))

windowStream.foreachRDD { rdd =>
  val filtered = rdd.filter(_.contains("idb"))
  filtered.foreach { message =>
    // Note: i is reset for every record, so this logs every matching record.
    var i = 0
    if (i == 0) {
      logger.info(message)
      i = i + 1
    }
  }
}
Issue Links
- is duplicated by:
  - SPARK-20911 Unable to do windowing operation on Spark 2.1.1 and kafka 0.10.2.0 (Resolved)
  - SPARK-23663 Spark Streaming Kafka 010, fails with "java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access" (Closed)
  - SPARK-22606 There may be two or more tasks in one executor will use the same kafka consumer at the same time, then it will throw an exception: "KafkaConsumer is not safe for multi-threaded access" (Closed)
- relates to:
  - SPARK-19888 Seeing offsets not resetting even when reset policy is configured explicitly (Resolved)
  - SPARK-27720 ConcurrentModificationException on operating with DirectKafkaInputDStream (Resolved)