diff --git conf/hive-default.xml.template conf/hive-default.xml.template index f01e715..63ab999 100644 --- conf/hive-default.xml.template +++ conf/hive-default.xml.template @@ -1919,6 +1919,14 @@ + + hive.optimize.tez + false + + Setting this property turns on Tez execution. Needs tez installed on the + cluster. (Only available on hadoop 2) + + diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index a54d8b7..f954bf7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -27,8 +27,8 @@ import javax.security.auth.login.LoginException; -import org.apache.commons.lang.StringUtils; import org.apache.commons.io.FilenameUtils; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -143,7 +143,7 @@ private static JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) { * @param w The second vertex (sink) * @return */ - public static Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w) + public static Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w) throws IOException { // Tez needs to setup output subsequent input pairs correctly @@ -180,18 +180,18 @@ private static Vertex createVertex(JobConf conf, MapWork mapWork, int seqNo, } // write out the operator plan - Path planPath = Utilities.setMapWork(conf, mapWork, + Path planPath = Utilities.setMapWork(conf, mapWork, mrScratchDir.toUri().toString(), false); LocalResource planLr = createLocalResource(fs, planPath, LocalResourceType.FILE, LocalResourceVisibility.APPLICATION); // setup input paths and split info - List inputPaths = Utilities.getInputPaths(conf, mapWork, + List inputPaths = Utilities.getInputPaths(conf, mapWork, mrScratchDir.toUri().toString(), ctx); Utilities.setInputPaths(conf, inputPaths); - 
InputSplitInfo inputSplitInfo = MRHelpers.generateInputSplits(conf, + InputSplitInfo inputSplitInfo = MRHelpers.generateInputSplits(conf, new Path(tezDir, ""+seqNo)); // create the directories FileSinkOperators need @@ -332,7 +332,7 @@ private static LocalResource createLocalResource(FileSystem remoteFs, Path file, * @throws LoginException if we are unable to figure user information * @throws IOException when any dfs operation fails. */ - private static Path getDefaultDestDir(Configuration conf) throws LoginException, IOException { + public static Path getDefaultDestDir(Configuration conf) throws LoginException, IOException { UserGroupInformation ugi = ShimLoader.getHadoopShims().getUGIForConf(conf); String userName = ShimLoader.getHadoopShims().getShortUserName(ugi); String userPathStr = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_USER_INSTALL_DIR); @@ -409,11 +409,11 @@ private static Path getDefaultDestDir(Configuration conf) throws LoginException, } // the api that finds the jar being used by this class on disk - private static String getExecJarPathLocal () throws URISyntaxException { + public static String getExecJarPathLocal () throws URISyntaxException { // returns the location on disc of the jar of this class. return DagUtils.class.getProtectionDomain().getCodeSource().getLocation().toURI().toString(); } - + /* * Helper function to retrieve the basename of a local resource */ @@ -425,7 +425,7 @@ public static String getBaseName(LocalResource lr) { * @param pathStr - the string from which we try to determine the resource base name * @return the name of the resource from a given path string. */ - private static String getResourceBaseName(String pathStr) { + public static String getResourceBaseName(String pathStr) { String[] splits = pathStr.split("/"); return splits[splits.length - 1]; } @@ -466,7 +466,7 @@ private static boolean checkPreExisting(Path src, Path dest, Configuration conf) * @return localresource from tez localization. 
* @throws IOException when any file system related calls fails. */ - private static LocalResource localizeResource(Path src, Path dest, Configuration conf) + public static LocalResource localizeResource(Path src, Path dest, Configuration conf) throws IOException { FileSystem destFS = dest.getFileSystem(conf); if (!(destFS instanceof DistributedFileSystem)) { @@ -486,79 +486,6 @@ private static LocalResource localizeResource(Path src, Path dest, Configuration } /** - * Returns a local resource representing the hive-exec jar. This resource will - * be used to execute the plan on the cluster. - * @param conf - * @return LocalResource corresponding to the localized hive exec resource. - * @throws IOException when any file system related call fails. - * @throws LoginException when we are unable to determine the user. - * @throws URISyntaxException when current jar location cannot be determined. - */ - public static LocalResource createHiveExecLocalResource(HiveConf conf) - throws IOException, LoginException, URISyntaxException { - String hiveJarDir = conf.getVar(HiveConf.ConfVars.HIVE_JAR_DIRECTORY); - String currentVersionPathStr = getExecJarPathLocal(); - String currentJarName = getResourceBaseName(currentVersionPathStr); - FileSystem fs = null; - Path jarPath = null; - FileStatus dirStatus = null; - - if (hiveJarDir != null) { - // check if it is a valid directory in HDFS - Path hiveJarDirPath = new Path(hiveJarDir); - fs = hiveJarDirPath.getFileSystem(conf); - - if (!(fs instanceof DistributedFileSystem)) { - throw new IOException(ErrorMsg.INVALID_HDFS_URI.format(hiveJarDir)); - } - - try { - dirStatus = fs.getFileStatus(hiveJarDirPath); - } catch (FileNotFoundException fe) { - // do nothing - } - if ((dirStatus != null) && (dirStatus.isDir())) { - FileStatus[] listFileStatus = fs.listStatus(hiveJarDirPath); - for (FileStatus fstatus : listFileStatus) { - String jarName = getResourceBaseName(fstatus.getPath().toString()); - if (jarName.equals(currentJarName)) { - // 
we have found the jar we need. - jarPath = fstatus.getPath(); - return localizeResource(null, jarPath, conf); - } - } - - // jar wasn't in the directory, copy the one in current use - if (jarPath == null) { - return localizeResource(new Path(currentVersionPathStr), hiveJarDirPath, conf); - } - } - } - - /* - * specified location does not exist or is not a directory - * try to push the jar to the hdfs location pointed by - * config variable HIVE_INSTALL_DIR. Path will be - * HIVE_INSTALL_DIR/{username}/.hiveJars/ - */ - if ((hiveJarDir == null) || (dirStatus == null) || - ((dirStatus != null) && (!dirStatus.isDir()))) { - Path dest = getDefaultDestDir(conf); - String destPathStr = dest.toString(); - String jarPathStr = destPathStr + "/" + currentJarName; - dirStatus = fs.getFileStatus(dest); - if (dirStatus.isDir()) { - return localizeResource(new Path(currentVersionPathStr), new Path(jarPathStr), conf); - } else { - throw new IOException(ErrorMsg.INVALID_DIR.format(dest.toString())); - } - } - - // we couldn't find any valid locations. Throw exception - throw new IOException(ErrorMsg.NO_VALID_LOCATIONS.getMsg()); - } - - /** * Creates and initializes a JobConf object that can be used to execute * the DAG. The configuration object will contain configurations from mapred-site * overlaid with key/value pairs from the hiveConf object. Finally it will also diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java new file mode 100644 index 0000000..017540e --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; + +import javax.security.auth.login.LoginException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.client.AMConfiguration; +import org.apache.tez.client.TezSession; +import org.apache.tez.client.TezSessionConfiguration; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.mapreduce.hadoop.MRHelpers; + +/** + * Holds session state related to Tez + */ +public class TezSessionState { + + private static final Log LOG = LogFactory.getLog(TezSessionState.class.getName()); + private static final String TEZ_DIR = "_tez_session_dir"; + + private HiveConf conf; + private Path tezScratchDir; + private LocalResource appJarLr; + private TezSession session; + private String sessionId; + + /** + * Constructor. 
We do not automatically connect, because we only want to + * load tez classes when the user has tez installed. + */ + public void TezSessionContext() { + } + + /** + * Returns whether a session has been established + */ + public boolean isOpen() { + return session != null; + } + + /** + * Creates a tez session. A session is tied to either a cli/hs2 session. You can + * submit multiple DAGs against a session (as long as they are executed serially). + * @throws IOException + * @throws URISyntaxException + * @throws LoginException + * @throws TezException + */ + public void open(String sessionId, HiveConf conf) + throws IOException, LoginException, URISyntaxException, TezException { + + this.sessionId = sessionId; + this.conf = conf; + + // create the tez tmp dir + tezScratchDir = createTezDir(sessionId); + + // generate basic tez config + TezConfiguration tezConfig = new TezConfiguration(conf); + + tezConfig.set(TezConfiguration.TEZ_AM_JAVA_OPTS, + MRHelpers.getMRAMJavaOpts(conf)); + + tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString()); + + // unless already installed on all the cluster nodes, we'll have to + // localize hive-exec.jar as well. + appJarLr = createHiveExecLocalResource(); + + // configuration for the application master + Map commonLocalResources = new HashMap(); + commonLocalResources.put(DagUtils.getBaseName(appJarLr), appJarLr); + + AMConfiguration amConfig = new AMConfiguration("default", null, commonLocalResources, + tezConfig, null); + + // configuration for the session + TezSessionConfiguration sessionConfig = new TezSessionConfiguration(amConfig, tezConfig); + + // and finally we're ready to create and start the session + session = new TezSession("HIVE-"+sessionId, sessionConfig); + + LOG.info("Opening new Tez Session (id: "+sessionId+", scratch dir: "+tezScratchDir+")"); + session.start(); + } + + /** + * Close a tez session. Will cleanup any tez/am related resources. 
After closing a session + * no further DAGs can be executed against it. + * @throws IOException + * @throws TezException + */ + public void close() throws TezException, IOException { + if (!isOpen()) { + return; + } + + LOG.info("Closing Tez Session"); + session.stop(); + FileSystem fs = tezScratchDir.getFileSystem(conf); + fs.delete(tezScratchDir, true); + session = null; + tezScratchDir = null; + conf = null; + appJarLr = null; + } + + public String getSessionId() { + return sessionId; + } + + public TezSession getSession() { + return session; + } + + public Path getTezScratchDir() { + return tezScratchDir; + } + + public LocalResource getAppJarLr() { + return appJarLr; + } + + /** + * createTezDir creates a temporary directory in the scratchDir folder to + * be used with Tez. Assumes scratchDir exists. + */ + private Path createTezDir(String sessionId) + throws IOException { + + // tez needs its own scratch dir (per session) + Path tezDir = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR), + TEZ_DIR); + tezDir = new Path(tezDir, sessionId); + FileSystem fs = tezDir.getFileSystem(conf); + fs.mkdirs(tezDir); + + // don't keep the directory around on non-clean exit + fs.deleteOnExit(tezDir); + + return tezDir; + } + + /** + * Returns a local resource representing the hive-exec jar. This resource will + * be used to execute the plan on the cluster. + * @param conf + * @return LocalResource corresponding to the localized hive exec resource. + * @throws IOException when any file system related call fails. + * @throws LoginException when we are unable to determine the user. + * @throws URISyntaxException when current jar location cannot be determined. 
+ */ + private LocalResource createHiveExecLocalResource() + throws IOException, LoginException, URISyntaxException { + String hiveJarDir = conf.getVar(HiveConf.ConfVars.HIVE_JAR_DIRECTORY); + String currentVersionPathStr = DagUtils.getExecJarPathLocal(); + String currentJarName = DagUtils.getResourceBaseName(currentVersionPathStr); + FileSystem fs = null; + Path jarPath = null; + FileStatus dirStatus = null; + + if (hiveJarDir != null) { + // check if it is a valid directory in HDFS + Path hiveJarDirPath = new Path(hiveJarDir); + fs = hiveJarDirPath.getFileSystem(conf); + + if (!(fs instanceof DistributedFileSystem)) { + throw new IOException(ErrorMsg.INVALID_HDFS_URI.format(hiveJarDir)); + } + + try { + dirStatus = fs.getFileStatus(hiveJarDirPath); + } catch (FileNotFoundException fe) { + // do nothing + } + if ((dirStatus != null) && (dirStatus.isDir())) { + FileStatus[] listFileStatus = fs.listStatus(hiveJarDirPath); + for (FileStatus fstatus : listFileStatus) { + String jarName = DagUtils.getResourceBaseName(fstatus.getPath().toString()); + if (jarName.equals(currentJarName)) { + // we have found the jar we need. + jarPath = fstatus.getPath(); + return DagUtils.localizeResource(null, jarPath, conf); + } + } + + // jar wasn't in the directory, copy the one in current use + if (jarPath == null) { + return DagUtils.localizeResource(new Path(currentVersionPathStr), hiveJarDirPath, conf); + } + } + } + + /* + * specified location does not exist or is not a directory + * try to push the jar to the hdfs location pointed by + * config variable HIVE_INSTALL_DIR. 
Path will be + * HIVE_INSTALL_DIR/{username}/.hiveJars/ + */ + if ((hiveJarDir == null) || (dirStatus == null) || + ((dirStatus != null) && (!dirStatus.isDir()))) { + Path dest = DagUtils.getDefaultDestDir(conf); + String destPathStr = dest.toString(); + String jarPathStr = destPathStr + "/" + currentJarName; + dirStatus = fs.getFileStatus(dest); + if (dirStatus.isDir()) { + return DagUtils.localizeResource(new Path(currentVersionPathStr), new Path(jarPathStr), conf); + } else { + throw new IOException(ErrorMsg.INVALID_DIR.format(dest.toString())); + } + } + + // we couldn't find any valid locations. Throw exception + throw new IOException(ErrorMsg.NO_VALID_LOCATIONS.getMsg()); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index f5d136f..22b2321 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -36,17 +36,16 @@ import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.TezWork; import org.apache.hadoop.hive.ql.plan.api.StageType; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.tez.client.TezClient; +import org.apache.tez.client.TezSession; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Edge; -import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.client.DAGClient; -import org.apache.tez.mapreduce.hadoop.MRHelpers; /** * @@ -67,6 +66,7 @@ public int execute(DriverContext driverContext) { boolean cleanContext = false; Context ctx = null; DAGClient client = null; + TezSessionState session = null; try { // Get or create Context object. 
If we create it we have to clean @@ -77,6 +77,16 @@ public int execute(DriverContext driverContext) { cleanContext = true; } + // Need to remove this static hack. But this is the way currently to + // get a session. + SessionState ss = SessionState.get(); + session = ss.getTezSession(); + if (!session.isOpen()) { + // can happen if the user sets the tez flag after the session was + // established + session.open(ss.getSessionId(), conf); + } + // we will localize all the files (jars, plans, hashtables) to the // scratch dir. let's create this first. Path scratchDir = new Path(ctx.getMRScratchDir()); @@ -89,13 +99,13 @@ public int execute(DriverContext driverContext) { // unless already installed on all the cluster nodes, we'll have to // localize hive-exec.jar as well. - LocalResource appJarLr = DagUtils.createHiveExecLocalResource(conf); + LocalResource appJarLr = session.getAppJarLr(); // next we translate the TezWork to a Tez DAG DAG dag = build(jobConf, work, scratchDir, appJarLr, ctx); // submit will send the job to the cluster and start executing - client = submit(jobConf, dag, scratchDir, appJarLr); + client = submit(jobConf, dag, scratchDir, appJarLr, session.getSession()); // finally monitor will print progress until the job is done TezJobMonitor monitor = new TezJobMonitor(); @@ -151,7 +161,7 @@ private DAG build(JobConf conf, TezWork work, Path scratchDir, // translate work to vertex JobConf wxConf = DagUtils.initializeVertexConf(conf, w); - Vertex wx = DagUtils.createVertex(wxConf, w, tezDir, + Vertex wx = DagUtils.createVertex(wxConf, w, tezDir, i--, appJarLr, additionalLr, fs, ctx); dag.addVertex(wx); workToVertex.put(w, wx); @@ -168,25 +178,12 @@ private DAG build(JobConf conf, TezWork work, Path scratchDir, return dag; } - private DAGClient submit(JobConf conf, DAG dag, Path scratchDir, LocalResource appJarLr) + private DAGClient submit(JobConf conf, DAG dag, Path scratchDir, + LocalResource appJarLr, TezSession session) throws IOException, 
TezException, InterruptedException { - TezClient tezClient = new TezClient(new TezConfiguration(conf)); - - // environment variables used by application master - Map amEnv = new HashMap(); - MRHelpers.updateEnvironmentForMRTasks(conf, amEnv, false); - - // setup local resources used by application master - Map amLrs = new HashMap(); - amLrs.put(DagUtils.getBaseName(appJarLr), appJarLr); - - Path tezDir = DagUtils.getTezDir(scratchDir); - // ready to start execution on the cluster - DAGClient dagClient = tezClient.submitDAGApplication(dag, tezDir, - null, "default", Collections.singletonList(""), amEnv, amLrs, - new TezConfiguration(conf)); + DAGClient dagClient = session.submitDAG(dag); return dagClient; } @@ -209,7 +206,7 @@ private int close(TezWork work, int rc) { // jobClose needs to execute successfully otherwise fail task if (rc == 0) { rc = 3; - String mesg = "Job Commit failed with exception '" + String mesg = "Job Commit failed with exception '" + Utilities.getNameMessage(e) + "'"; console.printError(mesg, "\n" + StringUtils.stringifyException(e)); } diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index ab369f0..d7f5a6f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.MapRedStats; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; import org.apache.hadoop.hive.ql.history.HiveHistory; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -131,6 +132,8 @@ private Map> localMapRedErrors; + private final TezSessionState tezSessionState; + /** * Lineage state. 
*/ @@ -189,6 +192,7 @@ public SessionState(HiveConf conf) { this.conf = conf; isSilent = conf.getBoolVar(HiveConf.ConfVars.HIVESESSIONSILENT); ls = new LineageState(); + tezSessionState = new TezSessionState(); overriddenConfigurations = new HashMap(); overriddenConfigurations.putAll(HiveConf.getConfSystemProperties()); // if there isn't already a session name, go ahead and create it. @@ -281,6 +285,16 @@ public static SessionState start(SessionState startSs) { throw new RuntimeException(e); } + if (HiveConf.getBoolVar(startSs.getConf(), HiveConf.ConfVars.HIVE_OPTIMIZE_TEZ)) { + try { + startSs.tezSessionState.open(startSs.getSessionId(), startSs.conf); + } catch (Exception e) { + throw new RuntimeException(e); + } + } else { + LOG.info("No Tez session required at this point. hive.optimize.tez is false."); + } + return startSs; } @@ -749,6 +763,12 @@ public void close() throws IOException { } catch (IOException e) { LOG.info("Error removing session resource dir " + resourceDir, e); } + + try { + tezSessionState.close(); + } catch (Exception e) { + LOG.info("Error closing tez session", e); + } } /** @@ -771,4 +791,8 @@ public PerfLogger getPerfLogger(boolean resetPerfLogger) { return perfLogger; } + public TezSessionState getTezSession() { + return tezSessionState; + } + }