  Flink / FLINK-33722

MATCH_RECOGNIZE in batch mode ignores events order


Details

    Description

      MATCH_RECOGNIZE in batch mode seems to ignore the ORDER BY clause. Consider the following example:

          SELECT *
          FROM events
              MATCH_RECOGNIZE (
                  PARTITION BY user_id
                  ORDER BY ts ASC
                  MEASURES
                      FIRST(A.ts) as _start,
                      LAST(A.ts) as _middle,
                      LAST(B.ts) as _finish
                  ONE ROW PER MATCH
                  AFTER MATCH SKIP PAST LAST ROW
                  PATTERN (A{2} B) WITHIN INTERVAL '2' HOURS
                  DEFINE
                      A AS active is false,
                      B AS active is true
              ) AS T 

      where events is a PostgreSQL table containing about 10,000 records, defined as follows:

      CREATE TABLE events (
        id INT,
        user_id INT,
        ts TIMESTAMP(3),
        active BOOLEAN,
        WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,
        PRIMARY KEY (id) NOT ENFORCED
      ) WITH (
          'connector' = 'jdbc',
          'url' = 'jdbc:postgresql://postgres:5432/test',
          'username' = 'test',
          'password' = 'test',
          'table-name' = 'events'
      ); 

      As a result, _finish can be earlier than _start or _middle, which is wrong: with ORDER BY ts ASC, the B row of a match must come after both A rows. For example:

         user_id                  _start                 _middle                 _finish
               1 2023-11-23 14:34:42.346 2023-11-23 14:34:48.370 2023-11-23 14:34:44.264
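      To make the expected semantics concrete, here is a minimal Java sketch that models the query for a single user_id partition. It is a hypothetical reference model, not Flink's CEP implementation, and ExpectedMatches, Event, Match and match() are made-up names. It assumes strict contiguity (the A, A, B rows must be consecutive after sorting), AFTER MATCH SKIP PAST LAST ROW, and treats WITHIN INTERVAL '2' HOURS as "last row strictly less than two hours after the first". Once rows are sorted by ts, every match satisfies _start <= _middle <= _finish; the three rows behind the reported output, taken on their own, sort to A, B, A and yield no match at all.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical reference model of the query for one user_id partition.
// NOT Flink's CEP/NFA code; it only encodes the intended semantics.
class ExpectedMatches {
    // One input row: the ts and active columns of the events table.
    record Event(LocalDateTime ts, boolean active) {}

    // One output row: the MEASURES _start, _middle, _finish.
    record Match(LocalDateTime start, LocalDateTime middle, LocalDateTime finish) {}

    // Assumption: WITHIN means "last row strictly less than 2h after the first row".
    static final Duration WITHIN = Duration.ofHours(2);

    static List<Match> match(List<Event> partition) {
        // ORDER BY ts ASC: rows are put in timestamp order before matching.
        List<Event> rows = new ArrayList<>(partition);
        rows.sort(Comparator.comparing(Event::ts));

        List<Match> out = new ArrayList<>();
        int i = 0;
        while (i + 2 < rows.size()) {
            Event a1 = rows.get(i), a2 = rows.get(i + 1), b = rows.get(i + 2);
            // PATTERN (A{2} B): two consecutive A rows directly followed by a B row.
            boolean matched = !a1.active() && !a2.active() && b.active()
                    && Duration.between(a1.ts(), b.ts()).compareTo(WITHIN) < 0;
            if (matched) {
                out.add(new Match(a1.ts(), a2.ts(), b.ts()));
                i += 3; // AFTER MATCH SKIP PAST LAST ROW
            } else {
                i += 1; // otherwise try the next row as a match start
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The three rows behind the wrong output row in the report
        // (A rows have active = false, the B row has active = true).
        List<Event> rows = List.of(
                new Event(LocalDateTime.parse("2023-11-23T14:34:42.346"), false),
                new Event(LocalDateTime.parse("2023-11-23T14:34:48.370"), false),
                new Event(LocalDateTime.parse("2023-11-23T14:34:44.264"), true));
        // Sorted by ts these rows read A, B, A, so no A{2} B match exists.
        System.out.println(match(rows)); // prints []
    }
}
```

      The model sorts inside match() on purpose: it is the step that, according to this report, batch mode effectively skips.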

       

      Repository where I reproduced the problem: https://github.com/grzegorz8/flink-match-recognize-in-batch-debugging


       

      According to dwysakowicz, in BATCH mode the CepOperator is always created to process records in processing time:
      https://github.com/apache/flink/blob/7f7bee70e3ac0d9fb27d7e09b41d6396b748dada/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java#L54
      A comparator covering the sort on the ts field is passed to the operator: https://github.com/apache/flink/blob/fea9ffedecf81a97de5c31519ade3bab8228e743/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java#L173 However, this is only a secondary sort: it is applied only among records with the same (processing-time) timestamp. Records that arrive at different processing times therefore reach the pattern matcher in arrival order, not in ts order.
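      The ordering described above can be illustrated with a small Java sketch. It is a hypothetical model, not the CepOperator source: ProcessingTimeOrder, Event, arrivalTick and order() are made-up names, and the arrival ticks are invented to assume that the JDBC scan delivers the three reported rows one at a time in the order 42.346, 48.370, 44.264. Rows are grouped by arrival tick (primary order), and the ts comparator is applied only inside a group (secondary order).

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of processing-time ordering with a secondary ts comparator.
class ProcessingTimeOrder {
    // ts/active from the events table plus an assumed processing-time arrival tick.
    record Event(LocalDateTime ts, boolean active, long arrivalTick) {}

    static List<Event> order(List<Event> arrived) {
        // Primary order: processing time (arrival tick), ascending via TreeMap.
        Map<Long, List<Event>> buckets = new TreeMap<>();
        for (Event e : arrived) {
            buckets.computeIfAbsent(e.arrivalTick(), k -> new ArrayList<>()).add(e);
        }
        List<Event> out = new ArrayList<>();
        for (List<Event> bucket : buckets.values()) {
            // Secondary order: the ts comparator, applied only within one tick.
            bucket.sort(Comparator.comparing(Event::ts));
            out.addAll(bucket);
        }
        return out;
    }

    public static void main(String[] args) {
        LocalDateTime a1 = LocalDateTime.parse("2023-11-23T14:34:42.346");
        LocalDateTime a2 = LocalDateTime.parse("2023-11-23T14:34:48.370");
        LocalDateTime b = LocalDateTime.parse("2023-11-23T14:34:44.264");

        // Each row in its own tick: the comparator never reorders them,
        // so the rows come out as A, A, B (ts 42.346, 48.370, 44.264).
        List<Event> separate = List.of(
                new Event(a1, false, 1), new Event(a2, false, 2), new Event(b, true, 3));
        order(separate).forEach(e -> System.out.println(e.ts() + " active=" + e.active()));

        // All rows in the same tick: the comparator restores ts order (A, B, A).
        List<Event> sameTick = List.of(
                new Event(a1, false, 1), new Event(a2, false, 1), new Event(b, true, 1));
        order(sameTick).forEach(e -> System.out.println(e.ts() + " active=" + e.active()));
    }
}
```

      With separate ticks the rows stay in arrival order (A, A, B), which is exactly the sequence behind the wrong output row; only when all rows share a tick does the comparator restore ts order.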


            People

              grzegorz.kolakowski Grzegorz Kołakowski
              Votes: 0
              Watchers: 4
