Kafka / KAFKA-15946

AsyncKafkaConsumer should retry commits on the application thread instead of auto-retry

Details

    Description

      The original design was that the network thread always completes the future, whether the request succeeds or fails. However, in the current patch I mistakenly added auto-retry functionality because commitSync wasn't retrying. What we should be doing instead is having the commitSync API catch RetriableException and resend the commit until the timeout expires.

       

      if (error.exception() instanceof RetriableException) {
          log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, error.message());
          handleRetriableError(error, response);
          retry(responseTime);  // <--- We probably shouldn't do this.
          return;
      }
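A minimal sketch of what the network-thread side could look like once the `retry(...)` call is dropped: the handler completes the future exactly once and leaves the retry decision to the caller. This is not the actual Kafka code. `CommitResponseHandlerSketch`, `onResponse`, and the nested `RetriableException` are hypothetical stand-ins so the example compiles without Kafka on the classpath.

```java
import java.util.concurrent.CompletableFuture;

final class CommitResponseHandlerSketch {

    /** Hypothetical stand-in for Kafka's RetriableException. */
    static class RetriableException extends RuntimeException {
        RetriableException(String message) {
            super(message);
        }
    }

    /**
     * Called on the (simulated) network thread when a commit response arrives.
     * A null error means the commit succeeded. The future is always completed
     * here, and no retry is scheduled regardless of the error type.
     */
    static void onResponse(CompletableFuture<Void> future, RuntimeException error) {
        if (error == null) {
            future.complete(null);
        } else {
            // Retriable or not, hand the error to whoever is waiting on the future.
            future.completeExceptionally(error);
        }
    }
}
```

With this shape, a retriable failure is visible to the application thread as an exceptionally completed future rather than being silently retried.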

       

      @Override
      public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
          acquireAndEnsureOpen();
          long commitStart = time.nanoseconds();
          try {
              CompletableFuture<Void> commitFuture = commit(offsets, true); // <-- we probably should retry here
              ConsumerUtils.getResult(commitFuture, time.timer(timeout));
          } finally {
              wakeupTrigger.clearTask();
              kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart);
              release();
          }
      }
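The retry proposed for commitSync could be sketched roughly as follows: the application thread resends the commit whenever the future fails with a retriable error, and gives up once the timeout elapses. This is an illustration under stated assumptions, not the actual fix. `CommitRetrySketch`, `commitSyncWithRetry`, the `retryBackoff` parameter, and the nested `RetriableException` are hypothetical, and the `Supplier` stands in for the `commit(offsets, true)` call above.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class CommitRetrySketch {

    /** Hypothetical stand-in for Kafka's RetriableException. */
    static class RetriableException extends RuntimeException {
        RetriableException(String message) {
            super(message);
        }
    }

    /**
     * Sends a commit via sendCommit and waits for its future. On a retriable
     * failure it backs off and sends a new commit; on any other failure it
     * rethrows; once the timeout has elapsed it throws TimeoutException.
     */
    static void commitSyncWithRetry(Supplier<CompletableFuture<Void>> sendCommit,
                                    Duration timeout,
                                    Duration retryBackoff)
            throws InterruptedException, TimeoutException {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (true) {
            long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos <= 0) {
                throw new TimeoutException("commit did not succeed within " + timeout);
            }
            try {
                // Wait for this attempt, but never longer than the time left.
                sendCommit.get().get(remainingNanos, TimeUnit.NANOSECONDS);
                return;
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (!(cause instanceof RetriableException)) {
                    // Non-retriable errors go straight back to the caller.
                    if (cause instanceof RuntimeException) {
                        throw (RuntimeException) cause;
                    }
                    throw new RuntimeException(cause);
                }
                // Retriable: back off, but do not sleep past the deadline.
                long sleepNanos = Math.min(retryBackoff.toNanos(), deadlineNanos - System.nanoTime());
                if (sleepNanos > 0) {
                    TimeUnit.NANOSECONDS.sleep(sleepNanos);
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger attempts = new AtomicInteger();
        // Simulated commit: fails twice with a retriable error, then succeeds.
        commitSyncWithRetry(() -> {
            CompletableFuture<Void> future = new CompletableFuture<>();
            if (attempts.incrementAndGet() < 3) {
                future.completeExceptionally(new RetriableException("transient failure"));
            } else {
                future.complete(null);
            }
            return future;
        }, Duration.ofSeconds(5), Duration.ofMillis(10));
        System.out.println("attempts: " + attempts.get());
    }
}
```

Keeping the loop on the calling thread means the timeout passed to commitSync bounds the whole operation, including every resend, while each individual future is still completed only once.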

          People

            lianetm Lianet Magrans
            pnee Philip Nee