  1. Flink
  2. FLINK-7101

Fix Non-windowed group-aggregate error when using `minIdleStateRetentionTime` config and retract agg


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.0, 1.3.1
    • Fix Version/s: 1.4.0
    • Component/s: Table SQL / API
    • Labels: None

    Description

      When a non-windowed group-aggregate uses the minIdleStateRetentionTime config together with a retract AGG, it emits a "NULL" agg value, which we do not expect.
      For example: (IntSumWithRetractAggFunction)
      1. Receive: CRow(Row.of(6L: JLong, 5: JInt, "aaa"), true)
      2. Cleanup state
      3. Receive: CRow(Row.of(6L: JLong, 5: JInt, "aaa"), false) // acc.f1 = -1, getValue= null
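
      The three steps above can be replayed with a minimal sketch. It is not Flink's IntSumWithRetractAggFunction; it is a hypothetical stand-in that assumes the accumulator keeps a sum in f0 and a row count in f1, that getValue returns null unless the count is positive, and that the Int field (5) is the aggregated value. The class and method names are made up for illustration.

```java
public class RetractSumSketch {
    // Hypothetical accumulator: f0 is the running sum, f1 the number of rows counted.
    static final class Acc {
        long f0 = 0;
        long f1 = 0;
    }

    // Add one input row to the accumulator.
    static void accumulate(Acc acc, int value) {
        acc.f0 += value;
        acc.f1 += 1;
    }

    // Undo one previously accumulated row.
    static void retract(Acc acc, int value) {
        acc.f0 -= value;
        acc.f1 -= 1;
    }

    // Assumed semantics: no positive row count means there is no value to report.
    static Long getValue(Acc acc) {
        return acc.f1 > 0 ? Long.valueOf(acc.f0) : null;
    }

    public static void main(String[] args) {
        Acc acc = new Acc();
        accumulate(acc, 5);                 // 1. accumulate message (true)
        System.out.println(getValue(acc));  // prints 5

        acc = new Acc();                    // 2. state cleanup: accumulator starts over

        retract(acc, 5);                    // 3. retract message (false) arrives late
        System.out.println(acc.f1 + " " + getValue(acc)); // prints -1 null
    }
}
```

      Under these assumptions the late retraction hits an empty accumulator, so the count goes negative and the reported value is null.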

      So, we must change the logic of GroupAggProcessFunction as follows:

      if (inputCnt != 0) {
        ...
      } else {
        ...
      }

      to

      if (inputCnt > 0) {
        ...
      } else {
        if (null != prevRow.row) {
          ...
        }
      }
      

      In this case, the result will be bigger than expected, but I think that makes sense, because the user wants the state cleaned up (they should know the impact).
      What do you think? fhueske hequn8128
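
      To make the difference between the two guards concrete, here is a minimal sketch of only the branching decision. It does not reproduce GroupAggProcessFunction; the bodies elided as "..." above are replaced by hypothetical labels (EMIT_RESULT, EMIT_RETRACTION, SKIP), and it assumes that the row counter drops to -1 after cleanup in the same way acc.f1 does, with no previous row left in state.

```java
public class GroupAggGuardSketch {
    // Hypothetical labels for what each branch would do; not Flink identifiers.
    enum Action { EMIT_RESULT, EMIT_RETRACTION, SKIP }

    // Current guard: any non-zero count, including a negative one, takes the first branch.
    static Action currentGuard(long inputCnt, Object prevRow) {
        if (inputCnt != 0) {
            return Action.EMIT_RESULT;
        } else {
            return Action.EMIT_RETRACTION;
        }
    }

    // Proposed guard: only a positive count takes the first branch, and the
    // else branch acts only when a previous row is still held in state.
    static Action proposedGuard(long inputCnt, Object prevRow) {
        if (inputCnt > 0) {
            return Action.EMIT_RESULT;
        } else {
            if (null != prevRow) {
                return Action.EMIT_RETRACTION;
            }
            return Action.SKIP;
        }
    }

    public static void main(String[] args) {
        // Late retraction after cleanup: count is -1 and no previous row exists.
        System.out.println(currentGuard(-1, null));   // prints EMIT_RESULT
        System.out.println(proposedGuard(-1, null));  // prints SKIP
    }
}
```

      With the proposed guard the late retraction is dropped instead of producing a row built from an empty accumulator.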

      Attachments

        1. screenshot-1.png
          116 kB
          sunjincheng

            People

              Assignee: sunjincheng121 sunjincheng
              Reporter: sunjincheng121 sunjincheng
              Votes: 0
              Watchers: 3
