Flink / FLINK-29849

Event time temporal join on an upsert source may produce an incorrect execution plan


Details

      This resolves a correctness issue in event time temporal joins with a versioned table backed by an upsert source. When the right input of the join is an upsert source, the planner no longer generates a ChangelogNormalize node for it.
      Note that this is an incompatible plan change compared to 1.16.0.

    Description

      In the current implementation, the execution plan is incorrect for an event time temporal join on an upsert source. There are two problems:
      1. For an upsert source, we should not add a ChangelogNormalize node under a temporal join input, or it will damage the versions of the versioned table. For versioned tables, we use a single-temporal mechanism that relies on sequential records of the same key to determine the valid period of each version. If ChangelogNormalize is added, it produces an UB message based on the previous UA or Insert message, with all columns identical, including the event time. For example,
      original upsert input:

      +I (key1, '2022-11-02 10:00:00', a1)
      +U (key1, '2022-11-02 10:01:03', a2)
      

      the versioned data should be:

      v1  [~, '2022-11-02 10:00:00')
      v2  ['2022-11-02 10:00:00', '2022-11-02 10:01:03')
      

      after ChangelogNormalize's processing, the output is:

      +I (key1, '2022-11-02 10:00:00', a1)
      -U (key1, '2022-11-02 10:00:00', a1)
      +U (key1, '2022-11-02 10:01:03', a2)
      

      the versions become incorrect:

      v1  ['2022-11-02 10:00:00', '2022-11-02 10:00:00')  // invalid period
      v2  ['2022-11-02 10:00:00', '2022-11-02 10:01:03')
      

      2. Semantically, a filter cannot be pushed into an event time temporal join; otherwise, the filter may also corrupt the versioned table.
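      The normalization behavior described in problem 1 can be modeled with a small Python sketch. This is a hypothetical, simplified model of what the description attributes to ChangelogNormalize (keep the last row per key, and on an update emit the previous row unchanged as -U before the new +U), not Flink's actual implementation; deletes and duplicate suppression are ignored, and `normalize_upserts` is an illustrative name.

```python
from typing import Dict, List, Tuple

# A row is (key, event_time, value); a change is (kind, row).
Row = Tuple[str, str, str]
Change = Tuple[str, Row]


def normalize_upserts(upserts: List[Row]) -> List[Change]:
    """Hypothetical model of upsert-to-retract normalization.

    Keeps the last row per key. For an update it emits -U carrying the
    previous row unchanged (including its event time), then +U with the
    new row. Deletes and duplicate suppression are not modeled.
    """
    last: Dict[str, Row] = {}
    out: List[Change] = []
    for row in upserts:
        key = row[0]
        prev = last.get(key)
        if prev is None:
            out.append(("+I", row))
        else:
            # The retraction copies the old row, so its event time is the
            # old version's start time, not the time of the update.
            out.append(("-U", prev))
            out.append(("+U", row))
        last[key] = row
    return out


upsert_input = [
    ("key1", "2022-11-02 10:00:00", "a1"),
    ("key1", "2022-11-02 10:01:03", "a2"),
]
for kind, row in normalize_upserts(upsert_input):
    print(kind, row)
```

      Because the -U row carries the same event time as the +I row it retracts, the first version is closed at the instant it starts, which is the zero-length period shown above.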
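      For problem 2, a minimal Python sketch shows how pushing a right-side filter below the join can change the result. It assumes that a probe row at time t joins the latest version whose event time is less than or equal to t; `lookup`, `keep`, and the predicate `value != "a2"` are hypothetical and are not Flink APIs.

```python
from bisect import bisect_right
from typing import List, Optional, Tuple

# A version is (event_time, value); the list is sorted by event_time.
Version = Tuple[str, str]


def lookup(versions: List[Version], t: str) -> Optional[str]:
    """Return the value of the latest version whose event time is <= t."""
    times = [v[0] for v in versions]
    i = bisect_right(times, t)  # ISO-like timestamps compare correctly as strings
    return versions[i - 1][1] if i > 0 else None


def keep(value: str) -> bool:
    # Hypothetical right-side predicate from the join condition.
    return value != "a2"


versions = [("2022-11-02 10:00:00", "a1"), ("2022-11-02 10:01:03", "a2")]
probe = "2022-11-02 10:02:00"

# Filter evaluated after the temporal lookup: the matched version a2 is rejected.
joined = lookup(versions, probe)
after = joined if joined is not None and keep(joined) else None

# Filter pushed below the join: a2 is removed from the history, so a1 looks valid.
pushed = lookup([v for v in versions if keep(v[1])], probe)

print(after, pushed)  # None a1
```

      With the filter pushed down, the probe row joins a stale version (a1) instead of finding no valid match, which is the kind of corruption of the versioned table the description refers to.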

          People

            lincoln.86xy lincoln lee