Details
- Bug
- Status: Resolved
- Critical
- Resolution: Fixed
- 2.3.0
- None
Description
I was attempting to write a unit test for a class that implements the Processor interface and uses a WindowStore. The test fails with a ClassCastException thrown from InMemoryWindowStore.init, which attempts to cast MockProcessorContext to InternalProcessorContext.
Minimal code to reproduce:
    package com.cantgetthistowork;

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.WindowStore;

    public class InMemWindowProcessor implements Processor<String, String> {
        private ProcessorContext context;
        private WindowStore<String, String> windowStore;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
            windowStore = (WindowStore<String, String>) context.getStateStore("my-win-store");
        }

        @Override
        public void process(String key, String value) {
        }

        @Override
        public void close() {
        }
    }
    package com.cantgetthistowork;

    import java.time.Duration;
    import java.time.Instant;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.processor.MockProcessorContext;
    import org.apache.kafka.streams.state.Stores;
    import org.apache.kafka.streams.state.WindowStore;
    import org.junit.Before;
    import org.junit.Test;

    public class InMemWindowProcessorTest {
        InMemWindowProcessor processor = null;
        MockProcessorContext context = null;

        @Before
        public void setup() {
            processor = new InMemWindowProcessor();
            context = new MockProcessorContext();

            WindowStore<String, String> store =
                Stores.windowStoreBuilder(
                    Stores.inMemoryWindowStore(
                        "my-win-store",
                        Duration.ofMinutes(10),
                        Duration.ofSeconds(10),
                        false
                    ),
                    Serdes.String(),
                    Serdes.String()
                )
                .withLoggingDisabled()
                .build();

            store.init(context, store);
            context.register(store, null);
            processor.init(context);
        }

        @Test
        public void testThings() {
            Instant baseTime = Instant.now();
            context.setTimestamp(baseTime.toEpochMilli());
            context.setTopic("topic-name");
            processor.process("key1", "value1");
        }
    }
I ran this with Maven; mvn --version outputs:
    Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 2017-04-03T13:39:06-06:00)
    Maven home: ~/opt/apache-maven-3.5.0
    Java version: 1.8.0_212, vendor: Oracle Corporation
    Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
    Default locale: en_US, platform encoding: UTF-8
    OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: "unix"
And finally the stack trace:
    -------------------------------------------------------
     T E S T S
    -------------------------------------------------------
    Running com.cantgetthistowork.InMemWindowProcessorTest
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< FAILURE!
    testThings(com.cantgetthistowork.InMemWindowProcessorTest)  Time elapsed: 0.05 sec  <<< ERROR!
    java.lang.ClassCastException: org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to org.apache.kafka.streams.processor.internals.InternalProcessorContext
        at org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
        at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
        at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90)
        at com.cantgetthistowork.InMemWindowProcessorTest.setup(InMemWindowProcessorTest.java:36)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252)
        at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141)
        at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
        at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
        at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
        at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115)
        at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)

    Results :

    Tests in error:
      testThings(com.cantgetthistowork.InMemWindowProcessorTest): org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to org.apache.kafka.streams.processor.internals.InternalProcessorContext

    Tests run: 1, Failures: 0, Errors: 1, Skipped: 0
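To make the failure mode in the trace concrete, here is a minimal sketch that runs without Kafka on the classpath. Every type name in it (PublicContext, InternalContext, MockContext, RuntimeContext, Store) is a hypothetical stand-in, not a Kafka class. It only assumes what the trace suggests: the store's init downcasts the public context type to an internal sub-interface, and the mock implements only the public one.

```java
public class CastFailureSketch {

    // Hypothetical stand-in for the public context interface.
    interface PublicContext {
        String applicationId();
    }

    // Hypothetical stand-in for an internal sub-interface of the public one.
    interface InternalContext extends PublicContext {
        String taskInfo();
    }

    // A test mock that implements only the public interface.
    static final class MockContext implements PublicContext {
        @Override
        public String applicationId() {
            return "mock-app";
        }
    }

    // A runtime context that implements the internal interface as well.
    static final class RuntimeContext implements InternalContext {
        @Override
        public String applicationId() {
            return "real-app";
        }

        @Override
        public String taskInfo() {
            return "task-0";
        }
    }

    // A store whose init accepts the public type but downcasts to the internal one.
    static final class Store {
        private InternalContext context;

        void init(PublicContext context) {
            // Throws ClassCastException when given a context that is not an InternalContext.
            this.context = (InternalContext) context;
        }
    }

    // Returns true if init completes, false if the downcast fails.
    static boolean initSucceeds(PublicContext context) {
        try {
            new Store().init(context);
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("runtime context: " + initSucceeds(new RuntimeContext()));
        System.out.println("mock context: " + initSucceeds(new MockContext()));
    }
}
```

In this sketch the method signature promises that any PublicContext is acceptable, but the body works only with the internal subtype, so the mismatch surfaces at run time rather than at compile time. That appears to be the same shape as the cast reported at InMemoryWindowStore.init in the trace above.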
Issue Links
- duplicates: KAFKA-10200 MockProcessorContext doesn't work with WindowStores (Resolved)
- is related to: KAFKA-9109 Get Rid of Cast from ProcessorContext to InternalProcessorContext (Open)
- relates to: KAFKA-9088 Consolidate InternalMockProcessorContext and MockInternalProcessorContext (Patch Available)