Kafka / KAFKA-12542

Unknown Output topic when Filter Processor is returning false


    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.6.0
    • Fix Version/s: None
    • Component/s: streams-test-utils
    • Labels:
      None

      Description

      I am using the Kafka Streams DSL to create a topology.

      StreamsBuilder streamsBuilder = new StreamsBuilder();
      valueSerde.configure(getConfigForSpecificAvro(appId), false);
      KStream<String, AvroDTO> stream = streamsBuilder.stream(inputTopic, Consumed.with(new Serdes.StringSerde(), valueSerde));
      KStream<String, AvroDTO> filtered = stream.filter((key, value) -> ServiceConsumer.filter(key, value));
      filtered.map((KeyValueMapper<String, AvroDTO, KeyValue<String, SpecificRecordBase>>) (key, value) -> ServiceConsumer.process(key, value))
          .to((k, v, recordContext) -> v instanceof AvroDTO ? dlqTopic : outputTopic, Produced.with(new Serdes.StringSerde(), valueSerde));
      Topology topology = streamsBuilder.build();
      KafkaStreams kafkaStreams = new KafkaStreams(topology, getKafkaStreamsConfig(appId));

       

      To test the topology, I am using TopologyTestDriver.

      when(getKafkaStreamsConfig(any())).thenReturn(kafkaConfig);
      when(ServiceConsumer.filter(any(),any())).thenReturn(false);
      // when(ServiceConsumer.process(any(),any())).thenReturn(new KeyValue<>(statusDto.getTaskId().toString(),statusDto));
      topologyTestDriver=new TopologyTestDriver(getTopologyAndStartKafkaStreams(),kafkaConfig);
      StreamInput = topologyTestDriver.createInputTopic("INPUT_TOPIC", new StringSerializer(), new KafkaAvroSerializer(schemaRegistryClient));
      UpdateOutput = topologyTestDriver.createOutputTopic("OUTPUT_TOPIC", new StringDeserializer(), new KafkaAvroDeserializer(schemaRegistryClient, config));

      StreamInput.pipeInput("Hi");
      assertThat(UpdateOutput.isEmpty()).isTrue();

       

      I am checking that the output topic is empty when no messages pass the filter.

      I get the following error while unit testing:

      java.lang.IllegalArgumentException: Unknown topic: OUTPUT_TOPIC

       

      Changing when(ServiceConsumer.filter(any(),any())).thenReturn(false); to thenReturn(true) does not produce any error.
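One possible reading of this behavior is sketched below as a simplified model. It is an assumption made for illustration, not the actual TopologyTestDriver source: the class OutputTopicLookupModel and its methods are hypothetical. The idea is that a topic chosen dynamically in .to((k,v,recordContext) -> ...) is not a statically declared sink, so the test driver may only learn about it once a record has actually been written to it. When the filter returns false, nothing is written, and the lookup fails.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Simplified, hypothetical model of an output-topic lookup.
// It is NOT the real TopologyTestDriver implementation.
public class OutputTopicLookupModel {
    // Sink topics declared with a fixed name, e.g. .to("OUTPUT_TOPIC").
    private final Set<String> staticSinkTopics;
    // Queues are created lazily, the first time a record reaches a topic.
    private final Map<String, Queue<String>> recordsByTopic = new HashMap<>();

    public OutputTopicLookupModel(Set<String> staticSinkTopics) {
        this.staticSinkTopics = staticSinkTopics;
    }

    // Called whenever the topology forwards a record to a sink topic.
    public void produce(String topic, String value) {
        recordsByTopic.computeIfAbsent(topic, t -> new ArrayDeque<>()).add(value);
    }

    // Stands in for the isEmpty() check used in the test above.
    public boolean isEmpty(String topic) {
        Queue<String> queue = recordsByTopic.get(topic);
        if (queue == null) {
            // No record was ever written: only statically known sinks are accepted.
            if (!staticSinkTopics.contains(topic)) {
                throw new IllegalArgumentException("Unknown topic: " + topic);
            }
            return true;
        }
        return queue.isEmpty();
    }
}
```

Under this model, a driver built with no static sinks throws for "OUTPUT_TOPIC" until produce("OUTPUT_TOPIC", ...) has been called once, which matches the difference seen between thenReturn(false) and thenReturn(true). If the model holds, a test for the filtered-out case would need either a fixed topic name in .to(...) or an assertion that expects the exception.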

       

            People

            • Assignee:
              Unassigned
              Reporter:
              sinhash SHWETA SINHA
