Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Affects Version/s: 2.18.3
Component/s: camel-kafka
Labels: None
Estimated Complexity: Unknown
Description
My processor in the route throws an exception, but the Kafka component still commits the offsets.
My route (heavily redacted and modified):
Route
from("kafka://blah-blah")
    .routeId("MyRoute")
    .convertBodyTo(MyData.class)
    .process("MyProcessor")
    .to("DestinationProcessor");
The exception I get:
Exception
at com.mycompany.MyProcessor.process(MyProcessor.java:152)
at org.apache.camel.impl.ProcessorEndpoint.onExchange(ProcessorEndpoint.java:103)
at org.apache.camel.impl.ProcessorEndpoint$1.process(ProcessorEndpoint.java:71)
at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:145)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
at org.apache.camel.processor.ChoiceProcessor.process(ChoiceProcessor.java:117)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: Exception occurred while getting connection: oracle.ucp.UniversalConnectionPoolException: Cannot get Connection from Datasource: java.sql.SQLException: Listener refused the connection with the following error:
ORA-12514, TNS:listener does not currently know of service requested in connect descriptor
Here is the corresponding Kafka component code (KafkaConsumer.java). This part of the code does not seem to handle the exception: the exception handler simply swallows it, and the fall-through code then commits the offsets anyway. Is this a bug, or am I missing something?
KafkaConsumer.java
while (isRunAllowed() && !isStoppingOrStopped() && !isSuspendingOrSuspended()) {
    ConsumerRecords<Object, Object> allRecords = consumer.poll(pollTimeoutMs);
    for (TopicPartition partition : allRecords.partitions()) {
        List<ConsumerRecord<Object, Object>> partitionRecords = allRecords.records(partition);
        for (ConsumerRecord<Object, Object> record : partitionRecords) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("partition = {}, offset = {}, key = {}, value = {}",
                        record.partition(), record.offset(), record.key(), record.value());
            }
            Exchange exchange = endpoint.createKafkaExchange(record);
            try {
                processor.process(exchange);
            } catch (Exception e) {
                getExceptionHandler().handleException("Error during processing", exchange, e);
            }
        }
        // if autocommit is false
        if (endpoint.getConfiguration().isAutoCommitEnable() != null
                && !endpoint.getConfiguration().isAutoCommitEnable()) {
            long partitionLastoffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            consumer.commitSync(Collections.singletonMap(
                    partition, new OffsetAndMetadata(partitionLastoffset + 1)));
        }
    }
}
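To make the behavior I expected concrete, here is a minimal sketch of the commit logic. It is a simplified model that uses no Kafka or Camel classes, and it is not meant as the component's actual fix. The names `Rec`, `RecProcessor`, and `OffsetCommitSketch` are hypothetical stand-ins I made up for illustration. The idea is to stop at the first record whose processing fails and to commit only the offset after the last record that succeeded, instead of always committing the last offset of the batch.

```java
import java.util.List;
import java.util.OptionalLong;

/** Hypothetical stand-in for a consumed record: just an offset and a payload. */
final class Rec {
    final long offset;
    final String value;

    Rec(long offset, String value) {
        this.offset = offset;
        this.value = value;
    }
}

/** Hypothetical stand-in for the route's processor; it may throw. */
interface RecProcessor {
    void process(Rec record) throws Exception;
}

final class OffsetCommitSketch {

    /**
     * Processes one partition's records in order and returns the offset that
     * would be safe to commit (last successfully processed offset + 1).
     * Returns an empty value when the very first record fails, meaning
     * nothing should be committed for this batch.
     */
    static OptionalLong offsetToCommit(List<Rec> partitionRecords, RecProcessor processor) {
        OptionalLong commit = OptionalLong.empty();
        for (Rec record : partitionRecords) {
            try {
                processor.process(record);
            } catch (Exception e) {
                // Stop here so the failed record and everything after it
                // stay uncommitted and can be consumed again.
                break;
            }
            commit = OptionalLong.of(record.offset + 1);
        }
        return commit;
    }
}
```

With a batch at offsets 10 to 13 where the record at offset 12 fails, this sketch yields 12 as the offset to commit, so the failed record would be redelivered after a restart or rebalance. Whether the consumer should also seek back to that offset right away is a separate design question that the sketch does not cover.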
Any insights are appreciated.