Description
Problem:
When using ProcessorTopologyTestDriver, the extracted timestamp is not forwarded with the produced record to the internal topics.
Example:
/**
 * Minimal reproduction: a windowed count topology driven through
 * ProcessorTopologyTestDriver with a custom timestamp extractor configured.
 *
 * Each input record is piped into the topology and the record read back from
 * the output topic is printed.
 */
object Topology1 {

  def main(args: Array[String]): Unit = {
    val sourceTopic = "input"
    val sinkTopic = "output"
    val countStoreName = "count"

    // Keys have the form "<name>@<suffix>"; only the part before '@' is kept as the grouping key.
    // NOTE(review): the suffix is presumably what MyTimestampExtractor reads — confirm against its source.
    val records = Seq[(String, Integer)](
      ("A@1450000000", 1),
      ("B@1450000000", 2)
    )

    val config = new Properties
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, classOf[MyTimestampExtractor].getName)

    // Serde for the windowed String keys produced by the windowed count.
    val windowedKeySerde = Serdes.serdeFrom(
      new WindowedSerializer(Serdes.String.serializer),
      new WindowedDeserializer(Serdes.String.deserializer)
    )

    // Topology: source -> re-key on the prefix before '@' -> group -> windowed count -> sink.
    val topology = new KStreamBuilder
    val source = topology.stream(Serdes.String, Serdes.Integer, sourceTopic)
    val rekeyed = source.map[String, Integer]((k, v) => new KeyValue(k.split("@")(0), v))
    val grouped = rekeyed.groupByKey(Serdes.String, Serdes.Integer)
    val counts = grouped.count(TimeWindows.of(1000L), countStoreName)
    counts.to(windowedKeySerde, Serdes.Long, sinkTopic)

    val testDriver = new ProcessorTopologyTestDriver(new StreamsConfig(config), topology, countStoreName)

    // Feed one record at a time and print whatever is read back from the output topic.
    for ((key, value) <- records) {
      testDriver.process(sourceTopic, key, value, Serdes.String.serializer, Serdes.Integer.serializer)
      val produced = testDriver.readOutput(sinkTopic, Serdes.String.deserializer, Serdes.Long.deserializer)
      println(produced)
    }
  }
}
Attachments
Issue Links
- links to