diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index dffdb5c..134c66f 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2320,7 +2320,9 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { HIVE_TEZ_ENABLE_MEMORY_MANAGER("hive.tez.enable.memory.manager", true, "Enable memory manager for tez"), HIVE_HASH_TABLE_INFLATION_FACTOR("hive.hash.table.inflation.factor", (float) 2.0, - "Expected inflation factor between disk/in memory representation of hash tables"); + "Expected inflation factor between disk/in memory representation of hash tables"), + HIVE_TEZ_ENABLE_MEMORY_MANAGER_FOR_HASH_JOIN("hive.tez.enable.memory.manager.for.hash.join", + true, "Enable memory manager for hash join operators in tez"); public final String varname; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java index 9867739..c139deb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java @@ -390,8 +390,11 @@ public static FastBitSet groupingSet2BitSet(int value) { if (hashAggr) { computeMaxEntriesHashAggr(hconf); } - memoryMXBean = ManagementFactory.getMemoryMXBean(); - maxMemory = memoryMXBean.getHeapMemoryUsage().getMax(); + maxMemory = this.getConf().getMemoryNeeded(); + if (maxMemory < 0) { + memoryMXBean = ManagementFactory.getMemoryMXBean(); + maxMemory = memoryMXBean.getHeapMemoryUsage().getMax(); + } memoryThreshold = this.getConf().getMemoryThreshold(); return result; } @@ -865,11 +868,13 @@ private boolean shouldBeFlushed(KeyWrapper newKeys) { // The fixed size for the aggregation class is already known. Get the // variable portion of the size every NUMROWSESTIMATESIZE rows. if ((numEntriesHashTable == 0) || ((numEntries % NUMROWSESTIMATESIZE) == 0)) { - //check how much memory left memory - usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed(); - rate = (float) usedMemory / (float) maxMemory; - if(rate > memoryThreshold){ - return true; + if (memoryMXBean != null) { + // check how much memory is left + usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed(); + rate = (float) usedMemory / (float) maxMemory; + if (rate > memoryThreshold) { + return true; + } } for (Integer pos : keyPositionsSize) { Object key = newKeys.getKeyArray()[pos.intValue()]; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index 19da1c3..39f2147 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -336,7 +336,7 @@ public Edge createEdge(JobConf vConf, Vertex v, Vertex w, TezEdgeProperty edgePr * Helper function to create an edge property from an edge type.
*/ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration conf) - throws IOException { + throws IOException { MRHelpers.translateMRConfToTez(conf); String keyClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS); String valClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS); @@ -346,22 +346,23 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration EdgeType edgeType = edgeProp.getEdgeType(); switch (edgeType) { case BROADCAST_EDGE: - UnorderedKVEdgeConfig et1Conf = UnorderedKVEdgeConfig - .newBuilder(keyClass, valClass) - .setFromConfiguration(conf) - .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) - .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) - .build(); + UnorderedKVEdgeConfig et1Conf = + UnorderedKVEdgeConfig.newBuilder(keyClass, valClass).setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) + .build(); return et1Conf.createDefaultBroadcastEdgeProperty(); + case CUSTOM_EDGE: assert partitionerClassName != null; partitionerConf = createPartitionerConf(partitionerClassName, conf); - UnorderedPartitionedKVEdgeConfig et2Conf = UnorderedPartitionedKVEdgeConfig - .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) - .setFromConfiguration(conf) - .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) - .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) - .build(); + UnorderedPartitionedKVEdgeConfig et2Conf = + UnorderedPartitionedKVEdgeConfig + .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) + .setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) + .build(); EdgeManagerPluginDescriptor edgeDesc = EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName()); CustomEdgeConfiguration edgeConf = @@ -371,31 +372,53 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration byte[] userPayload = dob.getData(); edgeDesc.setUserPayload(UserPayload.create(ByteBuffer.wrap(userPayload))); return et2Conf.createDefaultCustomEdgeProperty(edgeDesc); + case CUSTOM_SIMPLE_EDGE: assert partitionerClassName != null; partitionerConf = createPartitionerConf(partitionerClassName, conf); - UnorderedPartitionedKVEdgeConfig et3Conf = UnorderedPartitionedKVEdgeConfig - .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) - .setFromConfiguration(conf) - .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) - .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) - .build(); + UnorderedPartitionedKVEdgeConfig et3Conf = + UnorderedPartitionedKVEdgeConfig + .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) + .setFromConfiguration(conf) + .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) + .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) + .build(); return et3Conf.createDefaultEdgeProperty(); + case SIMPLE_EDGE: default: assert partitionerClassName != null; partitionerConf = createPartitionerConf(partitionerClassName, conf); - OrderedPartitionedKVEdgeConfig et4Conf = OrderedPartitionedKVEdgeConfig + 
OrderedPartitionedKVEdgeConfig.Builder builder = + OrderedPartitionedKVEdgeConfig .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) .setFromConfiguration(conf) .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), TezBytesComparator.class.getName(), null) - .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) - .build(); + .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null); + + boolean memoryManagerEnabled = + HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_TEZ_ENABLE_MEMORY_MANAGER); + if (memoryManagerEnabled) { + builder = + builder.configureInput() + .setShuffleBufferFraction((float) edgeProp.getShuffleMemFraction()) + .setPostMergeBufferFraction(0).done().configureOutput() + .setSortBufferSize((int) edgeProp.getSortBufferMemRequiredMB()).done(); + } + + OrderedPartitionedKVEdgeConfig et4Conf = builder.build(); return et4Conf.createDefaultEdgeProperty(); } } + private void setUnsortedMemoryRequirementConfig(TezEdgeProperty edgeProp, Configuration conf) { + conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, + edgeProp.getSortBufferMemRequiredMB()); + conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, + (float) edgeProp.getShuffleMemFraction()); + } + /** * Utility method to create a stripped down configuration for the MR partitioner. * diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index c357329..f0161c8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc; +import org.apache.hadoop.hive.ql.plan.DummyStoreDesc; import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -288,6 +289,7 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext cont dummyStoreOp.getParentOperators().add(parentOp); mergeJoinOp.getParentOperators().remove(parentIndex); mergeJoinOp.getParentOperators().add(parentIndex, dummyStoreOp); + dummyStoreOp.setConf(new DummyStoreDesc()); } } mergeJoinOp.cloneOriginalParentsList(mergeJoinOp.getParentOperators()); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java index 757ff5e..7b19de6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java @@ -280,7 +280,8 @@ public static Object processReduceSinkToHashJoin(ReduceSinkOperator parentRS, Ma // disable auto parallelism for bucket map joins parentRS.getConf().setReducerTraits(EnumSet.of(FIXED)); } - TezEdgeProperty edgeProp = new TezEdgeProperty(null, edgeType, numBuckets); + TezEdgeProperty edgeProp = + new TezEdgeProperty(null, edgeType, numBuckets, parentRS.getStatistics().getDataSize()); if (mapJoinWork != null) { for (BaseWork myWork: mapJoinWork) { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java index 7149f5c..79b882b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java @@ -284,7 +284,6 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, List> bucketColsList = new ArrayList>(); List> sortColsList = new ArrayList>(); byte pos = 0; - int numReduceSinks = 0; // will be set to the larger of the parents for (Operator parentOp : joinOp.getParentOperators()) { if (!(parentOp instanceof ReduceSinkOperator)) { // can be mux operator diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java index eb8597d..59be27b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java @@ -5,7 +5,6 @@ import java.util.Comparator; import java.util.Iterator; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -15,11 +14,12 @@ import java.util.Stack; import java.util.TreeSet; +import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; -import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.StatsTask; import org.apache.hadoop.hive.ql.exec.Task; @@ -38,12 +38,13 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.GroupByDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MergeJoinWork; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; -import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; import org.apache.hadoop.hive.ql.plan.TezWork; +import org.apache.hadoop.hive.ql.plan.UnionWork; /** * MemoryDecider is a simple physical optimizer that adjusts the memory layout of tez tasks. 
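Note (illustration, not part of the patch): the MemoryDecider hunks below make MemoryCalculator read the Tez container size and fail fast when hive.auto.convert.join.noconditionaltask.size exceeds it. A minimal, self-contained sketch of that guard, with assumed sizes and plain-Java names rather than the Hive API:

    public class ContainerCheckSketch {
      static final long ONE_MB = 1024 * 1024L;

      // mirrors the constructor guard added below: the map-join budget must fit in the container
      static long remainingAfterMapJoins(long containerMb, long mapJoinBudgetBytes) {
        long totalAvailable = containerMb * ONE_MB;
        if (mapJoinBudgetBytes > totalAvailable) {
          throw new IllegalStateException("noconditionaltask.size " + mapJoinBudgetBytes
              + " exceeds container size " + totalAvailable + "; check tez/yarn settings");
        }
        return totalAvailable - mapJoinBudgetBytes; // what is left for everything else
      }

      public static void main(String[] args) {
        System.out.println(remainingAfterMapJoins(1024, 10 * ONE_MB)); // 1063256064
      }
    }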
@@ -54,17 +55,33 @@ public class MemoryDecider implements PhysicalPlanResolver { protected static transient final Log LOG = LogFactory.getLog(MemoryDecider.class); + private boolean forAll; public class MemoryCalculator implements Dispatcher { - private final long totalAvailableMemory; // how much to we have + private final long mapJoinsTotalAvailableMemory; // how much do we have private final long minimumHashTableSize; // minimum size of ht completely in memory private final double inflationFactor; // blowout factor datasize -> memory size private final PhysicalContext pctx; + private long totalAvailableMemory = 0; + private final long ONE_MB = 1024 * 1024L; + private final long TEN_MB = ONE_MB * 10; + private TezWork currWork = null; - public MemoryCalculator(PhysicalContext pctx) { + public MemoryCalculator(PhysicalContext pctx) throws SemanticException { this.pctx = pctx; - this.totalAvailableMemory = HiveConf.getLongVar(pctx.conf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); + this.mapJoinsTotalAvailableMemory = + HiveConf.getLongVar(pctx.conf, + HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); + this.totalAvailableMemory = DagUtils.getContainerResource(pctx.conf).getMemory() * ONE_MB; + LOG.info("Total available memory is " + totalAvailableMemory); + if (mapJoinsTotalAvailableMemory > totalAvailableMemory) { + throw new SemanticException( + "Invalid configuration could cause OutOfMemory issues at runtime. Configuration " + + HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD + " = " + + mapJoinsTotalAvailableMemory + " is greater than configured container size: " + + totalAvailableMemory + ". Check your tez/yarn settings"); + } this.minimumHashTableSize = HiveConf.getIntVar(pctx.conf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS) * HiveConf.getIntVar(pctx.conf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE); this.inflationFactor = HiveConf.getFloatVar(pctx.conf, HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR); @@ -80,6 +97,7 @@ public Object dispatch(Node nd, Stack stack, Object... nodeOutputs) } if (currTask instanceof TezTask) { TezWork work = ((TezTask) currTask).getWork(); + currWork = work; for (BaseWork w : work.getAllWork()) { evaluateWork(w); } @@ -89,39 +107,100 @@ public Object dispatch(Node nd, Stack stack, Object...
nodeOutputs) private void evaluateWork(BaseWork w) throws SemanticException { + List inputEdges = getInputEdges(w); + List outputEdges = getOutputEdges(w); if (w instanceof MapWork) { - evaluateMapWork((MapWork) w); + evaluateMapWork((MapWork) w, inputEdges, outputEdges); } else if (w instanceof ReduceWork) { - evaluateReduceWork((ReduceWork) w); + evaluateReduceWork((ReduceWork) w, inputEdges, outputEdges); } else if (w instanceof MergeJoinWork) { - evaluateMergeWork((MergeJoinWork) w); + evaluateMergeWork((MergeJoinWork) w, inputEdges, outputEdges); + } else if (w instanceof UnionWork) { + evaluateUnionWork((UnionWork) w, inputEdges, outputEdges); } else { LOG.info("We are not going to evaluate this work type: " + w.getClass().getCanonicalName()); } } - private void evaluateMergeWork(MergeJoinWork w) throws SemanticException { + private void evaluateUnionWork(UnionWork w, List inputEdges, + List outputEdges) { + List realOutputEdges = new ArrayList(); + long totalInputSize = 0; + for (TezEdgeProperty edge : outputEdges) { + if (edge.getEdgeType() == TezEdgeProperty.EdgeType.CONTAINS) { + totalInputSize += edge.getDataFlowSize(); + } else { + realOutputEdges.add(edge); + } + } + + LOG.info("Real total input size for union " + w.getName() + " is " + totalInputSize); + for (TezEdgeProperty edge : realOutputEdges) { + edge.setDataFlowSize(totalInputSize); + } + return; + } + + private void evaluateMergeWork(MergeJoinWork w, List inputEdges, + List outputEdges) throws SemanticException { + evaluateOperators(w.getMainWork(), pctx, inputEdges, outputEdges); for (BaseWork baseWork : w.getBaseWorkList()) { - evaluateOperators(baseWork, pctx); + evaluateOperators(baseWork, pctx, inputEdges, outputEdges); } } - private void evaluateReduceWork(ReduceWork w) throws SemanticException { - evaluateOperators(w, pctx); + private List getOutputEdges(BaseWork w) { + List outputEdges = new ArrayList(); + + for (BaseWork b : currWork.getChildren(w)) { + outputEdges.add(currWork.getEdgeProperty(w, b)); + } + + return outputEdges; + } + + private List getInputEdges(BaseWork w) { + List inputEdges = new ArrayList(); + for (BaseWork b : currWork.getParents(w)) { + inputEdges.add(currWork.getEdgeProperty(b, w)); + } + + return inputEdges; + } + + private void evaluateReduceWork(ReduceWork w, List inputEdges, + List outputEdges) throws SemanticException { + evaluateOperators(w, pctx, inputEdges, outputEdges); } - private void evaluateMapWork(MapWork w) throws SemanticException { - evaluateOperators(w, pctx); + private void evaluateMapWork(MapWork w, List inputEdges, + List outputEdges) throws SemanticException { + evaluateOperators(w, pctx, inputEdges, outputEdges); } - private void evaluateOperators(BaseWork w, PhysicalContext pctx) throws SemanticException { + private void evaluateOperators(BaseWork w, PhysicalContext pctx, + List inputEdges, List outputEdges) + throws SemanticException { // lets take a look at the operator memory requirements. Dispatcher disp = null; final Set mapJoins = new LinkedHashSet(); Map rules = new HashMap(); - rules.put(new RuleRegExp("Map join memory estimator", - MapJoinOperator.getOperatorName() + "%"), new NodeProcessor() { + if (forAll) { + NodeProcessor defaultRule = new NodeProcessor() { + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... 
nodeOutputs) { + if (nd instanceof MapJoinOperator) { + mapJoins.add((MapJoinOperator) nd); + } + return null; + } + }; + disp = new DefaultRuleDispatcher(defaultRule, new HashMap(), null); + } else { + rules.put(new RuleRegExp("Map join memory estimator", MapJoinOperator.getOperatorName() + + "%"), new NodeProcessor() { @Override public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) { @@ -129,7 +208,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, return null; } }); - disp = new DefaultRuleDispatcher(null, rules, null); + disp = new DefaultRuleDispatcher(null, rules, null); + } GraphWalker ogw = new DefaultGraphWalker(disp); @@ -138,11 +218,156 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, LinkedHashMap nodeOutput = new LinkedHashMap(); ogw.startWalking(topNodes, nodeOutput); + boolean mapJoinsExist = evaluateMapJoin(mapJoins); + if (forAll) { + if (mapJoinsExist) { + // FIXME Exception or can we do something better? + totalAvailableMemory = totalAvailableMemory - mapJoinsTotalAvailableMemory; + if (totalAvailableMemory < 0) { + throw new SemanticException("Memory shortage of " + -(totalAvailableMemory) + + ". Please modify the container size to be greater than " + + HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); + } + } - if (mapJoins.size() == 0) { + long parallelism = 0; + if (w instanceof MapWork) { + MapWork mapWork = (MapWork) w; + long totalSize = 0; + for (Operator op : mapWork.getAllRootOperators()) { + totalSize += op.getStatistics().getDataSize(); + } + parallelism = totalSize / (pctx.conf.getLongVar(HiveConf.ConfVars.MAPREDMAXSPLITSIZE)); + } else if (w instanceof ReduceWork) { + parallelism = ((ReduceWork) w).getNumReduceTasks(); + } + parallelism = (parallelism == 0 ? 1 : parallelism); + evaluateAllOperators(nodeOutput.keySet(), inputEdges, outputEdges, parallelism); + } + } + + private boolean isEdgeTypeShuffle(TezEdgeProperty edge) { + return (edge.getEdgeType() == TezEdgeProperty.EdgeType.SIMPLE_EDGE); + } + + private long computeMemoryRequirementForEdges(List edges, long parallelism) { + long retSize = 0; + for (TezEdgeProperty edge : edges) { + if (isEdgeTypeShuffle(edge)) { + if (edge.getDataFlowSize() > TEN_MB) { + long size = edge.getDataFlowSize() / parallelism; + size = (size > TEN_MB) ? TEN_MB : size; + edge.setDataFlowSize(size); + retSize += size; + } else { + edge.setDataFlowSize(TEN_MB); + retSize += TEN_MB; + } + } else { + retSize += TEN_MB; // leave 10 MB minimum for the edges. + } + } + + return retSize; + } + + /* + * This method is used to divvy up the memory of a container among the operators and the sort + * buffers (for input and output). + * + * The rules are as follows: + * + * 1. Every operator in the pipeline gets at least 1 MB of memory by default. + * 2. The remaining memory is broken down according to size of the sort buffers required. 
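To make rules 1 and 2 above concrete, here is a simplified, self-contained sketch of the arithmetic with assumed sizes (the real evaluateAllOperators below iterates over the actual operators and edges, so this is illustrative only):

    public class DivvySketch {
      static final long ONE_MB = 1024 * 1024L;

      public static void main(String[] args) {
        long totalAvailableMemory = 1024 * ONE_MB; // assumed 1 GB container
        long edgesNeed = 600 * ONE_MB;             // required by input + output edges
        long ioMem, remaining;
        if (edgesNeed > totalAvailableMemory / 2) {
          ioMem = totalAvailableMemory / 2;        // edges are capped at half the container
          remaining = totalAvailableMemory / 2;
        } else {
          ioMem = edgesNeed;
          remaining = totalAvailableMemory - edgesNeed;
        }
        int numOperators = 5;
        remaining -= numOperators * ONE_MB;        // rule 1: 1 MB floor per operator
        long gbyParentData = 300 * ONE_MB;         // per-task input of a hash GroupBy
        long gbyGets = Math.min(gbyParentData, remaining);
        long gbyMem = gbyGets + ONE_MB;            // rule 2: GroupBy takes up to its input size
        ioMem += remaining - gbyGets;              // leftovers go back to the shuffle edges
        System.out.println("groupBy=" + gbyMem / ONE_MB + "MB, io=" + ioMem / ONE_MB + "MB");
      }
    }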
+ */ + private void evaluateAllOperators(Set keySet, List inputEdges, + List outputEdges, long parallelism) { + + long inputSize = computeMemoryRequirementForEdges(inputEdges, parallelism); + long outputSize = computeMemoryRequirementForEdges(outputEdges, parallelism); + long remainingMemory = 0; + long inputOutputAvailableMem = 0; + long totalRequiredInputOutputMem = (inputSize + outputSize); + if (totalRequiredInputOutputMem > (totalAvailableMemory / 2)) { + inputOutputAvailableMem = totalAvailableMemory / 2; + remainingMemory = totalAvailableMemory / 2; + } else { + inputOutputAvailableMem = totalRequiredInputOutputMem; + remainingMemory = totalAvailableMemory - totalRequiredInputOutputMem; + } + LOG.info("Total memory required by input/output: " + totalRequiredInputOutputMem); + + remainingMemory -= keySet.size() * ONE_MB; // leave 1 MB memory for each operator at least + if (remainingMemory < 0) { + // FIXME + // cannot do any more consistent allocations. Pass through. Hope no OOM/throw exception return; } + for (Node nd : keySet) { + if (nd instanceof MapJoinOperator) { + // already accounted for. Nothing to do. + continue; + } + Operator op = (Operator) nd; + op.getConf().setMemoryNeeded(ONE_MB); + if (nd instanceof GroupByOperator) { + GroupByOperator gbyOp = (GroupByOperator) nd; + if (gbyOp.getConf().getMode() != GroupByDesc.Mode.HASH) { + continue; + } + // get all parent memory sizes + long parentDataSize = 0; + for (Operator parentOp : gbyOp.getParentOperators()) { + parentDataSize += (parentOp.getStatistics().getDataSize() / parallelism); + } + if (parentDataSize > remainingMemory) { + // We could just give all of the remaining memory to this operator because we don't + // expect any more memory-hogging operators following a group by. + // The RS that could follow this operator is accounted for by the input/output + // computations.
+ gbyOp.getConf().setMemoryNeeded(remainingMemory + ONE_MB); + remainingMemory = 0; + } else { + gbyOp.getConf().setMemoryNeeded(parentDataSize + ONE_MB); + } + + gbyOp.getConf().setMemoryThreshold( + gbyOp.getConf().getMemoryNeeded() / (float) totalAvailableMemory); + } + } + inputOutputAvailableMem += remainingMemory; + LOG.info("After accounting for map joins, available memory is: " + inputOutputAvailableMem); + computeMemoryForInputOutput(inputEdges, outputEdges, + totalRequiredInputOutputMem, inputOutputAvailableMem); + } + + private void computeMemoryForInputOutput(List inputEdges, + List outputEdges, long required, long available) { + + for (TezEdgeProperty edge : inputEdges) { + if (isEdgeTypeShuffle(edge)) { + float fraction = + (edge.getDataFlowSize() * available) / (float) (required * totalAvailableMemory); + LOG.info("Setting shuffle memory fraction for edge: " + fraction); + edge.setShuffleMemFraction(fraction); + } + } + + for (TezEdgeProperty edge : outputEdges) { + if (isEdgeTypeShuffle(edge)) { + long memNeeded = (edge.getDataFlowSize() * available) / (required); + LOG.info("Setting sort buffer memory needed: " + memNeeded); + edge.setSortBufferMemRequired(memNeeded); + } + } + } + + private boolean evaluateMapJoin(Set mapJoins) { + if (mapJoins.size() == 0) { + return false; + } + try { long total = 0; final Map sizes = new HashMap(); @@ -157,6 +382,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } Comparator comp = new Comparator() { + @Override public int compare(MapJoinOperator mj1, MapJoinOperator mj2) { if (mj1 == null || mj2 == null) { throw new NullPointerException(); @@ -173,7 +399,7 @@ public int compare(MapJoinOperator mj1, MapJoinOperator mj2) { SortedSet sortedMapJoins = new TreeSet(comp); sortedMapJoins.addAll(mapJoins); - long remainingSize = totalAvailableMemory / 2; + long remainingSize = mapJoinsTotalAvailableMemory / 2; Iterator it = sortedMapJoins.iterator(); @@ -204,15 +430,16 @@ public int compare(MapJoinOperator mj1, MapJoinOperator mj2) { sortedMapJoins.addAll(mapJoins); totalLargeJoins = total; - if (totalLargeJoins > totalAvailableMemory) { + if (totalLargeJoins > mapJoinsTotalAvailableMemory) { // this shouldn't happen throw new HiveException(); } - remainingSize = totalAvailableMemory / 2; + remainingSize = mapJoinsTotalAvailableMemory / 2; } // we used half the mem for small joins, now let's scale the rest - double weight = (remainingSize + totalAvailableMemory / 2) / (double) totalLargeJoins; + double weight = + (remainingSize + mapJoinsTotalAvailableMemory / 2) / (double) totalLargeJoins; for (MapJoinOperator mj : sortedMapJoins) { long size = (long)(weight * sizes.get(mj)); @@ -224,7 +451,7 @@ public int compare(MapJoinOperator mj1, MapJoinOperator mj2) { } } catch (HiveException e) { // if we have issues with stats, just scale linearily - long size = totalAvailableMemory / mapJoins.size(); + long size = mapJoinsTotalAvailableMemory / mapJoins.size(); if (LOG.isInfoEnabled()) { LOG.info("Scaling mapjoin memory w/o stats"); } @@ -236,6 +463,7 @@ public int compare(MapJoinOperator mj1, MapJoinOperator mj2) { mj.getConf().setMemoryNeeded(size); } } + return true; } private long computeSizeToFitInMem(MapJoinOperator mj) throws HiveException { @@ -270,7 +498,12 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, @Override public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException { - pctx.getConf(); + HiveConf conf = pctx.getConf(); + if 
(conf.getBoolVar(HiveConf.ConfVars.HIVE_TEZ_ENABLE_MEMORY_MANAGER)) { + forAll = true; + } else { + forAll = false; + } // create dispatcher and graph walker Dispatcher disp = new MemoryCalculator(pctx); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java index a9d1f8e..116cd6f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java @@ -135,9 +135,10 @@ public static ReduceWork createReduceWork( if (reduceWork.isAutoReduceParallelism()) { edgeProp = new TezEdgeProperty(context.conf, edgeType, true, - reduceWork.getMinReduceTasks(), reduceWork.getMaxReduceTasks(), bytesPerReducer); + reduceWork.getMinReduceTasks(), reduceWork.getMaxReduceTasks(), bytesPerReducer, + reduceSink.getStatistics().getDataSize()); } else { - edgeProp = new TezEdgeProperty(edgeType); + edgeProp = new TezEdgeProperty(edgeType, reduceSink.getStatistics().getDataSize()); } tezWork.connect( diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java index c4e0413..32fb253 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; +import org.apache.hadoop.hive.ql.exec.OperatorUtils; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.lib.Node; @@ -129,6 +130,7 @@ public Object process(Node nd, Stack stack, } else { work = GenTezUtils.createReduceWork(context, root, tezWork); } + work.setMemoryNeeded(operator.getStatistics().getDataSize()); context.rootToWorkMap.put(root, work); } @@ -337,7 +339,7 @@ public Object process(Node nd, Stack stack, unionWork = context.rootUnionWorkMap.get(root); if (unionWork == null) { // if unionWork is null, it means it is the first time. we need to - // create a union work object and add this work to it. Subsequent + // create a union work object and add this work to it. Subsequent // work should reference the union and not the actual work. 
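For context on the GenTezUtils/GenTezWork hunks here: edges are now created with the reduce sink's data size attached. A small hypothetical usage of the extended TezEdgeProperty API (constructor and setters as defined in the TezEdgeProperty.java hunk further down; assumes the patched Hive classes are on the classpath):

    import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
    import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;

    public class EdgePropertySketch {
      public static void main(String[] args) {
        // the data flow size (bytes) travels with the edge for the memory decider
        TezEdgeProperty edge = new TezEdgeProperty(EdgeType.SIMPLE_EDGE, 50L * 1024 * 1024);
        edge.setShuffleMemFraction(0.07f);
        edge.setSortBufferMemRequired(80L * 1024 * 1024);
        System.out.println(edge.getSortBufferMemRequiredMB()); // 80; under 1 MB floors to 1
      }
    }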
unionWork = GenTezUtils.createUnionWork(context, root, operator, tezWork); // finally connect the union work with work @@ -452,9 +454,10 @@ public Object process(Node nd, Stack stack, if (rWork.isAutoReduceParallelism()) { edgeProp = new TezEdgeProperty(context.conf, edgeType, true, - rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer); + rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer, + rs.getStatistics().getDataSize()); } else { - edgeProp = new TezEdgeProperty(edgeType); + edgeProp = new TezEdgeProperty(edgeType, rs.getStatistics().getDataSize()); } tezWork.connect(work, followingWork, edgeProp); context.connectedReduceSinks.add(rs); @@ -495,11 +498,11 @@ private int getFollowingWorkIndex(TezWork tezWork, UnionWork unionWork, ReduceSi int pos = stack.indexOf(currentMergeJoinOperator); return (Operator) stack.get(pos - 1); } - + private void connectUnionWorkWithWork(UnionWork unionWork, BaseWork work, TezWork tezWork, GenTezProcContext context) { LOG.debug("Connecting union work (" + unionWork + ") with work (" + work + ")"); - TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.CONTAINS); + TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.CONTAINS, work.getMemoryNeeded()); tezWork.connect(unionWork, work, edgeProp); unionWork.addUnionOperators(context.currentUnionOperators); context.workWithUnionOperators.add(work); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index 9503fa8..31325b6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -478,8 +478,9 @@ protected void optimizeTaskPlan(List> rootTasks, Pa LOG.debug("Skipping stage id rearranger"); } - if ((conf.getBoolVar(HiveConf.ConfVars.HIVE_TEZ_ENABLE_MEMORY_MANAGER)) - && (conf.getBoolVar(HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN))) { + if (((conf.getBoolVar(HiveConf.ConfVars.HIVE_TEZ_ENABLE_MEMORY_MANAGER_FOR_HASH_JOIN)) + && (conf.getBoolVar(HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN))) + || (conf.getBoolVar(HiveConf.ConfVars.HIVE_TEZ_ENABLE_MEMORY_MANAGER))) { physicalCtx = new MemoryDecider().resolve(physicalCtx); } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java index 463da5d..368240e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java @@ -30,7 +30,7 @@ protected Statistics statistics; protected transient OpTraits opTraits; protected transient Map opProps; - protected long memNeeded = 0; + protected long memNeeded = -1; static { PTFUtils.makeTransient(AbstractOperatorDesc.class, "opProps"); diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java index f09138d..4a98c66 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java @@ -86,14 +86,14 @@ public void addMergedWork(BaseWork work, BaseWork connectWork, * output name in the reduce sink needs to be setup appropriately. In the case of reduce * side merge work, we need to ensure that the parent work that provides data to this merge * work is setup to point to the right vertex name - the main work name. 
- * + * * In this case, if the big table work has already been created, we can hook up the merge * work items for the small table correctly. */ setReduceSinkOutputName(connectWork, leafOperatorToFollowingWork, bigTableWork.getName()); } } - + setMemoryNeeded(work.getMemoryNeeded() + getMemoryNeeded()); if (work != null) { /* * Same reason as above. This is the case when we have the main work item after the merge work diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java index a3aa12f..05b18cc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java @@ -30,25 +30,28 @@ CUSTOM_SIMPLE_EDGE, } - private HiveConf hiveConf; - private EdgeType edgeType; - private int numBuckets; + private final HiveConf hiveConf; + private final EdgeType edgeType; + private final int numBuckets; private boolean isAutoReduce; private int minReducer; private int maxReducer; private long inputSizePerReducer; + private long dataFlowSize; + private float shuffleMemFraction; + private long sortBufferMemRequired; - public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, - int buckets) { + public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, int buckets, long dataFlowSize) { this.hiveConf = hiveConf; this.edgeType = edgeType; this.numBuckets = buckets; + this.dataFlowSize = dataFlowSize; } public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, boolean isAutoReduce, - int minReducer, int maxReducer, long bytesPerReducer) { - this(hiveConf, edgeType, -1); + int minReducer, int maxReducer, long bytesPerReducer, long dataFlowSize) { + this(hiveConf, edgeType, -1, dataFlowSize); this.minReducer = minReducer; this.maxReducer = maxReducer; this.isAutoReduce = isAutoReduce; @@ -56,7 +59,11 @@ public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, boolean isAutoReduc } public TezEdgeProperty(EdgeType edgeType) { - this(null, edgeType, -1); + this(edgeType, 0); + } + + public TezEdgeProperty(EdgeType edgeType, long dataFlowSize) { + this(null, edgeType, -1, dataFlowSize); } public EdgeType getEdgeType() { @@ -86,4 +93,30 @@ public int getMaxReducer() { public long getInputSizePerReducer() { return inputSizePerReducer; } + + public long getDataFlowSize() { + return dataFlowSize; + } + + public void setDataFlowSize(long dataFlowSize) { + this.dataFlowSize = dataFlowSize; + } + + public double getShuffleMemFraction() { + return shuffleMemFraction; + } + + public void setShuffleMemFraction(float shuffleMemFraction) { + this.shuffleMemFraction = shuffleMemFraction; + } + + public long getSortBufferMemRequiredMB() { + long mbSize = sortBufferMemRequired / (1024 * 1024); + mbSize = (mbSize > 0) ? 
mbSize : 1; + return mbSize; + } + + public void setSortBufferMemRequired(long sortBufferMemRequired) { + this.sortBufferMemRequired = sortBufferMemRequired; + } } diff --git ql/src/test/queries/clientpositive/tez_join_hash.q ql/src/test/queries/clientpositive/tez_join_hash.q index 09a1d8b..fc06267 100644 --- ql/src/test/queries/clientpositive/tez_join_hash.q +++ ql/src/test/queries/clientpositive/tez_join_hash.q @@ -5,6 +5,7 @@ create table orc_src (key string, value string) STORED AS ORC; insert into table orc_src select * from src; set hive.execution.engine=tez; +set hive.tez.enable.memory.manager=true; set hive.vectorized.execution.enabled=true; set hive.auto.convert.join.noconditionaltask.size=1; set hive.exec.reducers.bytes.per.reducer=20000; diff --git ql/src/test/queries/clientpositive/tez_join_result_complex.q ql/src/test/queries/clientpositive/tez_join_result_complex.q index 5bc9151..4c1f3f8 100644 --- ql/src/test/queries/clientpositive/tez_join_result_complex.q +++ ql/src/test/queries/clientpositive/tez_join_result_complex.q @@ -1,6 +1,6 @@ SET hive.auto.convert.join=true; SET hive.auto.convert.join.noconditionaltask=true; -SET hive.auto.convert.join.noconditionaltask.size=1000000000; +SET hive.auto.convert.join.noconditionaltask.size=100000000; set hive.mapjoin.optimized.hashtable=true; create table service_request_clean( @@ -125,15 +125,3 @@ on a.contact_event_id = b.cnctevn_id; select * from ct_events1_test; - - - - - - - - - - - - diff --git ql/src/test/queries/clientpositive/vectorized_nested_mapjoin.q ql/src/test/queries/clientpositive/vectorized_nested_mapjoin.q index 849af18..cb93029 100644 --- ql/src/test/queries/clientpositive/vectorized_nested_mapjoin.q +++ ql/src/test/queries/clientpositive/vectorized_nested_mapjoin.q @@ -2,7 +2,7 @@ set hive.explain.user=false; SET hive.vectorized.execution.enabled=true; SET hive.auto.convert.join=true; SET hive.auto.convert.join.noconditionaltask=true; -SET hive.auto.convert.join.noconditionaltask.size=1000000000; +SET hive.auto.convert.join.noconditionaltask.size=100000000; -- SORT_QUERY_RESULTS diff --git ql/src/test/results/clientpositive/tez/auto_sortmerge_join_5.q.out ql/src/test/results/clientpositive/tez/auto_sortmerge_join_5.q.out index 4d85837..9072b8c 100644 --- ql/src/test/results/clientpositive/tez/auto_sortmerge_join_5.q.out +++ ql/src/test/results/clientpositive/tez/auto_sortmerge_join_5.q.out @@ -129,6 +129,7 @@ STAGE PLANS: isSamplingPred: false predicate: key is not null (type: boolean) Statistics: Num rows: 1 Data size: 113 Basic stats: COMPLETE Column stats: NONE + Dummy Store Path -> Alias: #### A masked pattern was here #### Path -> Partition: @@ -357,6 +358,7 @@ STAGE PLANS: isSamplingPred: false predicate: key is not null (type: boolean) Statistics: Num rows: 1 Data size: 113 Basic stats: COMPLETE Column stats: NONE + Dummy Store Path -> Alias: #### A masked pattern was here #### Path -> Partition: diff --git ql/src/test/results/clientpositive/tez/explainuser_2.q.out ql/src/test/results/clientpositive/tez/explainuser_2.q.out index 57fcc3c..8cc6c72 100644 --- ql/src/test/results/clientpositive/tez/explainuser_2.q.out +++ ql/src/test/results/clientpositive/tez/explainuser_2.q.out @@ -2607,12 +2607,13 @@ Stage-0 | outputColumnNames:["_col0","_col1"] | Statistics:Num rows: 133 Data size: 1411 Basic stats: COMPLETE Column stats: NONE | - |<-Filter Operator [FIL_12] - | predicate:key is not null (type: boolean) - | Statistics:Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE - | TableScan 
[TS_1] - | alias:s3 - | Statistics:Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE + |<-Dummy Store [OP_14] + | Filter Operator [FIL_12] + | predicate:key is not null (type: boolean) + | Statistics:Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE + | TableScan [TS_1] + | alias:s3 + | Statistics:Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE |<-Filter Operator [FIL_11] predicate:key is not null (type: boolean) Statistics:Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE @@ -2658,12 +2659,13 @@ Stage-0 | | outputColumnNames:["_col0","_col1"] | | Statistics:Num rows: 133 Data size: 1411 Basic stats: COMPLETE Column stats: NONE | | - | |<-Filter Operator [FIL_21] - | | predicate:key is not null (type: boolean) - | | Statistics:Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE - | | TableScan [TS_1] - | | alias:s3 - | | Statistics:Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE + | |<-Dummy Store [OP_24] + | | Filter Operator [FIL_21] + | | predicate:key is not null (type: boolean) + | | Statistics:Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE + | | TableScan [TS_1] + | | alias:s3 + | | Statistics:Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE | |<-Filter Operator [FIL_20] | predicate:(key is not null and value is not null) (type: boolean) | Statistics:Num rows: 61 Data size: 646 Basic stats: COMPLETE Column stats: NONE @@ -2706,12 +2708,13 @@ Stage-0 | outputColumnNames:["_col0","_col1"] | Statistics:Num rows: 133 Data size: 1411 Basic stats: COMPLETE Column stats: NONE | - |<-Filter Operator [FIL_12] - | predicate:key is not null (type: boolean) - | Statistics:Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE - | TableScan [TS_1] - | alias:s3 - | Statistics:Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE + |<-Dummy Store [OP_14] + | Filter Operator [FIL_12] + | predicate:key is not null (type: boolean) + | Statistics:Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE + | TableScan [TS_1] + | alias:s3 + | Statistics:Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE |<-Filter Operator [FIL_11] predicate:key is not null (type: boolean) Statistics:Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE @@ -2757,12 +2760,13 @@ Stage-0 | | outputColumnNames:["_col0","_col1"] | | Statistics:Num rows: 133 Data size: 1411 Basic stats: COMPLETE Column stats: NONE | | - | |<-Filter Operator [FIL_21] - | | predicate:key is not null (type: boolean) - | | Statistics:Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE - | | TableScan [TS_1] - | | alias:s3 - | | Statistics:Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE + | |<-Dummy Store [OP_24] + | | Filter Operator [FIL_21] + | | predicate:key is not null (type: boolean) + | | Statistics:Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE + | | TableScan [TS_1] + | | alias:s3 + | | Statistics:Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE | |<-Filter Operator [FIL_20] | predicate:(key is not null and value is not null) (type: boolean) | Statistics:Num rows: 61 Data size: 646 Basic stats: COMPLETE Column stats: NONE @@ -2853,12 +2857,13 @@ Stage-0 | | outputColumnNames:["_col0"] | | Statistics:Num rows: 133 Data size: 1411 Basic stats: COMPLETE Column stats: NONE | | - | |<-Filter Operator 
[FIL_30] - | | predicate:key is not null (type: boolean) - | | Statistics:Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE - | | TableScan [TS_1] - | | alias:s3 - | | Statistics:Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE + | |<-Dummy Store [OP_34] + | | Filter Operator [FIL_30] + | | predicate:key is not null (type: boolean) + | | Statistics:Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE + | | TableScan [TS_1] + | | alias:s3 + | | Statistics:Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE | |<-Filter Operator [FIL_29] | predicate:key is not null (type: boolean) | Statistics:Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE diff --git ql/src/test/results/clientpositive/tez/tez_join.q.out ql/src/test/results/clientpositive/tez/tez_join.q.out index 7b22996..d1cde57 100644 --- ql/src/test/results/clientpositive/tez/tez_join.q.out +++ ql/src/test/results/clientpositive/tez/tez_join.q.out @@ -89,6 +89,7 @@ STAGE PLANS: expressions: KEY.reducesinkkey0 (type: string) outputColumnNames: _col0 Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Dummy Store Reduce Operator Tree: Select Operator expressions: KEY.reducesinkkey0 (type: string) diff --git ql/src/test/results/clientpositive/tez/tez_smb_1.q.out ql/src/test/results/clientpositive/tez/tez_smb_1.q.out index e60d5af..222bdf2 100644 --- ql/src/test/results/clientpositive/tez/tez_smb_1.q.out +++ ql/src/test/results/clientpositive/tez/tez_smb_1.q.out @@ -133,6 +133,7 @@ STAGE PLANS: Filter Operator predicate: key is not null (type: boolean) Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE + Dummy Store Map Operator Tree: TableScan alias: s1 @@ -523,6 +524,7 @@ STAGE PLANS: expressions: KEY.reducesinkkey0 (type: int) outputColumnNames: _col0 Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE + Dummy Store Reduce Operator Tree: Select Operator expressions: KEY.reducesinkkey0 (type: int) diff --git ql/src/test/results/clientpositive/tez/tez_smb_main.q.out ql/src/test/results/clientpositive/tez/tez_smb_main.q.out index 52e1750..d0713ba 100644 --- ql/src/test/results/clientpositive/tez/tez_smb_main.q.out +++ ql/src/test/results/clientpositive/tez/tez_smb_main.q.out @@ -775,6 +775,7 @@ STAGE PLANS: Filter Operator predicate: key is not null (type: boolean) Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE + Dummy Store Map Operator Tree: TableScan alias: s1
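Finally, a worked example (assumed sizes, not taken from the patch) of the fraction math in MemoryDecider.computeMemoryForInputOutput, whose results DagUtils feeds into setShuffleBufferFraction and setSortBufferSize: the shuffle fraction is expressed relative to the whole container, while the output edge gets an absolute byte count that getSortBufferMemRequiredMB floors at 1 MB.

    public class EdgeMemSketch {
      public static void main(String[] args) {
        long totalAvailableMemory = 1024L * 1024 * 1024; // container: 1 GB
        long required = 400L * 1024 * 1024;     // all shuffle edges together
        long available = 300L * 1024 * 1024;    // what the divvy-up left for them
        long edgeDataFlow = 100L * 1024 * 1024; // one edge's (per-task) data size

        float shuffleFraction =
            (edgeDataFlow * available) / (float) (required * totalAvailableMemory);
        long sortBufBytes = (edgeDataFlow * available) / required;
        long sortBufMb = Math.max(1, sortBufBytes / (1024 * 1024));

        System.out.println(shuffleFraction); // ~0.0732 of the container heap
        System.out.println(sortBufMb);       // 75 MB sort buffer
      }
    }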