  1. Kafka
  2. KAFKA-13309

InMemorySessionStore#fetch/backwardFetch doesn't return in correct order

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0
    • Fix Version/s: 3.1.0, 3.0.1
    • Component/s: streams
    • Labels: None

    Description

      We added backward iterator support for SessionStore in KAFKA-9929. However, fetch/backwardFetch over a key range does not return records in the correct order when multiple records share the same session window.

      For example:

      Suppose the session window inactivity gap is 10 ms and we have the following records:

      key: "A", value: "AA", timestamp: 0 --> with SessionWindow(0, 0)

      key: "B", value: "BB", timestamp: 0 --> with SessionWindow(0, 0)

      key: "C", value: "CC", timestamp: 0 --> with SessionWindow(0, 0)

      key: "D", value: "DD", timestamp: 100 --> with SessionWindow(100, 100)

       

      When we call fetch("A" /*key from*/, "D" /*key to*/), we expect [A, B, C, D], but we actually get [C, B, A, D].

       

      And the reason is here:

      https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java#L276-L295

       

      public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) {
          // The final argument is the "forward" flag: it should be true for fetch
          // and false for backwardFetch, but false is passed here.
          return registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), false);
      }
      

      We pass "false" as the forward flag in the `fetch` method and "true" in the `backwardFetch` method, which is the reverse of what it should be.
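
      To illustrate the effect of the inverted flag, here is a minimal, self-contained sketch. It is not the actual InMemorySessionStore code: the class `SessionOrderSketch`, its `put` helper, and the simplified map layout (session end time -> key -> value) are assumptions made for illustration only. It assumes, as the symptom above suggests, that end times are walked in ascending order while the flag controls only the key order within each end time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class SessionOrderSketch {
    // Hypothetical simplified model: session end time -> (key -> value), both sorted ascending.
    static final NavigableMap<Long, NavigableMap<String, String>> endTimeMap = new TreeMap<>();

    // Hypothetical helper: stores a record under the end time of its session window.
    static void put(final String key, final String value, final long endTime) {
        endTimeMap.computeIfAbsent(endTime, t -> new TreeMap<>()).put(key, value);
    }

    // Walks end times in ascending order; 'forward' selects the key order within each end time.
    static List<String> fetch(final String keyFrom, final String keyTo, final boolean forward) {
        final List<String> result = new ArrayList<>();
        for (final NavigableMap<String, String> byKey : endTimeMap.values()) {
            final NavigableMap<String, String> range = byKey.subMap(keyFrom, true, keyTo, true);
            result.addAll(forward ? range.navigableKeySet() : range.descendingKeySet());
        }
        return result;
    }

    public static void main(final String[] args) {
        put("A", "AA", 0L);
        put("B", "BB", 0L);
        put("C", "CC", 0L);
        put("D", "DD", 100L);

        System.out.println(fetch("A", "D", false)); // inverted flag: [C, B, A, D]
        System.out.println(fetch("A", "D", true));  // corrected flag: [A, B, C, D]
    }
}
```

      In this model, passing `true` for the forward case restores the expected [A, B, C, D] order, which matches the fix described above of swapping the flag values between `fetch` and `backwardFetch`.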

       

            People

              Assignee: Luke Chen (showuon)
              Reporter: Luke Chen (showuon)