KAFKA-15932

Flaky test - PlaintextConsumerTest.testSeek("kraft+kip-848","consumer")

    Description

      Intermittently failing test for the new consumer.

      https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14859/1/tests/

      ```
      Error
      org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 records. The number consumed was 0.
      Stacktrace
      org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 records. The number consumed was 0.
      at app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
      at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
      at app//kafka.api.AbstractConsumerTest.consumeRecords(AbstractConsumerTest.scala:161)
      at app//kafka.api.AbstractConsumerTest.consumeAndVerifyRecords(AbstractConsumerTest.scala:128)
      at app//kafka.api.PlaintextConsumerTest.testSeek(PlaintextConsumerTest.scala:616)
      at java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566)
      at app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
      at app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
      at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
      at app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
      at app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
      at app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94)
      at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
      at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
      at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
      at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
      at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
      at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
      at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
      at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
      at app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218)
      at app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
      at app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214)
      at app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139)
      at app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69)
      at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
      at app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
      at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
      at app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
      at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
      at app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
      at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
      at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
      at app//org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
      at app//org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:226)
      at app//org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:204)
      at app//org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.execute(TestTemplateTestDescriptor.java:142)
      at app//org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.lambda$execute$2(TestTemplateTestDescriptor.java:110)
      at java.base@11.0.16.1/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
      at java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
      at java.base@11.0.16.1/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
      at java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
      at java.base@11.0.16.1/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
      at java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
      at java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
      at java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
      at java.base@11.0.16.1/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
      at java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
      at java.base@11.0.16.1/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
      at java.base@11.0.16.1/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
      at java.base@11.0.16.1/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
      at java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
      at java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
      at java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
      at java.base@11.0.16.1/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
      at java.base@11.0.16.1/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
      at java.base@11.0.16.1/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
      at java.base@11.0.16.1/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
      at java.base@11.0.16.1/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
      at java.base@11.0.16.1/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
      at java.base@11.0.16.1/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
      at java.base@11.0.16.1/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
      at java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
      at java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
      at java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
      at java.base@11.0.16.1/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
      at java.base@11.0.16.1/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
      at java.base@11.0.16.1/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
      at java.base@11.0.16.1/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
      at java.base@11.0.16.1/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
      at java.base@11.0.16.1/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
      at java.base@11.0.16.1/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
      at java.base@11.0.16.1/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
      at java.base@11.0.16.1/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
      at java.base@11.0.16.1/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
      at java.base@11.0.16.1/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
      at java.base@11.0.16.1/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
      at java.base@11.0.16.1/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
      at java.base@11.0.16.1/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
      at java.base@11.0.16.1/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
      at app//org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.execute(TestTemplateTestDescriptor.java:110)
      at app//org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.execute(TestTemplateTestDescriptor.java:44)
      at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
      at app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
      at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
      at app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
      at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
      at app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
      at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
      at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
      at java.base@11.0.16.1/java.util.ArrayList.forEach(ArrayList.java:1541)
      at app//org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
      at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
      at app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
      at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
      at app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
      at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
      at app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
      at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
      at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
      at java.base@11.0.16.1/java.util.ArrayList.forEach(ArrayList.java:1541)
      at app//org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
      at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
      at app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
      at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
      at app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
      at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
      at app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
      at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
      at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
      at app//org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
      at app//org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
      at app//org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
      at app//org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
      at app//org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
      at app//org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
      at app//org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
      at app//org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
      at app//org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
      at app//org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
      at app//org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
      at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:118)
      at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:93)
      at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:88)
      at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:62)
      at java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566)
      at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
      at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
      at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
      at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
      at com.sun.proxy.$Proxy2.stop(Unknown Source)
      at org.gradle.api.internal.tasks.testing.worker.TestWorker$3.run(TestWorker.java:193)
      at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
      at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
      at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
      at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
      at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:113)
      at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65)
      at app//worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
      at app//worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
      Standard Output
      [2023-11-28 22:32:42,436] WARN maxCnxns is not configured, using default value 0. (org.apache.zookeeper.server.ServerCnxnFactory:309)
      [2023-11-28 22:32:42,554] WARN [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition __consumer_offsets-0. This error may be returned transiently when the partition is being created or deleted, but it is not expected to persist. (kafka.server.ReplicaFetcherThread:70)
      [2023-11-28 22:32:42,554] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition __consumer_offsets-0. This error may be returned transiently when the partition is being created or deleted, but it is not expected to persist. (kafka.server.ReplicaFetcherThread:70)
      [2023-11-28 22:32:42,658] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition topic-0. This error may be returned transiently when the partition is being created or deleted, but it is not expected to persist. (kafka.server.ReplicaFetcherThread:70)
      [2023-11-28 22:32:42,659] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition topic-1. This error may be returned transiently when the partition is being created or deleted, but it is not expected to persist. (kafka.server.ReplicaFetcherThread:70)
      [2023-11-28 22:32:42,659] WARN [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition topic-1. This error may be returned transiently when the partition is being created or deleted, but it is not expected to persist. (kafka.server.ReplicaFetcherThread:70)
      [2023-11-28 22:32:44,062] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=1029971141, epoch=2), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 0 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 22:32:44,062] WARN [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=258532250, epoch=2), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 0 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 22:32:44,282] WARN [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Close timed out with 1 pending requests to coordinator, terminating client connections (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1123)
      [2023-11-28 22:32:44,285] WARN [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-0=PartitionData(topicId=luVxKKl9Tf2TGcoaTSC0lA, fetchOffset=3, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[2], lastFetchedEpoch=Optional[2])}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=1140846877, epoch=4), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 1 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 22:32:44,285] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={topic-1=PartitionData(topicId=0CftS7C6T3GRKzHC-gaTFQ, fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[1], lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=1798269664, epoch=1), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 2 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, dirs={/tmp/kafka-3842282715338511459: EMPTY})
      [2023-11-28 22:32:44,534] WARN [QuorumController id=1000] Performing controller activation. The metadata log appears to be empty. Appending 1 bootstrap record(s) in metadata transaction at metadata.version 3.7-IV1 from bootstrap source 'test harness'. Setting the ZK migration state to NONE since this is a de-novo KRaft cluster. (org.apache.kafka.controller.QuorumController:108)
      [2023-11-28 22:32:45,499] ERROR Unexpected error handling org.apache.kafka.server.AssignmentsManager$DispatchEvent@56ac7b1a (org.apache.kafka.server.AssignmentsManager:117)
      java.lang.IllegalStateException: Cannot enqueue a request if the request thread is not running
      at kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
      at kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
      at org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
      at java.base/java.lang.Thread.run(Thread.java:829)
      [2023-11-28 22:32:45,499] ERROR Unexpected error handling org.apache.kafka.server.AssignmentsManager$DispatchEvent@1e56d84 (org.apache.kafka.server.AssignmentsManager:117)
      java.lang.IllegalStateException: Cannot enqueue a request if the request thread is not running
      at kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
      at kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
      at org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
      at java.base/java.lang.Thread.run(Thread.java:829)
      [2023-11-28 22:32:45,504] ERROR Unexpected error handling org.apache.kafka.server.AssignmentsManager$DispatchEvent@403f1279 (org.apache.kafka.server.AssignmentsManager:117)
      java.lang.IllegalStateException: Cannot enqueue a request if the request thread is not running
      at kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
      at kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
      at org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
      at java.base/java.lang.Thread.run(Thread.java:829)
      [2023-11-28 22:32:46,513] WARN [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=695135585, epoch=4), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 0 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 22:32:46,513] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=282103315, epoch=3), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 0 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 22:32:46,646] WARN [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Connection to node 0 (localhost/127.0.1.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:46,748] WARN [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Connection to node 0 (localhost/127.0.0.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:47,049] WARN [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Connection to node 0 (localhost/127.0.1.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:47,147] WARN [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Connection to node 2147483647 (localhost/127.0.1.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:47,513] WARN [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:47,513] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:47,514] WARN [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, fetchOffset=1, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=695135585, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
      at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 22:32:47,514] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, fetchOffset=1, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=282103315, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
      at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 22:32:47,551] WARN [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Connection to node 0 (localhost/127.0.0.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:47,651] WARN [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Connection to node 2147483647 (localhost/127.0.0.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:48,153] WARN [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Connection to node 2147483647 (localhost/127.0.1.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:48,455] WARN [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Connection to node 0 (localhost/127.0.1.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:48,514] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:48,514] WARN [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:48,514] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, fetchOffset=1, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=282103315, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
      at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 22:32:48,515] WARN [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, fetchOffset=1, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=695135585, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
      at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 22:32:48,655] WARN [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Connection to node 2147483647 (localhost/127.0.0.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:49,361] WARN [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Connection to node 0 (localhost/127.0.0.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:49,515] WARN [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:49,515] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:49,516] WARN [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, fetchOffset=1, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=695135585, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
      at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 22:32:49,516] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, fetchOffset=1, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=282103315, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
      at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 22:32:49,660] WARN [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Connection to node 2147483647 (localhost/127.0.1.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:50,264] WARN [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Connection to node 0 (localhost/127.0.1.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:50,516] WARN [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:50,516] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:50,516] WARN [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, fetchOffset=1, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=695135585, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
      at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 22:32:50,516] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, fetchOffset=1, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=282103315, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
      at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 22:32:50,664] WARN [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Connection to node 2147483647 (localhost/127.0.0.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:51,274] WARN [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Connection to node 0 (localhost/127.0.0.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:51,517] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:51,517] WARN [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:51,517] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, fetchOffset=1, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=282103315, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
      at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 22:32:51,518] WARN [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, fetchOffset=1, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=695135585, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
      at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 22:32:51,669] WARN [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Connection to node 2147483647 (localhost/127.0.1.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:52,276] WARN [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Connection to node 0 (localhost/127.0.1.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:52,518] WARN [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:52,518] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:52,518] WARN [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, fetchOffset=1, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=695135585, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
      at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 22:32:52,519] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, fetchOffset=1, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=282103315, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
      at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 22:32:52,579] WARN [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Connection to node 2147483647 (localhost/127.0.0.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:53,180] WARN [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Connection to node 0 (localhost/127.0.0.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:53,470] WARN [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Connection to node 2147483647 (localhost/127.0.1.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:53,519] WARN [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:53,519] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:814)
      [2023-11-28 22:32:53,519] WARN [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, fetchOffset=1, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=695135585, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
      at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 22:32:53,519] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, fetchOffset=1, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=282103315, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
      at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 22:32:54,084] WARN [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Close timed out with 1 pending requests to coordinator, terminating client connections (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1123)
      [2023-11-28 22:32:54,087] WARN [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, fetchOffset=4, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[1], lastFetchedEpoch=Optional[1])}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=359597478, epoch=18), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 1 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 22:32:54,416] WARN [NodeToControllerChannelManager id=1000 name=registration] Attempting to close NetworkClient that has already been closed. (org.apache.kafka.clients.NetworkClient:667)
      [2023-11-28 22:32:54,418] ERROR [QuorumController id=1000] Cancelling deferred write event maybeFenceReplicas because the event queue is now closed. (org.apache.kafka.controller.QuorumController:1297)
      metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, dirs={/tmp/kafka-6888200266425032707: EMPTY})
      [2023-11-28 22:32:54,443] WARN [QuorumController id=1000] Performing controller activation. The metadata log appears to be empty. Appending 1 bootstrap record(s) in metadata transaction at metadata.version 3.7-IV1 from bootstrap source 'test harness'. Setting the ZK migration state to NONE since this is a de-novo KRaft cluster. (org.apache.kafka.controller.QuorumController:108)
      [2023-11-28 22:32:55,399] ERROR Unexpected error handling org.apache.kafka.server.AssignmentsManager$DispatchEvent@6c76155d (org.apache.kafka.server.AssignmentsManager:117)
      java.lang.IllegalStateException: Cannot enqueue a request if the request thread is not running
      at kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
      at kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
      at org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
      at java.base/java.lang.Thread.run(Thread.java:829)
      [2023-11-28 22:32:55,400] ERROR Unexpected error handling org.apache.kafka.server.AssignmentsManager$DispatchEvent@1937c8e (org.apache.kafka.server.AssignmentsManager:117)
      java.lang.IllegalStateException: Cannot enqueue a request if the request thread is not running
      at kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
      at kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
      at org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
      at java.base/java.lang.Thread.run(Thread.java:829)
      [2023-11-28 22:32:55,401] ERROR Unexpected error handling org.apache.kafka.server.AssignmentsManager$DispatchEvent@371497a (org.apache.kafka.server.AssignmentsManager:117)
      java.lang.IllegalStateException: Cannot enqueue a request if the request thread is not running
      at kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
      at kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
      at org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventCont
      ...[truncated 2797622 chars]...
      java:127)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
      at java.base/java.lang.Thread.run(Thread.java:829)
      [2023-11-28 22:44:22,187] WARN [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Close timed out with 1 pending requests to coordinator, terminating client connections (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1123)
      [2023-11-28 22:44:22,191] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={tsomec-1=PartitionData(topicId=dKdOos2USI2FdR8JhU-KwQ, fetchOffset=1000, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0])}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=152301008, epoch=9), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 0 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 22:44:22,191] WARN [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={tsomec-1=PartitionData(topicId=dKdOos2USI2FdR8JhU-KwQ, fetchOffset=1000, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0])}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=1438523841, epoch=9), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 0 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 22:44:22,191] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={tsomec-0=PartitionData(topicId=dKdOos2USI2FdR8JhU-KwQ, fetchOffset=1000, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0])}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=450423079, epoch=23), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 2 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 22:44:22,191] WARN [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={tsomec-0=PartitionData(topicId=dKdOos2USI2FdR8JhU-KwQ, fetchOffset=1000, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0])}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=1790334620, epoch=23), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 2 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 22:44:22,191] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-0=PartitionData(topicId=hNV3D7sjRmCoKFr8M5cIDA, fetchOffset=4, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0])}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=1882472249, epoch=44), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 1 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 22:44:22,191] WARN [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-0=PartitionData(topicId=hNV3D7sjRmCoKFr8M5cIDA, fetchOffset=4, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0])}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=941512031, epoch=44), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 1 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 22:44:22,542] WARN [NodeToControllerChannelManager id=1000 name=registration] Attempting to close NetworkClient that has already been closed. (org.apache.kafka.clients.NetworkClient:667)
      [2023-11-28 22:44:22,545] ERROR [QuorumController id=1000] Cancelling deferred write event maybeFenceReplicas because the event queue is now closed. (org.apache.kafka.controller.QuorumController:1297)
      [2023-11-28 23:19:48,189] WARN maxCnxns is not configured, using default value 0. (org.apache.zookeeper.server.ServerCnxnFactory:309)
      [2023-11-28 23:19:48,536] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition topic-0. This error may be returned transiently when the partition is being created or deleted, but it is not expected to persist. (kafka.server.ReplicaFetcherThread:70)
      [2023-11-28 23:19:48,537] WARN [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition topic-0. This error may be returned transiently when the partition is being created or deleted, but it is not expected to persist. (kafka.server.ReplicaFetcherThread:70)
      [2023-11-28 23:19:48,538] WARN [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition topic-1. This error may be returned transiently when the partition is being created or deleted, but it is not expected to persist. (kafka.server.ReplicaFetcherThread:70)
      [2023-11-28 23:19:48,538] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition topic-1. This error may be returned transiently when the partition is being created or deleted, but it is not expected to persist. (kafka.server.ReplicaFetcherThread:70)
      [2023-11-28 23:19:50,559] WARN [Producer clientId=ConsumerTestProducer] delivery.timeout.ms should be equal to or larger than linger.ms + request.timeout.ms. Setting it to 2147483647. (org.apache.kafka.clients.producer.KafkaProducer:560)
      [2023-11-28 23:19:51,807] WARN [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=958284730, epoch=5), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 2 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 23:19:51,807] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=121456605, epoch=5), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 2 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 23:19:51,807] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=1490435564, epoch=5), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 1 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 23:19:51,808] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=1567850327, epoch=6), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 0 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 23:19:51,808] WARN [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=1555119328, epoch=5), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 1 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 23:19:51,808] WARN [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=351277526, epoch=6), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 0 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, dirs={/tmp/kafka-16453547074389008049: EMPTY})
      [2023-11-28 23:19:52,093] WARN [QuorumController id=1000] Performing controller activation. The metadata log appears to be empty. Appending 1 bootstrap record(s) in metadata transaction at metadata.version 3.7-IV1 from bootstrap source 'test harness'. Setting the ZK migration state to NONE since this is a de-novo KRaft cluster. (org.apache.kafka.controller.QuorumController:108)
      [2023-11-28 23:19:53,080] ERROR Unexpected error handling org.apache.kafka.server.AssignmentsManager$DispatchEvent@502f739a (org.apache.kafka.server.AssignmentsManager:117)
      java.lang.IllegalStateException: Cannot enqueue a request if the request thread is not running
      at kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
      at kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
      at org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
      at java.base/java.lang.Thread.run(Thread.java:829)
      [2023-11-28 23:19:53,081] ERROR Unexpected error handling org.apache.kafka.server.AssignmentsManager$DispatchEvent@503838ca (org.apache.kafka.server.AssignmentsManager:117)
      java.lang.IllegalStateException: Cannot enqueue a request if the request thread is not running
      at kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
      at kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
      at org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
      at java.base/java.lang.Thread.run(Thread.java:829)
      [2023-11-28 23:19:53,081] ERROR Unexpected error handling org.apache.kafka.server.AssignmentsManager$DispatchEvent@1439aa21 (org.apache.kafka.server.AssignmentsManager:117)
      java.lang.IllegalStateException: Cannot enqueue a request if the request thread is not running
      at kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
      at kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
      at org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
      at java.base/java.lang.Thread.run(Thread.java:829)
      [2023-11-28 23:19:53,825] WARN [Producer clientId=ConsumerTestProducer] delivery.timeout.ms should be equal to or larger than linger.ms + request.timeout.ms. Setting it to 2147483647. (org.apache.kafka.clients.producer.KafkaProducer:560)
      [2023-11-28 23:19:54,853] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=145239054, epoch=5), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 0 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 23:19:54,853] WARN [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=1189011807, epoch=5), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 1 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 23:19:54,853] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=1943161650, epoch=5), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 1 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 23:19:54,854] WARN [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=428781265, epoch=5), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 0 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 23:19:55,017] WARN [NodeToControllerChannelManager id=1000 name=registration] Attempting to close NetworkClient that has already been closed. (org.apache.kafka.clients.NetworkClient:667)
      [2023-11-28 23:19:55,019] ERROR [QuorumController id=1000] Cancelling deferred write event maybeFenceReplicas because the event queue is now closed. (org.apache.kafka.controller.QuorumController:1297)
      metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, dirs={/tmp/kafka-4036458965452388226: EMPTY})
      [2023-11-28 23:19:55,050] WARN [QuorumController id=1000] Performing controller activation. The metadata log appears to be empty. Appending 1 bootstrap record(s) in metadata transaction at metadata.version 3.7-IV1 from bootstrap source 'test harness'. Setting the ZK migration state to NONE since this is a de-novo KRaft cluster. (org.apache.kafka.controller.QuorumController:108)
      [2023-11-28 23:19:56,031] ERROR Unexpected error handling org.apache.kafka.server.AssignmentsManager$DispatchEvent@6b05363a (org.apache.kafka.server.AssignmentsManager:117)
      java.lang.IllegalStateException: Cannot enqueue a request if the request thread is not running
      at kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
      at kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
      at org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
      at java.base/java.lang.Thread.run(Thread.java:829)
      [2023-11-28 23:19:56,033] ERROR Unexpected error handling org.apache.kafka.server.AssignmentsManager$DispatchEvent@504c619e (org.apache.kafka.server.AssignmentsManager:117)
      java.lang.IllegalStateException: Cannot enqueue a request if the request thread is not running
      at kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
      at kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
      at org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
      at java.base/java.lang.Thread.run(Thread.java:829)
      [2023-11-28 23:19:56,033] ERROR Unexpected error handling org.apache.kafka.server.AssignmentsManager$DispatchEvent@74796fb4 (org.apache.kafka.server.AssignmentsManager:117)
      java.lang.IllegalStateException: Cannot enqueue a request if the request thread is not running
      at kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
      at kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
      at org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
      at java.base/java.lang.Thread.run(Thread.java:829)
      [2023-11-28 23:19:56,871] WARN [Producer clientId=ConsumerTestProducer] delivery.timeout.ms should be equal to or larger than linger.ms + request.timeout.ms. Setting it to 2147483647. (org.apache.kafka.clients.producer.KafkaProducer:560)
      [2023-11-28 23:19:57,912] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=513660765, epoch=5), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 2 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 23:19:57,913] WARN [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=462250570, epoch=5), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 2 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 23:19:57,913] WARN [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=49478967, epoch=5), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 1 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 23:19:57,913] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=1136363756, epoch=5), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 0 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 23:19:57,913] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=1394899890, epoch=5), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 1 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 23:19:57,914] WARN [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=820589906, epoch=5), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 0 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 23:19:58,216] WARN [NodeToControllerChannelManager id=1000 name=registration] Attempting to close NetworkClient that has already been closed. (org.apache.kafka.clients.NetworkClient:667)
      [2023-11-28 23:19:58,218] ERROR [QuorumController id=1000] Cancelling deferred write event maybeFenceReplicas because the event queue is now closed. (org.apache.kafka.controller.QuorumController:1297)
      metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, dirs={/tmp/kafka-2823784908060096373: EMPTY})
      [2023-11-28 23:19:58,244] WARN [QuorumController id=1000] Performing controller activation. The metadata log appears to be empty. Appending 1 bootstrap record(s) in metadata transaction at metadata.version 3.7-IV1 from bootstrap source 'test harness'. Setting the ZK migration state to NONE since this is a de-novo KRaft cluster. (org.apache.kafka.controller.QuorumController:108)
      [2023-11-28 23:19:59,395] ERROR Unexpected error handling org.apache.kafka.server.AssignmentsManager$DispatchEvent@4726f78d (org.apache.kafka.server.AssignmentsManager:117)
      java.lang.IllegalStateException: Cannot enqueue a request if the request thread is not running
      at kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
      at kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
      at org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
      at java.base/java.lang.Thread.run(Thread.java:829)
      [2023-11-28 23:19:59,396] ERROR Unexpected error handling org.apache.kafka.server.AssignmentsManager$DispatchEvent@66f23734 (org.apache.kafka.server.AssignmentsManager:117)
      java.lang.IllegalStateException: Cannot enqueue a request if the request thread is not running
      at kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
      at kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
      at org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
      at java.base/java.lang.Thread.run(Thread.java:829)
      [2023-11-28 23:19:59,396] ERROR Unexpected error handling org.apache.kafka.server.AssignmentsManager$DispatchEvent@423b55e3 (org.apache.kafka.server.AssignmentsManager:117)
      java.lang.IllegalStateException: Cannot enqueue a request if the request thread is not running
      at kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
      at kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
      at org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
      at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
      at java.base/java.lang.Thread.run(Thread.java:829)
      [2023-11-28 23:20:05,547] WARN [Producer clientId=ConsumerTestProducer] delivery.timeout.ms should be equal to or larger than linger.ms + request.timeout.ms. Setting it to 2147483647. (org.apache.kafka.clients.producer.KafkaProducer:560)
      [2023-11-28 23:20:09,432] WARN [Consumer clientId=ConsumerTestConsumer, groupId=my-test] The fetch buffer was already closed (org.apache.kafka.clients.consumer.internals.FetchBuffer:263)
      [2023-11-28 23:20:09,434] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=365803585, epoch=14), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 1 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 23:20:09,434] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=1978360363, epoch=14), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 0 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 23:20:09,434] WARN [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=2059879385, epoch=14), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 0 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 23:20:09,434] WARN [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=2110384598, epoch=14), rackId=) (kafka.server.ReplicaFetcherThread:72)
      java.io.IOException: Connection to 1 was disconnected before the response was read
      at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
      at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
      at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
      at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
      at scala.Option.foreach(Option.scala:437)
      at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
      at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
      at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
      [2023-11-28 23:20:09,617] WARN [NodeToControllerChannelManager id=1000 name=registration] Attempting to close NetworkClient that has already been closed. (org.apache.kafka.clients.NetworkClient:667)
      [2023-11-28 23:20:09,619] ERROR [QuorumController id=1000] Cancelling deferred write event maybeFenceReplicas because the event queue is now closed. (org.apache.kafka.controller.QuorumController:1297)```

            People

              schofielaj Andrew Schofield