diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index ae44b083be..0dea0996c9 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3969,6 +3969,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal -1f, "The customized fraction of JVM memory which Tez will reserve for the processor"), TEZ_CARTESIAN_PRODUCT_EDGE_ENABLED("hive.tez.cartesian-product.enabled", false, "Use Tez cartesian product edge to speed up cross product"), + TEZ_SIMPLE_CUSTOM_EDGE_TINY_BUFFER_SIZE_MB("hive.tez.unordered.output.buffer.size.mb", -1, + "When we have an operation that does not need a large buffer, we use this buffer size for simple custom edge.\n" + + "Value is an integer. Default value is -1, which means that we will estimate this value from operators in the plan."), // The default is different on the client and server, so it's null here. LLAP_IO_ENABLED("hive.llap.io.enabled", null, "Whether the LLAP IO layer is enabled."), LLAP_IO_ROW_WRAPPER_ENABLED("hive.llap.io.row.wrapper.enabled", true, "Whether the LLAP IO row wrapper is enabled for non-vectorized queries."), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index d9340d0371..1d38f99854 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -511,13 +511,17 @@ private EdgeProperty createEdgeProperty(Vertex w, TezEdgeProperty edgeProp, case CUSTOM_SIMPLE_EDGE: assert partitionerClassName != null; partitionerConf = createPartitionerConf(partitionerClassName, conf); - UnorderedPartitionedKVEdgeConfig et3Conf = UnorderedPartitionedKVEdgeConfig + UnorderedPartitionedKVEdgeConfig.Builder et3Conf = UnorderedPartitionedKVEdgeConfig .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) .setFromConfiguration(conf) .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null) - .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) - .build(); - return et3Conf.createDefaultEdgeProperty(); + .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null); + if (edgeProp.getBufferSize() != null) { + et3Conf.setAdditionalConfiguration( + TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, + edgeProp.getBufferSize().toString()); + } + return et3Conf.build().createDefaultEdgeProperty(); case ONE_TO_ONE_EDGE: UnorderedKVEdgeConfig et4Conf = UnorderedKVEdgeConfig .newBuilder(keyClass, valClass) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java index 94879c9529..564fdca111 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.FilterOperator; +import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; @@ -83,6 +84,7 @@ public static ReduceWork createReduceWork( context.conf.getFloatVar(HiveConf.ConfVars.TEZ_MAX_PARTITION_FACTOR); float minPartitionFactor = context.conf.getFloatVar(HiveConf.ConfVars.TEZ_MIN_PARTITION_FACTOR); long bytesPerReducer = context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER); + int defaultTinyBufferSize = context.conf.getIntVar(HiveConf.ConfVars.TEZ_SIMPLE_CUSTOM_EDGE_TINY_BUFFER_SIZE_MB); ReduceWork reduceWork = new ReduceWork(Utilities.REDUCENAME + context.nextSequenceNumber()); LOG.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root); @@ -142,6 +144,7 @@ public static ReduceWork createReduceWork( edgeProp = new TezEdgeProperty(edgeType); edgeProp.setSlowStart(reduceWork.isSlowStart()); } + edgeProp.setBufferSize(obtainBufferSize(root, reduceSink, defaultTinyBufferSize)); reduceWork.setEdgePropRef(edgeProp); tezWork.connect( @@ -850,4 +853,23 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, egw.startWalking(startNodes, outputMap); return outputMap; } + + private static Integer obtainBufferSize(Operator op, ReduceSinkOperator rsOp, int defaultTinyBufferSize) { + if (op instanceof GroupByOperator) { + GroupByOperator groupByOperator = (GroupByOperator) op; + if (groupByOperator.getConf().getKeys().isEmpty() && + groupByOperator.getConf().getMode() == GroupByDesc.Mode.MERGEPARTIAL) { + // Check configuration and value is -1, infer value + int result = defaultTinyBufferSize == -1 ? + (int) Math.ceil((double) groupByOperator.getStatistics().getDataSize() / 1E6) : + defaultTinyBufferSize; + if (LOG.isDebugEnabled()) { + LOG.debug("Buffer size for output from operator {} can be set to {}Mb", rsOp, result); + } + return result; + } + } + return null; + } + } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java index 0abacb3cb8..e6e82613bb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java @@ -41,6 +41,7 @@ private int minReducer; private int maxReducer; private long inputSizePerReducer; + private Integer bufferSize; public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, int buckets) { @@ -105,6 +106,14 @@ public void setSlowStart(boolean slowStart) { this.isSlowStart = slowStart; } + public void setBufferSize(Integer bufferSize) { + this.bufferSize = bufferSize; + } + + public Integer getBufferSize() { + return bufferSize; + } + public void setEdgeType(EdgeType type) { this.edgeType = type; }