Flink / FLINK-2883

Add documentation to forbid key-modifying ReduceFunction

    Details

      Description

      If one uses a combinable reduce operation which also changes the key value of the underlying data element, then the results of the reduce operation can be wrong. The reason is that after the combine phase, another reduce operator is executed which reduces the elements based on the new key values. This might not be so surprising if one has explicitly declared one's GroupReduceOperation as combinable. However, the ReduceFunction conceals the fact that a combiner is used implicitly. Furthermore, the API does not prevent the user from modifying the key fields, which would otherwise rule out the problem.

      The following example program illustrates the problem:

      val env = ExecutionEnvironment.getExecutionEnvironment
      env.setParallelism(1)
      
      val input = env.fromElements((1,2), (1,3), (2,3), (3,3), (3,4))
      
      val result = input.groupBy(0).reduce{
        (left, right) =>
          (left._1 + right._1, left._2 + right._2)
      }
      
      result.output(new PrintingOutputFormat[(Int, Int)]())
      
      env.execute()
      

      The expected output is

      (2, 5)
      (2, 3)
      (6, 7)
      

      However, the actual output is

      (4, 8)
      (6, 7)
      

      I think that the underlying problem is that associativity and commutativity are not sufficient for a combinable reduce operation. Additionally, we need to make sure that the key stays the same.
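      The combine-then-reduce pipeline described above can be sketched with a small simulation (a hedged illustration in plain Python, not Flink code; `combinable_reduce`, its key extractor, and the reduce phase are simplifying assumptions standing in for Flink's actual operators). It reproduces the wrong output for the key-modifying function and shows that a key-preserving function stays consistent:

```python
# Sketch of a combinable reduce: a combine phase pre-aggregates per key,
# then a final reduce phase groups the partial results by their *current*
# key value. This mirrors the two-stage behavior described in the issue;
# all names here are illustrative, not Flink API.
from functools import reduce
from itertools import groupby

def combinable_reduce(elements, key, fn):
    # Combine phase: fold elements per original key into one partial result.
    combined = {}
    for e in elements:
        k = key(e)
        combined[k] = fn(combined[k], e) if k in combined else e
    # Reduce phase: partials are grouped again by key. If fn changed the
    # key field, partials from different original groups can collide.
    partials = sorted(combined.values(), key=key)
    return [reduce(fn, group) for _, group in groupby(partials, key=key)]

data = [(1, 2), (1, 3), (2, 3), (3, 3), (3, 4)]

# Key-modifying function from the example program: sums both fields.
bad = combinable_reduce(data, lambda t: t[0],
                        lambda l, r: (l[0] + r[0], l[1] + r[1]))
# bad == [(4, 8), (6, 7)]: the combined (2, 5) of key 1 now carries key 2
# and is merged with the (2, 3) group, matching the actual output above.

# Key-preserving function: leaves the key field unchanged, so both
# phases agree (note the semantics differ: keys are no longer summed).
good = combinable_reduce(data, lambda t: t[0],
                         lambda l, r: (l[0], l[1] + r[1]))
# good == [(1, 5), (2, 3), (3, 7)]
```

      The simulation shows why associativity and commutativity alone are not enough: the second reduce pass silently regroups by whatever key values the combiner emitted.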


              People

              • Assignee:
                Greg Hogan (greghogan)
                Reporter:
                Till Rohrmann (till.rohrmann)
              • Votes: 0
                Watchers: 6
