Uploaded image for project: 'Pig'
  1. Pig
  2. PIG-4856 Optimization for pig on spark
  3. PIG-4871

Not use OperatorPlan#forceConnect in MultiQueryOptimizationSpark

VotersWatch issueWatchersLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    • Sub-task
    • Status: Closed
    • Major
    • Resolution: Fixed
    • None
    • spark-branch
    • spark
    • None

    Description

      In current code base, we use OperatorPlan#forceConnect() while merge the physical plan of spliter and splittee in MultiQueryOptimizationSpark.
      The difference between OperatorPlan#connect and OperatorPlan#forceConnect is not checking whether support multiOutputs and multiInputs or not in forceConnect.

       /**
           * connect from and to and ignore some judgements: like ignoring judge whether from operator supports multiOutputs
           * and whether to operator supports multiInputs
           *
           * @param from Operator data will flow from.
           * @param to   Operator data will flow to.
           * @throws PlanException if connect from or to which is not in the plan
           */
          public void forceConnect(E from, E to) throws PlanException {
              markDirty();
      
              // Check that both nodes are in the plan.
              checkInPlan(from);
              checkInPlan(to);
              mFromEdges.put(from, to);
              mToEdges.put(to, from);
          }
      
      

      Let's use an example to explain why add forceConnect before.

      A = load './split5'  AS (a0:int, a1:int, a2:int);
      B = foreach A generate a0, a1;
      C = join A by a0, B by a0;
      D = filter C by A::a1>=B::a1;
      store D into './split5.out';
      

      before multiquery optimization

      scope-37->scope-43 
      scope-43
      #--------------------------------------------------
      # Spark Plan                                  
      #--------------------------------------------------
      
      Spark node scope-37
      Store(hdfs://zly2.sh.intel.com:8020/tmp/temp-535495592/tmp-2029463812:org.apache.pig.impl.io.InterStorage) - scope-38
      |
      |---A: New For Each(false,false,false)[bag] - scope-10
          |   |
          |   Cast[int] - scope-2
          |   |
          |   |---Project[bytearray][0] - scope-1
          |   |
          |   Cast[int] - scope-5
          |   |
          |   |---Project[bytearray][1] - scope-4
          |   |
          |   Cast[int] - scope-8
          |   |
          |   |---Project[bytearray][2] - scope-7
          |
          |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-0--------
      
      Spark node scope-43
      D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
      |
      |---D: Filter[bag] - scope-32
          |   |
          |   Greater Than or Equal[boolean] - scope-35
          |   |
          |   |---Project[int][1] - scope-33
          |   |
          |   |---Project[int][4] - scope-34
          |
          |---C: New For Each(true,true)[tuple] - scope-31
              |   |
              |   Project[bag][1] - scope-29
              |   |
              |   Project[bag][2] - scope-30
              |
              |---C: Package(Packager)[tuple]{int} - scope-24
                  |
                  |---C: Global Rearrange[tuple] - scope-23
                      |
                      |---C: Local Rearrange[tuple]{int}(false) - scope-25
                      |   |   |
                      |   |   Project[int][0] - scope-26
                      |   |
                      |   |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp-535495592/tmp-2029463812:org.apache.pig.impl.io.InterStorage) - scope-39
                      |
                      |---C: Local Rearrange[tuple]{int}(false) - scope-27
                          |   |
                          |   Project[int][0] - scope-28
                          |
                          |---B: New For Each(false,false)[bag] - scope-20
                              |   |
                              |   Project[int][0] - scope-16
                              |   |
                              |   Project[int][1] - scope-18
                              |
                              |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp-535495592/tmp-2029463812:org.apache.pig.impl.io.InterStorage) - scope-41--------

      after multiquery optimization

      after multiquery optimization:
      scope-37
      #--------------------------------------------------
      # Spark Plan                                  
      #--------------------------------------------------
      
      Spark node scope-37
      D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
      |
      |---D: Filter[bag] - scope-32
          |   |
          |   Greater Than or Equal[boolean] - scope-35
          |   |
          |   |---Project[int][1] - scope-33
          |   |
          |   |---Project[int][4] - scope-34
          |
          |---C: New For Each(true,true)[tuple] - scope-31
              |   |
              |   Project[bag][1] - scope-29
              |   |
              |   Project[bag][2] - scope-30
              |
              |---C: Package(Packager)[tuple]{int} - scope-24
                  |
                  |---C: Global Rearrange[tuple] - scope-23
                      |
                      |---C: Local Rearrange[tuple]{int}(false) - scope-25
                      |   |   |
                      |   |   Project[int][0] - scope-26
                      |   |
                      |   |---A: New For Each(false,false,false)[bag] - scope-10
                      |       |   |
                      |       |   Cast[int] - scope-2
                      |       |   |
                      |       |   |---Project[bytearray][0] - scope-1
                      |       |   |
                      |       |   Cast[int] - scope-5
                      |       |   |
                      |       |   |---Project[bytearray][1] - scope-4
                      |       |   |
                      |       |   Cast[int] - scope-8
                      |       |   |
                      |       |   |---Project[bytearray][2] - scope-7
                      |       |
                      |       |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-0
                      |
                      |---C: Local Rearrange[tuple]{int}(false) - scope-27
                          |   |
                          |   Project[int][0] - scope-28
                          |
                          |---B: New For Each(false,false)[bag] - scope-20
                              |   |
                              |   Project[int][0] - scope-16
                              |   |
                              |   Project[int][1] - scope-18
                              |
                              |---A: New For Each(false,false,false)[bag] - scope-10
                                  |   |
                                  |   Cast[int] - scope-2
                                  |   |
                                  |   |---Project[bytearray][0] - scope-1
                                  |   |
                                  |   Cast[int] - scope-5
                                  |   |
                                  |   |---Project[bytearray][1] - scope-4
                                  |   |
                                  |   Cast[int] - scope-8
                                  |   |
                                  |   |---Project[bytearray][2] - scope-7
                                  |
                                  |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-0--------
      

      We connect ForEach(scope-10) in SparkNode(scope-37) with ForEach(scope-20) and LocalRearrange(scope-25) in SparkNode(scope-43). The successors of ForEach(scope-10) are scope-20 and scope-25 after multiquery optimization. Here we need use OperatorPlan#forceConnect(from, to) because POForEach#supportsMultipleOutputs are false. Why there is no problem in mr mode? in mr, clone ForEach(scope-10) as ForEach(scope-xxx), so the size of successors of POForEach is always 1.

      before multiquery optimization in mr mode:

      #--------------------------------------------------
      # Map Reduce Plan                                  
      #--------------------------------------------------
      MapReduce node scope-37
      Map Plan
      Store(hdfs://zly2.sh.intel.com:8020/tmp/temp825700611/tmp-47636243:org.apache.pig.impl.io.InterStorage) - scope-38
      |
      |---A: New For Each(false,false,false)[bag] - scope-10
          |   |
          |   Cast[int] - scope-2
          |   |
          |   |---Project[bytearray][0] - scope-1
          |   |
          |   Cast[int] - scope-5
          |   |
          |   |---Project[bytearray][1] - scope-4
          |   |
          |   Cast[int] - scope-8
          |   |
          |   |---Project[bytearray][2] - scope-7
          |
          |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-0--------
      Global sort: false
      ----------------
      
      MapReduce node scope-43
      Map Plan
      Union[tuple] - scope-44
      |
      |---C: Local Rearrange[tuple]{int}(false) - scope-25
      |   |   |
      |   |   Project[int][0] - scope-26
      |   |
      |   |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp825700611/tmp-47636243:org.apache.pig.impl.io.InterStorage) - scope-39
      |
      |---C: Local Rearrange[tuple]{int}(false) - scope-27
          |   |
          |   Project[int][0] - scope-28
          |
          |---B: New For Each(false,false)[bag] - scope-20
              |   |
              |   Project[int][0] - scope-16
              |   |
              |   Project[int][1] - scope-18
              |
              |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp825700611/tmp-47636243:org.apache.pig.impl.io.InterStorage) - scope-41--------
      Reduce Plan
      D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
      |
      |---D: Filter[bag] - scope-32
          |   |
          |   Greater Than or Equal[boolean] - scope-35
          |   |
          |   |---Project[int][1] - scope-33
          |   |
          |   |---Project[int][4] - scope-34
          |
          |---C: Package(JoinPackager(true,true))[tuple]{int} - scope-24--------
      Global sort: false
      ----------------
      

      after multiquery optimization in mr mode, scope-53 and scope-20 is the clone of scope-10

      #--------------------------------------------------
      # Map Reduce Plan                                  
      #--------------------------------------------------
      MapReduce node scope-43
      Map Plan
      Union[tuple] - scope-44
      |
      |---C: Local Rearrange[tuple]{int}(false) - scope-25
      |   |   |
      |   |   Project[int][0] - scope-26
      |   |
      |   |---A: New For Each(false,false,false)[bag] - scope-53
      |       |   |
      |       |   Cast[int] - scope-48
      |       |   |
      |       |   |---Project[bytearray][0] - scope-47
      |       |   |
      |       |   Cast[int] - scope-50
      |       |   |
      |       |   |---Project[bytearray][1] - scope-49
      |       |   |
      |       |   Cast[int] - scope-52
      |       |   |
      |       |   |---Project[bytearray][2] - scope-51
      |       |
      |       |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-46
      |
      |---C: Local Rearrange[tuple]{int}(false) - scope-27
          |   |
          |   Project[int][0] - scope-28
          |
          |---B: New For Each(false,false)[bag] - scope-20
              |   |
              |   Project[int][0] - scope-16
              |   |
              |   Project[int][1] - scope-18
              |
              |---A: New For Each(false,false,false)[bag] - scope-61
                  |   |
                  |   Cast[int] - scope-56
                  |   |
                  |   |---Project[bytearray][0] - scope-55
                  |   |
                  |   Cast[int] - scope-58
                  |   |
                  |   |---Project[bytearray][1] - scope-57
                  |   |
                  |   Cast[int] - scope-60
                  |   |
                  |   |---Project[bytearray][2] - scope-59
                  |
                  |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-54--------
      Reduce Plan
      D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
      |
      |---D: Filter[bag] - scope-32
          |   |
          |   Greater Than or Equal[boolean] - scope-35
          |   |
          |   |---Project[int][1] - scope-33
          |   |
          |   |---Project[int][4] - scope-34
          |
          |---C: Package(JoinPackager(true,true))[tuple]{int} - scope-24--------
      Global sort: false
      ----------------
      

      Attachments

        1. PIG-4871_2.patch
          12 kB
          liyunzhang

        Issue Links

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            kellyzly liyunzhang
            kellyzly liyunzhang
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Slack

                Issue deployment