  1. Flink
  2. FLINK-36377

Support the use of the LAST_VALUE aggregate function on ROW type data

Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved

    Description

      Introduction

      In Flink SQL, after a GROUP BY, users often apply LAST_VALUE to the non-grouping fields so that every selected field has a corresponding aggregate function. LAST_VALUE currently does not support the ROW type, so users have to apply it to each field separately, as shown below.
      SELECT
          LAST_VALUE(bool_a) AS last_bool_a,
          LAST_VALUE(int2_b) AS last_int2_b,
          LAST_VALUE(int4_c) AS last_int4_c,
          LAST_VALUE(int8_d) AS last_int8_d,
          LAST_VALUE(float4_e) AS last_float4_e,
          LAST_VALUE(float4_f) AS last_float4_f,
          LAST_VALUE(numeric_g) AS last_numeric_g,
          LAST_VALUE(text_m) AS last_text_m,
          LAST_VALUE(varchar_p) AS last_varchar_p,
          date_h
      FROM source_table
      GROUP BY date_h
       
      If the upstream operator produces a retract stream, this approach leads to redundant StateMap traversal. To support retraction, Flink's internal LastValueWithRetractAggFunction stores all historical data related to the primary key. When the last value is deleted, it traverses all keys in orderToValue (which maps timestamps to data), and this MapView is stored as a StateMap. Each additional LAST_VALUE function therefore adds more RocksDB reads and writes. For this reason, I propose supporting ROW types in LAST_VALUE, so that a single LAST_VALUE call covers all fields, as below.
      SELECT
          LAST_VALUE(
              ROW(
                  bool_a,
                  int2_b,
                  int4_c,
                  int8_d,
                  float4_e,
                  float4_f,
                  numeric_g,
                  text_m,
                  varchar_p
              )
          ) AS row_data,
          date_h
      FROM source_table
      GROUP BY date_h
      Experiments indicate that using the ROW type in LAST_VALUE improves processing speed for retract streams but has no significant effect on append-only streams.
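
      The retraction cost described above can be illustrated with a small model. This is only a sketch under stated assumptions: the class LastValueRetractSketch, its stateAccesses counter, and the helper accessesForColumns are hypothetical names, a plain HashMap stands in for the state-backed MapView, and the code is not Flink's LastValueWithRetractAggFunction. It counts one access per map get, put, remove, or key visited, and compares several per-column accumulators with a single accumulator holding the whole row.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified, hypothetical model of a LAST_VALUE accumulator with retraction.
 * It is NOT Flink's implementation; it only mimics the orderToValue map
 * described in the ticket and counts simulated state accesses.
 */
class LastValueRetractSketch<T> {
    // Stand-in for the state-backed MapView (order/timestamp -> values).
    private final Map<Long, List<T>> orderToValue = new HashMap<>();
    private T lastValue;
    private long lastOrder = Long.MIN_VALUE;
    // One increment per simulated get, put, remove, or key visited in a scan.
    long stateAccesses = 0;

    void accumulate(T value, long order) {
        stateAccesses++; // read the entry for this order
        List<T> values = orderToValue.get(order);
        if (values == null) {
            values = new ArrayList<>();
        }
        values.add(value);
        stateAccesses++; // write the entry back
        orderToValue.put(order, values);
        if (order >= lastOrder) {
            lastOrder = order;
            lastValue = value;
        }
    }

    void retract(T value, long order) {
        stateAccesses++; // read the entry for this order
        List<T> values = orderToValue.get(order);
        if (values == null || !values.remove(value)) {
            return; // nothing to retract
        }
        stateAccesses++; // remove or rewrite the entry
        if (values.isEmpty()) {
            orderToValue.remove(order);
        } else {
            orderToValue.put(order, values);
        }
        if (order != lastOrder) {
            return; // the current last value is unaffected
        }
        if (!values.isEmpty()) {
            lastValue = values.get(values.size() - 1);
            return;
        }
        // The last value was deleted: scan every remaining key for the new maximum.
        lastOrder = Long.MIN_VALUE;
        lastValue = null;
        for (Long key : orderToValue.keySet()) {
            stateAccesses++;
            if (key > lastOrder) {
                lastOrder = key;
            }
        }
        if (lastOrder != Long.MIN_VALUE) {
            stateAccesses++; // read the values stored under the new last order
            List<T> remaining = orderToValue.get(lastOrder);
            lastValue = remaining.get(remaining.size() - 1);
        }
    }

    T getValue() {
        return lastValue;
    }

    /** Total simulated accesses when each of `columns` fields has its own accumulator. */
    static long accessesForColumns(int columns, int rows) {
        long total = 0;
        for (int c = 0; c < columns; c++) {
            LastValueRetractSketch<Integer> acc = new LastValueRetractSketch<>();
            for (int i = 0; i < rows; i++) {
                acc.accumulate(i, i);
            }
            acc.retract(rows - 1, rows - 1); // delete the current last value
            total += acc.stateAccesses;
        }
        return total;
    }
}
```

      In this model, accessesForColumns(9, rows) stands for nine separate LAST_VALUE calls and accessesForColumns(1, rows) for one LAST_VALUE over a ROW. The access count grows linearly with the number of accumulators, which is the effect the proposal aims to avoid. The model ignores the larger serialized size of a ROW value.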

      Evaluation:

      Job throughput was compared with and without the ROW type in the LAST_VALUE function, in both retract and append-only scenarios.

      Retraction

      A deduplication operator is used to convert the append-only stream generated by datagen into a retract stream.

      The two jobs differ substantially in throughput (Row 4817: Mean 1808), a ratio of about 2.7.
      Flame graph analysis shows that applying the ROW type to the LAST_VALUE
      function reduces the cost of the aggregate function's accumulate calls:
      accumulate accounts for 20.02% of CPU usage with ROW versus 66.98% with separate functions.
      LastValueWithRetractAccumulator stores its MapView in MapState.
      Therefore, updating the LastValueWithRetractAccumulator requires reading from or writing to RocksDB.

      AppendOnly

      The two jobs show little difference in throughput (Row 13411: Mean 10673). The flame graphs for both jobs show that the bottleneck
      in each case lies in getting the RocksDBValueState, which is called by GroupFunction.
      ROW aggregation does not yield a significant improvement in this part. I suspect this is
      because Flink uses RowData to store the data of multiple accumulators, so every time
      the value method is invoked on accState, all accumulators are read at once.
      Therefore, the ROW optimization may not be very effective in this case.
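
      If the suspicion above holds, the number of state reads per record does not depend on how many accumulators a group has. The sketch below models that assumption only: AccStateSketch, processRecord, and readsFor are hypothetical names, a HashMap stands in for the keyed value state, and an Object[] stands in for the RowData that holds all accumulators. It is not Flink's group aggregation code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model of a group aggregation whose accumulators all live in
 * one state value per group key. It only counts simulated state reads.
 */
class AccStateSketch {
    // Stand-in for the keyed value state: group key -> all accumulators of that group.
    private final Map<String, Object[]> accState = new HashMap<>();
    private final int accumulatorCount;
    // One increment per simulated value() call on the state.
    long stateReads = 0;

    AccStateSketch(int accumulatorCount) {
        this.accumulatorCount = accumulatorCount;
    }

    void processRecord(String groupKey, Object[] inputFields) {
        stateReads++; // a single read returns every accumulator of the group
        Object[] accs = accState.get(groupKey);
        if (accs == null) {
            accs = new Object[accumulatorCount];
        }
        // Append-only LAST_VALUE: each accumulator simply keeps the latest input.
        for (int i = 0; i < accumulatorCount; i++) {
            accs[i] = inputFields[i];
        }
        accState.put(groupKey, accs);
    }

    /** Simulated state reads for `records` inputs that all share one group key. */
    static long readsFor(int accumulatorCount, int records) {
        AccStateSketch sketch = new AccStateSketch(accumulatorCount);
        for (int r = 0; r < records; r++) {
            Object[] fields = new Object[accumulatorCount];
            Arrays.fill(fields, r);
            sketch.processRecord("group-1", fields);
        }
        return sketch.stateReads;
    }
}
```

      Under this model, readsFor(9, n) and readsFor(1, n) return the same count, so merging nine LAST_VALUE calls into one ROW-based call would not reduce the number of reads in the append-only case.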

      Conclusion

      1. Using the ROW type for LAST_VALUE aggregation can improve processing speed for retract streams, with effectiveness proportional to the number of fields contained in the ROW.
      2. Using the ROW type for LAST_VALUE aggregation yields limited improvement for append-only streams, as the optimization effect on state backend read speed is not significant.

          People

            dezhouliyang Yang Li