diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 27f68df..ca2fee5 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2195,7 +2195,9 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
         new StringSet("throw", "skip", "ignore"),
         "The approach msck should take with HDFS " +
         "directories that are partition-like but contain unsupported characters. 'throw' (an " +
         "exception) is the default; 'skip' will skip the invalid directories and still repair the" +
-        " others; 'ignore' will skip the validation (legacy behavior, causes bugs in many cases)");
+        " others; 'ignore' will skip the validation (legacy behavior, causes bugs in many cases)"),
+    HIVE_TEZ_ENABLE_MEMORY_MANAGER("hive.tez.enable.memory.manager", false,
+        "Enable memory manager for tez");
 
   public final String varname;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
index c688092..dbd1f1d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
@@ -81,17 +81,17 @@ public void load(MapJoinTableContainer[] mapJoinTables,
         hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE);
     boolean useHybridGraceHashJoin = desc.isHybridHashJoin();
     boolean isFirstKey = true;
-    // TODO remove this after memory manager is in
-    long noConditionalTaskThreshold = HiveConf.getLongVar(
-        hconf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
+
+    // Get the total available memory from memory manager
+    long totalMapJoinMemory = desc.getMemoryNeeded();
 
     long processMaxMemory = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
-    if (noConditionalTaskThreshold > processMaxMemory) {
+    if (totalMapJoinMemory > processMaxMemory) {
       float hashtableMemoryUsage = HiveConf.getFloatVar(
           hconf, HiveConf.ConfVars.HIVEHASHTABLEFOLLOWBYGBYMAXMEMORYUSAGE);
-      LOG.warn("noConditionalTaskThreshold value of " + noConditionalTaskThreshold +
+      LOG.warn("totalMapJoinMemory value of " + totalMapJoinMemory +
           " is greater than the max memory size of " + processMaxMemory);
       // Don't want to attempt to grab more memory than we have available .. percentage is a bit arbitrary
-      noConditionalTaskThreshold = (long) (processMaxMemory * hashtableMemoryUsage);
+      totalMapJoinMemory = (long) (processMaxMemory * hashtableMemoryUsage);
     }
 
     // Only applicable to n-way Hybrid Grace Hash Join
@@ -118,7 +118,7 @@ public void load(MapJoinTableContainer[] mapJoinTables,
       }
       tableMemorySizes = divideHybridHashTableMemory(mapJoinTables, desc,
-          totalSize, noConditionalTaskThreshold);
+          totalSize, totalMapJoinMemory);
       // Using biggest small table, calculate number of partitions to create for each small table
       long memory = tableMemorySizes.get(biggest);
       int numPartitions = 0;
@@ -176,7 +176,7 @@ public void load(MapJoinTableContainer[] mapJoinTables,
         if (mapJoinTables.length > 2) {
           memory = tableMemorySizes.get(pos);
         } else { // binary join
-          memory = noConditionalTaskThreshold;
+          memory = totalMapJoinMemory;
         }
       }
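
For reference, the capping above reduces to this standalone sketch (the class, method, and parameter names are illustrative, not part of the patch; only the ManagementFactory call is real JDK API):

import java.lang.management.ManagementFactory;

public class MapJoinBudgetSketch {
  // memoryNeeded: the budget the planner stored on MapJoinDesc (desc.getMemoryNeeded()).
  // maxUsage: the HIVEHASHTABLEFOLLOWBYGBYMAXMEMORYUSAGE fraction.
  public static long effectiveBudget(long memoryNeeded, float maxUsage) {
    long heapMax = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
    // Never try to grab more memory than the JVM can actually provide.
    return memoryNeeded > heapMax ? (long) (heapMax * maxUsage) : memoryNeeded;
  }
}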
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 5a87bd6..6ff932b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -49,6 +49,7 @@
 import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
+import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
 import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -70,6 +71,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
 
   static final private Log LOG = LogFactory.getLog(ConvertJoinMapJoin.class.getName());
+  private long totalMemorySize;
 
   @Override
   /*
@@ -163,13 +165,13 @@
     MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos);
     // map join operator by default has no bucket cols and num of reduce sinks
     // reduced by 1
-    mapJoinOp
-        .setOpTraits(new OpTraits(null, -1, null));
+    mapJoinOp.setOpTraits(new OpTraits(null, -1, null));
     mapJoinOp.setStatistics(joinOp.getStatistics());
     // propagate this change till the next RS
     for (Operator<? extends OperatorDesc> childOp : mapJoinOp.getChildOperators()) {
       setAllChildrenTraitsToNull(childOp);
     }
+    mapJoinOp.getConf().setMemoryNeeded(totalMemorySize);
 
     return null;
   }
@@ -303,6 +305,7 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext cont
 
     // insert the dummy store operator here
     DummyStoreOperator dummyStoreOp = new TezDummyStoreOperator();
+    dummyStoreOp.setConf(new DummyStoreDesc());
     dummyStoreOp.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>());
     dummyStoreOp.setChildOperators(new ArrayList<Operator<? extends OperatorDesc>>());
     dummyStoreOp.getChildOperators().add(mergeJoinOp);
@@ -339,6 +342,7 @@ private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcCon
     }
 
     MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, bigTablePosition);
+    mapJoinOp.getConf().setMemoryNeeded(totalMemorySize);
     MapJoinDesc joinDesc = mapJoinOp.getConf();
     joinDesc.setBucketMapJoin(true);
@@ -615,6 +619,9 @@ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext c
       }
       pos++;
     }
+    if (bigTablePosition != -1) {
+      this.totalMemorySize = (totalSize / buckets);
+    }
     return bigTablePosition;
   }
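
The budget recorded in getMapJoinConversionPos is just the combined small-table size divided across buckets. A hypothetical helper showing the arithmetic (names are illustrative, not Hive API):

public final class BucketBudgetSketch {
  private BucketBudgetSketch() {
  }

  // Each task of a bucket map join loads one bucket of every small table, so
  // the hash-table budget shrinks linearly with the bucket count; buckets is 1
  // for a plain map join.
  public static long perTaskMemory(long totalSmallTableSize, int buckets) {
    return totalSmallTableSize / buckets;
  }

  public static void main(String[] args) {
    // 1 GiB of small tables hashed into 4 buckets -> 256 MiB per task.
    System.out.println(perTaskMemory(1L << 30, 4));
  }
}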
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
index bca91dd..c0f45eb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
@@ -219,7 +219,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procContext,
         }
       }
     }
-    TezEdgeProperty edgeProp = new TezEdgeProperty(null, edgeType, numBuckets);
+    TezEdgeProperty edgeProp = new TezEdgeProperty(null, edgeType, numBuckets, tableSize);
 
     if (mapJoinWork != null) {
       for (BaseWork myWork: mapJoinWork) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java
new file mode 100644
index 0000000..90298c6
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java
@@ -0,0 +1,335 @@
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.StatsTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * MemoryDecider divides each Tez container's memory between the shuffle
+ * buffers and the operator pipeline (map-join hash tables in particular),
+ * based on the size estimates carried on the plan's edges and operators.
+ */
+public class MemoryDecider implements PhysicalPlanResolver {
+
+  public class MemoryCalculator implements Dispatcher {
+
+    private final HiveConf conf;
+    private final Resource resourceAvailable;
+    private long inputOutputBufferLimit = 0;
+    private long parallelism = 0;
+    private final long onePercentMemory;
+    private long remainingMemory;
+    private long totalAvailableMemory;
+    private final PhysicalContext pctx;
+    private long incomingSize;
+    private long outgoingSize;
+    private static final long RESOURCE_LIMIT = 50; // buffer/operator resources limited to 50%
+    private static final long DEFAULT_RESOURCE_SIZE = 10;
+    private boolean justJoinComputation = false;
+
+    public MemoryCalculator(PhysicalContext pctx) {
+      this.pctx = pctx;
+      this.conf = pctx.conf;
+      if (conf.getBoolVar(HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN)) {
+        justJoinComputation = true;
+      }
+      this.resourceAvailable = DagUtils.getContainerResource(conf);
+      // use long math: container sizes over 2GB overflow an int when converted to bytes
+      this.totalAvailableMemory = resourceAvailable.getMemory() * 1024L * 1024L;
+      this.onePercentMemory = totalAvailableMemory / 100;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
+        throws SemanticException {
+      Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
+      if (currTask instanceof StatsTask) {
+        currTask = ((StatsTask) currTask).getWork().getSourceTask();
+      }
+      if (currTask instanceof TezTask) {
+        TezWork work = ((TezTask) currTask).getWork();
+        for (BaseWork w : work.getAllWork()) {
+          evaluateWork(work, w);
+        }
+      }
+      return null;
+    }
+
+    private void evaluateWork(TezWork work, BaseWork w) throws SemanticException {
+
+      if (w instanceof MapWork) {
+        evaluateMapWork(work, (MapWork) w);
+      } else if (w instanceof ReduceWork) {
+        evaluateReduceWork(work, (ReduceWork) w);
+      } else if (w instanceof MergeJoinWork) {
+        evaluateMergeWork(work, (MergeJoinWork) w);
+      } else {
+        throw new SemanticException("Unknown work type: " + w);
+      }
+    }
+
+    // this needs to account for the other work items embedded within the merge join work
+    private void evaluateMergeWork(TezWork work, MergeJoinWork w) throws SemanticException {
+      long totalAvailableMemory = this.totalAvailableMemory;
+      // TODO temporary fix
+      if (w.getBaseWorkList().size() == 0) {
+        this.totalAvailableMemory = totalAvailableMemory;
+      } else {
+        this.totalAvailableMemory = totalAvailableMemory / w.getBaseWorkList().size();
+      }
+      long totalIncomingSize = getInputMemoryRequired(work, w);
+      long totalOutgoingSize = getOutputMemoryRequired(work, w);
+      for (BaseWork baseWork : w.getBaseWorkList()) {
+        computeMemoryForOperators(work, baseWork, totalIncomingSize, totalOutgoingSize);
+      }
+      recomputeMemoryForInputOutput(work, w, incomingSize, outgoingSize);
+    }
+
+    private void evaluateReduceWork(TezWork work, ReduceWork w) throws SemanticException {
+      parallelism = w.getNumReduceTasks();
+      if (parallelism <= 0) {
+        parallelism = 1;
+      }
+      long totalIncomingSize = getInputMemoryRequired(work, w);
+
+      long totalOutgoingSize = getOutputMemoryRequired(work, w);
+
+      computeMemoryForOperators(work, w, totalIncomingSize, totalOutgoingSize);
+
+      recomputeMemoryForInputOutput(work, w, incomingSize, outgoingSize);
+    }
+
+    private void evaluateMapWork(TezWork work, MapWork w) throws SemanticException {
+      long maxSplitSize = HiveConf.getLongVar(conf, HiveConf.ConfVars.MAPREDMAXSPLITSIZE);
+      parallelism = w.getMemoryNeeded() / maxSplitSize;
+      if (parallelism <= 0) {
+        parallelism = 1;
+      }
+      long totalIncomingSize = getInputMemoryRequired(work, w);
+
+      long totalOutgoingSize = getOutputMemoryRequired(work, w);
+
+      computeMemoryForOperators(work, w, totalIncomingSize, totalOutgoingSize);
+
+      recomputeMemoryForInputOutput(work, w, incomingSize, outgoingSize);
+    }
+
+    private void recomputeMemoryForInputOutput(TezWork work, BaseWork w, long totalIncomingSize,
+        long totalOutgoingSize) {
+      List<BaseWork> parentWorkList = work.getParents(w);
+      for (BaseWork parentWork : parentWorkList) {
+        TezEdgeProperty edgeProp = work.getEdgeProperty(parentWork, w);
+        if (parallelism != 0) {
+          totalIncomingSize = parallelism * totalIncomingSize;
+        }
+      }
+
+      List<BaseWork> childWorkList = work.getChildren(w);
+      if (childWorkList.isEmpty()) {
+        return;
+      }
+      for (BaseWork childWork : childWorkList) {
+        TezEdgeProperty edgeProp = work.getEdgeProperty(w, childWork);
+      }
+    }
+
+    private long getInputMemoryRequired(TezWork work, BaseWork w) {
+      if (justJoinComputation) {
+        return 0;
+      }
+      long totalIncomingSize = 0L;
+      List<BaseWork> parentWorkList = work.getParents(w);
+      if (parentWorkList.isEmpty()) {
+        totalIncomingSize = onePercentMemory * DEFAULT_RESOURCE_SIZE;
+        return totalIncomingSize;
+      }
+      for (BaseWork parentWork : parentWorkList) {
+        TezEdgeProperty edgeProp = work.getEdgeProperty(parentWork, w);
+        if (edgeProp.getEdgeType() == EdgeType.SIMPLE_EDGE) {
+          // we need buffer space for incoming shuffled data
+          totalIncomingSize += edgeProp.getEstimatedTransferBytes() / parallelism;
+        } else {
+          totalIncomingSize += onePercentMemory;
+        }
+      }
+      return totalIncomingSize;
+    }
+
+    private long getOutputMemoryRequired(TezWork work, BaseWork w) {
+      if (justJoinComputation) {
+        return 0;
+      }
+      // 10% per input?
+      long totalOutgoingSize = onePercentMemory * DEFAULT_RESOURCE_SIZE; // default 10%
+      List<BaseWork> childWorkList = work.getChildren(w);
+      for (BaseWork childWork : childWorkList) {
+        TezEdgeProperty edgeProp = work.getEdgeProperty(w, childWork);
+        EdgeType edgeType = edgeProp.getEdgeType();
+        // we need estimate for only one outgoing edge because tez does replication of output.
+        // this breaks down if there is a case of different type of edges downstream - say
+        // sorted and unsorted edges. If we look at only the unsorted type of edge, we may end up
+        // underestimating the amount of memory required.
+        if (edgeType == EdgeType.SIMPLE_EDGE) {
+          totalOutgoingSize = edgeProp.getEstimatedTransferBytes();
+          break;
+        }
+      }
+
+      return totalOutgoingSize;
+    }
+
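+    /**
+     * Splits the container between I/O buffers and the operator pipeline: if
+     * the input/output estimates ask for more than half the container, they
+     * are first capped at 50% (RESOURCE_LIMIT) and whatever the operator tree
+     * leaves unused is handed back to the buffers afterwards. For example, in
+     * a 4GB container with 3GB of estimated input and 1GB of output, the
+     * buffers are capped at 2GB (1.5GB in, 0.5GB out) and grow again only if
+     * the operators need less than the remaining 2GB.
+     */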
+    private void computeMemoryForOperators(TezWork work, BaseWork w, long totalIncomingSize,
+        long totalOutgoingSize) throws SemanticException {
+
+      // if the buffers require > 50% of memory, let's now limit the percent to 50% of memory.
+      // if the operator pipeline has excess, this value can grow
+      boolean capped = false;
+      incomingSize = totalIncomingSize;
+      outgoingSize = totalOutgoingSize;
+      if ((totalIncomingSize + totalOutgoingSize) > (totalAvailableMemory / 2)) {
+        // capped to 50% till we see the operator pipeline
+        capped = true;
+        this.inputOutputBufferLimit = onePercentMemory * RESOURCE_LIMIT;
+        incomingSize =
+            totalIncomingSize * inputOutputBufferLimit / (totalIncomingSize + totalOutgoingSize);
+        outgoingSize = inputOutputBufferLimit - incomingSize;
+      } else {
+        // it needs amount of memory less than or equal to 50%
+        this.inputOutputBufferLimit = totalIncomingSize + totalOutgoingSize;
+      }
+
+      this.remainingMemory = totalAvailableMemory - (this.inputOutputBufferLimit);
+      // fill in the operator memory requirements and find total
+      evaluateOperators(w, pctx);
+      if ((remainingMemory > 0) && (capped)) {
+        // operator tree had excess memory. We can use the excess for the inputs/outputs
+        long incomingIncrease =
+            totalIncomingSize * remainingMemory / (totalIncomingSize + totalOutgoingSize);
+        incomingSize += incomingIncrease;
+        outgoingSize += (remainingMemory - incomingIncrease);
+      }
+    }
+
+    private long evaluateOperators(BaseWork w, PhysicalContext pctx) throws SemanticException {
+      // let's take a look at the operator memory requirements.
+      Dispatcher disp = null;
+      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN)) {
+        Map<Rule, NodeProcessor> rules = new HashMap<Rule, NodeProcessor>();
+        rules.put(new RuleRegExp("Map join memory estimator",
+            MapJoinOperator.getOperatorName() + "%"), new DefaultRule());
+        disp = new DefaultRuleDispatcher(null, rules, null);
+      } else {
+        disp =
+            new DefaultRuleDispatcher(new DefaultRule(), new HashMap<Rule, NodeProcessor>(),
+                null);
+      }
+      GraphWalker ogw = new DefaultGraphWalker(disp);
+
+      ArrayList<Node> topNodes = new ArrayList<Node>();
+      topNodes.addAll(w.getAllRootOperators());
+
+      LinkedHashMap<Node, Object> nodeOutput = new LinkedHashMap<Node, Object>();
+      ogw.startWalking(topNodes, nodeOutput);
+
+      return computeMemoryRequirements(nodeOutput.keySet());
+    }
+
+    private long computeMemoryRequirements(Set<Node> keySet) {
+      long retval = 0;
+      List<Operator<?>> opList = new ArrayList<Operator<?>>();
+      long minMemory =
+          (keySet.size() > 100) ? totalAvailableMemory / keySet.size() : onePercentMemory;
+      long totalMemoryNeeded = 0;
+      for (Node nd : keySet) {
+        Operator<?> op = (Operator<?>) nd;
+        long memoryNeeded = 0;
+
+        memoryNeeded = op.getConf().getMemoryNeeded();
+        if (memoryNeeded == 0) {
+          memoryNeeded = minMemory;
+        } else {
+          memoryNeeded += minMemory;
+          opList.add(op);
+        }
+
+        totalMemoryNeeded += memoryNeeded;
+        op.getConf().setMemoryNeeded(memoryNeeded);
+      }
+
+      if (totalMemoryNeeded > remainingMemory) {
+        long minMemoryRequired = keySet.size() * minMemory;
+        remainingMemory -= minMemoryRequired;
+        totalMemoryNeeded -= minMemoryRequired;
+
+        for (Operator<?> op : opList) {
+          long memNeeded = (op.getConf().getMemoryNeeded() * remainingMemory) / totalMemoryNeeded;
+          op.getConf().setMemoryNeeded(memNeeded);
+          retval += memNeeded;
+        }
+        remainingMemory -= retval;
+        retval += minMemoryRequired;
+        if (remainingMemory < 0) {
+          throw new IllegalStateException("Remaining memory cannot be negative");
+        }
+      } else {
+        retval = totalMemoryNeeded;
+        remainingMemory -= totalMemoryNeeded;
+      }
+      return retval;
+    }
+
+    public class DefaultRule implements NodeProcessor {
+
+      @Override
+      public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+          Object... nodeOutputs) throws SemanticException {
+        return null;
+      }
+    }
+  }
+
+  @Override
+  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+
+    // create dispatcher and graph walker
+    Dispatcher disp = new MemoryCalculator(pctx);
+    TaskGraphWalker ogw = new TaskGraphWalker(disp);
+
+    // get all the tasks nodes from root task
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getRootTasks());
+
+    // begin to walk through the task tree.
+    ogw.startWalking(topNodes, null);
+    return pctx;
+  }
+
+}
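
The scale-down in computeMemoryRequirements can be exercised in isolation. The sketch below simplifies it slightly: only the requirement above the per-operator minimum is scaled, so the result always fits the budget, whereas the patch scales the full stored value. Names and numbers are made up:

import java.util.LinkedHashMap;
import java.util.Map;

public class ScaleDownSketch {
  // needed: declared memory per operator (0 = nothing declared).
  // Every operator gets at least minMemory; declared needs are scaled to fit budget.
  public static Map<String, Long> assign(Map<String, Long> needed, long minMemory, long budget) {
    Map<String, Long> result = new LinkedHashMap<String, Long>();
    long total = 0;
    for (Map.Entry<String, Long> e : needed.entrySet()) {
      long mem = e.getValue() == 0 ? minMemory : e.getValue() + minMemory;
      result.put(e.getKey(), mem);
      total += mem;
    }
    if (total > budget) {
      long reserved = needed.size() * minMemory; // everyone keeps the minimum
      long scalable = budget - reserved;         // what is left for declared needs
      long declared = total - reserved;
      for (Map.Entry<String, Long> e : result.entrySet()) {
        if (e.getValue() > minMemory) {
          e.setValue(minMemory + (e.getValue() - minMemory) * scalable / declared);
        }
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, Long> needed = new LinkedHashMap<String, Long>();
    needed.put("MAPJOIN_25", 600L); // declared hash-table need
    needed.put("SEL_10", 0L);       // nothing declared
    // minMemory = 100, budget = 500:
    // MAPJOIN_25 drops from 700 to 100 + 600 * 300 / 600 = 400, SEL_10 keeps 100.
    System.out.println(assign(needed, 100L, 500L));
  }
}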
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 11c1df6..61c7985 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -145,12 +145,13 @@ public ReduceWork createReduceWork(GenTezProcContext context, Operator<?> root,
     tezWork.add(reduceWork);
 
     TezEdgeProperty edgeProp;
+    long dataSize = reduceSink.getConf().getStatistics() == null ? 0 :
+        reduceSink.getConf().getStatistics().getDataSize();
     if (reduceWork.isAutoReduceParallelism()) {
-      edgeProp =
-          new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true,
-              reduceWork.getMinReduceTasks(), reduceWork.getMaxReduceTasks(), bytesPerReducer);
+      edgeProp = new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true,
+          reduceWork.getMinReduceTasks(), bytesPerReducer, dataSize);
     } else {
-      edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+      edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE, dataSize);
     }
 
     tezWork.connect(
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 6db8220..12dfa8a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -135,6 +135,10 @@ public Object process(Node nd, Stack<Node> stack,
       context.rootToWorkMap.put(root, work);
     }
 
+    if (operator.getStatistics() != null) {
+      work.setMemoryNeeded(operator.getStatistics().getDataSize());
+    }
+
    // this is where we set the sort columns that we will be using for KeyValueInputMerge
    if (operator instanceof DummyStoreOperator) {
      work.addSortCols(root.getOpTraits().getSortCols().get(0));
@@ -379,16 +383,19 @@ public Object process(Node nd, Stack<Node> stack,
           if (rWork.isAutoReduceParallelism()) {
             edgeProp =
                 new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true,
-                    rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer);
+                    rWork.getMinReduceTasks(), bytesPerReducer, rs
+                        .getConf().getStatistics().getDataSize());
           } else {
-            edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+            edgeProp =
+                new TezEdgeProperty(EdgeType.SIMPLE_EDGE, rs.getConf().getStatistics()
+                    .getDataSize());
           }
           tezWork.connect(work, followingWork, edgeProp);
           context.connectedReduceSinks.add(rs);
         }
       }
     } else {
-      LOG.debug("First pass. Leaf operator: "+operator);
+      LOG.debug("First pass. Leaf operator: " + operator);
     }
 
     // No children means we're at the bottom. If there are more operators to scan
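
Note that the GenTezWork hunk above reads rs.getConf().getStatistics().getDataSize() unguarded, while GenTezUtils null-checks the statistics first. Factored out, the guard looks like this (a hypothetical helper, not part of the patch):

import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.plan.Statistics;

public class EdgeSizeSketch {
  // Null-safe data size, mirroring the ternary used in GenTezUtils.
  public static long dataSizeOf(ReduceSinkOperator reduceSink) {
    Statistics stats = reduceSink.getConf().getStatistics();
    return stats == null ? 0 : stats.getDataSize();
  }
}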
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index 56707af..7d25774 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -70,6 +70,7 @@
 import org.apache.hadoop.hive.ql.optimizer.SetReducerParallelism;
 import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits;
 import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck;
+import org.apache.hadoop.hive.ql.optimizer.physical.MemoryDecider;
 import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
@@ -472,6 +473,11 @@ protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks, Pa
     } else {
       LOG.debug("Skipping stage id rearranger");
     }
+
+    if ((conf.getBoolVar(HiveConf.ConfVars.HIVE_TEZ_ENABLE_MEMORY_MANAGER))
+        || (conf.getBoolVar(HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN))) {
+      physicalCtx = new MemoryDecider().resolve(physicalCtx);
+    }
     return;
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
index bc34710..463da5d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
@@ -30,6 +30,7 @@
   protected Statistics statistics;
   protected transient OpTraits opTraits;
   protected transient Map<String, String> opProps;
+  protected long memNeeded = 0;
 
   static {
     PTFUtils.makeTransient(AbstractOperatorDesc.class, "opProps");
@@ -59,14 +60,17 @@ public void setVectorMode(boolean vm) {
     this.vectorMode = vm;
   }
 
+  @Override
   public OpTraits getTraits() {
     return opTraits;
   }
 
+  @Override
   public void setTraits(OpTraits opTraits) {
     this.opTraits = opTraits;
   }
 
+  @Override
   public Map<String, String> getOpProps() {
     return opProps;
   }
@@ -74,4 +78,14 @@ public void setTraits(OpTraits opTraits) {
   public void setOpProps(Map<String, String> props) {
     this.opProps = props;
   }
+
+  @Override
+  public long getMemoryNeeded() {
+    return memNeeded;
+  }
+
+  @Override
+  public void setMemoryNeeded(long memNeeded) {
+    this.memNeeded = memNeeded;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
index fb4d3b4..16be499 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
@@ -28,4 +28,6 @@
   public OpTraits getTraits();
   public void setTraits(OpTraits opTraits);
   public Map<String, String> getOpProps();
+  public long getMemoryNeeded();
+  public void setMemoryNeeded(long memoryNeeded);
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
index a3aa12f..62caa52 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
@@ -30,33 +30,38 @@
     CUSTOM_SIMPLE_EDGE,
   }
 
-  private HiveConf hiveConf;
-  private EdgeType edgeType;
-  private int numBuckets;
+  private final HiveConf hiveConf;
+  private final EdgeType edgeType;
+  private final int numBuckets;
 
   private boolean isAutoReduce;
   private int minReducer;
-  private int maxReducer;
   private long inputSizePerReducer;
+  private final long estimatedTransferBytes;
 
-  public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType,
-      int buckets) {
+  public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, int buckets,
+      long estimatedTransferBytes) {
     this.hiveConf = hiveConf;
     this.edgeType = edgeType;
     this.numBuckets = buckets;
+    this.estimatedTransferBytes = estimatedTransferBytes;
   }
 
   public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, boolean isAutoReduce,
-      int minReducer, int maxReducer, long bytesPerReducer) {
-    this(hiveConf, edgeType, -1);
+      int minReducer, long bytesPerReducer, long estimatedTransferBytes) {
+    this(hiveConf, edgeType, -1, estimatedTransferBytes);
     this.minReducer = minReducer;
-    this.maxReducer = maxReducer;
     this.isAutoReduce = isAutoReduce;
     this.inputSizePerReducer = bytesPerReducer;
   }
 
+  public TezEdgeProperty(EdgeType edgeType, long estimatedTransferBytes) {
+    this(null, edgeType, -1, estimatedTransferBytes);
+  }
+
+  // called by test code only.
   public TezEdgeProperty(EdgeType edgeType) {
-    this(null, edgeType, -1);
+    this(edgeType, 0);
   }
 
   public EdgeType getEdgeType() {
@@ -79,11 +84,11 @@ public int getMinReducer() {
     return minReducer;
   }
 
-  public int getMaxReducer() {
-    return maxReducer;
-  }
-
   public long getInputSizePerReducer() {
     return inputSizePerReducer;
   }
+
+  public long getEstimatedTransferBytes() {
+    return estimatedTransferBytes;
+  }
 }
diff --git ql/src/test/results/clientpositive/tez/tez_join.q.out ql/src/test/results/clientpositive/tez/tez_join.q.out
index 7b22996..d1cde57 100644
--- ql/src/test/results/clientpositive/tez/tez_join.q.out
+++ ql/src/test/results/clientpositive/tez/tez_join.q.out
@@ -89,6 +89,7 @@ STAGE PLANS:
                 expressions: KEY.reducesinkkey0 (type: string)
                 outputColumnNames: _col0
                 Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+                Dummy Store
         Reduce Operator Tree:
           Select Operator
             expressions: KEY.reducesinkkey0 (type: string)
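
With the estimate now carried on every edge, consumers can size buffers from it. A usage sketch of the new constructor and getter (the byte count and parallelism are made up):

import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;

public class EdgeEstimateExample {
  public static void main(String[] args) {
    // A shuffle edge now records how many bytes it is expected to move.
    TezEdgeProperty edge = new TezEdgeProperty(EdgeType.SIMPLE_EDGE, 64L << 20); // 64 MiB
    int parallelism = 8; // downstream task count, illustrative
    // MemoryDecider divides the estimate by the consumer's parallelism to size
    // each task's incoming shuffle buffer.
    System.out.println(edge.getEstimatedTransferBytes() / parallelism); // 8 MiB
  }
}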