Description
This is the same problem as https://issues.apache.org/jira/browse/KAFKA-6190. The solution proposed there, shown below, works when the transactional messages are committed, but when there are aborted messages it ends up in an infinite loop. Here is that proposal:
while (offset < highWatermark) {
    final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
    for (ConsumerRecord<byte[], byte[]> record : records) {
        if (record.key() != null) {
            stateRestoreCallback.restore(record.key(), record.value());
        }
        offset = consumer.position(topicPartition);
    }
}
Concretely, when the consumer reads a batch that contains only aborted messages, the poll returns 0 records, so the for loop body is skipped and the statement 'offset = consumer.position(topicPartition)' never gets a chance to execute.
So I propose moving the statement 'offset = consumer.position(topicPartition)' out of the for loop, to guarantee that even if no records are polled, the offset is always updated.
while (offset < highWatermark) {
    final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
    for (ConsumerRecord<byte[], byte[]> record : records) {
        if (record.key() != null) {
            stateRestoreCallback.restore(record.key(), record.value());
        }
    }
    offset = consumer.position(topicPartition);
}
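To make the difference between the two loops easier to check without a broker, here is a minimal simulation. It does not use the Kafka client at all: FakeConsumer, its null-entry convention for aborted messages, the batch size, and the maxPolls safety cap are all hypothetical stand-ins invented for this sketch. It only assumes what the description above states, namely that a poll over aborted messages returns no records while the consumer position still advances.

```java
import java.util.ArrayList;
import java.util.List;

public class RestoreLoopSketch {

    /** Hypothetical stand-in for a consumer reading a single changelog partition. */
    static final class FakeConsumer {
        private final String[] log;  // a null entry models an aborted message that poll() never returns
        private final int batchSize; // how many offsets one poll() scans
        private int position = 0;

        FakeConsumer(String[] log, int batchSize) {
            this.log = log;
            this.batchSize = batchSize;
        }

        /** Scans up to batchSize offsets, returns only the visible values, and always advances the position. */
        List<String> poll() {
            List<String> visible = new ArrayList<>();
            int end = Math.min(position + batchSize, log.length);
            for (; position < end; position++) {
                if (log[position] != null) {
                    visible.add(log[position]);
                }
            }
            return visible;
        }

        int position() {
            return position;
        }
    }

    /** Position update inside the for loop. Returns the number of polls, or -1 if maxPolls was reached. */
    static int restoreUpdatingInsideForLoop(FakeConsumer consumer, int highWatermark,
                                            List<String> restored, int maxPolls) {
        int offset = 0;
        int polls = 0;
        while (offset < highWatermark) {
            if (polls == maxPolls) {
                return -1; // safety cap standing in for the infinite loop
            }
            polls++;
            for (String value : consumer.poll()) {
                restored.add(value);
                offset = consumer.position(); // skipped entirely when the poll is empty
            }
        }
        return polls;
    }

    /** Position update after the for loop, as proposed in this issue. Same return convention. */
    static int restoreUpdatingAfterForLoop(FakeConsumer consumer, int highWatermark,
                                           List<String> restored, int maxPolls) {
        int offset = 0;
        int polls = 0;
        while (offset < highWatermark) {
            if (polls == maxPolls) {
                return -1;
            }
            polls++;
            for (String value : consumer.poll()) {
                restored.add(value);
            }
            offset = consumer.position(); // runs even when the poll returned nothing
        }
        return polls;
    }
}
```

For example, with a log of two committed values followed by three aborted entries, a batch size of 2, and a high watermark of 5, the first variant stalls at offset 2 and hits the cap, while the second variant reaches the high watermark and terminates. The cap exists only so the sketch can report the stall instead of hanging.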