Description
Kafka topics might be configured to record timestamps of the messages produced. There are two different timestamps which might be stored:
- Record create time: The time the record is created by the client.
- Log append time: The time the record has been added to the log by the broker.
The ProducerRecord docs state:
In either of the cases above, the timestamp that has actually been used will be returned to user in RecordMetadata
However, I found that the create time is returned in both cases.
The following class creates two topics: one configured to store the create time, the other configured to store the log append time. It produces 10 messages to each topic and prints the timestamps from the record metadata alongside those fetched by a consumer client.
import kafka.admin.AdminUtils; import kafka.admin.RackAwareMode; import kafka.utils.ZKStringSerializer$; import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import java.util.Arrays; import java.util.Properties; import java.util.concurrent.ExecutionException; public class KafkaTimestampTest { public static void main(String[] args) throws ExecutionException, InterruptedException { String ip = "127.0.0.1"; Properties producerProperties = new Properties(); producerProperties.put("bootstrap.servers", ip + ":9092"); producerProperties.put("acks", "all"); producerProperties.put("retries", 0); producerProperties.put("batch.size", 16384); producerProperties.put("linger.ms", 1); producerProperties.put("buffer.memory", 33554432); producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Properties consumerProperties = new Properties(); consumerProperties.put("bootstrap.servers", ip + ":9092"); consumerProperties.put("enable.auto.commit", "false"); consumerProperties.put("session.timeout.ms", "30000"); consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); Producer<String, String> producer = new KafkaProducer<>(producerProperties); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties); ZkClient zkClient = new ZkClient(ip + 
":2181", 10000, 10000, ZKStringSerializer$.MODULE$); ZkConnection zkConnection = new ZkConnection(ip + ":2181"); ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false); TopicPartition topicPartitionWithCreateTime = new TopicPartition("test-topic-with-create-time", 0); TopicPartition topicPartitionWithLogAppendTime = new TopicPartition("test-topic-with-log-append-time", 0); // create topic with create time if (!AdminUtils.topicExists(zkUtils, topicPartitionWithCreateTime.topic())) { Properties topicProperties = new Properties(); topicProperties.put("message.timestamp.type", "CreateTime"); AdminUtils.createTopic(zkUtils, topicPartitionWithCreateTime.topic(), 1, 1, topicProperties, RackAwareMode.Disabled$.MODULE$); } // create topic with log append time if (!AdminUtils.topicExists(zkUtils, topicPartitionWithLogAppendTime.topic())) { Properties topicProperties = new Properties(); topicProperties.put("message.timestamp.type", "LogAppendTime"); AdminUtils.createTopic(zkUtils, topicPartitionWithLogAppendTime.topic(), 1, 1, topicProperties, RackAwareMode.Disabled$.MODULE$); } consumer.assign(Arrays.asList(topicPartitionWithLogAppendTime, topicPartitionWithCreateTime)); String format = "#%s, MetaDataOffset: %s, MetaDataTime: %s, ConsumerRecordOffset: %s, ConsumerRecordTime: %s"; System.out.println(String.format("Create messages into topic %s ...", topicPartitionWithCreateTime)); for (int i = 0; i < 10; i++) { RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topicPartitionWithCreateTime.topic(), topicPartitionWithCreateTime.partition(), "", "message")).get(); consumer.seek(topicPartitionWithCreateTime, recordMetadata.offset()); ConsumerRecord<String, String> consumerRecord = consumer.poll(1000).records(topicPartitionWithCreateTime).get(0); System.out.println(String.format(format, i + 1, recordMetadata.offset(), recordMetadata.timestamp(), consumerRecord.offset(), consumerRecord.timestamp())); } System.out.println(String.format("Create messages into 
topic %s...", topicPartitionWithLogAppendTime)); for (int i = 0; i < 10; i++) { RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topicPartitionWithLogAppendTime.topic(), topicPartitionWithLogAppendTime.partition(), "", "message")).get(); consumer.seek(topicPartitionWithLogAppendTime, recordMetadata.offset()); ConsumerRecord<String, String> consumerRecord = consumer.poll(1000).records(topicPartitionWithLogAppendTime).get(0); System.out.println(String.format(format, i + 1, recordMetadata.offset(), recordMetadata.timestamp(), consumerRecord.offset(), consumerRecord.timestamp())); } AdminUtils.deleteTopic(zkUtils, topicPartitionWithCreateTime.topic()); AdminUtils.deleteTopic(zkUtils, topicPartitionWithLogAppendTime.topic()); } }
The output shows that in case of log append time the timestamps differ.
Create messages into topic test-topic-with-create-time-0 ... #1, MetaDataOffset: 0, MetaDataTime: 1483623773788, ConsumerRecordOffset: 0, ConsumerRecordTime: 1483623773788 #2, MetaDataOffset: 1, MetaDataTime: 1483623774178, ConsumerRecordOffset: 1, ConsumerRecordTime: 1483623774178 #3, MetaDataOffset: 2, MetaDataTime: 1483623774183, ConsumerRecordOffset: 2, ConsumerRecordTime: 1483623774183 #4, MetaDataOffset: 3, MetaDataTime: 1483623774188, ConsumerRecordOffset: 3, ConsumerRecordTime: 1483623774188 #5, MetaDataOffset: 4, MetaDataTime: 1483623774193, ConsumerRecordOffset: 4, ConsumerRecordTime: 1483623774193 #6, MetaDataOffset: 5, MetaDataTime: 1483623774197, ConsumerRecordOffset: 5, ConsumerRecordTime: 1483623774197 #7, MetaDataOffset: 6, MetaDataTime: 1483623774202, ConsumerRecordOffset: 6, ConsumerRecordTime: 1483623774202 #8, MetaDataOffset: 7, MetaDataTime: 1483623774207, ConsumerRecordOffset: 7, ConsumerRecordTime: 1483623774207 #9, MetaDataOffset: 8, MetaDataTime: 1483623774212, ConsumerRecordOffset: 8, ConsumerRecordTime: 1483623774212 #10, MetaDataOffset: 9, MetaDataTime: 1483623774217, ConsumerRecordOffset: 9, ConsumerRecordTime: 1483623774217 Create messages into topic test-topic-with-log-append-time-0... 
#1, MetaDataOffset: 0, MetaDataTime: 1483623774224, ConsumerRecordOffset: 0, ConsumerRecordTime: 1483623774992 #2, MetaDataOffset: 1, MetaDataTime: 1483623774230, ConsumerRecordOffset: 1, ConsumerRecordTime: 1483623774997 #3, MetaDataOffset: 2, MetaDataTime: 1483623774235, ConsumerRecordOffset: 2, ConsumerRecordTime: 1483623775002 #4, MetaDataOffset: 3, MetaDataTime: 1483623774239, ConsumerRecordOffset: 3, ConsumerRecordTime: 1483623775007 #5, MetaDataOffset: 4, MetaDataTime: 1483623774244, ConsumerRecordOffset: 4, ConsumerRecordTime: 1483623775011 #6, MetaDataOffset: 5, MetaDataTime: 1483623774248, ConsumerRecordOffset: 5, ConsumerRecordTime: 1483623775015 #7, MetaDataOffset: 6, MetaDataTime: 1483623774253, ConsumerRecordOffset: 6, ConsumerRecordTime: 1483623775020 #8, MetaDataOffset: 7, MetaDataTime: 1483623774257, ConsumerRecordOffset: 7, ConsumerRecordTime: 1483623775024 #9, MetaDataOffset: 8, MetaDataTime: 1483623774262, ConsumerRecordOffset: 8, ConsumerRecordTime: 1483623775029 #10, MetaDataOffset: 9, MetaDataTime: 1483623774267, ConsumerRecordOffset: 9, ConsumerRecordTime: 1483623775034
I assume the timestamps in the record metadata represent the create time, but I could not verify that.
Attachments
Issue Links
- links to