Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-20405

The LAG function in over window is not implemented correctly

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Major
    • Resolution: Duplicate
    • 1.12.0
    • None
    • Table SQL / Runtime
    • None

    Description

      For LAG(input, offset, default) function in over window, it always return current row's input no matter how the offset is set.

      After see the codegen code of the function, I think the implementation is not correct and need to correct.

      
      // the offset and default value is never used
      public UnboundedOverAggregateHelper$24(java.lang.Object[] references) throws Exception {                                    constant$14 = ((int) 1);            constant$14isNull = false;                                                constant$15 = ((org.apache.flink.table.data.binary.BinaryStringData) str$13);            constant$15isNull = false;                        typeSerializer$19 = (((org.apache.flink.table.runtime.typeutils.StringDataSerializer) references[0]));          }
      
      public void accumulate(org.apache.flink.table.data.RowData accInput) throws Exception {                        org.apache.flink.table.data.binary.BinaryStringData field$21;            boolean isNull$21;            org.apache.flink.table.data.binary.BinaryStringData field$22;            isNull$21 = accInput.isNullAt(2);            field$21 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;            if (!isNull$21) {              field$21 = ((org.apache.flink.table.data.binary.BinaryStringData) accInput.getString(2));            }            field$22 = field$21;            if (!isNull$21) {              field$22 = (org.apache.flink.table.data.binary.BinaryStringData) (typeSerializer$19.copy(field$22));            }                                                if (agg0_leadlag != field$22) {              agg0_leadlag = ((org.apache.flink.table.data.binary.BinaryStringData) typeSerializer$19.copy(field$22));            }                   ;            agg0_leadlagIsNull = isNull$21;                                         }
      

       

      The question comes from user mail list[1]

      [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkSQL-kafka-gt-dedup-gt-kafka-td39335.html

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              leonard Leonard Xu
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: