  Kafka / KAFKA-10322

InMemoryWindowStore restore keys format incompatibility (lack of sequenceNumber in keys on topic)


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.5.0
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None
    • Environment: windows/linux

    Description

      I have a regular groupBy-and-count stream configuration:

         
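          // Imports for the Kafka Streams 2.5 APIs used below. JsonSerde, CountableEvent,
          // eventsCountingDeserializer and passThroughProcessorSupplier come from the application.
          import java.time.Duration
          import org.apache.kafka.common.serialization.Serdes
          import org.apache.kafka.streams.KeyValue
          import org.apache.kafka.streams.StreamsBuilder
          import org.apache.kafka.streams.kstream.Consumed
          import org.apache.kafka.streams.kstream.Materialized
          import org.apache.kafka.streams.kstream.Produced
          import org.apache.kafka.streams.kstream.TimeWindows
          import org.apache.kafka.streams.kstream.WindowedSerdes
          import org.apache.kafka.streams.state.StoreBuilder
          import org.apache.kafka.streams.state.Stores
          import org.apache.kafka.streams.state.WindowStore

          fun addStream(kStreamBuilder: StreamsBuilder) {
              // In-memory window store: 10-day retention, 1-day windows, no duplicates retained.
              val storeSupplier = Stores.inMemoryWindowStore("count-store",
                      Duration.ofDays(10),
                      Duration.ofDays(1),
                      false)
              val storeBuilder: StoreBuilder<WindowStore<CountableEvent, Long>> = Stores
                      .windowStoreBuilder(storeSupplier, JsonSerde(CountableEvent::class.java), Serdes.Long())
              // Serde for the windowed key written to and read from "topic1-count".
              val windowedKeySerde = WindowedSerdes.TimeWindowedSerde(JsonSerde(CountableEvent::class.java), Duration.ofDays(1).toMillis())
      
              // Count events per key in 1-day tumbling windows and write the counts to "topic1-count".
              kStreamBuilder
                      .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
                      .map { _, jsonRepresentation -> KeyValue(eventsCountingDeserializer.deserialize(jsonRepresentation), null) }
                      .groupByKey()
                      .windowedBy(TimeWindows.of(Duration.ofDays(1)))
                      .count(Materialized.with(JsonSerde(CountableEvent::class.java), Serdes.Long()))
                      .toStream()
                      .to("topic1-count", Produced.with(windowedKeySerde, Serdes.Long()))
      
              // Populate the global store "count-store" from the same topic.
              val storeConsumed = Consumed.with(windowedKeySerde, Serdes.Long())
              kStreamBuilder.addGlobalStore(storeBuilder, "topic1-count", storeConsumed, passThroughProcessorSupplier)
          }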

      When records are sent to "topic1-count", the key is serialized by TimeWindowedSerializer, which uses WindowKeySchema.toBinary, so the message key format is:

      real_grouping_key + timestamp (8 bytes)
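      A minimal Kotlin sketch of this topic-side layout, assuming the inner key has already been serialized to bytes (for example by JsonSerde) and that the appended timestamp is the window start written big-endian. `encodeTopicKey` and `decodeTopicKey` are hypothetical helpers that mirror the layout described above; they are not Kafka APIs.

```kotlin
import java.nio.ByteBuffer

val TIMESTAMP_SIZE = 8 // bytes used for the window-start timestamp

// Hypothetical helper: real_grouping_key + timestamp (8 bytes, big-endian by ByteBuffer default).
fun encodeTopicKey(keyBytes: ByteArray, windowStartMs: Long): ByteArray =
    ByteBuffer.allocate(keyBytes.size + TIMESTAMP_SIZE)
        .put(keyBytes)
        .putLong(windowStartMs)
        .array()

// Hypothetical helper: split a topic-format key back into (key bytes, window start).
fun decodeTopicKey(topicKey: ByteArray): Pair<ByteArray, Long> {
    require(topicKey.size >= TIMESTAMP_SIZE) { "key too short: ${topicKey.size} bytes" }
    val keyBytes = topicKey.copyOfRange(0, topicKey.size - TIMESTAMP_SIZE)
    val windowStartMs = ByteBuffer.wrap(topicKey).getLong(topicKey.size - TIMESTAMP_SIZE)
    return keyBytes to windowStartMs
}

fun main() {
    val key = """{"type":"click"}""".toByteArray(Charsets.UTF_8)
    val topicKey = encodeTopicKey(key, 1_596_067_200_000L) // 2020-07-30T00:00:00Z
    val (decodedKey, start) = decodeTopicKey(topicKey)
    println("${topicKey.size} bytes, key=${decodedKey.toString(Charsets.UTF_8)}, start=$start")
}
```

      The encoded key is always the inner key length plus 8 bytes; nothing follows the timestamp.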

       

      Everything works: I can get correct values from the state store. But in a recovery scenario, when GlobalStateManagerImpl enters the offset < highWatermark loop, the InMemoryWindowStore stateRestoreCallback reads from "topic1-count" and fails to extract a valid key and timestamp using WindowKeySchema.extractStoreKeyBytes and WindowKeySchema.extractStoreTimestamp. It fails because these methods expect the format:

      real_grouping_key + timestamp (8 bytes) + sequence_number (4 bytes)
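      The mismatch can be illustrated with a small Kotlin sketch. `storeKeyBytes` and `storeTimestamp` are hypothetical re-implementations written to follow the store layout quoted above (key bytes, then an 8-byte timestamp, then a 4-byte sequence number); they are not the Kafka source, and big-endian byte order is an assumption. Applied to a topic-format key that lacks the trailing sequence number, they cut off the last 4 bytes of the real key and read the timestamp 4 bytes too early.

```kotlin
import java.nio.ByteBuffer

val TIMESTAMP_SIZE = 8 // bytes for the window-start timestamp
val SEQNUM_SIZE = 4    // bytes for the sequence number, present only in the store format

// Topic layout: real_grouping_key + timestamp (8 bytes).
fun topicKey(keyBytes: ByteArray, windowStartMs: Long): ByteArray =
    ByteBuffer.allocate(keyBytes.size + TIMESTAMP_SIZE).put(keyBytes).putLong(windowStartMs).array()

// Store layout: real_grouping_key + timestamp (8 bytes) + sequence_number (4 bytes).
fun storeKey(keyBytes: ByteArray, windowStartMs: Long, seqnum: Int): ByteArray =
    ByteBuffer.allocate(keyBytes.size + TIMESTAMP_SIZE + SEQNUM_SIZE)
        .put(keyBytes).putLong(windowStartMs).putInt(seqnum).array()

// Hypothetical parsers that assume the store layout (trailing seqnum present).
fun storeKeyBytes(binaryKey: ByteArray): ByteArray =
    binaryKey.copyOfRange(0, binaryKey.size - TIMESTAMP_SIZE - SEQNUM_SIZE)

fun storeTimestamp(binaryKey: ByteArray): Long =
    ByteBuffer.wrap(binaryKey).getLong(binaryKey.size - TIMESTAMP_SIZE - SEQNUM_SIZE)

fun main() {
    val key = "event-A".toByteArray(Charsets.UTF_8)
    val start = 1_596_067_200_000L

    // Store-format key: key and timestamp round-trip correctly.
    val fromStore = storeKey(key, start, 0)
    println("store: ${storeKeyBytes(fromStore).toString(Charsets.UTF_8)} @ ${storeTimestamp(fromStore)}")

    // Topic-format key: the last 4 key bytes are dropped and the timestamp is misread.
    val fromTopic = topicKey(key, start)
    println("topic: ${storeKeyBytes(fromTopic).toString(Charsets.UTF_8)} @ ${storeTimestamp(fromTopic)}")
}
```

      With the 7-byte key "event-A", the topic-format key is 15 bytes long, so the store-format parser keeps only the first 3 bytes ("eve") and reads the timestamp from bytes 3 to 10, mixing key bytes into it.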

      How is this supposed to work in this case?


            People

              Assignee: Unassigned
              Reporter: Tomasz Bradło (tbradlo)
              Votes: 0
              Watchers: 5
