Camel / CAMEL-8756

KafkaConsumer doesn't stop consuming when suspended, preventing graceful route shutdown

    Details

    • Estimated Complexity:
      Unknown

      Description

      Hi,

      We encountered a problem when consuming events from Kafka and stopping the route while there are still events in Kafka.
      Apparently the run method does not stop reading events from the Kafka stream, so there are always events in flight, which prevents the route from being shut down gracefully.

      12:30:27.611 INFO o.a.c.impl.DefaultCamelContext - Apache Camel 2.15.2 (CamelContext: camel-1) is shutting down
      12:30:27.612 INFO o.a.c.i.DefaultShutdownStrategy - Starting to graceful shutdown 1 routes (timeout 300 seconds)
      12:30:27.615 INFO o.a.c.i.DefaultShutdownStrategy - Waiting as there are still 1 inflight and pending exchanges to complete, timeout in 300 seconds.
      12:30:28.615 INFO o.a.c.i.DefaultShutdownStrategy - Waiting as there are still 1 inflight and pending exchanges to complete, timeout in 299 seconds.
      12:30:29.615 INFO o.a.c.i.DefaultShutdownStrategy - Waiting as there are still 1 inflight and pending exchanges to complete, timeout in 298 seconds.
      12:30:30.616 INFO o.a.c.i.DefaultShutdownStrategy - Waiting as there are still 1 inflight and pending exchanges to complete, timeout in 297 seconds.
      12:30:31.616 INFO o.a.c.i.DefaultShutdownStrategy - Waiting as there are still 1 inflight and pending exchanges to complete, timeout in 296 seconds.
      12:30:32.616 INFO o.a.c.i.DefaultShutdownStrategy - Waiting as there are still 1 inflight and pending exchanges to complete, timeout in 295 seconds.
      12:30:33.618 INFO o.a.c.c.kafka.KafkaConsumer - Stopping Kafka consumer
      12:30:33.618 INFO k.c.ZookeeperConsumerConnector - [camelGroup1_localhorst-1431081021945-a3d9f455], ZKConsumerConnector shutting down
      12:30:33.626 INFO k.c.ConsumerFetcherManager - [ConsumerFetcherManager-1431081022017] Stopping leader finder thread
      12:30:33.626 INFO k.c.ConsumerFetcherManager$LeaderFinderThread - [camelGroup1_localhorst-1431081021945-a3d9f455-leader-finder-thread], Shutting down
      12:30:33.626 INFO k.c.ConsumerFetcherManager$LeaderFinderThread - [camelGroup1_localhorst-1431081021945-a3d9f455-leader-finder-thread], Stopped 
      12:30:33.627 INFO k.c.ConsumerFetcherManager$LeaderFinderThread - [camelGroup1_localhorst-1431081021945-a3d9f455-leader-finder-thread], Shutdown completed
      12:30:33.627 INFO k.c.ConsumerFetcherManager - [ConsumerFetcherManager-1431081022017] Stopping all fetchers
      12:30:33.627 INFO k.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-camelGroup1_localhorst-1431081021945-a3d9f455-0-0], Shutting down
      12:30:33.627 INFO k.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-camelGroup1_localhorst-1431081021945-a3d9f455-0-0], Stopped 
      12:30:33.627 INFO k.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-camelGroup1_localhorst-1431081021945-a3d9f455-0-0], Shutdown completed
      12:30:33.627 INFO k.c.ConsumerFetcherManager - [ConsumerFetcherManager-1431081022017] All connections stopped
      12:30:33.662 INFO o.I0Itec.zkclient.ZkEventThread - Terminate ZkClient event thread.
      12:30:33.696 INFO org.apache.zookeeper.ZooKeeper - Session: 0x14d2d359d900022 closed
      12:30:33.696 INFO org.apache.zookeeper.ClientCnxn - EventThread shut down
      12:30:33.696 INFO k.c.ZookeeperConsumerConnector - [camelGroup1_localhorst-1431081021945-a3d9f455], ZKConsumerConnector shutdown completed in 78 ms
      12:30:33.698 INFO o.a.c.i.DefaultShutdownStrategy - Route: route1 shutdown complete, was consuming from: Endpoint[kafka://localhost:9092?groupId=camelGroup1&topic=mykafkatopic&zookeeperHost=localhost]
      12:30:33.699 INFO o.a.c.i.DefaultShutdownStrategy - Graceful shutdown of 1 routes completed in 6 seconds
      12:30:33.703 INFO o.a.c.impl.DefaultCamelContext - Apache Camel 2.15.2 (CamelContext: camel-1) uptime 12.510 seconds
      12:30:33.703 INFO o.a.c.impl.DefaultCamelContext - Apache Camel 2.15.2 (CamelContext: camel-1) is shutdown in 6.091 seconds
      12:30:33.717 ERROR o.a.c.p.DefaultErrorHandler - Failed delivery for (MessageId: ID-localhorst-33557-1431081020733-0-1656461 on ExchangeId: ID-localhorst-33557-1431081020733-0-1656462). Exhausted after delivery attempt: 1 caught: java.util.concurrent.RejectedExecutionException
      
      Message History
      ---------------------------------------------------------------------------------------------------------------------------------------
      RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
      [route1            ] [route1            ] [                                                                              ] [        14]
      [route1            ] [filter1           ] [filter[{SimpleKafkaToCamelExample$1$$Lambda$1/1468303011@a38d7a3}]            ] [         1]
      [route1            ] [bean1             ] [bean[SimpleKafkaToCamelExample$1$$Lambda$2/1354011814@e4ce7ae]                ] [         0]
      
      Exchange
      ---------------------------------------------------------------------------------------------------------------------------------------
      Exchange[
      	Id                  ID-localhorst-33557-1431081020733-0-1656462
      	ExchangePattern     InOnly
      	Headers             {breadcrumbId=ID-localhorst-33557-1431081020733-0-1656461, CamelRedelivered=false, CamelRedeliveryCounter=0, kafka.EXCHANGE_NAME=0, kafka.TOPIC=mykafkatopic}
      	BodyType            byte[]
      	Body                SOMESTRING
      ]
      
      Stacktrace
      ---------------------------------------------------------------------------------------------------------------------------------------
      java.util.concurrent.RejectedExecutionException: null
      	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:349) ~[camel-core-2.15.2.jar:2.15.2]
      	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191) [camel-core-2.15.2.jar:2.15.2]
      	at org.apache.camel.processor.Pipeline.process(Pipeline.java:118) ~[camel-core-2.15.2.jar:2.15.2]
      	at org.apache.camel.processor.Pipeline.process(Pipeline.java:80) ~[camel-core-2.15.2.jar:2.15.2]
      	at org.apache.camel.processor.FilterProcessor.process(FilterProcessor.java:58) ~[camel-core-2.15.2.jar:2.15.2]
      	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) ~[camel-core-2.15.2.jar:2.15.2]
      	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448) ~[camel-core-2.15.2.jar:2.15.2]
      	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191) [camel-core-2.15.2.jar:2.15.2]
      	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191) [camel-core-2.15.2.jar:2.15.2]
      	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109) [camel-core-2.15.2.jar:2.15.2]
      	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:87) [camel-core-2.15.2.jar:2.15.2]
      	at org.apache.camel.component.kafka.KafkaConsumer$AutoCommitConsumerTask.run(KafkaConsumer.java:209) [camel-kafka-2.15.2.jar:2.15.2]
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_45]
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
      	at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
      12:30:34.171 INFO k.c.ZookeeperConsumerConnector - [camelGroup1_localhorst-1431081021945-a3d9f455], stopping watcher executor thread for consumer camelGroup1_localhorst-1431081021945-a3d9f455
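
The behaviour we would expect can be sketched as follows. This is only a minimal illustration under assumptions, not the attached patch and not the actual camel-kafka code: `ConsumerTask`, `stop()` and `consumeUntilStopped` are hypothetical names, and a `BlockingQueue` stands in for the Kafka stream. The point is that the loop checks a stop flag before taking the next event, so no new events go in flight once the consumer has been asked to stop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class StoppableConsumerSketch {

    /** Hypothetical stand-in for a consumer task; not the camel-kafka class. */
    static final class ConsumerTask implements Runnable {
        private final BlockingQueue<String> stream;   // stands in for the Kafka stream
        private final Consumer<String> processor;     // stands in for the route processor
        private final AtomicBoolean running = new AtomicBoolean(true);

        ConsumerTask(BlockingQueue<String> stream, Consumer<String> processor) {
            this.stream = stream;
            this.processor = processor;
        }

        /** Asks the loop to exit before it takes another event. */
        void stop() {
            running.set(false);
        }

        @Override
        public void run() {
            // The flag is checked before every poll, so nothing new is
            // handed to the processor after stop() has been called.
            while (running.get()) {
                String message;
                try {
                    // Poll with a timeout so the flag is re-checked on an idle stream.
                    message = stream.poll(100, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (message != null) {
                    processor.accept(message);
                }
            }
        }
    }

    /**
     * Runs a task on the calling thread and stops it after stopAfter events.
     * Assumes the stream holds at least stopAfter events; otherwise the loop
     * keeps polling.
     */
    static List<String> consumeUntilStopped(BlockingQueue<String> stream, int stopAfter) {
        List<String> processed = new ArrayList<>();
        ConsumerTask[] task = new ConsumerTask[1];
        task[0] = new ConsumerTask(stream, message -> {
            processed.add(message);
            if (processed.size() == stopAfter) {
                task[0].stop();
            }
        });
        task[0].run();
        return processed;
    }

    public static void main(String[] args) {
        BlockingQueue<String> stream =
                new LinkedBlockingQueue<>(Arrays.asList("a", "b", "c", "d", "e"));
        List<String> processed = consumeUntilStopped(stream, 3);
        System.out.println("processed=" + processed + ", left in stream=" + stream.size());
    }
}
```

In this sketch the events that were not yet taken simply stay in the queue after `stop()`. With a real consumer they would remain in Kafka for the next start, which is what allows the in-flight count to drop to zero during shutdown.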
      

      Regards,

      André

        Attachments

        1. CAMEL-8756.patch
          0.9 kB
          André Stein
        2. SimpleKafkaToCamelExample.java
          2 kB
          André Stein

            People

            • Assignee:
              davsclaus Claus Ibsen
              Reporter:
              astein André Stein
            • Votes:
              1
              Watchers:
              3
