Apache NiFi / NIFI-1722

Intermittent deadlocks in Kafka API when stopping GetKafka

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Won't Fix

    Description

      Kafka appears to deadlock when ConsumerConnector.commitOffsets(..) is executed during the stop call in GetKafka. Judging by the thread dump, this may be because the consumer is being shut down while onTrigger is still executing. However, onTrigger also appears to be deadlocked.
      Below are the relevant thread dump segments:

      "StandardProcessScheduler Thread-7" Id=115 BLOCKED  on java.lang.Object@2baae51
      	at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:333)
      	at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:324)
      
      
      "ConsumerFetcherThread-74993895-50e9-3962-90e3-97af1fed7294_daves-nifi-cluster-2-1459431214410-d3b5143c-0-1001" Id=25120 WAITING  on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@23dc03a2
      	at sun.misc.Unsafe.park(Native Method)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
      	at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
      	at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
      	at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:53)
      	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$3.apply(AbstractFetcherThread.scala:142)
      	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$3.apply(AbstractFetcherThread.scala:126)
      	at scala.Option.foreach(Option.scala:236)
      	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:126)
      	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:123)
      	at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
      	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:123)
      	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:123)
      	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:123)
      	at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298)
      	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:122)
      	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
      	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
      	Number of Locked Synchronizers: 1
      	- java.util.concurrent.locks.ReentrantLock$NonfairSync@6b99259f
      
      
      "ConsumerFetcherThread-33156eec-156c-4b32-9598-d5fc3ca460ce_daves-nifi-cluster-2-1459519432175-5beddff4-0-1001" Id=35266 RUNNABLE  (in native code)
      	at sun.nio.ch.Net.poll(Native Method)
      	at sun.nio.ch.SocketChannelImpl.poll(SocketChannelImpl.java:954)
      	- waiting on java.lang.Object@3e91ba2b
      	at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:204)
      	- waiting on java.lang.Object@494070b0
      	at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
      	- waiting on sun.nio.ch.SocketAdaptor$SocketInputStream@3ade0aef
      	at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
      	- waiting on java.lang.Object@7d49c744
      	at kafka.utils.CoreUtils$.read(CoreUtils.scala:192)
      	at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
      	at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
      	at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
      	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:131)
      	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
      	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:78)
      	- waiting on java.lang.Object@4eba7a24
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:122)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:122)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:122)
      	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:121)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:121)
      	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:121)
      	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
      	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:120)
      	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:106)
      	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
      	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
      
      "Timer-Driven Process Thread-10" Id=92 TIMED_WAITING  on java.util.concurrent.CountDownLatch$Sync@7a46b8e8
      	at sun.misc.Unsafe.park(Native Method)
      	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
      	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
      	at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:67)
      	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:48)
      	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
      	at org.apache.nifi.processors.kafka.KafkaPublisher.processAcks(KafkaPublisher.java:152)
      	at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:141)
      	at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:299)
      	at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1807)
      	at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1778)
      	at org.apache.nifi.processors.kafka.PutKafka.onTrigger(PutKafka.java:295)
      	at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
      	at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059)
      	at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
      	at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
      	at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      	Number of Locked Synchronizers: 1
      	- java.util.concurrent.ThreadPoolExecutor$Worker@2bc3a748
      
      "StandardProcessScheduler Thread-7" Id=115 BLOCKED  on java.lang.Object@2baae51
      	at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:333)
      	at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:324)
      	at kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:110)
      	at org.apache.nifi.processors.kafka.GetKafka.shutdownConsumer(GetKafka.java:297)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:497)
      	at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:137)
      	at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:125)
      	at org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotations(ReflectionUtils.java:233)
      	at org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotation(ReflectionUtils.java:85)
      	at org.apache.nifi.controller.StandardProcessorNode$2.run(StandardProcessorNode.java:1332)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      	Number of Locked Synchronizers: 1
      	- java.util.concurrent.ThreadPoolExecutor$Worker@36a2bc42
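
      For illustration only, here is a minimal sketch of one way a hang of this shape can arise. It mirrors the ConsumerFetcherThread segment above, where a thread is parked in LinkedBlockingQueue.put while still holding a ReentrantLock. The class name, method name, and thread name are hypothetical, this is not Kafka or NiFi code, and the dump does not confirm that this exact lock is the one the stop call is blocked on (the stop thread is blocked on a java.lang.Object monitor). The sketch assumes a bounded queue that nobody drains and a second thread that needs the same lock.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; none of these names come from Kafka or NiFi.
final class BlockedPutDemo {

    /**
     * Returns true if the "stop" side acquires the lock within the timeout,
     * false if the lock is still held by a thread parked in put().
     */
    static boolean stopCanAcquireLock(boolean drainQueue) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1); // bounded queue
        ReentrantLock lock = new ReentrantLock();
        CountDownLatch lockHeld = new CountDownLatch(1);
        queue.put("chunk-0"); // the queue is now full

        Thread fetcher = new Thread(() -> {
            lock.lock();
            try {
                lockHeld.countDown();
                queue.put("chunk-1"); // parks until a consumer frees a slot
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        }, "hypothetical-fetcher");
        fetcher.setDaemon(true);
        fetcher.start();

        lockHeld.await(); // the fetcher now owns the lock
        if (drainQueue) {
            queue.take(); // a live consumer frees a slot, so put() can return
        }

        // The "stop" side needs the same lock; a timed tryLock avoids hanging forever.
        boolean acquired = lock.tryLock(500, TimeUnit.MILLISECONDS);
        if (acquired) {
            lock.unlock();
        }

        fetcher.interrupt(); // release the parked thread so the demo terminates
        fetcher.join();
        return acquired;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("queue never drained: " + stopCanAcquireLock(false));
        System.out.println("queue drained:       " + stopCanAcquireLock(true));
    }
}
```

      In this sketch the stop side fails to get the lock while the queue stays full and succeeds once something drains it. If the real hang follows the same pattern, it would point at the consumer no longer reading from its stream at the time the stop call runs, but that remains an assumption.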
      

          People

            ozhurakousky Oleg Zhurakousky