diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index f27bde8..c3e3f08 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -830,7 +830,7 @@ public LocalResource localizeResource(Path src, Path dest, Configuration conf) } return createLocalResource(destFS, dest, LocalResourceType.FILE, - LocalResourceVisibility.APPLICATION); + LocalResourceVisibility.PRIVATE); } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 27e4cd0..2959fcc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -28,6 +28,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import javax.security.auth.login.LoginException; @@ -76,8 +77,8 @@ private boolean defaultQueue = false; private String user; - private HashSet additionalFilesNotFromConf = null; - private List localizedResources; + private final Set additionalFilesNotFromConf = new HashSet(); + private final Set localizedResources = new HashSet(); private static List openSessions = Collections.synchronizedList(new LinkedList()); @@ -143,22 +144,15 @@ public void open(HiveConf conf, String[] additionalFiles) // create the tez tmp dir tezScratchDir = createTezDir(sessionId); - String dir = tezScratchDir.toString(); - // Localize resources to session scratch dir - localizedResources = utils.localizeTempFilesFromConf(dir, conf); - List handlerLr = utils.localizeTempFiles(dir, conf, additionalFiles); - if (handlerLr != null) { - if (localizedResources == null) { - localizedResources = handlerLr; - } else { - localizedResources.addAll(handlerLr); - } - additionalFilesNotFromConf = new HashSet(); + additionalFilesNotFromConf.clear(); + if (additionalFiles != null) { for (String originalFile : additionalFiles) { additionalFilesNotFromConf.add(originalFile); } } + refreshLocalResourcesFromConf(conf); + // generate basic tez config TezConfiguration tezConfig = new TezConfiguration(conf); @@ -171,10 +165,8 @@ public void open(HiveConf conf, String[] additionalFiles) // configuration for the application master Map commonLocalResources = new HashMap(); commonLocalResources.put(utils.getBaseName(appJarLr), appJarLr); - if (localizedResources != null) { - for (LocalResource lr : localizedResources) { - commonLocalResources.put(utils.getBaseName(lr), lr); - } + for (LocalResource lr : localizedResources) { + commonLocalResources.put(utils.getBaseName(lr), lr); } // Create environment for AM. @@ -216,9 +208,31 @@ public void open(HiveConf conf, String[] additionalFiles) openSessions.add(this); } + public void refreshLocalResourcesFromConf(HiveConf conf) + throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException { + + String dir = tezScratchDir.toString(); + + localizedResources.clear(); + + // these are local resources set through add file, jar, etc + List lrs = utils.localizeTempFilesFromConf(dir, conf); + if (lrs != null) { + localizedResources.addAll(lrs); + } + + // these are local resources that are set through the mr "tmpjars" property + List handlerLr = utils.localizeTempFiles(dir, conf, + additionalFilesNotFromConf.toArray(new String[additionalFilesNotFromConf.size()])); + + if (handlerLr != null) { + localizedResources.addAll(handlerLr); + } + } + public boolean hasResources(String[] localAmResources) { if (localAmResources == null || localAmResources.length == 0) return true; - if (additionalFilesNotFromConf == null || additionalFilesNotFromConf.isEmpty()) return false; + if (additionalFilesNotFromConf.isEmpty()) return false; for (String s : localAmResources) { if (!additionalFilesNotFromConf.contains(s)) return false; } @@ -252,8 +266,8 @@ public void close(boolean keepTmpDir) throws TezException, IOException { tezScratchDir = null; conf = null; appJarLr = null; - additionalFilesNotFromConf = null; - localizedResources = null; + additionalFilesNotFromConf.clear(); + localizedResources.clear(); } public void cleanupScratchDir () throws IOException { @@ -369,7 +383,7 @@ public HiveConf getConf() { } public List getLocalizedResources() { - return localizedResources; + return new ArrayList(localizedResources); } public String getUser() { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 84cfca4..949bcfb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -120,33 +120,28 @@ public int execute(DriverContext driverContext) { // create the tez tmp dir scratchDir = utils.createTezDir(scratchDir, conf); - boolean hasResources = session.hasResources(inputOutputJars); - - if (ss.hasAddedResource()) { - // need to re-launch session because of new added jars. - hasResources = false; - // reset the added resource flag for this session since we would - // relocalize (either by restarting or relocalizing) due to the above - // hasResources flag. - ss.setAddedResource(false); - } - - // If we have any jars from input format, we need to restart the session because - // AM will need them; so, AM has to be restarted. What a mess... - if (!hasResources && session.isOpen()) { - LOG.info("Tez session being reopened to pass custom jars to AM"); - TezSessionPoolManager.getInstance().close(session); - session = TezSessionPoolManager.getInstance().getSession(null, conf, false); - ss.setTezSession(session); - } if (!session.isOpen()) { // can happen if the user sets the tez flag after the session was // established LOG.info("Tez session hasn't been created yet. Opening session"); session.open(conf, inputOutputJars); + } else { + session.refreshLocalResourcesFromConf(conf); } + List additionalLr = session.getLocalizedResources(); + + // log which resources we're adding (apart from the hive exec) + if (LOG.isDebugEnabled()) { + if (additionalLr == null || additionalLr.size() == 0) { + LOG.debug("No local resources to process (other than hive-exec)"); + } else { + for (LocalResource lr: additionalLr) { + LOG.debug("Adding local resource: " + lr.getResource()); + } + } + } // unless already installed on all the cluster nodes, we'll have to // localize hive-exec.jar as well. @@ -156,7 +151,7 @@ public int execute(DriverContext driverContext) { DAG dag = build(jobConf, work, scratchDir, appJarLr, additionalLr, ctx); // submit will send the job to the cluster and start executing - client = submit(jobConf, dag, scratchDir, appJarLr, session); + client = submit(jobConf, dag, scratchDir, appJarLr, session, additionalLr); // finally monitor will print progress until the job is done TezJobMonitor monitor = new TezJobMonitor(); @@ -290,15 +285,23 @@ DAG build(JobConf conf, TezWork work, Path scratchDir, } DAGClient submit(JobConf conf, DAG dag, Path scratchDir, - LocalResource appJarLr, TezSessionState sessionState) + LocalResource appJarLr, TezSessionState sessionState, + List additionalLr) throws Exception { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG); DAGClient dagClient = null; + Map resourceMap = new HashMap(); + if (additionalLr != null) { + for (LocalResource lr: additionalLr) { + resourceMap.put(utils.getBaseName(lr), lr); + } + } + try { // ready to start execution on the cluster - dagClient = sessionState.getSession().submitDAG(dag); + dagClient = sessionState.getSession().submitDAG(dag, resourceMap); } catch (SessionNotRunning nr) { console.printInfo("Tez session was closed. Reopening..."); @@ -306,7 +309,7 @@ DAGClient submit(JobConf conf, DAG dag, Path scratchDir, TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf); console.printInfo("Session re-established."); - dagClient = sessionState.getSession().submitDAG(dag); + dagClient = sessionState.getSession().submitDAG(dag, resourceMap); } perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG); diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 7feba1d..342f219 100644 --- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -164,8 +164,6 @@ private final String CONFIG_AUTHZ_SETTINGS_APPLIED_MARKER = "hive.internal.ss.authz.settings.applied.marker"; - private boolean addedResource; - /** * Lineage state. */ @@ -1004,12 +1002,4 @@ public void applyAuthorizationPolicy() throws HiveException { conf.set(CONFIG_AUTHZ_SETTINGS_APPLIED_MARKER, Boolean.TRUE.toString()); } - - public boolean hasAddedResource() { - return addedResource; - } - - public void setAddedResource(boolean addedResouce) { - this.addedResource = addedResouce; - } } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index 3d55a7c..2de9989 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -200,7 +201,7 @@ public void testEmptyWork() throws IllegalArgumentException, IOException, Except @Test public void testSubmit() throws Exception { DAG dag = new DAG("test"); - task.submit(conf, dag, path, appLr, sessionState); + task.submit(conf, dag, path, appLr, sessionState, new LinkedList()); // validate close/reopen verify(sessionState, times(1)).open(any(HiveConf.class)); verify(sessionState, times(1)).close(eq(false)); // now uses pool after HIVE-7043