diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 3783dc4..3b7030e 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2470,6 +2470,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "remains the default engine for historical reasons, it is itself a historical engine\n" + "and is deprecated in Hive 2 line. It may be removed without further warning."), + // TODO Define a helper class which can accept and validate potential values HIVE_EXECUTION_MODE("hive.execution.mode", "container", new StringSet("container", "llap"), "Chooses whether query fragments will run in container or in llap"), diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index f8f3cad..687c33a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -45,6 +45,7 @@ import javax.security.auth.login.LoginException; +import org.apache.tez.dag.api.TezConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.Path; @@ -82,7 +83,7 @@ private Semaphore llapQueue; private HiveConf initConf = null; - int numConcurrentLlapQueries = -1; + int numConcurrentLlapQueries = -1; // TODO This is not thread safe private long sessionLifetimeMs = 0; private long sessionLifetimeJitterMs = 0; /** A queue for initial sessions that have not been started yet. */ @@ -112,10 +113,11 @@ protected TezSessionPoolManager() { } private void startInitialSession(TezSessionPoolSession sessionState) throws Exception { - HiveConf newConf = new HiveConf(initConf); + HiveConf newConf = new HiveConf(initConf); // TODO Why is this configuration management not happening inside TezSessionPool. 
+ // Makes no sense for it to be mixed up like this. boolean isUsable = sessionState.tryUse(); if (!isUsable) throw new IOException(sessionState + " is not usable at pool startup"); - newConf.set("tez.queue.name", sessionState.getQueueName()); + newConf.set(TezConfiguration.TEZ_QUEUE_NAME, sessionState.getQueueName()); sessionState.open(newConf); if (sessionState.returnAfterUse()) { defaultQueuePool.put(sessionState); @@ -134,6 +136,7 @@ public void startPool() throws Exception { startInitialSession(session); } } else { + // TODO What is this doing now ? final SessionState parentSessionState = SessionState.get(); // The runnable has no mutable state, so each thread can run the same thing. final AtomicReference firstError = new AtomicReference<>(null); @@ -150,6 +153,7 @@ public void run() { } catch (Exception e) { if (!firstError.compareAndSet(null, e)) { LOG.error("Failed to start session; ignoring due to previous error", e); + // TODO Why even continue after this. We're already in a state where things are messed up ? } } } @@ -248,8 +252,11 @@ public void run() { } } + // TODO Create and init session sets up queue, isDefault - but does not initialize the configuration private TezSessionPoolSession createAndInitSession(String queue, boolean isDefault) { TezSessionPoolSession sessionState = createSession(TezSessionState.makeSessionId()); + // TODO When will the queue ever be null. + // Pass queue and default in as constructor parameters, and make them final. if (queue != null) { sessionState.setQueueName(queue); } @@ -266,6 +273,7 @@ private TezSessionState getSession(HiveConf conf, boolean doOpen, throws Exception { String queueName = conf.get("tez.queue.name"); + // TODO Session re-use completely disabled for doAs=true. Always launches a new session. 
boolean nonDefaultUser = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS); /* @@ -283,6 +291,7 @@ private TezSessionState getSession(HiveConf conf, boolean doOpen, LOG.info("Choosing a session from the defaultQueuePool"); while (true) { + // TODO Should block here for a session to become available. If a restart is in progress - it's already been taken out of this queue ? TezSessionPoolSession result = defaultQueuePool.take(); if (result.tryUse()) return result; LOG.info("Couldn't use a session [" + result + "]; attempting another one"); @@ -333,7 +342,7 @@ public void returnSession(TezSessionState tezSessionState, boolean llap) public static void closeIfNotDefault( TezSessionState tezSessionState, boolean keepTmpDir) throws Exception { - LOG.info("Closing tez session default? " + tezSessionState.isDefault()); + LOG.info("Closing tez session default? " + tezSessionState.isDefault()); // TODO Absurdly placed log statement. No session id logged. Closing isn't necessarily true. if (!tezSessionState.isDefault()) { tezSessionState.close(keepTmpDir); } @@ -395,12 +404,19 @@ public TezSessionState getSession(TezSessionState session, HiveConf conf, boolea private static boolean canWorkWithSameSession(TezSessionState session, HiveConf conf) throws HiveException { if (session == null || conf == null) { + // TODO Will the session being passed in here be null if this is being used from the set of default sessions. + // Really don't see why - since SessionState can be re-used - in which case the session will be re-used. return false; } try { UserGroupInformation ugi = Utils.getUGI(); String userName = ugi.getShortUserName(); + // TODO Will these checks work if some other user logs in. Isn't a doAs check required somewhere here as well. + // Should a doAs check happen here instead of after the user test. 
+ // With HiveServer2 - who is the incoming user in terms of UGI (the hive user itself, or the user who actually submitted the query) + + // Working on the assumption that the user here will be the hive user if doAs = false, we'll make it past this false check. LOG.info("The current user: " + userName + ", session user: " + session.getUser()); if (userName.equals(session.getUser()) == false) { LOG.info("Different users incoming: " + userName + " existing: " + session.getUser()); @@ -410,6 +426,7 @@ private static boolean canWorkWithSameSession(TezSessionState session, HiveConf throw new HiveException(e); } + // TODO Get rid of this brilliant piece of code. boolean doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS); // either variables will never be null because a default value is returned in case of absence if (doAsEnabled != conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { @@ -432,14 +449,18 @@ private static boolean canWorkWithSameSession(TezSessionState session, HiveConf if (!queueName.equals(conf.get("tez.queue.name"))) { // the String.equals method handles the case of conf not having the queue name as well. return false; + } else { + return true; } } else { // this session should never be a default session unless something has messed up. + // TODO: Why not ? (Everything is accessed via the SessionPoolManager). Is the check supposed to fail before this ? + + // TODO This still makes no sense. How is this exception not hit each time? Also looks like this was meant to be + // the else block of if (!queueName.equals ... given the exception message. throw new HiveException("Default queue should always be returned." 
+ "Hence we should not be here."); } - - return true; } public TezSessionState getSession(TezSessionState session, HiveConf conf, boolean doOpen, @@ -447,6 +468,7 @@ public TezSessionState getSession(TezSessionState session, HiveConf conf, boolea if (llap && (this.numConcurrentLlapQueries > 0)) { llapQueue.acquire(); // blocks if no more llap queries can be submitted. } + if (canWorkWithSameSession(session, conf)) { return session; } @@ -464,6 +486,7 @@ public void closeAndOpen(TezSessionState sessionState, HiveConf conf, if (sessionConf != null && sessionConf.get("tez.queue.name") != null) { conf.set("tez.queue.name", sessionConf.get("tez.queue.name")); } + // TODO Why closeIfNotDefault when invoked after a failure on the TezSession not being available ? closeIfNotDefault(sessionState, keepTmpDir); sessionState.open(conf, additionalFiles); } @@ -491,6 +514,7 @@ private void closeAndReopen(TezSessionPoolSession oldSession) throws Exception { } finally { TezSessionPoolSession newSession = createAndInitSession(queueName, isDefault); newSession.open(conf, additionalFiles, scratchDir); + // TODO Why is the isDefault flag being ignored here ? defaultQueuePool.put(newSession); } } @@ -506,6 +530,7 @@ private void runRestartThread() { } catch (InterruptedException ie) { throw ie; } catch (Exception e) { + // TODO What happens in this case - no new sessions launched ? LOG.error("Failed to close or restart a session, ignoring", e); } } @@ -589,6 +614,7 @@ public TezSessionPoolSession(String sessionId, TezSessionPoolManager parent) { @Override public void close(boolean keepTmpDir) throws Exception { + // TODO Inconsistent state handling. This does not update sessionState. returnAfterUse can fail as a result. 
try { super.close(keepTmpDir); } finally { @@ -670,7 +696,7 @@ public boolean returnAfterUse() throws Exception { public boolean tryExpire(boolean isAsync) throws Exception { if (expirationNs == null) return true; if (!shouldExpire()) return false; - while (true) { + while (true) { // TODO What is this tight loop - what is it waiting on ? if (sessionState.get() != STATE_NONE) return true; // returnAfterUse will take care of this if (sessionState.compareAndSet(STATE_NONE, STATE_EXPIRED)) { closeAndRestartExpiredSession(isAsync); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 919b35a..70cee01 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -220,12 +220,14 @@ protected void openInternal(final HiveConf conf, Collection additionalFi boolean isAsync, LogHelper console, Path scratchDir) throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException { this.conf = conf; + // TODO Why is the queue name set again. It has already been setup via setQueueName. Do only one of the two. this.queueName = conf.get("tez.queue.name"); this.doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS); - final boolean llapMode = "llap".equals(HiveConf.getVar( + final boolean llapMode = "llap".equalsIgnoreCase(HiveConf.getVar( conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE)); + // TODO This - at least for the session pool - will always be the hive user. How does doAs above this affect things ? 
UserGroupInformation ugi = Utils.getUGI(); user = ugi.getShortUserName(); LOG.info("User of session id " + sessionId + " is " + user); @@ -279,6 +281,7 @@ protected void openInternal(final HiveConf conf, Collection additionalFi llapCredentials = new Credentials(); llapCredentials.addToken(LlapTokenIdentifier.KIND_NAME, getLlapToken(user, tezConfig)); } + // TODO Change this to not serialize the entire Configuration - minor. UserPayload servicePluginPayload = TezUtils.createUserPayloadFromConf(tezConfig); // we need plugins to handle llap and uber mode servicePluginsDescriptor = ServicePluginsDescriptor.create(true, @@ -641,7 +644,7 @@ public String getQueueName() { } public void setDefault() { - defaultQueue = true; + defaultQueue = true; } public boolean isDefault() { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 9e114c0..a745978 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -152,6 +152,7 @@ public int execute(DriverContext driverContext) { getExtraLocalResources(jobConf, scratchDir, inputOutputJars); // Ensure the session is open and has the necessary local resources + // TODO Lots of exceptions can come out of this one invocation. If not returned in the finally block - things blow up. updateSession(session, jobConf, scratchDir, inputOutputJars, inputOutputLocalResources); List additionalLr = session.getLocalizedResources(); @@ -270,6 +271,7 @@ void updateSession(TezSessionState session, .hasResources(inputOutputJars); TezClient client = session.getSession(); + // TODO null can also mean that this operation was interrupted. Should we really try to re-create the session in that case ? 
if (client == null) { // can happen if the user sets the tez flag after the session was // established @@ -448,6 +450,7 @@ DAGClient submit(JobConf conf, DAG dag, Path scratchDir, console.printInfo("Tez session was closed. Reopening..."); // close the old one, but keep the tmp files around + // TODO Why is the session being created using a conf instance belonging to TezTask - instead of the session conf instance. TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf, inputOutputJars, true); console.printInfo("Session re-established.");