Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 2.5.0
None
None
Environment: windows/linux
Description
I have a regular groupBy-and-count stream configuration:
```kotlin
fun addStream(kStreamBuilder: StreamsBuilder) {
    // In-memory window store backing the global store:
    // 10-day retention, 1-day windows, retainDuplicates = false
    val storeSupplier = Stores.inMemoryWindowStore("count-store", Duration.ofDays(10), Duration.ofDays(1), false)
    val storeBuilder: StoreBuilder<WindowStore<CountableEvent, Long>> = Stores
        .windowStoreBuilder(storeSupplier, JsonSerde(CountableEvent::class.java), Serdes.Long())

    // Re-key each event, count per key in 1-day windows, write the counts to "topic1-count"
    kStreamBuilder
        .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
        .map { _, jsonRepresentation -> KeyValue(eventsCountingDeserializer.deserialize(jsonRepresentation), null) }
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofDays(1)))
        .count(Materialized.with(JsonSerde(CountableEvent::class.java), Serdes.Long()))
        .toStream()
        .to("topic1-count")

    // Read "topic1-count" back into the global window store; keys are decoded with TimeWindowedSerde
    val storeConsumed = Consumed.with(
        WindowedSerdes.TimeWindowedSerde(JsonSerde(CountableEvent::class.java), Duration.ofDays(1).toMillis()),
        Serdes.Long()
    )
    kStreamBuilder.addGlobalStore(storeBuilder, "topic1-count", storeConsumed, passThroughProcessorSupplier)
}
```
When records are sent to "topic1-count", the key is serialized by TimeWindowedSerializer, which uses WindowKeySchema.toBinary, so the message key format is:
real_grouping_key + timestamp(8bytes)
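To make this layout concrete, here is a minimal standalone Kotlin sketch (no Kafka dependency) that builds a key in the "real_grouping_key + timestamp(8bytes)" format with java.nio.ByteBuffer. The function name toTopicKey is hypothetical; it only mirrors the byte layout described above and is not Kafka's WindowKeySchema.toBinary.

```kotlin
import java.nio.ByteBuffer

// Hypothetical helper: mirrors the key layout written to "topic1-count":
// serialized grouping key followed by the 8-byte window start timestamp
// (big-endian, ByteBuffer's default byte order).
fun toTopicKey(groupingKey: ByteArray, windowStartMs: Long): ByteArray =
    ByteBuffer.allocate(groupingKey.size + Long.SIZE_BYTES)
        .put(groupingKey)
        .putLong(windowStartMs)
        .array()

fun main() {
    val key = toTopicKey("""{"id":"a"}""".toByteArray(), 1_590_969_600_000L)
    // 10 bytes of JSON key + 8 bytes of timestamp; no sequence number
    println(key.size) // 18
}
```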
Everything works, and I can read correct values from the state store. However, in the recovery scenario, when GlobalStateManagerImpl enters the offset < highWatermark restore loop, the InMemoryWindowStore stateRestoreCallback reads records from "topic1-count" and fails to extract a valid key and timestamp with WindowKeySchema.extractStoreKeyBytes and WindowKeySchema.extractStoreTimestamp. It fails because these methods expect the format:
real_grouping_key + timestamp(8bytes) + sequence_number(4bytes)
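The mismatch can be reproduced without Kafka. The sketch below hard-codes the two suffix sizes from the formats above and re-implements, under hypothetical helper names, the offset arithmetic that extraction based on the store format relies on (it treats the last 12 bytes as timestamp + sequence number). It is an illustration, not Kafka's WindowKeySchema code. Applied to a key from "topic1-count", it cuts the grouping key 4 bytes short and reads the timestamp from the wrong offset.

```kotlin
import java.nio.ByteBuffer

// Suffix sizes taken from the key formats quoted above.
object StoreKeyLayout {
    const val TIMESTAMP_SIZE = 8 // window start timestamp
    const val SEQNUM_SIZE = 4    // sequence number
}

// Hypothetical helper: reads the timestamp assuming the key ends with
// timestamp (8 bytes) + sequence number (4 bytes).
fun extractTimestampAssumingSeqnum(binaryKey: ByteArray): Long =
    ByteBuffer.wrap(binaryKey)
        .getLong(binaryKey.size - StoreKeyLayout.TIMESTAMP_SIZE - StoreKeyLayout.SEQNUM_SIZE)

// Hypothetical helper: returns everything before that assumed 12-byte suffix.
fun extractKeyBytesAssumingSeqnum(binaryKey: ByteArray): ByteArray =
    binaryKey.copyOfRange(0, binaryKey.size - StoreKeyLayout.TIMESTAMP_SIZE - StoreKeyLayout.SEQNUM_SIZE)

fun main() {
    val groupingKey = "abcdef".toByteArray()
    val windowStart = 1_590_969_600_000L

    // Key as written to "topic1-count": grouping key + timestamp, no sequence number.
    val topicKey = ByteBuffer.allocate(groupingKey.size + StoreKeyLayout.TIMESTAMP_SIZE)
        .put(groupingKey)
        .putLong(windowStart)
        .array()

    // Key in the store format: grouping key + timestamp + sequence number (0 here, arbitrary).
    val storeKey = ByteBuffer
        .allocate(groupingKey.size + StoreKeyLayout.TIMESTAMP_SIZE + StoreKeyLayout.SEQNUM_SIZE)
        .put(groupingKey)
        .putLong(windowStart)
        .putInt(0)
        .array()

    println(String(extractKeyBytesAssumingSeqnum(storeKey)))         // abcdef
    println(extractTimestampAssumingSeqnum(storeKey) == windowStart) // true
    println(String(extractKeyBytesAssumingSeqnum(topicKey)))         // ab
    println(extractTimestampAssumingSeqnum(topicKey) == windowStart) // false
}
```

With a grouping key shorter than 4 bytes, the computed offset in these helpers becomes negative and both of them throw instead of returning wrong bytes.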
How is this supposed to work in this case?
Issue Links
- relates to KAFKA-10137 Clean-up retain Duplicate logic in Window Stores (Status: Open)