Description
When running EosIntegrationTest.shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterEnabled with the KAFKA-15845 leak testing extension, I observed that this test consistently leaks StreamsProducers. The producer is instantiated here:
org.opentest4j.AssertionFailedError: This test contains a resource leak. Close the resources, or open a KAFKA ticket and annotate this class with @LeakTestingExtension.IgnoreAll("KAFKA-XYZ")
	at org.apache.kafka.common.network.LeakTestingExtension.after(LeakTestingExtension.java:98)
	at org.apache.kafka.common.network.LeakTestingExtension$All.afterAll(LeakTestingExtension.java:123)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
Caused by: org.opentest4j.AssertionFailedError: Leak check failed
	at org.apache.kafka.common.utils.LeakTester.lambda$combine$0(LeakTester.java:89)
	at org.apache.kafka.common.network.LeakTestingExtension.after(LeakTestingExtension.java:96)
	... 2 more
	Suppressed: org.opentest4j.AssertionFailedError: AbstractSelector instances left open
		at org.apache.kafka.common.utils.PredicateLeakTester.lambda$start$0(PredicateLeakTester.java:94)
		at org.apache.kafka.common.utils.LeakTester.lambda$combine$0(LeakTester.java:86)
		... 3 more
	Suppressed: java.lang.Exception: Opened sun.nio.ch.KQueueSelectorImpl
		at org.apache.kafka.common.utils.PredicateLeakTester.open(PredicateLeakTester.java:63)
		at org.apache.kafka.common.network.NetworkContextLeakTester$RecordingSelectorProvider.openSelector(NetworkContextLeakTester.java:135)
		at org.apache.kafka.common.network.TestNetworkContext$SelectorProviderDecorator.openSelector(TestNetworkContext.java:166)
		at org.apache.kafka.common.network.Selector.<init>(Selector.java:160)
		at org.apache.kafka.common.network.Selector.<init>(Selector.java:213)
		at org.apache.kafka.common.network.Selector.<init>(Selector.java:225)
		at org.apache.kafka.common.network.Selector.<init>(Selector.java:229)
		at org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:225)
		at org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:163)
		at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:526)
		at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:465)
		at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:297)
		at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getProducer(DefaultKafkaClientSupplier.java:39)
		at org.apache.kafka.streams.processor.internals.StreamsProducer.<init>(StreamsProducer.java:142)
		at org.apache.kafka.streams.processor.internals.ActiveTaskCreator.createRecordCollector(ActiveTaskCreator.java:196)
		at org.apache.kafka.streams.processor.internals.ActiveTaskCreator.createActiveTask(ActiveTaskCreator.java:265)
		at org.apache.kafka.streams.processor.internals.ActiveTaskCreator.createTasks(ActiveTaskCreator.java:176)
		at org.apache.kafka.streams.processor.internals.TaskManager.createNewTasks(TaskManager.java:441)
		at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:390)
		at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1559)
		at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:327)
		at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:416)
		at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:504)
		at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:415)
		at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:511)
		at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.updateAssignmentMetadataIfNeeded(LegacyKafkaConsumer.java:653)
		at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:612)
		at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:592)
		at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874)
		at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1276)
		at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:1224)
		at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:957)
		at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:712)
		at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
The producer appears to remain unclosed even after kafkaStreams.close() and CLUSTER.stop() are called.
I've seen this in other suites as well; I singled out this test/suite arbitrarily.
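For context, the detection mechanism visible in the stack trace (PredicateLeakTester.open recording each Selector handed out, and the afterAll hook failing if any remain open) boils down to the pattern below. This is a minimal, hypothetical sketch in plain Java, not the actual LeakTester/PredicateLeakTester API:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a leak check in the style of KAFKA-15845:
// record every resource handed out, forget it when it is closed,
// and report a leak if anything is still open at the end of the test.
class LeakTrackerSketch {
    private final Set<Object> open = ConcurrentHashMap.newKeySet();

    <T> T opened(T resource) {
        open.add(resource);          // recorded when the resource is created
        return resource;
    }

    void closed(Object resource) {
        open.remove(resource);       // forgotten when the resource is closed
    }

    boolean hasLeaks() {
        return !open.isEmpty();      // afterAll would fail the test here
    }

    public static void main(String[] args) {
        LeakTrackerSketch tracker = new LeakTrackerSketch();
        Object producerSelector = tracker.opened(new Object()); // e.g. a StreamsProducer's Selector
        Object adminSelector = tracker.opened(new Object());
        tracker.closed(adminSelector);   // this client closed cleanly
        // producerSelector is never closed, mirroring the leak in this test
        System.out.println("leak detected: " + tracker.hasLeaks());
    }
}
```

In the real extension the recording happens inside a decorated SelectorProvider (RecordingSelectorProvider in the trace above), so every Selector a Kafka client opens is tracked without changing client code.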
Issue Links
- Discovered while testing KAFKA-15845: Add Junit5 test extension which detects leaked Kafka clients and servers (In Progress)