Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-19592

MiniBatchGroupAggFunction should emit messages to prevent too early state eviction of downstream operators

    XMLWordPrintableJSON

Details

    Description

      Currently, GroupAggFunction will emit a retract and a new insert message when a new message with the same key arrives. According to Flink-8566, it's a feature to prevent too early state eviction of downstream operators.

      However, MiniBatchGroupAggFunction doesn't. Before Flink-8566 being resolved, it should also emit these messages.

      GroupAggFunction.java:

      if (!stateCleaningEnabled && equaliser.equals(prevAggValue, newAggValue)) {
      	// newRow is the same as before and state cleaning is not enabled.
      	// We do not emit retraction and acc message.
      	// If state cleaning is enabled, we have to emit messages to prevent too early
      	// state eviction of downstream operators.
      	return;
      } else {
      	// retract previous result
      	if (generateUpdateBefore) {
      		// prepare UPDATE_BEFORE message for previous row
      		resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);
      		out.collect(resultRow);
      	}
      	// prepare UPDATE_AFTER message for new row
      	resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
      }
      

      MiniBatchGroupAggFunction.java:

       

      if (!equaliser.equals(prevAggValue, newAggValue)) {
      	// new row is not same with prev row
      	if (generateUpdateBefore) {
      		// prepare UPDATE_BEFORE message for previous row
      		resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);
      		out.collect(resultRow);
      	}
      	// prepare UPDATE_AFTER message for new row
      	resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
      	out.collect(resultRow);
      }
      // new row is same with prev row, no need to output
      

       

       

       

       

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              MinGW Smile
              Votes:
              0 Vote for this issue
              Watchers:
              8 Start watching this issue

              Dates

                Created:
                Updated: