Uploaded image for project: 'Camel'
  1. Camel
  2. CAMEL-10594

Kafka consumer stays alive when camel context is shut down

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.18.2, 2.19.0
    • Component/s: camel-kafka
    • Labels:
      None
    • Estimated Complexity:
      Unknown

      Description

      I happened to be running some camel-kafka unit tests with the log level set to DEBUG and noticed that the KafkaConsumer is not shut down correctly.

      When the Camel Kafka consumer is stopped, it invokes shutdownNow() on the ExecutorService. But this does not guarantee any running threads will be terminated.

      This is a bit of an issue when Camel runs in a container like Karaf or WildFly because the KafkaFetchRecords thread just keeps on running for the lifetime of the JVM.

      It's simple to reproduce in a unit test:

      • Enable DEBUG log level
      • Start a Camel context with a Kafka consumer endpoint
      • Stop the camel context
      • Thread.sleep for some time (10 seconds or whatever). Then notice exception in the log output:
      07:09:44,247 DEBUG [org.apache.kafka.clients.NetworkClient] (Camel (camel-36) thread #134 - KafkaConsumer[test]) Error connecting to node 1 at localhost:9092:: java.nio.channels.ClosedByInterruptException
      	at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
      	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:659)
      	at org.apache.kafka.common.network.Selector.connect(Selector.java:168)
      	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:498)
      	at org.apache.kafka.clients.NetworkClient.access$400(NetworkClient.java:48)
      	at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:645)
      	at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:552)
      	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:258)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:183)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:974)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
      	at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:130)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      

        Attachments

          Activity

            People

            • Assignee:
              davsclaus Claus Ibsen
              Reporter:
              jamesnetherton James Netherton
            • Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: