Details
-
Wish
-
Status: Open
-
Major
-
Resolution: Unresolved
-
2.3.0
-
None
-
None
Description
for example,i have two topics,one_topic、two_topic,each topic have two partitions,consumer group 'c_group' subscribe this topics ; how to get c_group latest commited timestamp?
public static long lastCommitTimestamp(String groupId, String bootstrapServers) { int partition = Math.abs(groupId.hashCode() % 50); TopicPartition tp = new TopicPartition("__consumer_offsets", partition); Properties props = new Properties(); props.put("bootstrap.servers", bootstrapServers); props.put("group.id", groupId); props.put("enable.auto.commit", "false"); props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) { consumer.assign(Arrays.asList(tp)); consumer.seekToEnd(Collections.singletonList(tp)); ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(30)); if (records.count() == 0) { return -1; } for (ConsumerRecord<byte[], byte[]> record : records) { BaseKey baseKey = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key())); if(baseKey instanceof OffsetKey){ OffsetKey offsetKey = (OffsetKey) baseKey; if("one_topic".equals(offsetKey.key().topicPartition().topic())) { OffsetAndMetadata offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(record.value())); long commitTimestamp = offsetAndMetadata.commitTimestamp(); //why commitTimestamp is current timestamp } } } return records.iterator().next().timestamp(); } }