Details
- Bug
- Status: Open
- Major
- Resolution: Unresolved
- 2.5.0
- None
- None
Description
In KAFKA-9051, source task offset reading was modified so that in-progress read Futures can be cancelled during task shutdown. Accesses to OffsetStorageReaderImpl#offsetReadFutures are explicitly synchronized to prevent data races between task cancellation and connectors reading offsets.
A thread executing the OffsetStorageReaderImpl#offsets method can lock the Set and then, while still holding that lock, call Producer#flush inside KafkaBasedLog#flush.
At the same time, the herder thread may try to shut down the task, time out, and call AbstractWorkerSourceTask#cancel. This cancellation attempts to lock the same Set, so it must wait for Producer#flush to complete. If the task's producer is unhealthy, the flush may never complete, which blocks the herder thread indefinitely.
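See the following stack traces. The first is the herder thread waiting for the lock; the second is the task thread that holds the lock while waiting in Producer#flush: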
java.lang.Thread.State: BLOCKED (on object monitor)
    at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.close(OffsetStorageReaderImpl.java:148)
    - waiting to lock <0x00000006e6ce0748> (a java.util.HashSet)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.cancel(AbstractWorkerSourceTask.java:288)
    at org.apache.kafka.connect.runtime.Worker.awaitStopTask(Worker.java:1036)
    at org.apache.kafka.connect.runtime.Worker.awaitStopTasks(Worker.java:1054)
    at org.apache.kafka.connect.runtime.Worker.stopAndAwaitTask(Worker.java:1082)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$restartTask$23(DistributedHerder.java:1369)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$$Lambda$1647/0x00007f3d01941b28.call(Unknown Source)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2240)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
    at java.util.concurrent.Executors$RunnableAdapter.call(java.base@17.0.12/Executors.java:539)
    at java.util.concurrent.FutureTask.run(java.base@17.0.12/FutureTask.java:264)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.12/ThreadPoolExecutor.java:1136)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.12/ThreadPoolExecutor.java:635)
    at java.lang.Thread.run(java.base@17.0.12/Thread.java:840)

java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base@17.0.12/Native Method)
    - parking to wait for <0x00000006e4f9d610> (a java.util.concurrent.CountDownLatch$Sync)
    at java.util.concurrent.locks.LockSupport.park(java.base@17.0.12/LockSupport.java:211)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@17.0.12/AbstractQueuedSynchronizer.java:715)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@17.0.12/AbstractQueuedSynchronizer.java:1047)
    at java.util.concurrent.CountDownLatch.await(java.base@17.0.12/CountDownLatch.java:230)
    at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:1075)
    at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1215)
    at org.apache.kafka.connect.util.KafkaBasedLog$$Lambda$861/0x00007f3d017dc9d8.accept(Unknown Source)
    at java.util.Optional.ifPresent(java.base@17.0.12/Optional.java:178)
    at org.apache.kafka.connect.util.KafkaBasedLog.flush(KafkaBasedLog.java:345)
    at org.apache.kafka.connect.util.KafkaBasedLog.readToEnd(KafkaBasedLog.java:334)
    at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.get(KafkaOffsetBackingStore.java:295)
    at org.apache.kafka.connect.storage.ConnectorOffsetBackingStore.lambda$getFromStore$5(ConnectorOffsetBackingStore.java:348)
    at org.apache.kafka.connect.storage.ConnectorOffsetBackingStore$$Lambda$1277/0x00007f3d0184d600.apply(Unknown Source)
    at java.util.Optional.map(java.base@17.0.12/Optional.java:260)
    at org.apache.kafka.connect.storage.ConnectorOffsetBackingStore.getFromStore(ConnectorOffsetBackingStore.java:348)
    at org.apache.kafka.connect.storage.ConnectorOffsetBackingStore.get(ConnectorOffsetBackingStore.java:208)
    at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:96)
    - locked <0x00000006e6ce0748> (a java.util.HashSet)
    at io.debezium.connector.common.OffsetReader.offsets(OffsetReader.java:42)
    at io.debezium.connector.common.BaseSourceTask.getPreviousOffsets(BaseSourceTask.java:365)
    at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:112)
    at io.debezium.connector.common.BaseSourceTask.startIfNeededAndPossible(BaseSourceTask.java:251)
    at io.debezium.connector.common.BaseSourceTask.poll(BaseSourceTask.java:178)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
    at org.apache.kafka.connect.runtime.isolation.Plugins$$Lambda$990/0x00007f3d01814e08.run(Unknown Source)
    at java.util.concurrent.Executors$RunnableAdapter.call(java.base@17.0.12/Executors.java:539)
    at java.util.concurrent.FutureTask.run(java.base@17.0.12/FutureTask.java:264)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.12/ThreadPoolExecutor.java:1136)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.12/ThreadPoolExecutor.java:635)
    at java.lang.Thread.run(java.base@17.0.12/Thread.java:840)
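The lock interaction in the description and stack traces can be reproduced in miniature without Kafka. The following is only a sketch under stated assumptions, not the Connect implementation: the class OffsetReadLockDemo and its fields are hypothetical, a CountDownLatch stands in for a Producer#flush that does not complete, and the method names offsets and close merely mirror the frames in the traces.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the pattern in the report; not Kafka Connect code.
class OffsetReadLockDemo {
    // Plays the role of the synchronized Set of in-progress read futures.
    private final Set<Object> offsetReadFutures = new HashSet<>();
    // Plays the role of a producer flush that stays stuck until released.
    private final CountDownLatch flushDone = new CountDownLatch(1);
    // Signals that the task thread has acquired the monitor.
    private final CountDownLatch readerHoldsLock = new CountDownLatch(1);

    // Task thread: takes the monitor, then blocks on the "flush" while holding it.
    void offsets() throws InterruptedException {
        synchronized (offsetReadFutures) {
            readerHoldsLock.countDown();
            flushDone.await();
        }
    }

    // Herder thread: needs the same monitor to cancel outstanding reads.
    void close() {
        synchronized (offsetReadFutures) {
            offsetReadFutures.clear();
        }
    }

    // Returns the herder thread's state observed while the "flush" is stuck.
    public static Thread.State observeHerderState() {
        OffsetReadLockDemo demo = new OffsetReadLockDemo();
        Thread task = new Thread(() -> {
            try {
                demo.offsets();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "task-thread");
        Thread herder = new Thread(demo::close, "herder-thread");
        try {
            task.start();
            demo.readerHoldsLock.await();   // task now owns the monitor
            herder.start();
            // Poll until the herder is parked on the monitor (give up after ~5 s).
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (herder.getState() != Thread.State.BLOCKED
                    && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            Thread.State observed = herder.getState();
            demo.flushDone.countDown();     // simulate the producer recovering
            task.join();
            herder.join();
            return observed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("herder state while flush is stuck: " + observeHerderState());
    }
}
```

In this sketch the herder thread is expected to be reported as BLOCKED for as long as the latch is held, matching the "waiting to lock" frame in the first trace. If the latch were never released, close would never return, which is the indefinite block the description warns about.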
Issue Links
- is related to KAFKA-9051 Source task source offset reads can block graceful shutdown (Resolved)