
PIG-2652: Skew join and order by don't trigger reducer estimation

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.11
    • Component/s: None
    • Labels: None
    • Release Note:
      Fix how reducers are estimated for skew join and order operators.

      Description

      If none of PARALLEL, default_parallel, or mapred.reduce.tasks is set, the number of reducers is not estimated based on input size for skew joins or order by. Instead, these jobs get only 1 reducer.
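
      For illustration, a minimal sketch of the scenario driven through PigServer (the file and relation names here are hypothetical): with no PARALLEL clause, no default_parallel, and no mapred.reduce.tasks, the order by below ends up with a single reducer.

      import org.apache.pig.ExecType;
      import org.apache.pig.PigServer;

      public class OrderByNoParallel {
          public static void main(String[] args) throws Exception {
              // None of the three parallelism knobs is set, so (before this fix)
              // the sort job falls back to 1 reducer instead of an estimate
              // derived from the input size.
              PigServer pig = new PigServer(ExecType.MAPREDUCE);
              pig.registerQuery("A = LOAD 'input.txt' AS (k:chararray, v:int);");
              pig.registerQuery("B = ORDER A BY k;");
              pig.store("B", "sorted_out");
          }
      }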

      1. PIG-2652_1.patch (0.8 kB) - Bill Graham
      2. PIG-2652_2.patch (2 kB) - Daniel Dai
      3. PIG-2652_3_10.patch (9 kB) - Daniel Dai
      4. PIG-2652_3.patch (10 kB) - Daniel Dai
      5. PIG-2652_4.patch (9 kB) - Dmitriy V. Ryaboy
      6. PIG-2652_5.patch (11 kB) - Dmitriy V. Ryaboy
      7. PIG-2652_6.patch (15 kB) - Dmitriy V. Ryaboy
      8. PIG-2652_7.patch (24 kB) - Dmitriy V. Ryaboy

          Activity

          Rohini Palaniswamy added a comment -

          Correcting the fix version: removing 0.9.3 and 0.10.1.

          Jie Li added a comment -

          Then the skewed keys will only go to some of the reducers, and ideally we want to distribute them across all reducers right? I can fix it in PIG-2779.

          Dmitriy V. Ryaboy added a comment -

          I believe the non-skewed keys will get hashed to all reducers, not just those used for sampling.

          Jie Li added a comment -

          Still estimating based on the left side only, but running using an estimate for both left and right for skewed joins.

          Does this mean the skew join will allocate more reducers than what the sampler assumes, and those extra reducers will have nothing to do?

          Daniel Dai added a comment -

          I don't feel it's too hard to fix. I can take a look.

          Dmitriy V. Ryaboy added a comment -

          Created PIG-2675.
          I am not sure I want to commit to doing that as a blocker for 0.10.1. It's a very small job, there are more fundamental improvements to be had for the same amount of effort, I think. But we should still do it at some point.

          Daniel Dai added a comment -

          If we can fix the extra job before the 0.10.1 release, I am fine with committing it.

          Dmitriy V. Ryaboy added a comment -

          Committed to 0.11

          Can I commit to 0.10.1 or do you feel the extra MR job is too much?

          Daniel Dai added a comment -

          Unit tests pass. +1 for commit. But we need to fix the extra job issue before next release.

          Dmitriy V. Ryaboy added a comment -

          Fixed tests. Please try again.

          Daniel Dai added a comment -

          Yes, I get the following failures, most are not surprising:
          TestExampleGenerator.testLimit
          TestJobSubmission.testReducerNumEstimationForOrderBy
          TestPigRunner.orderByTest
          TestPigStats.testPigStatsAlias

          Dmitriy V. Ryaboy added a comment -

          Daniel, did you get a chance to run this?

          Daniel Dai added a comment -

          Yes, I will run the test.

          Dmitriy V. Ryaboy added a comment -

          Actually, it passes if I clean the environment. TestSkewedJoin also passed. I spot-checked a few others and they pass as well. A grep through tests and golden files didn't show anything that's specifically testing the added (or not-added) limiter job, just that the results are correct.

          Currently test-commit fails on trunk for me on hudson (org.apache.pig.test.TestStore.testSuccessFileCreation1 fails), so running the whole test suite doesn't seem possible.. do you have a more stable testing environment you could try this on?

          Dmitriy V. Ryaboy added a comment -

          Unsurprisingly, org.apache.pig.test.TestLimitAdjuster failed. I'll adjust tests as appropriate.

          Dmitriy V. Ryaboy added a comment -

          I haven't had any luck getting hudson to actually finish a full unit test run.. but let me try to set that up again.

          Daniel Dai added a comment -

          Agreed it's more important to fix big jobs. Did you run the unit tests? I am afraid some tests will fail due to the extra job.

          Dmitriy V. Ryaboy added a comment -

          Ok, attaching latest.

          This always adds the limiting MR job, even though it's not always strictly necessary. I feel that getting number of reducers estimated well for big jobs is more important than saving a tiny MR job (and this job will be tiny – if it's unnecessary, by definition all that it does is read a LIMIT-ed number of rows, and output them back out).

          We will create a separate ticket to apply an optimization that eliminates this extra limit job when possible.

          Dmitriy V. Ryaboy added a comment -

          Ok, not that simple. The adjuster messes with the inputs / outputs of the limiting job in pretty complex ways, so one would have to unroll all of that before running the pre-limit job.

          Also, apparently one can't simply run the adjuster inside the JobControlCompiler – it does correctly add a new job, but that job fails with

          12/04/22 17:27:31 INFO mapred.TaskInProgress: Error from attempt_20120422172532429_0004_m_000000_0: org.apache.pig.backend.executionengine.ExecException: ERROR 2044: The type null cannot be collected as a Key type
          

          Moreover, the extra job is not accounted for in stats, progress, etc.

          Looking at how we can do this better.

          Daniel Dai added a comment -

          Yes, it's doable. We can mark the job SKIP when we find the job is no longer needed. When JobControlCompiler sees a SKIP job, it simply discards it.
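
          For illustration, a minimal sketch of that idea (the class names and markSkipped/isSkipped are hypothetical, not actual Pig APIs):

          import java.util.ArrayList;
          import java.util.List;

          // Sketch: a planner pass marks jobs that turn out to be unnecessary,
          // and the job-control stage simply drops them before submission.
          class PlannedJob {
              final String name;
              private boolean skip;               // set once we know the job is no longer needed

              PlannedJob(String name) { this.name = name; }
              void markSkipped() { skip = true; }
              boolean isSkipped() { return skip; }
          }

          class SkipAwareJobControl {
              List<PlannedJob> selectRunnable(List<PlannedJob> plan) {
                  List<PlannedJob> toRun = new ArrayList<>();
                  for (PlannedJob job : plan) {
                      if (!job.isSkipped()) {     // discard SKIP jobs, keep the rest
                          toRun.add(job);
                      }
                  }
                  return toRun;
              }
          }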

          Dmitriy V. Ryaboy added a comment -

          I see.
          I think the way to solve this is to always produce the third job in physical compilation, and remove it in LimitAdjuster if it's not necessary, since not running it is only an optimization (correctness won't suffer from running the extra MR). Agreed?

          Daniel Dai added a comment -

          Seems it still does not solve the LimitAdjuster issue. Imagine the following script:

          A = load '1.txt';
          B = order A by a0;
          C = limit B 100;
          dump C;
          

          It will generate two jobs: the sampler job and the order-by job. When we launch the first job, we check the size of the input file and realize we need N>1 reducers, so we adjust both jobs to set #reducers to N. But since there is a limit operator beneath, it then needs to add a third job with #reducers=1 to impose the limit 100. LimitAdjuster is supposed to add the third job, but it runs before JobControlCompiler, so it cannot see #reducers=N.

          There is a test case, TestEvalPipeline2.testLimitAutoReducer, and it fails for the above reason.

          Dmitriy V. Ryaboy added a comment -

          Ok this works, I think. TestSampleOptimizer passes. I don't have the infra set up at the moment to run the rest (currently running TestFRJoin on my laptop..). Manual skewed join job run did the right thing.

          Still estimating based on the left side only, but running using an estimate for both left and right for skewed joins.

          Dmitriy V. Ryaboy added a comment -

          Visitor is back in. Duh. Patch coming.

          Dmitriy V. Ryaboy added a comment -

          Note that I completely yanked that visitor. I don't think we need it anymore.

          Dmitriy V. Ryaboy added a comment -

          Ok I have a much cleaner version of the patch, but it still suffers from a critical flaw – the sampler needs to know the number of buckets to get samples for, and if it's not explicitly specified, and the SampleOptimizer stuff doesn't kick in, it still gets a single bucket to "distribute" skewed keys between.

          I think we have to go ahead with the approach of always sampling for a large number of buckets (10000?), and distributing them evenly in the partitioner.

          Daniel – comments? Will attach my work-in-progress patch.

          Dmitriy V. Ryaboy added a comment -

          Could we make the Limit job a special case of MapReduce job (extends basemr) that, prior to kicking off, checks the number of input files and quietly returns if there is only a single file? Then we can always add it without performance overhead.
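
          For illustration, a sketch of that special case (class and method names are hypothetical, not the actual Pig operator):

          import java.util.List;

          // Sketch: a limit-only MR job that is always present in the plan but
          // quietly skips itself when its input is a single file, since the data
          // has then already been funneled through one reducer and re-limiting
          // it would be a no-op.
          class ConditionalLimitJob {
              /** inputFiles would come from listing the previous job's output directory. */
              boolean shouldRun(List<String> inputFiles) {
                  return inputFiles.size() > 1;
              }

              void runIfNeeded(List<String> inputFiles) {
                  if (!shouldRun(inputFiles)) {
                      return;                     // skip without any performance overhead
                  }
                  // ... submit the 1-reducer limit job here ...
              }
          }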

          Daniel Dai added a comment -

          Here is what LimitAdjuster does: if the last job has more than 1 reducer and it contains a limit, we add a final MR job with 1 reducer. Otherwise we would get N records per reducer rather than N records total. LimitAdjuster depends on the #reducers of the MR job.
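
          For illustration, the check described above amounts to something like this (a sketch with illustrative names, not the actual LimitAdjuster code):

          import java.util.List;

          // With R reducers each applying LIMIT N locally, the output could hold
          // up to R*N records; a final single-reducer job enforces the global limit.
          class LimitAdjusterSketch {
              static class JobDesc {
                  int numReducers;
                  boolean hasLimit;
                  JobDesc(int numReducers, boolean hasLimit) {
                      this.numReducers = numReducers;
                      this.hasLimit = hasLimit;
                  }
              }

              void adjust(List<JobDesc> plan) {
                  JobDesc last = plan.get(plan.size() - 1);
                  if (last.hasLimit && last.numReducers > 1) {
                      // Append a final job with exactly one reducer that re-applies the limit.
                      plan.add(new JobDesc(1, true));
                  }
              }
          }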

          Dmitriy V. Ryaboy added a comment -

          Agreed with unlinking from 0.10, this is clearly becoming a major patch rather than a minor one. 0.10.1, maybe.. crossing fingers. We should document this for 0.10, at least.

          Interesting about LimitAdjuster. Separate jira or do you think we can kill both birds with one stone?

          I really think avoiding having to know # of reducers in any optimizers will serve us better in the long term. Can LimitAdjuster be done without this knowledge?

          Re: size estimation for skewed join, yes, I mean the "big" table – except it's not the big table, it's the one with data skew. The other table might be the same size, or even bigger!

          Daniel Dai added a comment -

          Also, I am trying to start rolling 0.10.0 tomorrow; may we unlink this from 0.10.0?

          Daniel Dai added a comment -

          Do you mean that in a skewed join, we only estimate the size based on the big table? I can see that, and it happens even before the patch. But since the skewed side is usually larger, this might be acceptable.

          I found another issue, however: some rules such as LimitAdjuster depend on the right #reducers, and those rules are triggered before job launch. It seems we need a hook to adjust the plan after finishing one job.

          Dmitriy V. Ryaboy added a comment -

          I've verified that the same bug (undercounting the input when estimating reducers) is in effect on trunk, when SampleOptimizer is able to estimate reducers.

          Starting to uncover quite a few issues in skewed join implementation.. for example even if I explicitly set parallelism to 56, and have a half-dozen unique values in the skewed relation, the output is only split across 16 reducers.

          Dmitriy V. Ryaboy added a comment -

          Spent some time debugging my refactoring and decided maybe there's a bug in your patch, Daniel. As written, we look at the inputs to the sampling job and estimate reducers for the successor based on those inputs. However, the successor actually has two inputs – the sampled dataset and the second joined relation. That means the earlier estimate is incorrect.

          I tried running the estimator on the post-sample job, but there doesn't seem to be a way to connect the plan to its predecessor – the plan passed in is already trimmed at the top. I'll try the following instead: identify a sampling job's children, and set them aside somewhere; then check against the saved list of known post-sample jobs and re-run the estimator for them if parallelism is set to 1.
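
          For illustration, a sketch of that bookkeeping (types and method names are hypothetical, not the actual Pig classes):

          import java.util.HashSet;
          import java.util.Set;

          // Sketch: remember which jobs follow a sampler; when such a job is about
          // to be submitted with parallelism 1, re-run the reducer estimate over
          // all of its inputs (the sampled relation plus the other joined relation).
          class PostSampleReestimator {
              private final Set<String> postSampleJobs = new HashSet<>();

              void rememberSamplerChildren(Set<String> childJobIds) {
                  postSampleJobs.addAll(childJobIds);
              }

              int adjustParallelism(String jobId, int currentParallelism,
                                    long totalInputBytes, long bytesPerReducer) {
                  if (postSampleJobs.contains(jobId) && currentParallelism <= 1) {
                      // Re-estimate from the full input size instead of keeping the default of 1.
                      return (int) Math.max(1L, (totalInputBytes + bytesPerReducer - 1) / bytesPerReducer);
                  }
                  return currentParallelism;
              }
          }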

          Daniel Dai added a comment -

          The same patch for the 0.10 branch. One side effect is that explain will see a different number of reducers because the intermediate file is not available at explain time.

          Dmitriy V. Ryaboy added a comment -

          Awesome, Daniel, thanks.
          This last version looks much less fragile and easier to understand.

          I will test (more thoroughly this time, maybe add a test case or two even..).

          Stylistically, I'd prefer the code that specifically deals with sampling jobs to be moved out into its own function called from adjustNumReducers. I'll post my version once I'm done with tests (and taxes...).

          Dmitriy V. Ryaboy added a comment -

          oh yeah.. we should reopen this

          Daniel Dai added a comment -

          SampleOptimizer is not a good place to do the #reducer adjustment. It runs once before job launch, when the intermediate file is not yet generated, so there is no way to get the size of the job input. We should move this logic to JobControlCompiler. Attaching PIG-2652_3.patch.

          Daniel Dai added a comment -

          I agree SampleOptimizer does more than it is supposed to; it would be better to separate it into two rules.

          As Bill observes, the rule does not proceed because some preconditions fail. For now we can adjust the precondition check to solve some of the problems. I attach a patch for it. It solves Dmitriy's test case; however, Bill's test case is more involved. It is also related to the plan merge of MultiQuery. If I rewrite the query to get rid of the alias reuse, I can make it work:

          L = LOAD '1.txt' AS (owner:chararray,pet:chararray,age:int,phone:chararray);
          LN = LOAD '1.txt' AS (owner:chararray,pet:chararray,age:int,phone:chararray);
          R = LOAD '2.txt' AS (owner:chararray,pet:chararray,age:int,phone:chararray);
          
          L2 = FILTER L BY ((int)age > 0);
          UNIONED = UNION LN, L2;
          JOINED = JOIN UNIONED BY owner, R BY owner USING 'skewed';
          
          dump JOINED;
          
          Dmitriy V. Ryaboy added a comment -

          Looks like the third rule isn't correct either.

          The problem seems to be that the SampleOptimizer used to do one thing, but now does (at least) two things. It used to remove an unnecessary MR job, as described in the class javadoc. As of PIG-1642, though, it's also responsible for reducer estimation. However, that optimization is not always possible – which means reducer estimation also doesn't happen.

          I think we should separate the two functionalities, either by reworking the SampleOptimizer code, or changing how WeightedPartitioner works. The former is less intrusive, the latter is probably a more architecturally sound solution.

          Opinions?

          Bill Graham added a comment -

          FYI, here's my script that reproduces with an initial Map-only job:

          L = LOAD 'data1.txt' AS (owner:chararray,pet:chararray,age:int,phone:chararray);
          R = LOAD 'data2.txt' AS (owner:chararray,pet:chararray,age:int,phone:chararray);
          
          L2 = FILTER L BY ((int)age > 0);
          UNIONED = UNION L, L2;
          JOINED = JOIN UNIONED BY owner, R BY owner USING 'skewed';
          
          STORE JOINED INTO 'tmp/skew_join_union';
          
          Bill Graham added a comment -

          I was able to reproduce with a similar script that didn't have a reducer in the first MR job. The code in question is this block in SampleOptimizer. It returns in the second conditional with "Predecessor should be a root of the plan" before reducers can be estimated.

          // Get this job's predecessor. There should be exactly one.
          List<MapReduceOper> preds = mPlan.getPredecessors(mr);
          if (preds.size() != 1) {
              log.debug("Too many predecessors to sampling job.");
              return;
          }

          MapReduceOper pred = preds.get(0);

          // The predecessor should be a root.
          List<MapReduceOper> predPreds = mPlan.getPredecessors(pred);
          if (predPreds != null && predPreds.size() > 0) {
              log.debug("Predecessor should be a root of the plan");
              return;
          }

          // The predecessor should have just a load and store in the map, and nothing
          // in the combine or reduce.
          if (!(pred.reducePlan.isEmpty() && pred.combinePlan.isEmpty())) {
              log.debug("Predecessor has a combine or reduce plan");
              return;
          }

          Dmitriy V. Ryaboy added a comment -

          Ok, I have a test case. Estimation isn't triggered when the skewed join is preceded by another join (and perhaps anything else that has a reduce phase?).

          Try this script:

          -- lower this so that multiple reducers are forced
          set pig.exec.reducers.bytes.per.reducer 118024;
          
          x = load 'tmp/camac10/part*' as (foo:chararray);
          y = load 'tmp/camac10/part*' as (foo:chararray);
          x2 = load 'tmp/camac10/part*' as (bar:chararray);
          x = join x by $0, x2 by $0;
          z = join x by $0, y by $0;
          store z into 'tmp/x11';
          

          With both joins being regular hash joins, the stats look like this:

          JobId	Maps	Reduces	MaxMapTime	MinMapTIme	AvgMapTime	MaxReduceTime	MinReduceTime	AvgReduceTime	Alias	Feature	Outputs
          job_201204041958_154682	2	12	6	6	6	19	19	19	x2,x	HASH_JOIN	
          job_201204041958_154690	2	9	6	6	6	19	19	19	y,z	HASH_JOIN	hdfs://hadoop-nn/user/dmitriy/tmp/x11,
          

          Note that 9 reducers were used by the second join.

          Now let's make the second join skewed (just add "using 'skewed'" to the second join statement).
          New stats:

          JobId	Maps	Reduces	MaxMapTime	MinMapTIme	AvgMapTime	MaxReduceTime	MinReduceTime	AvgReduceTime	Alias	Feature	Outputs
          job_201204041958_154662	2	12	6	6	6	26	19	23	x2,x	HASH_JOIN	
          job_201204041958_154664	1	1	6	6	6	19	19	19		SAMPLER	
          job_201204041958_154667	2	1	6	6	6	19	19	19	y	SKEWED_JOIN	hdfs://hadoop-nn/user/dmitriy/tmp/x10,
          

          (by the way – I now notice that the z alias doesn't show up..).

          Note a single reducer being used for the skewed join job.

          Here's the plan:

          #--------------------------------------------------
          # Map Reduce Plan                                  
          #--------------------------------------------------
          MapReduce node scope-32
          Map Plan
          Union[tuple] - scope-33
          |
          |---x: Local Rearrange[tuple]{chararray}(false) - scope-14
          |   |   |
          |   |   Project[chararray][0] - scope-15
          |   |
          |   |---x: New For Each(false)[bag] - scope-4
          |       |   |
          |       |   Cast[chararray] - scope-2
          |       |   |
          |       |   |---Project[bytearray][0] - scope-1
          |       |
          |       |---x: Load(hdfs://hadoop-nn/user/dmitriy/tmp/camac10/part*:org.apache.pig.builtin.PigStorage) - scope-0
          |
          |---x: Local Rearrange[tuple]{chararray}(false) - scope-16
              |   |
              |   Project[chararray][0] - scope-17
              |
              |---x2: New For Each(false)[bag] - scope-9
                  |   |
                  |   Cast[chararray] - scope-7
                  |   |
                  |   |---Project[bytearray][0] - scope-6
                  |
                  |---x2: Load(hdfs://hadoop-dw-nn.smf1.twitter.com/user/dmitriy/tmp/camac10/part*:org.apache.pig.builtin.PigStorage) - scope-5--------
          Reduce Plan
          Store(hdfs://hadoop-nn/tmp/temp1728845767/tmp973408893:org.apache.pig.impl.io.InterStorage) - scope-35
          |
          |---POJoinPackage(true,true)[tuple] - scope-64--------
          Global sort: false
          ----------------
          
          MapReduce node scope-39
          Map Plan
          Local Rearrange[tuple]{tuple}(false) - scope-42
          |   |
          |   Constant(all) - scope-41
          |
          |---New For Each(true,true)[tuple] - scope-40
              |   |
              |   Project[chararray][0] - scope-26
              |   |
              |   POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-37
              |   |
              |   |---Project[tuple][*] - scope-36
              |
              |---Load(hdfs://hadoop-nn/tmp/temp1728845767/tmp973408893:org.apache.pig.impl.builtin.PoissonSampleLoader('org.apache.pig.impl.io.InterStorage','100')) - scope-38--------
          Reduce Plan
          Store(hdfs://hadoop-nn/tmp/temp1728845767/tmp-1828327704:org.apache.pig.impl.io.InterStorage) - scope-51
          |
          |---New For Each(false)[tuple] - scope-50
              |   |
              |   POUserFunc(org.apache.pig.impl.builtin.PartitionSkewedKeys)[tuple] - scope-49
              |   |
              |   |---Project[tuple][*] - scope-48
              |
              |---New For Each(false,false)[tuple] - scope-47
                  |   |
                  |   Constant(1) - scope-46
                  |   |
                  |   Project[bag][1] - scope-44
                  |
                  |---Package[tuple]{chararray} - scope-43--------
          Global sort: false
          Secondary sort: true
          ----------------
          
          MapReduce node scope-57
          Map Plan
          Union[tuple] - scope-58
          |
          |---Local Rearrange[tuple]{chararray}(false) - scope-54
          |   |   |
          |   |   Project[chararray][0] - scope-26
          |   |
          |   |---Load(hdfs://hadoop-nn/tmp/temp1728845767/tmp973408893:org.apache.pig.impl.io.InterStorage) - scope-52
          |
          |---Partition rearrange [bag]{chararray}(false) - scope-55
              |   |
              |   Project[chararray][0] - scope-27
              |
              |---y: New For Each(false)[bag] - scope-25
                  |   |
                  |   Cast[chararray] - scope-23
                  |   |
                  |   |---Project[bytearray][0] - scope-22
                  |
                  |---y: Load(hdfs://hadoop-nn/user/dmitriy/tmp/camac10/part*:org.apache.pig.builtin.PigStorage) - scope-21--------
          Reduce Plan
          z: Store(hdfs://hadoop-nn/user/dmitriy/tmp/x11:org.apache.pig.builtin.PigStorage) - scope-29
          |
          |---POJoinPackage(true,true)[tuple] - scope-66--------
          Global sort: false
          ----------------
          

          Daniel, what do you think about fixing this in SampleOptimizer vs making the multi-partition change I proposed (make it unnecessary to push the constant around, always generate stats for a large number of partitions, and distribute them in WeightedPartitioner)?

          Dmitriy V. Ryaboy added a comment -

          Rolled back for 0.9.3 and 0.10.

          Thanks for the pointer to SampleOptimizer.

          I'll produce a reproducible test case.

          Daniel Dai added a comment -

          In which context do you see the issue stated in the JIRA? It seems order by with PigStorage sets the right #reducers even before the patch. The sample job gets an initial value of 1, but it gets reset in SampleOptimizer with the right value.

          Dmitriy V. Ryaboy added a comment -

          Patching things up in JobControlCompiler, or getting JCC to run before MRCompiler appears to require a significant rewrite.

          Alternate proposal:
          If parallelism is not set explicitly, and no default is specified, set the number of quantiles to pig.exec.reducers.max. WeightedPartitioner will then need to look at its actual parallelism and evenly distribute the (up to max-reducers) quantiles among partitions. We'd need to do something like that anyway if we used LoadFunc-reported histograms, or existing samples, to do the weighted partitioning, instead of running a sampling job every time.

          Thoughts?
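
          For illustration, the even-distribution part could look roughly like this (a sketch, not the actual WeightedRangePartitioner code):

          // Sketch: sample for a fixed, large number of quantiles (e.g.
          // pig.exec.reducers.max) and map quantile i onto whichever of the
          // actual reducers its position falls into.
          class QuantileSpreader {
              private final int numQuantiles;     // chosen up front, independent of parallelism
              private final int actualReducers;   // known only at job submission time

              QuantileSpreader(int numQuantiles, int actualReducers) {
                  this.numQuantiles = numQuantiles;
                  this.actualReducers = actualReducers;
              }

              int partitionForQuantile(int quantileIndex) {
                  // Evenly spreads quantiles 0..numQuantiles-1 across partitions 0..actualReducers-1.
                  return (int) ((long) quantileIndex * actualReducers / numQuantiles);
              }
          }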

          Dmitriy V. Ryaboy added a comment -

          Figured it out.
          The parallelism from MRCompiler is used to set the number of quantiles that FindQuantiles needs to produce. If we set it to -1, nothing gets generated. We need to somehow do the proper, stat-guided estimation prior to getSamplingJob getting called (or fix it up in JobControlCompiler).

          Dmitriy V. Ryaboy added a comment -

          Ok, TestCounters appears to have a problem rooted in this exception:

          
          12/04/14 13:50:10 INFO mapred.TaskInProgress: Error from attempt_20120414134535045_0008_m_000000_0: java.lang.RuntimeException: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
                  at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner.setConf(WeightedRangePartitioner.java:157)
                  at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:62)
                  at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:117)
                  at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:677)
                  at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:756)
                  at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
                  at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
                  at java.security.AccessController.doPrivileged(Native Method)
                  at javax.security.auth.Subject.doAs(Subject.java:396)
                  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1083)
                  at org.apache.hadoop.mapred.Child.main(Child.java:249)
          Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
                  at java.util.ArrayList.RangeCheck(ArrayList.java:547)
                  at java.util.ArrayList.get(ArrayList.java:322)
                  at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner.convertToArray(WeightedRangePartitioner.java:199)
                  at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner.setConf(WeightedRangePartitioner.java:142)
                  ... 10 more
          
          Dmitriy V. Ryaboy added a comment -

          These tests take forever. While I am waiting, Daniel, do you mind attaching the error logs?

          Dmitriy V. Ryaboy added a comment -

          I am guessing the tests need to be adjusted. Sorry we rushed this out. Looking.

          Daniel Dai added a comment -

          Got a couple of unit test failures:
          [junit] Test org.apache.pig.test.TestCounters FAILED
          [junit] Test org.apache.pig.test.TestEvalPipeline2 FAILED
          [junit] Test org.apache.pig.test.TestFRJoin FAILED
          [junit] Test org.apache.pig.test.TestGrunt FAILED
          [junit] Test org.apache.pig.test.TestJobSubmission FAILED
          [junit] Test org.apache.pig.test.TestJoin FAILED
          [junit] Test org.apache.pig.test.TestLimitVariable FAILED
          [junit] Test org.apache.pig.test.TestMultiQueryLocal FAILED
          [junit] Test org.apache.pig.test.TestPigRunner FAILED
          [junit] Test org.apache.pig.test.TestPigSplit FAILED
          [junit] Test org.apache.pig.test.TestSampleOptimizer FAILED

          Dmitriy V. Ryaboy added a comment -

          Committed to 0.9.3, 0.10.0, 0.11

          Dmitriy V. Ryaboy added a comment -

          +1.
          This code is quite the mess. At some point we need to refactor it.

          Not running sampling for skewed / merge / order jobs when parallelism is set to 1 should be a separate ticket (I think I filed it already? Maybe just meant to).

          Please commit to 10 and 11.

          Show
          Dmitriy V. Ryaboy added a comment - +1. This code is quite the mess. At some point we need to refactor it. Not running sampling for skewed / merge / order jobs when parallelism is set to 1 should be a separate ticket (I think I filed it already? Maybe just meant to). Please commit to 10 and 11.
          Hide
          Bill Graham added a comment -

          Here's a patch that sets the number of reducers to -1 in MRController for sampled operations if it hasn't been set larger than 1. This will then trigger the reducer estimator in JobControlCompiler.

          A related fix would be to not do the sampling if someone has set number of reducers explicitly to 1.
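
          For illustration, the gist of the patch as described (a sketch with illustrative names, not the actual diff):

          // Sketch: for operators that rely on a sampler (order by, skew join),
          // a requested parallelism of 1 or less is reset to -1, the sentinel
          // that makes JobControlCompiler run its input-size based reducer estimator.
          class SampledOpParallelism {
              static int normalize(int requestedParallelism, boolean isSampledOperation) {
                  if (isSampledOperation && requestedParallelism <= 1) {
                      return -1;   // let the estimator pick the reducer count
                  }
                  return requestedParallelism;
              }
          }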


            People

             • Assignee: Dmitriy V. Ryaboy
             • Reporter: Bill Graham
             • Votes: 0
             • Watchers: 6
