Details
Description
Hi, guys!
We faced the following deadlock:
KafkaBasedLog Work Thread - _streaming_service_config priority:5 - threadId:0x00007f18ec22c000 - nativeId:0x950 - nativeId (decimal):2384 - state:BLOCKED
stackTrace:
java.lang.Thread.State: BLOCKED (on object monitor)
    at com.company.streaming.platform.kafka.DistributedHerder$ConfigUpdateListener.onSessionKeyUpdate(DistributedHerder.java:1586)
    - waiting to lock <0x00000000e6136808> (a com.company.streaming.platform.kafka.DistributedHerder)
    at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:707)
    - locked <0x00000000d8c3be40> (a java.lang.Object)
    at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:481)
    at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:264)
    at org.apache.kafka.connect.util.KafkaBasedLog.access$500(KafkaBasedLog.java:71)
    at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:337)

CustomDistributedHerder-connect-1 priority:5 - threadId:0x00007f1a01e30800 - nativeId:0x93a - nativeId (decimal):2362 - state:BLOCKED
stackTrace:
java.lang.Thread.State: BLOCKED (on object monitor)
    at org.apache.kafka.connect.storage.KafkaConfigBackingStore.snapshot(KafkaConfigBackingStore.java:285)
    - waiting to lock <0x00000000d8c3be40> (a java.lang.Object)
    at com.company.streaming.platform.kafka.DistributedHerder.updateConfigsWithIncrementalCooperative(DistributedHerder.java:514)
    - locked <0x00000000e6136808> (a com.company.streaming.platform.kafka.DistributedHerder)
    at com.company.streaming.platform.kafka.DistributedHerder.tick(DistributedHerder.java:402)
    at com.company.streaming.platform.kafka.DistributedHerder.run(DistributedHerder.java:286)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
DistributedHerder entered the synchronized method updateConfigsWithIncrementalCooperative() and called configBackingStore.snapshot(), which takes a lock on an internal object in the KafkaConfigBackingStore class.
Meanwhile, ConsumeCallback in KafkaConfigBackingStore, already inside a synchronized block on that same internal object, received a SESSION_KEY record and called updateListener.onSessionKeyUpdate(), which takes a lock on DistributedHerder. The two threads acquire the same two locks in opposite order, so each waits for the lock the other holds.
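To make the lock ordering concrete, here is a minimal standalone sketch of the same pattern. It is not Kafka code: the class name, thread names, and the two plain lock objects are hypothetical stand-ins for the DistributedHerder monitor and the internal lock in KafkaConfigBackingStore. A latch forces the unlucky interleaving, and the JDK's ThreadMXBean is used to confirm that the two threads end up in a monitor deadlock.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

public class LockOrderInversionDemo {

    // Starts a daemon thread that takes `first`, waits until the other
    // thread holds its own first lock, then tries to take `second`.
    static Thread locker(String name, Object first, Object second, CountDownLatch bothLocked) {
        Thread t = new Thread(() -> {
            synchronized (first) {
                bothLocked.countDown();
                try {
                    bothLocked.await();
                } catch (InterruptedException e) {
                    return;
                }
                synchronized (second) {
                    // Not reached in this demo: the other thread owns `second`.
                }
            }
        }, name);
        t.setDaemon(true); // let the JVM exit even though the threads stay blocked
        t.start();
        return t;
    }

    // Returns the ids of the deadlocked threads, or null if none were detected in time.
    static long[] reproduce() throws InterruptedException {
        Object herderMonitor = new Object(); // stand-in for the DistributedHerder monitor
        Object storeLock = new Object();     // stand-in for the backing store's internal lock
        CountDownLatch bothLocked = new CountDownLatch(2);

        // Same order as the herder thread: herder monitor first, then store lock.
        locker("herder-tick", herderMonitor, storeLock, bothLocked);
        // Same order as the log work thread: store lock first, then herder monitor.
        locker("config-log-reader", storeLock, herderMonitor, bothLocked);

        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        for (int i = 0; i < 100; i++) {
            long[] ids = threads.findMonitorDeadlockedThreads();
            if (ids != null) {
                return ids;
            }
            Thread.sleep(50);
        }
        return null;
    }

    public static void main(String[] args) throws InterruptedException {
        long[] ids = reproduce();
        System.out.println(ids == null ? "no deadlock detected" : ids.length + " threads deadlocked");
    }
}
```

In the real system the interleaving is not forced by a latch, so the deadlock only appears when a SESSION_KEY record arrives while the herder is inside its synchronized method.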
As far as I can see, the problem is here:
As I understand it, this call should be performed outside the synchronized block:
if (started) updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey);
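A rough sketch of what moving the call out of the synchronized block could look like is shown here. This is only an illustration of the general pattern (decide under the lock, notify after releasing it) and not the actual patch: ConfigStore, onSessionKeyRecord, and demo are hypothetical names, and a String stands in for the real session key type.

```java
public class CallbackOutsideLockSketch {

    // Hypothetical listener interface, reduced to the one callback involved here.
    interface UpdateListener {
        void onSessionKeyUpdate(String sessionKey);
    }

    // Hypothetical, heavily simplified stand-in for the config backing store.
    static class ConfigStore {
        private final Object lock = new Object();
        private final UpdateListener updateListener;
        private boolean started = true;
        private String sessionKey;

        ConfigStore(UpdateListener updateListener) {
            this.updateListener = updateListener;
        }

        void onSessionKeyRecord(String newKey) {
            String keyToPublish = null;
            synchronized (lock) {
                // Only state changes and the decision to notify happen under the lock.
                sessionKey = newKey;
                if (started) {
                    keyToPublish = sessionKey;
                }
            }
            // The listener runs after the lock is released, so it is free to
            // take its own monitor without creating a lock-order cycle.
            if (keyToPublish != null) {
                updateListener.onSessionKeyUpdate(keyToPublish);
            }
        }
    }

    // Returns whether the store's lock was held by the calling thread during the callback.
    static boolean demo() {
        boolean[] heldDuringCallback = {true};
        ConfigStore[] ref = new ConfigStore[1];
        ref[0] = new ConfigStore(key -> heldDuringCallback[0] = Thread.holdsLock(ref[0].lock));
        ref[0].onSessionKeyRecord("key-1");
        return heldDuringCallback[0];
    }

    public static void main(String[] args) {
        System.out.println("lock held during callback: " + demo());
    }
}
```

The trade-off is that the listener may observe the key slightly after other threads can already read it from the store, so whether this reordering is acceptable depends on what the real listener assumes.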
I'm going to make a PR.