diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7408a5a..6e5ad50 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -188,7 +188,7 @@
     COMPRESSINTERMEDIATE("hive.exec.compress.intermediate", false),
     COMPRESSINTERMEDIATECODEC("hive.intermediate.compression.codec", ""),
     COMPRESSINTERMEDIATETYPE("hive.intermediate.compression.type", ""),
-    BYTESPERREDUCER("hive.exec.reducers.bytes.per.reducer", (long) (1000 * 1000 * 1000)),
+    BYTESPERREDUCER("hive.exec.reducers.bytes.per.reducer", (long) (1 * 100 * 1000)),
     MAXREDUCERS("hive.exec.reducers.max", 999),
     PREEXECHOOKS("hive.exec.pre.hooks", ""),
     POSTEXECHOOKS("hive.exec.post.hooks", ""),
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index edb55fa..5167e4d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -2576,22 +2576,32 @@ public static int estimateNumberOfReducers(HiveConf conf, ContentSummary inputSu
     // power of two
     if (conf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT_NUM_BUCKETS_POWER_TWO) &&
         finalMapRed && !work.getBucketedColsByDirectory().isEmpty()) {
+      reducers = convertToPowersOfTwo(reducers, maxReducers);
+    }
-      int reducersLog = (int)(Math.log(reducers) / Math.log(2)) + 1;
-      int reducersPowerTwo = (int)Math.pow(2, reducersLog);
-
-      // If the original number of reducers was a power of two, use that
-      if (reducersPowerTwo / 2 == reducers) {
-        return reducers;
-      } else if (reducersPowerTwo > maxReducers) {
-        // If the next power of two greater than the original number of reducers is greater
-        // than the max number of reducers, use the preceding power of two, which is strictly
-        // less than the original number of reducers and hence the max
-        reducers = reducersPowerTwo / 2;
-      } else {
-        // Otherwise use the smallest power of two greater than the original number of reducers
-        reducers = reducersPowerTwo;
-      }
+    return reducers;
+  }
+
+  /**
+   * @param reducers - current number of reducers
+   * @param maxReducers - the maximum bound for converting to a power of 2
+   * @return the number of reducers rounded to a power of 2; if rounding up would
+   *         exceed maxReducers, the preceding power of 2 is used instead
+   */
+  public static int convertToPowersOfTwo(int reducers, int maxReducers) {
+    int reducersLog = (int)(Math.log(reducers) / Math.log(2)) + 1;
+    int reducersPowerTwo = (int)Math.pow(2, reducersLog);
+
+    // If the original number of reducers was a power of two, use that
+    if (reducersPowerTwo / 2 == reducers) {
+      return reducers;
+    } else if (reducersPowerTwo > maxReducers) {
+      // If the next power of two greater than the original number of reducers is greater
+      // than the max number of reducers, use the preceding power of two, which is strictly
+      // less than the original number of reducers and hence the max
+      reducers = reducersPowerTwo / 2;
+    } else {
+      // Otherwise use the smallest power of two greater than the original number of reducers
+      reducers = reducersPowerTwo;
     }
 
     return reducers;
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 48145ad..200fb73 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -18,20 +18,25 @@
 package org.apache.hadoop.hive.ql.parse;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 
@@ -107,16 +112,43 @@ public Object process(Node nd, Stack stack,
       assert root.getParentOperators().get(0) instanceof ReduceSinkOperator;
       ReduceSinkOperator reduceSink = (ReduceSinkOperator)root.getParentOperators().get(0);
 
-      reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
-
       // need to fill in information about the key and value in the reducer
       GenMapRedUtils.setKeyAndValueDesc(reduceWork, reduceSink);
 
-      // needs to be fixed in HIVE-5052. This should be driven off of stats
-      if (reduceWork.getNumReduceTasks() <= 0) {
-        reduceWork.setNumReduceTasks(1);
+      /* In order to find the number of reducers, the following algorithm is used:
+       * if the preceding work is a map-work, we estimate the number of reducers
+       * from its input size. Otherwise it is a reduce-work, whose reducer count
+       * has already been estimated, and we can reuse that count for this reducer as well.
+       */
+      int numReducers = reduceSink.getConf().getNumReducers();
+      if (context.preceedingWork instanceof MapWork) {
+        try {
+          ContentSummary inpSummary = Utilities.getInputSummary(context.parseContext.getContext(),
+              (MapWork) context.preceedingWork, null);
+          int estimatedReducers =
+              Utilities.estimateNumberOfReducers(context.parseContext.getConf(), inpSummary,
+                  (MapWork) context.preceedingWork, false);
+          if (estimatedReducers <= 0) {
+            estimatedReducers = 1;
+          }
+          if (numReducers <= 0) {
+            numReducers = estimatedReducers;
+          } else {
+            // a reducer count was already set for this work; add the estimate to it
+            numReducers += estimatedReducers;
+          }
+        } catch (IOException e) {
+          throw new SemanticException(e);
+        }
+      } else { // the preceding work is a reduce-work, so we can reuse its estimate
+        numReducers += ((ReduceWork) (context.preceedingWork)).getNumReduceTasks();
       }
 
+      // in Tez we convert the number of reducers to a power of 2 by default
+      numReducers = Utilities.convertToPowersOfTwo(numReducers,
+          context.parseContext.getConf().getIntVar(HiveConf.ConfVars.MAXREDUCERS));
+      reduceWork.setNumReduceTasks(numReducers);
+
       tezWork.add(reduceWork);
       tezWork.connect(
           context.preceedingWork,
@@ -141,7 +173,7 @@ public Object process(Node nd, Stack stack,
       BaseWork followingWork = context.leafOperatorToFollowingWork.get(operator);
 
       // need to add this branch to the key + value info
-      assert operator instanceof ReduceSinkOperator 
+      assert operator instanceof ReduceSinkOperator
           && followingWork instanceof ReduceWork;
       ReduceSinkOperator rs = (ReduceSinkOperator) operator;
       ReduceWork rWork = (ReduceWork) followingWork;
@@ -175,4 +207,22 @@ public Object process(Node nd, Stack stack,
     return null;
   }
 
+  /*
+   * Find the number of reducers by going up the chain: walk through parent
+   * ReduceSinkOperators until one carries an explicit reducer count.
+   */
+  private int getNumberOfReducers(ReduceSinkOperator reduceSink) {
+    while (reduceSink != null && reduceSink.getConf().getNumReducers() == -1) {
+      ReduceSinkOperator parent = null;
+      for (Operator<? extends OperatorDesc> op : reduceSink.getParentOperators()) {
+        if (op instanceof ReduceSinkOperator) {
+          parent = (ReduceSinkOperator) op;
+          break;
+        }
+      }
+      reduceSink = parent;
+    }
+    return reduceSink == null ? -1 : reduceSink.getConf().getNumReducers();
+  }
+
 }
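
Reviewer note: the HiveConf hunk above lowers the default hive.exec.reducers.bytes.per.reducer from 1,000,000,000 (~1 GB) to 100,000 (~100 KB), so the size-based estimate will hand out far more reducers for the same input. Below is a minimal, self-contained sketch of that arithmetic (a simplified rendering of the ceiling-divide-and-clamp core of Utilities.estimateNumberOfReducers; the class name, method signature, and main harness are illustrative only, not part of the patch):

public class ReducerEstimateDemo {
  // Simplified sketch of the size-based estimate: one reducer per
  // bytesPerReducer of input, rounded up, clamped to [1, maxReducers].
  static int estimateReducers(long totalInputFileSize, long bytesPerReducer, int maxReducers) {
    int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
    reducers = Math.max(1, reducers);
    return Math.min(maxReducers, reducers);
  }

  public static void main(String[] args) {
    long input = 10L * 1000 * 1000; // 10 MB of input
    // old default: 1 GB per reducer -> the whole job fits in one reducer
    System.out.println(estimateReducers(input, 1000L * 1000 * 1000, 999)); // prints 1
    // new default from this patch: 100 KB per reducer -> 100 reducers
    System.out.println(estimateReducers(input, 1L * 100 * 1000, 999));     // prints 100
  }
}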
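
The rounding behavior of the extracted Utilities.convertToPowersOfTwo can be sanity-checked in isolation. The method body below is copied from the patch; the class name and main harness are illustrative only:

public class PowerOfTwoReducersDemo {
  // Method body copied from convertToPowersOfTwo in the patch above.
  static int convertToPowersOfTwo(int reducers, int maxReducers) {
    int reducersLog = (int) (Math.log(reducers) / Math.log(2)) + 1;
    int reducersPowerTwo = (int) Math.pow(2, reducersLog);
    if (reducersPowerTwo / 2 == reducers) {
      // the original number of reducers was already a power of two
      return reducers;
    } else if (reducersPowerTwo > maxReducers) {
      // rounding up would exceed the max; use the preceding power of two
      return reducersPowerTwo / 2;
    } else {
      // smallest power of two greater than the original number of reducers
      return reducersPowerTwo;
    }
  }

  public static void main(String[] args) {
    int maxReducers = 999; // the MAXREDUCERS default above
    for (int r : new int[] {5, 8, 100, 600}) {
      // 5 -> 8, 8 -> 8, 100 -> 128, 600 -> 512 (1024 would exceed 999)
      System.out.println(r + " -> " + convertToPowersOfTwo(r, maxReducers));
    }
  }
}

Rounding down when the next power of two exceeds hive.exec.reducers.max keeps the result strictly under the cap, which is why 600 maps to 512 rather than 1024.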