  1. Camel
  2. CAMEL-12031

KafkaConsumer stops consuming messages when exception occurs during offset commit

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.20.0
    • Fix Version/s: 2.19.5, 2.20.2, 2.21.0
    • Component/s: camel-kafka
    • Labels:
      None
    • Estimated Complexity:
      Novice

      Description

      When processing messages takes longer than the maximum session timeout, the offset commit fails with org.apache.kafka.clients.consumer.CommitFailedException and the consumer thread ends. The thread's run() method:

              @Override
              public void run() {
                  boolean first = true;
                  boolean reConnect = true;
      
                  while (reConnect) {
      
                      // create consumer
                      ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
                      try {
                          // Kafka uses reflection for loading authentication settings, use its classloader
                          Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
                          this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
                      } finally {
                          Thread.currentThread().setContextClassLoader(threadClassLoader);
                      }
      
                      if (!first) {
                          // skip one poll timeout before trying again
                          long delay = endpoint.getConfiguration().getPollTimeoutMs();
                          log.info("Reconnecting {} to topic {} after {} ms", threadId, topicName, delay);
                          try {
                              Thread.sleep(delay);
                          } catch (InterruptedException e) {
                              Thread.currentThread().interrupt();
                          }
                      }
      
                      first = false;
      
                      // doRun keeps running until we either shutdown or is told to re-connect
                      reConnect = doRun();
                  }
              }
      

      When the commit fails, doRun() returns false and the loop ends. It should be possible to let the process continue after a failed offset commit.

      I think the catch block inside the doRun() method should look like this:

                 ...
                  } catch (InterruptException e) {
                      getExceptionHandler().handleException("Interrupted while consuming " + threadId + " from kafka topic", e);
                      log.info("Unsubscribing {} from topic {}", threadId, topicName);
                      consumer.unsubscribe();
                      Thread.currentThread().interrupt();
                  } catch (org.apache.kafka.clients.consumer.CommitFailedException e) { //or even org.apache.kafka.common.KafkaException
                      getExceptionHandler().handleException("Error consuming " + threadId + " from kafka topic", e);
                      reConnect = true;
                  } catch (Exception e) {
                      getExceptionHandler().handleException("Error consuming " + threadId + " from kafka topic", e);
                  } finally {
                      log.debug("Closing {} ", threadId);
                      IOHelper.close(consumer);
                  }
                  ...
      
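      The effect of the proposed change can be sketched without a broker. The class below is only an illustration, not the camel-kafka code: ReconnectLoopSketch, Cycle and CommitFailedStandIn are hypothetical names, and CommitFailedStandIn merely stands in for org.apache.kafka.clients.consumer.CommitFailedException so that the example needs no Kafka dependency. It assumes doRun() reports a failed commit by returning true, as suggested above, while any other exception still ends the thread.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class ReconnectLoopSketch {

    /** Hypothetical stand-in for Kafka's CommitFailedException. */
    static class CommitFailedStandIn extends RuntimeException {
        CommitFailedStandIn(String message) {
            super(message);
        }
    }

    /** One simulated poll/process/commit cycle; throwing models a failure inside doRun(). */
    interface Cycle {
        void run();
    }

    private final Deque<Cycle> cycles;
    int connects = 0; // how many times a "consumer" was created

    ReconnectLoopSketch(Deque<Cycle> cycles) {
        this.cycles = cycles;
    }

    int remaining() {
        return cycles.size();
    }

    /** Mirrors the outer loop of run(): recreate the consumer while doRun() asks for it. */
    void run() {
        boolean reConnect = true;
        while (reConnect) {
            connects++; // stands in for creating a new KafkaConsumer
            reConnect = doRun();
        }
    }

    /** Returns true only when a commit failure should trigger a reconnect. */
    boolean doRun() {
        boolean reConnect = false;
        try {
            while (!cycles.isEmpty()) {
                cycles.poll().run();
            }
        } catch (CommitFailedStandIn e) {
            // proposed behaviour: report the error, then reconnect instead of stopping
            reConnect = true;
        } catch (Exception e) {
            // unchanged behaviour: any other failure ends the consumer thread
        }
        return reConnect;
    }

    public static void main(String[] args) {
        Deque<Cycle> cycles = new ArrayDeque<>();
        cycles.add(() -> { });                                                   // normal cycle
        cycles.add(() -> { throw new CommitFailedStandIn("commit failed"); });   // triggers a reconnect
        cycles.add(() -> { });                                                   // consumed after the reconnect
        ReconnectLoopSketch sketch = new ReconnectLoopSketch(cycles);
        sketch.run();
        System.out.println("connects=" + sketch.connects + " remaining=" + sketch.remaining());
    }
}
```

      In this sketch the cycle that follows the failed commit is still processed, because the outer loop creates a second "consumer" rather than exiting. Without the dedicated catch branch, the generic handler would leave reConnect at false and the remaining cycles would never run.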

            People

            • Assignee:
              davsclaus Claus Ibsen
              Reporter:
              rgala Rafał Gała