diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java index e81b73d..9ec46ba 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java @@ -127,7 +127,7 @@ private final NumberFormat secondsFormat; private final NumberFormat commaFormat; private static final List shutdownList; - private Map workMap; + private final Map workMap; private StringBuffer diagnostics; @@ -138,9 +138,13 @@ public void run() { TezJobMonitor.killRunningJobs(); try { - for (TezSessionState s : TezSessionPoolManager.getInstance().getOpenSessions()) { - System.err.println("Shutting down tez session."); - TezSessionPoolManager.getInstance().closeIfNotDefault(s, false); + List openSessions = + TezSessionPoolManager.getInstance().getOpenSessions(); + synchronized (openSessions) { + for (TezSessionState s : openSessions) { + System.err.println("Shutting down tez session."); + TezSessionPoolManager.getInstance().closeIfNotDefault(s, false); + } } } catch (Exception e) { // ignore @@ -396,12 +400,14 @@ private static boolean hasInterruptedException(Throwable e) { * currently running tez queries. No guarantees, best effort only. */ public static void killRunningJobs() { - for (DAGClient c: shutdownList) { - try { - System.err.println("Trying to shutdown DAG"); - c.tryKillDAG(); - } catch (Exception e) { - // ignore + synchronized (shutdownList) { + for (DAGClient c : shutdownList) { + try { + System.err.println("Trying to shutdown DAG"); + c.tryKillDAG(); + } catch (Exception e) { + // ignore + } } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 3bfe35a..b41a30d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -85,8 +85,6 @@ private boolean inited = false; - - private static TezSessionPoolManager sessionPool = null; private static List openSessions = Collections @@ -137,6 +135,7 @@ public void setupPool(HiveConf conf) throws InterruptedException { + sessionLifetimeMs + " + [0, " + sessionLifetimeJitterMs + ") ms"); } expirationQueue = new PriorityBlockingQueue<>(11, new Comparator() { + @Override public int compare(TezSessionPoolSession o1, TezSessionPoolSession o2) { assert o1.expirationNs != null && o2.expirationNs != null; return o1.expirationNs.compareTo(o2.expirationNs); @@ -144,11 +143,13 @@ public int compare(TezSessionPoolSession o1, TezSessionPoolSession o2) { }); restartQueue = new LinkedBlockingQueue<>(); expirationThread = new Thread(new Runnable() { + @Override public void run() { runExpirationThread(); } }, "TezSessionPool-expiration"); restartThread = new Thread(new Runnable() { + @Override public void run() { runRestartThread(); } @@ -279,13 +280,15 @@ public void stop() throws Exception { return; } - // we can just stop all the sessions - Iterator iter = openSessions.iterator(); - while (iter.hasNext()) { - TezSessionState sessionState = iter.next(); - if (sessionState.isDefault()) { - sessionState.close(false); - iter.remove(); + synchronized (openSessions) { + // we can just stop all the sessions + Iterator iter = openSessions.iterator(); + while (iter.hasNext()) { + TezSessionState sessionState = iter.next(); + if (sessionState.isDefault()) { + sessionState.close(false); + iter.remove(); + } } }