Kafka / KAFKA-10576

Different behavior of commitSync and commitAsync

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Component/s: consumer

    Description

      It looks like the consumer's commitSync and commitAsync methods have different semantics.

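      import java.util.HashMap;
      import java.util.Map;

      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.clients.consumer.OffsetAndMetadata;
      import org.apache.kafka.common.TopicPartition;
      import org.apache.kafka.common.serialization.StringDeserializer;

      public class TestKafka {
          public static void main(String[] args) {
              String id = "dev_test";
              Map<String, Object> settings = new HashMap<>();
              settings.put("bootstrap.servers", "localhost:9094");
              settings.put("key.deserializer", StringDeserializer.class);
              settings.put("value.deserializer", StringDeserializer.class);
              settings.put("client.id", id);
              settings.put("group.id", id);

              // Offset 1 for partition 0 of the topic; poll() is never called.
              String topic = "test";
              Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
              offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(1));

              try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
                  consumer.commitSync(offsets);
              }
          }
      }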
      

      In the example above, I create a consumer and use commitSync to commit offsets. This code works as expected: the offsets are committed to Kafka.

      With commitAsync, however, the same approach does not work:

      try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
          CompletableFuture<Boolean> result = new CompletableFuture<>();
          consumer.commitAsync(offsets, new OffsetCommitCallback() {
              @Override
              public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                  if (exception != null) {
                      result.completeExceptionally(exception);
                  } else {
                      result.complete(true);
                  }
              }
          });
          result.get(15L, TimeUnit.SECONDS);
      }
      

      The result future fails with a timeout.

      This behavior is surprising. From the naming and documentation, commitSync and commitAsync should behave identically apart from the blocking/non-blocking aspect, but in reality they differ.

      I assume that commitAsync somehow depends on poll calls, but I didn't find any explicit mention of this in the KafkaConsumer javadoc or on the Kafka documentation page.
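
To make that assumption concrete, here is a toy model using only the JDK. It is not Kafka's code: ToyClient and its methods are hypothetical stand-ins, and the sketch assumes only that callbacks registered through an async call are run on the thread that later calls poll. Under that assumption, a future completed from the callback stays pending until poll is called, which would match the timeout seen above.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PollDrivenCallbacks {

    /** Hypothetical stand-in for a single-threaded client; NOT Kafka's implementation. */
    static class ToyClient {
        private final Queue<Runnable> pendingCallbacks = new ArrayDeque<>();

        /** Queues the callback and returns at once; nothing runs here. */
        void commitAsync(Runnable callback) {
            pendingCallbacks.add(callback);
        }

        /** Runs every queued callback on the caller's thread. */
        void poll() {
            Runnable callback;
            while ((callback = pendingCallbacks.poll()) != null) {
                callback.run();
            }
        }
    }

    /** Returns the future's value, or false if it is still pending after the wait. */
    static boolean completesWithin(CompletableFuture<Boolean> future, long millis)
            throws Exception {
        try {
            return future.get(millis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        ToyClient client = new ToyClient();
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        client.commitAsync(() -> result.complete(true));

        // No poll yet, so the callback has not run and the wait times out.
        System.out.println("before poll: " + completesWithin(result, 200));

        // poll drains the queue, which completes the future.
        client.poll();
        System.out.println("after poll: " + completesWithin(result, 200));
    }
}
```

If the real consumer works along these lines, waiting on the future without calling poll in between can never succeed, whereas a blocking call such as commitSync drives the request to completion itself.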

      So I see two possibilities:

      1. It's a bug and not expected behavior. commitSync and commitAsync should have identical semantics.
      2. It's expected, but not well-documented behavior. In that case, this behavior should be explicitly documented.

          People

            Assignee: Unassigned
            Reporter: Yuriy Badalyantc (lmnet)
            Votes: 0
            Watchers: 3