Details
Bug
Status: Resolved
Major
Resolution: Fixed
2.20.0
None
Novice
Description
When processing messages takes longer than the maximum session timeout, the consumer thread ends after receiving an org.apache.kafka.clients.consumer.CommitFailedException.
    @Override
    public void run() {
        boolean first = true;
        boolean reConnect = true;
        while (reConnect) {
            // create consumer
            ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
            try {
                // Kafka uses reflection for loading authentication settings, use its classloader
                Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
                this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
            } finally {
                Thread.currentThread().setContextClassLoader(threadClassLoader);
            }
            if (!first) {
                // skip one poll timeout before trying again
                long delay = endpoint.getConfiguration().getPollTimeoutMs();
                log.info("Reconnecting {} to topic {} after {} ms", threadId, topicName, delay);
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            first = false;
            // doRun keeps running until we either shutdown or is told to re-connect
            reConnect = doRun();
        }
    }
The doRun() method returns false after the CommitFailedException, so the loop ends. It should be possible to let the process continue after a failed offset commit.
I think the catch block inside the doRun method should look like this:
    ...
    } catch (InterruptException e) {
        getExceptionHandler().handleException("Interrupted while consuming " + threadId + " from kafka topic", e);
        log.info("Unsubscribing {} from topic {}", threadId, topicName);
        consumer.unsubscribe();
        Thread.currentThread().interrupt();
    } catch (org.apache.kafka.clients.consumer.CommitFailedException e) {
        // or even org.apache.kafka.common.KafkaException
        getExceptionHandler().handleException("Error consuming " + threadId + " from kafka topic", e);
        reConnect = true;
    } catch (Exception e) {
        getExceptionHandler().handleException("Error consuming " + threadId + " from kafka topic", e);
    } finally {
        log.debug("Closing {} ", threadId);
        IOHelper.close(consumer);
    }
    ...
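To make the intended control flow easier to check, here is a minimal, self-contained sketch of the same idea without any Kafka or Camel dependency. Everything in it is hypothetical and for illustration only: the class ReconnectLoopSketch, the CommitFailed exception (a stand-in for org.apache.kafka.clients.consumer.CommitFailedException), the Outcome enum and the scripted poll results are not part of the component. It only shows that setting reConnect = true in the dedicated catch block makes the outer loop create a new consumer, while any other exception still ends the thread.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

public class ReconnectLoopSketch {

    /** Hypothetical stand-in for org.apache.kafka.clients.consumer.CommitFailedException. */
    public static class CommitFailed extends RuntimeException {
        public CommitFailed(String message) {
            super(message);
        }
    }

    /** Hypothetical result of one simulated poll-and-commit cycle. */
    public enum Outcome { OK, COMMIT_FAILED, FATAL }

    private final Deque<Outcome> script;
    public int consumersCreated = 0;

    public ReconnectLoopSketch(Outcome... outcomes) {
        this.script = new ArrayDeque<>(Arrays.asList(outcomes));
    }

    /** Mirrors the outer loop of run(): recreate the consumer while doRun() asks for it. */
    public void run() {
        boolean reConnect = true;
        while (reConnect) {
            consumersCreated++; // stands in for new KafkaConsumer(kafkaProps)
            reConnect = doRun();
        }
    }

    /** Returns true only when the failure was a commit failure, i.e. a reconnect is wanted. */
    public boolean doRun() {
        boolean reConnect = false;
        try {
            while (!script.isEmpty()) {
                Outcome next = script.poll();
                if (next == Outcome.COMMIT_FAILED) {
                    throw new CommitFailed("simulated failed offset commit");
                }
                if (next == Outcome.FATAL) {
                    throw new IllegalStateException("simulated unrecoverable error");
                }
            }
        } catch (CommitFailed e) {
            // proposed behaviour: ask the outer loop to create a fresh consumer
            reConnect = true;
        } catch (Exception e) {
            // existing behaviour: reConnect stays false and the thread ends
        }
        return reConnect;
    }

    public static void main(String[] args) {
        ReconnectLoopSketch sketch =
            new ReconnectLoopSketch(Outcome.OK, Outcome.COMMIT_FAILED, Outcome.OK);
        sketch.run();
        System.out.println("consumers created: " + sketch.consumersCreated);
    }
}
```

With the scripted sequence OK, COMMIT_FAILED, OK the loop creates a second consumer after the simulated commit failure and then finishes normally. If the first scripted outcome is FATAL, only one consumer is created and the loop ends, which matches the behaviour the generic catch block has today.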