  1. Flink
  2. FLINK-7593

Generated plan does not create correct groups

Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Duplicate
    • Affects Version/s: 1.3.2
    • Fix Version/s: None
    • Component/s: API / DataSet
    • Labels: None
    • Environment: Windows 7, Ubuntu 16.04, Flink 1.3.2

    Description

      Under specific circumstances Flink seems to generate an incorrect execution plan. I was using `groupBy(0).sum(1)`, but the resulting CSV files contained multiple entries per group, i.e. the grouping did not take place. After some work I managed to reduce the relevant part of our code to the minimal test case below. Be careful: all parts need to be present, even the seemingly irrelevant secondary output. If I remove anything, Flink generates a correct plan (either by introducing a combiner node prior to the reducer or by using "Sum (combine)" on the edge before the reducer).

      import java.util.ArrayList;
      import java.util.Collection;
      
      import org.apache.flink.api.java.DataSet;
      import org.apache.flink.api.java.ExecutionEnvironment;
      import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.core.fs.FileSystem.WriteMode;
      import org.apache.flink.types.LongValue;
      import org.apache.flink.util.LongValueSequenceIterator;
      
      public class FlinkOptimizerBug {
      
        public static void main(String[] args) throws Exception {
      
          ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
          
          DataSet<Tuple2<Long, Long>> x = 
              env.fromParallelCollection(new LongValueSequenceIterator(0,1000), LongValue.class)
              .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4, 1L))
              .join(env.fromParallelCollection(new LongValueSequenceIterator(0,1000), LongValue.class)
                  .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4, 1L)))
              .where(0).equalTo(0).with((t1,t2) -> t1)
              .union(env.fromParallelCollection(new LongValueSequenceIterator(0,1000), LongValue.class)
                  .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4,1L)))
              .map(l->l)
              .withForwardedFields("f0;f1");
          
          // Secondary output: seemingly irrelevant, but required to trigger the bug.
          Collection<Tuple2<Long, Long>> out = new ArrayList<>();
          x.output(new LocalCollectionOutputFormat<>(out));
          
          x.groupBy(0)
          .sum(1) //BUG: this will not be grouped correctly, so there will be multiple outputs per group!
          .writeAsCsv("/tmp/foo", WriteMode.OVERWRITE)
          .setParallelism(1);
          env.setParallelism(4);
          
          System.out.println(env.getExecutionPlan());
          env.execute();
        }
      }
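
      For reference, the result that `groupBy(0).sum(1)` is expected to produce can be sketched in plain Java without Flink. This is only an illustration of the intended semantics (exactly one output row per distinct key), not Flink's implementation; the class `GroupSumReference` and the method `groupAndSum` are hypothetical names introduced here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Flink: models the expected semantics of
// groupBy(0).sum(1) on (key, value) pairs.
public class GroupSumReference {

    /** Sums field 1 per distinct value of field 0; each row is a {key, value} pair. */
    public static Map<Long, Long> groupAndSum(List<long[]> rows) {
        Map<Long, Long> sums = new TreeMap<>();
        for (long[] row : rows) {
            // merge() adds to the running sum, so every key ends up in exactly one entry
            sums.merge(row[0], row[1], Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<long[]> rows = Arrays.asList(
            new long[] {0, 1}, new long[] {1, 1}, new long[] {0, 1},
            new long[] {3, 1}, new long[] {1, 1}, new long[] {0, 1});
        System.out.println(groupAndSum(rows)); // prints {0=3, 1=2, 3=1}
    }
}
```

      Whatever the parallelism, a correct plan should yield one row per key, as this sequential version does.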
      

      Invalid execution plan generated:

      {
      	"nodes": [
      
      	{
      		"id": 5,
      		"type": "source",
      		"pact": "Data Source",
      		"contents": "at fromParallelCollection(ExecutionEnvironment.java:870) (org.apache.flink.api.java.io.ParallelIteratorInputFormat)",
      		"parallelism": "4",
      		"global_properties": [
      			{ "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
      			{ "name": "Partitioning Order", "value": "(none)" },
      			{ "name": "Uniqueness", "value": "not unique" }
      		],
      		"local_properties": [
      			{ "name": "Order", "value": "(none)" },
      			{ "name": "Grouping", "value": "not grouped" },
      			{ "name": "Uniqueness", "value": "not unique" }
      		],
      		"estimates": [
      			{ "name": "Est. Output Size", "value": "(unknown)" },
      			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
      		"costs": [
      			{ "name": "Network", "value": "0.0" },
      			{ "name": "Disk I/O", "value": "0.0" },
      			{ "name": "CPU", "value": "0.0" },
      			{ "name": "Cumulative Network", "value": "0.0" },
      			{ "name": "Cumulative Disk I/O", "value": "0.0" },
      			{ "name": "Cumulative CPU", "value": "0.0" }
      		],
      		"compiler_hints": [
      			{ "name": "Output Size (bytes)", "value": "(none)" },
      			{ "name": "Output Cardinality", "value": "(none)" },
      			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
      			{ "name": "Filter Factor", "value": "(none)" }		]
      	},
      	{
      		"id": 4,
      		"type": "pact",
      		"pact": "Map",
      		"contents": "Map at main(FlinkOptimizerBug.java:24)",
      		"parallelism": "4",
      		"predecessors": [
      			{"id": 5, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"}
      		],
      		"driver_strategy": "Map",
      		"global_properties": [
      			{ "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
      			{ "name": "Partitioning Order", "value": "(none)" },
      			{ "name": "Uniqueness", "value": "not unique" }
      		],
      		"local_properties": [
      			{ "name": "Order", "value": "(none)" },
      			{ "name": "Grouping", "value": "not grouped" },
      			{ "name": "Uniqueness", "value": "not unique" }
      		],
      		"estimates": [
      			{ "name": "Est. Output Size", "value": "(unknown)" },
      			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
      		"costs": [
      			{ "name": "Network", "value": "0.0" },
      			{ "name": "Disk I/O", "value": "0.0" },
      			{ "name": "CPU", "value": "0.0" },
      			{ "name": "Cumulative Network", "value": "0.0" },
      			{ "name": "Cumulative Disk I/O", "value": "0.0" },
      			{ "name": "Cumulative CPU", "value": "0.0" }
      		],
      		"compiler_hints": [
      			{ "name": "Output Size (bytes)", "value": "(none)" },
      			{ "name": "Output Cardinality", "value": "(none)" },
      			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
      			{ "name": "Filter Factor", "value": "(none)" }		]
      	},
      	{
      		"id": 7,
      		"type": "source",
      		"pact": "Data Source",
      		"contents": "at fromParallelCollection(ExecutionEnvironment.java:870) (org.apache.flink.api.java.io.ParallelIteratorInputFormat)",
      		"parallelism": "4",
      		"global_properties": [
      			{ "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
      			{ "name": "Partitioning Order", "value": "(none)" },
      			{ "name": "Uniqueness", "value": "not unique" }
      		],
      		"local_properties": [
      			{ "name": "Order", "value": "(none)" },
      			{ "name": "Grouping", "value": "not grouped" },
      			{ "name": "Uniqueness", "value": "not unique" }
      		],
      		"estimates": [
      			{ "name": "Est. Output Size", "value": "(unknown)" },
      			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
      		"costs": [
      			{ "name": "Network", "value": "0.0" },
      			{ "name": "Disk I/O", "value": "0.0" },
      			{ "name": "CPU", "value": "0.0" },
      			{ "name": "Cumulative Network", "value": "0.0" },
      			{ "name": "Cumulative Disk I/O", "value": "0.0" },
      			{ "name": "Cumulative CPU", "value": "0.0" }
      		],
      		"compiler_hints": [
      			{ "name": "Output Size (bytes)", "value": "(none)" },
      			{ "name": "Output Cardinality", "value": "(none)" },
      			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
      			{ "name": "Filter Factor", "value": "(none)" }		]
      	},
      	{
      		"id": 6,
      		"type": "pact",
      		"pact": "Map",
      		"contents": "Map at main(FlinkOptimizerBug.java:26)",
      		"parallelism": "4",
      		"predecessors": [
      			{"id": 7, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"}
      		],
      		"driver_strategy": "Map",
      		"global_properties": [
      			{ "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
      			{ "name": "Partitioning Order", "value": "(none)" },
      			{ "name": "Uniqueness", "value": "not unique" }
      		],
      		"local_properties": [
      			{ "name": "Order", "value": "(none)" },
      			{ "name": "Grouping", "value": "not grouped" },
      			{ "name": "Uniqueness", "value": "not unique" }
      		],
      		"estimates": [
      			{ "name": "Est. Output Size", "value": "(unknown)" },
      			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
      		"costs": [
      			{ "name": "Network", "value": "0.0" },
      			{ "name": "Disk I/O", "value": "0.0" },
      			{ "name": "CPU", "value": "0.0" },
      			{ "name": "Cumulative Network", "value": "0.0" },
      			{ "name": "Cumulative Disk I/O", "value": "0.0" },
      			{ "name": "Cumulative CPU", "value": "0.0" }
      		],
      		"compiler_hints": [
      			{ "name": "Output Size (bytes)", "value": "(none)" },
      			{ "name": "Output Cardinality", "value": "(none)" },
      			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
      			{ "name": "Filter Factor", "value": "(none)" }		]
      	},
      	{
      		"id": 3,
      		"type": "pact",
      		"pact": "Join",
      		"contents": "Join at main(FlinkOptimizerBug.java:27)",
      		"parallelism": "4",
      		"predecessors": [
      			{"id": 4, "side": "first", "ship_strategy": "Hash Partition on [0]", "exchange_mode": "PIPELINED"},
      			{"id": 6, "side": "second", "ship_strategy": "Hash Partition on [0]", "exchange_mode": "PIPELINED"}
      		],
      		"driver_strategy": "Hybrid Hash (build: Map at main(FlinkOptimizerBug.java:24) (id: 4))",
      		"global_properties": [
      			{ "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
      			{ "name": "Partitioning Order", "value": "(none)" },
      			{ "name": "Uniqueness", "value": "not unique" }
      		],
      		"local_properties": [
      			{ "name": "Order", "value": "(none)" },
      			{ "name": "Grouping", "value": "not grouped" },
      			{ "name": "Uniqueness", "value": "not unique" }
      		],
      		"estimates": [
      			{ "name": "Est. Output Size", "value": "(unknown)" },
      			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
      		"costs": [
      			{ "name": "Network", "value": "(unknown)" },
      			{ "name": "Disk I/O", "value": "(unknown)" },
      			{ "name": "CPU", "value": "(unknown)" },
      			{ "name": "Cumulative Network", "value": "(unknown)" },
      			{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
      			{ "name": "Cumulative CPU", "value": "(unknown)" }
      		],
      		"compiler_hints": [
      			{ "name": "Output Size (bytes)", "value": "(none)" },
      			{ "name": "Output Cardinality", "value": "(none)" },
      			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
      			{ "name": "Filter Factor", "value": "(none)" }		]
      	},
      	{
      		"id": 9,
      		"type": "source",
      		"pact": "Data Source",
      		"contents": "at fromParallelCollection(ExecutionEnvironment.java:870) (org.apache.flink.api.java.io.ParallelIteratorInputFormat)",
      		"parallelism": "4",
      		"global_properties": [
      			{ "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
      			{ "name": "Partitioning Order", "value": "(none)" },
      			{ "name": "Uniqueness", "value": "not unique" }
      		],
      		"local_properties": [
      			{ "name": "Order", "value": "(none)" },
      			{ "name": "Grouping", "value": "not grouped" },
      			{ "name": "Uniqueness", "value": "not unique" }
      		],
      		"estimates": [
      			{ "name": "Est. Output Size", "value": "(unknown)" },
      			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
      		"costs": [
      			{ "name": "Network", "value": "0.0" },
      			{ "name": "Disk I/O", "value": "0.0" },
      			{ "name": "CPU", "value": "0.0" },
      			{ "name": "Cumulative Network", "value": "0.0" },
      			{ "name": "Cumulative Disk I/O", "value": "0.0" },
      			{ "name": "Cumulative CPU", "value": "0.0" }
      		],
      		"compiler_hints": [
      			{ "name": "Output Size (bytes)", "value": "(none)" },
      			{ "name": "Output Cardinality", "value": "(none)" },
      			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
      			{ "name": "Filter Factor", "value": "(none)" }		]
      	},
      	{
      		"id": 8,
      		"type": "pact",
      		"pact": "Map",
      		"contents": "Map at main(FlinkOptimizerBug.java:29)",
      		"parallelism": "4",
      		"predecessors": [
      			{"id": 9, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"}
      		],
      		"driver_strategy": "Map",
      		"global_properties": [
      			{ "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
      			{ "name": "Partitioning Order", "value": "(none)" },
      			{ "name": "Uniqueness", "value": "not unique" }
      		],
      		"local_properties": [
      			{ "name": "Order", "value": "(none)" },
      			{ "name": "Grouping", "value": "not grouped" },
      			{ "name": "Uniqueness", "value": "not unique" }
      		],
      		"estimates": [
      			{ "name": "Est. Output Size", "value": "(unknown)" },
      			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
      		"costs": [
      			{ "name": "Network", "value": "0.0" },
      			{ "name": "Disk I/O", "value": "0.0" },
      			{ "name": "CPU", "value": "0.0" },
      			{ "name": "Cumulative Network", "value": "0.0" },
      			{ "name": "Cumulative Disk I/O", "value": "0.0" },
      			{ "name": "Cumulative CPU", "value": "0.0" }
      		],
      		"compiler_hints": [
      			{ "name": "Output Size (bytes)", "value": "(none)" },
      			{ "name": "Output Cardinality", "value": "(none)" },
      			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
      			{ "name": "Filter Factor", "value": "(none)" }		]
      	},
      	{
      		"id": 2,
      		"type": "pact",
      		"pact": "Union",
      		"contents": "",
      		"parallelism": "4",
      		"predecessors": [
      			{"id": 3, "side": "first", "ship_strategy": "Forward", "exchange_mode": "PIPELINED"},
      			{"id": 8, "side": "second", "ship_strategy": "Forward", "exchange_mode": "PIPELINED"}
      		],
      		"global_properties": [
      			{ "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
      			{ "name": "Partitioning Order", "value": "(none)" },
      			{ "name": "Uniqueness", "value": "not unique" }
      		],
      		"local_properties": [
      			{ "name": "Order", "value": "(none)" },
      			{ "name": "Grouping", "value": "not grouped" },
      			{ "name": "Uniqueness", "value": "not unique" }
      		],
      		"estimates": [
      			{ "name": "Est. Output Size", "value": "(unknown)" },
      			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
      		"costs": [
      			{ "name": "Network", "value": "0.0" },
      			{ "name": "Disk I/O", "value": "0.0" },
      			{ "name": "CPU", "value": "0.0" },
      			{ "name": "Cumulative Network", "value": "(unknown)" },
      			{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
      			{ "name": "Cumulative CPU", "value": "(unknown)" }
      		],
      		"compiler_hints": [
      			{ "name": "Output Size (bytes)", "value": "(none)" },
      			{ "name": "Output Cardinality", "value": "(none)" },
      			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
      			{ "name": "Filter Factor", "value": "(none)" }		]
      	},
      	{
      		"id": 1,
      		"type": "pact",
      		"pact": "Map",
      		"contents": "Map at main(FlinkOptimizerBug.java:30)",
      		"parallelism": "4",
      		"predecessors": [
      			{"id": 2, "ship_strategy": "Hash Partition on [0]", "local_strategy": "Sort on [0:ASC]", "exchange_mode": "PIPELINED"}
      		],
      		"driver_strategy": "Map",
      		"global_properties": [
      			{ "name": "Partitioning", "value": "HASH_PARTITIONED" },
      			{ "name": "Partitioned on", "value": "[0]" },
      			{ "name": "Partitioning Order", "value": "(none)" },
      			{ "name": "Uniqueness", "value": "not unique" }
      		],
      		"local_properties": [
      			{ "name": "Order", "value": "[0:ASC]" },
      			{ "name": "Grouped on", "value": "[0]" },
      			{ "name": "Uniqueness", "value": "not unique" }
      		],
      		"estimates": [
      			{ "name": "Est. Output Size", "value": "(unknown)" },
      			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
      		"costs": [
      			{ "name": "Network", "value": "(unknown)" },
      			{ "name": "Disk I/O", "value": "(unknown)" },
      			{ "name": "CPU", "value": "(unknown)" },
      			{ "name": "Cumulative Network", "value": "(unknown)" },
      			{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
      			{ "name": "Cumulative CPU", "value": "(unknown)" }
      		],
      		"compiler_hints": [
      			{ "name": "Output Size (bytes)", "value": "(none)" },
      			{ "name": "Output Cardinality", "value": "(none)" },
      			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
      			{ "name": "Filter Factor", "value": "(none)" }		]
      	},
      	{
      		"id": 0,
      		"type": "sink",
      		"pact": "Data Sink",
      		"contents": "org.apache.flink.api.java.io.LocalCollectionOutputFormat@52feb982",
      		"parallelism": "4",
      		"predecessors": [
      			{"id": 1, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"}
      		],
      		"global_properties": [
      			{ "name": "Partitioning", "value": "HASH_PARTITIONED" },
      			{ "name": "Partitioned on", "value": "[0]" },
      			{ "name": "Partitioning Order", "value": "(none)" },
      			{ "name": "Uniqueness", "value": "not unique" }
      		],
      		"local_properties": [
      			{ "name": "Order", "value": "[0:ASC]" },
      			{ "name": "Grouped on", "value": "[0]" },
      			{ "name": "Uniqueness", "value": "not unique" }
      		],
      		"estimates": [
      			{ "name": "Est. Output Size", "value": "(unknown)" },
      			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
      		"costs": [
      			{ "name": "Network", "value": "0.0" },
      			{ "name": "Disk I/O", "value": "0.0" },
      			{ "name": "CPU", "value": "0.0" },
      			{ "name": "Cumulative Network", "value": "(unknown)" },
      			{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
      			{ "name": "Cumulative CPU", "value": "(unknown)" }
      		],
      		"compiler_hints": [
      			{ "name": "Output Size (bytes)", "value": "(none)" },
      			{ "name": "Output Cardinality", "value": "(none)" },
      			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
      			{ "name": "Filter Factor", "value": "(none)" }		]
      	},
      	{
      		"id": 11,
      		"type": "pact",
      		"pact": "GroupReduce",
      		"contents": "SUM(1), at main(FlinkOptimizerBug.java:35",
      		"parallelism": "4",
      		"predecessors": [
      			{"id": 1, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"}
      		],
      		"driver_strategy": "Sorted Group Reduce",
      		"global_properties": [
      			{ "name": "Partitioning", "value": "HASH_PARTITIONED" },
      			{ "name": "Partitioned on", "value": "[0]" },
      			{ "name": "Partitioning Order", "value": "(none)" },
      			{ "name": "Uniqueness", "value": "not unique" }
      		],
      		"local_properties": [
      			{ "name": "Order", "value": "[0:ASC]" },
      			{ "name": "Grouped on", "value": "[0]" },
      			{ "name": "Uniqueness", "value": "not unique" }
      		],
      		"estimates": [
      			{ "name": "Est. Output Size", "value": "(unknown)" },
      			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
      		"costs": [
      			{ "name": "Network", "value": "0.0" },
      			{ "name": "Disk I/O", "value": "0.0" },
      			{ "name": "CPU", "value": "0.0" },
      			{ "name": "Cumulative Network", "value": "(unknown)" },
      			{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
      			{ "name": "Cumulative CPU", "value": "(unknown)" }
      		],
      		"compiler_hints": [
      			{ "name": "Output Size (bytes)", "value": "(none)" },
      			{ "name": "Output Cardinality", "value": "(none)" },
      			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
      			{ "name": "Filter Factor", "value": "(none)" }		]
      	},
      	{
      		"id": 10,
      		"type": "sink",
      		"pact": "Data Sink",
      		"contents": "CsvOutputFormat (path: /tmp/foo, delimiter: ,)",
      		"parallelism": "1",
      		"predecessors": [
      			{"id": 11, "ship_strategy": "Redistribute", "exchange_mode": "PIPELINED"}
      		],
      		"global_properties": [
      			{ "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
      			{ "name": "Partitioning Order", "value": "(none)" },
      			{ "name": "Uniqueness", "value": "not unique" }
      		],
      		"local_properties": [
      			{ "name": "Order", "value": "(none)" },
      			{ "name": "Grouping", "value": "not grouped" },
      			{ "name": "Uniqueness", "value": "not unique" }
      		],
      		"estimates": [
      			{ "name": "Est. Output Size", "value": "(unknown)" },
      			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
      		"costs": [
      			{ "name": "Network", "value": "(unknown)" },
      			{ "name": "Disk I/O", "value": "0.0" },
      			{ "name": "CPU", "value": "0.0" },
      			{ "name": "Cumulative Network", "value": "(unknown)" },
      			{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
      			{ "name": "Cumulative CPU", "value": "(unknown)" }
      		],
      		"compiler_hints": [
      			{ "name": "Output Size (bytes)", "value": "(none)" },
      			{ "name": "Output Cardinality", "value": "(none)" },
      			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
      			{ "name": "Filter Factor", "value": "(none)" }		]
      	}
      	]
      }
      

      Output in CSV file:

      0,69
      1,63
      2,58
      3,61
      0,68
      1,65
      2,48
      3,55290
      0,58
      1,64
      2,67885
      3,61
      0,61031
      1,66673
      2,64
      3,67 
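
      The symptom above (several rows per key) can be detected mechanically. The following is a minimal sketch, assuming the CSV lines have the form `key,value` as written by `writeAsCsv`; the class `DuplicateGroupCheck` and the method `duplicateKeys` are hypothetical names for illustration only.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: reports keys that occur in more
// than one row of a "key,value" CSV result.
public class DuplicateGroupCheck {

    /** Returns, for each key that appears in more than one row, the number of rows carrying it. */
    public static Map<String, Integer> duplicateKeys(List<String> csvLines) {
        Map<String, Integer> rowsPerKey = new LinkedHashMap<>();
        for (String line : csvLines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue; // ignore blank lines
            }
            int comma = trimmed.indexOf(',');
            if (comma < 0) {
                throw new IllegalArgumentException("Not a key,value row: " + trimmed);
            }
            rowsPerKey.merge(trimmed.substring(0, comma), 1, Integer::sum);
        }
        // keep only the keys that violate the "one row per group" expectation
        rowsPerKey.values().removeIf(count -> count < 2);
        return rowsPerKey;
    }

    public static void main(String[] args) {
        // The rows from the CSV file reported in this issue
        List<String> observed = Arrays.asList(
            "0,69", "1,63", "2,58", "3,61",
            "0,68", "1,65", "2,48", "3,55290",
            "0,58", "1,64", "2,67885", "3,61",
            "0,61031", "1,66673", "2,64", "3,67");
        System.out.println(duplicateKeys(observed)); // prints {0=4, 1=4, 2=4, 3=4}
    }
}
```

      A correctly grouped result would produce an empty map. Here every key appears four times, which matches the parallelism of 4 set in the test case.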
      

      Expected output (for example when removing the unnecessary collection output from the code):

      0,58096
      1,70450
      3,66549
      2,56882 
      

      Attachments

        1. flink-good-plan.json
          17 kB
          Chunhui Shi

          People

            Assignee: Unassigned
            Reporter: Steffen Dienst (smee0815)
            Votes: 0
            Watchers: 5
