diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 25dd970a9b..81cb1b70c9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -82,6 +82,7 @@ import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.dag.api.client.VertexStatus; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -468,6 +469,18 @@ DAG build(JobConf conf, TezWork work, Path scratchDir, Context ctx, checkOutputSpec(w, wxConf); Vertex wx = utils.createVertex(wxConf, w, scratchDir, fs, ctx, !isFinal, work, work.getVertexType(w), vertexResources); + if (work.getChildren(w).size() > 1) { + String value = wx.getConf().get(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB); + double doubleValue = 0; + if(value == null) { + doubleValue = TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB_DEFAULT; + } else { + doubleValue = Double.valueOf(value); + } + Double newValue = doubleValue / work.getChildren(w).size(); + wx.setConf(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, Double.toString(newValue)); + LOG.info("Modified " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + " to " + newValue); + } if (w.getReservedMemoryMB() > 0) { // If reversedMemoryMB is set, make memory allocation fraction adjustment as needed double frac = DagUtils.adjustMemoryReserveFraction(w.getReservedMemoryMB(), super.conf);