Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-4597

Record metadata returned by producer doesn't consider log append time

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.1.1
    • Fix Version/s: 0.10.2.0
    • Component/s: clients, producer
    • Labels:
      None

      Description

      Kafka topics might be configured to record timestamps of the messages produced. There are two different timestamps which might be stored:

      1. Record create time: The time the record is created by the client.
      2. Log append time: The time the record has been added to the log by the broker.

      The ProducerRecord docs state:

      In either of the cases above, the timestamp that has actually been used will be returned to user in RecordMetadata

      However I found the create time used in both cases.

      The following class creates two topics, one configured to store create time. The other used log append time. It produces 10 messages in each topic and outputs the timestamps from the record meta data as well as those fetched by a consumer client.

      import kafka.admin.AdminUtils;
      import kafka.admin.RackAwareMode;
      import kafka.utils.ZKStringSerializer$;
      import kafka.utils.ZkUtils;
      import org.I0Itec.zkclient.ZkClient;
      import org.I0Itec.zkclient.ZkConnection;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.Producer;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.clients.producer.RecordMetadata;
      import org.apache.kafka.common.TopicPartition;
      
      import java.util.Arrays;
      import java.util.Properties;
      import java.util.concurrent.ExecutionException;
      
      public class KafkaTimestampTest {
      
          public static void main(String[] args) throws ExecutionException, InterruptedException {
              String ip = "127.0.0.1";
      
              Properties producerProperties = new Properties();
              producerProperties.put("bootstrap.servers", ip + ":9092");
              producerProperties.put("acks", "all");
              producerProperties.put("retries", 0);
              producerProperties.put("batch.size", 16384);
              producerProperties.put("linger.ms", 1);
              producerProperties.put("buffer.memory", 33554432);
              producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      
              Properties consumerProperties = new Properties();
              consumerProperties.put("bootstrap.servers", ip + ":9092");
              consumerProperties.put("enable.auto.commit", "false");
              consumerProperties.put("session.timeout.ms", "30000");
              consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      
              Producer<String, String> producer = new KafkaProducer<>(producerProperties);
              KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
      
              ZkClient zkClient = new ZkClient(ip + ":2181", 10000, 10000, ZKStringSerializer$.MODULE$);
              ZkConnection zkConnection = new ZkConnection(ip + ":2181");
              ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
      
              TopicPartition topicPartitionWithCreateTime = new TopicPartition("test-topic-with-create-time", 0);
              TopicPartition topicPartitionWithLogAppendTime = new TopicPartition("test-topic-with-log-append-time", 0);
      
              // create topic with create time
              if (!AdminUtils.topicExists(zkUtils, topicPartitionWithCreateTime.topic())) {
                  Properties topicProperties = new Properties();
                  topicProperties.put("message.timestamp.type", "CreateTime");
                  AdminUtils.createTopic(zkUtils, topicPartitionWithCreateTime.topic(), 1, 1, topicProperties, RackAwareMode.Disabled$.MODULE$);
              }
      
              // create topic with log append time
              if (!AdminUtils.topicExists(zkUtils, topicPartitionWithLogAppendTime.topic())) {
                  Properties topicProperties = new Properties();
                  topicProperties.put("message.timestamp.type", "LogAppendTime");
                  AdminUtils.createTopic(zkUtils, topicPartitionWithLogAppendTime.topic(), 1, 1, topicProperties, RackAwareMode.Disabled$.MODULE$);
              }
      
              consumer.assign(Arrays.asList(topicPartitionWithLogAppendTime, topicPartitionWithCreateTime));
              String format = "#%s, MetaDataOffset: %s, MetaDataTime: %s, ConsumerRecordOffset: %s, ConsumerRecordTime: %s";
      
              System.out.println(String.format("Create messages into topic %s ...", topicPartitionWithCreateTime));
              for (int i = 0; i < 10; i++) {
                  RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topicPartitionWithCreateTime.topic(), topicPartitionWithCreateTime.partition(), "", "message")).get();
                  consumer.seek(topicPartitionWithCreateTime, recordMetadata.offset());
                  ConsumerRecord<String, String> consumerRecord =  consumer.poll(1000).records(topicPartitionWithCreateTime).get(0);
                  System.out.println(String.format(format, i + 1, recordMetadata.offset(), recordMetadata.timestamp(), consumerRecord.offset(), consumerRecord.timestamp()));
              }
      
              System.out.println(String.format("Create messages into topic %s...", topicPartitionWithLogAppendTime));
              for (int i = 0; i < 10; i++) {
                  RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topicPartitionWithLogAppendTime.topic(), topicPartitionWithLogAppendTime.partition(), "", "message")).get();
                  consumer.seek(topicPartitionWithLogAppendTime, recordMetadata.offset());
                  ConsumerRecord<String, String> consumerRecord =  consumer.poll(1000).records(topicPartitionWithLogAppendTime).get(0);
                  System.out.println(String.format(format, i + 1, recordMetadata.offset(), recordMetadata.timestamp(), consumerRecord.offset(), consumerRecord.timestamp()));
              }
      
              AdminUtils.deleteTopic(zkUtils, topicPartitionWithCreateTime.topic());
              AdminUtils.deleteTopic(zkUtils, topicPartitionWithLogAppendTime.topic());
          }
      }
      

      The output shows that in case of log append time the timestamps differ.

      Create messages into topic test-topic-with-create-time-0 ...
      #1, MetaDataOffset: 0, MetaDataTime: 1483623773788, ConsumerRecordOffset: 0, ConsumerRecordTime: 1483623773788
      #2, MetaDataOffset: 1, MetaDataTime: 1483623774178, ConsumerRecordOffset: 1, ConsumerRecordTime: 1483623774178
      #3, MetaDataOffset: 2, MetaDataTime: 1483623774183, ConsumerRecordOffset: 2, ConsumerRecordTime: 1483623774183
      #4, MetaDataOffset: 3, MetaDataTime: 1483623774188, ConsumerRecordOffset: 3, ConsumerRecordTime: 1483623774188
      #5, MetaDataOffset: 4, MetaDataTime: 1483623774193, ConsumerRecordOffset: 4, ConsumerRecordTime: 1483623774193
      #6, MetaDataOffset: 5, MetaDataTime: 1483623774197, ConsumerRecordOffset: 5, ConsumerRecordTime: 1483623774197
      #7, MetaDataOffset: 6, MetaDataTime: 1483623774202, ConsumerRecordOffset: 6, ConsumerRecordTime: 1483623774202
      #8, MetaDataOffset: 7, MetaDataTime: 1483623774207, ConsumerRecordOffset: 7, ConsumerRecordTime: 1483623774207
      #9, MetaDataOffset: 8, MetaDataTime: 1483623774212, ConsumerRecordOffset: 8, ConsumerRecordTime: 1483623774212
      #10, MetaDataOffset: 9, MetaDataTime: 1483623774217, ConsumerRecordOffset: 9, ConsumerRecordTime: 1483623774217
      Create messages into topic test-topic-with-log-append-time-0...
      #1, MetaDataOffset: 0, MetaDataTime: 1483623774224, ConsumerRecordOffset: 0, ConsumerRecordTime: 1483623774992
      #2, MetaDataOffset: 1, MetaDataTime: 1483623774230, ConsumerRecordOffset: 1, ConsumerRecordTime: 1483623774997
      #3, MetaDataOffset: 2, MetaDataTime: 1483623774235, ConsumerRecordOffset: 2, ConsumerRecordTime: 1483623775002
      #4, MetaDataOffset: 3, MetaDataTime: 1483623774239, ConsumerRecordOffset: 3, ConsumerRecordTime: 1483623775007
      #5, MetaDataOffset: 4, MetaDataTime: 1483623774244, ConsumerRecordOffset: 4, ConsumerRecordTime: 1483623775011
      #6, MetaDataOffset: 5, MetaDataTime: 1483623774248, ConsumerRecordOffset: 5, ConsumerRecordTime: 1483623775015
      #7, MetaDataOffset: 6, MetaDataTime: 1483623774253, ConsumerRecordOffset: 6, ConsumerRecordTime: 1483623775020
      #8, MetaDataOffset: 7, MetaDataTime: 1483623774257, ConsumerRecordOffset: 7, ConsumerRecordTime: 1483623775024
      #9, MetaDataOffset: 8, MetaDataTime: 1483623774262, ConsumerRecordOffset: 8, ConsumerRecordTime: 1483623775029
      #10, MetaDataOffset: 9, MetaDataTime: 1483623774267, ConsumerRecordOffset: 9, ConsumerRecordTime: 1483623775034
      

      I assume the timestamps in the record meta data represent the create time but could not ensure that.

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                ijuma Ismael Juma
                Reporter:
                magic_al Alex Fechner
              • Votes:
                0 Vote for this issue
                Watchers:
                4 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: