Details

    • Type: Sub-task Sub-task
    • Status: Closed
    • Priority: Major Major
    • Resolution: Fixed
    • Affects Version/s: tez-branch
    • Fix Version/s: tez-branch
    • Component/s: tez
    • Labels:
      None
    • Hadoop Flags:
      Reviewed

      Description

      PIG-3743 implements union using VertexGroup. But there are a couple of optimizations that we can apply to it.

      • Union followed by store
        Union is a blocking operator meaning that a new vertex is added for its succeeding operators. But if there is only one store in the succeeding vertex, MROutput could be directly attached to VertexGroup instead of adding a new vertex for it. Then, each union source vertex will write directly to the destination, and therefore, it will be faster.
      • Replace POLocalRearrangeTez with POValueOutputTez
        Union uses POLocalRearrange by setting the whole record as key. But since union only needs to partition records evenly across tasks, it might make more sense to use POValueOutputTez with RR partitioner instead.
      1. PIG-3835-2.patch
        151 kB
        Rohini Palaniswamy
      2. PIG-3835-3.patch
        152 kB
        Rohini Palaniswamy
      3. PIG-3835-addendum-1.patch
        9 kB
        Rohini Palaniswamy
      4. PIG-3835-Initial-1.patch
        121 kB
        Rohini Palaniswamy

        Issue Links

          Activity

          Hide
          Rohini Palaniswamy added a comment -

          Changes done:

          • Changed POLocalRearrangeTez to POValueOutputTez
          • Wrote a UnionOptimizer
          • Got union followed by store working using vertexgroup.
          • Also implemented the case where union followed by group by or join only has 3 vertices instead of 4 using vertexgroup. But that is not working because ConcatenatedMergedKeyValuesInput is not working as expected. It does not group values from the two input together. The values are grouped only within each input. Will file a Tez bug on that.

          Putting up the patch to get approach vetted. Still to run unit and e2e tests and some minor cleanup pending.

          Reviewboard link - https://reviews.apache.org/r/19836/

          Show
          Rohini Palaniswamy added a comment - Changes done: Changed POLocalRearrangeTez to POValueOutputTez Wrote a UnionOptimizer Got union followed by store working using vertexgroup. Also implemented the case where union followed by group by or join only has 3 vertices instead of 4 using vertexgroup. But that is not working because ConcatenatedMergedKeyValuesInput is not working as expected. It does not group values from the two input together. The values are grouped only within each input. Will file a Tez bug on that. Putting up the patch to get approach vetted. Still to run unit and e2e tests and some minor cleanup pending. Reviewboard link - https://reviews.apache.org/r/19836/
          Hide
          Rohini Palaniswamy added a comment - - edited

          To give more details

          • Without the UnionOptimizer, there will be a union vertex which will just merge the two inputs.

          UnionOptimizer:
          This is modeled similar to MultiQueryOptimizer. It creates a number of vertex groups equal to the number of outputs from the union vertex. It merges the plan of union vertex with its predecessor vertices and connects the predecessor and successor vertex to appropriate vertex groups. And it finally removes the union vertex.
          For eg:
          1) Union followed by store
          Vertex 1 (Load), Vertex 2 (Load) -> Vertex 3 (Union + Store) will be optimized to
          Vertex 1 (Load + Store), Vertex 2 (Load + Store). Both the vertices will be writing output to same store location directly which is supported by Tez.
          2) Union followed by groupby
          Vertex 1 (Load), Vertex 2 (Load) -> Vertex 3 (Union + POLocalRearrange) -> Vertex 4 (Group by)
          will be optimized to (Vertex 1 (Load + POLR), Vertex 2 (Load + POLR) as Vertex Group)-> Vertex 3 (Group by)

          In the case of 1), plan is reduced from 3 to 2 vertices. And in case of 2), plan is reduced from 4 to 3 vertices. This enables to save 1 vertex worth of tasks (scheduling/CPU/IO).

          Will do some basic performance runs after TEZ-1003 is fixed.

          Show
          Rohini Palaniswamy added a comment - - edited To give more details Without the UnionOptimizer, there will be a union vertex which will just merge the two inputs. UnionOptimizer: This is modeled similar to MultiQueryOptimizer. It creates a number of vertex groups equal to the number of outputs from the union vertex. It merges the plan of union vertex with its predecessor vertices and connects the predecessor and successor vertex to appropriate vertex groups. And it finally removes the union vertex. For eg: 1) Union followed by store Vertex 1 (Load), Vertex 2 (Load) -> Vertex 3 (Union + Store) will be optimized to Vertex 1 (Load + Store), Vertex 2 (Load + Store). Both the vertices will be writing output to same store location directly which is supported by Tez. 2) Union followed by groupby Vertex 1 (Load), Vertex 2 (Load) -> Vertex 3 (Union + POLocalRearrange) -> Vertex 4 (Group by) will be optimized to (Vertex 1 (Load + POLR), Vertex 2 (Load + POLR) as Vertex Group)-> Vertex 3 (Group by) In the case of 1), plan is reduced from 3 to 2 vertices. And in case of 2), plan is reduced from 4 to 3 vertices. This enables to save 1 vertex worth of tasks (scheduling/CPU/IO). Will do some basic performance runs after TEZ-1003 is fixed.
          Hide
          Rohini Palaniswamy added a comment -

          Turned of UnionOptimizer for now by default till TEZ-1003 is fixed. Would like to get this patch in without waiting for TEZ-1003 as it has the POLocalRearrange->POValueOutputTez POPackage->POShuffledValueInputTez optimization. Once a new Input is added to Tez, will switch to that and make the UnionOptimizer turned on by default and also add new e2e tests for union in PIG-3855.

          Show
          Rohini Palaniswamy added a comment - Turned of UnionOptimizer for now by default till TEZ-1003 is fixed. Would like to get this patch in without waiting for TEZ-1003 as it has the POLocalRearrange->POValueOutputTez POPackage->POShuffledValueInputTez optimization. Once a new Input is added to Tez, will switch to that and make the UnionOptimizer turned on by default and also add new e2e tests for union in PIG-3855 .
          Hide
          Rohini Palaniswamy added a comment -

          Final patch.

          Changes :

          • return value to 0 in EmptyWritableComparator
          • fixes RoundRobinPartitioner for Writable instead of PigNullableWritable
          Show
          Rohini Palaniswamy added a comment - Final patch. Changes : return value to 0 in EmptyWritableComparator fixes RoundRobinPartitioner for Writable instead of PigNullableWritable
          Hide
          Rohini Palaniswamy added a comment -

          Committed to tez branch. Thanks Cheolsoo for the review.

          Show
          Rohini Palaniswamy added a comment - Committed to tez branch. Thanks Cheolsoo for the review.
          Hide
          Rohini Palaniswamy added a comment -

          Daniel found some issues when the UnionOptimizer is turned on. PIG-3835-addendum-1.patch fixes those issues.

          Show
          Rohini Palaniswamy added a comment - Daniel found some issues when the UnionOptimizer is turned on. PIG-3835 -addendum-1.patch fixes those issues.
          Hide
          Daniel Dai added a comment -

          Thanks for a quick respons. PIG-3835-addendum-1.patch works for me. The script throw exception pass now, though the correct result will depend on TEZ-1003.

          rmf output
          a = load '1.txt' as (x:int, y:chararray);
          b = load '2.txt' as (y:chararray, x:int);
          c = union onschema a, b;
          d = group c by x;
          – store c into 'output';
          store d into 'output';

          +1

          Show
          Daniel Dai added a comment - Thanks for a quick respons. PIG-3835 -addendum-1.patch works for me. The script throw exception pass now, though the correct result will depend on TEZ-1003 . rmf output a = load '1.txt' as (x:int, y:chararray); b = load '2.txt' as (y:chararray, x:int); c = union onschema a, b; d = group c by x; – store c into 'output'; store d into 'output'; +1
          Hide
          Rohini Palaniswamy added a comment -

          Committed the additional patch to tez-branch. Thanks Daniel for the review.

          Show
          Rohini Palaniswamy added a comment - Committed the additional patch to tez-branch. Thanks Daniel for the review.
          Hide
          Cheolsoo Park added a comment -

          FYI-

          1. This patch makes my test job run about 7% faster.
              w/o PIG-3835 w/ PIG-3835
            HH:mm:ss 00:15:32 00:14:33
          2. Regarding EmptyWritableComparator, 0 is faster than -1 and 1.
              -1 0 1
            HH:mm:ss 00:16:21 00:14:30 0:15:10
          Show
          Cheolsoo Park added a comment - FYI- This patch makes my test job run about 7% faster.   w/o PIG-3835 w/ PIG-3835 HH:mm:ss 00:15:32 00:14:33 Regarding EmptyWritableComparator, 0 is faster than -1 and 1.   -1 0 1 HH:mm:ss 00:16:21 00:14:30 0:15:10

            People

            • Assignee:
              Rohini Palaniswamy
              Reporter:
              Cheolsoo Park
            • Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development