FLINK-10566

Flink Planning is exponential in the number of stages


    Description

      This makes it nearly impossible to run graphs with 100 or more stages. (The execution itself is still sub-second, but the job submission takes increasingly long.)

      I can reproduce this with the following pipeline, which resembles my real-world workloads (with depth up to 10 and width up to, and past, 50). On Flink, getting the width beyond 10 seems to be problematic (the submission times out after hours). Note the log scale for time on the attached chart.

       

        import java.util.Collection;

        import org.apache.flink.api.common.functions.RichMapFunction;
        import org.apache.flink.api.java.DataSet;
        import org.apache.flink.api.java.ExecutionEnvironment;
        import org.apache.flink.api.java.tuple.Tuple2;
        import org.apache.flink.configuration.Configuration;

        public static void runPipeline(int depth, int width) throws Exception {
          final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

          DataSet<String> input = env.fromElements("a", "b", "c");
          DataSet<String> stats = null;

          // Chain `depth` analyze stages; stage i fans out into width / (i + 1) + 1 branches.
          for (int i = 0; i < depth; i++) {
            stats = analyze(input, stats, width / (i + 1) + 1);
          }

          stats.writeAsText("out.txt");
          env.execute("depth " + depth + " width " + width);
        }

        public static DataSet<String> analyze(DataSet<String> input, DataSet<String> stats, int branches) {
          System.out.println("analyze " + branches);
          for (int i = 0; i < branches; i++) {
            final int ii = i;

            if (stats != null) {
              // Feed the stats accumulated so far into this branch's map as a broadcast set.
              input = input.map(new RichMapFunction<String, String>() {
                  @Override
                  public void open(Configuration parameters) throws Exception {
                    Collection<String> broadcastSet = getRuntimeContext().getBroadcastVariable("stats");
                  }

                  @Override
                  public String map(String value) throws Exception {
                    return value;
                  }
                }).withBroadcastSet(stats.map(s -> "(" + s + ").map"), "stats");
            }

            // Each branch is a small map -> groupBy -> minBy -> map chain over the input.
            DataSet<String> branch = input
                .map(s -> new Tuple2<Integer, String>(0, s + ii))
                .groupBy(0)
                .minBy(1)
                .map(kv -> kv.f1);

            // Union the branch result into the running stats data set.
            if (stats == null) {
              stats = branch;
            } else {
              stats = stats.union(branch);
            }
          }
          return stats.map(s -> "(" + s + ").stats");
        }
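
      For reference, the timings behind the attached chart can be reproduced by calling runPipeline for a range of widths and measuring wall-clock time. The harness below is a minimal sketch (the main method and the depth/width grid are illustrative assumptions, not part of the original report); since the data set is only three elements, nearly all of the measured time is spent on plan construction and job submission rather than on execution.

        public static void main(String[] args) throws Exception {
          int depth = 10;  // the report describes workloads with depth up to 10
          for (int width : new int[] {2, 5, 10, 20, 50}) {
            long start = System.nanoTime();
            runPipeline(depth, width);  // covers plan construction, submission, and execution
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            System.out.println("depth=" + depth + " width=" + width + " took " + elapsedMs + " ms");
          }
        }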
      
      

      Attachments

        chart.png (12 kB, attached by Robert Bradshaw)

            People

              Assignee: Maximilian Michels (mxm)
              Reporter: Robert Bradshaw (robertwb)
              Votes: 0
              Watchers: 6
