Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Won't Fix
-
None
-
None
-
None
-
None
Description
It appears that Kafka gets in the state of deadlock when ConsumerConnector.commitOffsets(..) is executed during stop call in GetKafka. Looking at the thread dump it may be related to the fact that we are shutting down consumer when onTrigger is still executing. But It also appears that onTrigger is in the deadlock state as well.
Below are the relevant thread dump segments
"StandardProcessScheduler Thread-7" Id=115 BLOCKED on java.lang.Object@2baae51 at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:333) at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:324) "ConsumerFetcherThread-74993895-50e9-3962-90e3-97af1fed7294_daves-nifi-cluster-2-1459431214410-d3b5143c-0-1001" Id=25120 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@23dc03a2 at sun.misc.Unsafe.park(Native Method) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350) at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60) at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:53) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$3.apply(AbstractFetcherThread.scala:142) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$3.apply(AbstractFetcherThread.scala:126) at scala.Option.foreach(Option.scala:236) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:126) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:123) at scala.collection.immutable.Map$Map2.foreach(Map.scala:130) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:123) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:123) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:123) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:122) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) Number of Locked Synchronizers: 1 - java.util.concurrent.locks.ReentrantLock$NonfairSync@6b99259f "ConsumerFetcherThread-33156eec-156c-4b32-9598-d5fc3ca460ce_daves-nifi-cluster-2-1459519432175-5beddff4-0-1001" Id=35266 RUNNABLE (in native code) at sun.nio.ch.Net.poll(Native Method) at sun.nio.ch.SocketChannelImpl.poll(SocketChannelImpl.java:954) - waiting on java.lang.Object@3e91ba2b at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:204) - waiting on java.lang.Object@494070b0 at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) - waiting on sun.nio.ch.SocketAdaptor$SocketInputStream@3ade0aef at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) - waiting on java.lang.Object@7d49c744 at kafka.utils.CoreUtils$.read(CoreUtils.scala:192) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:131) at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81) at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:78) - waiting on java.lang.Object@4eba7a24 at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:122) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:122) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:122) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:121) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:121) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:121) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:120) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:106) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) "Timer-Driven Process Thread-10" Id=92 TIMED_WAITING on java.util.concurrent.CountDownLatch$Sync@7a46b8e8 at sun.misc.Unsafe.park(Native Method) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037) at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328) at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277) at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:67) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:48) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25) at org.apache.nifi.processors.kafka.KafkaPublisher.processAcks(KafkaPublisher.java:152) at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:141) at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:299) at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1807) at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1778) at org.apache.nifi.processors.kafka.PutKafka.onTrigger(PutKafka.java:295) at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059) at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Number of Locked Synchronizers: 1 - java.util.concurrent.ThreadPoolExecutor$Worker@2bc3a748 "StandardProcessScheduler Thread-7" Id=115 BLOCKED on java.lang.Object@2baae51 at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:333) at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:324) at kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:110) at org.apache.nifi.processors.kafka.GetKafka.shutdownConsumer(GetKafka.java:297) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:137) at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:125) at org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotations(ReflectionUtils.java:233) at org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotation(ReflectionUtils.java:85) at org.apache.nifi.controller.StandardProcessorNode$2.run(StandardProcessorNode.java:1332) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Number of Locked Synchronizers: 1 - java.util.concurrent.ThreadPoolExecutor$Worker@36a2bc42