Details
Description
In Kafka 0.10.1.0, the ProcessorTopologyTestDriver no longer works with KTables. The test code below worked fine under Kafka 0.10.0.1, but it now produces this error:
Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: task [0_0] Could not find partition info for topic: alertInputTopic
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:174)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
at org.apache.kafka.test.ProcessorTopologyTestDriver.<init>(ProcessorTopologyTestDriver.java:174)
at mil.navy.icap.kafka.streams.processor.track.ProcessorDriverTest2.main(ProcessorDriverTest2.java:41)
package mil.navy.icap.kafka.streams.processor.track; import java.io.IOException; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.test.ProcessorTopologyTestDriver; public class ProcessorDriverTest2 { public static void main(String[] args) throws IOException, InterruptedException { System.out.println("ProcessorDriverTest2"); Properties props = new Properties(); props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "ProcessorDriverTest2"); props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); StreamsConfig streamsConfig = new StreamsConfig(props); // topology KStreamBuilder kstreamBuilder = new KStreamBuilder(); StringSerde stringSerde = new StringSerde(); KTable<String, String> table = kstreamBuilder.table(stringSerde, stringSerde, "alertInputTopic"); table.to(stringSerde, stringSerde, "alertOutputTopic"); // create test driver ProcessorTopologyTestDriver testDriver = new ProcessorTopologyTestDriver( streamsConfig, kstreamBuilder, "alertStore"); StringSerializer serializer = new StringSerializer(); StringDeserializer deserializer = new 
StringDeserializer(); // send data to input topic testDriver.process("alertInputTopic", "the Key", "the Value", serializer, serializer); // read data from output topic ProducerRecord<String, String> rec = testDriver.readOutput("alertOutputTopic", deserializer, deserializer); System.out.println("rec: " + rec); } }
Attachments
Issue Links
- is related to
-
KAFKA-4828 ProcessorTopologyTestDriver does not work when using .through()
- Resolved