diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
index e81b73d..7d54b2d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
@@ -127,7 +127,7 @@
   private final NumberFormat secondsFormat;
   private final NumberFormat commaFormat;
   private static final List shutdownList;
-  private Map workMap;
+  private final Map workMap;
 
   private StringBuffer diagnostics;
 
@@ -138,10 +138,7 @@
       public void run() {
         TezJobMonitor.killRunningJobs();
         try {
-          for (TezSessionState s : TezSessionPoolManager.getInstance().getOpenSessions()) {
-            System.err.println("Shutting down tez session.");
-            TezSessionPoolManager.getInstance().closeIfNotDefault(s, false);
-          }
+          TezSessionPoolManager.getInstance().closeNonDefaultSessions(false);
         } catch (Exception e) {
           // ignore
         }
@@ -396,12 +393,14 @@ private static boolean hasInterruptedException(Throwable e) {
    * currently running tez queries. No guarantees, best effort only.
    */
   public static void killRunningJobs() {
-    for (DAGClient c: shutdownList) {
-      try {
-        System.err.println("Trying to shutdown DAG");
-        c.tryKillDAG();
-      } catch (Exception e) {
-        // ignore
+    synchronized (shutdownList) {
+      for (DAGClient c : shutdownList) {
+        try {
+          System.err.println("Trying to shutdown DAG");
+          c.tryKillDAG();
+        } catch (Exception e) {
+          // ignore
+        }
       }
     }
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 3bfe35a..1321b5f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -85,12 +85,9 @@
 
   private boolean inited = false;
 
-
-
   private static TezSessionPoolManager sessionPool = null;
 
-  private static List openSessions = Collections
-      .synchronizedList(new LinkedList());
+  private static List openSessions = new LinkedList();
 
   public static TezSessionPoolManager getInstance()
       throws Exception {
@@ -137,6 +134,7 @@ public void setupPool(HiveConf conf) throws InterruptedException {
           + sessionLifetimeMs + " + [0, " + sessionLifetimeJitterMs + ") ms");
     }
     expirationQueue = new PriorityBlockingQueue<>(11, new Comparator() {
+      @Override
       public int compare(TezSessionPoolSession o1, TezSessionPoolSession o2) {
         assert o1.expirationNs != null && o2.expirationNs != null;
         return o1.expirationNs.compareTo(o2.expirationNs);
@@ -144,11 +142,13 @@ public int compare(TezSessionPoolSession o1, TezSessionPoolSession o2) {
     });
     restartQueue = new LinkedBlockingQueue<>();
     expirationThread = new Thread(new Runnable() {
+      @Override
       public void run() {
         runExpirationThread();
       }
     }, "TezSessionPool-expiration");
     restartThread = new Thread(new Runnable() {
+      @Override
       public void run() {
         runRestartThread();
       }
@@ -279,13 +279,17 @@ public void stop() throws Exception {
       return;
     }
 
-    // we can just stop all the sessions
-    Iterator iter = openSessions.iterator();
-    while (iter.hasNext()) {
-      TezSessionState sessionState = iter.next();
-      if (sessionState.isDefault()) {
-        sessionState.close(false);
-        iter.remove();
+    // Take a snapshot while holding the lock. close() removes the session from
+    // openSessions itself, so closing while iterating the live list would make a
+    // following iter.remove() fail with ConcurrentModificationException.
+    List<TezSessionState> sessionsToClose;
+    synchronized (openSessions) {
+      sessionsToClose = new LinkedList<TezSessionState>(openSessions);
+    }
+    // we can just stop all the sessions
+    for (TezSessionState sessionState : sessionsToClose) {
+      if (sessionState.isDefault()) {
+        sessionState.close(false);
       }
     }
 
@@ -402,8 +406,31 @@ public void closeAndOpen(TezSessionState sessionState, HiveConf conf,
     sessionState.open(conf, additionalFiles);
   }
 
-  public List getOpenSessions() {
-    return openSessions;
+  /**
+   * Runs closeIfNotDefault on a snapshot of the open sessions and makes sure that
+   * every non-default session is dropped from openSessions afterwards.
+   *
+   * @param keepTmpDir forwarded unchanged to closeIfNotDefault
+   * @throws Exception propagated from closeIfNotDefault; later sessions are not visited
+   */
+  public void closeNonDefaultSessions(boolean keepTmpDir) throws Exception {
+    // Iterate over a copy taken under the lock: closing a pool session removes it
+    // from openSessions, and doing that while iterating the live list makes the
+    // iterator fail with ConcurrentModificationException.
+    List<TezSessionState> sessionsToClose;
+    synchronized (openSessions) {
+      sessionsToClose = new LinkedList<TezSessionState>(openSessions);
+    }
+    for (TezSessionState sessionState : sessionsToClose) {
+      System.err.println("Shutting down tez session.");
+      closeIfNotDefault(sessionState, keepTmpDir);
+      if (!sessionState.isDefault()) {
+        // Normally a no-op because close() has already unregistered the session.
+        synchronized (openSessions) {
+          openSessions.remove(sessionState);
+        }
+      }
+    }
   }
 
   private void closeAndReopen(TezSessionPoolSession oldSession) throws Exception {
@@ -522,7 +549,9 @@ public void close(boolean keepTmpDir) throws Exception {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Closed a pool session [" + this + "]");
       }
-      openSessions.remove(this);
+      synchronized (openSessions) {
+        openSessions.remove(this);
+      }
       if (parent.expirationQueue != null) {
         parent.expirationQueue.remove(this);
       }
@@ -534,7 +563,9 @@ protected void openInternal(HiveConf conf, Collection additionalFiles,
         boolean isAsync, LogHelper console, Path scratchDir) throws IOException,
         LoginException, URISyntaxException, TezException {
       super.openInternal(conf, additionalFiles, isAsync, console, scratchDir);
-      openSessions.add(this);
+      synchronized (openSessions) {
+        openSessions.add(this);
+      }
       if (parent.expirationQueue != null) {
         long jitterModMs = (long)(parent.sessionLifetimeJitterMs * rdm.nextFloat());
         expirationNs = System.nanoTime() + (parent.sessionLifetimeMs + jitterModMs) * 1000000L;