Details
Type: Bug
Status: Closed
Priority: Minor
Resolution: Duplicate
Affects Version/s: 1.11.2, 1.12.0
Fix Version/s: None
Description
The LAG aggregate function seems to always return the value of the current row rather than the value from the row one position earlier:
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

# Streaming mode with the Blink planner (Flink 1.11/1.12 API).
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)

# Source: the sequence 1..10, with an event-time attribute derived from foo.
t_env.execute_sql("""
    CREATE TABLE datagen (
        foo INT,
        message_time AS to_timestamp(from_unixtime(foo)),
        WATERMARK FOR message_time AS message_time
    ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '3',
        'fields.foo.kind' = 'sequence',
        'fields.foo.start' = '1',
        'fields.foo.end' = '10'
    )""")

# LAG over a window ordered by event time.
t = t_env.sql_query(
    "SELECT foo, lag(foo) OVER w AS lagfoo FROM datagen "
    "WINDOW w AS (ORDER BY message_time)")

# Print sink for inspecting the results.
t_env.execute_sql(
    "CREATE TABLE output (foo INT, lagfoo INT) WITH ('connector' = 'print')")
t.execute_insert("output")
This results in:
+I(1,1)    // Expected (1, null)
+I(2,2)    // Expected (2, 1)
+I(3,3)    // Expected (3, 2)
+I(4,4)    // and so on
+I(5,5)
+I(6,6)
+I(7,7)
+I(8,8)
+I(9,9)
+I(10,10)
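For comparison, the expected semantics of LAG(foo) over a window ordered by arrival can be sketched in plain Python. This is a minimal illustration of the SQL behaviour under the assumption that rows arrive in event-time order; it is not Flink's implementation, and the helper name `lag_values` is hypothetical. The optional `offset` and `default` parameters mirror LAG's optional arguments.

```python
from collections import deque
from typing import Deque, Iterable, List, Optional, Tuple, TypeVar

T = TypeVar("T")


def lag_values(
    rows: Iterable[T], offset: int = 1, default: Optional[T] = None
) -> List[Tuple[T, Optional[T]]]:
    """Pair each value with the value `offset` rows earlier (hypothetical helper).

    The first `offset` rows have no predecessor, so they are paired with `default`.
    """
    if offset < 1:
        raise ValueError("offset must be >= 1")
    # Keep only the last `offset` values seen, like bounded per-key state.
    history: Deque[T] = deque(maxlen=offset)
    result: List[Tuple[T, Optional[T]]] = []
    for value in rows:
        # Once the buffer is full, its oldest entry is exactly `offset` rows back.
        lagged = history[0] if len(history) == offset else default
        result.append((value, lagged))
        history.append(value)
    return result


# Print in the same shape as the print connector output above.
for foo, lagfoo in lag_values(range(1, 11)):
    print(f"+I({foo},{'null' if lagfoo is None else lagfoo})")
```

Run against the sequence 1..10, this prints +I(1,null), +I(2,1), ... +I(10,9), which is the output the report expected instead of each row being paired with itself.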
Issue Links
duplicates: FLINK-19449 LEAD/LAG cannot work correctly in streaming mode (Closed)