  1. Kafka
  2. KAFKA-9136

Get a consumer group's latest committed timestamp

Details

    • Type: Wish
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.3.0
    • Fix Version/s: None
    • Component/s: consumer
    • Labels: None

    Description

      For example, I have two topics, one_topic and two_topic, each with two partitions, and the consumer group 'c_group' subscribes to both topics. How can I get the latest committed timestamp for c_group?
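      One way to read the question is that the group's latest committed timestamp is the maximum commit timestamp over all of its topic partitions. The sketch below only illustrates that aggregation step. The `Commit` holder, the class name, and the timestamp values are hypothetical and made up for illustration; they are not part of any Kafka API, and decoding real commits from the broker is not covered here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

final class LatestCommitSketch {

    // Hypothetical holder for one already-decoded offset commit.
    static final class Commit {
        final String group;
        final String topic;
        final int partition;
        final long commitTimestamp;

        Commit(String group, String topic, int partition, long commitTimestamp) {
            this.group = group;
            this.topic = topic;
            this.partition = partition;
            this.commitTimestamp = commitTimestamp;
        }
    }

    // Returns the newest commit timestamp seen for the group, or empty if it has no commits.
    static OptionalLong latestCommitTimestamp(List<Commit> commits, String group) {
        return commits.stream()
                .filter(c -> c.group.equals(group))
                .mapToLong(c -> c.commitTimestamp)
                .max();
    }

    public static void main(String[] args) {
        // Illustrative values only: two topics with two partitions each, plus another group.
        List<Commit> commits = Arrays.asList(
                new Commit("c_group", "one_topic", 0, 1000L),
                new Commit("c_group", "one_topic", 1, 1500L),
                new Commit("c_group", "two_topic", 0, 1200L),
                new Commit("c_group", "two_topic", 1, 900L),
                new Commit("other_group", "one_topic", 0, 9999L));

        System.out.println(latestCommitTimestamp(commits, "c_group").getAsLong()); // prints 1500
    }
}
```

      Returning an `OptionalLong` rather than -1 keeps "no commits found" distinct from a real timestamp.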

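      import java.nio.ByteBuffer;
      import java.time.Duration;
      import java.util.Collections;
      import java.util.Properties;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.TopicPartition;
      // Broker-internal classes from the Kafka core artifact (not a public API).
      import kafka.common.OffsetAndMetadata;
      import kafka.coordinator.group.BaseKey;
      import kafka.coordinator.group.GroupMetadataManager;
      import kafka.coordinator.group.OffsetKey;

      public static long lastCommitTimestamp(String groupId, String bootstrapServers) {
          // 50 is the default partition count of __consumer_offsets (offsets.topic.num.partitions).
          int partition = Math.abs(groupId.hashCode() % 50);
          TopicPartition tp = new TopicPartition("__consumer_offsets", partition);

          Properties props = new Properties();
          props.put("bootstrap.servers", bootstrapServers);
          props.put("group.id", groupId);
          props.put("enable.auto.commit", "false");
          props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
          props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

          try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
              consumer.assign(Collections.singletonList(tp));
              // Start at the end of the partition, so poll() only returns commits written after this point.
              consumer.seekToEnd(Collections.singletonList(tp));
              ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(30));
              if (records.count() == 0) {
                  return -1;
              }

              for (ConsumerRecord<byte[], byte[]> record : records) {
                  if (record.value() == null) {
                      continue; // tombstone record: no value to decode
                  }
                  BaseKey baseKey = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key()));
                  if (baseKey instanceof OffsetKey) {
                      OffsetKey offsetKey = (OffsetKey) baseKey;
                      if ("one_topic".equals(offsetKey.key().topicPartition().topic())) {
                          OffsetAndMetadata offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(record.value()));
                          long commitTimestamp = offsetAndMetadata.commitTimestamp();
                          // Question: why is commitTimestamp always the current timestamp?
                      }
                  }
              }
              return records.iterator().next().timestamp();
          }
      }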
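
      The snippet above picks the `__consumer_offsets` partition from the group id's hash. As far as I understand, the broker does this by taking a non-negative form of `groupId.hashCode()` modulo the partition count of the offsets topic. A minimal sketch of that mapping follows, assuming the default of 50 partitions; the class and method names are hypothetical. It differs from `Math.abs(hash % 50)` only when the hash is `Integer.MIN_VALUE`.

```java
final class OffsetsPartitionSketch {

    // Non-negative form of an int; Integer.MIN_VALUE maps to 0 because Math.abs cannot represent it.
    static int nonNegative(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    // Maps a raw hash code to a partition index in [0, numPartitions).
    static int partitionForHash(int hash, int numPartitions) {
        return nonNegative(hash) % numPartitions;
    }

    // Maps a group id to the offsets-topic partition assumed to hold its commits.
    static int partitionFor(String groupId, int numPartitions) {
        return partitionForHash(groupId.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // 50 is assumed here as the default value of offsets.topic.num.partitions.
        System.out.println(partitionFor("c_group", 50));
    }
}
```

      If the cluster overrides `offsets.topic.num.partitions`, the hard-coded 50 would have to be replaced with the actual partition count.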

       

          People

            Unassigned
            zhangzs zhangzhisheng
            Guozhang Wang
            Votes: 0
            Watchers: 1
