Flink / FLINK-20100

Lag aggregate function does not return lag, but current row


Details

    Description

      The LAG aggregate function appears to always return the value of the current row rather than the value of the preceding row:

      from pyflink.table import EnvironmentSettings, StreamTableEnvironment
      
      # Streaming mode with the Blink planner
      env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
      t_env = StreamTableEnvironment.create(environment_settings=env_settings)
      
      t_env.execute_sql("""
      CREATE TABLE datagen (
       foo INT,
       message_time AS to_timestamp(from_unixtime(foo)),
       WATERMARK FOR message_time AS message_time
      ) WITH (
       'connector' = 'datagen',
       'rows-per-second'='3',
       'fields.foo.kind'='sequence',
       'fields.foo.start'='1',
       'fields.foo.end'='10'
      )""")
      
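      # LAG over a window ordered by event time; lagfoo should hold the previous row's foo
      t = t_env.sql_query("SELECT foo, lag(foo) OVER w AS lagfoo FROM datagen WINDOW w AS (ORDER BY message_time)")
      
      # Print sink to inspect the result
      t_env.execute_sql("CREATE TABLE output (foo INT, lagfoo INT) WITH ('connector' = 'print')")
      t.execute_insert("output")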
      

      This results in:

      +I(1,1)  // Expected (1, null)
      +I(2,2)  // Expected (2, 1)
      +I(3,3)  // Expected (3, 2)
      +I(4,4)  // and so on
      +I(5,5)
      +I(6,6)
      +I(7,7)
      +I(8,8)
      +I(9,9)
      +I(10,10)
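
For comparison, the expected values noted in the output comments above can be reproduced outside Flink with a few lines of plain Python. This is only a sketch of the usual LAG semantics (offset of one row, null when no preceding row exists); the helper name `expected_lag` is hypothetical and is not part of Flink or PyFlink.

```python
from typing import List, Optional, Tuple


def expected_lag(values: List[int], offset: int = 1,
                 default: Optional[int] = None) -> List[Tuple[int, Optional[int]]]:
    """Pair each value with the value `offset` rows earlier.

    Hypothetical helper for illustration only; rows without a
    predecessor get `default` (None here, standing in for SQL NULL).
    """
    result = []
    for i, current in enumerate(values):
        previous = values[i - offset] if i >= offset else default
        result.append((current, previous))
    return result


# Same sequence the datagen source produces: foo = 1..10,
# assumed to be already ordered by message_time.
rows = expected_lag(list(range(1, 11)))
for foo, lagfoo in rows:
    print(f"+I({foo},{lagfoo})")
```

In this sketch the first row pairs with `None` and every later row pairs with its predecessor, which is the behaviour the report expects from `lag(foo)`.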
      

       

       

            People

              Assignee: Unassigned
              Reporter: Thilo Schneider (t.schneider)