Kafka / KAFKA-4828

ProcessorTopologyTestDriver does not work when using .through()


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.2.0
    • Fix Version/s: 0.11.0.0
    • Component/s: streams

    Description

      Problem:

      ProcessorTopologyTestDriver fails when testing a topology that uses through(). It throws the following exception:

      org.apache.kafka.streams.errors.StreamsException: Store count2's change log (count2-topic) does not contain partition 1
      	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:81)
      

      Example:

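      import java.util.{Properties, UUID}

      import org.apache.kafka.common.serialization.Serdes
      import org.apache.kafka.streams.StreamsConfig
      import org.apache.kafka.streams.kstream.KStreamBuilder
      // ProcessorTopologyTestDriver is part of the kafka-streams test sources
      import org.apache.kafka.test.ProcessorTopologyTestDriver

      object Topology1 {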
      
        def main(args: Array[String]): Unit = {
      
          val inputTopic = "input"
          val stateStore = "count"
          val stateStore2 = "count2"
          val outputTopic2 = "count2-topic"
          val inputs = Seq[(String, Integer)](("A", 1), ("A", 2))
      
          val props = new Properties
          props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
          props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      
          val builder = new KStreamBuilder
          builder.stream(Serdes.String, Serdes.Integer, inputTopic)
            .groupByKey(Serdes.String, Serdes.Integer)
            .count(stateStore)
            .through(Serdes.String, Serdes.Long, outputTopic2, stateStore2)
      
          val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), builder, stateStore, stateStore2)
          inputs.foreach {
            case (key, value) => {
              driver.process(inputTopic, key, value, Serdes.String.serializer, Serdes.Integer.serializer)
              val record = driver.readOutput(outputTopic2, Serdes.String.deserializer, Serdes.Long.deserializer)
              println(record)
            }
          }
        }
      }
      

            People

              Assignee: hrafzali Hamidreza Afzali
              Reporter: hrafzali Hamidreza Afzali
              Votes: 0
              Watchers: 4
