Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-6742

TopologyTestDriver error when dealing with stores from GlobalKTable

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Minor
    • Resolution: Fixed
    • 1.1.0
    • 1.1.1, 2.0.0
    • streams
    • None

    Description

      This junit test simply fails:

      @Test

      public void globalTable() {

      StreamsBuilder builder = new StreamsBuilder();

      @SuppressWarnings("unused")

      final KTable<String,String> localTable = builder

      .table("local", 

      Consumed.with(Serdes.String(), Serdes.String()),

      Materialized.as("localStore"))

      ;

      @SuppressWarnings("unused")

      final GlobalKTable<String,String> globalTable = builder

      .globalTable("global", 

      Consumed.with(Serdes.String(), Serdes.String()),

              Materialized.as("globalStore"))

      ;

      //

      Properties props = new Properties();

      props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test");

      props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost");

      TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props);

      //

      final KeyValueStore<String,String> localStore = testDriver.getKeyValueStore("localStore");

      Assert.assertNotNull(localStore);

      Assert.assertNotNull(testDriver.getAllStateStores().get("localStore"));

      //

      final KeyValueStore<String,String> globalStore = testDriver.getKeyValueStore("globalStore");

      Assert.assertNotNull(globalStore);

      Assert.assertNotNull(testDriver.getAllStateStores().get("globalStore"));

      //

          final ConsumerRecordFactory<String,String> crf = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());

      testDriver.pipeInput(crf.create("local", "one", "TheOne"));

      testDriver.pipeInput(crf.create("global", "one", "TheOne"));

      //

      Assert.assertEquals("TheOne", localStore.get("one"));

      Assert.assertEquals("TheOne", globalStore.get("one"));

       

       

      to make it work I had to modify the TopologyTestDriver class as follow:

      ...

          public Map<String, StateStore> getAllStateStores() {

      //        final Map<String, StateStore> allStores = new HashMap<>();

      //        for (final String storeName : internalTopologyBuilder.allStateStoreName())

      { //            allStores.put(storeName, ((ProcessorContextImpl) task.context()).getStateMgr().getStore(storeName)); //        }

      //        return allStores;

          // FIXME

          final ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();

              final Map<String, StateStore> allStores = new HashMap<>();

              for (final String storeName : internalTopologyBuilder.allStateStoreName())

      {             StateStore res = psm.getStore(storeName);             if (res == null)               res = psm.getGlobalStore(storeName);             allStores.put(storeName, res);         }

              return allStores;

          }

      ...

          public StateStore getStateStore(final String name) {

      //        return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);

              // FIXME

          final ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();

              StateStore res = psm.getStore(name);

              if (res == null)

              res = psm.getGlobalStore(name);

              return res;

          }

       

      moreover I think it would be very useful to make the internal MockProducer public for testing cases where a producer is used along side with the "normal" stream processing by adding the method:

          /**

           * @return records sent with this producer are automatically streamed to the topology.

           */

          public final Producer<byte[], byte[]> getProducer()

      {      return producer;     }

       

      unfortunately this introduces another problem that could be verified by adding the following lines to the previous junit test:

      ...

      **

      //

      ConsumerRecord<byte[],byte[]> cr = crf.create("dummy", "two", "Second"); // just to serialize keys and values

      testDriver.getProducer().send(new ProducerRecord<>("local", null, cr.timestamp(), cr.key(), cr.value()));

      testDriver.getProducer().send(new ProducerRecord<>("global", null, cr.timestamp(), cr.key(), cr.value()));

      testDriver.advanceWallClockTime(0);

      Assert.assertEquals("TheOne", localStore.get("one"));

      Assert.assertEquals("Second", localStore.get("two"));

      Assert.assertEquals("TheOne", globalStore.get("one"));

      Assert.assertEquals("Second", globalStore.get("two"));

      }

       

      that could be fixed with:

       

          private void captureOutputRecords() {

              // Capture all the records sent to the producer ...

              final List<ProducerRecord<byte[], byte[]>> output = producer.history();

              producer.clear();

              for (final ProducerRecord<byte[], byte[]> record : output) {

                  Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(record.topic());

                  if (outputRecords == null)

      {                 outputRecords = *new* LinkedList<>();                 outputRecordsByTopic.put(record.topic(), outputRecords);             }

                  outputRecords.add(record);

       

                  // Forward back into the topology if the produced record is to an internal or a source topic ...

                  final String outputTopicName = record.topic();

                  if (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName)

                  || globalPartitionsByTopic.containsKey(outputTopicName)) {  // FIXME

                      final byte[] serializedKey = record.key();

                      final byte[] serializedValue = record.value();

       

                      pipeInput(new ConsumerRecord<>(

                          outputTopicName,

                          -1,

                          -1L,

                          record.timestamp(),

                          TimestampType.CREATE_TIME,

                          0L,

                          serializedKey == null ? 0 : serializedKey.length,

                          serializedValue == null ? 0 : serializedValue.length,

                          serializedKey,

                          serializedValue));

                  }

              }

          }

       

       

       

      Thank you

      Attachments

        Issue Links

          Activity

            People

              vale68 Valentino Proietti
              vale68 Valentino Proietti
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: