Kafka / KAFKA-17871

Source task source offset reads can block herder task cancellation


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.5.0
    • Fix Version/s: None
    • Component/s: connect
    • Labels: None

    Description

      In KAFKA-9051, source task offset reads were changed so that in-progress read Futures can be cancelled during task shutdown. OffsetStorageReaderImpl#offsetReadFutures is only accessed under explicit synchronization, which prevents data races between task cancellation and connectors reading offsets.

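      A thread executing the OffsetStorageReaderImpl#offsets method can lock the Set and then, while still holding the lock, call Producer#flush inside KafkaBasedLog#flush (reached through KafkaOffsetBackingStore#get and KafkaBasedLog#readToEnd).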
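      A simplified, hypothetical model of this locking pattern may help. It is not the actual OffsetStorageReaderImpl source: the class name OffsetReaderSketch is made up, a Supplier stands in for the backing-store call, and a String replaces the real offset map. What it illustrates is that the store call runs while the Set's monitor is held, whereas waiting on the returned Future happens outside the monitor, so close() can cancel that wait but cannot get past a store call that blocks.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Supplier;

// Hypothetical model of the synchronized-Set pattern; not Kafka's real class.
class OffsetReaderSketch {
    // In-progress reads, tracked so that close() can cancel them.
    private final Set<Future<String>> offsetReadFutures = new HashSet<>();
    // Stand-in for the backing-store get(); it may block before returning.
    private final Supplier<Future<String>> backingStoreGet;
    private boolean closed = false; // guarded by offsetReadFutures

    OffsetReaderSketch(Supplier<Future<String>> backingStoreGet) {
        this.backingStoreGet = backingStoreGet;
    }

    String offsets() {
        Future<String> future;
        synchronized (offsetReadFutures) {
            if (closed) {
                throw new IllegalStateException("Offset reader is closed");
            }
            // Runs with the monitor held: if this call blocks (for example in a
            // producer flush), the monitor stays held for as long as it blocks.
            future = backingStoreGet.get();
            offsetReadFutures.add(future);
        }
        try {
            // Waits outside the monitor, so close() can cancel this wait.
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while reading offsets", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Offset read failed", e.getCause());
        } finally {
            synchronized (offsetReadFutures) {
                offsetReadFutures.remove(future);
            }
        }
    }

    void close() {
        // Must acquire the same monitor that offsets() holds during the store call.
        synchronized (offsetReadFutures) {
            closed = true;
            for (Future<String> f : offsetReadFutures) {
                f.cancel(true);
            }
            offsetReadFutures.clear();
        }
    }
}
```

      In this model, an offsets() call whose Supplier blocks (the analogue of KafkaBasedLog#readToEnd stuck in Producer#flush) keeps the monitor held, so a concurrent close() has to wait behind it.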

      At the same time, the herder thread may try to stop the task, time out, and call AbstractWorkerSourceTask#cancel. Cancellation calls OffsetStorageReaderImpl#close, which tries to lock the same Set and therefore must wait for Producer#flush to complete. If the task's producer is unhealthy, the flush may not finish, which can block the herder thread (and with it DistributedHerder#tick) indefinitely.
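      The contention can be reproduced outside Kafka with two plain threads. In this hypothetical sketch (all names are illustrative, not Kafka APIs), a CountDownLatch that stays closed until the demo releases it stands in for a Producer#flush that cannot finish. The "task" thread holds the monitor of a shared HashSet while parked on that latch, matching the WAITING (parking) state in the second stack trace, and the "herder" thread then tries to enter the same monitor, as OffsetStorageReaderImpl#close does. The method reports whether the herder was observed in the BLOCKED state shown in the first stack trace, then releases the latch so both threads can exit.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical two-thread model of the lock contention; not Kafka code.
class HerderBlockingDemo {

    // Returns true if the "herder" thread was seen BLOCKED on the shared monitor.
    static boolean herderBlockedDuringFlush() {
        Set<Object> offsetReadFutures = new HashSet<>();      // the shared monitor
        CountDownLatch flushComplete = new CountDownLatch(1); // stands in for a stuck flush
        CountDownLatch lockHeld = new CountDownLatch(1);

        // "Task" thread: like OffsetStorageReaderImpl#offsets, holds the monitor during the flush.
        Thread task = new Thread(() -> {
            synchronized (offsetReadFutures) {
                lockHeld.countDown();
                try {
                    flushComplete.await(); // parks with no timeout while holding the monitor
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "task-thread");

        // "Herder" thread: like AbstractWorkerSourceTask#cancel calling OffsetStorageReaderImpl#close.
        Thread herder = new Thread(() -> {
            synchronized (offsetReadFutures) {
                offsetReadFutures.clear(); // where outstanding reads would be cancelled
            }
        }, "herder-thread");

        task.start();
        try {
            lockHeld.await(); // make sure the task owns the monitor first
            herder.start();

            // Poll for up to about 2 seconds for the herder to reach the BLOCKED state.
            boolean blocked = false;
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
            while (!blocked && System.nanoTime() < deadline) {
                blocked = herder.getState() == Thread.State.BLOCKED;
                if (!blocked) {
                    Thread.sleep(10);
                }
            }

            flushComplete.countDown(); // the "producer recovers"; the task releases the monitor
            task.join();
            herder.join();
            return blocked;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while running the demo", e);
        } finally {
            flushComplete.countDown(); // never leave the task thread parked
        }
    }

    public static void main(String[] args) {
        System.out.println("herder blocked on offset-reader monitor: " + herderBlockedDuringFlush());
    }
}
```

      Because the wait inside the synchronized block has no timeout, the herder in this model stays BLOCKED for as long as the latch stays closed; only releasing the latch lets it proceed.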

      See the following stack traces, first for the herder thread and then for the task thread:

          java.lang.Thread.State: BLOCKED (on object monitor)
               at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.close(OffsetStorageReaderImpl.java:148)
               - waiting to lock <0x00000006e6ce0748> (a java.util.HashSet)
               at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.cancel(AbstractWorkerSourceTask.java:288)
               at org.apache.kafka.connect.runtime.Worker.awaitStopTask(Worker.java:1036)
               at org.apache.kafka.connect.runtime.Worker.awaitStopTasks(Worker.java:1054)
               at org.apache.kafka.connect.runtime.Worker.stopAndAwaitTask(Worker.java:1082)
               at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$restartTask$23(DistributedHerder.java:1369)
               at org.apache.kafka.connect.runtime.distributed.DistributedHerder$$Lambda$1647/0x00007f3d01941b28.call(Unknown Source)
               at org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2240)
               at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
               at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
               at java.util.concurrent.Executors$RunnableAdapter.call(java.base@17.0.12/Executors.java:539)
               at java.util.concurrent.FutureTask.run(java.base@17.0.12/FutureTask.java:264)
               at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.12/ThreadPoolExecutor.java:1136)
               at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.12/ThreadPoolExecutor.java:635)
               at java.lang.Thread.run(java.base@17.0.12/Thread.java:840)

       

          java.lang.Thread.State: WAITING (parking)
               at jdk.internal.misc.Unsafe.park(java.base@17.0.12/Native Method)
               - parking to wait for  <0x00000006e4f9d610> (a java.util.concurrent.CountDownLatch$Sync)
               at java.util.concurrent.locks.LockSupport.park(java.base@17.0.12/LockSupport.java:211)
               at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@17.0.12/AbstractQueuedSynchronizer.java:715)
               at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@17.0.12/AbstractQueuedSynchronizer.java:1047)
               at java.util.concurrent.CountDownLatch.await(java.base@17.0.12/CountDownLatch.java:230)
               at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
               at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:1075)
               at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1215)
               at org.apache.kafka.connect.util.KafkaBasedLog$$Lambda$861/0x00007f3d017dc9d8.accept(Unknown Source)
               at java.util.Optional.ifPresent(java.base@17.0.12/Optional.java:178)
               at org.apache.kafka.connect.util.KafkaBasedLog.flush(KafkaBasedLog.java:345)
               at org.apache.kafka.connect.util.KafkaBasedLog.readToEnd(KafkaBasedLog.java:334)
               at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.get(KafkaOffsetBackingStore.java:295)
               at org.apache.kafka.connect.storage.ConnectorOffsetBackingStore.lambda$getFromStore$5(ConnectorOffsetBackingStore.java:348)
               at org.apache.kafka.connect.storage.ConnectorOffsetBackingStore$$Lambda$1277/0x00007f3d0184d600.apply(Unknown Source)
               at java.util.Optional.map(java.base@17.0.12/Optional.java:260)
               at org.apache.kafka.connect.storage.ConnectorOffsetBackingStore.getFromStore(ConnectorOffsetBackingStore.java:348)
               at org.apache.kafka.connect.storage.ConnectorOffsetBackingStore.get(ConnectorOffsetBackingStore.java:208)
               at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:96)
               - locked <0x00000006e6ce0748> (a java.util.HashSet)
               at io.debezium.connector.common.OffsetReader.offsets(OffsetReader.java:42)
               at io.debezium.connector.common.BaseSourceTask.getPreviousOffsets(BaseSourceTask.java:365)
               at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:112)
               at io.debezium.connector.common.BaseSourceTask.startIfNeededAndPossible(BaseSourceTask.java:251)
               at io.debezium.connector.common.BaseSourceTask.poll(BaseSourceTask.java:178)
               at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469)
               at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357)
               at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
               at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
               at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
               at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
               at org.apache.kafka.connect.runtime.isolation.Plugins$$Lambda$990/0x00007f3d01814e08.run(Unknown Source)
               at java.util.concurrent.Executors$RunnableAdapter.call(java.base@17.0.12/Executors.java:539)
               at java.util.concurrent.FutureTask.run(java.base@17.0.12/FutureTask.java:264)
               at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.12/ThreadPoolExecutor.java:1136)
               at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.12/ThreadPoolExecutor.java:635)
               at java.lang.Thread.run(java.base@17.0.12/Thread.java:840)


            People

              Assignee: Unassigned
              Reporter: Greg Harris (gharris1727)
              Votes: 0
              Watchers: 2
