Description
Problem:
ProcessorTopologyTestDriver does not work when testing a topology that uses through().
org.apache.kafka.streams.errors.StreamsException: Store count2's change log (count2-topic) does not contain partition 1
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:81)
Example:
/**
 * Minimal reproduction: builds a stream -> groupByKey -> count -> through()
 * topology, feeds it through ProcessorTopologyTestDriver, and prints whatever
 * is read back from the through-topic after each input record.
 */
object Topology1 {

  def main(args: Array[String]): Unit = {
    // Topic and state-store names used by the topology.
    val inputTopic = "input"
    val stateStore = "count"
    val stateStore2 = "count2"
    val outputTopic2 = "count2-topic"

    // Two records sharing the same key "A".
    val records = Seq[(String, Integer)](("A", 1), ("A", 2))

    // Random application id per run; the bootstrap address is a fixed local one.
    val config = new Properties
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

    // Count per key into `stateStore`, then pipe the result through
    // `outputTopic2`, backed by `stateStore2`.
    val builder = new KStreamBuilder
    builder
      .stream(Serdes.String, Serdes.Integer, inputTopic)
      .groupByKey(Serdes.String, Serdes.Integer)
      .count(stateStore)
      .through(Serdes.String, Serdes.Long, outputTopic2, stateStore2)

    val testDriver =
      new ProcessorTopologyTestDriver(new StreamsConfig(config), builder, stateStore, stateStore2)

    // Push each record in, then read and print one record from the through-topic.
    records.foreach { case (key, value) =>
      testDriver.process(inputTopic, key, value, Serdes.String.serializer, Serdes.Integer.serializer)
      val record =
        testDriver.readOutput(outputTopic2, Serdes.String.deserializer, Serdes.Long.deserializer)
      println(record)
    }
  }
}
Attachments
Issue Links
- relates to
-
KAFKA-4408 KTable doesn't work with ProcessorTopologyTestDriver in Kafka 0.10.1.0
- Resolved
- links to