KAFKA-8630: Unit testing a streams processor with a WindowStore throws a ClassCastException

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 2.3.0
    • Fix Version/s: 2.7.0, 2.6.1
    • Component/s: streams-test-utils
    • Labels: None

    Description

      I was attempting to write a unit test for a class that implements the Processor interface and uses a WindowStore. Running the test fails with a ClassCastException thrown from InMemoryWindowStore.init, which attempts to cast MockProcessorContext to InternalProcessorContext.

      Minimal code to reproduce:

      package com.cantgetthistowork;
      
      import org.apache.kafka.streams.processor.Processor;
      import org.apache.kafka.streams.processor.ProcessorContext;
      import org.apache.kafka.streams.state.WindowStore;
      
      public class InMemWindowProcessor implements Processor<String, String> {
      
        private ProcessorContext context;
        private WindowStore<String, String> windowStore;
      
        @Override
        public void init(ProcessorContext context) {
          this.context = context;
      
          windowStore = (WindowStore<String, String>) context.getStateStore("my-win-store");
        }
      
        @Override
        public void process(String key, String value) {
        }
      
        @Override
        public void close() {
        }
      
      }
      
      package com.cantgetthistowork;
      
      import java.time.Duration;
      import java.time.Instant;
      
      import org.apache.kafka.common.serialization.Serdes;
      import org.apache.kafka.streams.processor.MockProcessorContext;
      import org.apache.kafka.streams.state.Stores;
      import org.apache.kafka.streams.state.WindowStore;
      import org.junit.Before;
      import org.junit.Test;
      
      public class InMemWindowProcessorTest {
      
        InMemWindowProcessor processor = null;
        MockProcessorContext context = null;
      
        @Before
        public void setup() {
          processor = new InMemWindowProcessor();
          context = new MockProcessorContext();
      
          WindowStore<String, String> store =
            Stores.windowStoreBuilder(
              Stores.inMemoryWindowStore(
                "my-win-store",
                Duration.ofMinutes(10),
                Duration.ofSeconds(10),
                false
              ),
              Serdes.String(),
              Serdes.String()
            )
            .withLoggingDisabled()
            .build();
          store.init(context, store);
          context.register(store, null);
          processor.init(context);
        }
      
        @Test
        public void testThings() {
          Instant baseTime = Instant.now();
          context.setTimestamp(baseTime.toEpochMilli());
          context.setTopic("topic-name");
          processor.process("key1", "value1");
        }
      
      }
      

       

      I ran the test with Maven. Output of mvn --version:

      Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 2017-04-03T13:39:06-06:00)
      Maven home: ~/opt/apache-maven-3.5.0
      Java version: 1.8.0_212, vendor: Oracle Corporation
      Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
      Default locale: en_US, platform encoding: UTF-8
      OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: "unix"

      And finally the stack trace:

      -------------------------------------------------------
       T E S T S
      -------------------------------------------------------
      Running com.cantgetthistowork.InMemWindowProcessorTest
      SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
      SLF4J: Defaulting to no-operation (NOP) logger implementation
      SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
      Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< FAILURE!
      testThings(com.cantgetthistowork.InMemWindowProcessorTest)  Time elapsed: 0.05 sec  <<< ERROR!
      java.lang.ClassCastException: org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to org.apache.kafka.streams.processor.internals.InternalProcessorContext
          at org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
          at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
          at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90)
          at com.cantgetthistowork.InMemWindowProcessorTest.setup(InMemWindowProcessorTest.java:36)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
          at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
          at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
          at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
          at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
          at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
          at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
          at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
          at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
          at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
          at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
          at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
          at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
          at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252)
          at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141)
          at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
          at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
          at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
          at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115)
          at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)
      
      
      Results :
      
      Tests in error: 
        testThings(com.cantgetthistowork.InMemWindowProcessorTest): org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to org.apache.kafka.streams.processor.internals.InternalProcessorContext
      
      Tests run: 1, Failures: 0, Errors: 1, Skipped: 0
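
      The cast failure at the top of the trace can be illustrated without Kafka on the classpath. The sketch below is only an analogy: every type in it (PublicContext, InternalContext, MockContext, RuntimeContext, DowncastingStore) is a hypothetical stand-in, not one of Kafka's actual classes. It assumes the store's init method receives the public context type and downcasts it to an internal subtype, which is what the exception message suggests.

```java
// Hypothetical stand-ins for illustration only; these are NOT Kafka's classes.
public class CastDemo {

    // Plays the role of the public ProcessorContext interface.
    interface PublicContext {
        String applicationId();
    }

    // Plays the role of InternalProcessorContext: an internal extension
    // of the public interface with extra runtime-only methods.
    interface InternalContext extends PublicContext {
        String internalDetail();
    }

    // Plays the role of MockProcessorContext: implements only the public interface.
    static class MockContext implements PublicContext {
        @Override
        public String applicationId() {
            return "test-app";
        }
    }

    // Plays the role of the context the runtime itself would pass to a store.
    static class RuntimeContext implements InternalContext {
        @Override
        public String applicationId() {
            return "real-app";
        }

        @Override
        public String internalDetail() {
            return "runtime-only state";
        }
    }

    // Plays the role of a store whose init() accepts the public type
    // but downcasts it to the internal type.
    static class DowncastingStore {
        void init(PublicContext context) {
            // Throws ClassCastException when context is a MockContext.
            InternalContext internal = (InternalContext) context;
            internal.internalDetail();
        }
    }

    // Returns true if initializing a store with the given context
    // fails with a ClassCastException.
    static boolean initFails(PublicContext context) {
        try {
            new DowncastingStore().init(context);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("mock context fails: " + initFails(new MockContext()));       // true
        System.out.println("runtime context fails: " + initFails(new RuntimeContext())); // false
    }
}
```

      The method signature accepts any PublicContext, so the code compiles, and the mismatch only surfaces at run time. That is consistent with the test above failing in setup() rather than at compile time.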

       

       

People

    • Assignee: John Roesler (vvcephei)
    • Reporter: Justin Fetherolf (fetherolfjd)
    • Votes: 0
    • Watchers: 9