Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-17203

StreamThread leaking producer instances

    XMLWordPrintableJSON

Details

    • Test
    • Status: Resolved
    • Minor
    • Resolution: Fixed
    • 3.9.0
    • 4.0.0
    • streams

    Description

      When running EosIntegrationTest.shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterEnabled leaks streams producers with the KAFKA-15845 leak testing extension, I observed that this test appears to consistently leak StreamsProducers. The producer is instantiated here:

      This test contains a resource leak. Close the resources, or open a KAFKA ticket and annotate this class with @LeakTestingExtension.IgnoreAll("KAFKA-XYZ")
      org.opentest4j.AssertionFailedError: This test contains a resource leak. Close the resources, or open a KAFKA ticket and annotate this class with @LeakTestingExtension.IgnoreAll("KAFKA-XYZ")
          at org.apache.kafka.common.network.LeakTestingExtension.after(LeakTestingExtension.java:98)
          at org.apache.kafka.common.network.LeakTestingExtension$All.afterAll(LeakTestingExtension.java:123)
          at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
      Caused by: org.opentest4j.AssertionFailedError: Leak check failed
          at org.apache.kafka.common.utils.LeakTester.lambda$combine$0(LeakTester.java:89)
          at org.apache.kafka.common.network.LeakTestingExtension.after(LeakTestingExtension.java:96)
          ... 2 more
          Suppressed: org.opentest4j.AssertionFailedError: AbstractSelector instances left open
              at org.apache.kafka.common.utils.PredicateLeakTester.lambda$start$0(PredicateLeakTester.java:94)
              at org.apache.kafka.common.utils.LeakTester.lambda$combine$0(LeakTester.java:86)
              ... 3 more
              Suppressed: java.lang.Exception: Opened sun.nio.ch.KQueueSelectorImpl
                  at org.apache.kafka.common.utils.PredicateLeakTester.open(PredicateLeakTester.java:63)
                  at org.apache.kafka.common.network.NetworkContextLeakTester$RecordingSelectorProvider.openSelector(NetworkContextLeakTester.java:135)
                  at org.apache.kafka.common.network.TestNetworkContext$SelectorProviderDecorator.openSelector(TestNetworkContext.java:166)
                  at org.apache.kafka.common.network.Selector.<init>(Selector.java:160)
                  at org.apache.kafka.common.network.Selector.<init>(Selector.java:213)
                  at org.apache.kafka.common.network.Selector.<init>(Selector.java:225)
                  at org.apache.kafka.common.network.Selector.<init>(Selector.java:229)
                  at org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:225)
                  at org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:163)
                  at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:526)
                  at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:465)
                  at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:297)
                  at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getProducer(DefaultKafkaClientSupplier.java:39)
                  at org.apache.kafka.streams.processor.internals.StreamsProducer.<init>(StreamsProducer.java:142)
                  at org.apache.kafka.streams.processor.internals.ActiveTaskCreator.createRecordCollector(ActiveTaskCreator.java:196)
                  at org.apache.kafka.streams.processor.internals.ActiveTaskCreator.createActiveTask(ActiveTaskCreator.java:265)
                  at org.apache.kafka.streams.processor.internals.ActiveTaskCreator.createTasks(ActiveTaskCreator.java:176)
                  at org.apache.kafka.streams.processor.internals.TaskManager.createNewTasks(TaskManager.java:441)
                  at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:390)
                  at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1559)
                  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:327)
                  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:416)
                  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:504)
                  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:415)
                  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:511)
                  at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.updateAssignmentMetadataIfNeeded(LegacyKafkaConsumer.java:653)
                  at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:612)
                  at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:592)
                  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874)
                  at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1276)
                  at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:1224)
                  at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:957)
                  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:712)
                  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)

      and appears to remain unclosed even after kafkaStreams.close() and CLUSTER.stop() are called.

      I've seen this appear in other suites, I just singled out this one test/suite arbitrarily.

      Attachments

        Issue Links

          Activity

            People

              yangpoan PoAn Yang
              gharris1727 Greg Harris
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: