Kafka / KAFKA-10426

Deadlock in KafkaConfigBackingStore


Details

    Description

      Hi, guys!

      We faced the following deadlock:

      KafkaBasedLog Work Thread - _streaming_service_config
      priority:5 - threadId:0x00007f18ec22c000 - nativeId:0x950 - nativeId (decimal):2384 - state:BLOCKED
      stackTrace:
      java.lang.Thread.State: BLOCKED (on object monitor)
      at com.company.streaming.platform.kafka.DistributedHerder$ConfigUpdateListener.onSessionKeyUpdate(DistributedHerder.java:1586)
      - waiting to lock <0x00000000e6136808> (a com.company.streaming.platform.kafka.DistributedHerder)
      at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:707)
      - locked <0x00000000d8c3be40> (a java.lang.Object)
      at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:481)
      at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:264)
      at org.apache.kafka.connect.util.KafkaBasedLog.access$500(KafkaBasedLog.java:71)
      at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:337)
      
      
      
      CustomDistributedHerder-connect-1
      priority:5 - threadId:0x00007f1a01e30800 - nativeId:0x93a - nativeId (decimal):2362 - state:BLOCKED
      stackTrace:
      java.lang.Thread.State: BLOCKED (on object monitor)
      at org.apache.kafka.connect.storage.KafkaConfigBackingStore.snapshot(KafkaConfigBackingStore.java:285)
      - waiting to lock <0x00000000d8c3be40> (a java.lang.Object)
      at com.company.streaming.platform.kafka.DistributedHerder.updateConfigsWithIncrementalCooperative(DistributedHerder.java:514)
      - locked <0x00000000e6136808> (a com.company.streaming.platform.kafka.DistributedHerder)
      at com.company.streaming.platform.kafka.DistributedHerder.tick(DistributedHerder.java:402)
      at com.company.streaming.platform.kafka.DistributedHerder.run(DistributedHerder.java:286)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)

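      The herder thread (CustomDistributedHerder-connect-1) entered the synchronized method DistributedHerder.updateConfigsWithIncrementalCooperative(), so it holds the lock on the DistributedHerder instance, and then called configBackingStore.snapshot(), which needs the lock on an internal object in the KafkaConfigBackingStore class.

      Meanwhile, the KafkaBasedLog work thread was in ConsumeCallback inside KafkaConfigBackingStore, already holding the lock on that same internal object, when it received a SESSION_KEY record and called updateListener.onSessionKeyUpdate(), which needs the lock on the DistributedHerder. Each thread holds the monitor the other one is waiting for, so neither can ever proceed.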
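      The two stack traces form a lock-order inversion: the herder thread takes the herder lock and then the store lock, while the log thread takes them in the opposite order. The following minimal, self-contained Java sketch reproduces that pattern with plain monitors. It is not Kafka code; herderLock and storeLock are hypothetical stand-ins for the DistributedHerder monitor and the internal java.lang.Object in KafkaConfigBackingStore. The JVM's own detector, ThreadMXBean.findMonitorDeadlockedThreads(), is used to confirm the deadlock.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

// Reproduces the lock-order inversion from the thread dump with plain monitors.
// herderLock and storeLock are hypothetical stand-ins, not Kafka objects.
class LockOrderInversionDemo {

    static boolean reproducesDeadlock() {
        final Object herderLock = new Object(); // stands in for the DistributedHerder monitor
        final Object storeLock = new Object();  // stands in for KafkaConfigBackingStore's internal lock
        // Each thread waits here after taking its first lock, so both are
        // guaranteed to hold one monitor before requesting the other.
        final CountDownLatch bothHoldFirstLock = new CountDownLatch(2);

        // Herder thread order: herder lock, then store lock (like snapshot()).
        Thread herder = new Thread(() -> {
            synchronized (herderLock) {
                bothHoldFirstLock.countDown();
                awaitQuietly(bothHoldFirstLock);
                synchronized (storeLock) {
                    System.out.println("herder got both locks"); // never reached
                }
            }
        }, "herder");

        // Log thread order: store lock, then herder lock (like onSessionKeyUpdate()).
        Thread logWorker = new Thread(() -> {
            synchronized (storeLock) {
                bothHoldFirstLock.countDown();
                awaitQuietly(bothHoldFirstLock);
                synchronized (herderLock) {
                    System.out.println("log worker got both locks"); // never reached
                }
            }
        }, "log-worker");

        // Daemon threads, so the JVM can still exit while they stay blocked.
        herder.setDaemon(true);
        logWorker.setDaemon(true);
        herder.start();
        logWorker.start();

        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        // Poll for up to about 5 seconds until the JVM reports the monitor deadlock.
        for (int i = 0; i < 50; i++) {
            long[] deadlocked = threads.findMonitorDeadlockedThreads();
            if (deadlocked != null && deadlocked.length >= 2) {
                return true;
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("deadlock detected: " + reproducesDeadlock());
    }
}
```

      The latch only removes timing luck: once both threads hold their first monitor, each blocks forever on the other's, which mirrors the BLOCKED states in the thread dump.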

       

      As far as I can see, the problem is here:

      https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L737

      As I understand it, this call should be made outside the synchronized block:

      if (started)
         updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey);
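      The following is a minimal sketch of what moving the call outside the synchronized block could look like. It assumes simplified, hypothetical classes: SessionKeyFixSketch, ConfigStoreSketch, HerderSketch and SessionKeyListener are not Kafka's real API, and this is not the actual PR. The callback updates its state and copies the values it needs while holding the internal lock. It calls the listener only after releasing that lock, so no thread ever holds the store lock while waiting for the herder lock.

```java
// Hypothetical, simplified model of the fix; not the real KafkaConfigBackingStore.
class SessionKeyFixSketch {

    interface SessionKeyListener {
        void onSessionKeyUpdate(String sessionKey);
    }

    // Stands in for KafkaConfigBackingStore.
    static class ConfigStoreSketch {
        private final Object lock = new Object();
        private final SessionKeyListener updateListener;
        private String sessionKey;
        private boolean started = true; // assumed already started for this sketch

        ConfigStoreSketch(SessionKeyListener updateListener) {
            this.updateListener = updateListener;
        }

        // Like snapshot(): only ever takes the internal lock.
        String snapshot() {
            synchronized (lock) {
                return sessionKey;
            }
        }

        // Like ConsumeCallback handling a SESSION_KEY record.
        void onSessionKeyRecord(String newKey) {
            boolean notifyListener;
            String keyToPublish;
            synchronized (lock) {
                sessionKey = newKey;
                notifyListener = started;
                keyToPublish = sessionKey;
            }
            // The internal lock is released here, so the listener may take
            // the herder lock without creating a lock-order inversion.
            if (notifyListener) {
                updateListener.onSessionKeyUpdate(keyToPublish);
            }
        }
    }

    // Stands in for DistributedHerder: its methods synchronize on the herder itself.
    static class HerderSketch implements SessionKeyListener {
        ConfigStoreSketch store;
        private String lastKey;

        // Like updateConfigsWithIncrementalCooperative(): herder lock, then store lock.
        synchronized void updateConfigs() {
            store.snapshot();
        }

        @Override
        public synchronized void onSessionKeyUpdate(String sessionKey) {
            lastKey = sessionKey;
        }

        synchronized String lastKey() {
            return lastKey;
        }
    }

    // Runs both code paths concurrently; returns true if both threads finish.
    static boolean runsWithoutDeadlock() {
        HerderSketch herder = new HerderSketch();
        ConfigStoreSketch store = new ConfigStoreSketch(herder);
        herder.store = store;

        Thread herderThread = new Thread(() -> {
            for (int i = 0; i < 100_000; i++) herder.updateConfigs();
        });
        Thread logWorker = new Thread(() -> {
            for (int i = 0; i < 100_000; i++) store.onSessionKeyRecord("key-" + i);
        });
        herderThread.setDaemon(true);
        logWorker.setDaemon(true);
        herderThread.start();
        logWorker.start();
        try {
            herderThread.join(30_000);
            logWorker.join(30_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !herderThread.isAlive() && !logWorker.isAlive()
                && "key-99999".equals(herder.lastKey());
    }

    public static void main(String[] args) {
        System.out.println("finished without deadlock: " + runsWithoutDeadlock());
    }
}
```

      The design choice is to keep the critical section limited to the store's own state and to treat the listener as foreign code that must never run under the store's lock. One trade-off is that a listener may observe a key that a later record has already replaced by the time it runs.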

       

      I'm going to make a PR.

          People

            Goltseva Taisiia (xakassi)
            Chris Egerton
            Votes: 0
            Watchers: 1
