Details
-
Sub-task
-
Status: Closed
-
Critical
-
Resolution: Fixed
-
None
Description
The following code returns records before the group metadata is updated. This fails the first transactions ever run by the Producer/Consumer.
Producer<String, String> txnProducer = new KafkaProducer<>(txnProducerProps); Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps); txnProducer.initTransactions(); System.out.println("Init transactions called"); try { txnProducer.beginTransaction(); System.out.println("Begin transactions called"); consumer.subscribe(Collections.singletonList("input")); System.out.println("Consumer subscribed to topic -> KIP848-topic-2 "); ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10)); System.out.println("Returned " + records.count() + " records."); // Process and send txn messages. for (ConsumerRecord<String, String> processedRecord : records) { txnProducer.send(new ProducerRecord<>("output", processedRecord.key(), "Processed: " + processedRecord.value())); } ConsumerGroupMetadata groupMetadata = consumer.groupMetadata(); System.out.println("Group metadata inside test" + groupMetadata); Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(); for (ConsumerRecord<String, String> record : records) { offsetsToCommit.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)); } System.out.println("Offsets to commit" + offsetsToCommit); // Send offsets to transaction with ConsumerGroupMetadata. txnProducer.sendOffsetsToTransaction(offsetsToCommit, groupMetadata); System.out.println("Send offsets to transaction done"); // Commit the transaction. txnProducer.commitTransaction(); System.out.println("Commit transaction done"); } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { e.printStackTrace(); txnProducer.close(); } catch (KafkaException e) { e.printStackTrace(); txnProducer.abortTransaction(); } finally { txnProducer.close(); consumer.close(); }
The issue seems to be that while it waits in `poll`, the event to update the group metadata is not processed.
Attachments
Issue Links
- is related to
-
KAFKA-16290 Investigate propagating subscription state updates via queues
- Resolved
- links to