Details
- Type: Bug
- Status: Open
- Priority: Critical
- Resolution: Unresolved
- Affects Version/s: 1.18.1, 1.20.0
- Fix Version/s: None
- None
- Environment: AWS Managed Apache Flink
- Important
Description
When migrating from Flink 1.15 to Flink 1.18 or Flink 1.20, I found a behavior change in Flink SQL temporal lookup joins that I haven't been able to pinpoint, and it causes the query below to output different results.
Flink SQL Query:
WITH assets_setpoint AS (
    SELECT
        asset_id,
        TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,
        TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,
        LAST_VALUE(`value`) AS `value`
    FROM asset_readings
    JOIN metric FOR SYSTEM_TIME AS OF `proctime`
        ON metric.metric_id = asset_readings.metric_id
    WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')
    GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id
),
assets_supply_air_temp AS (
    -- CAST needed to perform regular joins instead of temporal joins in the outer query
    SELECT
        CAST(asset_readings.`timestamp` AS TIMESTAMP) AS `timestamp`,
        asset_readings.asset_id,
        asset_readings.`value` AS `value`
    FROM asset_readings
    -- Metrics temporal lookup inner join
    JOIN metric FOR SYSTEM_TIME AS OF `proctime`
        ON metric.metric_id = asset_readings.metric_id
    -- Assets to ignore for this computed metric definition temporal lookup left join
    LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME AS OF `proctime`
        ON asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = :computedMetricDefinitionId
        AND asset_to_ignore_per_computed_metric_definition.asset_id = asset_readings.asset_id
    WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')
        -- Filter assets not present in the asset to ignore for this computed metric definition table
        AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL
)
SELECT
    assets_supply_air_temp.`timestamp`,
    assets_supply_air_temp.asset_id,
    assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`
FROM assets_supply_air_temp
INNER JOIN assets_setpoint
    ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id
    AND assets_supply_air_temp.`timestamp` BETWEEN assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp
Schema of asset_readings:
+-----------+---------------------------+-------+-----+---------------+--------------------+
| name      | type                      | null  | key | extras        | watermark          |
+-----------+---------------------------+-------+-----+---------------+--------------------+
| timestamp | TIMESTAMP_LTZ(3) ROWTIME  | TRUE  |     |               | SOURCE_WATERMARK() |
| asset_id  | BIGINT                    | TRUE  |     |               |                    |
| metric_id | INT                       | TRUE  |     |               |                    |
| value     | DOUBLE                    | TRUE  |     |               |                    |
| metadata  | MAP<STRING, STRING>       | TRUE  |     |               |                    |
| proctime  | TIMESTAMP_LTZ(3) PROCTIME | FALSE |     | AS PROCTIME() |                    |
+-----------+---------------------------+-------+-----+---------------+--------------------+
6 rows in set
+------------------------------------------------+
| table name                                     |
+------------------------------------------------+
| asset_readings                                 |
| asset_relationship_parent_to_unit              |
| asset_to_ignore_per_computed_metric_definition |
| metric                                         |
+------------------------------------------------+
Results:
- On Flink 1.15, the difference between assets_supply_air_temp and assets_setpoint (assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`) is computed correctly for every row of assets_supply_air_temp. Note that this subquery does not perform any window-based grouping, so it returns the raw readings.
- On Flink 1.18+, the same query always returns 0 for this difference.
- On Flink 1.18+, changing the metric lookup joins into regular joins (removing FOR SYSTEM_TIME AS OF `proctime`) makes the query output the correct value. However, this causes a performance hit, as the asset_readings table is built from a Kinesis data stream and the metric table can change over time.
- See the attached "Task Execution Plan.txt" file for the difference in the temporal join plans between Flink 1.15 and Flink 1.20.
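Plans like the ones in the attachment can be regenerated with Flink SQL's EXPLAIN statement. The sketch below assumes the Flink SQL client and the tables listed above; it is an illustrative reduction of the query to a single metric lookup join (not the statement used to produce the attachment), so that the lookup join node can be compared side by side between Flink 1.15 and Flink 1.20.

```sql
-- Print the plans Flink generates for one metric lookup join taken from the
-- query above. Running the same statement on Flink 1.15 and Flink 1.20 makes
-- it easier to compare how the lookup join against `metric` is planned.
EXPLAIN PLAN FOR
SELECT
    asset_readings.asset_id,
    asset_readings.`value`
FROM asset_readings
JOIN metric FOR SYSTEM_TIME AS OF `proctime`
    ON metric.metric_id = asset_readings.metric_id
WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp');
```

The full query can be explained the same way once `:computedMetricDefinitionId` is replaced with a literal value.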
I have tried rewriting the query with different temporal join formulations, but I have not found a workaround, and I don't know why this is happening.
Any help would be appreciated.
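One hypothesis that may be worth ruling out (prompted by the linked FLINK-34170, not confirmed in this report): if the optimizer considers the two `metric` lookup joins identical, for example because the `metric.alias` filters are not part of how those plan nodes are compared, it could reuse a single join for both CTEs, and both sides of the subtraction would then read the same metric. Assuming the Flink SQL client, sub-plan reuse can be switched off for the session before rerunning the query.

```sql
-- Diagnostic only: disable reuse of identical sub-plans for this session,
-- then resubmit the query (or its EXPLAIN) and compare results and plans.
-- The option defaults to 'true'.
SET 'table.optimizer.reuse-sub-plan-enabled' = 'false';
```

If the difference stops being 0 with reuse disabled, that would point at plan reuse rather than at the lookup join itself; if nothing changes, the hypothesis can be dropped.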
Attachments
Issue Links
- is related to: FLINK-34170 Include the look up join conditions in the optimised plan. (Open)