Kafka / KAFKA-17734

KafkaConsumer.close(0) can block indefinitely in ConsumerNetworkClient.poll

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.8.0
    • Fix Version/s: None
    • Component/s: clients, consumer
    • Labels: None
    • Environment: openjdk version "17.0.12" 2024-07-16
      OpenJDK Runtime Environment (build 17.0.12+7-Ubuntu-1ubuntu222.04)
      OpenJDK 64-Bit Server VM (build 17.0.12+7-Ubuntu-1ubuntu222.04, mixed mode, sharing)

    Description

      This might be related to KAFKA-17263, but I've got a slightly different stacktrace.

      With the official Java Kafka client library, version 3.8.0, calls to `consumer.close()` can stall indefinitely even though I've provided a zero-second timeout. I also tried spawning a separate thread that calls `consumer.wakeup()` after one second, just to make sure the client gets woken up, and that doesn't seem to work either. jstack shows the thread calling `consumer.close()` stuck in `AbstractCoordinator.maybeLeaveGroup` -> `ConsumerNetworkClient.poll`. That poll fires a pending future completion handler, which goes back into `AbstractCoordinator` and hits `onLeaderElected`; that eventually issues another `ConsumerNetworkClient.poll()`, which appears to ground out in a blocking `select()` call.
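To make the shape of that call chain concrete, here is a toy model of it. This is not Kafka code: `ToyClient`, `simulateClose`, and the use of `Long.MAX_VALUE` as the nested timeout are all assumptions chosen for illustration. The trace only shows that the nested poll is blocked in `select()`. The sketch records the timeout each poll would have passed to `select()` instead of actually blocking.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class NestedPollSketch {
    /** Hypothetical stand-in for a network client; it records timeouts instead of blocking. */
    static final class ToyClient {
        final Deque<Runnable> completedHandlers = new ArrayDeque<>();
        final List<Long> requestedTimeoutsMs = new ArrayList<>();

        void poll(long timeoutMs) {
            // Handlers for already-completed requests run before this poll waits.
            Runnable handler;
            while ((handler = completedHandlers.poll()) != null) {
                handler.run();
            }
            // A real client would block in select(timeoutMs) at this point.
            requestedTimeoutsMs.add(timeoutMs);
        }
    }

    /** Models a close() that polls with a zero timeout while a response handler is pending. */
    static List<Long> simulateClose() {
        ToyClient client = new ToyClient();
        // Assumption: the pending handler needs more I/O and waits with no bound of its own.
        client.completedHandlers.add(() -> client.poll(Long.MAX_VALUE));
        client.poll(0L); // the caller asked for a zero timeout
        return client.requestedTimeoutsMs;
    }

    public static void main(String[] args) {
        System.out.println(simulateClose()); // [9223372036854775807, 0]
    }
}
```

The nested wait is recorded first: the handler's own timeout governs the first `select()`, and the caller's zero timeout only applies once the handler returns. If the real handler waits without a bound, that would be consistent with `close()` stalling despite the zero-second timeout.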

      It would be really nice if there were a way to reliably release client resources (thread pools, network connections, etc.) that didn't do in-line network IO.
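Until something like that exists, one possible stopgap is to run the close on a daemon thread and stop waiting after a deadline. The sketch below is a workaround under stated assumptions, not a fix and not part of the Kafka client: `BoundedClose` and `closeWithDeadline` are hypothetical names, and a timed-out close still leaves the stuck thread and whatever resources it holds behind.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BoundedClose {
    /**
     * Runs closeAction on a daemon thread and waits at most `limit` for it.
     * Returns true if the action finished in time, false if the caller gave up.
     */
    static boolean closeWithDeadline(Runnable closeAction, Duration limit)
            throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        Thread closer = new Thread(() -> {
            try {
                closeAction.run();
            } finally {
                done.countDown(); // signal completion even if the action throws
            }
        }, "bounded-close");
        closer.setDaemon(true); // a stuck close must not keep the JVM alive
        closer.start();
        return done.await(limit.toMillis(), TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        // A close action that returns immediately.
        boolean quick = closeWithDeadline(() -> { }, Duration.ofSeconds(1));

        // Stand-in for a close() that never returns.
        CountDownLatch never = new CountDownLatch(1);
        boolean stuck = closeWithDeadline(() -> {
            try {
                never.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, Duration.ofMillis(100));

        System.out.println("quick=" + quick + " stuck=" + stuck); // quick=true stuck=false
    }
}
```

With a real consumer, the action would be something like `() -> consumer.close(Duration.ZERO)`. `KafkaConsumer` is not safe for concurrent use, so this assumes no other thread touches the consumer once the close has been handed off.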

      {{"jepsen worker 24" #24101 prio=5 os_prio=0 cpu=6680.43ms elapsed=10760.37s tid=0x00007462200059e0 nid=0x4909a runnable [0x00007474133fd000]
        java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPoll.wait(java.base@17.0.12/Native Method)
        at sun.nio.ch.EPollSelectorImpl.doSelect(java.base@17.0.12/EPollSelectorImpl.java:118)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(java.base@17.0.12/SelectorImpl.java:129)
      • locked <0x00000002dcbe20d0> (a sun.nio.ch.Util$2)
      • locked <0x00000002dcbe2080> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(java.base@17.0.12/SelectorImpl.java:141)
        at org.apache.kafka.common.network.Selector.select(Selector.java:878)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:469)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:595)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:281)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:252)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:243)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:165)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.ensureFreshMetadata(ConsumerNetworkClient.java:176)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.updateGroupSubscription(ConsumerCoordinator.java:557)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeaderElected(ConsumerCoordinator.java:636)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onLeaderElected(AbstractCoordinator.java:766)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:117)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:670)
      • locked <0x00000002dcbe2228> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:631)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1310)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1285)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:616)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:428)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:313)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:322)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.maybeLeaveGroup(AbstractCoordinator.java:1181)
      • locked <0x00000002dcbe2228> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:1131)
      • locked <0x00000002dcbe2228> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:986)
        at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.lambda$close$3(LegacyKafkaConsumer.java:1135)
        at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer$$Lambda$466/0x0000747234d46198.run(Unknown Source)
        at org.apache.kafka.common.utils.Utils.swallow(Utils.java:1042)
        at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.close(LegacyKafkaConsumer.java:1135)
        at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.close(LegacyKafkaConsumer.java:1104)
        at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1777)
        at jepsen.redpanda.client$close_consumer_BANG_$fn__7227.invoke(client.clj:212)
        at jepsen.redpanda.client$close_consumer_BANG_.invokeStatic(client.clj:211)
        at jepsen.redpanda.client$close_consumer_BANG_.invoke(client.clj:200)
        at jepsen.redpanda.workload.queue.Client.close_BANG_(queue.clj:782)
        at jepsen.client.Validate.close_BANG_(client.clj:81)
        at jepsen.generator.interpreter.ClientWorker.close_BANG_(interpreter.clj:69)
        at jepsen.generator.interpreter.ClientWorker.invoke_BANG_(interpreter.clj:47)
        at jepsen.generator.interpreter$spawn_worker$fn__13745$fn__13746.invoke(interpreter.clj:140)
        at jepsen.generator.interpreter$spawn_worker$fn__13745.invoke(interpreter.clj:123)
        at clojure.core$binding_conveyor_fn$fn__5842.invoke(core.clj:2047)
        at clojure.lang.AFn.call(AFn.java:18)
        at java.util.concurrent.FutureTask.run(java.base@17.0.12/FutureTask.java:264)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.12/ThreadPoolExecutor.java:1136)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.12/ThreadPoolExecutor.java:635)
        at java.lang.Thread.run(java.base@17.0.12/Thread.java:840)}}

          People

            Assignee: Unassigned
            Reporter: Kyle Kingsbury (aphyr)
            Votes: 0
            Watchers: 5