diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java index a52928cc7a..8f9fca9ba5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java @@ -122,6 +122,7 @@ public void updateSessionsAsync(Double totalMaxAlloc, List session // we'd produce 2-2-2-2-0 as we round 1.6; whereas adding the last delta to the next query // we'd round 1.6-1.2-1.8-1.4-2.0 and thus give out 2-1-2-1-2, as intended. // Note that fractions don't have to all be the same like in this example. + assert session.hasClusterFraction(); double fraction = session.getClusterFraction(); double allocation = fraction * totalCount + lastDelta; double roundedAlloc = Math.round(allocation); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmEvent.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmEvent.java index 33341ad4a9..fae68ef75b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmEvent.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmEvent.java @@ -31,6 +31,7 @@ private static final Logger LOG = LoggerFactory.getLogger(WmEvent.class); enum EventType { GET, // get session + UPDATE, // update session allocation KILL, // kill query DESTROY, // destroy session RESTART, // restart session @@ -51,7 +52,8 @@ WmTezSessionInfo(WmTezSession wmTezSession) { this.poolName = wmTezSession.getPoolName(); this.sessionId = wmTezSession.getSessionId(); - this.clusterPercent = wmTezSession.getClusterFraction() * 100.0; + this.clusterPercent = wmTezSession.hasClusterFraction() + ? wmTezSession.getClusterFraction() * 100.0 : 0; } public String getPoolName() { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java index 1cf5493959..fa2b02e591 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java @@ -38,7 +38,7 @@ @JsonProperty("poolName") private String poolName; @JsonProperty("clusterFraction") - private double clusterFraction; + private Double clusterFraction; /** * The reason to kill an AM. Note that this is for the entire session, not just for a query. * Once set, this can never be unset because you can only kill the session once. @@ -174,7 +174,11 @@ void setClusterFraction(double fraction) { void clearWm() { this.poolName = null; - this.clusterFraction = 0f; + this.clusterFraction = null; + } + + public boolean hasClusterFraction() { + return this.clusterFraction != null; } public double getClusterFraction() { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java index f0e620c684..10c1bdc286 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -23,11 +23,13 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.google.common.math.DoubleMath; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.ThreadFactoryBuilder; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -50,6 +52,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; + import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -62,6 +65,7 @@ import org.apache.hadoop.hive.ql.exec.tez.AmPluginNode.AmPluginInfo; import org.apache.hadoop.hive.ql.exec.tez.TezSessionState.HiveResources; import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput; +import org.apache.hadoop.hive.ql.exec.tez.WmEvent.EventType; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.session.KillQuery; import org.apache.hadoop.hive.ql.session.SessionState; @@ -721,14 +725,16 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw private void dumpPoolState(PoolState ps, List set) { StringBuilder sb = new StringBuilder(); - sb.append("POOL ").append(ps.fullName).append(": qp ").append(ps.queryParallelism).append(", %% ") - .append(ps.finalFraction).append(", sessions: ").append(ps.sessions.size()) - .append(", initializing: ").append(ps.initializingSessions.size()).append(", queued: ").append(ps.queue.size()); + sb.append("POOL ").append(ps.fullName).append(": qp ").append(ps.queryParallelism) + .append(", %% ").append(ps.finalFraction).append(", sessions: ").append(ps.sessions.size()) + .append(", initializing: ").append(ps.initializingSessions.size()).append(", queued: ") + .append(ps.queue.size()); set.add(sb.toString()); sb.setLength(0); for (WmTezSession session : ps.sessions) { - sb.append("RUNNING: ").append(session.getClusterFraction()).append(" (") - .append(session.getAllocationState()).append(") => ").append(session.getSessionId()); + double cf = session.hasClusterFraction() ? session.getClusterFraction() : 0; + sb.append("RUNNING: ").append(cf).append(" (") .append(session.getAllocationState()) + .append(") => ").append(session.getSessionId()); set.add(sb.toString()); sb.setLength(0); } @@ -1737,7 +1743,7 @@ public double updateAllocationPercentages() { if (totalSessions == 0) return 0; double allocation = finalFractionRemaining / totalSessions; for (WmTezSession session : sessions) { - session.setClusterFraction(allocation); + updateSessionAllocationWithEvent(session, allocation); } // Do not give out the capacity of the initializing sessions to the running ones; // we expect init to be fast. @@ -1746,7 +1752,7 @@ public double updateAllocationPercentages() { if (sessions.isEmpty()) return 0; boolean isFirst = true; for (WmTezSession session : sessions) { - session.setClusterFraction(isFirst ? finalFractionRemaining : 0); + updateSessionAllocationWithEvent(session, isFirst ? finalFractionRemaining : 0); isFirst = false; } return finalFractionRemaining; @@ -1755,6 +1761,19 @@ public double updateAllocationPercentages() { } } + private void updateSessionAllocationWithEvent(WmTezSession session, double allocation) { + WmEvent event = null; + WmContext ctx = session.getWmContext(); + if (ctx != null && session.hasClusterFraction() + && !DoubleMath.fuzzyEquals(session.getClusterFraction(), allocation, 0.0001f)) { + event = new WmEvent(EventType.UPDATE); + } + session.setClusterFraction(allocation); + if (event != null) { + event.endEvent(session); + } + } + public LinkedList getSessions() { return sessions; } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java index 20a5947291..fc06f1f525 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java @@ -308,7 +308,7 @@ public void testReopen() throws Exception { assertNotSame(session, session2); wm.addTestEvent().get(); assertEquals(session2.toString(), 1.0, session2.getClusterFraction(), EPSILON); - assertEquals(0.0, session.getClusterFraction(), EPSILON); + assertFalse(session.hasClusterFraction()); qam.assertWasCalledAndReset(); } @@ -329,14 +329,14 @@ public void testDestroyAndReturn() throws Exception { assertNotSame(session, session2); session.destroy(); // Destroy before returning to the pool. assertEquals(1.0, session2.getClusterFraction(), EPSILON); - assertEquals(0.0, session.getClusterFraction(), EPSILON); + assertFalse(session.hasClusterFraction()); qam.assertWasCalledAndReset(); // We never lose pool session, so we should still be able to get. session = (WmTezSession) wm.getSession(null, mappingInput("user"), conf); session.returnToSessionManager(); assertEquals(1.0, session2.getClusterFraction(), EPSILON); - assertEquals(0.0, session.getClusterFraction(), EPSILON); + assertFalse(session.hasClusterFraction()); qam.assertWasCalledAndReset(); } @@ -1089,7 +1089,7 @@ public void testMoveSessionsMultiPool() throws Exception { assertEquals(0, allSessionProviders.get("B.x").getSessions().size()); assertEquals(0, allSessionProviders.get("B.y").getSessions().size()); assertEquals(0, allSessionProviders.get("C").getSessions().size()); - assertEquals(0.0f, sessionA1.getClusterFraction(), EPSILON); + assertFalse(sessionA1.hasClusterFraction()); assertFalse(allSessionProviders.get("A").getSessions().contains(sessionA1)); } @@ -1207,7 +1207,7 @@ private SampleTezSessionState validatePoolAfterCleanup( assertNotNull(theOnlySession); theOnlySession.setWaitForAmRegistryFuture(null); assertNull(oldSession.getPoolName()); - assertEquals(0f, oldSession.getClusterFraction(), EPSILON); + assertFalse(oldSession.hasClusterFraction()); pool.returnSession(theOnlySession); // Make sure we can actually get a session still - parallelism/etc. should not be affected. WmTezSession result = (WmTezSession) wm.getSession(null, mappingInput("A"), conf); @@ -1219,7 +1219,7 @@ private SampleTezSessionState validatePoolAfterCleanup( private void assertKilledByWm(WmTezSession session) { assertNull(session.getPoolName()); - assertEquals(0f, session.getClusterFraction(), EPSILON); + assertFalse(session.hasClusterFraction()); assertTrue(session.isIrrelevantForWm()); }