diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 908478528d..26bf00916d 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2889,6 +2889,17 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Whether to send the query plan via local resource or RPC"), HIVE_AM_SPLIT_GENERATION("hive.compute.splits.in.am", true, "Whether to generate the splits locally or in the AM (tez only)"), + HIVE_AM_SPLIT_GENERATION_HEADROOM_CALCULATOR("hive.split.computation.headroom-calculator", + "org.apache.hadoop.hive.ql.exec.tez.GreedyHeadroomCalculator", + "Class to be used to calculate queue-headroom, when computing splits."), + HIVE_AM_SCALED_HEADROOM_CALCULATOR_DISTRIBUTION("hive.split.computation.headroom-calculator.distribution.gb", + "600,15000", + "Comma-separated ranges of total-queue-capacity (in GB of memory) " + + "over which available splits are to be scaled linearly."), + HIVE_AM_AUTO_TUNE_ENABLED("hive.am.auto.tune.enabled", false, + "Whether (or not) to auto-tune Tez AM settings (like task concurrency, heap-size, etc.)."), + HIVE_AM_AUTO_TUNE_NUM_CONCURRENT_TASKS_PER_GB("hive.am.auto.tune.num.concurrent.tasks.per.gb", 10000, + "When auto-tune is enabled, the number of concurrent tasks per 1GB of heap-memory available to the Tez AM."), HIVE_TEZ_GENERATE_CONSISTENT_SPLITS("hive.tez.input.generate.consistent.splits", true, "Whether to generate consistent split locations when generating splits in the AM"), HIVE_PREWARM_ENABLED("hive.prewarm.enabled", false, "Enables container prewarm for Tez/Spark (Hadoop 2 only)"), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GreedyHeadroomCalculator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GreedyHeadroomCalculator.java new file mode 100644 index 0000000000..83fef5ae8a --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GreedyHeadroomCalculator.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tez.runtime.api.InputInitializerContext; + +class GreedyHeadroomCalculator extends QueueHeadroomCalculator { + + private static final Log LOG = LogFactory.getLog(HiveSplitGenerator.class); + + @Override + public int getAvailableSlots(InputInitializerContext context) { + int totalResource = context.getTotalAvailableResource().getMemory(); + int taskResource = context.getVertexTaskResource().getMemory(); + int availableSlots = totalResource / taskResource; + + String logMessage = "GreedyHeadroomCalculator::getAvailableSlots(): " + + "totalResources == " + totalResource + + "taskResource == " + taskResource + + "availableSlots == " + availableSlots; + + LOG.info(logMessage); + + return availableSlots; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java index 5dd5e80214..7f8e83baf0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java @@ -160,8 +160,6 @@ public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOE (InputFormat) ReflectionUtils.newInstance(JavaUtils.loadClass(realInputFormatName), jobConf); - int totalResource = 0; - int taskResource = 0; int availableSlots = 0; // FIXME. Do the right thing Luke. if (getContext() == null) { @@ -170,9 +168,16 @@ public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOE } if (getContext() != null) { - totalResource = getContext().getTotalAvailableResource().getMemory(); - taskResource = getContext().getVertexTaskResource().getMemory(); - availableSlots = totalResource / taskResource; + QueueHeadroomCalculator headroomCalculator = + (QueueHeadroomCalculator) + ReflectionUtils.newInstance( + JavaUtils.loadClass( + HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVE_AM_SPLIT_GENERATION_HEADROOM_CALCULATOR) + ), + jobConf + ); + + availableSlots = headroomCalculator.getAvailableSlots(getContext()); } if (HiveConf.getLongVar(conf, HiveConf.ConfVars.MAPREDMINSPLITSIZE, 1) <= 1) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueueHeadroomCalculator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueueHeadroomCalculator.java new file mode 100644 index 0000000000..b7f5b705b9 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueueHeadroomCalculator.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.runtime.api.InputInitializerContext; + +abstract class QueueHeadroomCalculator implements Configurable { + + private Configuration conf; + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + return conf; + } + + public abstract int getAvailableSlots(InputInitializerContext context); + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ScaledHeadroomCalculator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ScaledHeadroomCalculator.java new file mode 100644 index 0000000000..3110106ad2 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ScaledHeadroomCalculator.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.tez.runtime.api.InputInitializerContext; + +import java.util.Collections; +import java.util.List; + +class ScaledHeadroomCalculator extends QueueHeadroomCalculator { + + private static final Log LOG = LogFactory.getLog(ScaledHeadroomCalculator.class); + + private List thresholds = Lists.newArrayList(); + + @Override + public void setConf(Configuration conf) { + super.setConf(conf); + thresholds = getThresholds(HiveConf.getVar(conf, + HiveConf.ConfVars.HIVE_AM_SCALED_HEADROOM_CALCULATOR_DISTRIBUTION)); + } + + private List getThresholds(String distributionString) { + try { + List thresholdStrings = Lists.newArrayList(distributionString.split(",")); + List thresholds = Lists.newArrayList(Iterables.transform( + thresholdStrings, new Function() { + @Override + public Double apply(String input) { + return Double.parseDouble(input); + } + } + )); + Collections.sort(thresholds); + return thresholds; + } + catch (Exception exception) { + LOG.error("Could not extract thresholds from string: " + distributionString + + ". Switching to defaults: " + + HiveConf.ConfVars.HIVE_AM_SCALED_HEADROOM_CALCULATOR_DISTRIBUTION.getDefaultValue(), exception) ; + return getThresholds(HiveConf.ConfVars.HIVE_AM_SCALED_HEADROOM_CALCULATOR_DISTRIBUTION.getDefaultValue()); + } + + } + + @Override + public int getAvailableSlots(InputInitializerContext context) { + + int totalResourceMB = context.getTotalAvailableResource().getMemory(); + int i = 0; + while (i additionalFiles, setupSessionAcls(tezConfig, conf); + tuneTezSessionSettings(tezConfig); + final TezClient session = TezClient.newBuilder("HIVE-" + sessionId, tezConfig) .setIsSession(true).setLocalResources(commonLocalResources) .setCredentials(llapCredentials).setServicePluginDescriptor(servicePluginsDescriptor) @@ -436,6 +438,51 @@ public void endOpen() throws InterruptedException, CancellationException { } } + private static TezConfiguration tuneTezSessionSettings(TezConfiguration conf) { + + if (conf.getBoolean(ConfVars.HIVE_AM_AUTO_TUNE_ENABLED.varname, + ConfVars.HIVE_AM_AUTO_TUNE_ENABLED.defaultBoolVal)) { + + LOG.info("Attempting to auto-tune AM settings."); + + int amContainerSizeMB = conf.getInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 1536); + + if (amContainerSizeMB <= 1536) { + LOG.warn(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB + "(" + amContainerSizeMB + ") " + + "must exceed 1535 for auto-tuning. Disabling auto-tuning."); + return conf; + } + + // Set AM heap-size to `max(512, 0.2*amContainerSizeMB)` less than the chosen AM container size. + int amMaxHeapSizeMB = amContainerSizeMB - Math.max(512, Math.round(0.2f*amContainerSizeMB)); + String amCmdOpts = conf.get(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, + TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT); + amCmdOpts += " -Xmx" + amMaxHeapSizeMB + "m"; + LOG.info("Setting " + TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS + " to " + amCmdOpts + "."); + conf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, amCmdOpts); + + // Set concurrency based on available memory. + // If N = nConcurrentTasksPerGB, + // 1. Assume N/2 parallel tasks for the first 1 GB heap, + // 2. N more parallel tasks for every additional 1 GB. + // i.e. + // totalConcurrency = N * ((amMaxHeapSizeMB/1024) - 1/2) + + int nConcurrentTasksPerGB = conf.getInt(ConfVars.HIVE_AM_AUTO_TUNE_NUM_CONCURRENT_TASKS_PER_GB.varname, + ConfVars.HIVE_AM_AUTO_TUNE_NUM_CONCURRENT_TASKS_PER_GB.defaultIntVal); + int totalConcurrency = Math.round( nConcurrentTasksPerGB * (amMaxHeapSizeMB/1024f - 0.5f) ); + + LOG.info("Setting " + TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY + + " to " + totalConcurrency + "."); + conf.setInt(TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY, totalConcurrency); + } + else { + LOG.info("Auto-tuning disabled for AM settings."); + } + + return conf; + } + private void setupSessionAcls(Configuration tezConf, HiveConf hiveConf) throws IOException {