Details
-
Sub-task
-
Status: Closed
-
Major
-
Resolution: Fixed
-
None
-
None
Description
In current code base, we use OperatorPlan#forceConnect() while merge the physical plan of spliter and splittee in MultiQueryOptimizationSpark.
The difference between OperatorPlan#connect and OperatorPlan#forceConnect is not checking whether support multiOutputs and multiInputs or not in forceConnect.
/** * connect from and to and ignore some judgements: like ignoring judge whether from operator supports multiOutputs * and whether to operator supports multiInputs * * @param from Operator data will flow from. * @param to Operator data will flow to. * @throws PlanException if connect from or to which is not in the plan */ public void forceConnect(E from, E to) throws PlanException { markDirty(); // Check that both nodes are in the plan. checkInPlan(from); checkInPlan(to); mFromEdges.put(from, to); mToEdges.put(to, from); }
Let's use an example to explain why add forceConnect before.
A = load './split5' AS (a0:int, a1:int, a2:int); B = foreach A generate a0, a1; C = join A by a0, B by a0; D = filter C by A::a1>=B::a1; store D into './split5.out';
before multiquery optimization
scope-37->scope-43 scope-43 #-------------------------------------------------- # Spark Plan #-------------------------------------------------- Spark node scope-37 Store(hdfs://zly2.sh.intel.com:8020/tmp/temp-535495592/tmp-2029463812:org.apache.pig.impl.io.InterStorage) - scope-38 | |---A: New For Each(false,false,false)[bag] - scope-10 | | | Cast[int] - scope-2 | | | |---Project[bytearray][0] - scope-1 | | | Cast[int] - scope-5 | | | |---Project[bytearray][1] - scope-4 | | | Cast[int] - scope-8 | | | |---Project[bytearray][2] - scope-7 | |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-0-------- Spark node scope-43 D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36 | |---D: Filter[bag] - scope-32 | | | Greater Than or Equal[boolean] - scope-35 | | | |---Project[int][1] - scope-33 | | | |---Project[int][4] - scope-34 | |---C: New For Each(true,true)[tuple] - scope-31 | | | Project[bag][1] - scope-29 | | | Project[bag][2] - scope-30 | |---C: Package(Packager)[tuple]{int} - scope-24 | |---C: Global Rearrange[tuple] - scope-23 | |---C: Local Rearrange[tuple]{int}(false) - scope-25 | | | | | Project[int][0] - scope-26 | | | |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp-535495592/tmp-2029463812:org.apache.pig.impl.io.InterStorage) - scope-39 | |---C: Local Rearrange[tuple]{int}(false) - scope-27 | | | Project[int][0] - scope-28 | |---B: New For Each(false,false)[bag] - scope-20 | | | Project[int][0] - scope-16 | | | Project[int][1] - scope-18 | |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp-535495592/tmp-2029463812:org.apache.pig.impl.io.InterStorage) - scope-41--------
after multiquery optimization
after multiquery optimization: scope-37 #-------------------------------------------------- # Spark Plan #-------------------------------------------------- Spark node scope-37 D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36 | |---D: Filter[bag] - scope-32 | | | Greater Than or Equal[boolean] - scope-35 | | | |---Project[int][1] - scope-33 | | | |---Project[int][4] - scope-34 | |---C: New For Each(true,true)[tuple] - scope-31 | | | Project[bag][1] - scope-29 | | | Project[bag][2] - scope-30 | |---C: Package(Packager)[tuple]{int} - scope-24 | |---C: Global Rearrange[tuple] - scope-23 | |---C: Local Rearrange[tuple]{int}(false) - scope-25 | | | | | Project[int][0] - scope-26 | | | |---A: New For Each(false,false,false)[bag] - scope-10 | | | | | Cast[int] - scope-2 | | | | | |---Project[bytearray][0] - scope-1 | | | | | Cast[int] - scope-5 | | | | | |---Project[bytearray][1] - scope-4 | | | | | Cast[int] - scope-8 | | | | | |---Project[bytearray][2] - scope-7 | | | |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-0 | |---C: Local Rearrange[tuple]{int}(false) - scope-27 | | | Project[int][0] - scope-28 | |---B: New For Each(false,false)[bag] - scope-20 | | | Project[int][0] - scope-16 | | | Project[int][1] - scope-18 | |---A: New For Each(false,false,false)[bag] - scope-10 | | | Cast[int] - scope-2 | | | |---Project[bytearray][0] - scope-1 | | | Cast[int] - scope-5 | | | |---Project[bytearray][1] - scope-4 | | | Cast[int] - scope-8 | | | |---Project[bytearray][2] - scope-7 | |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-0--------
We connect ForEach(scope-10) in SparkNode(scope-37) with ForEach(scope-20) and LocalRearrange(scope-25) in SparkNode(scope-43). The successors of ForEach(scope-10) are scope-20 and scope-25 after multiquery optimization. Here we need use OperatorPlan#forceConnect(from, to) because POForEach#supportsMultipleOutputs are false. Why there is no problem in mr mode? in mr, clone ForEach(scope-10) as ForEach(scope-xxx), so the size of successors of POForEach is always 1.
before multiquery optimization in mr mode:
#-------------------------------------------------- # Map Reduce Plan #-------------------------------------------------- MapReduce node scope-37 Map Plan Store(hdfs://zly2.sh.intel.com:8020/tmp/temp825700611/tmp-47636243:org.apache.pig.impl.io.InterStorage) - scope-38 | |---A: New For Each(false,false,false)[bag] - scope-10 | | | Cast[int] - scope-2 | | | |---Project[bytearray][0] - scope-1 | | | Cast[int] - scope-5 | | | |---Project[bytearray][1] - scope-4 | | | Cast[int] - scope-8 | | | |---Project[bytearray][2] - scope-7 | |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-0-------- Global sort: false ---------------- MapReduce node scope-43 Map Plan Union[tuple] - scope-44 | |---C: Local Rearrange[tuple]{int}(false) - scope-25 | | | | | Project[int][0] - scope-26 | | | |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp825700611/tmp-47636243:org.apache.pig.impl.io.InterStorage) - scope-39 | |---C: Local Rearrange[tuple]{int}(false) - scope-27 | | | Project[int][0] - scope-28 | |---B: New For Each(false,false)[bag] - scope-20 | | | Project[int][0] - scope-16 | | | Project[int][1] - scope-18 | |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp825700611/tmp-47636243:org.apache.pig.impl.io.InterStorage) - scope-41-------- Reduce Plan D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36 | |---D: Filter[bag] - scope-32 | | | Greater Than or Equal[boolean] - scope-35 | | | |---Project[int][1] - scope-33 | | | |---Project[int][4] - scope-34 | |---C: Package(JoinPackager(true,true))[tuple]{int} - scope-24-------- Global sort: false ----------------
after multiquery optimization in mr mode, scope-53 and scope-20 is the clone of scope-10
#-------------------------------------------------- # Map Reduce Plan #-------------------------------------------------- MapReduce node scope-43 Map Plan Union[tuple] - scope-44 | |---C: Local Rearrange[tuple]{int}(false) - scope-25 | | | | | Project[int][0] - scope-26 | | | |---A: New For Each(false,false,false)[bag] - scope-53 | | | | | Cast[int] - scope-48 | | | | | |---Project[bytearray][0] - scope-47 | | | | | Cast[int] - scope-50 | | | | | |---Project[bytearray][1] - scope-49 | | | | | Cast[int] - scope-52 | | | | | |---Project[bytearray][2] - scope-51 | | | |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-46 | |---C: Local Rearrange[tuple]{int}(false) - scope-27 | | | Project[int][0] - scope-28 | |---B: New For Each(false,false)[bag] - scope-20 | | | Project[int][0] - scope-16 | | | Project[int][1] - scope-18 | |---A: New For Each(false,false,false)[bag] - scope-61 | | | Cast[int] - scope-56 | | | |---Project[bytearray][0] - scope-55 | | | Cast[int] - scope-58 | | | |---Project[bytearray][1] - scope-57 | | | Cast[int] - scope-60 | | | |---Project[bytearray][2] - scope-59 | |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-54-------- Reduce Plan D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36 | |---D: Filter[bag] - scope-32 | | | Greater Than or Equal[boolean] - scope-35 | | | |---Project[int][1] - scope-33 | | | |---Project[int][4] - scope-34 | |---C: Package(JoinPackager(true,true))[tuple]{int} - scope-24-------- Global sort: false ----------------
Attachments
Attachments
Issue Links
- is related to
-
PIG-4797 Optimization for join/group case for spark mode
- Closed
- links to