diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java index 97ba036335..7137a17aae 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -339,7 +339,8 @@ private void updateSessionTriggerProvidersOnMasterThread() { private WMFullResourcePlan resourcePlanToApply = null; private boolean doClearResourcePlan = false; private boolean hasClusterStateChanged = false; - private SettableFuture testEvent, applyRpFuture; + private List> testEvents = new LinkedList<>(); + private SettableFuture applyRpFuture; private SettableFuture> dumpStateFuture; private final List moveSessions = new LinkedList<>(); } @@ -401,10 +402,11 @@ private void runWmThread() { return; } catch (Exception | AssertionError ex) { LOG.error("WM thread encountered an error but will attempt to continue", ex); - if (currentEvents.testEvent != null) { - currentEvents.testEvent.setException(ex); - currentEvents.testEvent = null; + for (SettableFuture testEvent : currentEvents.testEvents) { + LOG.info("Failing test event " + System.identityHashCode(testEvent)); + testEvent.setException(ex); } + currentEvents.testEvents.clear(); if (currentEvents.applyRpFuture != null) { currentEvents.applyRpFuture.setException(ex); currentEvents.applyRpFuture = null; @@ -721,12 +723,14 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw e.dumpStateFuture.set(result); e.dumpStateFuture = null; } - + // 15. Notify tests and global async ops. - if (e.testEvent != null) { - e.testEvent.set(true); - e.testEvent = null; + for (SettableFuture testEvent : e.testEvents) { + LOG.info("Triggering test event " + System.identityHashCode(testEvent)); + testEvent.set(null); } + e.testEvents.clear(); + if (e.applyRpFuture != null) { e.applyRpFuture.set(true); e.applyRpFuture = null; @@ -1552,7 +1556,8 @@ private void addKillQueryResult(WmTezSession toKill, boolean success) { SettableFuture testEvent = SettableFuture.create(); currentLock.lock(); try { - current.testEvent = testEvent; + LOG.info("Adding test event " + System.identityHashCode(testEvent)); + current.testEvents.add(testEvent); notifyWmThreadUnderLock(); } finally { currentLock.unlock(); diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java index 6e15b2c007..30ad212545 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java @@ -60,7 +60,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@RunWith(RetryTestRunner.class) public class TestWorkloadManager { @SuppressWarnings("unused") private static final Logger LOG = LoggerFactory.getLogger(TestWorkloadManager.class); @@ -90,8 +89,10 @@ public void run() { if (cdl != null) { cdl.countDown(); } + LOG.info("About to call get with " + old); try { session.set((WmTezSession) wm.getSession(old, mappingInput(userName), conf)); + LOG.info("Received " + session.get()); } catch (Throwable e) { error.compareAndSet(null, e); } @@ -114,7 +115,7 @@ public int updateSessionsAsync(Double totalMaxAlloc, List sessions isCalled = true; return 0; } - + @Override public void updateSessionAsync(WmTezSession session) { } @@ -206,7 +207,7 @@ public void notifyOfClusterStateChange() { } private static WMFullResourcePlan createDummyPlan(int numSessions) { - WMFullResourcePlan plan = new WMFullResourcePlan(new WMResourcePlan("rp"), + WMFullResourcePlan plan = new WMFullResourcePlan(new WMResourcePlan("rp"), Lists.newArrayList(pool("llap", numSessions, 1.0f))); plan.getPlan().setDefaultPoolPath("llap"); return plan; @@ -441,7 +442,7 @@ private static void verifyMapping( session.returnToSessionManager(); } - + @Test(timeout=10000) @@ -477,12 +478,12 @@ public void testQueueing() throws Exception { checkError(error); // Now release a single session from A. sessionA1.returnToSessionManager(); - t1.join(); + joinThread(t1); checkError(error); assertNotNull(sessionA3.get()); assertNull(sessionA4.get()); sessionA3.get().returnToSessionManager(); - t2.join(); + joinThread(t2); checkError(error); assertNotNull(sessionA4.get()); sessionA4.get().returnToSessionManager(); @@ -553,14 +554,14 @@ public void testReuseWithQueueing() throws Exception { assertNull(session4.get()); // We have released the session by trying to reuse it and going back into queue, s3 can start. - t1.join(); + joinThread(t1); checkError(error); assertNotNull(session3.get()); assertEquals(0.5, session3.get().getClusterFraction(), EPSILON); // Now release another session; the thread that gave up on reuse can proceed. session1.returnToSessionManager(); - t2.join(); + joinThread(t2); checkError(error); assertNotNull(session4.get()); assertNotSame(session2, session4.get()); @@ -569,6 +570,12 @@ public void testReuseWithQueueing() throws Exception { session4.get().returnToSessionManager(); } + private static void joinThread(Thread t) throws InterruptedException { + LOG.debug("Joining " + t.getName()); + t.join(); + LOG.debug("Joined " + t.getName()); + } + private void waitForThreadToBlock(CountDownLatch cdl, Thread t1) throws InterruptedException { t1.start(); cdl.await(); @@ -619,7 +626,7 @@ public void testApplyPlanUserMapping() throws Exception { plan.setMappings(Lists.newArrayList(mapping("U", "A"))); final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); wm.start(); - + // One session will be running, the other will be queued in "A" WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, mappingInput("U"), conf); assertEquals("A", sessionA1.getPoolName()); @@ -639,7 +646,7 @@ public void testApplyPlanUserMapping() throws Exception { wm.updateResourcePlanAsync(plan); // The session will go to B with the new mapping; check it. - t1.join(); + joinThread(t1); checkError(error); assertNotNull(sessionA2.get()); assertEquals("B", sessionA2.get().getPoolName()); @@ -667,7 +674,7 @@ public void testApplyPlanQpChanges() throws Exception { wm.start(); TezSessionPool tezAmPool = wm.getTezAmPool(); assertEquals(6, tezAmPool.getCurrentSize()); - + // A: 1/1 running, 1 queued; B: 2/2 running, C: 1/2 running, D: 1/1 running, 1 queued. // Total: 5/6 running. WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, mappingInput("A"), conf), @@ -700,8 +707,8 @@ public void testApplyPlanQpChanges() throws Exception { wm.updateResourcePlanAsync(plan); wm.addTestEvent().get(); - t1.join(); - t2.join(); + joinThread(t1); + joinThread(t2); checkError(error); assertNotNull(sessionA2.get()); assertNotNull(sessionD2.get()); @@ -743,7 +750,7 @@ public void testFifoSchedulingPolicy() throws Exception { plan.getPlan().setDefaultPoolPath("A"); final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); wm.start(); - + // 2 running. WmTezSession sessionA1 = (WmTezSession) wm.getSession( null, mappingInput("A", null), conf, null), @@ -807,7 +814,7 @@ public void testDisableEnable() throws Exception { // Remove the resource plan - disable WM. All the queries die. wm.updateResourcePlanAsync(null).get(); - t1.join(); + joinThread(t1); assertNotNull(error.get()); assertNull(sessionA2.get()); assertKilledByWm(sessionA1); @@ -843,7 +850,7 @@ public void testAmPoolInteractions() throws Exception { // Take away the only session, as if it was expiring. TezSessionPool pool = wm.getTezAmPool(); WmTezSession oob = pool.getSession(); - + final AtomicReference sessionA1 = new AtomicReference<>(); final AtomicReference error = new AtomicReference<>(); final CountDownLatch cdl1 = new CountDownLatch(1); @@ -852,7 +859,7 @@ public void testAmPoolInteractions() throws Exception { checkError(error); // Replacing it directly in the pool should unblock get. pool.replaceSession(oob); - t1.join(); + joinThread(t1); assertNotNull(sessionA1.get()); assertEquals("A", sessionA1.get().getPoolName()); @@ -1136,7 +1143,7 @@ public void testAsyncSessionInitFailures() throws Exception { wm.updateResourcePlanAsync(plan); wm.addTestEvent().get(); blockedWait.set(true); // Meanwhile, the init succeeds! - t1.join(); + joinThread(t1); try { sessionA.get(); fail("Expected an error but got " + sessionA.get()); @@ -1191,7 +1198,7 @@ public void testAsyncSessionInitFailures() throws Exception { wm.updateResourcePlanAsync(plan); wm.addTestEvent().get(); failedWait.setException(new Exception("moo")); // Meanwhile, the init fails. - t1.join(); + joinThread(t1); try { sessionA.get(); fail("Expected an error but got " + sessionA.get());