  1. Kafka
  2. KAFKA-5253

TopologyTestDriver must handle streams created with patterns


Details

    • Important

    Description

      Context
      TopologyTestDriver (added via KIP-247) is being used to unit test topologies while developing KStreams apps.

      One such topology uses a Pattern to consume from multiple topics at once.
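
To make the multi-topic behaviour concrete, here is a minimal sketch in plain Java (no Kafka dependency) of which topic names the pattern from this report selects. It assumes a topic is selected only when the whole name matches the regex; the class name, the helper `matchingTopics`, and the sample topic list are hypothetical and exist only for illustration.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicPatternDemo {
    // The pattern from the report: "topic-source-" followed by exactly one digit.
    static final Pattern SOURCE_PATTERN = Pattern.compile("topic-source-\\d");

    // Hypothetical helper: keeps the topic names that the pattern matches in full.
    static List<String> matchingTopics(Pattern pattern, List<String> topics) {
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Sample topic names, made up for this illustration.
        List<String> topics = List.of(
                "topic-source-1", "topic-source-3", "topic-source-10", "topic-sink");
        // "topic-source-10" is excluded because \d matches a single digit only.
        System.out.println(matchingTopics(SOURCE_PATTERN, topics));
    }
}
```

Under that assumption, a record sent to "topic-source-3" should reach the pattern-based source, which is what the failing test in the Example section expects.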

      Problem
      The unit test of the topology fails because TopologyTestDriver does not handle Patterns properly.

      Example
      Below is a unit test showing what I understand should happen; it currently fails.

      *Note: the example below is outdated, as it uses the old KStreamTestDriver. The overall test layout can be adopted for TopologyTestDriver, though, so we leave it in the description.*

      Explicitly adding a source topic that matches the topic pattern generates an exception, because the topology builder checks for overlap between topic names and patterns, regardless of the order in which the pattern and the topic are added. So this is intended behaviour.

          @Test
          public void shouldProcessFromSourcesThatDoMatchThePattern() {
              // -- setup stream pattern
              final KStream<String, String> source = builder.stream(Pattern.compile("topic-source-\\d"));
              source.to("topic-sink");
      
              // -- setup processor to capture results
              final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
              source.process(processorSupplier);
      
              // -- add source to stream data from
              //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), "topic-source-3");
      
              // -- build test driver
              driver = new KStreamTestDriver(builder); // this should be TopologyTestDriver
              driver.setTime(0L);
      
              // -- test
              driver.process("topic-source-3", "A", "aa");
      
              // -- validate
              // no exception was thrown
              assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
          }
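
The overlap check described above, which is why the `addSource` call in the test is commented out, can be modelled roughly as follows. This is a simplified sketch and not the topology builder's actual code: the class `SourceOverlapSketch`, its method names, and the use of `IllegalStateException` are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class SourceOverlapSketch {
    private final List<String> topics = new ArrayList<>();
    private final List<Pattern> patterns = new ArrayList<>();

    // Registers an explicit topic; rejects it if a registered pattern already covers it.
    void addSourceTopic(String topic) {
        for (Pattern pattern : patterns) {
            if (pattern.matcher(topic).matches()) {
                throw new IllegalStateException(
                        "Topic " + topic + " matches registered pattern " + pattern);
            }
        }
        topics.add(topic);
    }

    // Registers a pattern; rejects it if it covers an already registered topic.
    void addSourcePattern(Pattern pattern) {
        for (String topic : topics) {
            if (pattern.matcher(topic).matches()) {
                throw new IllegalStateException(
                        "Pattern " + pattern + " matches registered topic " + topic);
            }
        }
        patterns.add(pattern);
    }

    public static void main(String[] args) {
        SourceOverlapSketch sketch = new SourceOverlapSketch();
        sketch.addSourcePattern(Pattern.compile("topic-source-\\d"));
        try {
            // Mirrors the commented-out addSource call in the test.
            sketch.addSourceTopic("topic-source-3");
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Because both registration methods check against what was added earlier, the conflict is detected whether the pattern or the topic is registered first.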
      

      Solution
      If anybody can help in defining the solution, I can create a pull request for this change.

            People

              jadireddi Jagadesh Adireddi
              wimvanleuven Wim Van Leuven