Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-33936

Outputting Identical Results in Mini-Batch Aggregation with Set TTL

    XMLWordPrintableJSON

Details

    Description

      If mini-batch is enabled currently, and if the aggregated result is the same as the previous output, this current aggregation result will not be sent downstream. This will cause downstream nodes to not receive updated data. If there is a TTL set for states at this time, the TTL of downstream will not be updated either.

      The specific logic is as follows.

      https://github.com/apache/flink/blob/a18c0cd3f0cdfd7e0acb53283f40cd2033a86472/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L224

                          if (!equaliser.equals(prevAggValue, newAggValue)) {
                              // new row is not same with prev row
                              if (generateUpdateBefore) {
                                  // prepare UPDATE_BEFORE message for previous row
                                  resultRow
                                          .replace(currentKey, prevAggValue)
                                          .setRowKind(RowKind.UPDATE_BEFORE);
                                  out.collect(resultRow);
                              }
                              // prepare UPDATE_AFTER message for new row
                              resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
                              out.collect(resultRow);
                          }
                          // new row is same with prev row, no need to output
      

      When mini-batch is not enabled, even if the aggregation result of this time is the same as last time, new results will still be sent if TTL is set.

      https://github.com/apache/flink/blob/e9353319ad625baa5b2c20fa709ab5b23f83c0f4/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L170

      
                      if (stateRetentionTime <= 0 && equaliser.equals(prevAggValue, newAggValue)) {
                          // newRow is the same as before and state cleaning is not enabled.
                          // We do not emit retraction and acc message.
                          // If state cleaning is enabled, we have to emit messages to prevent too early
                          // state eviction of downstream operators.
                          return;
                      } else {
                          // retract previous result
                          if (generateUpdateBefore) {
                              // prepare UPDATE_BEFORE message for previous row
                              resultRow
                                      .replace(currentKey, prevAggValue)
                                      .setRowKind(RowKind.UPDATE_BEFORE);
                              out.collect(resultRow);
                          }
                          // prepare UPDATE_AFTER message for new row
                          resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
                      }
      

      Therefore, based on the consideration of TTL scenarios, I believe that when mini-batch aggregation is enabled, new results should also output when the aggregated result is the same as the previous one.

      Attachments

        Issue Links

          Activity

            People

              hackergin Feng Jin
              hackergin Feng Jin
              Votes:
              0 Vote for this issue
              Watchers:
              6 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: