Kafka / KAFKA-7536

TopologyTestDriver cannot pre-populate KTable or GlobalKTable

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 2.0.0
    • Fix Version/s: 2.2.0, 2.1.1, 2.0.2
    • Component/s: streams
    • Labels: None

      Description

      I have a GlobalKTable that's defined as:

      
      GlobalKTable<String, ByteString> userIdsByEmail = topology
          .globalTable(USER_IDS_BY_EMAIL.name,
                       USER_IDS_BY_EMAIL.consumed(),
                       Materialized.as("user-ids-by-email"));
      

      And the following test in Spock:

          def topology = // my topology
          def driver = new TopologyTestDriver(topology, config())
      
          def cleanup() {
              driver.close()
          }
      
          def "create from email request"() {
      
              def store = driver.getKeyValueStore('user-ids-by-email')
              store.put('string', ByteString.copyFrom(new byte[0]))
              // more, but it fails at the `put` above
          }

      When I run this, I get the following:

      
      [2018-10-23 19:35:27,055] INFO (org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl) mock Restoring state for global store user-ids-by-email
      
      java.lang.NullPointerException
      	at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.topic(AbstractProcessorContext.java:115)
      	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:237)
      	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:220)
      	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:38)
      	at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:206)
      	at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
      	at pony.message.MessageWriteStreamsTest.create from mailgun email request(MessageWriteStreamsTest.groovy:52)
      
      [2018-10-23 19:35:27,189] INFO (org.apache.kafka.streams.processor.internals.StateDirectory) stream-thread [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup.
      

      The same issue applies to KTable.

      I've noticed that I can put() to the store if I first write to it with driver.pipeInput; otherwise, I get the above error.
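      The pipeInput workaround mentioned above can be sketched in Java roughly as follows. This is a minimal sketch, not the fix that was applied for this issue. It assumes the kafka-streams-test-utils artifact from the 2.0.x line (ConsumerRecordFactory; later releases, as far as I know, offer TestInputTopic instead), plain String values in place of ByteString, and a hypothetical topic name, since the report only shows USER_IDS_BY_EMAIL.name.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class PrePopulateViaPipeInput {
    // Hypothetical topic name (assumption); the store name is taken from the report.
    static final String TOPIC = "user-ids-by-email-topic";
    static final String STORE = "user-ids-by-email";

    static String populateAndRead() {
        // Build a topology with a single GlobalKTable backed by the named store.
        StreamsBuilder builder = new StreamsBuilder();
        builder.globalTable(TOPIC,
            Consumed.with(Serdes.String(), Serdes.String()),
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(STORE));

        // The driver never contacts a broker, so the bootstrap address is a placeholder.
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "pre-populate-test");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config);
        try {
            // Populate the table through its source topic instead of calling store.put().
            ConsumerRecordFactory<String, String> records =
                new ConsumerRecordFactory<>(TOPIC, new StringSerializer(), new StringSerializer());
            driver.pipeInput(records.create(TOPIC, "alice@example.com", "user-1"));

            // Read the value back from the global store.
            KeyValueStore<String, String> store = driver.getKeyValueStore(STORE);
            return store.get("alice@example.com");
        } finally {
            driver.close();
        }
    }

    public static void main(String[] args) {
        System.out.println(populateAndRead());
    }
}
```

      Writing through the source topic sends each record down the same path the table uses in production, so the test does not depend on calling put() on the store outside of record processing.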

              People

              • Assignee: Guozhang Wang (guozhang)
              • Reporter: Dmitry Minkovsky (dminkovsky)
              • Votes: 0
              • Watchers: 4
