  1. Kafka
  2. KAFKA-12542

Unknown Output topic when Filter Processor is returning false


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.6.0
    • Fix Version/s: None
    • Component/s: streams-test-utils
    • Labels: None

    Description

      I am using the Kafka Streams DSL to create a topology.

      StreamsBuilder streamsBuilder = new StreamsBuilder();
      valueSerde.configure(getConfigForSpecificAvro(appId), false);
      KStream<String, AvroDTO> stream = streamsBuilder.stream(inputTopic, Consumed.with(new Serdes.StringSerde(), valueSerde));
      KStream<String, AvroDTO> filtered = stream.filter((key, value) -> ServiceConsumer.filter(key, value));
      // The sink topic is chosen per record: AvroDTO values go to the DLQ topic, all others to the output topic.
      filtered.map((KeyValueMapper<String, AvroDTO, KeyValue<String, SpecificRecordBase>>) (key, value) -> ServiceConsumer.process(key, value))
          .to((k, v, recordContext) -> v instanceof AvroDTO ? dlqTopic : outputTopic, Produced.with(new Serdes.StringSerde(), valueSerde));
      Topology topology = streamsBuilder.build();
      KafkaStreams kafkaStreams = new KafkaStreams(topology, getKafkaStreamsConfig(appId));

       

      To test the topology, I am using TopologyTestDriver.

      when(getKafkaStreamsConfig(any())).thenReturn(kafkaConfig);
      when(ServiceConsumer.filter(any(), any())).thenReturn(false);
      // when(ServiceConsumer.process(any(), any())).thenReturn(new KeyValue<>(statusDto.getTaskId().toString(), statusDto));
      topologyTestDriver = new TopologyTestDriver(getTopologyAndStartKafkaStreams(), kafkaConfig);
      StreamInput = topologyTestDriver.createInputTopic("INPUT_TOPIC", new StringSerializer(), new KafkaAvroSerializer(schemaRegistryClient));
      UpdateOutput = topologyTestDriver.createOutputTopic("OUTPUT_TOPIC", new StringDeserializer(), new KafkaAvroDeserializer(schemaRegistryClient, config));

      StreamInput.pipeInput("Hi");
      assertThat(UpdateOutput.isEmpty()).isTrue();

       

      I am checking that the output topic is empty when no messages pass the filter.

      I get the following error while unit testing:

      java.lang.IllegalArgumentException: Unknown topic: OUTPUT_TOPIC

       

      Changing when(ServiceConsumer.filter(any(),any())).thenReturn(false); to return true does not produce any error.
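One possible explanation, which this report does not confirm: because the sink is written with a topic-name extractor rather than a fixed topic name, the topology declares no static sink topic, so a test driver may only learn about OUTPUT_TOPIC once a record has actually been written to it. The plain-Java sketch below models that assumed lookup behavior. The class OutputTopicRegistry and its members are hypothetical names invented for illustration; this is not Kafka's implementation.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical stand-in for a test driver's output bookkeeping; not Kafka code.
final class OutputTopicRegistry {
    // Topics named explicitly in the topology (assumed empty for dynamic routing).
    private final Set<String> staticSinkTopics;
    // Topics that have received at least one record so far.
    private final Map<String, Queue<String>> recordsByTopic = new HashMap<>();

    OutputTopicRegistry(Set<String> staticSinkTopics) {
        this.staticSinkTopics = staticSinkTopics;
    }

    // Called when the topology forwards a record to a sink topic.
    void write(String topic, String value) {
        recordsByTopic.computeIfAbsent(topic, t -> new ArrayDeque<>()).add(value);
    }

    // Assumed lookup: a topic with no records is only accepted if it was declared statically.
    boolean isEmpty(String topic) {
        Queue<String> records = recordsByTopic.get(topic);
        if (records == null) {
            if (!staticSinkTopics.contains(topic)) {
                throw new IllegalArgumentException("Unknown topic: " + topic);
            }
            return true;
        }
        return records.isEmpty();
    }

    public static void main(String[] args) {
        // Dynamic routing: no topic name is declared up front.
        OutputTopicRegistry dynamic = new OutputTopicRegistry(Set.of());
        try {
            // The filter dropped every record, so nothing was ever written.
            dynamic.isEmpty("OUTPUT_TOPIC");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
        // Once a record passes the filter, the topic becomes known.
        dynamic.write("OUTPUT_TOPIC", "record");
        System.out.println(dynamic.isEmpty("OUTPUT_TOPIC"));
    }
}
```

If this model matches the driver's behavior, it would also account for the observation that returning true from the filter avoids the error: a record then reaches the sink before the output topic is queried.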

       


          People

            Assignee: Unassigned
            Reporter: SHWETA SINHA (sinhash)
            Votes: 0
            Watchers: 4
