diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 3bfe35a..077d868 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -85,8 +85,6 @@ private boolean inited = false; - - private static TezSessionPoolManager sessionPool = null; private static List openSessions = Collections @@ -137,6 +135,7 @@ public void setupPool(HiveConf conf) throws InterruptedException { + sessionLifetimeMs + " + [0, " + sessionLifetimeJitterMs + ") ms"); } expirationQueue = new PriorityBlockingQueue<>(11, new Comparator() { + @Override public int compare(TezSessionPoolSession o1, TezSessionPoolSession o2) { assert o1.expirationNs != null && o2.expirationNs != null; return o1.expirationNs.compareTo(o2.expirationNs); @@ -144,11 +143,13 @@ public int compare(TezSessionPoolSession o1, TezSessionPoolSession o2) { }); restartQueue = new LinkedBlockingQueue<>(); expirationThread = new Thread(new Runnable() { + @Override public void run() { runExpirationThread(); } }, "TezSessionPool-expiration"); restartThread = new Thread(new Runnable() { + @Override public void run() { runRestartThread(); } @@ -279,13 +280,15 @@ public void stop() throws Exception { return; } - // we can just stop all the sessions - Iterator iter = openSessions.iterator(); - while (iter.hasNext()) { - TezSessionState sessionState = iter.next(); - if (sessionState.isDefault()) { - sessionState.close(false); - iter.remove(); + synchronized (openSessions) { + // we can just stop all the sessions + Iterator iter = openSessions.iterator(); + while (iter.hasNext()) { + TezSessionState sessionState = iter.next(); + if (sessionState.isDefault()) { + sessionState.close(false); + iter.remove(); + } } } @@ -522,7 +525,9 @@ public void close(boolean keepTmpDir) throws Exception { if (LOG.isDebugEnabled()) { LOG.debug("Closed a pool session [" + this + "]"); } - openSessions.remove(this); + synchronized (openSessions) { + openSessions.remove(this); + } if (parent.expirationQueue != null) { parent.expirationQueue.remove(this); } @@ -534,7 +539,9 @@ protected void openInternal(HiveConf conf, Collection additionalFiles, boolean isAsync, LogHelper console, Path scratchDir) throws IOException, LoginException, URISyntaxException, TezException { super.openInternal(conf, additionalFiles, isAsync, console, scratchDir); - openSessions.add(this); + synchronized (openSessions) { + openSessions.add(this); + } if (parent.expirationQueue != null) { long jitterModMs = (long)(parent.sessionLifetimeJitterMs * rdm.nextFloat()); expirationNs = System.nanoTime() + (parent.sessionLifetimeMs + jitterModMs) * 1000000L;