Description
This might be related to KAFKA-17263, but I've got a slightly different stacktrace.
With the official Java Kafka client library, version 3.8.0, calls to `consumer.close()` can stall indefinitely even though I've provided a zero-second timeout. I've also tried spawning a separate thread that calls `consumer.wakeup()` after one second, just to make sure the client gets woken up, and that doesn't seem to work either. Jstack shows the thread calling `consumer.close()` is stuck in `AbstractCoordinator.maybeLeaveGroup` -> `ConsumerNetworkClient.pollNoWakeup` -> `ConsumerNetworkClient.poll`. That poll fires a pending request-completion handler, which goes back into `AbstractCoordinator` (`JoinGroupResponseHandler`), which in turn hits `onLeaderElected`. From there, `ConsumerCoordinator.updateGroupSubscription` calls `ensureFreshMetadata` -> `awaitMetadataUpdate`, which issues another `ConsumerNetworkClient.poll()`, and that appears to bottom out in a blocking `select()` call (`EPoll.wait`). The `ConsumerCoordinator` monitor is held the whole time.
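For anyone trying to reproduce this, here is a minimal, JDK-only sketch of the watchdog workaround described above. The class and method names (`WakeupWatchdog`, `closeWithWatchdog`) are hypothetical, not Kafka client API; the close and wakeup calls are passed in as plain `Runnable`s so the sketch does not depend on the Kafka client. The wakeup runs on a separate daemon thread only if the close has not returned within `delay`.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public final class WakeupWatchdog {
    private WakeupWatchdog() {}

    /**
     * Runs closeAction on the calling thread. If it has not returned after `delay`,
     * wakeupAction is run once on a separate daemon thread.
     *
     * @return true if the watchdog fired (i.e. wakeupAction was started)
     */
    public static boolean closeWithWatchdog(Runnable closeAction, Runnable wakeupAction, Duration delay) {
        // Daemon thread so the watchdog itself never keeps the JVM alive.
        ScheduledExecutorService watchdog = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "close-watchdog");
            t.setDaemon(true);
            return t;
        });
        AtomicBoolean fired = new AtomicBoolean(false);
        ScheduledFuture<?> pending = watchdog.schedule(() -> {
            fired.set(true);
            wakeupAction.run(); // e.g. consumer.wakeup()
        }, delay.toMillis(), TimeUnit.MILLISECONDS);
        try {
            closeAction.run(); // e.g. consumer.close(Duration.ZERO)
        } finally {
            // Close returned (or threw): cancel the wakeup if it has not started yet.
            pending.cancel(false);
            watchdog.shutdown();
        }
        return fired.get();
    }
}
```

Against a real consumer this would presumably be called as `closeWithWatchdog(() -> consumer.close(Duration.ZERO), consumer::wakeup, Duration.ofSeconds(1))`; `wakeup()` is the one consumer method documented as safe to call from another thread. As reported here, though, the wakeup did not unblock the stuck `close()`.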
It would be really nice if there were a way to reliably release client resources (thread pools, network connections, etc.) without doing in-line network I/O.
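Until something like that exists, one caller-side mitigation, sketched here under the assumption that leaking a stuck closer thread is acceptable (e.g. in a test harness), is to run `close()` on a separate daemon thread and bound how long the caller waits. This does not avoid the in-line network I/O; it only keeps the calling thread from hanging. `BoundedClose` and `closeWithin` are hypothetical names, not Kafka client API.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public final class BoundedClose {
    private BoundedClose() {}

    /**
     * Calls resource.close() on a new daemon thread and waits at most `timeout` for it.
     *
     * @return true if close() finished (normally or by throwing) within the timeout;
     *         false if it is still running, in which case the closer thread is abandoned
     */
    public static boolean closeWithin(AutoCloseable resource, Duration timeout) {
        CountDownLatch done = new CountDownLatch(1);
        Thread closer = new Thread(() -> {
            try {
                resource.close();
            } catch (Exception e) {
                // Sketch only: a real caller would log this rather than drop it.
            } finally {
                done.countDown();
            }
        }, "bounded-close");
        closer.setDaemon(true); // an abandoned, stuck closer must not block JVM exit
        closer.start();
        try {
            return done.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }
}
```

Since `KafkaConsumer` implements `Closeable`, a call like `BoundedClose.closeWithin(consumer, Duration.ofSeconds(5))` should type-check. The caller must stop using the consumer first, because `KafkaConsumer` is not thread-safe and rejects concurrent access from multiple threads. When this returns `false`, the abandoned thread may still hold sockets and the coordinator lock seen in the trace.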
{{"jepsen worker 24" #24101 prio=5 os_prio=0 cpu=6680.43ms elapsed=10760.37s tid=0x00007462200059e0 nid=0x4909a runnable [0x00007474133fd000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPoll.wait(java.base@17.0.12/Native Method)
at sun.nio.ch.EPollSelectorImpl.doSelect(java.base@17.0.12/EPollSelectorImpl.java:118)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(java.base@17.0.12/SelectorImpl.java:129)
- locked <0x00000002dcbe20d0> (a sun.nio.ch.Util$2)
- locked <0x00000002dcbe2080> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(java.base@17.0.12/SelectorImpl.java:141)
at org.apache.kafka.common.network.Selector.select(Selector.java:878)
at org.apache.kafka.common.network.Selector.poll(Selector.java:469)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:595)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:281)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:252)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:243)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:165)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.ensureFreshMetadata(ConsumerNetworkClient.java:176)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.updateGroupSubscription(ConsumerCoordinator.java:557)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeaderElected(ConsumerCoordinator.java:636)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onLeaderElected(AbstractCoordinator.java:766)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:117)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:670)
- locked <0x00000002dcbe2228> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:631)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1310)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1285)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:616)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:428)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:313)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:322)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.maybeLeaveGroup(AbstractCoordinator.java:1181)
- locked <0x00000002dcbe2228> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:1131)
- locked <0x00000002dcbe2228> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:986)
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.lambda$close$3(LegacyKafkaConsumer.java:1135)
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer$$Lambda$466/0x0000747234d46198.run(Unknown Source)
at org.apache.kafka.common.utils.Utils.swallow(Utils.java:1042)
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.close(LegacyKafkaConsumer.java:1135)
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.close(LegacyKafkaConsumer.java:1104)
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1777)
at jepsen.redpanda.client$close_consumer_BANG_$fn__7227.invoke(client.clj:212)
at jepsen.redpanda.client$close_consumer_BANG_.invokeStatic(client.clj:211)
at jepsen.redpanda.client$close_consumer_BANG_.invoke(client.clj:200)
at jepsen.redpanda.workload.queue.Client.close_BANG_(queue.clj:782)
at jepsen.client.Validate.close_BANG_(client.clj:81)
at jepsen.generator.interpreter.ClientWorker.close_BANG_(interpreter.clj:69)
at jepsen.generator.interpreter.ClientWorker.invoke_BANG_(interpreter.clj:47)
at jepsen.generator.interpreter$spawn_worker$fn__13745$fn__13746.invoke(interpreter.clj:140)
at jepsen.generator.interpreter$spawn_worker$fn__13745.invoke(interpreter.clj:123)
at clojure.core$binding_conveyor_fn$fn__5842.invoke(core.clj:2047)
at clojure.lang.AFn.call(AFn.java:18)
at java.util.concurrent.FutureTask.run(java.base@17.0.12/FutureTask.java:264)
at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.12/ThreadPoolExecutor.java:1136)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.12/ThreadPoolExecutor.java:635)
at java.lang.Thread.run(java.base@17.0.12/Thread.java:840)}}