Details
-
Improvement
-
Status: Open
-
Major
-
Resolution: Unresolved
-
None
-
None
-
None
Description
Introduction
In Flink, after applying a group by, users may use LAST_VALUE to process certain fields to ensure that all fields have corresponding aggregation functions. Currently, LAST_VALUE does not support the ROW type syntax, so users always apply the LAST_VALUE function to each individual field separately, as shown below.
SELECT
LAST_VALUE(bool_a) AS last_bool_a,
LAST_VALUE(int2_b) AS last_int2_b,
LAST_VALUE(int4_c) AS last_int4_c,
LAST_VALUE(int8_d) AS last_int8_d,
LAST_VALUE(float4_e) AS last_float4_e,
LAST_VALUE(float4_f) AS last_float4_f,
LAST_VALUE(numeric_g) AS last_numeric_g,
LAST_VALUE(text_m) AS last_text_m,
LAST_VALUE(varchar_p) AS last_varchar_p,
date_h
FROM source_table
GROUP BY date_h
If the upstream operator is a retract stream, this approach will lead to redundant StateMap traversal. To facilitate retraction, Flink's internal{{ LastValueWithRetractAggFunction}} will store all historical data related to the primary key. When the last value is deleted, it will traverse all keys in the{{ orderToValue}} (which maps timestamps to data) and this MapView is stored in the form of StateMap. More LAST_VALUE functions leads to more times the read and write operations of RocksDB. Therefore, I advocate for handling ROW types with LAST_VALUE, allowing support for all fields with just one LAST_VALUE function as below.
SELECT
LAST_VALUE(
ROW(
bool_a,
int2_b,
int4_c,
int8_d,
float4_e,
float4_f,
numeric_g,
text_m,
varchar_p
)
) AS row_data,
date_h
FROM source_table
GROUP BY date_h
The experiment indicates that applying the ROW type to the LAST_VALUE function can improve the processing speed for retract streams, but has no effect on append-only streams.
Evaluation:
The throughput of jobs was compared based on whether the ROW type was used in the LAST_VALUE function, considering both retract and append-only scenarios.
Retraction
Use a deduplication operator to convert the append-only stream generated by datagen into a retract stream.
Two jobs show little difference in throughput (Row 4817: Mean 1808).
Through flame graph analysis, applying the ROW type to the LAST_VALUE
function reduces the consumption of the aggregate function calls to accumulate,
with CPU usage for accumulate being (ROW 20.02%: Separated 66.98%).
LastValueWithRetractAccumulator uses MapState storage MapView.
Therefore, updating the LastValueWithRetractAccumulator requires reading from or writing to RocksDB.
AppendOnly
Two jobs show little difference in throughput (Row 13411: Mean 10673). Further examination of the flame graphs for both processes reveals that the bottleneck
for both jobs lies in getting RocksDBValueState which is called by GroupFunction.
Using ROW aggregation does not yield significant optimization in this part. I suspect it's
because Flink uses RowData to store data from multiple Accumulators, and every time
the accState invokes the value method, it reads all the Accumulators at the same time.
Therefore, the use of ROW optimization might not be very effective.
Conclusion
- Using ROW type for LAST_VALUE Aggregation can improve the processing speed for retract streams, with effectiveness proportional to the number of fields contained in the ROW.
- Using ROW type for LAST_VALUE Aggregation results in limited improvements , as the optimization effect on state backend read speed is not significant.