PIG-483

PERFORMANCE: different strategies for large and small order bys

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.2.0
    • Fix Version/s: None
    • Component/s: None

      Description

      Currently pig always does a multi-pass order by where it first determines a distribution for the keys and then orders in a second pass. This avoids the necessity of having a single reducer. However, in cases where the data is small enough to fit into a single reducer, this is inefficient. For small data sets it would be good to realize the small size of the set and do the order by in a single pass with a single reducer.

      This is a candidate project for Google summer of code 2011. More information about the program can be found at http://wiki.apache.org/pig/GSoc2011
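The decision the description asks for can be sketched as a small runtime check. This is a minimal illustration, not Pig's actual code; the function name, the bytes-per-reducer default, and the sizing heuristic are all assumptions:

```python
# Hypothetical sketch: pick an order-by strategy from the number of reducers
# the sort job would use. Names and defaults are invented for illustration.

def choose_order_by_strategy(input_bytes, bytes_per_reducer=1_000_000_000,
                             requested_parallel=None):
    """Return 'single-pass' when one reducer suffices, else 'sampled'."""
    if requested_parallel is not None:
        # The user forced a parallelism (e.g. via a PARALLEL clause).
        reducers = requested_parallel
    else:
        # Common MapReduce heuristic: reducers ~ input size / bytes-per-reducer.
        reducers = max(1, -(-input_bytes // bytes_per_reducer))  # ceiling division
    return "single-pass" if reducers == 1 else "sampled"
```

A tiny input maps to the single-reducer path, so the sampling pass (and its extra MapReduce job) can be dropped entirely.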

      1. PIG-483.1.patch
        10 kB
        Jie Li
      2. PIG-483.0.patch
        6 kB
        Jie Li

        Issue Links

          Activity

          Dmitriy V. Ryaboy added a comment -

          I think we all like Bill's proposal, but would like to get your work in, in the meantime, if possible.

          Jie Li added a comment -

          Have we agreed on this approach yet? I like Bill's proposal, but I'm afraid I don't have time to explore it now...

          Bill Graham made changes -
          Status Patch Available [ 10002 ] Open [ 1 ]
          Bill Graham added a comment -

          Canceling patch until the unit tests are working.

          Dmitriy V. Ryaboy added a comment -

          bump – Jie, any chance you have some time to work on this?

          Dmitriy V. Ryaboy added a comment -

          I think Bill's suggestion is cleaner, but in the spirit of not letting the best be an enemy of the good – can we try to get tests to pass with the existing patch Jie wrote?

          Dmitriy V. Ryaboy made changes -
          Status Open [ 1 ] Patch Available [ 10002 ]
          Assignee Jie Li [ jay23jack ]
          Bill Graham added a comment -

          Instead of modifying the job graph at runtime (in this case to swap in a SkipJob), a better approach would be to build the DAG with multiple possible paths of execution, connected by something like a new RuntimeDecision operator. RuntimeDecision decides which subpath should be executed and marks the other path as IGNORED or some other new state. Each subpath would then be connected back to something like a RuntimeDecisionSink operator, which knows how to pipe the data to the next operator.

          The main advantage of this approach would be that we could support other types of non-skip optimizations like join selection. Also, the graph is built with all possible paths represented and is still immutable. This is similar to the approach Hive took AFAIK.

          The multiple paths would also make sense when viewed through Ambrose.
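Bill's proposal can be sketched as a toy model (all class, field, and state names here are invented for illustration; this is not Pig's or Hive's actual design): both subpaths live in the DAG from the start, and deciding merely marks the losing path IGNORED, so the graph itself never mutates.

```python
# Toy sketch of the RuntimeDecision idea. The DAG always contains every
# alternative subpath; a decision node flips states instead of editing edges,
# so the graph stays immutable and tools like Ambrose can render all paths.

RUNNABLE, IGNORED = "RUNNABLE", "IGNORED"

class Job:
    def __init__(self, name):
        self.name, self.state = name, RUNNABLE

class RuntimeDecision:
    """Picks one subpath once its inputs are known; others are marked IGNORED."""
    def __init__(self, paths):
        self.paths = paths  # {label: [Job, ...]}

    def decide(self, chosen_label):
        for label, jobs in self.paths.items():
            if label != chosen_label:
                for job in jobs:
                    job.state = IGNORED
        return self.paths[chosen_label]

decision = RuntimeDecision({
    "sampled": [Job("sampler"), Job("sort")],
    "single-pass": [Job("sort-1-reducer")],
})
active = decision.decide("single-pass")
```

A RuntimeDecisionSink (omitted here) would then rejoin whichever subpath ran back to the downstream operator.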

          Jie Li made changes -
          Attachment PIG-483.1.patch [ 12538634 ]
          Jie Li added a comment -

          Now with PIG-2779 fixed, before submitting the sample job we know the runtime #reducers of the orderby/skewjoin job. If it uses only one reducer, we can skip the sample job. Changes needed in orderby/skewjoin when the sample is skipped:

          • do not add the partition file to the distributed cache, as there is no such file;
          • do not set the specialized Partitioner (WeightedRangePartitioner for orderby, SkewedPartitioner for skewjoin);
          • for skew join, do not load the partition file in POPartitionRearrange.

          We then return the sample job as a SkipJob whose status is pre-set to successful, so JobControl puts it directly in the successful job queue without submitting it. From there the SkipJob is processed just like a regular job.

          Any comments on this approach? I'll work on unit tests if it looks good.
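The skip mechanism described above can be sketched roughly like this (invented names; the real JobControl behavior may differ): a job pre-marked successful is routed straight to the done queue and never submitted.

```python
# Sketch of the SkipJob trick: hand the job controller a job already marked
# SUCCESS so it lands in the successful queue without ever being submitted.
# Class and attribute names are hypothetical, not Pig's actual API.

class SkipJob:
    """Stand-in for the sample job when the sort uses a single reducer."""
    def __init__(self, name):
        self.name = name
        self.status = "SUCCESS"   # pre-marked: never submitted
        self.submitted = False

class JobControl:
    def __init__(self):
        self.successful, self.running = [], []

    def add(self, job):
        if getattr(job, "status", None) == "SUCCESS":
            self.successful.append(job)   # skipped: straight to the done queue
        else:
            job.submitted = True
            self.running.append(job)
```

Downstream bookkeeping (stats, the job DAG printout) then sees the skipped job as just another completed job, which matches the `skipped_job` entries in the sample output further down the thread.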

          Jie Li made changes -
          Link This issue relates to PIG-2784 [ PIG-2784 ]
          Jie Li made changes -
          Link This issue is blocked by PIG-2779 [ PIG-2779 ]
          Jie Li made changes -
          Link This issue is blocked by PIG-2652 [ PIG-2652 ]
          Jie Li made changes -
          Link This issue is blocked by PIG-2652 [ PIG-2652 ]
          Jie Li added a comment -

          Oops, I forgot that the sample job always uses 1 reducer instead of the estimated #reducer, so we don't have the information to decide whether to skip it.

          One option would be to add a field in MapReduceOper to store the estimated #reducer.

          Jie Li made changes -
          Attachment PIG-483.0.patch [ 12533767 ]
          Jie Li added a comment -

          Attached a patch that introduces SkipJob. The output of an order-by on a small dataset would look like:

          Job Stats (time in seconds):
          JobId Alias Feature Outputs
          job_local_0001 a MAP_ONLY
          job_local_0002 b ORDER_BY file:/tmp/temp-107984693/tmp2050404975,
          skipped_job b SAMPLER

          Input(s):
          Successfully read records from: "file:///Users/JieLi/git/pig-git/1.txt"

          Output(s):
          Successfully stored records in: "file:/tmp/temp-107984693/tmp2050404975"

          Job DAG:
          job_local_0001 -> skipped_job,
          skipped_job -> job_local_0002,
          job_local_0002

          Jie Li made changes -
          Link This issue relates to PIG-2772 [ PIG-2772 ]
          Jie Li added a comment -

          For the skew join, if the partition table turns out to be small, we can convert it to a normal join, which doesn't need the sampler either. I'll open another JIRA for that.

          Jie Li added a comment -

          Thanks Bill! Maybe we can implement a SkipJob for the moment? I can give it a try.

          Bill Graham added a comment -

          Good point. In the current design the job graph is assumed to be immutable. Here are a few options we can consider:

          1. Change the design to allow modification of the graph at runtime. Ambrose would need to adapt. Ambrose aside, this would probably introduce additional complexity in other parts of the Pig execution engine that would need to be worked out.

          2. We could introduce the notion of a skipped job, i.e. one that's been optimized out. This would work in this situation, but wouldn't work for future optimizations that add jobs (e.g., auto-detecting skew and switching to a skew join).

          Can anyone comment on the expected complexity of adapting the physical plan to accommodate either of these approaches?

          Jie Li added a comment -

          As Dmitriy pointed out, we need to optimize this at runtime. For order-by, we can simply remove the sampling job.

          One problem is that PigStats has a copy of the job graph before the jobs run, so if we remove any job at runtime, we may need to update the job graph info in PigStats. Will that affect any external tools, like Ambrose?

          Jie Li made changes -
          Labels gsoc2011 gsoc2011 performance
          Dmitriy V. Ryaboy added a comment -

          Note that since parallelism can be determined at runtime, this improvement needs to happen after the plan is compiled, right before the sample job is run.

          Also note that Skewed Join has the same issue (and in fact, uses the same indexing job...)

          Skewed Join should be converted to a normal join, and order-by should be converted to a naive single-reducer order.

          Dmitriy V. Ryaboy made changes -
          Link This issue is related to PIG-2675 [ PIG-2675 ]
          Daniel Dai added a comment -

          No, the number of reducers is currently determined before you launch the Pig jobs. Data distribution does not affect the number of reducers; it is determined by the PARALLEL statement, the default_parallel level, a Hadoop config entry, and the input data size. If Pig decides to use only one reducer, there is no need for the sampling job.
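The precedence Daniel describes (PARALLEL first, then default_parallel, then a size-based estimate capped by config) can be sketched as follows; all parameter names and default values here are hypothetical, not Pig's actual implementation:

```python
# Illustrative precedence sketch for the pre-launch reducer count.
# Invented names: 'parallel' stands in for a PARALLEL clause, 'max_reducers'
# for a config cap; the size heuristic is a common MapReduce convention.

def reducer_count(parallel=None, default_parallel=None,
                  input_bytes=0, bytes_per_reducer=1_000_000_000,
                  max_reducers=999):
    if parallel is not None:            # explicit PARALLEL clause wins
        return parallel
    if default_parallel is not None:    # script-level default comes next
        return default_parallel
    est = max(1, -(-input_bytes // bytes_per_reducer))  # ceiling division
    return min(est, max_reducers)       # bounded by the config cap
```

If this evaluates to 1 before launch, the sampling job is unnecessary, which is exactly the condition the issue wants to detect.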

          Zubair Nabi added a comment -

          But how can one make the call that the data is small enough for a single-reduce order-by? As I understand it, the distribution helps with load balancing in the case of skewed datasets. The first MapReduce pass (sampling) is used to build a partitioner, and in the second pass that partitioner is used in conjunction with the order-by key as the grouping key. This ensures every reducer gets a fair workload. So, without any a priori knowledge, how can we determine whether we need a two-stage order-by or a single-stage order-by with one reducer?

          Daniel Dai added a comment -

          Here the focus is on a single reducer, not the size of the data. Currently, when doing a sort, Pig samples the data in the first MapReduce job and then does the sort in the second. However, if we detect that the order by uses only one reducer, sampling is not necessary.

          Zubair Nabi added a comment -

          Hi,

          I would like more information about this project. To determine the small size of the set, is sampling an option?

          Thanks,
          Zubair

          Daniel Dai made changes -
          Labels gsoc2011
          Description updated (appended the GSoC 2011 candidate-project note)
          Nigel Daley made changes -
          Affects Version/s 0.2.0 [ 12313783 ]
          Affects Version/s 1.0.0 [ 12313288 ]
          Nigel Daley made changes -
          Field Original Value New Value
          Fix Version/s 1.0.0 [ 12313288 ]
          Olga Natkovich created issue -

            People

            • Assignee:
              Jie Li
              Reporter:
              Olga Natkovich
            • Votes:
              0
              Watchers:
              8

              Dates

              • Created:
                Updated:

                Development