  Camel / CAMEL-11215

Camel Kafka component commits offsets in case of exceptions


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.18.3
    • Fix Version/s: 2.19.1, 2.20.0
    • Component/s: camel-kafka
    • Labels: None
    • Estimated Complexity: Unknown

    Description

      My processor in the router throws an exception but the Kafka component still commits the offsets.

      My route: (heavily redacted and modified)

      Route
      from( "kafka://blah-blah" ).routeId("MyRoute")
                      .convertBodyTo( MyData.class )
                      .process( "MyProcessor" )
                      .to( "DestinationProcessor" );
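
      For reference, the manual commit branch in the consumer code only runs when auto-commit is disabled on the endpoint. My actual URI is redacted, but it is along these lines (broker and group values here are placeholders):

      ```java
      // Hypothetical endpoint URI; autoCommitEnable=false is what selects the
      // manual commitSync() path in the KafkaConsumer loop quoted further down.
      from("kafka:blah-blah?brokers=localhost:9092&groupId=my-group&autoCommitEnable=false")
      ```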
      

      The exception I get:

      Exception
              at com.mycompany.MyProcessor.process(MyProcessor.java:152)
              at org.apache.camel.impl.ProcessorEndpoint.onExchange(ProcessorEndpoint.java:103)
              at org.apache.camel.impl.ProcessorEndpoint$1.process(ProcessorEndpoint.java:71)
              at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
              at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:145)
              at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
              at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542)
              at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
              at org.apache.camel.processor.ChoiceProcessor.process(ChoiceProcessor.java:117)
              at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
              at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542)
              at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
              at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
              at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
              at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
              at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
              at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:140)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
              at java.lang.Thread.run(Thread.java:745)
      Caused by: java.sql.SQLException: Exception occurred while getting connection: oracle.ucp.UniversalConnectionPoolException: Cannot get Connection from Datasource: java.sql.SQLException: Listener refused the connection with the following error:
      ORA-12514, TNS:listener does not currently know of service requested in connect descriptor
      

      Here is the corresponding Kafka component code (KafkaConsumer.java). This part of the code does not seem to handle the exception: the exception handler simply swallows it, and the fall-through code happily commits the offsets. Is this a bug, or am I missing something?

      KafkaConsumer.java
      while (isRunAllowed() && !isStoppingOrStopped() && !isSuspendingOrSuspended()) {
          ConsumerRecords<Object, Object> allRecords = consumer.poll(pollTimeoutMs);
          for (TopicPartition partition : allRecords.partitions()) {
              List<ConsumerRecord<Object, Object>> partitionRecords = allRecords.records(partition);
              for (ConsumerRecord<Object, Object> record : partitionRecords) {
                  if (LOG.isTraceEnabled()) {
                      LOG.trace("partition = {}, offset = {}, key = {}, value = {}",
                          record.partition(), record.offset(), record.key(), record.value());
                  }
                  Exchange exchange = endpoint.createKafkaExchange(record);
                  try {
                      processor.process(exchange);
                  } catch (Exception e) {
                      // the exception is swallowed here, so execution falls through to the commit below
                      getExceptionHandler().handleException("Error during processing", exchange, e);
                  }
              }
              // commit manually when auto-commit is disabled
              if (endpoint.getConfiguration().isAutoCommitEnable() != null
                      && !endpoint.getConfiguration().isAutoCommitEnable()) {
                  long partitionLastoffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                  consumer.commitSync(Collections.singletonMap(
                      partition, new OffsetAndMetadata(partitionLastoffset + 1)));
              }
          }
      }
      
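      To make the problem concrete, here is a small standalone sketch of the behavior I would expect instead (plain Java, no Camel or Kafka dependencies; the class, processor interface, and record values are all made up for illustration): stop at the first failed record and commit only up to the last successfully processed offset, so the failed record can be redelivered.

      ```java
      import java.util.List;

      // Hypothetical standalone model of the consumer loop above; the record
      // type, processor, and "commit" are stand-ins for the real Kafka calls.
      public class OffsetCommitSketch {
          interface Processor {
              void process(String record) throws Exception;
          }

          /**
           * Processes records and returns the offset that would be committed,
           * or -1 if nothing should be committed. Unlike the loop in the
           * report, it stops at the first failure and only commits offsets of
           * records that were processed successfully.
           */
          static long processAndGetCommitOffset(List<String> records, long baseOffset, Processor processor) {
              long lastGoodOffset = -1;
              for (int i = 0; i < records.size(); i++) {
                  try {
                      processor.process(records.get(i));
                      lastGoodOffset = baseOffset + i;
                  } catch (Exception e) {
                      // stop here: committing past this point would silently drop the failed record
                      break;
                  }
              }
              // Kafka commit semantics: commit the offset of the NEXT record to consume
              return lastGoodOffset < 0 ? -1 : lastGoodOffset + 1;
          }

          public static void main(String[] args) throws Exception {
              List<String> records = List.of("ok-1", "ok-2", "boom", "ok-4");
              long commit = processAndGetCommitOffset(records, 100, r -> {
                  if (r.startsWith("boom")) {
                      throw new Exception("simulated SQLException");
                  }
              });
              // only offsets 100 and 101 succeeded, so we commit 102 and the
              // failed record at offset 102 is redelivered on the next poll
              System.out.println(commit); // prints 102
          }
      }
      ```

      With the loop as it currently stands, the commit is issued for the last record of the batch regardless of processing failures, so the failed message is effectively lost.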

      Any insights are appreciated.


          People

            Assignee: davsclaus Claus Ibsen
            Reporter: rogerhill01234 Roger
            Votes: 0
            Watchers: 3

            Dates

              Created:
              Updated:
              Resolved: