diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 7408a5a..d23fbae 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -188,7 +188,7 @@ COMPRESSINTERMEDIATE("hive.exec.compress.intermediate", false), COMPRESSINTERMEDIATECODEC("hive.intermediate.compression.codec", ""), COMPRESSINTERMEDIATETYPE("hive.intermediate.compression.type", ""), - BYTESPERREDUCER("hive.exec.reducers.bytes.per.reducer", (long) (1000 * 1000 * 1000)), + BYTESPERREDUCER("hive.exec.reducers.bytes.per.reducer", (long) (1 * 1 * 1000)), MAXREDUCERS("hive.exec.reducers.max", 999), PREEXECHOOKS("hive.exec.pre.hooks", ""), POSTEXECHOOKS("hive.exec.post.hooks", ""), diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index ca48f5e..399ea3a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -35,6 +35,8 @@ import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.optimizer.physical.OpProcContext; +import org.apache.hadoop.hive.ql.optimizer.physical.OpProcContext.OpContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.Explain; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -1547,4 +1549,18 @@ static boolean toString(StringBuilder builder, Set visited, Operator } return false; } + + public void computePhysicalInfo(OpProcContext opProcCtx) throws SemanticException { + OpContext opCtx = opProcCtx.new OpContext(); + for (Operator parentOp : this.getParentOperators()) { + // sum up values from parent operators and add that as the context for this op. 
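Illustrative aside, not part of the patch: because OpContext.setSize() adds to the running total, an operator fed by several parents ends up with the sum of their estimated sizes. A minimal standalone sketch of that folding step, with invented class and method names and made-up sizes:

import java.util.Arrays;
import java.util.List;

// Standalone sketch: fold parent size estimates into a child estimate.
public class SizeAccumulationSketch {
  static long accumulate(List<Long> parentSizes) {
    long total = 0;
    for (long size : parentSizes) {
      total += size; // mirrors opCtx.setSize(parentCtx.getSize()) accumulating, not replacing
    }
    return total;
  }

  public static void main(String[] args) {
    // an operator fed by two branches of roughly 1 GB and 250 MB
    System.out.println(accumulate(Arrays.asList(1000000000L, 250000000L)));
  }
}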
+ OpContext parentCtx = opProcCtx.getOpContextMap().get(parentOp); + if (parentCtx != null) { + opCtx.addBucketedCols(parentCtx.getBucketedCols()); + opCtx.setSize(parentCtx.getSize()); + } + } + + opProcCtx.getOpContextMap().put(this, opCtx); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java index 6a538e8..8334168 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java @@ -29,6 +29,8 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.optimizer.physical.OpProcContext; +import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; @@ -335,4 +337,15 @@ public OperatorType getType() { public boolean opAllowedBeforeMapJoin() { return false; } + + @Override + public void computePhysicalInfo(OpProcContext opProcCtx) throws SemanticException { + super.computePhysicalInfo(opProcCtx); + HiveConf conf = opProcCtx.getParseContext().getConf(); + int numReducers = Utilities.estimateReducers(opProcCtx.getOpContextMap().get(this).getSize(), + conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER), + conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS), true); + + getConf().setNumReducers(numReducers); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java index 6ee13ec..ebec912 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java @@ -30,6 +30,9 @@ import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.ql.optimizer.physical.OpProcContext; +import org.apache.hadoop.hive.ql.optimizer.physical.OpProcContext.OpContext; +import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; @@ -314,4 +317,14 @@ public boolean supportSkewJoinOptimization() { public boolean supportAutomaticSortMergeJoin() { return true; } + + @Override + public void computePhysicalInfo (OpProcContext opProcCtx) throws SemanticException { + long size = 0; + size = Utilities.getTableSize(this, opProcCtx.getParseContext()); + + OpContext opCtx = opProcCtx.new OpContext(); + opCtx.setSize(size); + opProcCtx.getOpContextMap().put(this, opCtx); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index f3c34d1..e616e8f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -104,6 +104,7 @@ import org.apache.hadoop.hive.ql.exec.mr.ExecDriver; import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; import org.apache.hadoop.hive.ql.exec.tez.TezTask; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import 
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; @@ -116,6 +117,12 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; +import org.apache.hadoop.hive.ql.optimizer.SamplePruner; +import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPruner; +import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; @@ -124,6 +131,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc; import org.apache.hadoop.hive.ql.plan.GroupByDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredWork; @@ -1803,6 +1811,12 @@ public static void copyTableJobPropertiesToConf(TableDesc tbl, JobConf job) { */ public static ContentSummary getInputSummary(Context ctx, MapWork work, PathFilter filter) throws IOException { + return getInputSummary(ctx, work.getPathToAliases(), work.getPathToPartitionInfo(), filter); + } + + public static ContentSummary getInputSummary(Context ctx, + Map> pathToAliases, Map pathToPartInfo, + PathFilter filter) throws IOException { long[] summary = {0, 0, 0}; @@ -1812,7 +1826,7 @@ public static ContentSummary getInputSummary(Context ctx, MapWork work, PathFilt // this method will avoid number of threads out of control. synchronized (INPUT_SUMMARY_LOCK) { // For each input path, calculate the total size. - for (String path : work.getPathToAliases().keySet()) { + for (String path : pathToAliases.keySet()) { Path p = new Path(path); if (filter != null && !filter.accept(p)) { @@ -1868,7 +1882,7 @@ public void interrupt() { // is not correct. final Configuration myConf = conf; final JobConf myJobConf = jobConf; - final PartitionDesc partDesc = work.getPathToPartitionInfo().get( + final PartitionDesc partDesc = pathToPartInfo.get( p.toString()); Runnable r = new Runnable() { public void run() { @@ -2573,11 +2587,13 @@ public static String formatMsecToStr(long msec) { */ public static int estimateNumberOfReducers(HiveConf conf, ContentSummary inputSummary, MapWork work, boolean finalMapRed) throws IOException { + long bytesPerReducer = conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER); int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS); double samplePercentage = getHighestSamplePercentage(work); - long totalInputFileSize = getTotalInputFileSize(inputSummary, work, samplePercentage); + long totalInputFileSize = getTotalInputFileSize(inputSummary.getLength(), work, + samplePercentage); // if all inputs are sampled, we should shrink the size of reducers accordingly. 
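For reference, a rough standalone sketch of the per-path summation that the new getInputSummary(...) overload performs, written against the plain FileSystem API (the path strings are hypothetical; the real method additionally caches results in the Context and lists paths from a bounded thread pool, as the surrounding code shows):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class InputSummarySketch {
  // Sum the byte length of every input directory, the core of getInputSummary().
  static long totalLength(Configuration conf, Iterable<String> inputPaths) throws IOException {
    long total = 0;
    for (String dir : inputPaths) {
      Path p = new Path(dir);
      FileSystem fs = p.getFileSystem(conf);
      ContentSummary cs = fs.getContentSummary(p); // length, file count, directory count
      total += cs.getLength();
    }
    return total;
  }
}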
if (totalInputFileSize != inputSummary.getLength()) { @@ -2588,18 +2604,27 @@ public static int estimateNumberOfReducers(HiveConf conf, ContentSummary inputSu + maxReducers + " totalInputFileSize=" + totalInputFileSize); } + // If this map reduce job writes final data to a table and bucketing is being inferred, + // and the user has configured Hive to do this, make sure the number of reducers is a + // power of two + boolean powersOfTwo = + conf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT_NUM_BUCKETS_POWER_TWO) && + finalMapRed && !work.getBucketedColsByDirectory().isEmpty(); + + return estimateReducers(totalInputFileSize, bytesPerReducer, maxReducers, powersOfTwo); + } + + public static int estimateReducers(long totalInputFileSize, long bytesPerReducer, + int maxReducers, boolean powersOfTwo) { + int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer); reducers = Math.max(1, reducers); reducers = Math.min(maxReducers, reducers); - // If this map reduce job writes final data to a table and bucketing is being inferred, - // and the user has configured Hive to do this, make sure the number of reducers is a - // power of two - if (conf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT_NUM_BUCKETS_POWER_TWO) && - finalMapRed && !work.getBucketedColsByDirectory().isEmpty()) { + int reducersLog = (int)(Math.log(reducers) / Math.log(2)) + 1; + int reducersPowerTwo = (int)Math.pow(2, reducersLog); - int reducersLog = (int)(Math.log(reducers) / Math.log(2)) + 1; - int reducersPowerTwo = (int)Math.pow(2, reducersLog); + if (powersOfTwo) { // If the original number of reducers was a power of two, use that if (reducersPowerTwo / 2 == reducers) { @@ -2622,14 +2647,13 @@ public static int estimateNumberOfReducers(HiveConf conf, ContentSummary inputSu * Computes the total input file size. If block sampling was used it will scale this * value by the highest sample percentage (as an estimate for input). 
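The heart of the new estimateReducers() helper is the ceiling division plus clamping shown above; when powersOfTwo is requested, the count is then nudged to a power of two. Below is a small self-contained version of that arithmetic. The visible hunk stops at the reducersPowerTwo check, so the rounding branch here is an approximation of the unchanged code the diff does not show:

public class ReducerEstimateSketch {
  static int estimate(long totalInputFileSize, long bytesPerReducer, int maxReducers,
      boolean powersOfTwo) {
    int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
    reducers = Math.max(1, reducers);
    reducers = Math.min(maxReducers, reducers);
    if (powersOfTwo) {
      int reducersLog = (int) (Math.log(reducers) / Math.log(2)) + 1;
      int reducersPowerTwo = (int) Math.pow(2, reducersLog);
      if (reducersPowerTwo / 2 != reducers) {                 // already a power of two? keep it
        reducers = Math.min(maxReducers, reducersPowerTwo);   // otherwise round up and re-clamp
      }
    }
    return reducers;
  }

  public static void main(String[] args) {
    // 10 GB of input at 1 GB per reducer gives 10 reducers, rounded up to 16 when powersOfTwo is set
    System.out.println(estimate(10L * 1000 * 1000 * 1000, 1000L * 1000 * 1000, 999, true));
  }
}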
* - * @param inputSummary + * @param totalInputFileSize the size of the file determined from inputSummary perhaps * @param work * @param highestSamplePercentage * @return estimated total input size for job */ - public static long getTotalInputFileSize (ContentSummary inputSummary, MapWork work, + public static long getTotalInputFileSize(long totalInputFileSize, MapWork work, double highestSamplePercentage) { - long totalInputFileSize = inputSummary.getLength(); if (work.getNameToSplitSample() == null || work.getNameToSplitSample().isEmpty()) { // If percentage block sampling wasn't used, we don't need to do any estimation return totalInputFileSize; @@ -2970,5 +2994,269 @@ private static void createTmpDirs(Configuration conf, } } } + + public static long getTableSize (TableScanOperator tsOp, ParseContext parseCtx) throws SemanticException { + + ArrayList partDir = new ArrayList(); + ArrayList partDesc = new ArrayList(); + HashMap>pathToAlias = new HashMap>(); + HashMappathToPartInfo = new HashMap(); + + /* find the alias corresponding to this table scan op */ + String alias = tsOp.alias; + Iterator iter = parseCtx.getTopOps().keySet().iterator(); + while (iter.hasNext()) { + String tempAlias = (String)iter.next(); + if (parseCtx.getTopOps().get(tempAlias) == tsOp) { + alias = tempAlias; + break; + } + } // found the alias corresponding to the table scan op + + Path tblDir = null; + TableDesc tblDesc = null; + + PrunedPartitionList partsList = null; + Set inputs = parseCtx.getSemanticInputs(); + HiveConf conf = parseCtx.getConf(); + + // find the partitions corresponding to this table scan + if (partsList == null) { + try { + partsList = parseCtx.getOpToPartList().get((TableScanOperator) tsOp); + if (partsList == null) { + // no partitions found so far, find if any exists + partsList = PartitionPruner.prune(parseCtx.getTopToTable().get(tsOp), + parseCtx.getOpToPartPruner().get(tsOp), conf, + alias, parseCtx.getPrunedPartitions()); + parseCtx.getOpToPartList().put((TableScanOperator) tsOp, partsList); + } + } catch (SemanticException e) { + throw e; + } catch (HiveException e) { + LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e)); + throw new SemanticException(e.getMessage(), e); + } + } // found corresponding partitions + + Set parts = null; + + // pass both confirmed and unknown partitions through the map-reduce + // framework + parts = partsList.getConfirmedPartns(); + parts.addAll(partsList.getUnknownPartns()); + PartitionDesc aliasPartnDesc = null; + try { + if (!parts.isEmpty()) { + aliasPartnDesc = Utilities.getPartitionDesc(parts.iterator().next()); + } + } catch (HiveException e) { + LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e)); + throw new SemanticException(e.getMessage(), e); + } + + // The table does not have any partitions + if (aliasPartnDesc == null) { + aliasPartnDesc = new PartitionDesc(Utilities.getTableDesc(parseCtx + .getTopToTable().get(tsOp)), null); + } + + long sizeNeeded = Integer.MAX_VALUE; + int fileLimit = -1; + if (parseCtx.getGlobalLimitCtx().isEnable()) { + long sizePerRow = HiveConf.getLongVar(parseCtx.getConf(), + HiveConf.ConfVars.HIVELIMITMAXROWSIZE); + sizeNeeded = parseCtx.getGlobalLimitCtx().getGlobalLimit() * sizePerRow; + // for the optimization that reduce number of input file, we limit number + // of files allowed. If more than specific number of files have to be + // selected, we skip this optimization. Since having too many files as + // inputs can cause unpredictable latency. 
It's not necessarily to be + // cheaper. + fileLimit = + HiveConf.getIntVar(parseCtx.getConf(), HiveConf.ConfVars.HIVELIMITOPTLIMITFILE); + + if (sizePerRow <= 0 || fileLimit <= 0) { + LOG.info("Skip optimization to reduce input size of 'limit'"); + parseCtx.getGlobalLimitCtx().disableOpt(); + } else if (parts.isEmpty()) { + LOG.info("Empty input: skip limit optimiztion"); + } else { + LOG.info("Try to reduce input size for 'limit' " + + "sizeNeeded: " + sizeNeeded + + " file limit : " + fileLimit); + } + } + boolean isFirstPart = true; + boolean emptyInput = true; + boolean singlePartition = (parts.size() == 1); + + // Track the dependencies for the view. Consider a query like: select * from V; + // where V is a view of the form: select * from T + // The dependencies should include V at depth 0, and T at depth 1 (inferred). + ReadEntity parentViewInfo = GenMapRedUtils.getParentViewInfo(alias, parseCtx.getViewAliasToInput()); + + // The table should also be considered a part of inputs, even if the table is a + // partitioned table and whether any partition is selected or not + PlanUtils.addInput(inputs, + new ReadEntity(parseCtx.getTopToTable().get(tsOp), parentViewInfo)); + + for (Partition part : parts) { + if (part.getTable().isPartitioned()) { + PlanUtils.addInput(inputs, new ReadEntity(part, parentViewInfo)); + } else { + PlanUtils.addInput(inputs, new ReadEntity(part.getTable(), parentViewInfo)); + } + + // Later the properties have to come from the partition as opposed + // to from the table in order to support versioning. + Path[] paths = null; + sampleDesc sampleDescr = parseCtx.getOpToSamplePruner().get(tsOp); + + // Lookup list bucketing pruner + Map partToPruner = parseCtx.getOpToPartToSkewedPruner().get(tsOp); + ExprNodeDesc listBucketingPruner = (partToPruner != null) ? partToPruner.get(part.getName()) + : null; + + if (sampleDescr != null) { + assert (listBucketingPruner == null) : "Sampling and list bucketing can't coexit."; + paths = SamplePruner.prune(part, sampleDescr); + parseCtx.getGlobalLimitCtx().disableOpt(); + } else if (listBucketingPruner != null) { + assert (sampleDescr == null) : "Sampling and list bucketing can't coexist."; + /* Use list bucketing prunner's path. */ + paths = ListBucketingPruner.prune(parseCtx, part, listBucketingPruner); + } else { + // Now we only try the first partition, if the first partition doesn't + // contain enough size, we change to normal mode. + if (parseCtx.getGlobalLimitCtx().isEnable()) { + if (isFirstPart) { + long sizeLeft = sizeNeeded; + ArrayList retPathList = new ArrayList(); + SamplePruner.LimitPruneRetStatus status = SamplePruner.limitPrune(part, sizeLeft, + fileLimit, retPathList); + if (status.equals(SamplePruner.LimitPruneRetStatus.NoFile)) { + continue; + } else if (status.equals(SamplePruner.LimitPruneRetStatus.NotQualify)) { + LOG.info("Use full input -- first " + fileLimit + " files are more than " + + sizeNeeded + + " bytes"); + + parseCtx.getGlobalLimitCtx().disableOpt(); + + } else { + emptyInput = false; + paths = new Path[retPathList.size()]; + int index = 0; + for (Path path : retPathList) { + paths[index++] = path; + } + if (status.equals(SamplePruner.LimitPruneRetStatus.NeedAllFiles) && singlePartition) { + // if all files are needed to meet the size limit, we disable + // optimization. It usually happens for empty table/partition or + // table/partition with only one file. By disabling this + // optimization, we can avoid retrying the query if there is + // not sufficient rows. 
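As context for the globalLimitCtx handling above, the gating arithmetic is simply: needed bytes equals the row limit times the configured maximum row size, and the shortcut is abandoned when either knob is non-positive. A toy sketch with hypothetical values, not the patch's own method:

public class LimitPruneGateSketch {
  // Returns the byte budget for the 'limit' optimization, or -1 if it should be disabled.
  static long sizeNeeded(long globalLimit, long sizePerRow, int fileLimit) {
    if (sizePerRow <= 0 || fileLimit <= 0) {
      return -1; // mirrors "Skip optimization to reduce input size of 'limit'"
    }
    return globalLimit * sizePerRow;
  }

  public static void main(String[] args) {
    System.out.println(sizeNeeded(100, 10000, 10)); // LIMIT 100 at 10 KB per row -> 1,000,000 bytes
  }
}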
+ parseCtx.getGlobalLimitCtx().disableOpt(); + } + } + isFirstPart = false; + } else { + paths = new Path[0]; + } + } + if (!parseCtx.getGlobalLimitCtx().isEnable()) { + paths = part.getPath(); + } + } + + // is it a partitioned table ? + if (!part.getTable().isPartitioned()) { + assert ((tblDir == null) && (tblDesc == null)); + + tblDir = paths[0]; + tblDesc = Utilities.getTableDesc(part.getTable()); + } else if (tblDesc == null) { + tblDesc = Utilities.getTableDesc(part.getTable()); + } + + for (Path p : paths) { + if (p == null) { + continue; + } + String path = p.toString(); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding " + path + " of table" + alias); + } + + partDir.add(p); + try { + if (part.getTable().isPartitioned()) { + partDesc.add(Utilities.getPartitionDesc(part)); + } + else { + partDesc.add(Utilities.getPartitionDescFromTableDesc(tblDesc, part)); + } + } catch (HiveException e) { + LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e)); + throw new SemanticException(e.getMessage(), e); + } + } + } + if (emptyInput) { + parseCtx.getGlobalLimitCtx().disableOpt(); + } + + Iterator iterPath = partDir.iterator(); + Iterator iterPartnDesc = partDesc.iterator(); + + while (iterPath.hasNext()) { + assert iterPartnDesc.hasNext(); + String path = iterPath.next().toString(); + + PartitionDesc prtDesc = iterPartnDesc.next(); + + // Add the path to alias mapping + if (pathToAlias.get(path) == null) { + pathToAlias.put(path, new ArrayList()); + } + pathToAlias.get(path).add(alias); + pathToPartInfo.put(path, prtDesc); + if (LOG.isDebugEnabled()) { + LOG.debug("Information added for path " + path); + } + } + + MapaliasToSize = new HashMap(); + + try { + Context context = parseCtx.getContext(); + getInputSummary(context, pathToAlias, pathToPartInfo, null); + // go over all the input paths, and calculate a known total size, known + // size for each input alias. 
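The loop that follows builds a per-alias size map from the per-path ContentSummary cache. A standalone sketch of that bookkeeping, using plain maps in place of the Context cache (alias and path names are stand-ins):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AliasSizeSketch {
  // Sum known path sizes into per-alias totals, as getTableSize() does with context.getCS(path).
  static Map<String, Long> aliasSizes(Map<String, List<String>> pathToAliases,
      Map<String, Long> pathSizes) {
    Map<String, Long> aliasToSize = new HashMap<String, Long>();
    for (Map.Entry<String, List<String>> entry : pathToAliases.entrySet()) {
      Long size = pathSizes.get(entry.getKey());
      if (size == null) {
        continue; // size of this path is not known yet
      }
      for (String alias : entry.getValue()) {
        Long current = aliasToSize.get(alias);
        aliasToSize.put(alias, current == null ? size : current + size);
      }
    }
    return aliasToSize;
  }
}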
+ long aliasTotalKnownInputSize = 0L; + for (Map.Entry> entry : pathToAlias.entrySet()) { + String path = entry.getKey(); + List aliasList = entry.getValue(); + ContentSummary cs = context.getCS(path); + if (cs != null) { + long size = cs.getLength(); + for (String name : aliasList) { + aliasTotalKnownInputSize += size; + Long es = aliasToSize.get(name); + if (es == null) { + es = new Long(0); + } + es += size; + aliasToSize.put(name, es); + } + } + } + } catch (Exception e) { + e.printStackTrace(); + throw new SemanticException("Generate Map Join Task Error: " + e.getMessage()); + } + + return aliasToSize.get(alias); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java index 49a0ee3..a561031 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java @@ -40,7 +40,6 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; @@ -109,7 +108,7 @@ public int execute(DriverContext driverContext) { // set the values of totalInputFileSize and totalInputNumFiles, estimating them // if percentage block sampling is being used double samplePercentage = Utilities.getHighestSamplePercentage(work.getMapWork()); - totalInputFileSize = Utilities.getTotalInputFileSize(inputSummary, work.getMapWork(), samplePercentage); + totalInputFileSize = Utilities.getTotalInputFileSize(inputSummary.getLength(), work.getMapWork(), samplePercentage); totalInputNumFiles = Utilities.getTotalInputNumFiles(inputSummary, work.getMapWork(), samplePercentage); // at this point the number of reducers is precisely defined in the plan @@ -408,7 +407,7 @@ private void setNumberOfReducers() throws IOException { if (inputSummary == null) { inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null); } - int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(), + int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(), work.isFinalMapRed()); rWork.setNumReduceTasks(reducers); console diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 7433ddc..89f3d1e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -398,7 +398,7 @@ public static void setTaskPlan(String alias_id, setTaskPlan(alias_id, topOp, task, local, opProcCtx, null); } - private static ReadEntity getParentViewInfo(String alias_id, + public static ReadEntity getParentViewInfo(String alias_id, Map viewAliasToInput) { String[] aliases = alias_id.split(":"); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/OpProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/OpProcessor.java new file mode 100644 index 0000000..8ba4dd5 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/OpProcessor.java @@ -0,0 +1,34 @@ +package org.apache.hadoop.hive.ql.optimizer; + +import java.util.Map; +import java.util.Stack; + +import org.apache.hadoop.hive.ql.exec.Operator; +import 
org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.physical.OpProcContext;
+import org.apache.hadoop.hive.ql.optimizer.physical.OpProcContext.OpContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+public class OpProcessor implements NodeProcessor {
+
+  @Override
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs)
+      throws SemanticException {
+
+    OpProcContext opProcCtx = (OpProcContext) procCtx;
+
+    @SuppressWarnings("unchecked")
+    Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
+
+    op.computePhysicalInfo(opProcCtx);
+
+    Map<Operator<? extends OperatorDesc>, OpContext> opCtx = opProcCtx.getOpContextMap();
+
+    return null;
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/OpProcContext.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/OpProcContext.java
new file mode 100644
index 0000000..3ec2992
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/OpProcContext.java
@@ -0,0 +1,61 @@
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+public class OpProcContext implements NodeProcessorCtx {
+  private final ParseContext pCtx;
+  protected Map<Operator<? extends OperatorDesc>, OpContext> opContextMap;
+
+  public OpProcContext(ParseContext pCtx,
+      Map<Operator<? extends OperatorDesc>, OpContext> opContextMap) {
+    this.pCtx = pCtx;
+    this.opContextMap = opContextMap;
+  }
+
+  public Map<Operator<? extends OperatorDesc>, OpContext> getOpContextMap() {
+    return opContextMap;
+  }
+
+  public ParseContext getParseContext() {
+    return pCtx;
+  }
+
+  public class OpContext {
+    long size;
+    List<String> bucketedCols;
+
+    public OpContext(long size, List<String> bucketedCols) {
+      this.size = size;
+      this.bucketedCols = new ArrayList<String>(bucketedCols);
+    }
+
+    public OpContext() {
+      size = 0;
+      bucketedCols = new ArrayList<String>();
+    }
+
+    public void setSize(long size) {
+      this.size += size;
+    }
+
+    public void addBucketedCols(List<String> bucketedCols) {
+      this.bucketedCols.addAll(bucketedCols);
+    }
+
+    public long getSize() {
+      return this.size;
+    }
+
+    public List<String> getBucketedCols() {
+      return this.bucketedCols;
+    }
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenOpGraphWalker.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenOpGraphWalker.java
new file mode 100644
index 0000000..80f675f
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenOpGraphWalker.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.parse; + +import java.util.List; + +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.Node; + +/** + * Walks the operator tree in DFS fashion. + */ +public class GenOpGraphWalker extends DefaultGraphWalker { + + /** + * constructor of the walker - the dispatcher is passed. + * + * @param disp + * the dispatcher to be called for each node visited + */ + public GenOpGraphWalker(Dispatcher disp) { + super(disp); + } + + /** + * Walk the given operator. + * + * @param nd + * operator being walked + */ + @Override + public void walk(Node nd) throws SemanticException { + List children = nd.getChildren(); + + // maintain the stack of operators encountered + opStack.push(nd); + dispatchAndReturn(nd, opStack); + + // move all the children to the front of queue + for (Node ch : children) { + walk(ch); + } + + // done with this operator + opStack.pop(); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java index 827637a..4b2a764 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.optimizer.physical.OpProcContext; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; @@ -46,7 +47,7 @@ */ public class GenTezProcContext implements NodeProcessorCtx{ - public final ParseContext parseContext; + public final OpProcContext opProcContext; public final HiveConf conf; public final List> moveTask; @@ -84,12 +85,12 @@ @SuppressWarnings("unchecked") - public GenTezProcContext(HiveConf conf, ParseContext parseContext, + public GenTezProcContext(HiveConf conf, OpProcContext opCtx, List> moveTask, List> rootTasks, Set inputs, Set outputs, Deque> rootOperators) { this.conf = conf; - this.parseContext = parseContext; + this.opProcContext = opCtx; this.moveTask = moveTask; this.rootTasks = rootTasks; this.inputs = inputs; diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java index ff8b17b..73666d1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TezWork; @@ -90,7 +91,9 @@ public Object process(Node nd, Stack stack, assert root instanceof TableScanOperator; String alias = ((TableScanOperator)root).getConf().getAlias(); - GenMapRedUtils.setMapWork(mapWork, context.parseContext, + ParseContext pCtx = context.opProcContext.getParseContext(); + + GenMapRedUtils.setMapWork(mapWork, pCtx, context.inputs, null, root, alias, context.conf, false); tezWork.add(mapWork); work = mapWork; @@ -112,12 +115,6 @@ public Object process(Node 
nd, Stack<Node> stack,
 
       // need to fill in information about the key and value in the reducer
       GenMapRedUtils.setKeyAndValueDesc(reduceWork, reduceSink);
-
-      // needs to be fixed in HIVE-5052. This should be driven off of stats
-      if (reduceWork.getNumReduceTasks() <= 0) {
-        reduceWork.setNumReduceTasks(1);
-      }
-
       tezWork.add(reduceWork);
       tezWork.connect(
           context.preceedingWork,
@@ -142,7 +139,7 @@ public Object process(Node nd, Stack<Node> stack,
     BaseWork followingWork = context.leafOperatorToFollowingWork.get(operator);
 
     // need to add this branch to the key + value info
-    assert operator instanceof ReduceSinkOperator
+    assert operator instanceof ReduceSinkOperator
         && followingWork instanceof ReduceWork;
     ReduceSinkOperator rs = (ReduceSinkOperator) operator;
     ReduceWork rWork = (ReduceWork) followingWork;
@@ -178,4 +175,19 @@ public Object process(Node nd, Stack<Node> stack,
     return null;
   }
 
+  /*
+   * find the number of reducers by going up the chain.
+   */
+  private int getNumberOfReducers(ReduceSinkOperator reduceSink) {
+    if (reduceSink.getConf().getNumReducers() != -1) {
+      return reduceSink.getConf().getNumReducers();
+    }
+    for (Operator<? extends OperatorDesc> op : reduceSink.getParentOperators()) {
+      if (op instanceof ReduceSinkOperator) {
+        return getNumberOfReducers((ReduceSinkOperator) op);
+      }
+    }
+    return -1;
+  }
 
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
index c1c1da5..0a83c2d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
@@ -64,6 +64,7 @@
 import org.apache.hadoop.hive.ql.optimizer.GenMRTableScan1;
 import org.apache.hadoop.hive.ql.optimizer.GenMRUnion1;
 import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory;
+import org.apache.hadoop.hive.ql.optimizer.physical.OpProcContext;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalOptimizer;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -323,11 +324,12 @@ public boolean accept(Path file) {
 
   @Override
   protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks,
-      ParseContext pCtx, Context ctx) throws SemanticException {
+      OpProcContext opCtx, Context ctx) throws SemanticException {
     // reduce sink does not have any kids - since the plan by now has been
     // broken up into multiple
     // tasks, iterate over all tasks.
// For each task, go over all operators recursively + ParseContext pCtx = opCtx.getParseContext(); for (Task rootTask : rootTasks) { breakTaskTree(rootTask); } @@ -342,10 +344,11 @@ protected void optimizeTaskPlan(List> rootTasks, } @Override - protected void generateTaskTree(List> rootTasks, ParseContext pCtx, + protected void generateTaskTree(List> rootTasks, OpProcContext opCtx, List> mvTask, Set inputs, Set outputs) throws SemanticException { // generate map reduce plans + ParseContext pCtx = opCtx.getParseContext(); ParseContext tempParseContext = getParseContext(pCtx, rootTasks); GenMRProcContext procCtx = new GenMRProcContext( conf, diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java index 248eb03..bbd856b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java @@ -20,8 +20,13 @@ import java.io.Serializable; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.commons.logging.Log; @@ -34,6 +39,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.ColumnStatsTask; import org.apache.hadoop.hive.ql.exec.FetchTask; +import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.StatsTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -41,10 +47,19 @@ import org.apache.hadoop.hive.ql.exec.mr.ExecDriver; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; +import org.apache.hadoop.hive.ql.optimizer.OpProcessor; +import org.apache.hadoop.hive.ql.optimizer.physical.OpProcContext; +import org.apache.hadoop.hive.ql.optimizer.physical.OpProcContext.OpContext; import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc; import org.apache.hadoop.hive.ql.plan.ColumnStatsWork; import org.apache.hadoop.hive.ql.plan.CreateTableDesc; @@ -53,6 +68,7 @@ import org.apache.hadoop.hive.ql.plan.LoadFileDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.plan.MoveWork; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; @@ -93,7 +109,7 @@ public void compile(final ParseContext pCtx, final List> rootTa /* * Called at the beginning of the compile phase to have another chance to optimize the operator plan */ - protected void optimizeOperatorPlan(ParseContext pCtxSet, Set inputs, + protected OpProcContext optimizeOperatorPlan(ParseContext pCtx, Set inputs, Set outputs) throws SemanticException { + + List > topoList = topologicallySort(pCtx.getTopOps()); + + OpProcContext procCtx = new 
OpProcContext(pCtx, + new HashMap, OpContext>()); + Map opRules = new LinkedHashMap(); + Dispatcher disp = new DefaultRuleDispatcher(new OpProcessor(), opRules, procCtx); + ListtopNodes = new ArrayList(topoList); + GraphWalker ogw = new GenOpGraphWalker(disp); + ogw.startWalking(topNodes, null); + + return procCtx; + } + + public List> + getTopoSort(List> leaves) { + + List> result = + new LinkedList>(); + Set> seen = new HashSet>(); + + for (Operator leaf: leaves) { + // make sure all leaves are visited at least once + visit(leaf, seen, result); + } + + return result; + } + + private void visit(Operator child, + Set> seen, List> result) { + + if (seen.contains(child)) { + // don't visit multiple times + return; + } + + seen.add(child); + + for (Operator parent: child.getParentOperators()) { + if (!seen.contains(parent)) { + visit(parent, seen, result); + } + } + + result.add(child); + } + + + private List> + topologicallySort(Map> topOps) { + Set> hashSet = new HashSet>(); + List> sortList = new ArrayList>(); + + Iterator iter = topOps.keySet().iterator(); + while (iter.hasNext()) { + Operatorop = topOps.get(iter.next()); + while (op.getChildOperators() != null && (op.getChildOperators().size() != 0)) { + op = op.getChildOperators().get(0); + } + hashSet.add(op); + } + + sortList.addAll(hashSet); + + return getTopoSort(sortList); } /* * Called after the tasks have been generated to run another round of optimization */ protected abstract void optimizeTaskPlan(List> rootTasks, - ParseContext pCtx, Context ctx) throws SemanticException; + OpProcContext pCtx, Context ctx) throws SemanticException; /* * Called to set the appropriate input format for tasks @@ -357,7 +439,7 @@ protected abstract void optimizeTaskPlan(List> root /* * Called to generate the taks tree from the parse context/operator tree */ - protected abstract void generateTaskTree(List> rootTasks, ParseContext pCtx, + protected abstract void generateTaskTree(List> rootTasks, OpProcContext opCtx, List> mvTask, Set inputs, Set outputs) throws SemanticException; /** diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index 5abedfe..6b71e89 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.optimizer.physical.OpProcContext; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MoveWork; @@ -62,18 +63,21 @@ public TezCompiler() { } @Override - protected void generateTaskTree(List> rootTasks, ParseContext pCtx, + protected void generateTaskTree(List> rootTasks, OpProcContext opCtx, List> mvTask, Set inputs, Set outputs) throws SemanticException { + ParseContext pCtx = opCtx.getParseContext(); + // generate map reduce plans ParseContext tempParseContext = getParseContext(pCtx, rootTasks); + OpProcContext opProcCtx = new OpProcContext(tempParseContext, opCtx.getOpContextMap()); Deque> deque = new LinkedList>(); deque.addAll(pCtx.getTopOps().values()); GenTezProcContext procCtx = new GenTezProcContext( - conf, tempParseContext, mvTask, rootTasks, inputs, outputs, deque); + conf, opProcCtx, mvTask, rootTasks, inputs, outputs, deque); // create a walker which walks the tree in a DFS manner while maintaining 
// the operator stack. @@ -190,7 +194,7 @@ protected void decideExecMode(List> rootTasks, Cont } @Override - protected void optimizeTaskPlan(List> rootTasks, ParseContext pCtx, + protected void optimizeTaskPlan(List> rootTasks, OpProcContext opCtx, Context ctx) throws SemanticException { // no additional optimization needed return; diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java index 5fd8d828..0fbb530 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java @@ -271,7 +271,7 @@ public static TableDesc getDefaultQueryOutputTableDesc(String cols, String colTy tblDesc.getProperties().setProperty(serdeConstants.ESCAPE_CHAR, "\\"); //enable extended nesting levels tblDesc.getProperties().setProperty( - LazySimpleSerDe.SERIALIZATION_EXTEND_NESTING_LEVELS, "true"); + LazySimpleSerDe.SERIALIZATION_EXTEND_NESTING_LEVELS, "true"); return tblDesc; } @@ -830,7 +830,7 @@ private PlanUtils() { // Add the input 'newInput' to the set of inputs for the query. // The input may or may not be already present. - // The ReadEntity also contains the parents from it is derived (only populated + // The ReadEntity also contains the parents from which it is derived (only populated // in case of views). The equals method for ReadEntity does not compare the parents // so that the same input with different parents cannot be added twice. If the input // is already present, make sure the parents are added.
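Finally, a self-contained illustration of the post-order traversal that TaskCompiler.getTopoSort() and visit() perform above: starting from the leaf operators and recursing into parents first, every node lands after all of its ancestors in the result. The tiny Node type below exists only for this example and is not part of the patch.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TopoSortSketch {
  static class Node {
    final String name;
    final List<Node> parents;
    Node(String name, Node... parents) {
      this.name = name;
      this.parents = Arrays.asList(parents);
    }
  }

  // Visit parents before the node itself, so ancestors appear earlier in the result.
  static void visit(Node node, Set<Node> seen, List<Node> result) {
    if (!seen.add(node)) {
      return; // already visited
    }
    for (Node parent : node.parents) {
      visit(parent, seen, result);
    }
    result.add(node);
  }

  public static void main(String[] args) {
    Node scan = new Node("TS");
    Node filter = new Node("FIL", scan);
    Node sink = new Node("RS", filter);
    List<Node> order = new ArrayList<Node>();
    visit(sink, new HashSet<Node>(), order);
    for (Node n : order) {
      System.out.println(n.name); // prints TS, FIL, RS
    }
  }
}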