Camel / CAMEL-18327

camel-kafka: Kafka consumer closes when it is paused

Details

    • Type: Task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.17.0
    • Fix Version/s: 3.18.3, 3.20.0
    • Component/s: camel-kafka
    • Labels: None
    • Estimated Complexity: Novice

    Description

       

      In the startPolling() method, if the consumer is suspending, the unsubscribe method (safeUnsubscribe() in the excerpt below) is called, which permanently closes the consumer instead of pausing it.

       

          Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
          while (isKafkaConsumerRunnable() && isConnected() && pollExceptionStrategy.canContinue()) {
              ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration);
              if (consumerListener != null) {
                  if (!consumerListener.afterConsume(consumer)) {
                      continue;
                  }
              }
      
              ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords);
      
              if (result.isBreakOnErrorHit()) {
                  LOG.debug("We hit an error ... setting flags to force reconnect");
                  // force re-connect
                  setReconnect(true);
                  setConnected(false);
              }
      
              updateTaskState();
          }
      
          if (!isConnected()) {
              LOG.debug("Not reconnecting, check whether to auto-commit or not ...");
              commitManager.commit();
          }
      
          safeUnsubscribe();
      } catch (InterruptException e) {
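
      The distinction the report draws, between pausing and closing, can be illustrated with a small self-contained sketch. This is not the fix that was merged for this issue and it does not use the Kafka client API. The names PausablePollingSketch, FakeConsumer, Poller, State and step() are all hypothetical and exist only for illustration. The idea is that a suspended task keeps its subscription and only pauses, while unsubscribe is reserved for a real stop.

```java
public class PausablePollingSketch {

    /** Hypothetical lifecycle states of the polling task. */
    enum State { RUNNING, SUSPENDED, STOPPED }

    /** Hypothetical stand-in for a Kafka consumer; it only tracks state. */
    static class FakeConsumer {
        boolean subscribed = true;
        boolean paused = false;
        int polls = 0;

        void poll() { polls++; }
        void pause() { paused = true; }
        void resume() { paused = false; }
        void unsubscribe() { subscribed = false; }
    }

    /** Hypothetical polling task that treats "suspended" differently from "stopped". */
    static class Poller {
        private final FakeConsumer consumer;
        private State state = State.RUNNING;

        Poller(FakeConsumer consumer) { this.consumer = consumer; }

        void setState(State newState) { this.state = newState; }

        /** Runs one loop iteration; returns false once the task has terminated. */
        boolean step() {
            switch (state) {
                case RUNNING:
                    // Resume first if we are coming back from a suspension.
                    if (consumer.paused) {
                        consumer.resume();
                    }
                    consumer.poll();
                    return true;
                case SUSPENDED:
                    // Keep the subscription; only stop fetching.
                    if (!consumer.paused) {
                        consumer.pause();
                    }
                    return true;
                default:
                    // Only a real stop gives up the subscription.
                    consumer.unsubscribe();
                    return false;
            }
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        Poller poller = new Poller(consumer);

        poller.step();                       // running: polls once
        poller.setState(State.SUSPENDED);
        poller.step();                       // suspended: pauses, stays subscribed
        System.out.println("subscribed while suspended: " + consumer.subscribed);

        poller.setState(State.RUNNING);
        poller.step();                       // resumes and polls again
        poller.setState(State.STOPPED);
        poller.step();                       // stopped: unsubscribes
        System.out.println("subscribed after stop: " + consumer.subscribed);
    }
}
```

      In the excerpt above, by contrast, safeUnsubscribe() runs as soon as the loop exits, regardless of why the loop condition became false.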

       

       

            People

              Assignee: Otavio Rodolfo Piske (orpiske)
              Reporter: rupam (geek.rupam)
              Votes: 0
              Watchers: 2
