Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.7.0
    • Fix Version/s: 0.10.0
    • Component/s: impl
    • Labels:
      None
    • Hadoop Flags:
      Reviewed

      Description

      We can optimize the limit operation by stopping early in PigRecordReader. In general, we need a way to communicate between PigRecordReader and the execution pipeline: POLimit could instruct PigRecordReader that we already have enough records and that it should stop feeding more data.
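A minimal sketch of the early-stop idea described above, assuming a hypothetical reader class (the real PigRecordReader wraps an underlying Hadoop RecordReader; the class, field, and method names here are illustrative, not the actual Pig implementation):

```java
// Hypothetical sketch: a record reader that honors a pushed-down limit.
// A limit of -1 means "no limit pushed down".
public class LimitedRecordReader {
    private final long limit;      // pushed-down record limit (kept final)
    private long recordsRead = 0;

    public LimitedRecordReader(long limit) {
        this.limit = limit;
    }

    // Analogous to RecordReader.nextKeyValue(): once the limit is reached,
    // report end-of-input so the map task finishes without scanning the
    // rest of its split.
    public boolean nextKeyValue() {
        if (limit >= 0 && recordsRead >= limit) {
            return false;          // enough records; stop feeding the pipeline
        }
        recordsRead++;             // the real reader would delegate to a wrapped reader here
        return true;
    }

    public long getRecordsRead() {
        return recordsRead;
    }
}
```

The point of the sketch is only the guard in nextKeyValue(): the reader, not the pipeline, terminates the scan, so the map task ends as soon as POLimit's quota is satisfied.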

      1. PIG-1270-4.patch
        10 kB
        Daniel Dai
      2. PIG-1270-3.patch
        10 kB
        Min Zhou
      3. PIG-1270-2.patch
        12 kB
        Daniel Dai
      4. PIG-1270-1.patch
        12 kB
        Daniel Dai

        Activity

        Daniel Dai added a comment -

        PIG-1270-2.patch fixes all unit tests. However, I didn't see a noticeable performance improvement. The script I tested is:

        a = load 'studenttab20m' as (name, age, gpa);
        b = limit a 10;
        dump b;

        This was the case both in local mode and in mapreduce mode.

        Further investigation is needed to find out why performance did not improve.

        Min Zhou added a comment -

        @Daniel

        It improves things here, but with a bug. I ran the test on a 25-node cluster with the following script:

        A = load '/tpch/orders' USING PigStorage('\u0001') AS (o_orderkey:int, o_custkey:int, o_orderstatus:chararray, o_totalprice:double, o_orderdate:chararray, o_orderpriority:chararray, o_clerk:chararray, o_shippriority:int, o_comment: chararray);
        F = FOREACH A GENERATE o_orderkey;
        L = LIMIT F 10;
        DUMP L; 
        
        case                 job cost time   HDFS bytes read   avg map time   worst map time
        w/o optimization     26 sec          12,976,128        1 sec          1 sec
        with optimization    24 sec          19,347,931,305    3 sec          5 sec

        With your patch, the LimitOptimizer removes LOLimit from the logical plan after setting the limit on LOLoad, which generates a map-only job. The result would then contain map_num * 10 records, which is incorrect.
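To make the over-counting concrete, here is a toy model of the arithmetic (the class and method names are made up for illustration, not Pig code): each map task honors the pushed-down limit, but without a final LIMIT pass the per-map outputs are simply concatenated.

```java
// Toy model of the bug: each map emits up to `limit` records; a map-only
// job concatenates all the per-map outputs without trimming them.
public class LimitPushdownBug {
    public static long recordsEmitted(long mapNum, long limit, boolean finalLimitKept) {
        long total = mapNum * limit;                    // every map emits up to `limit` records
        return finalLimitKept ? Math.min(total, limit)  // a final LIMIT trims back down
                              : total;                  // map-only job: nothing trims
    }

    public static void main(String[] args) {
        System.out.println(recordsEmitted(25, 10, false)); // prints 250 (buggy plan)
        System.out.println(recordsEmitted(25, 10, true));  // prints 10 (correct plan)
    }
}
```

With 25 map tasks and LIMIT 10, dropping the final LOLimit yields 250 records instead of 10, which is why the limit must still be applied after the per-map early stop.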

        I will submit a patch soon.

        Min Zhou added a comment -

        Here is a patch that fixes the bug.

        Min Zhou added a comment -

        Sorry, there were some mistakes in the previous table; the corrected numbers are:

        case                 job cost time   HDFS bytes read   avg map time   worst map time
        w/o optimization     26 sec          19,347,931,305    3 sec          5 sec
        w/ optimization      24 sec          12,976,128        1 sec          1 sec
        Daniel Dai added a comment -

        Thanks Min, that's encouraging. Which version of Hadoop are you using?

        Also, as I discussed with Min over IM, it would be better to have a global flag to signal that enough records have been read, so that this optimization can cover more cases.

        Min Zhou added a comment -

        We are using a modified version of 0.19.1. That internal version provides the new MR API and is compatible with Hadoop clients of both 0.19.x and 0.20.2. Our version doesn't change any map-phase logic from the community version, so this patch should improve the latter as well.

        It would be a good step if we could address more cases, such as limit optimization on LOFilter.

        Viraj Bhat added a comment -

        In what version is this likely to be fixed?

        Daniel, the script in your original comment is similar to a "SELECT * .. LIMIT 10". Hive currently does not run an M/R job in these situations; it just reads the data and streams it to stdout. Can we do such an optimization for the query mentioned?

        Additionally, can we use some optimizations that Hadoop 0.23 has, such as running in an uberized task rather than launching M/R jobs?

        Viraj

        Dmitriy V. Ryaboy added a comment -

        Uberized mode will kick in automatically in 0.23; no need to do anything on the Pig side, AFAIK.

        Daniel Dai added a comment -

        @Viraj
        For the patch itself, I would like to commit it into trunk soon. The direct HDFS access you mention will probably be part of the backend rework we have planned, but I am not sure at the moment.

        Viraj Bhat added a comment -

        Hi Daniel,
        Can we target this patch for Pig 0.10.1?
        Viraj

        Daniel Dai added a comment -

        Unit tests pass. I plan to check this into 0.10; any objections?

        Thejas M Nair added a comment -

        +1. Can you make a minor change before check-in? Make the limit variable in PigRecordReader final.

        Thejas M Nair added a comment -

        These are the results I got from running tests to check the performance on larger data.

        Query -

        grunt> l = load '/tmp/bigfile2' as (a,b,c);
        grunt> lim = limit l 10;
        grunt> dump lim;
        

        Ran on a cluster with 8 map slots.

        With a 128 MB block size and 499 maps:

                             trunk           trunk+patch
        avg run time         17 min 7 sec    6 min 44 sec
        avg map run time     12 sec          4 sec

        With a smaller number of splits the numbers are better.
        With 'set pig.maxCombinedSplitSize 1073741824' (i.e. a split size of 1 GB) and 64 maps:

                             trunk           trunk+patch
        avg run time         15 min 19 sec   1 min 10 sec
        avg map run time     106 sec         4 sec
        Daniel Dai added a comment -

        PIG-1270-4.patch resyncs with trunk and addresses Thejas's comment.

        Daniel Dai added a comment -

        Patch committed to 0.10/trunk. Thanks Thejas and Min for testing!


          People

          • Assignee:
            Daniel Dai
            Reporter:
            Daniel Dai
          • Votes:
            1
            Watchers:
            7

            Dates

            • Created:
              Updated:
              Resolved:
