diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3ab1dba..bdd06f1 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2628,7 +2628,7 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
     SPARK_RPC_SASL_MECHANISM("hive.spark.client.rpc.sasl.mechanisms", "DIGEST-MD5",
         "Name of the SASL mechanism to use for authentication."),
     SPARK_RPC_SERVER_ADDRESS("hive.spark.client.rpc.server.address", "",
-        "The server address of HiverServer2 host to be used for communication between Hive client and remote Spark driver. " +
+        "The server address of HiveServer2 host to be used for communication between Hive client and remote Spark driver. " +
         "Default is empty, which means the address will be determined in the same way as for hive.server2.thrift.bind.host." +
         "This is only necessary if the host has mutiple network addresses and if a different network address other than " +
         "hive.server2.thrift.bind.host is to be used."),
@@ -2657,8 +2657,6 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
     HIVE_LOG_TRACE_ID("hive.log.trace.id", "",
         "Log tracing id that can be used by upstream clients for tracking respective logs. " +
         "Truncated to " + LOG_PREFIX_LENGTH + " characters. Defaults to use auto-generated session id."),
-
-
     HIVE_CONF_RESTRICTED_LIST("hive.conf.restricted.list",
         "hive.security.authenticator.manager,hive.security.authorization.manager,hive.users.in.admin.role",
         "Comma separated list of configuration options which are immutable at runtime"),
@@ -2667,7 +2665,14 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
         "Comma separated list of configuration options which should not be read by normal user like passwords"),
     HIVE_CONF_INTERNAL_VARIABLE_LIST("hive.conf.internal.variable.list",
         "hive.added.files.path,hive.added.jars.path,hive.added.archives.path",
-        "Comma separated list of variables which are used internally and should not be configurable.");
+        "Comma separated list of variables which are used internally and should not be configurable."),
+    HIVE_TEZ_ENABLE_MEMORY_MANAGER_FOR_HASH_JOIN("hive.tez.enable.memory.manager.for.hash.join",
+        true, "Enable the memory manager for hash join operators in Tez."),
+    HIVE_TEZ_DEFAULT_BUFFER_SIZE_MB("hive.tez.default.buffer.size.mb", 32L,
+        "Tez default sort buffer size in MB. Default is 32 MB. Set this to roughly the " +
+        "number of tables in your query * the largest row size among these tables. " +
+        "For example, for a 2-way join where the row sizes of the 2 tables are 1 MB and 10 KB, " +
+        "set this value to at least 2 * 1 MB. Keeping this value above 32 MB is highly recommended.");

   public final String varname;
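For orientation, here is a minimal, illustrative sketch (not part of the patch) of toggling the two knobs introduced above from Java. The ConfVars constants exist only once this patch is applied; a Hive session would use plain `set` commands instead.

    import org.apache.hadoop.hive.conf.HiveConf;

    public class MemoryManagerKnobs {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Equivalent to: set hive.tez.enable.memory.manager.for.hash.join=true;
        conf.setBoolVar(HiveConf.ConfVars.HIVE_TEZ_ENABLE_MEMORY_MANAGER_FOR_HASH_JOIN, true);
        // Raise the per-edge sort buffer floor from the 32 MB default to 64 MB.
        conf.setLongVar(HiveConf.ConfVars.HIVE_TEZ_DEFAULT_BUFFER_SIZE_MB, 64L);
        System.out.println(conf.getLongVar(HiveConf.ConfVars.HIVE_TEZ_DEFAULT_BUFFER_SIZE_MB));
      }
    }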
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 39866a3..2796bb5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -147,7 +147,6 @@
       "hive.tez.current.merge.file.prefix";
   // "A comma separated list of work names used as prefix.
   public static final String TEZ_MERGE_WORK_FILE_PREFIXES = "hive.tez.merge.file.prefixes";
-
   /**
    * Notifiers to synchronize resource localization across threads. If one thread is localizing
    * a file, other threads can wait on the corresponding notifier object instead of just sleeping
@@ -345,7 +344,7 @@ public Edge createEdge(JobConf vConf, Vertex v, Vertex w, TezEdgeProperty edgePr
    * Helper function to create an edge property from an edge type.
    */
   private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration conf)
-      throws IOException {
+      throws IOException {
     MRHelpers.translateMRConfToTez(conf);
     String keyClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS);
     String valClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS);
@@ -355,22 +354,23 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration
     EdgeType edgeType = edgeProp.getEdgeType();
     switch (edgeType) {
     case BROADCAST_EDGE:
-      UnorderedKVEdgeConfig et1Conf = UnorderedKVEdgeConfig
-          .newBuilder(keyClass, valClass)
-          .setFromConfiguration(conf)
-          .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null)
-          .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
-          .build();
+      UnorderedKVEdgeConfig et1Conf =
+          UnorderedKVEdgeConfig.newBuilder(keyClass, valClass).setFromConfiguration(conf)
+              .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null)
+              .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
+              .build();
       return et1Conf.createDefaultBroadcastEdgeProperty();
+
     case CUSTOM_EDGE:
       assert partitionerClassName != null;
       partitionerConf = createPartitionerConf(partitionerClassName, conf);
-      UnorderedPartitionedKVEdgeConfig et2Conf = UnorderedPartitionedKVEdgeConfig
-          .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
-          .setFromConfiguration(conf)
-          .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null)
-          .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
-          .build();
+      UnorderedPartitionedKVEdgeConfig et2Conf =
+          UnorderedPartitionedKVEdgeConfig
+              .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
+              .setFromConfiguration(conf)
+              .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null)
+              .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
+              .build();
       EdgeManagerPluginDescriptor edgeDesc =
           EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName());
       CustomEdgeConfiguration edgeConf =
@@ -380,27 +380,41 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration
       byte[] userPayload = dob.getData();
       edgeDesc.setUserPayload(UserPayload.create(ByteBuffer.wrap(userPayload)));
       return et2Conf.createDefaultCustomEdgeProperty(edgeDesc);
+
     case CUSTOM_SIMPLE_EDGE:
       assert partitionerClassName != null;
       partitionerConf = createPartitionerConf(partitionerClassName, conf);
-      UnorderedPartitionedKVEdgeConfig et3Conf = UnorderedPartitionedKVEdgeConfig
-          .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
-          .setFromConfiguration(conf)
-          .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null)
-          .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
-          .build();
+      UnorderedPartitionedKVEdgeConfig et3Conf =
+          UnorderedPartitionedKVEdgeConfig
+              .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
+              .setFromConfiguration(conf)
+              .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null)
+              .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
+              .build();
       return et3Conf.createDefaultEdgeProperty();
+
     case SIMPLE_EDGE:
     default:
       assert partitionerClassName != null;
       partitionerConf = createPartitionerConf(partitionerClassName, conf);
-      OrderedPartitionedKVEdgeConfig et4Conf = OrderedPartitionedKVEdgeConfig
+      OrderedPartitionedKVEdgeConfig.Builder builder =
+          OrderedPartitionedKVEdgeConfig
               .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
               .setFromConfiguration(conf)
               .setKeySerializationClass(TezBytesWritableSerialization.class.getName(),
                   TezBytesComparator.class.getName(), null)
-          .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
-          .build();
+              .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null);
+
+      boolean memoryManagerEnabled =
+          HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_TEZ_ENABLE_MEMORY_MANAGER);
+      if (memoryManagerEnabled) {
+        LOG.info(
+            "Setting the sort buffer memory required to " + edgeProp.getSortBufferMemRequiredMB());
+        builder = builder.configureOutput()
+            .setSortBufferSize((int) edgeProp.getSortBufferMemRequiredMB()).done();
+      }
+
+      OrderedPartitionedKVEdgeConfig et4Conf = builder.build();
       return et4Conf.createDefaultEdgeProperty();
     }
   }
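The SIMPLE_EDGE branch above leans on Tez's per-edge output configuration to apply the computed buffer. Below is a standalone sketch of that override pattern, assuming the Tez runtime-library builder API behaves as the hunk uses it (newBuilder/setFromConfiguration/configureOutput/setSortBufferSize/done); the class and parameter names here are illustrative, not from the patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.tez.mapreduce.partition.MRPartitioner;
    import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;

    public class EdgeBufferOverrideSketch {
      // Builds an ordered-edge config whose output sort buffer is bufferMb,
      // leaving tez.runtime.io.sort.mb untouched for every other edge.
      static OrderedPartitionedKVEdgeConfig build(Configuration conf, String keyClass,
          String valClass, Configuration partitionerConf, int bufferMb) {
        return OrderedPartitionedKVEdgeConfig
            .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
            .setFromConfiguration(conf)
            .configureOutput().setSortBufferSize(bufferMb).done()
            .build();
      }
    }

The point of routing the value through the builder rather than the JobConf is scope: each edge of the DAG can carry its own sort buffer size instead of sharing one global setting.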
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 00bc193..5b1b8bb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -54,6 +54,7 @@
 import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
+import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
 import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -307,6 +308,8 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext cont
         dummyStoreOp.getParentOperators().add(parentOp);
         mergeJoinOp.getParentOperators().remove(parentIndex);
         mergeJoinOp.getParentOperators().add(parentIndex, dummyStoreOp);
+        dummyStoreOp.setConf(new DummyStoreDesc());
+        dummyStoreOp.setStatistics(parentOp.getStatistics());
       }
     }
     mergeJoinOp.cloneOriginalParentsList(mergeJoinOp.getParentOperators());
@@ -372,7 +375,7 @@ private boolean checkConvertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcCont
       // MapRecordProcessor and ReduceRecordProcessor with respect to the sources.
       @SuppressWarnings({"rawtypes","unchecked"})
       Set set =
-          OperatorUtils.findOperatorsUpstream((Collection)parentOp.getParentOperators(),
+          OperatorUtils.findOperatorsUpstream(parentOp.getParentOperators(),
               ReduceSinkOperator.class);
       if (size < 0) {
         size = set.size();
@@ -589,7 +592,7 @@ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext c
         // on size and there's another one that's bigger.
         return -1;
       }
-      
+
       if (inputSize/buckets > maxSize) {
         if (!bigTableCandidateSet.contains(pos)) {
           // can't use the current table as the big table, but it's too
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
index c38c6d7..0b42b3d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
@@ -280,7 +280,8 @@ public static Object processReduceSinkToHashJoin(ReduceSinkOperator parentRS, Ma
       // disable auto parallelism for bucket map joins
       parentRS.getConf().setReducerTraits(EnumSet.of(FIXED));
     }
-    TezEdgeProperty edgeProp = new TezEdgeProperty(null, edgeType, numBuckets);
+    TezEdgeProperty edgeProp =
+        new TezEdgeProperty(null, edgeType, numBuckets, parentRS.getStatistics().getDataSize());

     if (mapJoinWork != null) {
       for (BaseWork myWork: mapJoinWork) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
index 7149f5c..79b882b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
@@ -284,7 +284,6 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
       List<List<String>> bucketColsList = new ArrayList<List<String>>();
       List<List<String>> sortColsList = new ArrayList<List<String>>();
       byte pos = 0;
-      int numReduceSinks = 0; // will be set to the larger of the parents
       for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
         if (!(parentOp instanceof ReduceSinkOperator)) {
           // can be mux operator
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java
index 3a20cfe..93dd616 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java
@@ -22,7 +22,6 @@
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
@@ -35,12 +34,11 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.StatsTask;
 import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
@@ -55,12 +53,15 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.UnionWork;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 /**
  * MemoryDecider is a simple physical optimizer that adjusts the memory layout of tez tasks.
@@ -71,20 +72,28 @@
 public class MemoryDecider implements PhysicalPlanResolver {

   protected static transient final Logger LOG = LoggerFactory.getLogger(MemoryDecider.class);
+  private TezWork tezWork = null;

   public class MemoryCalculator implements Dispatcher {

-    private final long totalAvailableMemory; // how much to we have
+    private final long mapJoinTotalAvailableMemory; // how much do we have
     private final long minimumHashTableSize; // minimum size of ht completely in memory
     private final double inflationFactor; // blowout factor datasize -> memory size
     private final PhysicalContext pctx;
+    private final long defaultBufferSize;
+    private final long ioMemory;

     public MemoryCalculator(PhysicalContext pctx) {
       this.pctx = pctx;
-      this.totalAvailableMemory = HiveConf.getLongVar(pctx.conf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
+      this.mapJoinTotalAvailableMemory = HiveConf.getLongVar(pctx.conf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
       this.minimumHashTableSize = HiveConf.getIntVar(pctx.conf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS)
           * HiveConf.getIntVar(pctx.conf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE);
       this.inflationFactor = HiveConf.getFloatVar(pctx.conf, HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR);
+      this.defaultBufferSize =
+          HiveConf.getLongVar(pctx.conf, HiveConf.ConfVars.HIVE_TEZ_DEFAULT_BUFFER_SIZE_MB);
+      LOG.info("Hive tez default buffer size: " + defaultBufferSize);
+      this.ioMemory = this.pctx.getConf().getLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB,
+          defaultBufferSize);
     }

     @SuppressWarnings("unchecked")
@@ -96,8 +105,8 @@ public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
           currTask = ((StatsTask) currTask).getWork().getSourceTask();
         }
         if (currTask instanceof TezTask) {
-          TezWork work = ((TezTask) currTask).getWork();
-          for (BaseWork w : work.getAllWork()) {
+          tezWork = ((TezTask) currTask).getWork();
+          for (BaseWork w : tezWork.getAllWork()) {
             evaluateWork(w);
           }
         }
@@ -105,18 +114,57 @@ public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
     }

     private void evaluateWork(BaseWork w) throws SemanticException {
+      for (TezEdgeProperty edge : getInputEdges(w)) {
+        if (edge.getEdgeType() == EdgeType.SIMPLE_EDGE) {
+          long sortBufferComputedMemory = edge.getDataFlowSize() / (1024 * 1024);
+          if (sortBufferComputedMemory < this.ioMemory) {
+            if (sortBufferComputedMemory < defaultBufferSize) {
+              sortBufferComputedMemory = defaultBufferSize;
+            }
+            edge.setSortBufferMemRequiredMB(sortBufferComputedMemory);
+            if (w instanceof ReduceWork) {
+              ((ReduceWork) w).setSortBufferMemory(sortBufferComputedMemory);
+            } else if (w instanceof MergeJoinWork) {
+              ((ReduceWork) (((MergeJoinWork) w).getMainWork()))
+                  .setSortBufferMemory(sortBufferComputedMemory);
+            }
+            LOG.info("Setting the sort buffer memory required to " + sortBufferComputedMemory);
+          }
+        }
+      }
       if (w instanceof MapWork) {
         evaluateMapWork((MapWork) w);
       } else if (w instanceof ReduceWork) {
         evaluateReduceWork((ReduceWork) w);
       } else if (w instanceof MergeJoinWork) {
         evaluateMergeWork((MergeJoinWork) w);
+      } else if (w instanceof UnionWork) {
+        evaluateUnionWork((UnionWork) w, getInputEdges(w), getOutputEdges(w));
       } else {
         LOG.info("We are not going to evaluate this work type: " + w.getClass().getCanonicalName());
       }
     }

+    private void evaluateUnionWork(UnionWork w, List<TezEdgeProperty> inputEdges,
+        List<TezEdgeProperty> outputEdges) {
+      List<TezEdgeProperty> realOutputEdges = new ArrayList<TezEdgeProperty>();
+      long totalInputSize = 0;
+      for (TezEdgeProperty edge : outputEdges) {
+        if (edge.getEdgeType() == TezEdgeProperty.EdgeType.CONTAINS) {
+          totalInputSize += edge.getDataFlowSize();
+        } else {
+          realOutputEdges.add(edge);
+        }
+      }
+
+      LOG.info("Real total input size for union " + w.getName() + " is " + totalInputSize);
+      for (TezEdgeProperty edge : realOutputEdges) {
+        edge.setDataFlowSize(totalInputSize);
+      }
+    }
+
     private void evaluateMergeWork(MergeJoinWork w) throws SemanticException {
       for (BaseWork baseWork : w.getBaseWorkList()) {
         evaluateOperators(baseWork, pctx);
@@ -174,6 +222,7 @@
       }

       Comparator<MapJoinOperator> comp = new Comparator<MapJoinOperator>() {
+        @Override
        public int compare(MapJoinOperator mj1, MapJoinOperator mj2) {
          if (mj1 == null || mj2 == null) {
            throw new NullPointerException();
@@ -190,7 +239,7 @@ public int compare(MapJoinOperator mj1, MapJoinOperator mj2) {
       SortedSet<MapJoinOperator> sortedMapJoins = new TreeSet<MapJoinOperator>(comp);
       sortedMapJoins.addAll(mapJoins);

-      long remainingSize = totalAvailableMemory / 2;
+      long remainingSize = mapJoinTotalAvailableMemory / 2;

       Iterator<MapJoinOperator> it = sortedMapJoins.iterator();
@@ -221,15 +270,15 @@ public int compare(MapJoinOperator mj1, MapJoinOperator mj2) {
           sortedMapJoins.addAll(mapJoins);
           totalLargeJoins = total;

-          if (totalLargeJoins > totalAvailableMemory) {
+          if (totalLargeJoins > mapJoinTotalAvailableMemory) {
             // this shouldn't happen
             throw new HiveException();
           }
-          remainingSize = totalAvailableMemory / 2;
+          remainingSize = mapJoinTotalAvailableMemory / 2;
         }

         // we used half the mem for small joins, now let's scale the rest
-        double weight = (remainingSize + totalAvailableMemory / 2) / (double) totalLargeJoins;
+        double weight = (remainingSize + mapJoinTotalAvailableMemory / 2) / (double) totalLargeJoins;

         for (MapJoinOperator mj : sortedMapJoins) {
           long size = (long)(weight * sizes.get(mj));
@@ -241,7 +290,7 @@ public int compare(MapJoinOperator mj1, MapJoinOperator mj2) {
         }
       } catch (HiveException e) {
         // if we have issues with stats, just scale linearily
-        long size = totalAvailableMemory / mapJoins.size();
+        long size = mapJoinTotalAvailableMemory / mapJoins.size();
         if (LOG.isInfoEnabled()) {
           LOG.info("Scaling mapjoin memory w/o stats");
         }
@@ -255,6 +304,25 @@ public int compare(MapJoinOperator mj1, MapJoinOperator mj2) {
       }
     }

+    private List<TezEdgeProperty> getOutputEdges(BaseWork w) {
+      List<TezEdgeProperty> outputEdges = new ArrayList<TezEdgeProperty>();
+
+      for (BaseWork b : tezWork.getChildren(w)) {
+        outputEdges.add(tezWork.getEdgeProperty(w, b));
+      }
+
+      return outputEdges;
+    }
+
+    private List<TezEdgeProperty> getInputEdges(BaseWork w) {
+      List<TezEdgeProperty> inputEdges = new ArrayList<TezEdgeProperty>();
+      for (BaseWork b : tezWork.getParents(w)) {
+        inputEdges.add(tezWork.getEdgeProperty(b, w));
+      }
+
+      return inputEdges;
+    }
+
     private long computeSizeToFitInMem(MapJoinOperator mj) throws HiveException {
       return (long) (Math.max(this.minimumHashTableSize, computeInputSize(mj)) * this.inflationFactor);
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 8eab3af..32d1624 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -134,9 +134,10 @@ public static ReduceWork createReduceWork(
     if (reduceWork.isAutoReduceParallelism()) {
       edgeProp = new TezEdgeProperty(context.conf, edgeType, true,
-          reduceWork.getMinReduceTasks(), reduceWork.getMaxReduceTasks(), bytesPerReducer);
+          reduceWork.getMinReduceTasks(), reduceWork.getMaxReduceTasks(), bytesPerReducer,
+          reduceSink.getStatistics().getDataSize());
     } else {
-      edgeProp = new TezEdgeProperty(edgeType);
+      edgeProp = new TezEdgeProperty(edgeType, reduceSink.getStatistics().getDataSize());
     }

     tezWork.connect(
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 46d279e..d1c0a21 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.lib.Node;
@@ -129,6 +130,11 @@ public Object process(Node nd, Stack<Node> stack,
       } else {
         work = GenTezUtils.createReduceWork(context, root, tezWork);
       }
+      if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_TEZ_ENABLE_MEMORY_MANAGER)
+          && (operator != null) && (operator.getStatistics() != null)
+          && (operator instanceof ReduceSinkOperator)) {
+        work.setMemoryNeeded(operator.getStatistics().getDataSize());
+      }
       context.rootToWorkMap.put(root, work);
     }
@@ -451,9 +457,10 @@ public Object process(Node nd, Stack<Node> stack,
           if (rWork.isAutoReduceParallelism()) {
             edgeProp = new TezEdgeProperty(context.conf, edgeType, true,
-                rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer);
+                rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer,
+                rs.getStatistics().getDataSize());
           } else {
-            edgeProp = new TezEdgeProperty(edgeType);
+            edgeProp = new TezEdgeProperty(edgeType, rs.getStatistics().getDataSize());
           }
           tezWork.connect(work, followingWork, edgeProp);
           context.connectedReduceSinks.add(rs);
@@ -498,7 +505,7 @@ private int getFollowingWorkIndex(TezWork tezWork, UnionWork unionWork, ReduceSi
   private void connectUnionWorkWithWork(UnionWork unionWork, BaseWork work, TezWork tezWork,
       GenTezProcContext context) {
     LOG.debug("Connecting union work (" + unionWork
         + ") with work (" + work + ")");
-    TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.CONTAINS);
+    TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.CONTAINS, work.getMemoryNeeded());
     tezWork.connect(unionWork, work, edgeProp);
     unionWork.addUnionOperators(context.currentUnionOperators);
     context.workWithUnionOperators.add(work);
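To make the sizing rule that MemoryDecider.evaluateWork (above) applies per SIMPLE_EDGE easier to follow, here is the same logic paraphrased in isolation (names are mine, not patch code): the buffer is the edge's data-flow size in whole MB, floored at hive.tez.default.buffer.size.mb, and an override is emitted only when that figure undercuts tez.runtime.io.sort.mb.

    public final class SortBufferSizing {
      private SortBufferSizing() {}

      // Returns the per-edge sort buffer in MB, or -1 when the Tez-wide io.sort.mb
      // value is already the better choice and no override should be emitted.
      static long computeSortBufferMb(long dataFlowSizeBytes, long defaultBufferMb, long ioSortMb) {
        long computedMb = dataFlowSizeBytes / (1024 * 1024);
        if (computedMb >= ioSortMb) {
          return -1; // the shuffle needs at least the globally configured buffer; leave it alone
        }
        return Math.max(computedMb, defaultBufferMb); // never shrink below the configured floor
      }
    }

With a 32 MB floor and io.sort.mb at 2048, a 100 MB edge gets a 100 MB buffer, a 5 MB edge is floored to 32 MB, and a 10 GB edge is left on the global setting.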
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index ff971ac..b45960e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -482,8 +482,9 @@ protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks, Pa
       LOG.debug("Skipping stage id rearranger");
     }

-    if ((conf.getBoolVar(HiveConf.ConfVars.HIVE_TEZ_ENABLE_MEMORY_MANAGER))
-        && (conf.getBoolVar(HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN))) {
+    if (((conf.getBoolVar(HiveConf.ConfVars.HIVE_TEZ_ENABLE_MEMORY_MANAGER_FOR_HASH_JOIN))
+        && (conf.getBoolVar(HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN)))
+        || (conf.getBoolVar(HiveConf.ConfVars.HIVE_TEZ_ENABLE_MEMORY_MANAGER))) {
       physicalCtx = new MemoryDecider().resolve(physicalCtx);
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
index adec5c7..a2765c4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
@@ -30,7 +30,7 @@
   protected Statistics statistics;
   protected transient OpTraits opTraits;
   protected transient Map<String, String> opProps;
-  protected long memNeeded = 0;
+  protected long memNeeded = -1;

   @Override
   @Explain(skipHeader = true, displayName = "Statistics")
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
index 429a058..3004e0f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
@@ -33,7 +33,6 @@
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

 /**
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java
index a5527dc..25f7437 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java
@@ -80,20 +80,20 @@ public void addMergedWork(BaseWork work, BaseWork connectWork,
     if (connectWork != null) {
       this.mergeWorkList.add(connectWork);
+      setMemoryNeeded(getMemoryNeeded() + connectWork.getMemoryNeeded());
       if ((connectWork instanceof ReduceWork) && (bigTableWork != null)) {
         /*
          * For tez to route data from an up-stream vertex correctly to the following vertex, the
          * output name in the reduce sink needs to be setup appropriately. In the case of reduce
          * side merge work, we need to ensure that the parent work that provides data to this merge
          * work is setup to point to the right vertex name - the main work name.
-         * 
+         *
          * In this case, if the big table work has already been created, we can hook up the merge
          * work items for the small table correctly.
          */
         setReduceSinkOutputName(connectWork, leafOperatorToFollowingWork, bigTableWork.getName());
       }
     }
-
     if (work != null) {
       /*
        * Same reason as above. This is the case when we have the main work item after the merge work
@@ -104,6 +104,7 @@ public void addMergedWork(BaseWork work, BaseWork connectWork,
           setReduceSinkOutputName(mergeWork, leafOperatorToFollowingWork, work.getName());
         }
       }
+      setMemoryNeeded(work.getMemoryNeeded() + getMemoryNeeded());
     }
   }
@@ -169,7 +170,8 @@ public void setLlapMode(boolean llapMode) {
   public boolean getLlapMode() {
     return getMainWork().getLlapMode();
   }
-  
+
+  @Override
   public void addDummyOp(HashTableDummyOperator dummyOp) {
     getMainWork().addDummyOp(dummyOp);
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
index 41d9ffe..632ae9f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
@@ -91,7 +91,7 @@
   private float topNMemoryUsage = -1;
   private boolean mapGroupBy; // for group-by, values with same key on top-K should be forwarded
   //flag used to control how TopN handled for PTF/Windowing partitions.
-  private boolean isPTFReduceSink = false; 
+  private boolean isPTFReduceSink = false;
   private boolean skipTag; // Skip writing tags when feeding into mapjoin hashtable

   public static enum ReducerTraits {
@@ -438,7 +438,7 @@ public final void setReducerTraits(EnumSet<ReducerTraits> traits) {
     // reducers or hash function.
     boolean wasUnset = this.reduceTraits.remove(ReducerTraits.UNSET);
-    
+
     if (this.reduceTraits.contains(ReducerTraits.FIXED)) {
       return;
     } else if (traits.contains(ReducerTraits.FIXED)) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
index 72fc4ca..fa5e707 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
@@ -88,6 +88,7 @@ public ReduceWork(String name) {

   private ObjectInspector keyObjectInspector = null;
   private ObjectInspector valueObjectInspector = null;
+  private long sortBufferMemory = 0;

   /**
    * If the plan has a reducer and correspondingly a reduce-sink, then store the TableDesc pointing
@@ -142,6 +143,15 @@ public void setTagToValueDesc(final List<TableDesc> tagToValueDesc) {
     this.tagToValueDesc = tagToValueDesc;
   }

+  public void setSortBufferMemory(long sortBufferComputedMemory) {
+    this.sortBufferMemory = sortBufferComputedMemory;
+  }
+
+  @Explain(displayName = "Sort Buffer Memory (MB)", explainLevels = { Level.EXTENDED })
+  public long getSortBufferMemory() {
+    return this.sortBufferMemory;
+  }
+
   @Explain(displayName = "Execution mode", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
   public String getExecutionMode() {
     if (vectorMode) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
index a3aa12f..8d3a786 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
@@ -18,7 +18,12 @@
 package org.apache.hadoop.hive.ql.plan;

+import java.io.IOException;
+
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 public class TezEdgeProperty {

@@ -30,25 +35,31 @@
     CUSTOM_SIMPLE_EDGE,
   }

-  private HiveConf hiveConf;
-  private EdgeType edgeType;
-  private int numBuckets;
+  private final HiveConf hiveConf;
+  private final EdgeType edgeType;
+  private final int numBuckets;

   private boolean isAutoReduce;
   private int minReducer;
   private int maxReducer;
   private long inputSizePerReducer;
+  private long dataFlowSize;
+  private float shuffleMemFraction;
+  private long sortBufferMemRequired;
+
+  private static final Logger LOG = LoggerFactory.getLogger(DagUtils.class.getName());

-  public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType,
-      int buckets) {
+  public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, int buckets, long dataFlowSize) {
     this.hiveConf = hiveConf;
     this.edgeType = edgeType;
     this.numBuckets = buckets;
+    this.dataFlowSize = dataFlowSize;
   }

   public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, boolean isAutoReduce,
-      int minReducer, int maxReducer, long bytesPerReducer) {
-    this(hiveConf, edgeType, -1);
+      int minReducer, int maxReducer, long bytesPerReducer, long dataFlowSize) {
+    this(hiveConf, edgeType, -1, dataFlowSize);
     this.minReducer = minReducer;
     this.maxReducer = maxReducer;
     this.isAutoReduce = isAutoReduce;
@@ -56,7 +67,11 @@ public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, boolean isAutoReduc
   }

   public TezEdgeProperty(EdgeType edgeType) {
-    this(null, edgeType, -1);
+    this(edgeType, 0);
+  }
+
+  public TezEdgeProperty(EdgeType edgeType, long dataFlowSize) {
+    this(null, edgeType, -1, dataFlowSize);
   }

   public EdgeType getEdgeType() {
@@ -86,4 +101,31 @@ public int getMaxReducer() {
   public long getInputSizePerReducer() {
     return inputSizePerReducer;
   }
+
+  public long getDataFlowSize() {
+    return dataFlowSize;
+  }
+
+  public void setDataFlowSize(long dataFlowSize) {
+    this.dataFlowSize = dataFlowSize;
+  }
+
+  public double getShuffleMemFraction() {
+    return shuffleMemFraction;
+  }
+
+  public void setShuffleMemFraction(float shuffleMemFraction) {
+    this.shuffleMemFraction = shuffleMemFraction;
+  }
+
+  public long getSortBufferMemRequiredMB() {
+    return (sortBufferMemRequired > 0) ? sortBufferMemRequired : 1;
+  }
+
+  public void setSortBufferMemRequiredMB(long sortBufferMemRequired) {
+    this.sortBufferMemRequired = sortBufferMemRequired;
+  }
 }
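A quick usage sketch for the extended TezEdgeProperty (hypothetical values; note the 1 MB floor that getSortBufferMemRequiredMB enforces before any size has been computed):

    import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
    import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;

    public class TezEdgePropertySketch {
      public static void main(String[] args) {
        // An edge expected to carry roughly 512 MB of reduce-sink output.
        TezEdgeProperty edge = new TezEdgeProperty(EdgeType.SIMPLE_EDGE, 512L * 1024 * 1024);
        System.out.println(edge.getSortBufferMemRequiredMB()); // 1 -- the floor, nothing set yet
        edge.setSortBufferMemRequiredMB(512);
        System.out.println(edge.getSortBufferMemRequiredMB()); // 512
      }
    }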
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
index e345215..f5c7029 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.plan;

 import java.io.Serializable;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -32,8 +31,6 @@
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
@@ -67,8 +64,6 @@ public static boolean isCustomInputType(VertexType vertex) {
     }
   }

-  private static transient final Logger LOG = LoggerFactory.getLogger(TezWork.class);
-
   private static int counter;
   private final String dagId;
   private final String queryName;
@@ -190,6 +185,7 @@ public void disconnect(BaseWork a, BaseWork b) {
     if (getChildren(a).isEmpty()) {
       leaves.add(a);
     }
+    edgeProperties.remove(new ImmutablePair(a, b));
   }

   /**
@@ -243,6 +239,7 @@ public void remove(BaseWork work) {
       if (invertedWorkGraph.get(w).size() == 0) {
         roots.add(w);
       }
+      edgeProperties.remove(new ImmutablePair(work, w));
     }

     for (BaseWork w: parents) {
@@ -250,6 +247,7 @@ public void remove(BaseWork work) {
       if (workGraph.get(w).size() == 0) {
         leaves.add(w);
       }
+      edgeProperties.remove(new ImmutablePair(w, work));
     }

     roots.remove(work);
@@ -259,6 +257,7 @@ public void remove(BaseWork work) {
     invertedWorkGraph.remove(work);
   }

+  @SuppressWarnings({ "rawtypes", "unchecked" })
   public EdgeType getEdgeType(BaseWork a, BaseWork b) {
     return edgeProperties.get(new ImmutablePair(a,b)).getEdgeType();
   }
@@ -266,6 +265,7 @@
   /**
    * returns the edge type connecting work a and b
    */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
   public TezEdgeProperty getEdgeProperty(BaseWork a, BaseWork b) {
     return edgeProperties.get(new ImmutablePair(a,b));
   }
@@ -297,6 +297,7 @@ public int compareTo(Dependency o) {
     }
   }

+  @SuppressWarnings({ "unchecked", "rawtypes" })
   @Explain(displayName = "Edges", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
   public Map<String, List<Dependency>> getDependencyMap() {
     Map<String, List<Dependency>> result = new LinkedHashMap<String, List<Dependency>>();
@@ -360,6 +361,7 @@
    * to be added prior to calling connect.
    * @param
    */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
   public void connect(BaseWork a, BaseWork b,
       TezEdgeProperty edgeProp) {
     workGraph.get(a).add(b);
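The TezWork hunks above also fix a bookkeeping leak: disconnect() and remove() used to drop graph edges while leaving the matching edgeProperties entries behind. Below is a hypothetical check of the intended behavior (the single-argument TezWork and MapWork constructors are assumptions, not taken from the patch):

    import org.apache.hadoop.hive.ql.plan.BaseWork;
    import org.apache.hadoop.hive.ql.plan.MapWork;
    import org.apache.hadoop.hive.ql.plan.ReduceWork;
    import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
    import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
    import org.apache.hadoop.hive.ql.plan.TezWork;

    public class EdgePropertyCleanupCheck {
      public static void main(String[] args) {
        TezWork tezWork = new TezWork("query-1"); // assumed constructor
        BaseWork a = new MapWork("Map 1");        // assumed constructor
        BaseWork b = new ReduceWork("Reducer 2");
        tezWork.add(a);
        tezWork.add(b);
        tezWork.connect(a, b, new TezEdgeProperty(EdgeType.SIMPLE_EDGE, 0));
        tezWork.disconnect(a, b);
        // Before this patch the stale entry survived; with the hunks above it is gone.
        assert tezWork.getEdgeProperty(a, b) == null;
      }
    }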
diff --git ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java
index fc42aaa..73a0834 100644
--- ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java
+++ ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.Statistics;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -106,6 +107,7 @@ protected void setupMapWork(MapWork mapWork, GenTezProcContext context,
     ts.getChildOperators().add(rs);
     rs.getParentOperators().add(ts);
     rs.getChildOperators().add(fs);
+    rs.setStatistics(new Statistics());
     fs.getParentOperators().add(rs);
     ctx.preceedingWork = null;
     ctx.currentRootOperator = ts;
diff --git ql/src/test/results/clientpositive/llap/tez_join.q.out ql/src/test/results/clientpositive/llap/tez_join.q.out
index ca849c2..6daec49 100644
--- ql/src/test/results/clientpositive/llap/tez_join.q.out
+++ ql/src/test/results/clientpositive/llap/tez_join.q.out
@@ -92,6 +92,8 @@ STAGE PLANS:
                   expressions: KEY.reducesinkkey0 (type: string)
                   outputColumnNames: _col0
                   Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+                  Dummy Store
+                    Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
             Execution mode: llap
             Reduce Operator Tree:
               Select Operator
diff --git ql/src/test/results/clientpositive/llap/tez_smb_1.q.out ql/src/test/results/clientpositive/llap/tez_smb_1.q.out
index 570118c..ceecf43 100644
--- ql/src/test/results/clientpositive/llap/tez_smb_1.q.out
+++ ql/src/test/results/clientpositive/llap/tez_smb_1.q.out
@@ -138,6 +138,8 @@ STAGE PLANS:
                   expressions: key (type: int)
                   outputColumnNames: _col0
                   Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+                  Dummy Store
+                    Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
         Map Operator Tree:
             TableScan
               alias: s1
@@ -545,6 +547,8 @@ STAGE PLANS:
                   expressions: KEY.reducesinkkey0 (type: int)
                   outputColumnNames: _col0
                   Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+                  Dummy Store
+                    Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
             Execution mode: llap
             Reduce Operator Tree:
               Select Operator
diff --git ql/src/test/results/clientpositive/llap/tez_smb_main.q.out ql/src/test/results/clientpositive/llap/tez_smb_main.q.out
index 5d18ddd..37322b9 100644
--- ql/src/test/results/clientpositive/llap/tez_smb_main.q.out
+++ ql/src/test/results/clientpositive/llap/tez_smb_main.q.out
@@ -594,6 +594,8 @@ STAGE PLANS:
                   expressions: key (type: int), value (type: string)
                   outputColumnNames: _col0, _col1
                   Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+                  Dummy Store
+                    Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
         Map Operator Tree:
             TableScan
               alias: b
@@ -838,6 +840,8 @@ STAGE PLANS:
                   expressions: key (type: int)
                   outputColumnNames: _col0
                   Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+                  Dummy Store
+                    Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
         Map Operator Tree:
             TableScan
               alias: s1
diff --git ql/src/test/results/clientpositive/tez/explainuser_3.q.out ql/src/test/results/clientpositive/tez/explainuser_3.q.out
index 3f948a5..18e56a6 100644
--- ql/src/test/results/clientpositive/tez/explainuser_3.q.out
+++ ql/src/test/results/clientpositive/tez/explainuser_3.q.out
@@ -31,13 +31,13 @@ Stage-0
   Stage-1
     Reducer 2 vectorized
       File Output Operator [FS_8]
-        Select Operator [OP_7] (rows=10 width=170)
+        Select Operator [OP_7] (rows=10 width=171)
           Output:["_col0","_col1"]
         <-Map 1 [SIMPLE_EDGE] vectorized
           SHUFFLE [RS_6]
-            Select Operator [OP_5] (rows=10 width=170)
+            Select Operator [OP_5] (rows=10 width=171)
               Output:["_col0","_col1"]
-              TableScan [TS_0] (rows=10 width=170)
+              TableScan [TS_0] (rows=10 width=171)
                 default@acid_vectorized,acid_vectorized, ACID table,Tbl:COMPLETE,Col:NONE,Output:["a","b"]
PREHOOK: query: explain select key, value