Camel / CAMEL-12031

KafkaConsumer stops consuming messages when exception occurs during offset commit


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.20.0
    • Fix Version/s: 2.19.5, 2.20.2, 2.21.0
    • Component/s: camel-kafka
    • Labels: None
    • Estimated Complexity: Novice

    Description

      When message processing takes longer than the maximum session timeout, the offset commit fails and the consumer thread ends after receiving an org.apache.kafka.clients.consumer.CommitFailedException. The relevant run() method:

              @Override
              public void run() {
                  boolean first = true;
                  boolean reConnect = true;
      
                  while (reConnect) {
      
                      // create consumer
                      ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
                      try {
                          // Kafka uses reflection for loading authentication settings, use its classloader
                          Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
                          this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
                      } finally {
                          Thread.currentThread().setContextClassLoader(threadClassLoader);
                      }
      
                      if (!first) {
                          // skip one poll timeout before trying again
                          long delay = endpoint.getConfiguration().getPollTimeoutMs();
                          log.info("Reconnecting {} to topic {} after {} ms", threadId, topicName, delay);
                          try {
                              Thread.sleep(delay);
                          } catch (InterruptedException e) {
                              Thread.currentThread().interrupt();
                          }
                      }
      
                      first = false;
      
                      // doRun keeps running until we either shutdown or is told to re-connect
                      reConnect = doRun();
                  }
              }
      

      After the exception, the doRun() method returns false, so the while loop in run() ends and the thread stops consuming. It should be possible to let the process continue after a failed offset commit.

      I think the catch blocks inside the doRun() method should look like this, so that a failed commit sets reConnect to true and a new consumer is created:

                  ...
                  } catch (InterruptException e) {
                      getExceptionHandler().handleException("Interrupted while consuming " + threadId + " from kafka topic", e);
                      log.info("Unsubscribing {} from topic {}", threadId, topicName);
                      consumer.unsubscribe();
                      Thread.currentThread().interrupt();
                  } catch (org.apache.kafka.clients.consumer.CommitFailedException e) { //or even org.apache.kafka.common.KafkaException
                      getExceptionHandler().handleException("Error consuming " + threadId + " from kafka topic", e);
                      reConnect = true;
                  } catch (Exception e) {
                      getExceptionHandler().handleException("Error consuming " + threadId + " from kafka topic", e);
                  } finally {
                      log.debug("Closing {} ", threadId);
                      IOHelper.close(consumer);
                  }
                  ...
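
To illustrate the intended control flow, here is a minimal, self-contained sketch with no Kafka or Camel dependency. Everything in it is an assumption made for illustration: ReconnectLoopSketch, Session, and the local CommitFailedException are hypothetical stand-ins, not the real camel-kafka or Kafka classes, and the real doRun() does much more than this. It only shows that treating a failed commit as "reconnect" keeps the outer loop alive, while any other exception still ends it.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ReconnectLoopSketch {

    /** Hypothetical stand-in for org.apache.kafka.clients.consumer.CommitFailedException. */
    static class CommitFailedException extends RuntimeException {
        CommitFailedException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for the lifetime of one consumer: poll, process, commit. */
    interface Session {
        void pollAndCommit();
    }

    /**
     * Simplified model of doRun(): returns true when the caller should
     * create a new consumer and try again, false when the loop should end.
     */
    static boolean doRun(Session session) {
        boolean reConnect = false;
        try {
            session.pollAndCommit();
        } catch (CommitFailedException e) {
            // Proposed behaviour: a failed offset commit asks for a reconnect.
            reConnect = true;
        } catch (Exception e) {
            // Any other error still ends the loop, as in the existing code.
        }
        return reConnect;
    }

    /** Simplified model of run(): returns how many consumers were created. */
    static int run(Deque<Session> sessions) {
        int created = 0;
        boolean reConnect = true;
        while (reConnect && !sessions.isEmpty()) {
            Session session = sessions.poll(); // "create consumer"
            created++;
            reConnect = doRun(session);
        }
        return created;
    }

    public static void main(String[] args) {
        Deque<Session> sessions = new ArrayDeque<>();
        sessions.add(() -> { throw new CommitFailedException("commit failed"); });
        sessions.add(() -> { throw new CommitFailedException("commit failed"); });
        sessions.add(() -> { }); // completes normally, so the loop ends here
        sessions.add(() -> { }); // never reached
        System.out.println("consumers created: " + run(sessions)); // prints "consumers created: 3"
    }
}
```

With the current behaviour (no dedicated catch for the commit failure), the same input would stop after the first consumer.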
      

          People

            Assignee: Claus Ibsen (davsclaus)
            Reporter: Rafał Gała (rgala)