diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b5aee00..e47f0bd 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2561,8 +2561,6 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
     HIVE_LOG_TRACE_ID("hive.log.trace.id", "",
         "Log tracing id that can be used by upstream clients for tracking respective logs. " +
         "Truncated to " + LOG_PREFIX_LENGTH + " characters. Defaults to use auto-generated session id."),
-
-
     HIVE_CONF_RESTRICTED_LIST("hive.conf.restricted.list",
         "hive.security.authenticator.manager,hive.security.authorization.manager,hive.users.in.admin.role",
         "Comma separated list of configuration options which are immutable at runtime"),
@@ -2571,7 +2569,14 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
         "Comma separated list of configuration options which should not be read by normal user like passwords"),
     HIVE_CONF_INTERNAL_VARIABLE_LIST("hive.conf.internal.variable.list",
         "hive.added.files.path,hive.added.jars.path,hive.added.archives.path",
-        "Comma separated list of variables which are used internally and should not be configurable.");
+        "Comma separated list of variables which are used internally and should not be configurable."),
+    HIVE_TEZ_ENABLE_MEMORY_MANAGER_FOR_HASH_JOIN("hive.tez.enable.memory.manager.for.hash.join",
+        true, "Enable memory manager for hash join operators in Tez"),
+    HIVE_TEZ_DEFAULT_BUFFER_SIZE("hive.tez.default.buffer.size", 32 * 1024 * 1024L,
+        "Tez default sort buffer size. Default is 32 MB. Set this appropriately to the " +
+        "number of tables in your query * the largest row size among these tables. " +
+        "E.g. if you have a 2-way join and the row sizes of the 2 tables are 1 MB and 10 KB, " +
+        "set this value to at least 2 * 1 MB. It is highly recommended to keep this value above 32 MB."),

   public final String varname;
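Note (illustrative, not part of the patch): the sizing guidance in the new hive.tez.default.buffer.size description boils down to a small calculation: number of joined tables times the largest row size, kept at or above the 32 MB default. A minimal sketch of that arithmetic, with hypothetical row sizes and a helper name of my own choosing:

// Illustrative only: mirrors the sizing rule described for hive.tez.default.buffer.size.
final class SortBufferSizing {
  private static final long DEFAULT_BUFFER_SIZE = 32L * 1024 * 1024; // the config's 32 MB default

  static long recommendedBufferSize(long... rowSizesBytes) {
    long largestRow = 0;
    for (long size : rowSizesBytes) {
      largestRow = Math.max(largestRow, size);
    }
    return Math.max(rowSizesBytes.length * largestRow, DEFAULT_BUFFER_SIZE);
  }

  public static void main(String[] args) {
    // The 2-way join from the description: row sizes of 1 MB and 10 KB.
    // The formula gives 2 * 1 MB, but the 32 MB recommendation wins here.
    System.out.println(recommendedBufferSize(1024 * 1024, 10 * 1024));
  }
}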
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 6e196e6..44e2b06 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -144,6 +144,7 @@
       "hive.tez.current.merge.file.prefix";
   // "A comma separated list of work names used as prefix.
   public static final String TEZ_MERGE_WORK_FILE_PREFIXES = "hive.tez.merge.file.prefixes";
+  private static final int ONE_MB = 1024 * 1024;
 
   private void addCredentials(MapWork mapWork, DAG dag) {
     Set paths = mapWork.getPathToAliases().keySet();
@@ -338,7 +339,7 @@ public Edge createEdge(JobConf vConf, Vertex v, Vertex w, TezEdgeProperty edgePr
    * Helper function to create an edge property from an edge type.
    */
   private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration conf)
-    throws IOException {
+      throws IOException {
     MRHelpers.translateMRConfToTez(conf);
     String keyClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS);
     String valClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS);
@@ -348,22 +349,23 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration
     EdgeType edgeType = edgeProp.getEdgeType();
     switch (edgeType) {
     case BROADCAST_EDGE:
-      UnorderedKVEdgeConfig et1Conf = UnorderedKVEdgeConfig
-          .newBuilder(keyClass, valClass)
-          .setFromConfiguration(conf)
-          .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null)
-          .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
-          .build();
+      UnorderedKVEdgeConfig et1Conf =
+          UnorderedKVEdgeConfig.newBuilder(keyClass, valClass).setFromConfiguration(conf)
+              .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null)
+              .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
+              .build();
       return et1Conf.createDefaultBroadcastEdgeProperty();
+
     case CUSTOM_EDGE:
       assert partitionerClassName != null;
       partitionerConf = createPartitionerConf(partitionerClassName, conf);
-      UnorderedPartitionedKVEdgeConfig et2Conf = UnorderedPartitionedKVEdgeConfig
-          .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
-          .setFromConfiguration(conf)
-          .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null)
-          .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
-          .build();
+      UnorderedPartitionedKVEdgeConfig et2Conf =
+          UnorderedPartitionedKVEdgeConfig
+              .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
+              .setFromConfiguration(conf)
+              .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null)
+              .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
+              .build();
       EdgeManagerPluginDescriptor edgeDesc =
           EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName());
       CustomEdgeConfiguration edgeConf =
@@ -373,27 +375,41 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration
       byte[] userPayload = dob.getData();
       edgeDesc.setUserPayload(UserPayload.create(ByteBuffer.wrap(userPayload)));
       return et2Conf.createDefaultCustomEdgeProperty(edgeDesc);
+
     case CUSTOM_SIMPLE_EDGE:
       assert partitionerClassName != null;
       partitionerConf = createPartitionerConf(partitionerClassName, conf);
-      UnorderedPartitionedKVEdgeConfig et3Conf = UnorderedPartitionedKVEdgeConfig
-          .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
-          .setFromConfiguration(conf)
-          .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null)
-          .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
-          .build();
+      UnorderedPartitionedKVEdgeConfig et3Conf =
+          UnorderedPartitionedKVEdgeConfig
+              .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
+              .setFromConfiguration(conf)
+              .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null)
+              .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
+              .build();
       return et3Conf.createDefaultEdgeProperty();
+
     case SIMPLE_EDGE:
     default:
       assert partitionerClassName != null;
       partitionerConf = createPartitionerConf(partitionerClassName, conf);
-      OrderedPartitionedKVEdgeConfig et4Conf = OrderedPartitionedKVEdgeConfig
+      OrderedPartitionedKVEdgeConfig.Builder builder =
+          OrderedPartitionedKVEdgeConfig
           .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
           .setFromConfiguration(conf)
           .setKeySerializationClass(TezBytesWritableSerialization.class.getName(),
              TezBytesComparator.class.getName(), null)
-          .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
-          .build();
+          .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null);
+
+      boolean memoryManagerEnabled =
+          HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_TEZ_ENABLE_MEMORY_MANAGER);
+      if (memoryManagerEnabled) {
+        LOG.info(
+            "Setting the sort buffer memory required to " + edgeProp.getSortBufferMemRequiredMB());
+        builder = builder.configureOutput()
+            .setSortBufferSize((int) edgeProp.getSortBufferMemRequiredMB()).done();
+      }
+
+      OrderedPartitionedKVEdgeConfig et4Conf = builder.build();
       return et4Conf.createDefaultEdgeProperty();
     }
   }
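Note (illustrative, not part of the patch): in the SIMPLE_EDGE branch above, the per-edge override handed to configureOutput().setSortBufferSize(int) is expressed in MB, derived from the byte estimate carried by the edge; the conversion itself is getSortBufferMemRequiredMB(), added to TezEdgeProperty later in this patch. A standalone sketch of that conversion (the helper name is mine):

// Illustrative only: bytes -> MB with a floor of 1, as in TezEdgeProperty.getSortBufferMemRequiredMB().
final class SortBufferMb {
  static int toMb(long sortBufferMemRequiredBytes) {
    long mb = sortBufferMemRequiredBytes / (1024 * 1024);
    return (int) Math.max(mb, 1); // never hand Tez a zero-sized sort buffer
  }
  // e.g. toMb(48L * 1024 * 1024) == 48, toMb(10 * 1024) == 1
}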
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index ea89cf0..3503280 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -54,6 +54,7 @@
 import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
+import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
 import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -305,6 +306,7 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext cont
         dummyStoreOp.getParentOperators().add(parentOp);
         mergeJoinOp.getParentOperators().remove(parentIndex);
         mergeJoinOp.getParentOperators().add(parentIndex, dummyStoreOp);
+        dummyStoreOp.setConf(new DummyStoreDesc());
       }
     }
     mergeJoinOp.cloneOriginalParentsList(mergeJoinOp.getParentOperators());
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
index d5c3a2d..8e614cf 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
@@ -280,7 +280,8 @@ public static Object processReduceSinkToHashJoin(ReduceSinkOperator parentRS, Ma
       // disable auto parallelism for bucket map joins
       parentRS.getConf().setReducerTraits(EnumSet.of(FIXED));
     }
-    TezEdgeProperty edgeProp = new TezEdgeProperty(null, edgeType, numBuckets);
+    TezEdgeProperty edgeProp =
+        new TezEdgeProperty(null, edgeType, numBuckets, parentRS.getStatistics().getDataSize());
 
     if (mapJoinWork != null) {
       for (BaseWork myWork: mapJoinWork) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
index 7149f5c..79b882b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
@@ -284,7 +284,6 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
     List<List<String>> bucketColsList = new ArrayList<List<String>>();
     List<List<String>> sortColsList = new ArrayList<List<String>>();
     byte pos = 0;
-    int numReduceSinks = 0; // will be set to the larger of the parents
     for (Operator parentOp : joinOp.getParentOperators()) {
       if (!(parentOp instanceof ReduceSinkOperator)) {
         // can be mux operator
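Note (illustrative, not part of the patch): the ReduceSinkMapJoinProc change above is representative of a pattern that recurs in GenTezUtils and GenTezWork later in the patch: the edge now records the parent ReduceSink's statistics-derived data size so MemoryDecider can size sort buffers from it. A minimal sketch of constructing such an edge with made-up values (the 48 MB estimate and bucket count are hypothetical):

import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;

// Illustrative only: an edge that remembers roughly how many bytes will flow across it.
class EdgeSizeExample {
  static TezEdgeProperty exampleEdge() {
    long estimatedBytes = 48L * 1024 * 1024; // would normally come from getStatistics().getDataSize()
    return new TezEdgeProperty(null, EdgeType.CUSTOM_SIMPLE_EDGE, 4, estimatedBytes);
  }
}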
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java
index 3a20cfe..64e3adb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java
@@ -22,7 +22,6 @@
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
@@ -35,12 +34,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.StatsTask;
 import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
@@ -55,12 +52,15 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.UnionWork;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 
 /**
  * MemoryDecider is a simple physical optimizer that adjusts the memory layout of tez tasks.
@@ -71,20 +71,28 @@ public class MemoryDecider implements PhysicalPlanResolver {
 
   protected static transient final Logger LOG = LoggerFactory.getLogger(MemoryDecider.class);
+  private TezWork tezWork = null;
 
   public class MemoryCalculator implements Dispatcher {
 
-    private final long totalAvailableMemory; // how much to we have
+    private final long mapJoinTotalAvailableMemory; // how much do we have
     private final long minimumHashTableSize; // minimum size of ht completely in memory
     private final double inflationFactor; // blowout factor datasize -> memory size
     private final PhysicalContext pctx;
+    private final long defaultBufferSize;
+    private final long ioMemory;
 
     public MemoryCalculator(PhysicalContext pctx) {
       this.pctx = pctx;
-      this.totalAvailableMemory = HiveConf.getLongVar(pctx.conf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
+      this.mapJoinTotalAvailableMemory = HiveConf.getLongVar(pctx.conf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
       this.minimumHashTableSize = HiveConf.getIntVar(pctx.conf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS)
           * HiveConf.getIntVar(pctx.conf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE);
       this.inflationFactor = HiveConf.getFloatVar(pctx.conf, HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR);
+      this.defaultBufferSize =
+          HiveConf.getLongVar(pctx.conf, HiveConf.ConfVars.HIVE_TEZ_DEFAULT_BUFFER_SIZE);
+      LOG.info("Hive tez default buffer size: " + defaultBufferSize);
+      this.ioMemory = this.pctx.getConf().getLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB,
+          defaultBufferSize);
     }
 
     @SuppressWarnings("unchecked")
@@ -96,8 +104,8 @@ public Object dispatch(Node nd, Stack stack, Object... nodeOutputs)
         currTask = ((StatsTask) currTask).getWork().getSourceTask();
       }
       if (currTask instanceof TezTask) {
-        TezWork work = ((TezTask) currTask).getWork();
-        for (BaseWork w : work.getAllWork()) {
+        tezWork = ((TezTask) currTask).getWork();
+        for (BaseWork w : tezWork.getAllWork()) {
           evaluateWork(w);
         }
       }
@@ -105,18 +113,47 @@ public Object dispatch(Node nd, Stack stack, Object... nodeOutputs)
     }
 
     private void evaluateWork(BaseWork w) throws SemanticException {
-
+      for (TezEdgeProperty edge : getInputEdges(w)) {
+        if (edge.getEdgeType() == EdgeType.SIMPLE_EDGE) {
+          if ((edge.getSortBufferMemRequiredMB() * 1024 * 1024) < defaultBufferSize) {
+            // hasn't been set. Default to configuration.
+            LOG.info("Setting the sort buffer memory required to " + defaultBufferSize);
+            edge.setSortBufferMemRequired(defaultBufferSize);
+          }
+        }
+      }
       if (w instanceof MapWork) {
         evaluateMapWork((MapWork) w);
       } else if (w instanceof ReduceWork) {
         evaluateReduceWork((ReduceWork) w);
       } else if (w instanceof MergeJoinWork) {
         evaluateMergeWork((MergeJoinWork) w);
+      } else if (w instanceof UnionWork) {
+        evaluateUnionWork((UnionWork) w, getInputEdges(w), getOutputEdges(w));
       } else {
         LOG.info("We are not going to evaluate this work type: " + w.getClass().getCanonicalName());
       }
     }
 
+    private void evaluateUnionWork(UnionWork w, List<TezEdgeProperty> inputEdges,
+        List<TezEdgeProperty> outputEdges) {
+      List<TezEdgeProperty> realOutputEdges = new ArrayList<TezEdgeProperty>();
+      long totalInputSize = 0;
+      for (TezEdgeProperty edge : outputEdges) {
+        if (edge.getEdgeType() == TezEdgeProperty.EdgeType.CONTAINS) {
+          totalInputSize += edge.getDataFlowSize();
+        } else {
+          realOutputEdges.add(edge);
+        }
+      }
+
+      LOG.info("Real total input size for union " + w.getName() + " is " + totalInputSize);
+      for (TezEdgeProperty edge : realOutputEdges) {
+        edge.setDataFlowSize(totalInputSize);
+      }
+      return;
+    }
+
     private void evaluateMergeWork(MergeJoinWork w) throws SemanticException {
       for (BaseWork baseWork : w.getBaseWorkList()) {
         evaluateOperators(baseWork, pctx);
@@ -146,6 +183,16 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
               return null;
             }
           });
+      final Set<GroupByOperator> gbyOps = new LinkedHashSet<GroupByOperator>();
+      rules.put(new RuleRegExp("Sort buffer estimator", GroupByOperator.getOperatorName() + "%"),
+          new NodeProcessor() {
+            @Override
+            public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+                Object... nodeOutputs) {
+              gbyOps.add((GroupByOperator) nd);
+              return null;
+            }
+          });
 
       disp = new DefaultRuleDispatcher(null, rules, null);
       GraphWalker ogw = new DefaultGraphWalker(disp);
@@ -156,6 +203,10 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
       LinkedHashMap<Node, Object> nodeOutput = new LinkedHashMap<Node, Object>();
       ogw.startWalking(topNodes, nodeOutput);
 
+      if (gbyOps.size() != 0) {
+        evaluateGbyOps(w, gbyOps);
+      }
+
       if (mapJoins.size() == 0) {
         return;
       }
@@ -174,6 +225,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
       }
 
       Comparator<MapJoinOperator> comp = new Comparator<MapJoinOperator>() {
+        @Override
         public int compare(MapJoinOperator mj1, MapJoinOperator mj2) {
           if (mj1 == null || mj2 == null) {
             throw new NullPointerException();
@@ -190,7 +242,7 @@ public int compare(MapJoinOperator mj1, MapJoinOperator mj2) {
       SortedSet<MapJoinOperator> sortedMapJoins = new TreeSet<MapJoinOperator>(comp);
       sortedMapJoins.addAll(mapJoins);
 
-      long remainingSize = totalAvailableMemory / 2;
+      long remainingSize = mapJoinTotalAvailableMemory / 2;
 
       Iterator<MapJoinOperator> it = sortedMapJoins.iterator();
@@ -221,15 +273,15 @@ public int compare(MapJoinOperator mj1, MapJoinOperator mj2) {
           sortedMapJoins.addAll(mapJoins);
           totalLargeJoins = total;
-          if (totalLargeJoins > totalAvailableMemory) {
+          if (totalLargeJoins > mapJoinTotalAvailableMemory) {
             // this shouldn't happen
             throw new HiveException();
           }
-          remainingSize = totalAvailableMemory / 2;
+          remainingSize = mapJoinTotalAvailableMemory / 2;
         }
 
         // we used half the mem for small joins, now let's scale the rest
-        double weight = (remainingSize + totalAvailableMemory / 2) / (double) totalLargeJoins;
+        double weight = (remainingSize + mapJoinTotalAvailableMemory / 2) / (double) totalLargeJoins;
 
         for (MapJoinOperator mj : sortedMapJoins) {
           long size = (long)(weight * sizes.get(mj));
@@ -241,7 +293,7 @@ public int compare(MapJoinOperator mj1, MapJoinOperator mj2) {
         }
       } catch (HiveException e) {
        // if we have issues with stats, just scale linearily
-        long size = totalAvailableMemory / mapJoins.size();
+        long size = mapJoinTotalAvailableMemory / mapJoins.size();
         if (LOG.isInfoEnabled()) {
           LOG.info("Scaling mapjoin memory w/o stats");
         }
@@ -255,6 +307,49 @@ public int compare(MapJoinOperator mj1, MapJoinOperator mj2) {
       }
     }
 
+    private List<TezEdgeProperty> getOutputEdges(BaseWork w) {
+      List<TezEdgeProperty> outputEdges = new ArrayList<TezEdgeProperty>();
+
+      for (BaseWork b : tezWork.getChildren(w)) {
+        outputEdges.add(tezWork.getEdgeProperty(w, b));
+      }
+
+      return outputEdges;
+    }
+
+    private List<TezEdgeProperty> getInputEdges(BaseWork w) {
+      List<TezEdgeProperty> inputEdges = new ArrayList<TezEdgeProperty>();
+      for (BaseWork b : tezWork.getParents(w)) {
+        inputEdges.add(tezWork.getEdgeProperty(b, w));
+      }
+
+      return inputEdges;
+    }
+
+    private void evaluateGbyOps(BaseWork w, Set<GroupByOperator> gbyOps) {
+      for (GroupByOperator gbyOp : gbyOps) {
+        if (gbyOp.getConf().getMode() == GroupByDesc.Mode.HASH) {
+          // if it is a map side aggregation, lower the io memory to the stats of the gby.
+          // otherwise do nothing. In experiments we have run with TPC-DS data, this
+          // gives us the right amount of memory.
+          long memoryNeeded = gbyOp.getConf().getStatistics().getDataSize();
+          if (this.ioMemory > memoryNeeded) {
+            // find the outgoing edges from this work item and set the memory required to the data
+            // size in this case.
+            if (memoryNeeded < defaultBufferSize) {
+              memoryNeeded = defaultBufferSize;
+            }
+            List<TezEdgeProperty> outputEdges = getOutputEdges(w);
+            for (TezEdgeProperty edge : outputEdges) {
+              LOG.info("Setting the memory needed for edge " + edge.getEdgeType() + " between "
+                  + tezWork.getWorkConnectedByEdge(edge) + " to " + memoryNeeded);
+              edge.setSortBufferMemRequired(memoryNeeded);
+            }
+          }
+        }
+      }
+    }
+
     private long computeSizeToFitInMem(MapJoinOperator mj) throws HiveException {
       return (long) (Math.max(this.minimumHashTableSize, computeInputSize(mj)) * this.inflationFactor);
     }
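Note (illustrative, not part of the patch): the heuristic in evaluateGbyOps reads as: when a map-side hash aggregation is estimated to emit less data than the configured io memory, shrink the outgoing edges' sort buffer requirement to that estimate, but never below the default buffer size; otherwise leave the edges alone. A standalone sketch of that clamp (method and parameter names are mine):

// Illustrative only: returns the new sort buffer requirement in bytes,
// or -1 when the edges should be left untouched.
final class GroupBySortBufferRule {
  static long sortBufferBytes(long gbyOutputBytes, long ioMemory, long defaultBufferBytes) {
    if (ioMemory <= gbyOutputBytes) {
      return -1; // the aggregation already needs at least the configured io memory
    }
    return Math.max(gbyOutputBytes, defaultBufferBytes); // shrink, but not below the default
  }
}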
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 70912e0..f02f12a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -134,9 +134,10 @@ public static ReduceWork createReduceWork(
     if (reduceWork.isAutoReduceParallelism()) {
       edgeProp = new TezEdgeProperty(context.conf, edgeType, true,
-          reduceWork.getMinReduceTasks(), reduceWork.getMaxReduceTasks(), bytesPerReducer);
+          reduceWork.getMinReduceTasks(), reduceWork.getMaxReduceTasks(), bytesPerReducer,
+          reduceSink.getStatistics().getDataSize());
     } else {
-      edgeProp = new TezEdgeProperty(edgeType);
+      edgeProp = new TezEdgeProperty(edgeType, reduceSink.getStatistics().getDataSize());
     }
 
     tezWork.connect(
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 1a49de1..be92cfe 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.lib.Node;
@@ -129,6 +130,7 @@ public Object process(Node nd, Stack stack,
       } else {
         work = GenTezUtils.createReduceWork(context, root, tezWork);
       }
+      work.setMemoryNeeded(operator.getStatistics().getDataSize());
       context.rootToWorkMap.put(root, work);
     }
@@ -452,9 +454,10 @@ public Object process(Node nd, Stack stack,
         if (rWork.isAutoReduceParallelism()) {
           edgeProp = new TezEdgeProperty(context.conf, edgeType, true,
-              rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer);
+              rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer,
+              rs.getStatistics().getDataSize());
         } else {
-          edgeProp = new TezEdgeProperty(edgeType);
+          edgeProp = new TezEdgeProperty(edgeType, rs.getStatistics().getDataSize());
         }
         tezWork.connect(work, followingWork, edgeProp);
         context.connectedReduceSinks.add(rs);
@@ -499,7 +502,7 @@ private int getFollowingWorkIndex(TezWork tezWork, UnionWork unionWork, ReduceSi
   private void connectUnionWorkWithWork(UnionWork unionWork, BaseWork work, TezWork tezWork,
       GenTezProcContext context) {
     LOG.debug("Connecting union work (" + unionWork + ") with work (" + work + ")");
-    TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.CONTAINS);
+    TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.CONTAINS, work.getMemoryNeeded());
     tezWork.connect(unionWork, work, edgeProp);
     unionWork.addUnionOperators(context.currentUnionOperators);
     context.workWithUnionOperators.add(work);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index ff971ac..b45960e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -482,8 +482,9 @@ protected void optimizeTaskPlan(List> rootTasks, Pa
       LOG.debug("Skipping stage id rearranger");
     }
 
-    if ((conf.getBoolVar(HiveConf.ConfVars.HIVE_TEZ_ENABLE_MEMORY_MANAGER))
-        && (conf.getBoolVar(HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN))) {
+    if (((conf.getBoolVar(HiveConf.ConfVars.HIVE_TEZ_ENABLE_MEMORY_MANAGER_FOR_HASH_JOIN))
+        && (conf.getBoolVar(HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN)))
+        || (conf.getBoolVar(HiveConf.ConfVars.HIVE_TEZ_ENABLE_MEMORY_MANAGER))) {
       physicalCtx = new MemoryDecider().resolve(physicalCtx);
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
index bc67e5a..575f67c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
@@ -30,7 +30,7 @@
   protected Statistics statistics;
   protected transient OpTraits opTraits;
   protected transient Map opProps;
-  protected long memNeeded = 0;
+  protected long memNeeded = -1;
 
   @Override
   @Explain(skipHeader = true, displayName = "Statistics", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java
index b088326..3c5d799 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java
@@ -81,14 +81,14 @@ public void addMergedWork(BaseWork work, BaseWork connectWork,
          * output name in the reduce sink needs to be setup appropriately. In the case of reduce
          * side merge work, we need to ensure that the parent work that provides data to this merge
          * work is setup to point to the right vertex name - the main work name.
-         * 
+         *
          * In this case, if the big table work has already been created, we can hook up the merge
          * work items for the small table correctly.
          */
         setReduceSinkOutputName(connectWork, leafOperatorToFollowingWork, bigTableWork.getName());
       }
     }
-
+    setMemoryNeeded(work.getMemoryNeeded() + getMemoryNeeded());
     if (work != null) {
       /*
        * Same reason as above. This is the case when we have the main work item after the merge work
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
index a3aa12f..aada4d4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
@@ -18,7 +18,12 @@
 
 package org.apache.hadoop.hive.ql.plan;
 
+import java.io.IOException;
+
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TezEdgeProperty {
 
@@ -30,25 +35,31 @@
     CUSTOM_SIMPLE_EDGE,
   }
 
-  private HiveConf hiveConf;
-  private EdgeType edgeType;
-  private int numBuckets;
+  private final HiveConf hiveConf;
+  private final EdgeType edgeType;
+  private final int numBuckets;
 
   private boolean isAutoReduce;
   private int minReducer;
   private int maxReducer;
   private long inputSizePerReducer;
+  private long dataFlowSize;
+  private float shuffleMemFraction;
+  private long sortBufferMemRequired;
+
+  private static final Logger LOG = LoggerFactory.getLogger(DagUtils.class.getName());
+
-  public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType,
-      int buckets) {
+  public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, int buckets, long dataFlowSize) {
     this.hiveConf = hiveConf;
     this.edgeType = edgeType;
     this.numBuckets = buckets;
+    this.dataFlowSize = dataFlowSize;
   }
 
   public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, boolean isAutoReduce,
-      int minReducer, int maxReducer, long bytesPerReducer) {
-    this(hiveConf, edgeType, -1);
+      int minReducer, int maxReducer, long bytesPerReducer, long dataFlowSize) {
+    this(hiveConf, edgeType, -1, dataFlowSize);
     this.minReducer = minReducer;
     this.maxReducer = maxReducer;
     this.isAutoReduce = isAutoReduce;
@@ -56,7 +67,11 @@ public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, boolean isAutoReduc
   }
 
   public TezEdgeProperty(EdgeType edgeType) {
-    this(null, edgeType, -1);
+    this(edgeType, 0);
+  }
+
+  public TezEdgeProperty(EdgeType edgeType, long dataFlowSize) {
+    this(null, edgeType, -1, dataFlowSize);
   }
 
   public EdgeType getEdgeType() {
@@ -86,4 +101,31 @@ public int getMaxReducer() {
   public long getInputSizePerReducer() {
     return inputSizePerReducer;
   }
+
+  public long getDataFlowSize() {
+    return dataFlowSize;
+  }
+
+  public void setDataFlowSize(long dataFlowSize) {
+    this.dataFlowSize = dataFlowSize;
+  }
+
+  public double getShuffleMemFraction() {
+    return shuffleMemFraction;
+  }
+
+  public void setShuffleMemFraction(float shuffleMemFraction) {
+    this.shuffleMemFraction = shuffleMemFraction;
+  }
+
+  public long getSortBufferMemRequiredMB() {
+    long mbSize = sortBufferMemRequired / (1024 * 1024);
+    mbSize = (mbSize > 0) ? mbSize : 1;
+
+    return mbSize;
+  }
+
+  public void setSortBufferMemRequired(long sortBufferMemRequired) {
+    this.sortBufferMemRequired = sortBufferMemRequired;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
index 8b82c66..6d01dce 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
@@ -34,6 +34,10 @@
 import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
@@ -73,8 +77,8 @@ public static boolean isCustomInputType(VertexType vertex) {
   private final Set leaves = new HashSet();
   private final Map<BaseWork, List<BaseWork>> workGraph = new HashMap<BaseWork, List<BaseWork>>();
   private final Map<BaseWork, List<BaseWork>> invertedWorkGraph = new HashMap<BaseWork, List<BaseWork>>();
-  private final Map<Pair<BaseWork, BaseWork>, TezEdgeProperty> edgeProperties =
-      new HashMap<Pair<BaseWork, BaseWork>, TezEdgeProperty>();
+  private final BiMap<Pair<BaseWork, BaseWork>, TezEdgeProperty> edgeProperties =
+      HashBiMap.create();
   private final Map workVertexTypeMap = new HashMap();
 
   public TezWork(String name) {
@@ -250,6 +254,7 @@ public void remove(BaseWork work) {
     invertedWorkGraph.remove(work);
   }
 
+  @SuppressWarnings({ "rawtypes", "unchecked" })
   public EdgeType getEdgeType(BaseWork a, BaseWork b) {
     return edgeProperties.get(new ImmutablePair(a,b)).getEdgeType();
   }
@@ -257,6 +262,7 @@ public EdgeType getEdgeType(BaseWork a, BaseWork b) {
   /**
    * returns the edge type connecting work a and b
    */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
   public TezEdgeProperty getEdgeProperty(BaseWork a, BaseWork b) {
     return edgeProperties.get(new ImmutablePair(a,b));
   }
@@ -288,6 +294,7 @@ public int compareTo(Dependency o) {
     }
   }
 
+  @SuppressWarnings({ "unchecked", "rawtypes" })
   @Explain(displayName = "Edges", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
   public Map<String, List<Dependency>> getDependencyMap() {
     Map<String, List<Dependency>> result = new LinkedHashMap<String, List<Dependency>>();
@@ -351,6 +358,7 @@ public int compareTo(Dependency o) {
    * to be added prior to calling connect.
    * @param
    */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
   public void connect(BaseWork a, BaseWork b,
       TezEdgeProperty edgeProp) {
     workGraph.get(a).add(b);
@@ -405,4 +413,8 @@ public boolean getLlapMode() {
     }
     return false;
   }
+
+  public Pair<BaseWork, BaseWork> getWorkConnectedByEdge(TezEdgeProperty edge) {
+    return edgeProperties.inverse().get(edge);
+  }
 }
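Note (illustrative, not part of the patch): the switch from HashMap to Guava's HashBiMap is what makes getWorkConnectedByEdge() possible: the inverse view maps an edge property back to the (source, sink) pair it connects. A self-contained sketch of that reverse lookup, with Strings standing in for BaseWork vertices:

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;

// Illustrative only: a BiMap supports the reverse lookup used by getWorkConnectedByEdge().
class EdgeLookupExample {
  public static void main(String[] args) {
    BiMap<Pair<String, String>, String> edges = HashBiMap.create();
    edges.put(new ImmutablePair<>("Map 1", "Reducer 2"), "SIMPLE_EDGE#1");

    String edge = edges.get(new ImmutablePair<>("Map 1", "Reducer 2")); // forward lookup
    Pair<String, String> endpoints = edges.inverse().get(edge);         // reverse lookup
    System.out.println(edge + " connects " + endpoints);
  }
}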