diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index f1a631a621..63f05e808e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -84,6 +84,7 @@ import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.dag.api.client.VertexStatus; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -474,6 +475,18 @@ DAG build(JobConf conf, TezWork work, Path scratchDir, Context ctx, checkOutputSpec(w, wxConf); Vertex wx = utils.createVertex(wxConf, w, scratchDir, fs, ctx, !isFinal, work, work.getVertexType(w), vertexResources); + if (work.getChildren(w).size() > 1) { + String value = wxConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB); + int originalValue = 0; + if(value == null) { + originalValue = TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB_DEFAULT; + } else { + originalValue = Integer.valueOf(value); + } + int newValue = (int) (originalValue / work.getChildren(w).size()); + wxConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, Integer.toString(newValue)); + LOG.info("Modified " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + " to " + newValue); + } if (w.getReservedMemoryMB() > 0) { // If reversedMemoryMB is set, make memory allocation fraction adjustment as needed double frac = DagUtils.adjustMemoryReserveFraction(w.getReservedMemoryMB(), super.conf);