Kafka / KAFKA-6782

GlobalKTable GlobalStateStore never finishes restoring when consuming aborted messages


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.0.1, 1.1.0
    • Fix Version/s: 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
    • Component/s: streams
    • Labels: None

    Description

      This is the same problem as https://issues.apache.org/jira/browse/KAFKA-6190. The solution proposed there (below) works for successfully committed transactional messages, but when the topic contains aborted messages, the restore loop never terminates. Here is the proposal from KAFKA-6190:

      while (offset < highWatermark) {
          final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
          for (ConsumerRecord<byte[], byte[]> record : records) {
              if (record.key() != null) {
                  stateRestoreCallback.restore(record.key(), record.value());
              }
              // Only reached when poll() returns at least one record.
              offset = consumer.position(topicPartition);
          }
      }

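      Concretely, when the consumer reaches a range that contains only aborted messages, poll() returns 0 records, so the statement 'offset = consumer.position(topicPartition)' never gets a chance to execute. The consumer's position has already moved past the aborted messages, but offset is never refreshed, so the condition offset < highWatermark stays true forever.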
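To make the failure mode concrete, here is a toy model, in Java like the snippets above, of what a fetch does with one partition, assuming the consumer runs with isolation.level=read_committed (the setting that hides aborted messages from the application). Everything in it is hypothetical: AbortedBatchDemo, Kind, visibleOffsets and positionAfter are illustration names, not Kafka client APIs, and the model assumes a simplified consumer that scans a fixed number of offsets per poll. It shows a poll that covers only aborted records and a transaction marker returning nothing, while the position still advances to the high watermark.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a read_committed fetch (NOT the Kafka client API):
// the log is an array indexed by offset, and one "poll" scans up to maxScan offsets.
public class AbortedBatchDemo {

    // What is stored at each offset of the partition.
    enum Kind { COMMITTED, ABORTED, MARKER }

    // Offsets that would be handed to the application for [position, position + maxScan).
    // Aborted records and transaction markers are filtered out.
    static List<Long> visibleOffsets(Kind[] log, long position, int maxScan) {
        List<Long> visible = new ArrayList<>();
        long end = Math.min(log.length, position + maxScan);
        for (long off = position; off < end; off++) {
            if (log[(int) off] == Kind.COMMITTED) {
                visible.add(off);
            }
        }
        return visible;
    }

    // Position after the same scan: it moves past every scanned offset,
    // including the aborted records and markers that were never returned.
    static long positionAfter(Kind[] log, long position, int maxScan) {
        return Math.min(log.length, position + maxScan);
    }

    public static void main(String[] args) {
        // Offsets 0-1: committed data, 2: commit marker,
        // 3-4: aborted data, 5: abort marker. High watermark = 6.
        Kind[] log = {
            Kind.COMMITTED, Kind.COMMITTED, Kind.MARKER,
            Kind.ABORTED, Kind.ABORTED, Kind.MARKER
        };
        long position = 3; // offsets 0-2 have already been consumed
        List<Long> records = visibleOffsets(log, position, 3);
        System.out.println("records returned: " + records.size());                    // 0
        System.out.println("position after poll: " + positionAfter(log, position, 3)); // 6
    }
}
```

In the original loop, that empty poll means the body of the for loop never runs, so the position of 6 is never copied into offset.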

      So I propose moving 'offset = consumer.position(topicPartition)' out of the for loop, so that the offset is updated after every poll, even when no records are returned.

      while (offset < highWatermark) {
          final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
          for (ConsumerRecord<byte[], byte[]> record : records) {
              if (record.key() != null) {
                  stateRestoreCallback.restore(record.key(), record.value());
              }
          }
          // Updated after every poll, even when no records were returned.
          offset = consumer.position(topicPartition);
      }
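The sketch below runs both loops against a hypothetical in-memory stand-in for the consumer, using the same log layout as before (two committed records, a commit marker, two aborted records, an abort marker). RestoreLoopDemo and FakeConsumer are illustration names, not Kafka classes; the real loop calls KafkaConsumer.poll and KafkaConsumer.position. The original loop is capped at a fixed number of polls so the demo terminates; without the cap it would spin forever at offset 3.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a read_committed consumer on one partition
// (NOT the real client API). Each poll scans `batch` offsets of the log.
public class RestoreLoopDemo {

    static final class FakeConsumer {
        private final boolean[] committed; // true = record visible to the application
        private final int batch;
        private long position = 0;

        FakeConsumer(boolean[] committed, int batch) {
            this.committed = committed;
            this.batch = batch;
        }

        // Returns offsets of visible records and advances the position
        // past aborted records and markers as well.
        List<Long> poll() {
            List<Long> out = new ArrayList<>();
            long end = Math.min(committed.length, position + batch);
            for (long off = position; off < end; off++) {
                if (committed[(int) off]) out.add(off);
            }
            position = end;
            return out;
        }

        long position() { return position; }
    }

    // Original loop: offset is refreshed only inside the for loop.
    // maxPolls caps the loop so the demo terminates; returns the final offset.
    static long originalLoop(FakeConsumer consumer, long highWatermark, int maxPolls, List<Long> restored) {
        long offset = 0;
        for (int polls = 0; offset < highWatermark && polls < maxPolls; polls++) {
            for (long recordOffset : consumer.poll()) {
                restored.add(recordOffset);
                offset = consumer.position();
            }
        }
        return offset;
    }

    // Fixed loop: offset is refreshed after every poll, even an empty one.
    static long fixedLoop(FakeConsumer consumer, long highWatermark, List<Long> restored) {
        long offset = 0;
        while (offset < highWatermark) {
            for (long recordOffset : consumer.poll()) {
                restored.add(recordOffset);
            }
            offset = consumer.position();
        }
        return offset;
    }

    public static void main(String[] args) {
        // Offsets 0-1 committed, 2 commit marker, 3-4 aborted, 5 abort marker.
        boolean[] log = {true, true, false, false, false, false};
        long hw = log.length;

        List<Long> a = new ArrayList<>();
        long stuck = originalLoop(new FakeConsumer(log, 3), hw, 100, a);
        System.out.println("original: offset=" + stuck + " after 100 polls"); // offset=3

        List<Long> b = new ArrayList<>();
        long done = fixedLoop(new FakeConsumer(log, 3), hw, b);
        System.out.println("fixed: offset=" + done + ", restored=" + b); // offset=6, restored=[0, 1]
    }
}
```

Both loops restore the same records; the only difference is where offset is refreshed, which is exactly the change proposed above.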

       


            People

              Assignee: Lingxiao WANG (Gitomain)
              Reporter: Lingxiao WANG
              Votes: 1
              Watchers: 3
