diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java index 99f39d3..b4f30bb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java @@ -116,7 +116,7 @@ private final NumberFormat secondsFormat; private final NumberFormat commaFormat; private static final List shutdownList; - private Map workMap; + private final Map workMap; private StringBuffer diagnostics; @@ -129,7 +129,7 @@ public void run() { TezJobMonitor.killRunningJobs(); } try { - for (TezSessionState s: TezSessionState.getOpenSessions()) { + for (TezSessionState s : TezSessionPoolManager.getInstance().getOpenSessions()) { System.err.println("Shutting down tez session."); TezSessionPoolManager.getInstance().close(s, false); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 1798201..dfa539f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -20,13 +20,16 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; @@ -49,6 +52,9 @@ private static TezSessionPoolManager sessionPool = null; + private static List openSessions = Collections + .synchronizedList(new LinkedList()); + public static TezSessionPoolManager getInstance() throws Exception { if (sessionPool == null) { @@ -68,6 +74,7 @@ public void startPool() throws Exception { TezSessionState sessionState = defaultQueuePool.take(); newConf.set("tez.queue.name", sessionState.getQueueName()); sessionState.open(newConf); + openSessions.add(sessionState); defaultQueuePool.put(sessionState); } } @@ -148,6 +155,7 @@ private TezSessionState getNewSessionState(HiveConf conf, String what = "Created"; if (doOpen) { retTezSessionState.open(conf); + openSessions.add(retTezSessionState); what = "Started"; } @@ -175,6 +183,7 @@ public void close(TezSessionState tezSessionState, boolean keepTmpDir) throws Ex LOG.info("Closing tez session default? " + tezSessionState.isDefault()); if (!tezSessionState.isDefault()) { tezSessionState.close(keepTmpDir); + openSessions.remove(tezSessionState); } } @@ -184,9 +193,12 @@ public void stop() throws Exception { } // we can just stop all the sessions - for (TezSessionState sessionState: TezSessionState.getOpenSessions()) { + Iterator iter = openSessions.iterator(); + while (iter.hasNext()) { + TezSessionState sessionState = iter.next(); if (sessionState.isDefault()) { sessionState.close(false); + iter.remove(); } } } @@ -282,5 +294,10 @@ public void closeAndOpen(TezSessionState sessionState, HiveConf conf, } close(sessionState, keepTmpDir); sessionState.open(conf, additionalFiles); + openSessions.add(sessionState); + } + + public List getOpenSessions() { + return openSessions; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index a4483fa..34e8cc8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -94,9 +94,6 @@ private final Set localizedResources = new HashSet(); private boolean doAsEnabled; - private static List openSessions - = Collections.synchronizedList(new LinkedList()); - /** * Constructor. We do not automatically connect, because we only want to * load tez classes when the user has tez installed. @@ -125,10 +122,6 @@ public boolean isOpen() { * Get all open sessions. Only used to clean up at shutdown. * @return List */ - public static List getOpenSessions() { - return openSessions; - } - public static String makeSessionId() { return UUID.randomUUID().toString(); } @@ -281,8 +274,6 @@ public void open(HiveConf conf, String[] additionalFiles) } catch(InterruptedException ie) { //ignore } - - openSessions.add(this); } public void refreshLocalResourcesFromConf(HiveConf conf) @@ -332,7 +323,6 @@ public void close(boolean keepTmpDir) throws Exception { LOG.info("Closing Tez Session"); try { session.stop(); - openSessions.remove(this); } catch (SessionNotRunning nr) { // ignore }