FLINK-33672: Use MapState.entries() instead of keys() and get() in over window

Details

    Description

      The over-window operators, for example org.apache.flink.table.runtime.operators.over.ProcTimeRangeBoundedPrecedingFunction, contain logic like the following:

      private transient MapState<Long, List<RowData>> inputState;
      
      public void onTimer(
              long timestamp,
              KeyedProcessFunction<K, RowData, RowData>.OnTimerContext ctx,
              Collector<RowData> out)
              throws Exception {
          //...
          Iterator<Long> iter = inputState.keys().iterator();
          //...
          while (iter.hasNext()) {
              Long elementKey = iter.next();
              if (elementKey < limit) {
                  // element key outside of window. Retract values
                  List<RowData> elementsRemove = inputState.get(elementKey);
                  // ...
              }
          }
          //...
      } 

      As we can see, the code iterates over the keys of inputState and then calls get() for each key that falls outside the window. For RocksDB, however, key iteration is itself implemented on top of entry iteration, so replacing keys() with entries() introduces no extra overhead. Reading the value through getValue() on the iterated entry then saves one get() call per retracted key at very low cost.
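
      The proposed change can be illustrated with a minimal, self-contained sketch. It is not the actual Flink patch: CountingState is a hypothetical in-memory stand-in for MapState<Long, List<RowData>> (with String in place of RowData) that only counts get() calls, and the method names expiredViaKeys and expiredViaEntries are made up for this illustration. It only assumes that MapState exposes keys(), get() and entries(), where entries() yields Map.Entry objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class EntryIterationSketch {

    /** Hypothetical in-memory stand-in for the keyed MapState; counts get() calls. */
    static final class CountingState {
        private final Map<Long, List<String>> data = new TreeMap<>();
        int getCalls = 0;

        void put(long key, List<String> value) {
            data.put(key, value);
        }

        Iterable<Long> keys() {
            return data.keySet();
        }

        List<String> get(long key) {
            getCalls++; // each call models one extra state lookup
            return data.get(key);
        }

        Iterable<Map.Entry<Long, List<String>>> entries() {
            return data.entrySet();
        }
    }

    /** Pattern from the issue: iterate keys, then look up every key below the limit. */
    static List<String> expiredViaKeys(CountingState state, long limit) {
        List<String> expired = new ArrayList<>();
        for (Long key : state.keys()) {
            if (key < limit) {
                expired.addAll(state.get(key));
            }
        }
        return expired;
    }

    /** Proposed pattern: iterate entries and read the value from the entry itself. */
    static List<String> expiredViaEntries(CountingState state, long limit) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<Long, List<String>> entry : state.entries()) {
            if (entry.getKey() < limit) {
                expired.addAll(entry.getValue()); // no separate get() needed
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        CountingState state = new CountingState();
        state.put(1L, Arrays.asList("a"));
        state.put(2L, Arrays.asList("b", "c"));
        state.put(5L, Arrays.asList("d"));

        List<String> viaKeys = expiredViaKeys(state, 3L);
        System.out.println(viaKeys + ", get() calls: " + state.getCalls);

        state.getCalls = 0;
        List<String> viaEntries = expiredViaEntries(state, 3L);
        System.out.println(viaEntries + ", get() calls: " + state.getCalls);
    }
}
```

      Both methods return the same retracted elements, but the entry-based variant never calls get(). In the real operator the saving would depend on the state backend, as described above.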

            People

              zakelly Zakelly Lan