Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version: 2.6.0
None
None
Description
I am using the Kafka Streams DSL to create a topology:
StreamsBuilder streamsBuilder = new StreamsBuilder();
valueSerde.configure(getConfigForSpecificAvro(appId), false);
KStream<String, AvroDTO> stream = streamsBuilder.stream(inputTopic, Consumed.with(new Serdes.StringSerde(), valueSerde));
KStream<String, AvroDTO> filtered = stream.filter((key, value) -> ServiceConsumer.filter(key, value));
filtered.map((KeyValueMapper<String, AvroDTO, KeyValue<String, SpecificRecordBase>>) (key, value) -> ServiceConsumer.process(key, value))
    .to((k, v, recordContext) -> v instanceof AvroDTO ? dlqTopic : outputTopic, Produced.with(new Serdes.StringSerde(), valueSerde));
Topology topology = streamsBuilder.build();
KafkaStreams kafkaStreams = new KafkaStreams(topology, getKafkaStreamsConfig(appId));
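For reference, the routing rule passed to to(...) can be exercised on its own, without Kafka. The sketch below is only an illustration: AvroDTO and ProcessedRecord are hypothetical stand-in classes, and "DLQ_TOPIC" is an assumed name (only "OUTPUT_TOPIC" appears in this report). It shows that the sink topic name is computed per record, from the mapped value.

```java
public class RoutingSketch {
    // Hypothetical stand-in for the Avro input type used in the topology.
    static class AvroDTO {}

    // Hypothetical stand-in for any other SpecificRecordBase returned by process().
    static class ProcessedRecord {}

    // Same decision as the lambda passed to to(...): values that are still
    // AvroDTO go to the DLQ topic, everything else goes to the output topic.
    static String route(Object value, String dlqTopic, String outputTopic) {
        return value instanceof AvroDTO ? dlqTopic : outputTopic;
    }

    public static void main(String[] args) {
        // "DLQ_TOPIC" is an assumed name for illustration only.
        System.out.println(route(new AvroDTO(), "DLQ_TOPIC", "OUTPUT_TOPIC"));         // prints "DLQ_TOPIC"
        System.out.println(route(new ProcessedRecord(), "DLQ_TOPIC", "OUTPUT_TOPIC")); // prints "OUTPUT_TOPIC"
    }
}
```

Because the topic name only exists once a record reaches this rule, a test in which every record is filtered out never evaluates it.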
To test the topology, I am using TopologyTestDriver:
when(getKafkaStreamsConfig(any())).thenReturn(kafkaConfig);
when(ServiceConsumer.filter(any(), any())).thenReturn(false);
// when(ServiceConsumer.process(any(), any())).thenReturn(new KeyValue<>(statusDto.getTaskId().toString(), statusDto));
topologyTestDriver = new TopologyTestDriver(getTopologyAndStartKafkaStreams(), kafkaConfig);
StreamInput = topologyTestDriver.createInputTopic("INPUT_TOPIC", new StringSerializer(), new KafkaAvroSerializer(schemaRegistryClient));
UpdateOutput = topologyTestDriver.createOutputTopic("OUTPUT_TOPIC", new StringDeserializer(), new KafkaAvroDeserializer(schemaRegistryClient, config));
StreamInput.pipeInput("Hi");
assertThat(UpdateOutput.isEmpty()).isTrue();
I am checking that the output topic is empty when no message passes the filter.
The unit test fails with this error:
java.lang.IllegalArgumentException: Unknown topic: OUTPUT_TOPIC
Changing when(ServiceConsumer.filter(any(), any())).thenReturn(false); to thenReturn(true) does not produce any error.