Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 1.0.1, 2.0.0
None
None
Environment: windows10, Cent OS, Java8
Description
We are building a metrics reporter, and one of its tests finds a different offset (6 instead of the expected 5) on Linux. The following test passes on Windows:
createTopic(TOPIC1, 1, 1); // 1 partition 1 replication
runConsumer(TOPIC1);
runProducer(TOPIC1, "key", "value", 5); // 5 messages
zkMetricsReporter.sendConsumerLagMetrics(Clock.defaultClock().time() / 1000);
PartionOffsets partionOffsets = getPartionOffset(); // For topic1-0
assertEquals(5, partionOffsets.getCurrentOffset());
assertEquals(5, partionOffsets.getEndOffset());
This is how we get the offsets:
Map<TopicPartition, Long> logEndOffset = getLogEndOffset(topic, host);
KafkaConsumer<?, ?> consumer = createNewConsumer(groupId, host);
BinaryOperator<PartionOffsets> mergeFunction = (a, b) -> {
    throw new IllegalStateException();
};
Map<TopicPartition, PartionOffsets> result = logEndOffset.entrySet()
    .stream()
    .collect(Collectors.toMap(
        entry -> (entry.getKey()),
        entry -> {
            OffsetAndMetadata committed = consumer.committed(entry.getKey());
            return new PartionOffsets(entry.getValue(), committed.offset(),
                entry.getKey().partition(), topic);
        },
        mergeFunction));
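For reference, the merge step above can be sketched without a broker. This is a minimal illustration only, not the reporter's actual code: `OffsetMergeSketch`, `Offsets`, and `merge` are hypothetical names, and plain maps keyed by strings such as "topic1-0" stand in for the results of `getLogEndOffset` and `consumer.committed`. It also assumes the case where a group has not committed anything for a partition, in which `KafkaConsumer.committed` returns null and `committed.offset()` in the snippet above would throw a `NullPointerException`. It does not attempt to explain why the offset is 6 on Linux.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.OptionalLong;

public class OffsetMergeSketch {

    /** Hypothetical stand-in for PartionOffsets: end offset plus an optional committed offset. */
    static final class Offsets {
        final long endOffset;
        final OptionalLong currentOffset; // empty when the group has committed nothing

        Offsets(long endOffset, OptionalLong currentOffset) {
            this.endOffset = endOffset;
            this.currentOffset = currentOffset;
        }

        /** Lag is defined only when a committed offset exists. */
        OptionalLong lag() {
            return currentOffset.isPresent()
                    ? OptionalLong.of(endOffset - currentOffset.getAsLong())
                    : OptionalLong.empty();
        }
    }

    /**
     * Pairs each partition's log end offset with its committed offset.
     * A missing entry in committedOffsets models a null result from committed().
     */
    static Map<String, Offsets> merge(Map<String, Long> logEndOffsets,
                                      Map<String, Long> committedOffsets) {
        Map<String, Offsets> result = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : logEndOffsets.entrySet()) {
            Long committed = committedOffsets.get(entry.getKey());
            OptionalLong current = committed == null
                    ? OptionalLong.empty()
                    : OptionalLong.of(committed);
            result.put(entry.getKey(), new Offsets(entry.getValue(), current));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> end = new LinkedHashMap<>();
        end.put("topic1-0", 5L);
        end.put("topic1-1", 3L);

        Map<String, Long> committed = new LinkedHashMap<>();
        committed.put("topic1-0", 5L); // nothing committed for topic1-1

        merge(end, committed).forEach((tp, o) ->
                System.out.println(tp + " end=" + o.endOffset
                        + " current=" + o.currentOffset + " lag=" + o.lag()));
    }
}
```

Logging both values per partition in this way on each platform would show whether the committed offset or the log end offset is the one that differs.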