Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Affects Version/s: 2.20.0
- Component/s: camel-kafka
- Labels: None
- Estimated Complexity: Novice
Description
When message processing takes longer than the maximum session timeout, the consumer thread ends after receiving an org.apache.kafka.clients.consumer.CommitFailedException. The relevant run() method:
@Override
public void run() {
    boolean first = true;
    boolean reConnect = true;
    while (reConnect) {
        // create consumer
        ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            // Kafka uses reflection for loading authentication settings, use its classloader
            Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
            this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
        } finally {
            Thread.currentThread().setContextClassLoader(threadClassLoader);
        }
        if (!first) {
            // skip one poll timeout before trying again
            long delay = endpoint.getConfiguration().getPollTimeoutMs();
            log.info("Reconnecting {} to topic {} after {} ms", threadId, topicName, delay);
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        first = false;
        // doRun keeps running until we either shutdown or is told to re-connect
        reConnect = doRun();
    }
}
The doRun() method returns false and the loop ends. It should be possible to let the process continue after a failed offset commit.
I think the catch block inside the doRun method should look like this:
...
} catch (InterruptException e) {
    getExceptionHandler().handleException("Interrupted while consuming " + threadId + " from kafka topic", e);
    log.info("Unsubscribing {} from topic {}", threadId, topicName);
    consumer.unsubscribe();
    Thread.currentThread().interrupt();
} catch (org.apache.kafka.clients.consumer.CommitFailedException e) { // or even org.apache.kafka.common.KafkaException
    getExceptionHandler().handleException("Error consuming " + threadId + " from kafka topic", e);
    reConnect = true;
} catch (Exception e) {
    getExceptionHandler().handleException("Error consuming " + threadId + " from kafka topic", e);
} finally {
    log.debug("Closing {} ", threadId);
    IOHelper.close(consumer);
}
...
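To illustrate the intended control flow without a broker, here is a minimal, Kafka-free sketch of the proposed behaviour. Everything in it is a hypothetical stand-in and not the camel-kafka code: the class ReconnectLoopSketch, the Outcome enum, and the local CommitFailedException class (which only mimics org.apache.kafka.clients.consumer.CommitFailedException). It assumes that doRun() reports a failed commit by returning true, so that the outer loop creates a fresh consumer, while any other error still ends the thread.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

/**
 * Simplified model of the run()/doRun() loop from this report.
 * All names are hypothetical stand-ins, not the camel-kafka classes.
 */
public class ReconnectLoopSketch {

    /** Stand-in for org.apache.kafka.clients.consumer.CommitFailedException. */
    static class CommitFailedException extends RuntimeException {
        CommitFailedException(String message) {
            super(message);
        }
    }

    /** One simulated outcome of a doRun() invocation. */
    enum Outcome { COMMIT_FAILED, OTHER_ERROR, SHUTDOWN }

    private final Deque<Outcome> script;
    private int consumersCreated;

    ReconnectLoopSketch(Outcome... outcomes) {
        this.script = new ArrayDeque<>(Arrays.asList(outcomes));
    }

    /** Outer loop: creates a fresh "consumer" for as long as doRun() asks for a reconnect. */
    int run() {
        boolean reConnect = true;
        while (reConnect) {
            consumersCreated++; // stands in for new KafkaConsumer(kafkaProps)
            reConnect = doRun();
        }
        return consumersCreated;
    }

    /** Returns true when the caller should reconnect, false when the thread should end. */
    private boolean doRun() {
        boolean reConnect = false;
        try {
            Outcome outcome = script.isEmpty() ? Outcome.SHUTDOWN : script.poll();
            switch (outcome) {
                case COMMIT_FAILED:
                    throw new CommitFailedException("simulated failed offset commit");
                case OTHER_ERROR:
                    throw new IllegalStateException("simulated unrecoverable error");
                default:
                    break; // clean shutdown, no reconnect requested
            }
        } catch (CommitFailedException e) {
            // proposed behaviour: a failed commit requests a reconnect
            reConnect = true;
        } catch (Exception e) {
            // unchanged behaviour: any other error leaves reConnect false
        }
        return reConnect;
    }

    public static void main(String[] args) {
        int created = new ReconnectLoopSketch(
                Outcome.COMMIT_FAILED, Outcome.COMMIT_FAILED, Outcome.SHUTDOWN).run();
        System.out.println("consumers created: " + created);
    }
}
```

In this sketch, two simulated commit failures followed by a shutdown lead to three consumer instances being created in total, whereas a single OTHER_ERROR ends the loop after the first one. The real fix would also need the delay and logging from run() shown above.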