Details
- Type: Bug
- Status: Closed
- Priority: Major
- Resolution: Fixed
- Version: 1.17.1
Description
MATCH_RECOGNIZE in batch mode seems to ignore the ORDER BY clause. Consider the following example:
SELECT *
FROM events
MATCH_RECOGNIZE (
    PARTITION BY user_id
    ORDER BY ts ASC
    MEASURES
        FIRST(A.ts) as _start,
        LAST(A.ts) as _middle,
        LAST(B.ts) as _finish
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (A{2} B) WITHIN INTERVAL '2' HOURS
    DEFINE
        A AS active is false,
        B AS active is true
) AS T
where events is a PostgreSQL table containing ~10000 records:
CREATE TABLE events (
    id INT,
    user_id INT,
    ts TIMESTAMP(3),
    active BOOLEAN,
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://postgres:5432/test',
    'username' = 'test',
    'password' = 'test',
    'table-name' = 'events'
);
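The result can contain matches where _finish is earlier than _start or _middle. This is wrong, because with ORDER BY ts ASC the row matched as B must come after both rows matched as A.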
user_id | _start                  | _middle                 | _finish
1       | 2023-11-23 14:34:42.346 | 2023-11-23 14:34:48.370 | 2023-11-23 14:34:44.264
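To make the broken invariant explicit, here is a minimal sketch of a check that could be run over the query output. The helper name `violates_order` is hypothetical and not part of the reproduction repository; it only assumes the timestamps are formatted as in the row above.

```python
from datetime import datetime


def violates_order(start: str, middle: str, finish: str) -> bool:
    """Return True if a match breaks _start <= _middle <= _finish."""
    s, m, f = (datetime.fromisoformat(t) for t in (start, middle, finish))
    return not (s <= m <= f)


# The row reported above for user_id 1.
row = (
    "2023-11-23 14:34:42.346",
    "2023-11-23 14:34:48.370",
    "2023-11-23 14:34:44.264",
)
print(violates_order(*row))  # True: _finish is earlier than _middle
```

Any row for which this returns True indicates that the pattern was evaluated on rows that were not ordered by ts.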
Repository where I reproduced the problem: https://github.com/grzegorz8/flink-match-recognize-in-batch-debugging
According to dwysakowicz: In BATCH the CepOperator is always created to process records in processing time:
https://github.com/apache/flink/blob/7f7bee70e3ac0d9fb27d7e09b41d6396b748dada/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java#L54
A comparator covering the sort on the ts field is passed along to the operator: https://github.com/apache/flink/blob/fea9ffedecf81a97de5c31519ade3bab8228e743/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java#L173 but this is only a secondary sort. It is applied only among records that share the same timestamp, so records with different timestamps keep their arrival order.
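The effect of a secondary-only sort can be illustrated with a small simulation. This is a simplified model written for this ticket, not Flink's actual code: the function `secondary_sort_only`, the processing-time values, and the arrival order are all made up for illustration. It assumes records are bucketed by the processing time at which they arrive and that the ts comparator runs only inside each bucket.

```python
from itertools import groupby


def secondary_sort_only(arrivals):
    """Simulate sorting by ts only within records of the same processing time.

    arrivals: list of (processing_time, ts) pairs in arrival order.
    Returns the ts values in the order the pattern would see them.
    """
    out = []
    # Buckets are handled in processing-time order (sorted() is stable).
    by_proc_time = sorted(arrivals, key=lambda r: r[0])
    for _, bucket in groupby(by_proc_time, key=lambda r: r[0]):
        # The comparator on ts is applied inside one bucket only.
        out.extend(sorted(ts for _, ts in bucket))
    return out


# Hypothetical arrival order: the source returns rows not ordered by ts.
arrivals = [(100, 5), (100, 3), (101, 4), (101, 1)]
print(secondary_sort_only(arrivals))        # [3, 5, 1, 4]
print(sorted(ts for _, ts in arrivals))     # [1, 3, 4, 5], what ORDER BY ts implies
```

Under these assumptions the output is sorted within each bucket but not globally, which is consistent with a match whose _finish precedes _middle.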
Issue Links
- relates to FLINK-24865 Support MATCH_RECOGNIZE in Batch mode (Closed)
- links to