diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java
index 0053997..2f2f04f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java
@@ -173,19 +173,22 @@ private void adjustAutoParallelism(BaseWork work) {
       return; // Not based on ARP and cannot assume uniform distribution, bail.
     }
     clusterState.initClusterInfo();
-    int targetCount = 0;
+    final int targetCount;
+    final int executorCount;
+    final int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
     if (!clusterState.hasClusterInfo()) {
       LOG.warn("Cannot determine LLAP cluster information");
-      targetCount = (int)Math.ceil(minReducersPerExec * 1 * executorsPerNode);
+      executorCount = executorsPerNode; // assume 1 node
     } else {
-      targetCount = (int)Math.ceil(minReducersPerExec * (clusterState.getKnownExecutorCount()
-        + clusterState.getNodeCountWithUnknownExecutors() * executorsPerNode));
+      executorCount =
+          clusterState.getKnownExecutorCount() + executorsPerNode
+              * clusterState.getNodeCountWithUnknownExecutors();
     }
-    // We only increase the targets here.
+    targetCount = Math.min(maxReducers, (int) Math.ceil(minReducersPerExec * executorCount));
+    // We only increase the targets here, but we stay below maxReducers
     if (reduceWork.isAutoReduceParallelism()) {
       // Do not exceed the configured max reducers.
-      int newMin = Math.min(conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS),
-          Math.max(reduceWork.getMinReduceTasks(), targetCount));
+      int newMin = Math.min(maxReducers, Math.max(reduceWork.getMinReduceTasks(), targetCount));
       if (newMin < reduceWork.getMaxReduceTasks()) {
         reduceWork.setMinReduceTasks(newMin);
         reduceWork.getEdgePropRef().setAutoReduce(conf, true, newMin,
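In effect, the patch first estimates the cluster's executor count (known executors, plus an assumed executorsPerNode for each node whose executor count is unknown), then derives the reducer target as ceil(minReducersPerExec * executorCount), capped at hive.exec.reducers.max up front rather than only inside the auto-reduce branch. A minimal standalone sketch of that arithmetic follows; the sample values are hypothetical and the class is an illustration, not part of Hive:

// Standalone sketch of the patched sizing logic. The sample numbers are
// hypothetical stand-ins for LlapDecider's config fields and cluster state,
// not values from Hive itself.
public class ReducerTargetSketch {
  public static void main(String[] args) {
    double minReducersPerExec = 0.2;  // hypothetical hive.tez.llap.min.reducer.per.executor
    int executorsPerNode = 4;         // hypothetical per-node executor count
    int maxReducers = 1009;           // hypothetical hive.exec.reducers.max

    // Mirrors the else branch: executors we know about, plus executorsPerNode
    // assumed for every node whose executor count is unknown.
    int knownExecutorCount = 40;
    int nodesWithUnknownExecutors = 2;
    int executorCount = knownExecutorCount + executorsPerNode * nodesWithUnknownExecutors;

    // The cap is now applied where targetCount is computed, so the bound
    // holds before the auto-reduce branch is reached.
    int targetCount = Math.min(maxReducers, (int) Math.ceil(minReducersPerExec * executorCount));

    // With these inputs: executorCount = 48, targetCount = min(1009, 10) = 10.
    System.out.println("executorCount=" + executorCount + ", targetCount=" + targetCount);
  }
}

Hoisting the maxReducers cap into targetCount itself (and reading the config once into a local) means any consumer of targetCount inherits the bound, rather than each branch having to re-apply Math.min as the old newMin computation did.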