Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-4408

KTable doesn't work with ProcessorTopologyTestDriver in Kafka 0.10.1.0

    Export: XML | Word | Printable | JSON

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.1.0
    • Fix Version/s: 0.11.0.0
    • Component/s: streams
    • Environment: Linux

    Description

      In Kafka 0.10.1.0, the ProcessorTopologyTestDriver no longer works with KTables. The test code below worked fine under Kafka 0.10.0.1, but under 0.10.1.0 it fails with the following error:

      Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: task [0_0] Could not find partition info for topic: alertInputTopic
      at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:174)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
      at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
      at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
      at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
      at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
      at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
      at org.apache.kafka.test.ProcessorTopologyTestDriver.<init>(ProcessorTopologyTestDriver.java:174)
      at mil.navy.icap.kafka.streams.processor.track.ProcessorDriverTest2.main(ProcessorDriverTest2.java:41)

      package mil.navy.icap.kafka.streams.processor.track;
      
      
      import java.io.IOException;
      import java.util.Properties;
      
      
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.common.serialization.Serdes.StringSerde;
      import org.apache.kafka.common.serialization.StringDeserializer;
      import org.apache.kafka.common.serialization.StringSerializer;
      import org.apache.kafka.streams.StreamsConfig;
      import org.apache.kafka.streams.kstream.KStreamBuilder;
      import org.apache.kafka.streams.kstream.KTable;
      import org.apache.kafka.test.ProcessorTopologyTestDriver;
      
      
      /**
       * Stand-alone reproduction program: builds a topology that reads
       * {@code alertInputTopic} as a {@code KTable} and writes it to
       * {@code alertOutputTopic}, pushes a single record through a
       * {@code ProcessorTopologyTestDriver}, and prints the record read back
       * from the output topic.
       */
      public class ProcessorDriverTest2 {

          /** Topic the KTable is sourced from and the test record is sent to. */
          private static final String INPUT_TOPIC = "alertInputTopic";

          /** Topic the KTable is written to and the result is read from. */
          private static final String OUTPUT_TOPIC = "alertOutputTopic";

          /**
           * Runs the reproduction end to end and prints the resulting record
           * to standard output.
           *
           * @param args ignored
           */
          public static void main(String[] args) throws IOException, InterruptedException {
              System.out.println("ProcessorDriverTest2");

              // Configuration is created before the topology, as in the original flow.
              StreamsConfig config = new StreamsConfig(buildProperties());
              KStreamBuilder builder = buildTopology();

              // NOTE(review): the third argument is presumably a state store name;
              // confirm against the ProcessorTopologyTestDriver constructor.
              ProcessorTopologyTestDriver driver =
                      new ProcessorTopologyTestDriver(config, builder, "alertStore");

              StringSerializer stringSerializer = new StringSerializer();
              StringDeserializer stringDeserializer = new StringDeserializer();

              // Feed one key/value pair into the input topic.
              driver.process(INPUT_TOPIC, "the Key", "the Value", stringSerializer, stringSerializer);

              // Pull the forwarded record back off the output topic and show it.
              ProducerRecord<String, String> output =
                      driver.readOutput(OUTPUT_TOPIC, stringDeserializer, stringDeserializer);
              System.out.println("rec: " + output);
          }

          /** Assembles the client properties handed to {@code StreamsConfig}. */
          private static Properties buildProperties() {
              String serializerName = StringSerializer.class.getName();
              String deserializerName = StringDeserializer.class.getName();

              Properties settings = new Properties();
              settings.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "ProcessorDriverTest2");
              settings.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
              settings.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serializerName);
              settings.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializerName);
              settings.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializerName);
              settings.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializerName);
              return settings;
          }

          /** Declares the KTable on the input topic and routes it to the output topic. */
          private static KStreamBuilder buildTopology() {
              KStreamBuilder builder = new KStreamBuilder();
              StringSerde serde = new StringSerde();
              builder.table(serde, serde, INPUT_TOPIC)
                      .to(serde, serde, OUTPUT_TOPIC);
              return builder;
          }
      }
      

      Attachments

        Issue Links

          Activity

            People

              hrafzali Hamidreza Afzali
              bnikolaidis Byron Nikolaidis
              Votes:
              3 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: