diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 89c9349..40fd6e0 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -886,6 +886,9 @@ // Whether to generate the splits locally or in the AM (tez only) HIVE_AM_SPLIT_GENERATION("hive.compute.splits.in.am", true), + HIVE_PREWARM_ENABLED("hive.prewarm.enabled", false), + HIVE_PREWARM_NUM_CONTAINERS("hive.prewarm.numcontainers", 10), + // none, idonly, traverse, execution HIVESTAGEIDREARRANGE("hive.stageid.rearrange", "none"), HIVEEXPLAINDEPENDENCYAPPENDTASKTYPES("hive.explain.dependency.append.tasktype", false), diff --git conf/hive-default.xml.template conf/hive-default.xml.template index 420d959..47ec61f 100644 --- conf/hive-default.xml.template +++ conf/hive-default.xml.template @@ -2168,6 +2168,22 @@ + hive.prewarm.enabled + false + + Enables container prewarm for tez (hadoop 2 only) + + + + + hive.prewarm.numcontainers + 10 + + Controls the number of containers to prewarm for tez (hadoop 2 only) + + + + hive.server2.table.type.mapping CLASSIC diff --git data/conf/tez/hive-site.xml data/conf/tez/hive-site.xml index 1af4495..d240056 100644 --- data/conf/tez/hive-site.xml +++ data/conf/tez/hive-site.xml @@ -194,4 +194,20 @@ Whether to use MR or Tez + + hive.prewarm.enabled + true + + Enables container prewarm for tez (hadoop 2 only) + + + + + hive.prewarm.numcontainers + 3 + + Controls the number of containers to prewarm for tez (hadoop 2 only) + + + diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index 77c0c46..8d058d8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -86,6 +86,10 @@ import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.Vertex; +import org.apache.tez.dag.api.VertexLocationHint; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.client.PreWarmContext; +import org.apache.tez.client.TezSessionConfiguration; import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator; import org.apache.tez.mapreduce.hadoop.InputSplitInfo; import org.apache.tez.mapreduce.hadoop.MRHelpers; @@ -407,6 +411,49 @@ private LocalResource createLocalResource(FileSystem remoteFs, Path file, } /** + * @param sessionConfig session configuration + * @param numContainers number of containers to pre-warm + * @param localResources additional resources to pre-warm with + * @return prewarm context object + */ + public PreWarmContext createPreWarmContext(TezSessionConfiguration sessionConfig, int numContainers, + Map localResources) throws IOException, TezException { + + Configuration conf = sessionConfig.getTezConfiguration(); + + ProcessorDescriptor prewarmProcDescriptor = new ProcessorDescriptor(HivePreWarmProcessor.class.getName()); + prewarmProcDescriptor.setUserPayload(MRHelpers.createUserPayloadFromConf(conf)); + + PreWarmContext context = new PreWarmContext(prewarmProcDescriptor, MRHelpers.getMapResource(conf), + new VertexLocationHint(numContainers, null)); + + Map combinedResources = new HashMap(); + + combinedResources.putAll(sessionConfig.getSessionResources()); + + try { + for(LocalResource lr : localizeTempFiles(conf)) { + combinedResources.put(getBaseName(lr), lr); + } + } catch(LoginException le) { + throw new IOException(le); + } + + if(localResources != null) { + combinedResources.putAll(localResources); + } + + context.setLocalResources(combinedResources); + + /* boiler plate task env */ + Map environment = new HashMap(); + MRHelpers.updateEnvironmentForMRTasks(conf, environment, true); + context.setEnvironment(environment); + context.setJavaOpts(MRHelpers.getMapJavaOpts(conf)); + return context; + } + + /** * @param conf * @return path to destination directory on hdfs * @throws LoginException if we are unable to figure user information diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java new file mode 100644 index 0000000..9785415 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.ReadaheadPool; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.tez.common.TezUtils; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.LogicalIOProcessor; +import org.apache.tez.runtime.api.LogicalInput; +import org.apache.tez.runtime.api.LogicalOutput; +import org.apache.tez.runtime.api.TezProcessorContext; + +import java.net.URL; +import java.net.JarURLConnection; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.List; +import java.util.Map; +import java.util.jar.JarFile; +import java.util.jar.JarEntry; + +import javax.crypto.Mac; + +/** + * A simple sleep processor implementation that sleeps for the configured + * time in milliseconds. + * + * @see Config for configuring the HivePreWarmProcessor + */ +public class HivePreWarmProcessor implements LogicalIOProcessor { + + private static boolean prewarmed = false; + + private static final Log LOG = LogFactory.getLog(HivePreWarmProcessor.class); + + private Configuration conf; + + @Override + public void initialize(TezProcessorContext processorContext) + throws Exception { + byte[] userPayload = processorContext.getUserPayload(); + this.conf = TezUtils.createConfFromUserPayload(userPayload); + } + + @Override + public void run(Map inputs, + Map outputs) throws Exception { + if(prewarmed) { + /* container reuse */ + return; + } + /* these are things that goes through singleton initialization on most queries */ + FileSystem fs = FileSystem.get(conf); + Mac mac = Mac.getInstance("HmacSHA1"); + ReadaheadPool rpool = ReadaheadPool.getInstance(); + ShimLoader.getHadoopShims(); + + URL hiveurl = new URL("jar:"+DagUtils.getInstance().getExecJarPathLocal()+"!/"); + JarURLConnection hiveconn = (JarURLConnection)hiveurl.openConnection(); + JarFile hivejar = hiveconn.getJarFile(); + try { + Enumeration classes = hivejar.entries(); + while(classes.hasMoreElements()) { + JarEntry je = classes.nextElement(); + if (je.getName().endsWith(".class")) { + String klass = je.getName().replace(".class","").replaceAll("/","\\."); + if(klass.indexOf("ql.exec") != -1 || klass.indexOf("ql.io") != -1) { + /* several hive classes depend on the metastore APIs, which is not included + * in hive-exec.jar. These are the relatively safe ones - operators & io classes. + */ + if(klass.indexOf("vector") != -1 || klass.indexOf("Operator") != -1) { + Class.forName(klass); + } + } + } + } + } finally { + hivejar.close(); + } + prewarmed = true; + } + + @Override + public void handleEvents(List processorEvents) { + // Nothing to do + } + + @Override + public void close() throws Exception { + // Nothing to cleanup + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index b8552a3..aef6e68 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.tez.client.AMConfiguration; @@ -43,7 +44,7 @@ import org.apache.tez.dag.api.SessionNotRunning; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; -import org.apache.tez.mapreduce.hadoop.MRHelpers; +import org.apache.tez.client.PreWarmContext; /** * Holds session state related to Tez @@ -134,8 +135,24 @@ public void open(String sessionId, HiveConf conf) session = new TezSession("HIVE-"+sessionId, sessionConfig); LOG.info("Opening new Tez Session (id: "+sessionId+", scratch dir: "+tezScratchDir+")"); + session.start(); + if (HiveConf.getBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED)) { + int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS); + LOG.info("Prewarming " + n + " containers (id: " + sessionId + + ", scratch dir: " + tezScratchDir + ")"); + PreWarmContext context = utils.createPreWarmContext(sessionConfig, n, + commonLocalResources); + try { + session.preWarm(context); + } catch (InterruptedException ie) { + if (LOG.isDebugEnabled()) { + LOG.debug("Hive Prewarm threw an exception ", ie); + } + } + } + // In case we need to run some MR jobs, we'll run them under tez MR emulation. The session // id is used for tez to reuse the current session rather than start a new one. conf.set("mapreduce.framework.name", "yarn-tez"); diff --git ql/src/test/org/apache/hadoop/hive/ql/session/TestSessionState.java ql/src/test/org/apache/hadoop/hive/ql/session/TestSessionState.java index d4e737f..acf94da 100644 --- ql/src/test/org/apache/hadoop/hive/ql/session/TestSessionState.java +++ ql/src/test/org/apache/hadoop/hive/ql/session/TestSessionState.java @@ -20,20 +20,43 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import java.util.Arrays; +import java.util.Collection; + import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; /** * Test SessionState */ +@RunWith(value = Parameterized.class) public class TestSessionState { + private final boolean prewarm; + + public TestSessionState(Boolean mode) { + this.prewarm = mode.booleanValue(); + } + + @Parameters + public static Collection data() { + return Arrays.asList(new Boolean[][] { {false}, {true}}); + } @Before - public void setup(){ - SessionState.start(new HiveConf()); + public void setup() { + HiveConf conf = new HiveConf(); + if (prewarm) { + HiveConf.setBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED, true); + HiveConf.setIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS, 1); + } + SessionState.start(conf); } /**