Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Duplicate
-
0.10.2.0
-
None
Description
Currently it is not possible to initialise a global store with an in-memory logged store via the TopologyBuilder. This results in the following exception:
java.lang.ClassCastException: org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl cannot be cast to org.apache.kafka.streams.processor.internals.RecordCollector$Supplier
at org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:52)
at org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:44)
at org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.init(InMemoryKeyValueLoggedStore.java:56)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:99)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:130)
at org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl.initialize(GlobalStateManagerImpl.java:97)
at org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.initialize(GlobalStateUpdateTask.java:61)
at org.apache.kafka.test.ProcessorTopologyTestDriver.<init>(ProcessorTopologyTestDriver.java:215)
at org.apache.kafka.streams.processor.internals.ProcessorTopologyTest.shouldDriveInMemoryLoggedGlobalStore(ProcessorTopologyTest.java:235)
...
I've created a PR which includes a unit this to verify this behavior.
If the below PR gets merge, the fixing PR should leverage the provided test ProcessorTopologyTest#shouldDriveInMemoryLoggedGlobalStore by removing the @ignore annotation.
Attachments
Issue Links
- Is contained by
-
KAFKA-5045 KTable materialization and improved semantics
- Resolved
- links to