Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Fixed
Version: 1.10.1
Description
For a time-range-bounded OVER aggregation in a streaming query, such as the following (where aggregateFunction stands for any aggregate function)

    table
      .window(Over partitionBy 'a orderBy 'rowtime preceding 1.hour as 'w)
      .select('a, aggregateFunction('b) over 'w)

the operator must keep the incoming records of the preceding time range in state. Records older than that range are no longer needed and can be cleaned up.
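To make clear which records the operator actually needs, here is a minimal reference sketch of the query's semantics for a single key. It is not Flink code. It assumes SUM stands in for aggregateFunction, rows arrive in timestamp order, and the window of a row at ts is the inclusive range [ts - rangeMs, ts]. `Rec`, `OverRangeSum`, and `compute` are hypothetical names.

```scala
// Reference semantics for ONE key of a RANGE-bounded OVER aggregate (a sketch).
// Assumptions: SUM stands in for `aggregateFunction`, rows arrive in timestamp
// order, and the window of a row at `ts` is the inclusive range [ts - rangeMs, ts].
final case class Rec(ts: Long, value: Long)

object OverRangeSum {
  val OneHourMs: Long = 60L * 60 * 1000

  /** Emits one aggregate per input row, in input order. */
  def compute(rows: Seq[Rec], rangeMs: Long): Seq[Long] =
    rows.map { current =>
      rows
        .filter(r => r.ts >= current.ts - rangeMs && r.ts <= current.ts)
        .map(_.value)
        .sum
    }
}
```

Consider rows at 0 ms, 1 s, 1 h, and 1 h + 1 s with values 1, 2, 4, and 8, and a one-hour range. `compute` yields 1, 3, 7, 14. The row at 0 ms still contributes to the third result, but it cannot contribute to any row at or after 1 h + 1 s. That is why it can be dropped from state.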
The current implementation retracts old records only when a newer record arrives, because only then does the operator learn that enough time has passed. However, this retraction never happens unless a new record with the same key comes in. As a result, the state of a key that receives no further records may never be released, which leads to unboundedly growing state, especially when the key space changes over time.
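The leak can be reproduced with a hypothetical, self-contained model of the strategy described above. This is not the actual Flink function. It borrows only the name `processElement` from Flink's process-function API. `LazyCleanupOverSum` and `rowsHeld` are illustrative names. As in the previous sketch, SUM stands in for the aggregate, and rows of a key are assumed to arrive in timestamp order.

```scala
import scala.collection.mutable

// Hypothetical model (not Flink code) of the current cleanup strategy:
// old rows of a key are retracted only when a new row of the SAME key arrives.
final class LazyCleanupOverSum(rangeMs: Long) {
  // key -> (timestamp -> values), sorted so the oldest rows come first
  val state = mutable.Map.empty[String, mutable.TreeMap[Long, List[Long]]]

  def processElement(key: String, ts: Long, value: Long): Long = {
    val buf = state.getOrElseUpdate(key, mutable.TreeMap.empty[Long, List[Long]])
    // Retract rows that fell out of [ts - rangeMs, ts]; this only ever runs for `key`.
    buf --= buf.keysIterator.takeWhile(_ < ts - rangeMs).toList
    buf.update(ts, value :: buf.getOrElse(ts, Nil))
    buf.valuesIterator.flatten.sum
  }

  /** Number of rows currently kept in state for `key`. */
  def rowsHeld(key: String): Int =
    state.get(key).map(_.valuesIterator.map(_.size).sum).getOrElse(0)
}
```

Suppose key "a" receives a single row at time 0 and nothing afterwards. Ten hours later, after rows of other keys have been processed, `rowsHeld("a")` is still 1, even though that row can never contribute to a result again.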
Since an aggregate over a bounded preceding time interval by its nature never needs records older than that interval, we can improve this by registering a timer that notifies the operator to retract old records. This neither changes the query result nor causes severe performance degradation.
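Below is a hedged sketch of the timer-based idea, again as a standalone model rather than the actual patch. Each row registers a cleanup timer for the moment it can no longer fall into any window. When the timer fires, the expired rows are retracted, and the key is dropped entirely once its buffer is empty. `TimerCleanupOverSum` and `advanceTime` are hypothetical. `advanceTime` stands in for the watermark (row time) or the wall clock (processing time). The firing time `ts + rangeMs + 1` follows from the inclusive-lower-bound assumption used above.

```scala
import scala.collection.mutable

// Hypothetical model of the proposed fix (not the actual Flink classes).
// Assumptions: SUM stands in for the aggregate, rows of a key arrive in timestamp
// order, the window of a row at `ts` is [ts - rangeMs, ts], and `advanceTime`
// stands in for the watermark (row time) or the wall clock (processing time).
final class TimerCleanupOverSum(rangeMs: Long) {
  // key -> (timestamp -> values), sorted so the oldest rows come first
  val state = mutable.Map.empty[String, mutable.TreeMap[Long, List[Long]]]

  // Pending cleanup timers as (fireAt, key); the reversed ordering makes a min-heap.
  private val timers =
    mutable.PriorityQueue.empty[(Long, String)](Ordering.by[(Long, String), Long](_._1).reverse)

  private def evictOlderThan(buf: mutable.TreeMap[Long, List[Long]], limit: Long): Unit =
    buf --= buf.keysIterator.takeWhile(_ < limit).toList

  def processElement(key: String, ts: Long, value: Long): Long = {
    val buf = state.getOrElseUpdate(key, mutable.TreeMap.empty[Long, List[Long]])
    evictOlderThan(buf, ts - rangeMs) // keep the on-arrival retraction, so results are unchanged
    buf.update(ts, value :: buf.getOrElse(ts, Nil))
    // The row at `ts` is outside every later window once time passes ts + rangeMs.
    timers.enqueue((ts + rangeMs + 1, key))
    buf.valuesIterator.flatten.sum
  }

  /** Fires every timer due at or before `now`. */
  def advanceTime(now: Long): Unit =
    while (timers.nonEmpty && timers.head._1 <= now) {
      val (fireAt, key) = timers.dequeue()
      onTimer(key, fireAt)
    }

  private def onTimer(key: String, fireAt: Long): Unit =
    state.get(key).foreach { buf =>
      evictOlderThan(buf, fireAt - rangeMs)
      if (buf.isEmpty) state.remove(key) // release the key's state entirely
    }
}
```

For a single key, the sketch produces the same results as on-arrival retraction alone (1, 3, 7, 14 for the rows used earlier). However, a key that stops receiving rows is now removed once time advances past its last row plus the range. Registering one timer per row is the simplest choice for illustration; a real operator might coalesce timers, but that is an assumption, not something the issue specifies.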
This is distinct from state retention. State retention discards state that is expected to be less important in order to reduce state memory, so it can change query results. With this change, both enabling and disabling state retention still make sense.
This issue applies to both row-time and processing-time range bounds, so changes are needed in both RowTimeRangeBoundedPrecedingFunction and ProcTimeRangeBoundedPrecedingFunction in flink-table-runtime-blink. I already run a version with this change in production and would be glad to contribute it.