  1. Flink
  2. FLINK-10100

Optimizer pushes partitioning past Null-Filter


    Details

    • Type: Bug
    • Status: Open
    • Priority: Critical
    • Resolution: Unresolved
    • Affects Version/s: 1.3.3, 1.4.2, 1.5.2, 1.6.0, 1.7.0
    • Fix Version/s: None
    • Component/s: API / DataSet
    • Labels:
      None

      Description

      The DataSet optimizer pushes certain operations like partitioning or sorting past Filter operators.
      It does that because it knows that a FilterFunction cannot modify the records but only indicate whether a record should be forwarded or not.

      However, this causes problems if the filter should remove records with null keys. In that case, the partitioning can be pushed past the filter such that the partitioner has to deal with null keys. This can fail with a NullPointerException.
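
      Why the operator order matters can be illustrated with a toy model. The sketch below is not Flink code: the class name and the partition helper are hypothetical, and the partitioner is only assumed to dereference the key, which is what makes a null key fatal. It contrasts the order the user wrote (filter, then partition) with the order after the pushdown (partition, then filter).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PushdownOrderSketch {

    // Toy hash partitioner (assumption, not Flink's implementation):
    // it dereferences the key, so a null key throws NullPointerException.
    static int partition(Long key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Order as written by the user: null keys are dropped before partitioning.
    static List<Integer> filterThenPartition(List<Long> keys, int parallelism) {
        List<Integer> targets = new ArrayList<>();
        for (Long key : keys) {
            if (key != null) {
                targets.add(partition(key, parallelism));
            }
        }
        return targets;
    }

    // Order after the pushdown: the partitioner sees every key, including nulls.
    static List<Integer> partitionThenFilter(List<Long> keys, int parallelism) {
        List<Integer> targets = new ArrayList<>();
        for (Long key : keys) {
            int target = partition(key, parallelism); // fails here for a null key
            if (key != null) {
                targets.add(target);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        List<Long> keys = Arrays.asList(null, 2L, 2L, 3L, null);
        System.out.println(filterThenPartition(keys, 4).size() + " keys partitioned");
        try {
            partitionThenFilter(keys, 4);
        } catch (NullPointerException e) {
            System.out.println("partitioning before filtering hit a null key");
        }
    }
}
```

      Both methods apply the same filter predicate; only the position of the partitioning step differs, which is the reordering described above.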

      The following code produces an affected plan.

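      import java.util.ArrayList;
      import java.util.List;
      import org.apache.flink.api.common.typeinfo.Types;
      import org.apache.flink.api.java.DataSet;
      import org.apache.flink.api.java.ExecutionEnvironment;
      import org.apache.flink.api.java.io.DiscardingOutputFormat;
      import org.apache.flink.types.Row;
      import org.apache.flink.util.Collector;

      // Assumed setup: the original snippet uses `env` without declaring it.
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      // Two of the five rows carry a null key in field 0.
      List<Row> rowList = new ArrayList<>();
      rowList.add(Row.of(null, 1L));
      rowList.add(Row.of(2L, 2L));
      rowList.add(Row.of(2L, 2L));
      rowList.add(Row.of(3L, 3L));
      rowList.add(Row.of(null, 3L));

      DataSet<Row> rows = env.fromCollection(rowList, Types.ROW(Types.LONG, Types.LONG));

      DataSet<Long> result = rows
          // The filter is meant to drop null keys before the grouping.
          .filter(r -> r.getField(0) != null)
          .setParallelism(4)
          .groupBy(0)
          // Count the rows in each group.
          .reduceGroup((Iterable<Row> vals, Collector<Long> out) -> {
              long cnt = 0L;
              for (Row v : vals) {
                  cnt++;
              }
              out.collect(cnt);
          })
          .returns(Types.LONG)
          .setParallelism(4);

      result.output(new DiscardingOutputFormat<>());
      // Print the optimized plan to see where the partitioning is placed.
      System.out.println(env.getExecutionPlan());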
      

      To resolve the problem, we could remove the field-forward property of FilterFunction. Filtering before shipping or sorting data is typically more efficient anyway, so this change might also improve the performance of certain plans.

      As a workaround until this bug is fixed, users can implement the filter with a FlatMapFunction. FlatMapFunction is a more generic interface, so the optimizer cannot automatically infer how the function behaves and will not push partitionings or sorts past it.
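
      A minimal sketch of the flat-map style null filter follows. To keep it runnable without Flink on the classpath, the nested Collector and FlatMapFunction interfaces are local stand-ins that only mirror the shape of org.apache.flink.util.Collector and org.apache.flink.api.common.functions.FlatMapFunction, and Object[] stands in for Row. The names DropNullKeys and applyFlatMap are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class NullKeyFlatMapSketch {

    // Stand-in with the same shape as org.apache.flink.util.Collector#collect,
    // declared locally so the sketch compiles without Flink.
    interface Collector<T> {
        void collect(T record);
    }

    // Stand-in with the same shape as
    // org.apache.flink.api.common.functions.FlatMapFunction#flatMap.
    interface FlatMapFunction<T, O> {
        void flatMap(T value, Collector<O> out) throws Exception;
    }

    // Hypothetical replacement for filter(r -> r.getField(0) != null).
    // Object[] stands in for Flink's Row; index 0 is the grouping key.
    static class DropNullKeys implements FlatMapFunction<Object[], Object[]> {
        @Override
        public void flatMap(Object[] row, Collector<Object[]> out) {
            if (row[0] != null) {
                out.collect(row); // forward the record unchanged
            }
            // Rows with a null key emit nothing, so they are dropped.
        }
    }

    // Tiny driver that feeds every input record through the function.
    static <T, O> List<O> applyFlatMap(FlatMapFunction<T, O> fn, List<T> input) {
        List<O> output = new ArrayList<>();
        try {
            for (T value : input) {
                fn.flatMap(value, output::add);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return output;
    }

    public static void main(String[] args) {
        List<Object[]> rows = Arrays.asList(
                new Object[] {null, 1L},
                new Object[] {2L, 2L},
                new Object[] {2L, 2L},
                new Object[] {3L, 3L},
                new Object[] {null, 3L});
        List<Object[]> kept = applyFlatMap(new DropNullKeys(), rows);
        System.out.println(kept.size() + " rows kept"); // prints "3 rows kept"
    }
}
```

      In an actual job, the same body would be written against Flink's own FlatMapFunction<Row, Row> and Collector<Row> and passed to flatMap(...) in place of the filter(...) call. Supplying the row type via returns(...) is probably needed so that groupBy(0) still sees a row type; this has not been verified against a specific Flink version.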

              People

              • Assignee:
                Unassigned
                Reporter:
                fhueske Fabian Hueske
