diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index e86f779..2b7a36d 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2216,7 +2216,9 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { new StringSet("throw", "skip", "ignore"), "The approach msck should take with HDFS " + "directories that are partition-like but contain unsupported characters. 'throw' (an " + "exception) is the default; 'skip' will skip the invalid directories and still repair the" + - " others; 'ignore' will skip the validation (legacy behavior, causes bugs in many cases)"); + " others; 'ignore' will skip the validation (legacy behavior, causes bugs in many cases)"), + HIVE_TEZ_ENABLE_MEMORY_MANAGER("hive.tez.enable.memory.manager", true, + "Enable memory manager for tez"); public final String varname; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java index c688092..6c8d688 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java @@ -81,17 +81,22 @@ public void load(MapJoinTableContainer[] mapJoinTables, hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE); boolean useHybridGraceHashJoin = desc.isHybridHashJoin(); boolean isFirstKey = true; - // TODO remove this after memory manager is in - long noConditionalTaskThreshold = HiveConf.getLongVar( + + // Get the total available memory from memory manager + long totalMapJoinMemory = desc.getMemoryNeeded(); + if (totalMapJoinMemory <= 0) { + totalMapJoinMemory = HiveConf.getLongVar( hconf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); + } + long processMaxMemory = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); - if 
(noConditionalTaskThreshold > processMaxMemory) { + if (totalMapJoinMemory > processMaxMemory) { float hashtableMemoryUsage = HiveConf.getFloatVar( hconf, HiveConf.ConfVars.HIVEHASHTABLEFOLLOWBYGBYMAXMEMORYUSAGE); - LOG.warn("noConditionalTaskThreshold value of " + noConditionalTaskThreshold + + LOG.warn("totalMapJoinMemory value of " + totalMapJoinMemory + " is greater than the max memory size of " + processMaxMemory); // Don't want to attempt to grab more memory than we have available .. percentage is a bit arbitrary - noConditionalTaskThreshold = (long) (processMaxMemory * hashtableMemoryUsage); + totalMapJoinMemory = (long) (processMaxMemory * hashtableMemoryUsage); } // Only applicable to n-way Hybrid Grace Hash Join @@ -118,7 +123,7 @@ public void load(MapJoinTableContainer[] mapJoinTables, } tableMemorySizes = divideHybridHashTableMemory(mapJoinTables, desc, - totalSize, noConditionalTaskThreshold); + totalSize, totalMapJoinMemory); // Using biggest small table, calculate number of partitions to create for each small table long memory = tableMemorySizes.get(biggest); int numPartitions = 0; @@ -176,7 +181,7 @@ public void load(MapJoinTableContainer[] mapJoinTables, if (mapJoinTables.length > 2) { memory = tableMemorySizes.get(pos); } else { // binary join - memory = noConditionalTaskThreshold; + memory = totalMapJoinMemory; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java new file mode 100644 index 0000000..5d74636 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java @@ -0,0 +1,208 @@ +package org.apache.hadoop.hive.ql.optimizer.physical; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Set; +import java.util.Stack; + +import 
org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.MapJoinOperator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.StatsTask; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.tez.DagUtils; +import org.apache.hadoop.hive.ql.exec.tez.TezTask; +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.lib.TaskGraphWalker; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.MergeJoinWork; +import org.apache.hadoop.hive.ql.plan.ReduceWork; +import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; +import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; +import org.apache.hadoop.hive.ql.plan.TezWork; + +public class MemoryDecider implements PhysicalPlanResolver { + + protected static transient final Log LOG = LogFactory.getLog(MemoryDecider.class); + + public class MemoryCalculator implements Dispatcher { + + private long totalAvailableMemory; + private final PhysicalContext pctx; + + public MemoryCalculator(PhysicalContext pctx) { + this.pctx = pctx; + this.totalAvailableMemory = HiveConf.getLongVar( + pctx.conf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); + } + + 
@SuppressWarnings("unchecked") + @Override + public Object dispatch(Node nd, Stack stack, Object... nodeOutputs) + throws SemanticException { + Task currTask = (Task) nd; + if (currTask instanceof StatsTask) { + currTask = ((StatsTask) currTask).getWork().getSourceTask(); + } + if (currTask instanceof TezTask) { + TezWork work = ((TezTask) currTask).getWork(); + for (BaseWork w : work.getAllWork()) { + evaluateWork(w); + } + } + return null; + } + + private void evaluateWork(BaseWork w) throws SemanticException { + + if (w instanceof MapWork) { + evaluateMapWork((MapWork) w); + } else if (w instanceof ReduceWork) { + evaluateReduceWork((ReduceWork) w); + } else if (w instanceof MergeJoinWork) { + evaluateMergeWork((MergeJoinWork) w); + } else { + throw new SemanticException("Unknown work type: " + w); + } + } + + private void evaluateMergeWork(MergeJoinWork w) throws SemanticException { + for (BaseWork baseWork : w.getBaseWorkList()) { + evaluateOperators(baseWork, pctx); + } + } + + private void evaluateReduceWork(ReduceWork w) throws SemanticException { + evaluateOperators(w, pctx); + } + + private void evaluateMapWork(MapWork w) throws SemanticException { + evaluateOperators(w, pctx); + } + + private void evaluateOperators(BaseWork w, PhysicalContext pctx) throws SemanticException { + // lets take a look at the operator memory requirements. + Dispatcher disp = null; + final Set mapJoins = new HashSet(); + + Map rules = new HashMap(); + rules.put(new RuleRegExp("Map join memory estimator", + MapJoinOperator.getOperatorName() + "%"), new NodeProcessor() { + public Object process (Node nd, Stack stack, NodeProcessorCtx procCtx, Object... 
nodeOutputs) { + mapJoins.add((MapJoinOperator)nd); + return null; + } + }); + disp = new DefaultRuleDispatcher(null, rules, null); + + GraphWalker ogw = new DefaultGraphWalker(disp); + + ArrayList topNodes = new ArrayList(); + topNodes.addAll(w.getAllRootOperators()); + + LinkedHashMap nodeOutput = new LinkedHashMap(); + ogw.startWalking(topNodes, nodeOutput); + + if (mapJoins.size() == 0) { + return; + } + + try { + Map sizes = new HashMap(); + long total = 0; + + for (MapJoinOperator mj : mapJoins) { + long size = computeInputSize(mj); + sizes.put(mj, size); + total += size; + } + + double weight = totalAvailableMemory/(double)total; + + for (MapJoinOperator mj : mapJoins) { + long size = (long)(weight * sizes.get(mj)); + if (LOG.isInfoEnabled()) { + LOG.info("Setting " + size + " bytes needed for " + mj); + } + + mj.getConf().setMemoryNeeded(size); + } + } catch (HiveException e) { + // if we have issues with stats, just scale linearly + long size = totalAvailableMemory / mapJoins.size(); + if (LOG.isInfoEnabled()) { + LOG.info("Scaling mapjoin memory w/o stats"); + } + + for (MapJoinOperator mj : mapJoins) { + if (LOG.isInfoEnabled()) { + LOG.info("Setting " + size + " bytes needed for " + mj); + } + mj.getConf().setMemoryNeeded(size); + } + } + } + + private long computeInputSize(MapJoinOperator mj) throws HiveException { + long size = 0; + + if (mj.getConf() != null && mj.getConf().getParentDataSizes() != null) { + for (long l: mj.getConf().getParentDataSizes().values()) { + size += l; + } + } + + if (size == 0) { + throw new HiveException("No data sizes"); + } + return size; + } + + public class DefaultRule implements NodeProcessor { + + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object...
nodeOutputs) throws SemanticException { + return null; + } + } + } + + @Override + public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException { + + pctx.getConf(); + + // create dispatcher and graph walker + Dispatcher disp = new MemoryCalculator(pctx); + TaskGraphWalker ogw = new TaskGraphWalker(disp); + + // get all the tasks nodes from root task + ArrayList topNodes = new ArrayList(); + topNodes.addAll(pctx.getRootTasks()); + + // begin to walk through the task tree. + ogw.startWalking(topNodes, null); + return pctx; + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index 03d41f2..8ab7cd4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -71,6 +71,7 @@ import org.apache.hadoop.hive.ql.optimizer.SetReducerParallelism; import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits; import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck; +import org.apache.hadoop.hive.ql.optimizer.physical.MemoryDecider; import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer; import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer; import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext; @@ -475,6 +476,11 @@ protected void optimizeTaskPlan(List> rootTasks, Pa } else { LOG.debug("Skipping stage id rearranger"); } + + if ((conf.getBoolVar(HiveConf.ConfVars.HIVE_TEZ_ENABLE_MEMORY_MANAGER)) + && (conf.getBoolVar(HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN))) { + physicalCtx = new MemoryDecider().resolve(physicalCtx); + } return; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java index bc34710..463da5d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java @@ -30,6 +30,7 @@ protected Statistics statistics; protected transient OpTraits opTraits; protected transient Map opProps; + protected long memNeeded = 0; static { PTFUtils.makeTransient(AbstractOperatorDesc.class, "opProps"); @@ -59,14 +60,17 @@ public void setVectorMode(boolean vm) { this.vectorMode = vm; } + @Override public OpTraits getTraits() { return opTraits; } + @Override public void setTraits(OpTraits opTraits) { this.opTraits = opTraits; } + @Override public Map getOpProps() { return opProps; } @@ -74,4 +78,14 @@ public void setTraits(OpTraits opTraits) { public void setOpProps(Map props) { this.opProps = props; } + + @Override + public long getMemoryNeeded() { + return memNeeded; + } + + @Override + public void setMemoryNeeded(long memNeeded) { + this.memNeeded = memNeeded; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java index fb4d3b4..16be499 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java @@ -28,4 +28,6 @@ public OpTraits getTraits(); public void setTraits(OpTraits opTraits); public Map getOpProps(); + public long getMemoryNeeded(); + public void setMemoryNeeded(long memoryNeeded); }