diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index ad469c2..6d0cf15 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2216,7 +2216,11 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
         new StringSet("throw", "skip", "ignore"),
         "The approach msck should take with HDFS " +
         "directories that are partition-like but contain unsupported characters. 'throw' (an " +
         "exception) is the default; 'skip' will skip the invalid directories and still repair the" +
-        " others; 'ignore' will skip the validation (legacy behavior, causes bugs in many cases)");
+        " others; 'ignore' will skip the validation (legacy behavior, causes bugs in many cases)"),
+    HIVE_TEZ_ENABLE_MEMORY_MANAGER("hive.tez.enable.memory.manager", true,
+        "Enable the memory manager for Tez, which assigns memory to map join hash tables"),
+    HIVE_HASH_TABLE_INFLATION_FACTOR("hive.hash.table.inflation.factor", 2.0f,
+        "Expected inflation factor between the on-disk and in-memory representations of hash tables");
 
     public final String varname;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
index c688092..51ce226 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
@@ -81,17 +81,22 @@ public void load(MapJoinTableContainer[] mapJoinTables,
         hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE);
     boolean useHybridGraceHashJoin = desc.isHybridHashJoin();
     boolean isFirstKey = true;
-    // TODO remove this after memory manager is in
-    long noConditionalTaskThreshold = HiveConf.getLongVar(
+
+    // Get the total available memory from the memory manager
+    long totalMapJoinMemory = desc.getMemoryNeeded();
+    if (totalMapJoinMemory <= 0) {
+      totalMapJoinMemory = HiveConf.getLongVar(
         hconf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
+    }
+
     long processMaxMemory = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
-    if (noConditionalTaskThreshold > processMaxMemory) {
+    if (totalMapJoinMemory > processMaxMemory) {
       float hashtableMemoryUsage = HiveConf.getFloatVar(
           hconf, HiveConf.ConfVars.HIVEHASHTABLEFOLLOWBYGBYMAXMEMORYUSAGE);
-      LOG.warn("noConditionalTaskThreshold value of " + noConditionalTaskThreshold +
+      LOG.warn("totalMapJoinMemory value of " + totalMapJoinMemory +
           " is greater than the max memory size of " + processMaxMemory);
       // Don't want to attempt to grab more memory than we have available .. percentage is a bit arbitrary
-      noConditionalTaskThreshold = (long) (processMaxMemory * hashtableMemoryUsage);
+      totalMapJoinMemory = (long) (processMaxMemory * hashtableMemoryUsage);
     }
 
     // Only applicable to n-way Hybrid Grace Hash Join
@@ -118,7 +123,7 @@ public void load(MapJoinTableContainer[] mapJoinTables,
       }
 
       tableMemorySizes = divideHybridHashTableMemory(mapJoinTables, desc,
-          totalSize, noConditionalTaskThreshold);
+          totalSize, totalMapJoinMemory);
       // Using biggest small table, calculate number of partitions to create for each small table
       long memory = tableMemorySizes.get(biggest);
       int numPartitions = 0;
@@ -176,7 +181,7 @@ public void load(MapJoinTableContainer[] mapJoinTables,
       if (mapJoinTables.length > 2) {
         memory = tableMemorySizes.get(pos);
       } else { // binary join
-        memory = noConditionalTaskThreshold;
+        memory = totalMapJoinMemory;
       }
     }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java
new file mode 100644
index 0000000..448643e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java
@@ -0,0 +1,265 @@
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.Stack;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.StatsTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+
+/**
+ * MemoryDecider is a simple physical optimizer that adjusts the memory layout of tez tasks.
+ * Currently it only cares about hash table sizes for the hybrid grace hash join.
+ * It tries to keep hash tables that are small and early in the operator pipeline completely
+ * in memory.
+ */
+public class MemoryDecider implements PhysicalPlanResolver {
+
+  protected static transient final Log LOG = LogFactory.getLog(MemoryDecider.class);
+
+  public class MemoryCalculator implements Dispatcher {
+
+    private final long totalAvailableMemory; // how much memory do we have
+    private final long minimumHashTableSize; // minimum size of a hash table held completely in memory
+    private final double inflationFactor; // inflation factor from on-disk data size to in-memory hash table size
+    private final PhysicalContext pctx;
+
+    public MemoryCalculator(PhysicalContext pctx) {
+      this.pctx = pctx;
+      this.totalAvailableMemory = HiveConf.getLongVar(pctx.conf,
+          HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
+      this.minimumHashTableSize =
+          HiveConf.getIntVar(pctx.conf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS)
+          * HiveConf.getIntVar(pctx.conf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE);
+      this.inflationFactor = HiveConf.getFloatVar(pctx.conf,
+          HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
+        throws SemanticException {
+      Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
+      if (currTask instanceof StatsTask) {
+        currTask = ((StatsTask) currTask).getWork().getSourceTask();
+      }
+      if (currTask instanceof TezTask) {
+        TezWork work = ((TezTask) currTask).getWork();
+        for (BaseWork w : work.getAllWork()) {
+          evaluateWork(w);
+        }
+      }
+      return null;
+    }
+
+    private void evaluateWork(BaseWork w) throws SemanticException {
+      if (w instanceof MapWork) {
+        evaluateMapWork((MapWork) w);
+      } else if (w instanceof ReduceWork) {
+        evaluateReduceWork((ReduceWork) w);
+      } else if (w instanceof MergeJoinWork) {
+        evaluateMergeWork((MergeJoinWork) w);
+      } else {
+        LOG.info("We are not going to evaluate this work type: " + w.getClass().getCanonicalName());
+      }
+    }
+
+    private void evaluateMergeWork(MergeJoinWork w) throws SemanticException {
+      for (BaseWork baseWork : w.getBaseWorkList()) {
+        evaluateOperators(baseWork, pctx);
+      }
+    }
+
+    private void evaluateReduceWork(ReduceWork w) throws SemanticException {
+      evaluateOperators(w, pctx);
+    }
+
+    private void evaluateMapWork(MapWork w) throws SemanticException {
+      evaluateOperators(w, pctx);
+    }
+
+    private void evaluateOperators(BaseWork w, PhysicalContext pctx) throws SemanticException {
+      // Let's take a look at the operator memory requirements: collect all map joins in this work.
+      final Set<MapJoinOperator> mapJoins = new HashSet<MapJoinOperator>();
+
+      Map<Rule, NodeProcessor> rules = new HashMap<Rule, NodeProcessor>();
+      rules.put(new RuleRegExp("Map join memory estimator",
+          MapJoinOperator.getOperatorName() + "%"), new NodeProcessor() {
+        @Override
+        public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+            Object... nodeOutputs) {
+          mapJoins.add((MapJoinOperator) nd);
+          return null;
+        }
+      });
+
+      Dispatcher disp = new DefaultRuleDispatcher(null, rules, null);
+      GraphWalker ogw = new DefaultGraphWalker(disp);
+
+      ArrayList<Node> topNodes = new ArrayList<Node>();
+      topNodes.addAll(w.getAllRootOperators());
+
+      LinkedHashMap<Node, Object> nodeOutput = new LinkedHashMap<Node, Object>();
+      ogw.startWalking(topNodes, nodeOutput);
+
+      if (mapJoins.size() == 0) {
+        return;
+      }
+
+      try {
+        long total = 0;
+        final Map<MapJoinOperator, Long> sizes = new HashMap<MapJoinOperator, Long>();
+
+        for (MapJoinOperator mj : mapJoins) {
+          long size = computeInputSize(mj);
+          sizes.put(mj, size);
+        }
+
+        Comparator<MapJoinOperator> comp = new Comparator<MapJoinOperator>() {
+          @Override
+          public int compare(MapJoinOperator mj1, MapJoinOperator mj2) {
+            int result = Long.compare(sizes.get(mj1), sizes.get(mj2));
+            if (result != 0) {
+              return result;
+            }
+            // Tie-break on the operator id; otherwise the TreeSet below would
+            // silently drop distinct map joins that happen to have equal sizes.
+            return mj1.getOperatorId().compareTo(mj2.getOperatorId());
+          }
+        };
+
+        SortedSet<MapJoinOperator> sortedMapJoins = new TreeSet<MapJoinOperator>(comp);
+        sortedMapJoins.addAll(mapJoins);
+
+        // First pass: give the smallest hash tables up to half the memory, fully in memory.
+        long remainingSize = totalAvailableMemory / 2;
+
+        Iterator<MapJoinOperator> it = sortedMapJoins.iterator();
+        while (it.hasNext()) {
+          MapJoinOperator mj = it.next();
+          long size = computeSizeToFitInMem(mj, sizes);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("MapJoin: " + mj + ", size: " + size + ", remaining: " + remainingSize);
+          }
+
+          if (size < remainingSize) {
+            if (LOG.isInfoEnabled()) {
+              LOG.info("Setting " + size + " bytes needed for " + mj + " (in-mem)");
+            }
+            mj.getConf().setMemoryNeeded(size);
+            remainingSize -= size;
+            it.remove();
+          } else {
+            total += sizes.get(mj);
+          }
+        }
+
+        if (sortedMapJoins.isEmpty()) {
+          return;
+        }
+
+        // We used (at most) half the memory for small joins; now scale the rest
+        // proportionally to input size across the remaining, spilling joins.
+        double weight = (remainingSize + totalAvailableMemory / 2) / (double) total;
+
+        for (MapJoinOperator mj : sortedMapJoins) {
+          long size = (long) (weight * sizes.get(mj));
+          if (LOG.isInfoEnabled()) {
+            LOG.info("Setting " + size + " bytes needed for " + mj + " (spills)");
+          }
+          mj.getConf().setMemoryNeeded(size);
+        }
+      } catch (HiveException e) {
+        // If stats are unavailable, fall back to dividing the memory evenly.
+        long size = totalAvailableMemory / mapJoins.size();
+        if (LOG.isInfoEnabled()) {
+          LOG.info("Scaling mapjoin memory w/o stats");
+        }
+
+        for (MapJoinOperator mj : mapJoins) {
+          if (LOG.isInfoEnabled()) {
+            LOG.info("Setting " + size + " bytes needed for " + mj + " (fallback)");
+          }
+          mj.getConf().setMemoryNeeded(size);
+        }
+      }
+    }
+
+    private long computeSizeToFitInMem(MapJoinOperator mj, Map<MapJoinOperator, Long> sizeMap) {
+      // The in-memory hash table is expected to be larger than the on-disk input, but never
+      // smaller than the minimum the hybrid grace hash join can allocate.
+      return Math.max(this.minimumHashTableSize, (long) (sizeMap.get(mj) * this.inflationFactor));
+    }
+
+    private long computeInputSize(MapJoinOperator mj) throws HiveException {
+      long size = 0;
+
+      if (mj.getConf() != null && mj.getConf().getParentDataSizes() != null) {
+        for (long l : mj.getConf().getParentDataSizes().values()) {
+          size += l;
+        }
+      }
+
+      if (size == 0) {
+        throw new HiveException("No data sizes");
+      }
+      return size;
+    }
+
+    public class DefaultRule implements NodeProcessor {
+
+      @Override
+      public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+          Object... nodeOutputs) throws SemanticException {
+        return null;
+      }
+    }
+  }
+
+  @Override
+  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+    // Create the dispatcher and graph walker
+    Dispatcher disp = new MemoryCalculator(pctx);
+    TaskGraphWalker ogw = new TaskGraphWalker(disp);
+
+    // Get all the task nodes from the root tasks
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getRootTasks());
+
+    // Begin walking the task tree.
+    ogw.startWalking(topNodes, null);
+    return pctx;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index 03d41f2..8ab7cd4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -71,6 +71,7 @@
 import org.apache.hadoop.hive.ql.optimizer.SetReducerParallelism;
 import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits;
 import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck;
+import org.apache.hadoop.hive.ql.optimizer.physical.MemoryDecider;
 import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
@@ -475,6 +476,11 @@ protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks, Pa
     } else {
       LOG.debug("Skipping stage id rearranger");
     }
+
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_TEZ_ENABLE_MEMORY_MANAGER)
+        && conf.getBoolVar(HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN)) {
+      physicalCtx = new MemoryDecider().resolve(physicalCtx);
+    }
     return;
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
index bc34710..463da5d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
@@ -30,6 +30,7 @@
   protected Statistics statistics;
   protected transient OpTraits opTraits;
   protected transient Map<String, Object> opProps;
+  protected long memNeeded = 0;
 
   static {
     PTFUtils.makeTransient(AbstractOperatorDesc.class, "opProps");
@@ -59,14 +60,17 @@ public void setVectorMode(boolean vm) {
     this.vectorMode = vm;
   }
 
+  @Override
   public OpTraits getTraits() {
     return opTraits;
   }
 
+  @Override
   public void setTraits(OpTraits opTraits) {
     this.opTraits = opTraits;
   }
 
+  @Override
   public Map<String, Object> getOpProps() {
     return opProps;
   }
@@ -74,4 +78,14 @@ public void setTraits(OpTraits opTraits) {
   public void setOpProps(Map<String, Object> props) {
     this.opProps = props;
   }
+
+  @Override
+  public long getMemoryNeeded() {
+    return memNeeded;
+  }
+
+  @Override
+  public void setMemoryNeeded(long memNeeded) {
+    this.memNeeded = memNeeded;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
index fb4d3b4..16be499 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
@@ -28,4 +28,6 @@
   public OpTraits getTraits();
   public void setTraits(OpTraits opTraits);
   public Map<String, Object> getOpProps();
+  public long getMemoryNeeded();
+  public void setMemoryNeeded(long memoryNeeded);
 }
diff --git a/ql/src/test/queries/clientpositive/mrr.q b/ql/src/test/queries/clientpositive/mrr.q
index 6960547..4d3aebe 100644
--- a/ql/src/test/queries/clientpositive/mrr.q
+++ b/ql/src/test/queries/clientpositive/mrr.q
@@ -1,4 +1,6 @@
 set hive.explain.user=false;
+set hive.auto.convert.join.noconditionaltask.size=30000000;
+
 -- simple query with multiple reduce stages
 -- SORT_QUERY_RESULTS
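
Reviewer note: the allocation policy in MemoryCalculator.evaluateOperators is easier to see without the operator-graph plumbing. Below is a minimal, self-contained Java sketch of the same two-phase split; it is illustrative only. The class name, the hard-coded sizes, and the println output are invented for this note, while totalAvailableMemory, minimumHashTableSize, and inflationFactor mirror the fields above. Map joins are visited smallest-first; any join whose inflated hash table fits within the first half of the budget is kept fully in memory, and the remaining joins share the leftover plus the second half of the budget in proportion to their input sizes.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

public class MemoryBudgetSketch {
  public static void main(String[] args) {
    long totalAvailableMemory = 100L << 20;  // stand-in for noconditionaltask.size (100 MB)
    long minimumHashTableSize = 8L << 20;    // min #partitions * min write-buffer size
    double inflationFactor = 2.0;            // hive.hash.table.inflation.factor

    // Hypothetical small-table input sizes for three map joins.
    List<Long> inputSizes = new ArrayList<Long>();
    inputSizes.add(4L << 20);
    inputSizes.add(16L << 20);
    inputSizes.add(64L << 20);
    inputSizes.sort(Comparator.naturalOrder()); // smallest joins get first pick

    long remaining = totalAvailableMemory / 2; // phase 1 budget: half the total
    long totalSpilled = 0;
    Iterator<Long> it = inputSizes.iterator();
    while (it.hasNext()) {
      long input = it.next();
      long needed = Math.max(minimumHashTableSize, (long) (input * inflationFactor));
      if (needed < remaining) {
        System.out.println("in-mem: " + needed); // fits entirely in memory
        remaining -= needed;
        it.remove();
      } else {
        totalSpilled += input;                   // will spill; scaled in phase 2
      }
    }

    if (inputSizes.isEmpty()) {
      return; // everything fit in memory
    }

    // Phase 2: leftover of the first half plus the untouched second half,
    // divided among the spilling joins in proportion to their input sizes.
    double weight = (remaining + totalAvailableMemory / 2) / (double) totalSpilled;
    for (long input : inputSizes) {
      System.out.println("spills: " + (long) (weight * input));
    }
  }
}

End-to-end, with HIVE_TEZ_ENABLE_MEMORY_MANAGER and HIVEUSEHYBRIDGRACEHASHJOIN both enabled, TezCompiler runs MemoryDecider as its last physical resolver; HashTableLoader then reads the per-operator budget via desc.getMemoryNeeded() and falls back to hive.auto.convert.join.noconditionaltask.size when no budget was assigned.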