Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Not A Problem
- Affects Version/s: 0.10.2.0
- Fix Version/s: None
- Component/s: None
Description
This issue is related to the recovery process of a global store. It might be that I'm misunderstanding the design of the global store as it's all quite new to me, but I wanted to verify this case.
I'm trying to create a global store with a processor that transforms the values from the source topic and puts them into the state store, and I want all of these transformed values to be available in every streams job (hence the use of a global store).
Here is an example I created based on an existing Kafka Streams unit test:
final StateStoreSupplier storeSupplier = Stores.create("my-store") .withStringKeys().withIntegerValues().inMemory().disableLogging().build(); final String global = "global"; final String topic = "topic"; final KeyValueStore<String, String> globalStore = (KeyValueStore<String, String>) storeSupplier.get(); final TopologyBuilder topologyBuilder = this.builder .addGlobalStore(globalStore, global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new ValueToLengthStatefulProcessor("my-store"))); driver = new ProcessorTopologyTestDriver(config, topologyBuilder); driver.process(topic, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); driver.process(topic, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER); assertEquals("value1".length(), globalStore.get("key1")); assertEquals("value2".length(), globalStore.get("key2"));
The ValueToLengthStatefulProcessor basically takes the incoming value, calculates the length of the string, and puts the result in the state store. Note the difference in types between the source stream (string values) and the state store (integer values).
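For reference, a minimal sketch of what such a processor could look like with the 0.10.2 Processor API; the class name and store name follow the example above, and the exact implementation is assumed rather than copied from my test:

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class ValueToLengthStatefulProcessor extends AbstractProcessor<String, String> {
    private final String storeName;
    private KeyValueStore<String, Integer> store;

    public ValueToLengthStatefulProcessor(final String storeName) {
        this.storeName = storeName;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        super.init(context);
        // look up the global store by name; values are stored as integers
        store = (KeyValueStore<String, Integer>) context.getStateStore(storeName);
    }

    @Override
    public void process(final String key, final String value) {
        // transform the incoming string value into its length before writing to the store
        store.put(key, value.length());
    }
}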
If I understand global stores correctly and based on what I've tried out already, the stream of data runs like this:
a source stream named "global" reading values from a Kafka topic called "topic" -> ValueToLengthStatefulProcessor -> "my-store" state store
However, when the streams job starts up, it runs the recovery process by reading the source topic again. I've noticed that in this case it seems to skip the processor entirely and act as if the source topic were the changelog of the state store, making the data flow like this during recovery:
source stream -> "my-store" state store
Because it treats the source topic as the changelog of the state store, it also tries to use the state store's deserializer. This won't work, since the values in the state store should be integers while the values in the source topic are strings.
So all of this will start up fine as long as the source topic has no values yet. However, once the source topic contains (string) values, the recovery process on startup will fail, since it writes the source records directly into the state store instead of passing them through the processor.
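To make the serde mismatch concrete, here is a toy illustration (not part of my test, and the exception message is only what I would expect) of what happens when the store's integer deserializer is applied to a string-serialized value from the source topic:

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class RestoreSerdeMismatch {
    public static void main(final String[] args) {
        // the source topic holds string-serialized values such as "value1"
        final byte[] sourceBytes = new StringSerializer().serialize("topic", "value1");

        // during recovery the store's integer deserializer is applied to those bytes;
        // "value1" is 6 bytes, not the 4 bytes an IntegerDeserializer expects, so this throws
        try {
            new IntegerDeserializer().deserialize("topic", sourceBytes);
        } catch (final SerializationException e) {
            System.out.println("restore would fail: " + e.getMessage());
        }
    }
}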
I believe this is caused by the following line of code in TopologyBuilder.addGlobalStore, which connects the store directly to the source topic.
https://github.com/apache/kafka/blob/0.10.2.0/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java#L507
Please let me know if I'm totally misunderstanding how global stores are supposed to work, but this looks to me like either a serious bug or a design flaw.