diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a724fd1..965837c 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2195,7 +2195,9 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
         new StringSet("throw", "skip", "ignore"),
         "The approach msck should take with HDFS " +
         "directories that are partition-like but contain unsupported characters. 'throw' (an " +
         "exception) is the default; 'skip' will skip the invalid directories and still repair the" +
-        " others; 'ignore' will skip the validation (legacy behavior, causes bugs in many cases)");
+        " others; 'ignore' will skip the validation (legacy behavior, causes bugs in many cases)"),
+    HIVE_TEZ_ENABLE_MEMORY_MANAGER("hive.tez.enable.memory.manager", false,
+        "Enable memory manager for tez");

   public final String varname;
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index bcffdbc..9276ff6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -48,6 +48,7 @@
 import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
+import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
 import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -69,6 +70,7 @@
 public class ConvertJoinMapJoin implements NodeProcessor {

   static final private Log LOG = LogFactory.getLog(ConvertJoinMapJoin.class.getName());
+  private long totalMemorySize;

   @Override
   /*
@@ -162,13 +164,13 @@
     MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos);
     // map join operator by default has no bucket cols and num of reduce sinks
     // reduced by 1
-    mapJoinOp
-        .setOpTraits(new OpTraits(null, -1, null));
+    mapJoinOp.setOpTraits(new OpTraits(null, -1, null));
     mapJoinOp.setStatistics(joinOp.getStatistics());
     // propagate this change till the next RS
     for (Operator<? extends OperatorDesc> childOp : mapJoinOp.getChildOperators()) {
       setAllChildrenTraitsToNull(childOp);
     }
+    mapJoinOp.getConf().setMemoryNeeded(totalMemorySize);

     return null;
   }
@@ -302,6 +304,7 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext cont

     // insert the dummy store operator here
     DummyStoreOperator dummyStoreOp = new TezDummyStoreOperator();
+    dummyStoreOp.setConf(new DummyStoreDesc());
     dummyStoreOp.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>());
     dummyStoreOp.setChildOperators(new ArrayList<Operator<? extends OperatorDesc>>());
     dummyStoreOp.getChildOperators().add(mergeJoinOp);
@@ -338,6 +341,7 @@ private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcCon
     }

     MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, bigTablePosition);
+    mapJoinOp.getConf().setMemoryNeeded(totalMemorySize);
     MapJoinDesc joinDesc = mapJoinOp.getConf();
     joinDesc.setBucketMapJoin(true);
@@ -594,6 +598,9 @@ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext c
       }
       pos++;
     }
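+    // remember the estimated size of the small table(s), spread across the join buckets; the
+    // map join conversions above publish this to the operator via setMemoryNeeded()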
+    if (bigTablePosition != -1) {
+      this.totalMemorySize = (totalSize / buckets);
+    }

     return bigTablePosition;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java
new file mode 100644
index 0000000..3529f42
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java
@@ -0,0 +1,333 @@
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.StatsTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+public class MemoryDecider implements PhysicalPlanResolver {
+
+  public class MemoryCalculator implements Dispatcher {
+
+    private final HiveConf conf;
+    private final Resource resourceAvailable;
+    private long inputOutputBufferLimit = 0;
+    private long parallelism = 0;
+    private final long onePercentMemory;
+    private long remainingMemory;
+    private long totalAvailableMemory;
+    private final PhysicalContext pctx;
+    private long incomingSize;
+    private long outgoingSize;
+    private static final long RESOURCE_LIMIT = 50; // buffer/operator resources limited to 50%
+    private static final long DEFAULT_RESOURCE_SIZE = 10;
+    private boolean justJoinComputation = false;
+
+    public MemoryCalculator(PhysicalContext pctx) {
+      this.pctx = pctx;
+      this.conf = pctx.conf;
+      if (conf.getBoolVar(HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN)) {
+        justJoinComputation = true;
+      }
+      this.resourceAvailable = DagUtils.getContainerResource(conf);
+      this.totalAvailableMemory = resourceAvailable.getMemory() * 1024L * 1024L;
+      this.onePercentMemory = totalAvailableMemory / 100;
+    }
+
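+    // Visits every task; for a TezTask, each BaseWork vertex in the DAG is sized so that
+    // shuffle buffers and operators share the container memory reported by DagUtils.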
+    @SuppressWarnings("unchecked")
+    @Override
+    public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
+        throws SemanticException {
+      Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
+      if (currTask instanceof StatsTask) {
+        currTask = ((StatsTask) currTask).getWork().getSourceTask();
+      }
+      if (currTask instanceof TezTask) {
+        TezWork work = ((TezTask) currTask).getWork();
+        for (BaseWork w : work.getAllWork()) {
+          evaluateWork(work, w);
+        }
+      }
+      return null;
+    }
+
+    private void evaluateWork(TezWork work, BaseWork w) throws SemanticException {
+
+      if (w instanceof MapWork) {
+        evaluateMapWork(work, (MapWork) w);
+      } else if (w instanceof ReduceWork) {
+        evaluateReduceWork(work, (ReduceWork) w);
+      } else if (w instanceof MergeJoinWork) {
+        evaluateMergeWork(work, (MergeJoinWork) w);
+      } else {
+        throw new SemanticException("Unknown work type: " + w);
+      }
+    }
+
+    // this needs to account for the other work items embedded within the merge join work
+    private void evaluateMergeWork(TezWork work, MergeJoinWork w) throws SemanticException {
+      long totalAvailableMemory = this.totalAvailableMemory;
+      this.totalAvailableMemory = totalAvailableMemory / w.getBaseWorkList().size();
+      long totalIncomingSize = getInputMemoryRequired(work, w);
+      long totalOutgoingSize = getOutputMemoryRequired(work, w);
+      for (BaseWork baseWork : w.getBaseWorkList()) {
+        computeMemoryForOperators(work, baseWork, totalIncomingSize, totalOutgoingSize);
+      }
+      recomputeMemoryForInputOutput(work, w, incomingSize, outgoingSize);
+    }
+
+    private void evaluateReduceWork(TezWork work, ReduceWork w) throws SemanticException {
+      parallelism = w.getNumReduceTasks();
+      if (parallelism <= 0) {
+        parallelism = 1;
+      }
+      long totalIncomingSize = getInputMemoryRequired(work, w);
+
+      long totalOutgoingSize = getOutputMemoryRequired(work, w);
+
+      computeMemoryForOperators(work, w, totalIncomingSize, totalOutgoingSize);
+
+      recomputeMemoryForInputOutput(work, w, incomingSize, outgoingSize);
+    }
+
+    private void evaluateMapWork(TezWork work, MapWork w) throws SemanticException {
+      long maxSplitSize = HiveConf.getLongVar(conf, HiveConf.ConfVars.MAPREDMAXSPLITSIZE);
+      parallelism = w.getMemoryNeeded() / maxSplitSize;
+      if (parallelism <= 0) {
+        parallelism = 1;
+      }
+      long totalIncomingSize = getInputMemoryRequired(work, w);
+
+      long totalOutgoingSize = getOutputMemoryRequired(work, w);
+
+      computeMemoryForOperators(work, w, totalIncomingSize, totalOutgoingSize);
+
+      recomputeMemoryForInputOutput(work, w, incomingSize, outgoingSize);
+    }
+
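+    // Turns the budgeted incoming/outgoing sizes into per-edge settings: each parent edge is
+    // assigned a fraction of the input buffer, each child edge an equal share of the output.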
+    private void recomputeMemoryForInputOutput(TezWork work, BaseWork w, long totalIncomingSize,
+        long totalOutgoingSize) {
+      List<BaseWork> parentWorkList = work.getParents(w);
+      for (BaseWork parentWork : parentWorkList) {
+        TezEdgeProperty edgeProp = work.getEdgeProperty(parentWork, w);
+        if (parallelism != 0) {
+          totalIncomingSize = parallelism * totalIncomingSize;
+        }
+        edgeProp.setInputMemoryNeededFraction(
+            (float) parentWork.getMemoryNeeded() / totalIncomingSize);
+      }
+
+      List<BaseWork> childWorkList = work.getChildren(w);
+      if (childWorkList.isEmpty()) {
+        return;
+      }
+      for (BaseWork childWork : childWorkList) {
+        TezEdgeProperty edgeProp = work.getEdgeProperty(w, childWork);
+        // one size fits all?
+        edgeProp.setOutputMemoryNeeded(outgoingSize / childWorkList.size());
+      }
+    }
+
+    private long getInputMemoryRequired(TezWork work, BaseWork w) {
+      if (justJoinComputation) {
+        return 0;
+      }
+      long totalIncomingSize = 0L;
+      List<BaseWork> parentWorkList = work.getParents(w);
+      if (parentWorkList.isEmpty()) {
+        totalIncomingSize = onePercentMemory * DEFAULT_RESOURCE_SIZE;
+        return totalIncomingSize;
+      }
+      for (BaseWork parentWork : parentWorkList) {
+        TezEdgeProperty edgeProp = work.getEdgeProperty(parentWork, w);
+        if (edgeProp.getEdgeType() == EdgeType.SIMPLE_EDGE) {
+          // we need buffer space for incoming shuffled data
+          totalIncomingSize += edgeProp.getEstimatedTransferBytes() / parallelism;
+        } else {
+          totalIncomingSize += onePercentMemory;
+        }
+      }
+      return totalIncomingSize;
+    }
+
+    private long getOutputMemoryRequired(TezWork work, BaseWork w) {
+      if (justJoinComputation) {
+        return 0;
+      }
+      // 10% per input?
+      long totalOutgoingSize = onePercentMemory * DEFAULT_RESOURCE_SIZE; // default 10%
+      List<BaseWork> childWorkList = work.getChildren(w);
+      for (BaseWork childWork : childWorkList) {
+        TezEdgeProperty edgeProp = work.getEdgeProperty(w, childWork);
+        EdgeType edgeType = edgeProp.getEdgeType();
+        // we need an estimate for only one outgoing edge because tez does replication of output.
+        // this breaks down if there are different types of edges downstream - say sorted and
+        // unsorted edges. If we look at only the unsorted type of edge, we may end up
+        // underestimating the amount of memory required.
+        if (edgeType == EdgeType.SIMPLE_EDGE) {
+          totalOutgoingSize = edgeProp.getEstimatedTransferBytes();
+          break;
+        }
+      }
+
+      return totalOutgoingSize;
+    }
+
+    private void computeMemoryForOperators(TezWork work, BaseWork w, long totalIncomingSize,
+        long totalOutgoingSize) throws SemanticException {
+
+      // if the buffers require more than 50% of memory, limit them to 50% for now;
+      // if the operator pipeline has excess, this value can grow
+      boolean capped = false;
+      incomingSize = totalIncomingSize;
+      outgoingSize = totalOutgoingSize;
+      if ((totalIncomingSize + totalOutgoingSize) > (totalAvailableMemory / 2)) {
+        // capped to 50% till we see the operator pipeline
+        capped = true;
+        this.inputOutputBufferLimit = onePercentMemory * RESOURCE_LIMIT;
+        incomingSize =
+            totalIncomingSize * inputOutputBufferLimit / (totalIncomingSize + totalOutgoingSize);
+        outgoingSize = inputOutputBufferLimit - incomingSize;
+      } else {
+        // it needs an amount of memory less than or equal to 50%
+        this.inputOutputBufferLimit = totalIncomingSize + totalOutgoingSize;
+      }
+
+      this.remainingMemory = totalAvailableMemory - (this.inputOutputBufferLimit);
+      // fill in the operator memory requirements and find the total
+      evaluateOperators(w, pctx);
+      if ((remainingMemory > 0) && (capped)) {
+        // the operator tree had excess memory. We can use the excess for the inputs/outputs
+        long incomingIncrease =
+            totalIncomingSize * remainingMemory / (totalIncomingSize + totalOutgoingSize);
+        incomingSize += incomingIncrease;
+        outgoingSize += (remainingMemory - incomingIncrease);
+      }
+    }
+
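+    // With hybrid grace hash join enabled, a rule is registered only for MapJoin operators;
+    // otherwise a default rule applies to every operator. computeMemoryRequirements() then
+    // divides remainingMemory among the operators collected by the walk.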
+    private long evaluateOperators(BaseWork w, PhysicalContext pctx) throws SemanticException {
+      // let's take a look at the operator memory requirements.
+      Dispatcher disp = null;
+      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN)) {
+        Map<Rule, NodeProcessor> rules = new HashMap<Rule, NodeProcessor>();
+        rules.put(new RuleRegExp("Map join memory estimator",
+            MapJoinOperator.getOperatorName() + "%"), new DefaultRule());
+        disp = new DefaultRuleDispatcher(null, rules, null);
+      } else {
+        disp =
+            new DefaultRuleDispatcher(new DefaultRule(), new HashMap<Rule, NodeProcessor>(),
+                null);
+      }
+      GraphWalker ogw = new DefaultGraphWalker(disp);
+
+      ArrayList<Node> topNodes = new ArrayList<Node>();
+      topNodes.addAll(w.getAllRootOperators());
+
+      LinkedHashMap<Node, Object> nodeOutput = new LinkedHashMap<Node, Object>();
+      ogw.startWalking(topNodes, nodeOutput);
+
+      return computeMemoryRequirements(nodeOutput.keySet());
+    }
+
+    private long computeMemoryRequirements(Set<Node> keySet) {
+      long retval = 0;
+      List<Operator<?>> opList = new ArrayList<Operator<?>>();
+      long minMemory =
+          (keySet.size() > 100) ? totalAvailableMemory / keySet.size() : onePercentMemory;
+      long totalMemoryNeeded = 0;
+      for (Node nd : keySet) {
+        Operator<?> op = (Operator<?>) nd;
+        long memoryNeeded = 0;
+
+        memoryNeeded = op.getConf().getMemoryNeeded();
+        if (memoryNeeded == 0) {
+          memoryNeeded = minMemory;
+        } else {
+          memoryNeeded += minMemory;
+          opList.add(op);
+        }
+
+        totalMemoryNeeded += memoryNeeded;
+        op.getConf().setMemoryNeeded(memoryNeeded);
+      }
+
+      if (totalMemoryNeeded > remainingMemory) {
+        long minMemoryRequired = keySet.size() * minMemory;
+        remainingMemory -= minMemoryRequired;
+        totalMemoryNeeded -= minMemoryRequired;
+        for (Operator<?> op : opList) {
+          long memNeeded = (op.getConf().getMemoryNeeded() * remainingMemory) / totalMemoryNeeded;
+          op.getConf().setMemoryNeeded(memNeeded);
+          retval += memNeeded;
+        }
+        remainingMemory -= retval;
+        retval += minMemoryRequired;
+        if (remainingMemory < 0) {
+          throw new IllegalStateException("Remaining memory cannot be negative");
+        }
+      } else {
+        retval = totalMemoryNeeded;
+        remainingMemory -= totalMemoryNeeded;
+      }
+      return retval;
+    }
+
+    public class DefaultRule implements NodeProcessor {
+
+      @Override
+      public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+          Object... nodeOutputs) throws SemanticException {
+        return null;
+      }
+    }
+  }
+
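+  // PhysicalPlanResolver entry point: walks the root tasks with a TaskGraphWalker and lets
+  // MemoryCalculator annotate the work graph of every TezTask it finds.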
+  @Override
+  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+
+    // create dispatcher and graph walker
+    Dispatcher disp = new MemoryCalculator(pctx);
+    TaskGraphWalker ogw = new TaskGraphWalker(disp);
+
+    // get all the task nodes from the root tasks
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getRootTasks());
+
+    // begin to walk through the task tree.
+    ogw.startWalking(topNodes, null);
+    return pctx;
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 0edfc5d..a62b4e1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -147,9 +147,12 @@ public ReduceWork createReduceWork(GenTezProcContext context, Operator<?> root,
     if (reduceWork.isAutoReduceParallelism()) {
       edgeProp =
           new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true,
-              reduceWork.getMinReduceTasks(), reduceWork.getMaxReduceTasks(), bytesPerReducer);
+              reduceWork.getMinReduceTasks(), reduceWork.getMaxReduceTasks(), bytesPerReducer,
+              reduceSink.getConf().getStatistics().getDataSize());
     } else {
-      edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+      edgeProp =
+          new TezEdgeProperty(EdgeType.SIMPLE_EDGE, reduceSink.getConf().getStatistics()
+              .getDataSize());
     }

     tezWork.connect(
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 6db8220..1b243be 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -135,6 +135,10 @@ public Object process(Node nd, Stack<Node> stack,
       context.rootToWorkMap.put(root, work);
     }

+    if (operator.getStatistics() != null) {
+      work.setMemoryNeeded(operator.getStatistics().getDataSize());
+    }
+
     // this is where we set the sort columns that we will be using for KeyValueInputMerge
     if (operator instanceof DummyStoreOperator) {
       work.addSortCols(root.getOpTraits().getSortCols().get(0));
@@ -379,9 +383,12 @@ public Object process(Node nd, Stack<Node> stack,
       if (rWork.isAutoReduceParallelism()) {
         edgeProp =
             new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true,
-                rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer);
+                rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer, rs
+                    .getConf().getStatistics().getDataSize());
       } else {
-        edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+        edgeProp =
+            new TezEdgeProperty(EdgeType.SIMPLE_EDGE, rs.getConf().getStatistics()
+                .getDataSize());
       }
       tezWork.connect(work, followingWork, edgeProp);
       context.connectedReduceSinks.add(rs);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index 56707af..7d25774 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -70,6 +70,7 @@
 import org.apache.hadoop.hive.ql.optimizer.SetReducerParallelism;
 import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits;
 import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck;
+import org.apache.hadoop.hive.ql.optimizer.physical.MemoryDecider;
 import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
@@ -472,6 +473,11 @@ protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks, Pa
     } else {
       LOG.debug("Skipping stage id rearranger");
     }
+
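+    // hand the plan to MemoryDecider when the memory manager is enabled explicitly or
+    // implicitly through hybrid grace hash join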
+    if ((conf.getBoolVar(HiveConf.ConfVars.HIVE_TEZ_ENABLE_MEMORY_MANAGER))
+        || (conf.getBoolVar(HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN))) {
+      physicalCtx = new MemoryDecider().resolve(physicalCtx);
+    }
     return;
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
index bc34710..544854a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
@@ -30,6 +30,8 @@
   protected Statistics statistics;
   protected transient OpTraits opTraits;
   protected transient Map<String, String> opProps;
+  protected long memNeeded = 0;
+  protected transient long memoryInUse = 0;

   static {
     PTFUtils.makeTransient(AbstractOperatorDesc.class, "opProps");
@@ -59,14 +61,17 @@ public void setVectorMode(boolean vm) {
     this.vectorMode = vm;
   }

+  @Override
   public OpTraits getTraits() {
     return opTraits;
   }

+  @Override
   public void setTraits(OpTraits opTraits) {
     this.opTraits = opTraits;
   }

+  @Override
   public Map<String, String> getOpProps() {
     return opProps;
   }
@@ -74,4 +79,14 @@
   public void setOpProps(Map<String, String> props) {
     this.opProps = props;
   }
+
+  @Override
+  public long getMemoryNeeded() {
+    return memNeeded;
+  }
+
+  @Override
+  public void setMemoryNeeded(long memNeeded) {
+    this.memNeeded = memNeeded;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
index fb4d3b4..16be499 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
@@ -28,4 +28,6 @@
   public OpTraits getTraits();
   public void setTraits(OpTraits opTraits);
   public Map<String, String> getOpProps();
+  public long getMemoryNeeded();
+  public void setMemoryNeeded(long memoryNeeded);
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
index a3aa12f..3546cad 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
@@ -30,33 +30,46 @@
     CUSTOM_SIMPLE_EDGE,
   }

-  private HiveConf hiveConf;
-  private EdgeType edgeType;
-  private int numBuckets;
+  private static final long ONE_MB = 1024 * 1024;
+
+  private final HiveConf hiveConf;
+  private final EdgeType edgeType;
+  private final int numBuckets;
   private boolean isAutoReduce;
   private int minReducer;
   private int maxReducer;
   private long inputSizePerReducer;
+  private final long estimatedTransferBytes;
+  private float inputMemoryNeededFraction;
+  private long outputMemoryNeeded;

-  public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType,
-      int buckets) {
+  public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, int buckets,
+      long estimatedTransferBytes) {
     this.hiveConf = hiveConf;
     this.edgeType = edgeType;
     this.numBuckets = buckets;
+    this.estimatedTransferBytes = estimatedTransferBytes;
+    this.setInputMemoryNeededFraction(0.0f);
+    this.setOutputMemoryNeeded(0);
   }

   public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, boolean isAutoReduce,
-      int minReducer, int maxReducer, long bytesPerReducer) {
-    this(hiveConf, edgeType, -1);
+      int minReducer, int maxReducer, long bytesPerReducer, long estimatedTransferBytes) {
+    this(hiveConf, edgeType, -1, estimatedTransferBytes);
     this.minReducer = minReducer;
     this.maxReducer = maxReducer;
     this.isAutoReduce = isAutoReduce;
     this.inputSizePerReducer = bytesPerReducer;
   }

+  public TezEdgeProperty(EdgeType edgeType, long estimatedTransferBytes) {
+    this(null, edgeType, -1, estimatedTransferBytes);
+  }
+
+  // called by test code only.
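+  // It leaves estimatedTransferBytes at 0, so no transfer-size estimate is available for
+  // such edges.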
   public TezEdgeProperty(EdgeType edgeType) {
-    this(null, edgeType, -1);
+    this(edgeType, 0);
   }

   public EdgeType getEdgeType() {
@@ -86,4 +99,36 @@ public int getMaxReducer() {
   public long getInputSizePerReducer() {
     return inputSizePerReducer;
   }
+
+  public long getEstimatedTransferBytes() {
+    return estimatedTransferBytes;
+  }
+
+  public void setInputMemoryNeededFraction(float inputMemoryNeededFraction) {
+    if (inputMemoryNeededFraction < 0 || inputMemoryNeededFraction > 1) {
+      this.inputMemoryNeededFraction = 1.0f;
+      return;
+    }
+    this.inputMemoryNeededFraction = inputMemoryNeededFraction;
+  }
+
+  public float getInputMemoryNeededFraction() {
+    return inputMemoryNeededFraction;
+  }
+
+  public long getOutputMemoryNeeded() {
+    return outputMemoryNeeded;
+  }
+
+  public long getOutputMemoryNeededMB() {
+    if (outputMemoryNeeded < ONE_MB) {
+      return 1;
+    } else {
+      return outputMemoryNeeded / ONE_MB;
+    }
+  }
+
+  public void setOutputMemoryNeeded(long outputMemoryNeeded) {
+    this.outputMemoryNeeded = outputMemoryNeeded;
+  }
 }