Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-4789

ProcessorTopologyTestDriver does not forward extracted timestamps to internal topics

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 0.10.2.0
    • 0.11.0.0
    • streams

    Description

      Problem:

      When using ProcessorTopologyTestDriver, the extracted timestamp is not forwarded with the produced record to the internal topics.

      Example:

      object Topology1 {
      
        def main(args: Array[String]): Unit = {
      
          val inputTopic = "input"
          val outputTopic = "output"
          val stateStore = "count"
          val inputs = Seq[(String, Integer)](("A@1450000000", 1), ("B@1450000000", 2))
      
          val props = new Properties
          props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
          props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
          props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, classOf[MyTimestampExtractor].getName)
      
          val windowedStringSerde = Serdes.serdeFrom(new WindowedSerializer(Serdes.String.serializer),
            new WindowedDeserializer(Serdes.String.deserializer))
      
          val builder = new KStreamBuilder
          builder.stream(Serdes.String, Serdes.Integer, inputTopic)
            .map[String, Integer]((k, v) => new KeyValue(k.split("@")(0), v))
            .groupByKey(Serdes.String, Serdes.Integer)
            .count(TimeWindows.of(1000L), stateStore)
            .to(windowedStringSerde, Serdes.Long, outputTopic)
      
          val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), builder, stateStore)
          inputs.foreach {
            case (key, value) => {
              driver.process(inputTopic, key, value, Serdes.String.serializer, Serdes.Integer.serializer)
              val record = driver.readOutput(outputTopic, Serdes.String.deserializer, Serdes.Long.deserializer)
              println(record)
            }
          }
      
        }
      }
      

      Attachments

        Issue Links

          Activity

            People

              hrafzali Hamidreza Afzali
              hrafzali Hamidreza Afzali
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: