PIG-2661

Pig uses an extra job for loading data in Pigmix L9

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.9.0
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

    Attachments
    1. PIG-2661.0.patch (6 kB, Jie Li)
    2. PIG-2661.1.patch (6 kB, Jie Li)
    3. PIG-2661.2.patch (7 kB, Jie Li)
    4. PIG-2661.plan.txt (2 kB, Jie Li)
    5. PIG-2661.3.patch (10 kB, Jie Li)
    6. PIG-2661.4.patch (11 kB, Jie Li)
    7. PIG-2661.5.patch (11 kB, Jie Li)
    8. PIG-2661.6.patch (11 kB, Jie Li)
    9. PIG-2661.7.patch (14 kB, Jie Li)
    10. PIG-2661.8.patch (14 kB, Jie Li)


        Activity

        Dmitriy V. Ryaboy added a comment -

        I can look at this after I wrap up the skewed join issues, but Daniel and/or Thejas would probably be faster at figuring this one out.

        Daniel Dai added a comment -

        For an order-by job, we first generate 3 MR jobs: the first job processes everything before the order-by, the second is a sample job, and the third is a sort job. We try to drop the first job in SampleOptimizer. Currently we only drop the first job when it is empty. If the first job is not empty, we could merge the first job's pipeline into the 2nd/3rd jobs; however, we need to make sure we sample the input after the pipeline, and that WeightedRangePartitioner also partitions the input after the pipeline. It seems there is some non-trivial work to do.

        Dmitriy V. Ryaboy added a comment -

        Daniel,
        We didn't use to generate the first MR job prior to 0.9. The change is that we now treat loading with a schema (load foo as (a:int, b:chararray)) as a projection, and perform it prior to sampling. I think we can at least get this back by special-casing the map-only, foreach + projection-only job; it can cost us a LOT of time, since if all of your loaders provide a schema, it means piping the whole dataset through an MR job and out to disk.

        D

        Daniel Dai added a comment -

        I see; yes, it is the foreach inserted to enforce the number of columns. That's much easier. One solution is to let the loader enforce it.

        Jie Li added a comment -

        Here is the comparison between 0.8.1 and 0.9.0:

        No schema:
            A = load 'input';
            B = order A by $0;
            store B into 'output';
          Pig 0.8.1: 2 jobs / Pig 0.9.0: 2 jobs

        Schema without types:
            A = load 'input' as (a,b,c);
            B = order A by a;
            store B into 'output';
          Pig 0.8.1: 2 jobs / Pig 0.9.0: 3 jobs

        Schema with types:
            A = load 'input' as (a:chararray,b,c);
            B = order A by a;
            store B into 'output';
          Pig 0.8.1: 3 jobs / Pig 0.9.0: 3 jobs

        The difference between 0.8.1 and 0.9.0 is that when a schema without types is provided (as in Pigmix L9), Pig 0.9.0 uses an extra job. This difference was introduced in PIG-1188 (Padding nulls to the input tuple according to input schema), where a Foreach is inserted for untyped data in order to get the same null-padding behaviour as for typed data. Linked to PIG-1188.

        Daniel: As you said, we could merge the first job's pipeline into the 2nd/3rd jobs, which would leave all three cases with only 2 jobs. Can we implement it in SampleOptimizer by pushing the first job's foreach into the RandomSampleLoader?

        Daniel Dai added a comment -

        That's one option, but would involve many changes in WeightedRangePartitioner.

        Jie Li added a comment -

        Discussed with Daniel, and we want to merge the previous pipeline (if it's a chain) into both the sampling job and the following order-by/skewed-join job. Fortunately we don't need to change the WeightedRangePartitioner at all.

        The advantage is that it's still optimizable even if there is a filter or something in the previous pipeline, which couldn't be optimized previously.

        Jie Li added a comment -

        Submitted a patch that works for both order-by and skewed join.

        Jie Li added a comment -

        Oops, the patch breaks Illustrate. Found a similar optimization in MultiQueryOptimizer#mergeDiamondMROper, rewriting the patch..

        Jie Li added a comment -

        Attached another branch fixing the Illustrate.

        Jie Li added a comment -

        An interesting problem:

        Previously, for order-by, Pig would force any previous pipeline to finish and write to disk first, and then sample the data and sort it, so the sampler would see the same data that will be sorted. Now we want to merge the previous map-only pipeline into both the sampler and the order-by. The sampler will sample the data before that pipeline, and pass the sampled results through the pipeline to generate the partition file. See the query:

        a = load 'data' as (x,y);
        b = filter a by udf(x,y);
        c = foreach b generate udf(x,y);
        d = order c by x;
        

        Here a->b->c is the pipeline before the order-by. Previously Pig would write c to disk first, and then the sampler would get samples from c; now we want to avoid writing c to disk, so the sampler will load a to get samples and pass them through b and c to generate the partition file. Here b and c can be projections, filters and any other non-blocking operators.

        One concern is, would the new way of sampling still capture the distribution of the data to be sorted?

        What we want:     Distribution(a->b->c)
        What we have now: Distribution(Sample(a->b->c))
        What we'll have:  Distribution(Sample(a)->b->c)

        It's clear that Sample will keep the original distribution, so the three distributions in the table would be equivalent.
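
        To make that equivalence concrete, here is a tiny stand-alone simulation (illustrative only, not Pig code; the predicate and rates are made up). Because every record that survives the pipeline is kept with the same probability either way, sampling before or after a non-blocking pipeline yields the same distribution:

        import java.util.Random;
        import java.util.function.IntPredicate;

        public class SampleThenFilterDemo {
            public static void main(String[] args) {
                Random rng = new Random(42);
                IntPredicate pipeline = x -> x % 3 == 0; // stand-in for the non-blocking b->c steps
                double p = 0.01;                         // sampling probability
                long filterThenSample = 0, sampleThenFilter = 0;
                for (int i = 0; i < 1_000_000; i++) {
                    int x = rng.nextInt(1000);
                    // old plan: run the pipeline first, then sample its output
                    if (pipeline.test(x) && rng.nextDouble() < p) filterThenSample++;
                    // new plan: sample the raw input, then run the pipeline on the samples
                    if (rng.nextDouble() < p && pipeline.test(x)) sampleThenFilter++;
                }
                // both counts concentrate around p * |pipeline(input)|, and the surviving
                // values follow the same distribution either way
                System.out.println(filterThenSample + " vs " + sampleThenFilter);
            }
        }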

        Another concern is performance. With the patch, the sampler will do a full scan of the table before the filter, which might be slower than before if the filter is very selective. This might be acceptable considering that the sampler only parses a small percentage of the data. Will do some benchmarks.

        Jie Li added a comment -

        Some benchmark results using the 1GB TPC-H lineitem data:

        load-orderby-store:
          trunk:      1m41s (load) + 53s (sample) + 3m11s (orderby)
          this patch: 38s (sample) + 3m27s (orderby)

        load-orderby-filter-store:
          trunk:      41s (load) + 32s (sample) + 35s (orderby)
          this patch: 38s (sample) + 50s (orderby)

        Note the filter is very selective, but we didn't see a slowdown in the sample job. The slight slowdown of the orderby job might result from different serialization. In both queries, we save one entire load job.

        But another issue came to mind: though the distribution won't change, the number of samples might change after the pipeline. If the pipeline decreases the number of records (e.g. filter/limit/sample), then we'll have fewer samples at the end, but we also have a smaller order-by which doesn't need many samples. If the pipeline increases the number of records (e.g. flatten/stream), then we may end up with a huge number of samples at the end, which is likely to perform poorly. Therefore let's just disable the sample optimization if we find these "exploding" pipeline operators. (What else besides flatten/stream?)

        Jie Li added a comment -

        Attached the patch that disables sample optimization if there is flatten/stream.

        Dmitriy V. Ryaboy added a comment -

        I am not sure we should disable this out of concern over how expensive sampling is. Do you have examples that show this being worthwhile?

        Jie Li added a comment -
        a = load 'in' as (g:{});
        b = foreach a generate flatten(g);
        c = order b by $1;
        dump c;
        

        For this query, if we merge the flatten into the sample, then the sampler will read 100 bags and flatten them into a possibly unbounded number of records, all of which will flow through the single reducer of the sampling job.

        Dmitriy V. Ryaboy added a comment -

        Sampler is fast though. Like, really fast. It doesn't read most of the data it's sampling.

        Jie Li added a comment -

        In this case, it'll read up to 100 bags, which can be a lot of data?

        Dmitriy V. Ryaboy added a comment -

        I am easily convinced by numbers

        Jie Li added a comment -

        Sure will post some numbers tomorrow.

        Jie Li added a comment -

        Here are some numbers showing why we want to disable merging the pipeline into the sample job when there is a flatten/stream:

        Query:

        A = LOAD '$input/group' USING PigStorage('|') AS (a:int, b:{});
        B = foreach A generate a, flatten(b);
        ret = order B by $1; 
        STORE ret INTO '$output/out';
        

        Note there is a flatten. See attached PIG-2661.plan.txt for the query plan if we merge the pipeline.

        Test data:
        1GB data, grouped into three bags.

        Result:

        merge:       sample (17m) + orderby (14m)
        don't merge: pipeline (11m) + sample (1m26s) + orderby (5m)

        We can see that if we merge the pipeline into the sample job, it'll be very slow, for several reasons:
        1) the sample job will sample all three bags, which contain all of the 1GB of data;
        2) the sample job requires a reduce phase to aggregate the sample information;
        3) the orderby job will need to re-parse the input data.

        We can imagine that with 10GB of data the difference will be even more obvious, as all 10GB will go through the single reducer of the sample job.

        Jie Li added a comment -

        Attached the latest patch containing two unit tests.

        Dmitriy V. Ryaboy added a comment -

        I see what you mean now – yeah, our options in this case are to either not perform the optimization, or to push the operator chain above the sample loader, and sample its outputs instead of its inputs.

        But on the flatten thing, it occurs to me that we actually shouldn't allow the merge join here, as we can't guarantee sorted order after a flatten. Or can we? Is there a reason to believe order will stay the same after a flatten?

        Jie Li added a comment -

        I see what you mean now – yeah, our options in this case are to either not perform the optimization, or to push the operator chain above the sample loader, and sample its outputs instead of its inputs.

        I think simply not performing the optimization is better, as the order-by doesn't need to re-parse the data (see the results above); also it's easier to implement.

        But on the flatten thing, it occurs to me that we actually shouldn't allow the merge join here, as we can't guarantee sorted order after a flatten. Or can we? Is there a reason to believe order will stay the same after a flatten?

        The order-by is after the flatten, thus the flatten shouldn't affect the final order, right?

        Jie Li added a comment -

        Attached PIG-2661.4.patch with tabs removed.

        Jie Li added a comment -

        Attached PIG-2661.5.patch with the docs improved.

        Jie Li added a comment -

        Attached PIG-2661.6.patch with some other docs improved.

        Daniel Dai added a comment -

        Patch 6 looks good to me. I am going to check it in to trunk once tests pass. Any more concerns, Dmitriy?

        Dmitriy V. Ryaboy added a comment -

        Thanks, I am satisfied

        Daniel Dai added a comment -

        Hi, Jie, TestPigRunner and TestJobSubmission failed, can you take a look? Thanks.

        Jie Li added a comment -

        Hi Daniel, could you check the PIG-2661.7.patch that should fix the failed tests?

        Dmitriy V. Ryaboy added a comment -

        Bumping this patch up – Daniel, did you ever get a chance to run these tests?

        Jie Li added a comment -

        Thanks Dmitriy for bumping this up. The last status was that the test case TestSkewedJoin#testSkewedJoinKeyPartition failed. It's not trivial to debug as it needs to run in distributed mode. Do you happen to have any ideas?

        Dmitriy V. Ryaboy added a comment -

        Hm, I noticed testReducerNumEstimationForOrderBy in TestJobSubmission fails as well – it still assumes 3 jobs. I'll look at TestSkewedJoin; can you see if you can find a fix for TestJobSubmission?

        Jie Li added a comment -

        Updated the patch for TestJobSubmission#testReducerNumEstimationForOrderBy. Removed some assertions about the #reducers info in the configuration introduced in PIG-2779 (sorry Bill!), as this case is too complicated and those assertions are also covered in many other test cases.

        Dmitriy V. Ryaboy added a comment -

        Ok, for TestSkewedJoin, I think I know what's going on but not how to fix it.

        Here's the explain plan for the sampler job after this patch:

        MapReduce node scope-24
        Map Plan
        Local Rearrange[tuple]{tuple}(false) - scope-27
        |   |
        |   Constant(all) - scope-26
        |
        |---New For Each(true,true)[tuple] - scope-25
            |   |
            |   Project[bytearray][0] - scope-14
            |   |
            |   POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-22
            |   |
            |   |---Project[tuple][*] - scope-21
            |
            |---A: New For Each(false,false,false)[bag] - scope-55
                |   |
                |   Project[bytearray][0] - scope-52
                |   |
                |   Project[bytearray][1] - scope-53
                |   |
                |   Project[bytearray][2] - scope-54
                |
            |---Load(hdfs://localhost:58995/user/dmitriy/SkewedJoinInput1.txt:org.apache.pig.impl.builtin.PoissonSampleLoader('org.apache.pig.builtin.PigStorage','100')) - scope-23--------
        

        Here are the corresponding bits prior to the patch:

        
        MapReduce node scope-18
        Map Plan
        Store(hdfs://localhost:59383/tmp/temp220048876/tmp99560328:org.apache.pig.impl.io.InterStorage) - scope-20
        |
        |---A: New For Each(false,false,false)[bag] - scope-7
            |   |
            |   Project[bytearray][0] - scope-1
            |   |
            |   Project[bytearray][1] - scope-3
            |   |
            |   Project[bytearray][2] - scope-5
            |
            |---A: Load(hdfs://localhost:59383/user/dmitriy/SkewedJoinInput1.txt:org.apache.pig.builtin.PigStorage) - scope-0--------
        Global sort: false
        ----------------
        
        MapReduce node scope-24
        Map Plan
        Local Rearrange[tuple]{tuple}(false) - scope-27
        |   |
        |   Constant(all) - scope-26
        |
        |---New For Each(true,true)[tuple] - scope-25
            |   |
            |   Project[bytearray][0] - scope-14
            |   |
            |   POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-22
            |   |
            |   |---Project[tuple][*] - scope-21
            |
            |---Load(hdfs://localhost:59383/tmp/temp220048876/tmp99560328:org.apache.pig.impl.builtin.PoissonSampleLoader('org.apache.pig.impl.io.InterStorage','100'))
        
        

        What's happening is that the foreach that generates the first 3 columns, which Pig now adds to ensure types etc. work, sits between the sample loader and the GetMemNumRows UDF. The sample loader adds a couple of columns to the last tuple it outputs, with some stats about the dataset it saw. When we put the projection between it and GetMemNumRows, those extra columns get dropped, and GetMemNumRows winds up completely breaking down, assuming that each sample occurs 0 times, and the whole skewed join just turns into a regular join. We have to either get rid of the foreach, or add the columns PoissonSampleLoader appends to the foreach.
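
        A schematic illustration of the breakage (illustrative values and class names, not Pig's actual tuple layout): the sampler appends its bookkeeping fields after the declared data columns, and a foreach that projects exactly the declared columns silently drops them before GetMemNumRows can read them.

        import java.util.Arrays;

        public class ProjectionDropsSamplerFields {
            public static void main(String[] args) {
                // what the sample loader hands downstream: 3 data columns plus appended stats
                Object[] sampled = {"key", "val1", "val2", /* appended: */ 100L, 2048L};

                // what the inserted foreach does: keep only the schema's 3 columns
                Object[] projected = Arrays.copyOfRange(sampled, 0, 3);

                System.out.println(Arrays.toString(projected)); // [key, val1, val2] -- the stats are gone
            }
        }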

        Jie Li added a comment -

        Nice analysis!

        As the foreach may or may not exist in the query, playing with it may be tricky. Maybe we can change the current way of passing internal info from PoissonSampleLoader to GetMemNumRows. What we need is to tag each tuple with an integer needed by GetMemNumRows. Currently we simply append the tag to the tuple's data, which can confuse other operators. A more general approach is to add a Map field to the tuple so operators in the pipeline can put whatever tags they want into the Map without interrupting the normal data processing.

        What do you think?
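
        A rough sketch of the Map-field idea (illustrative names only, not an actual Pig API): tags travel out-of-band next to the data fields, so an intervening foreach or filter cannot accidentally strip them.

        import java.util.HashMap;
        import java.util.Map;

        public class TaggedRecord {
            private final Object[] fields;                            // the normal data columns
            private final Map<String, Object> tags = new HashMap<>(); // out-of-band metadata

            public TaggedRecord(Object... fields) { this.fields = fields; }

            public Object getField(int i) { return fields[i]; }

            // e.g. the sample loader could record its per-sample row count here ...
            public void setTag(String key, Object value) { tags.put(key, value); }

            // ... and GetMemNumRows could read it back, untouched by the pipeline
            public Object getTag(String key) { return tags.get(key); }
        }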

        Dmitriy V. Ryaboy added a comment -

        Looking at the code, I am not sure how you would make that work..

        But perhaps one could emit an extra tuple at the end instead of appending fields to a tuple? Data about size, etc would then be contained in that last tuple (in a single field – since we may be joining a relation of one-field tuples).

        This may still be problematic if type-specific tuples are generated and used for the loaders, as the type of the last tuple won't match the schema of the rest of the tuples.

        Considering all the options so far, I think it's still easiest to force a removal of the unnecessary foreach in just the sampler job.

        Dmitriy V. Ryaboy added a comment -

        or, you know, stick a key in MemCache. #whyishadoopsohard

        Dmitriy V. Ryaboy added a comment -

        Ok, some fresh thoughts rolling in after sleeping on this.

        Why do we have this foreach in the first place? It's inserted to achieve the following goals:

        • pad nulls (in PIG-2824, Jie saw perf problems from that, and I suggested we get rid of the foreach altogether, getting POLoad to do the null padding instead).
        • coerce tuples generated by the loader into schemas specified in the "load as.." statement
        • drop unneeded columns

        (please let me know if this list is incomplete)

        For padding nulls, I believe we can achieve the same effect much more cheaply, and without the side effect that's biting us here, by making basic modifications to POLoad.

        For coercing into schemas, we can do the same thing – copy all the fields from the incoming tuple (including excess ones), and only convert the ones we know something about. This can also be done directly in POLoad, and only be triggered if the loader doesn't already tell us what schema it's returning, or the schemas don't match type-wise.

        This leaves dropping columns. Since in that case the whole point is to not carry along unwanted columns, this use case is clearly in conflict with the way PoissonSampleLoader wants to work: by inserting extra columns and sneaking them through to the UDF linked to it. Moreover, if we go the route of putting the plan between the load and the skewed join in between the sample loader and the GetMemNumRows UDF, other things may also break the sampling – for example, filters that happen to filter out the specially marked tuples by accident. This is telling us that messing with the tuples PSL returns is problematic.

        What if instead we created a UDF that was fed all the tuples from a regular loader, with the rest of the pipeline that gets inserted, but was able to signal to its consumers when it's done – thus effectively recreating PoissonSampleLoader's functionality in addition to GetMemNumRows? It would output sample tuples or nulls, and we can add a null filter right above it. I believe that gives us everything we are looking for and simplifies the pipeline a fair bit. We'd have to add the capability for UDFs to early-terminate, of course. That's already been done for Accumulative UDFs in PIG-2066 and I think should be straightforward to do for regular UDFs.

        Thoughts?
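
        A rough sketch of what such an early-terminating sampling UDF might look like (hypothetical shape only; Pig's real UDF interface and the PIG-2066 mechanism differ): it consumes ordinary tuples from the pipeline, emits either a sample or null for a null filter above it to drop, and tells its caller when it has seen enough.

        import java.util.Random;

        public class TerminatingSampler {
            private final Random rng = new Random();
            private final double rate;        // per-record sampling probability
            private final long targetSamples; // how many samples we actually need
            private long taken = 0;

            public TerminatingSampler(double rate, long targetSamples) {
                this.rate = rate;
                this.targetSamples = targetSamples;
            }

            /** Returns the row if it is sampled, otherwise null. */
            public Object[] exec(Object[] row) {
                if (isFinished() || rng.nextDouble() >= rate) return null;
                taken++;
                return row;
            }

            /** The consumer polls this and stops feeding records once it returns true. */
            public boolean isFinished() {
                return taken >= targetSamples;
            }
        }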

        Jie Li added a comment -

        Somehow I thought POLoad was not used in MR mode, so I'm not sure if we can push the Foreach into POLoad...

        Dmitriy V. Ryaboy added a comment -

        Right, I forgot. We could push this into PigInputFormat / RecordReader, instead. Same thing.
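
        A bare-bones sketch of that idea (illustrative only; not the real PigInputFormat/RecordReader code): pad each raw tuple up to the declared schema width as records are read, instead of inserting a separate foreach into the plan.

        import java.util.Arrays;

        public class SchemaPaddingReader {
            private final int declaredWidth; // number of fields in the "load ... as" schema

            public SchemaPaddingReader(int declaredWidth) { this.declaredWidth = declaredWidth; }

            public Object[] next(Object[] raw) {
                if (raw.length >= declaredWidth) return raw; // keep excess fields untouched
                return Arrays.copyOf(raw, declaredWidth);    // pad missing trailing fields with null
            }
        }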

        Alan Gates added a comment -

        Canceling patch as we still seem to be debating the best route forward for this.


          People

          • Assignee: Jie Li
          • Reporter: Jie Li
          • Votes: 0
          • Watchers: 6
