  1. Flink
  2. FLINK-36626

Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.20

Details

    • Type: Bug
    • Status: Open
    • Priority: Critical
    • Resolution: Unresolved
    • Affects Version/s: 1.18.1, 1.20.0
    • Fix Version/s: None
    • Component/s: Table SQL / API
    • Labels: None
    • Environment: AWS Managed Apache Flink

    • Flags: Important

    Description

      When migrating from Flink 1.15 to Flink 1.18 or Flink 1.20, I found a behavior change in Flink SQL temporal lookup joins that I have not been able to pinpoint. It causes the query below to output different results.

      Flink SQL Query:

      WITH assets_setpoint AS (
        SELECT
          asset_id,
          TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,
          TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,
          LAST_VALUE(`value`) AS `value`
        FROM asset_readings
        JOIN metric FOR SYSTEM_TIME AS OF `proctime`
        ON metric.metric_id = asset_readings.metric_id
        WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')
        GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id
      ),
      assets_supply_air_temp AS (
        -- CAST needed to perform regular joins instead of temporal joins in the outer query
        SELECT CAST(asset_readings.`timestamp` AS TIMESTAMP) AS `timestamp`,
            asset_readings.asset_id,
            asset_readings.`value` AS `value`
        FROM asset_readings
        -- Metrics temporal lookup inner join
        JOIN metric FOR SYSTEM_TIME AS OF `proctime`
        ON metric.metric_id = asset_readings.metric_id
        -- Assets to ignore for this computed metric definition temporal lookup left join
        LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME AS OF `proctime`
        ON asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = :computedMetricDefinitionId
        AND asset_to_ignore_per_computed_metric_definition.asset_id = asset_readings.asset_id
        WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')
        -- Filter assets not present in the asset to ignore for this computed metric definition table
        AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL
      )
      SELECT
        assets_supply_air_temp.`timestamp`,
        assets_supply_air_temp.asset_id,
        assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`
      FROM assets_supply_air_temp
      INNER JOIN assets_setpoint
      ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id
      AND assets_supply_air_temp.`timestamp` BETWEEN assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp

       

      Schema:
      +-----------+-----------------------------+-------+-----+---------------+--------------------+
      |      name |                        type |  null | key |        extras |          watermark |
      +-----------+-----------------------------+-------+-----+---------------+--------------------+
      | timestamp |  TIMESTAMP_LTZ(3) *ROWTIME* |  TRUE |     |               | SOURCE_WATERMARK() |
      |  asset_id |                      BIGINT |  TRUE |     |               |                    |
      | metric_id |                         INT |  TRUE |     |               |                    |
      |     value |                      DOUBLE |  TRUE |     |               |                    |
      |  metadata |         MAP<STRING, STRING> |  TRUE |     |               |                    |
      |  proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |                    |
      +-----------+-----------------------------+-------+-----+---------------+--------------------+
      6 rows in set
      ------------------------------------------------
      |                                     table name |
      ------------------------------------------------
      |                                 asset_readings |
      |              asset_relationship_parent_to_unit |
      | asset_to_ignore_per_computed_metric_definition |
      |                                         metric |
      ------------------------------------------------

      Results:

      • On Flink 1.15, the difference between assets_supply_air_temp and assets_setpoint (assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`) is computed correctly for every row of assets_supply_air_temp. Note that this subquery does not perform any window-based grouping, so it emits raw readings.
      • On Flink 1.18+, the same query always produces a difference of 0.
      • On Flink 1.18+, changing the query to use a regular join against the metric lookup table (removing FOR SYSTEM_TIME AS OF `proctime`) makes the query output the correct value. However, this causes a performance hit, because the asset_readings table is built from a Kinesis data stream and the metric table can change over time.
      • Please see the attached "Tasks Execution Plan.txt" file for the difference in temporal joins between Flink 1.15 and Flink 1.20.
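
      For anyone trying to reproduce the plan comparison, one option is to run EXPLAIN on a reduced form of the query on each Flink version and compare the LookupJoin nodes, in particular where the metric.alias filter is applied. The statement below is only a minimal sketch and is not taken from the attachment. It assumes that asset_readings and metric are registered in the current catalog as listed under Schema, and it keeps only the first lookup join from the original query.

      ```sql
      -- Sketch: print the optimized plan for a single lookup join in isolation.
      -- Assumption: asset_readings and metric are registered as in the Schema listing.
      EXPLAIN PLAN FOR
      SELECT
        asset_readings.asset_id,
        asset_readings.`timestamp`,
        asset_readings.`value`
      FROM asset_readings
      -- Processing-time lookup join against the metric table, as in the original query
      JOIN metric FOR SYSTEM_TIME AS OF asset_readings.`proctime`
      ON metric.metric_id = asset_readings.metric_id
      WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint');
      ```

      Repeating the statement with the other alias ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp') and comparing the two outputs per version may help show whether the two lookup joins are planned differently on 1.18+.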

       

      I have tried rewriting the query in several different forms that keep the temporal joins, but I have not found a workaround, and I do not know why this is happening.

      Any help would be appreciated.

       

      Attachments

        1. Tasks Execution Plan.txt
          8 kB
          Eduardo Breijo

            People

              Assignee: Unassigned
              Reporter: Eduardo Breijo (eduardobreijo)