Details
- Bug
- Status: Resolved
- Major
- Resolution: Fixed
- 2.21.0
- None
- Linux
- Unknown
Description
I am trying manual commit from a Camel Kafka consumer; the code snippet is below. I want to consume a message and commit it only once 2 minutes have passed since it arrived in the topic. My consumer retrieves the message and checks the time difference; if it is above 2 minutes, it should commit. When a message has been retrieved but not committed manually, I expect it to be delivered again, but it never comes back. When I create a plain Kafka consumer instead, it works fine.
// The enclosing RouteBuilder class declaration was not included in the report.
    public void configure() throws Exception {
        from("kafka:BENEFITSLOADER.LOAD?brokers=xxxx:9092,xxxx:9092,xxxx:9092&groupId=BENEFITSLOADER&consumersCount=1&pollTimeoutMs=1000&autoCommitEnable=false&allowManualCommit=true&maxPollRecords=1")
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    Long msgDateTime = (Long) exchange.getIn().getHeaders().get(KafkaConstants.TIMESTAMP);
                    System.out.println("Message : " + (exchange.getIn().getHeaders()));
                    System.out.println("Message : " + (exchange.getIn().getBody()));
                    Date msgDate = new Date(msgDateTime);
                    Date currentDate = new Date();
                    long diff = currentDate.getTime() - msgDate.getTime();
                    // Note: "% 60" yields the minute-of-hour part, so the value wraps after 60 minutes.
                    long diffMinutes = diff / (60 * 1000) % 60;
                    System.out.println("Difference in Minutes " + diffMinutes);
                    KafkaManualCommit manualCommit = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
                    if (diffMinutes > 2) {
                        // Branch body was lost from the report; per the description it is presumably the manual commit.
                        manualCommit.commitSync();
                    }
                }
            });
    }
}
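Both snippets in this report compute the message age as `diff / (60 * 1000) % 60`. That expression returns only the minute-of-hour part of the difference, so a message that is 61 minutes old is reported as 1 minute old and would fail the `> 2` check. A minimal sketch of the difference, using only `java.time.Duration`; the class and method names here are hypothetical and not part of the reporter's code:

```java
import java.time.Duration;

class MessageAge {
    /** Whole minutes elapsed between the record timestamp and now (both epoch millis). */
    public static long ageMinutes(long recordTimestampMs, long nowMs) {
        return Duration.ofMillis(nowMs - recordTimestampMs).toMinutes();
    }

    /** Minute-of-hour part only, as computed by the expression used in the report. */
    public static long minuteOfHour(long recordTimestampMs, long nowMs) {
        long diff = nowMs - recordTimestampMs;
        return diff / (60 * 1000) % 60;
    }

    public static void main(String[] args) {
        long sent = 0L;
        long threeMinutesLater = Duration.ofMinutes(3).toMillis();
        long sixtyOneMinutesLater = Duration.ofMinutes(61).toMillis();

        System.out.println(ageMinutes(sent, threeMinutesLater));      // 3
        System.out.println(ageMinutes(sent, sixtyOneMinutesLater));   // 61
        System.out.println(minuteOfHour(sent, sixtyOneMinutesLater)); // 1
    }
}
```

For messages younger than an hour the two calculations agree, so this does not by itself explain the missing redelivery; it only matters for messages that stay uncommitted for a long time.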
Code that works fine
import java.util.Collections;
import java.util.Date;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TestKafkaConsumer {
    static Consumer<String, String> consumer = null;
    static ConsumerRecord<String, String> fetchedRecord;
    static ConsumerRecords<String, String> records;

    public static void main(String... args) {
        String topicName = "BENEFITSLOADER.LOAD";
        consumer = createConsumer();
        consumer.subscribe(Collections.singletonList(topicName));
        try {
            while (true) {
                // Poll only when no record is currently being held.
                if (fetchedRecord == null)
                    records = consumer.poll(1000);
                records.forEach(record -> { fetchedRecord = record; });
                if (fetchedRecord != null) {
                    Date msgDate = new Date(fetchedRecord.timestamp());
                    Date date = new Date(System.currentTimeMillis());
                    long diff = date.getTime() - msgDate.getTime();
                    long diffMinutes = diff / (60 * 1000) % 60;
                    System.out.printf("Consumer Record:(%s, %s, %d, %d)\n",
                            fetchedRecord.key(), fetchedRecord.value(),
                            fetchedRecord.partition(), fetchedRecord.offset());
                    if (diffMinutes > 2) {
                        // Branch body was lost from the report; presumably the manual commit,
                        // followed by a reset so that the next iteration polls again.
                        consumer.commitSync();
                        fetchedRecord = null;
                    }
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            consumer.close();
        }
    }

    private static Consumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx:9092,xxx:9092,xxx:9093");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "BENEFITSLOADER");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
        return new KafkaConsumer<>(props);
    }
}
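The standalone consumer above does not rely on Kafka delivering the record a second time: it keeps the fetched record in a static field, re-checks its age on every loop iteration, and polls only when that field is empty. The following is a simplified model of that hold-and-recheck loop, not Kafka code. The in-memory queue standing in for the topic, the `committedOffset` field, the `Rec` type, and the reset of the held record after committing are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class HoldUntilOldEnough {
    /** A record reduced to the two fields the loop needs (hypothetical type). */
    static final class Rec {
        final long offset;
        final long timestampMs;
        Rec(long offset, long timestampMs) {
            this.offset = offset;
            this.timestampMs = timestampMs;
        }
    }

    private final Deque<Rec> topic = new ArrayDeque<>(); // stand-in for the Kafka topic
    private Rec held;                                    // fetched but not yet committed
    long committedOffset = -1;                           // offset of the last record committed in this model

    void publish(Rec r) {
        topic.addLast(r);
    }

    /** One loop iteration: fetch only if nothing is held; commit once the held record is old enough. */
    boolean step(long nowMs, long minAgeMs) {
        if (held == null) {
            held = topic.pollFirst(); // null when the stand-in topic is empty
        }
        if (held != null && nowMs - held.timestampMs > minAgeMs) {
            committedOffset = held.offset;
            held = null;              // release so the next iteration fetches again
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        HoldUntilOldEnough loop = new HoldUntilOldEnough();
        loop.publish(new Rec(0, 0L));
        long twoMinutes = 2 * 60 * 1000L;
        System.out.println(loop.step(60_000L, twoMinutes));  // false: 1 minute old, record stays held
        System.out.println(loop.step(180_000L, twoMinutes)); // true: 3 minutes old, committed
        System.out.println(loop.committedOffset);            // 0
    }
}
```

The point of the sketch is that the record is re-examined from memory rather than re-read from the broker, which is a different mechanism from the redelivery expected of the Camel route.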
Issue Links
- relates to CAMEL-12732 Kafka manual commit to file repository doesn't work properly (using Spring boot) (Resolved)