diff --git common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
index effe26b6b6..d05c7289e5 100644
--- common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
+++ common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
@@ -226,6 +226,11 @@ public void addGauge(String name, MetricsVariable variable) {
   }
 
   @Override
+  public void removeGauge(String name) {
+    //This implementation completely and exhaustively reverses the addGauge method above.
+  }
+
+  @Override
   public void addRatio(String name, MetricsVariable numerator,
       MetricsVariable denominator) {
     //Not implemented
diff --git common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
index 88c513b8cd..99d3e57d84 100644
--- common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
+++ common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
@@ -92,7 +92,15 @@
    * @param name name of gauge
    * @param variable variable to track.
    */
-  public void addGauge(String name, final MetricsVariable variable);
+  public void addGauge(String name, final MetricsVariable variable);
+
+
+  /**
+   * Removes the gauge added by addGauge.
+   * @param name name of gauge
+   */
+  public void removeGauge(String name);
+
   /**
    * Add a ratio metric to track the correlation between two variables
diff --git common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
index a43b09db8c..4f35a6da60 100644
--- common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
+++ common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
@@ -294,6 +294,21 @@ public Object getValue() {
     addGaugeInternal(name, gauge);
   }
 
+
+  @Override
+  public void removeGauge(String name) {
+    try {
+      gaugesLock.lock();
+      gauges.remove(name);
+      // Metrics throws an Exception if we don't do this when the key already exists
+      if (metricRegistry.getGauges().containsKey(name)) {
+        metricRegistry.remove(name);
+      }
+    } finally {
+      gaugesLock.unlock();
+    }
+  }
+
   @Override
   public void addRatio(String name, MetricsVariable numerator,
       MetricsVariable denominator) {
@@ -409,6 +424,7 @@ private boolean initCodahaleMetricsReporterClasses() {
         throw new IllegalArgumentException(e);
       }
       try {
+        // Note: Hadoop metric reporter does not support tags. We create a single reporter for all metrics.
        Constructor constructor = name.getConstructor(MetricRegistry.class, HiveConf.class);
         CodahaleReporter reporter = (CodahaleReporter) constructor.newInstance(metricRegistry, conf);
         reporter.start();
diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8d9b5a3194..de071f8ab9 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2486,6 +2486,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "Applies when a user specifies a target WM pool in the JDBC connection string. If\n" +
         "false, the user can only specify a pool he is mapped to (e.g. make a choice among\n" +
         "multiple group mappings); if true, the user can specify any existing pool."),
+    HIVE_SERVER2_WM_POOL_METRICS("hive.server2.wm.pool.metrics", true,
+        "Whether per-pool WM metrics should be enabled."),
     HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT("hive.server2.tez.wm.am.registry.timeout", "30s",
         new TimeValidator(TimeUnit.SECONDS),
         "The timeout for AM registry registration, after which (on attempting to use the\n" +
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index 3a2c19a3e6..7451ea44e2 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -476,7 +476,7 @@ public void close() throws IOException {
           isClosed, isInterrupted, pendingError.get(), queue.size());
     }
     LlapIoImpl.LOG.info("Maximum queue length observed " + maxQueueSize);
-    LlapIoImpl.LOG.info("Llap counters: {}" ,counters); // This is where counters are logged!
+    LlapIoImpl.LOG.info("Llap counters: {}", counters); // This is where counters are logged!
     feedback.stop();
     isClosed = true;
     rethrowErrorIfAny(pendingError.get());
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
index a52928cc7a..d3b4e0799b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
@@ -102,13 +102,20 @@ protected int getExecutorCount(boolean allowUpdate) {
   }
 
   @Override
-  public void updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessionsToUpdate) {
+  public int translateAllocationToCpus(double allocation) {
+    // Do not make a remote call under any circumstances - this is supposed to be async.
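+    // E.g. (illustration): with getExecutorCount(false) == 100 and allocation == 0.25, this returns 25.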
+    return (int)Math.round(getExecutorCount(false) * allocation);
+  }
+
+  @Override
+  public int updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessionsToUpdate) {
     // Do not make a remote call under any circumstances - this is supposed to be async.
     int totalCount = getExecutorCount(false);
     int totalToDistribute = -1;
     if (totalMaxAlloc != null) {
       totalToDistribute = (int)Math.round(totalCount * totalMaxAlloc);
     }
+    int totalDistributed = 0;
     double lastDelta = 0;
     for (int i = 0; i < sessionsToUpdate.size(); ++i) {
       WmTezSession session = sessionsToUpdate.get(i);
@@ -139,8 +146,10 @@ public void updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessionsToUpdate) {
         totalToDistribute -= intAlloc;
       }
       // This will only send update if it's necessary.
+      totalDistributed += intAlloc;
       updateSessionAsync(session, intAlloc);
     }
+    return totalDistributed;
   }
 
   @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
index 9885ce7221..32702c042c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
@@ -32,8 +32,14 @@
    * avoid various artifacts, esp. with small numbers and double weirdness.
    * Null means the total is unknown.
    * @param sessions Sessions to update based on their allocation fraction.
+   * @return The number of executors/cpus allocated.
    */
-  void updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessions);
+  int updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessions);
+
+  /**
+   * @return the number of CPUs equivalent to percentage allocation, for information purposes.
+   */
+  int translateAllocationToCpus(double allocation);
 
   /**
    * Sets a callback to be invoked on cluster changes relevant to resource allocation.
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmPoolMetrics.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmPoolMetrics.java
new file mode 100644
index 0000000000..74878a620b
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmPoolMetrics.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.common.metrics.common.Metrics;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
+import org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MutableMetric;
+
+/**
+ * A wrapper for metrics for a single WM pool. For now this is going to use Hadoop metrics directly.
+ * Need to figure out the best way to convert this to Codahale... the problem is that we need to
+ * tag metrics separately per pool. Codahale seems to create only one sink with fixed tags.
+ */
+public class WmPoolMetrics implements MetricsSource {
+  private final String poolName, sourceName;
+  private MetricsSystem ms;
+  @SuppressWarnings("unused") // Metrics system will get this via reflection 0_o
+  private final MetricsRegistry registry;
+
+  // Codahale. We just include the pool name in the counter name.
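+  // For example, a pool named "bi" (hypothetical name) would surface in the Codahale/JMX view as
+  // gauges like "WM_bi_numRunningQueries" and "WM_bi_numExecutors" (see initAfterRegister below).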
+  private List<String> codahaleGaugeNames;
+  private Map<String, MutableMetric> allMetrics;
+
+  @Metric("Number of guaranteed cluster executors given to queries")
+  MutableGaugeInt numExecutors;
+  @Metric("Number of guaranteed cluster executors allocated")
+  MutableGaugeInt numExecutorsMax;
+  @Metric("Number of parallel queries allowed to run")
+  MutableGaugeInt numParallelQueries;
+  @Metric("Number of queries running")
+  MutableCounterInt numRunningQueries;
+  @Metric("Number of queries queued")
+  MutableCounterInt numQueuedQueries;
+
+  // TODO: these would need to be propagated from AM via progress.
+  // @Metric("Number of allocated guaranteed executors in use"),
+  // @Metric("Number of speculative executors in use")
+
+  public WmPoolMetrics(String poolName, MetricsSystem ms) {
+    this.poolName = poolName;
+    this.sourceName = "WmPoolMetrics." + poolName;
+    this.ms = ms;
+
+    this.registry = new MetricsRegistry(sourceName);
+  }
+
+
+  public void initAfterRegister() {
+    // Make sure we capture the same metrics as Hadoop2 metrics system, via annotations.
+    allMetrics = new HashMap<>();
+    for (Field field : this.getClass().getDeclaredFields()) {
+      for (Annotation annotation : field.getAnnotations()) {
+        if (!(annotation instanceof Metric)) continue;
+        try {
+          field.setAccessible(true);
+          allMetrics.put(field.getName(), (MutableMetric) field.get(this));
+        } catch (IllegalAccessException ex) {
+          break; // Not expected, access by the same class.
+        }
+        break;
+      }
+    }
+
+    // Set up codahale if enabled; we cannot tag the values so just prefix them for the JMX view.
+    Metrics chMetrics = MetricsFactory.getInstance();
+    if (!(chMetrics instanceof CodahaleMetrics)) return;
+
+    List<String> codahaleNames = new ArrayList<>();
+    for (Map.Entry<String, MutableMetric> e : allMetrics.entrySet()) {
+      MutableMetric metric = e.getValue();
+      MetricsVariable<Integer> var = null;
+      if (metric instanceof MutableCounterInt) {
+        var = new CodahaleCounterWrapper((MutableCounterInt) metric);
+      } else if (metric instanceof MutableGaugeInt) {
+        var = new CodahaleGaugeWrapper((MutableGaugeInt) metric);
+      }
+      if (var == null) continue; // Unexpected metric type.
+      String name = "WM_" + poolName + "_" + e.getKey();
+      codahaleNames.add(name);
+      chMetrics.addGauge(name, var);
+    }
+    this.codahaleGaugeNames = codahaleNames;
+  }
+
+
+  public void setParallelQueries(int size) {
+    numParallelQueries.set(size);
+  }
+
+  public void setExecutors(int allocation) {
+    numExecutors.set(allocation);
+  }
+
+  public void setMaxExecutors(int allocation) {
+    numExecutorsMax.set(allocation);
+  }
+
+  public void addQueuedQuery() {
+    numQueuedQueries.incr();
+  }
+
+  public void addRunningQuery() {
+    numRunningQueries.incr();
+  }
+
+  public void removeQueuedQueries(int num) {
+    numQueuedQueries.incr(-num);
+  }
+
+  public void removeRunningQueries(int num) {
+    numRunningQueries.incr(-num);
+  }
+
+  public void moveQueuedToRunning() {
+    numQueuedQueries.incr(-1);
+    numRunningQueries.incr();
+  }
+
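+  // MetricsSource callback: the Hadoop metrics system calls this each time it snapshots/publishes
+  // its sources; every pool is emitted as its own record, tagged with the pool name.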
+  @Override
+  public void getMetrics(MetricsCollector collector, boolean all) {
+    // We could also have one MetricsSource for all the pools and add all the pools to the collector
+    // in its getMetrics call (as separate records). Not clear if that's supported.
+    // Also, we'd have to initialize the metrics ourselves instead of using @Metric annotation.
+    MetricsRecordBuilder rb = collector.addRecord("WmPoolMetrics." + poolName)
+        .setContext("HS2").tag(SessionId, poolName);
+    for (MutableMetric metric : allMetrics.values()) {
+      metric.snapshot(rb, all);
+    }
+  }
+
+  public static WmPoolMetrics create(String poolName, MetricsSystem ms) {
+    WmPoolMetrics metrics = new WmPoolMetrics(poolName, ms);
+    metrics = ms.register(metrics.sourceName, "WM " + poolName + " pool metrics", metrics);
+    metrics.initAfterRegister();
+    return metrics;
+  }
+
+  public void destroy() {
+    ms.unregisterSource(sourceName);
+    ms = null;
+    if (codahaleGaugeNames != null) {
+      Metrics metrics = MetricsFactory.getInstance();
+      for (String chgName : codahaleGaugeNames) {
+        metrics.removeGauge(chgName);
+      }
+      codahaleGaugeNames = null;
+    }
+  }
+
+  private static class CodahaleGaugeWrapper implements MetricsVariable<Integer> {
+    private final MutableGaugeInt mm;
+
+    public CodahaleGaugeWrapper(MutableGaugeInt mm) {
+      this.mm = mm;
+    }
+
+    @Override
+    public Integer getValue() {
+      return mm.value();
+    }
+  }
+
+  private static class CodahaleCounterWrapper implements MetricsVariable<Integer> {
+    private final MutableCounterInt mm;
+
+    public CodahaleCounterWrapper(MutableCounterInt mm) {
+      this.mm = mm;
+    }
+
+    @Override
+    public Integer getValue() {
+      return mm.value();
+    }
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index f0e620c684..71ce71c091 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -28,6 +28,7 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -50,6 +51,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -70,6 +72,8 @@
 import org.apache.hadoop.hive.ql.wm.Trigger;
 import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
 import org.apache.hadoop.hive.ql.wm.WmContext;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hive.common.util.Ref;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.codehaus.jackson.annotate.JsonAutoDetect;
@@ -107,8 +111,9 @@
   private final String yarnQueue;
   private final int amRegistryTimeoutMs;
   private final boolean allowAnyPool;
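+  // Hadoop metrics system used to register per-pool metrics sources;
+  // null when hive.server2.wm.pool.metrics is disabled.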
+  private final MetricsSystem metricsSystem;
   // Note: it's not clear that we need to track this - unlike PoolManager we don't have non-pool
   // sessions, so the pool itself could internally track the sessions it gave out, since
   // calling close on an unopened session is probably harmless.
   private final IdentityHashMap openSessions = new IdentityHashMap<>();
@@ -216,6 +221,11 @@ public static WorkloadManager create(String yarnQueue, HiveConf conf, WMFullResourcePlan plan,
         .setDaemon(true).setNameFormat("Workload management timeout thread").build());
     allowAnyPool = HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_ALLOW_ANY_POOL_VIA_JDBC);
+    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_POOL_METRICS)) {
+      metricsSystem = DefaultMetricsSystem.instance();
+    } else {
+      metricsSystem = null;
+    }
 
     wmThread = new Thread(() -> runWmThread(), "Workload management master");
     wmThread.setDaemon(true);
@@ -755,7 +765,7 @@ private void handleMoveSessionOnMasterThread(final MoveSession moveSession,
     WmEvent moveEvent = new WmEvent(WmEvent.EventType.MOVE);
     // remove from src pool
     RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
-        moveSession.srcSession, poolsToRedistribute, true);
+        moveSession.srcSession, poolsToRedistribute, true, true);
     if (rr == RemoveSessionResult.OK) {
       // check if there is capacity in dest pool, if so move else kill the session
       if (capacityAvailable(destPoolName)) {
@@ -859,7 +869,7 @@ private RemoveSessionResult handleReturnedInUseSessionOnMasterThread(
       reuseRequest.future.setException(new AssertionError("Invalid reuse attempt"));
     }
     session.setQueryId(null);
-    return checkAndRemoveSessionFromItsPool(session, poolsToRedistribute, isReturn);
+    return checkAndRemoveSessionFromItsPool(session, poolsToRedistribute, isReturn, true);
   }
 
   private void handeReopenRequestOnMasterThread(EventState e, WmTezSession session,
@@ -876,7 +886,9 @@ private void handeReopenRequestOnMasterThread(EventState e, WmTezSession session,
     // anything. Instead, we will try to give out an existing session from the pool, and restart
     // the problematic one in background.
     String poolName = session.getPoolName();
-    RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(session, poolsToRedistribute, false);
+    // Do not update metrics, we'd immediately add the session back if we are able to remove.
+    RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
+        session, poolsToRedistribute, false, false);
     switch (rr) {
     case OK:
       // If pool didn't exist, checkAndRemoveSessionFromItsPool wouldn't have returned OK.
@@ -885,6 +897,7 @@ private void handeReopenRequestOnMasterThread(EventState e, WmTezSession session,
           session.getWmContext(), session.extractHiveResources());
       // We have just removed the session from the same pool, so don't check concurrency here.
       pool.initializingSessions.add(sw);
+      // Do not update metrics - see above.
       sw.start();
       syncWork.toRestartInUse.add(session);
       return;
@@ -920,10 +933,10 @@ private void handleUpdateErrorOnMasterThread(WmTezSession session,
       // This session is bad, so don't allow reuse; just convert it to normal get.
       reuseRequest.sessionToReuse = null;
     }
-    // TODO: we should communicate this to the user more explicitly (use kill query API, or
-    //       add an option for bg kill checking to TezTask/monitor?
-    RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(session, poolsToRedistribute, null);
+    // We are assuming the update-error AM is bad and just try to kill it.
+    RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
+        session, poolsToRedistribute, null, true);
     switch (rr) {
     case OK:
     case NOT_FOUND:
@@ -989,7 +1002,7 @@ private void applyNewResourcePlanOnMasterThread(
       }
       PoolState state = oldPools == null ? null : oldPools.remove(fullName);
       if (state == null) {
-        state = new PoolState(fullName, qp, fraction, pool.getSchedulingPolicy());
+        state = new PoolState(fullName, qp, fraction, pool.getSchedulingPolicy(), metricsSystem);
       } else {
         // This will also take care of the queries if query parallelism changed.
         state.update(qp, fraction, syncWork, e, pool.getSchedulingPolicy());
@@ -1001,6 +1014,12 @@ private void applyNewResourcePlanOnMasterThread(
         }
         totalQueryParallelism += qp;
       }
     }
+    for (PoolState pool : pools.values()) {
+      if (pool.metrics != null) {
+        pool.metrics.setMaxExecutors(
+            allocationManager.translateAllocationToCpus(pool.finalFractionRemaining));
+      }
+    }
     // TODO: in the current impl, triggers are added to RP. For tez, no pool triggers (mapping between trigger name and
     // pool name) will exist which means all triggers applies to tez. For LLAP, pool triggers has to exist for attaching
     // triggers to specific pools.
@@ -1094,12 +1113,18 @@ private void queueGetRequestOnMasterThread(
       String oldPoolName = req.sessionToReuse.getPoolName();
       oldPool = pools.get(oldPoolName);
       RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
-          req.sessionToReuse, poolsToRedistribute, true);
+          req.sessionToReuse, poolsToRedistribute, true, false);
       if (rr != RemoveSessionResult.OK) {
+        if (oldPool.metrics != null) {
+          oldPool.metrics.removeRunningQueries(1);
+        }
         // Abandon the reuse attempt.
         returnSessionOnFailedReuse(req, syncWork, null);
         req.sessionToReuse = null;
       } else if (pool.getTotalActiveSessions() + pool.queue.size() >= pool.queryParallelism) {
+        if (oldPool.metrics != null) {
+          oldPool.metrics.removeRunningQueries(1);
+        }
         // One cannot simply reuse the session if there are other queries waiting; to maintain
         // fairness, we'll try to take a query slot instantly, and if that fails we'll return
         // this session back to the pool and give the user a new session later.
@@ -1113,6 +1138,7 @@ private void queueGetRequestOnMasterThread(
       req.sessionToReuse.setPoolName(poolName);
       req.sessionToReuse.setQueueName(yarnQueue);
       req.sessionToReuse.setQueryId(req.queryId);
+      // Do not update metrics - we didn't update on removal.
       pool.sessions.add(req.sessionToReuse);
       if (pool != oldPool) {
         poolsToRedistribute.add(poolName);
@@ -1123,6 +1149,9 @@ private void queueGetRequestOnMasterThread(
 
     // Otherwise, queue the session and make sure we update this pool.
     pool.queue.addLast(req);
+    if (pool.metrics != null) {
+      pool.metrics.addQueuedQuery();
+    }
     poolsToRedistribute.add(poolName);
   }
 
@@ -1134,7 +1163,7 @@ private void processPoolChangesOnMasterThread(
 
     // 1. First, start the queries from the queue.
     int queriesToStart = Math.min(pool.queue.size(),
-        pool.queryParallelism - pool.getTotalActiveSessions());
+        pool.queryParallelism - pool.getTotalActiveSessions());
     if (queriesToStart > 0) {
       LOG.info("Starting {} queries in pool {}", queriesToStart, pool);
@@ -1145,6 +1174,9 @@ private void processPoolChangesOnMasterThread(
     }
     for (int i = 0; i < queriesToStart; ++i) {
       GetRequest queueReq = pool.queue.pollFirst();
+      if (pool.metrics != null) {
+        pool.metrics.moveQueuedToRunning();
+      }
       assert queueReq.sessionToReuse == null;
       // Note that in theory, we are guaranteed to have a session waiting for us here, but
       // the expiration, failures, etc. may cause one to be missing pending restart.
@@ -1170,7 +1202,14 @@
     // logic to be consistent between all the separate calls in one master thread processing round.
     // Note: If allocation manager does not have cluster state, it won't update anything. When the
     // cluster state changes, it will notify us, and we'd update the queries again.
-    allocationManager.updateSessionsAsync(totalAlloc, pool.sessions);
+    int cpusAllocated = allocationManager.updateSessionsAsync(totalAlloc, pool.sessions);
+    if (pool.metrics != null) {
+      pool.metrics.setExecutors(cpusAllocated);
+      if (cpusAllocated > 0) {
+        // Update max executors now that cluster info is definitely available.
+        pool.metrics.setMaxExecutors(allocationManager.translateAllocationToCpus(totalAlloc));
+      }
+    }
   }
 
   private void returnSessionOnFailedReuse(
@@ -1181,7 +1220,7 @@ private void returnSessionOnFailedReuse(
     session.setQueryId(null);
     if (poolsToRedistribute != null) {
       RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
-          session, poolsToRedistribute, true);
+          session, poolsToRedistribute, true, true);
       // The session cannot have been killed just now; this happens after all the kills in
       // the current iteration, so we would have cleared sessionToReuse when killing this.
       boolean isOk = (rr == RemoveSessionResult.OK);
@@ -1217,8 +1256,8 @@ private void returnSessionOnFailedReuse(
    *        thread (so we are dealing with an outdated request); null if the session should be
    *        in WM but wasn't found in the requisite pool (internal error?).
+   * @param updateMetrics whether the pool's running-query metrics should be updated on removal.
    */
-  private RemoveSessionResult checkAndRemoveSessionFromItsPool(
-      WmTezSession session, Set<String> poolsToRedistribute, Boolean isSessionOk) {
+  private RemoveSessionResult checkAndRemoveSessionFromItsPool(WmTezSession session,
+      Set<String> poolsToRedistribute, Boolean isSessionOk, boolean updateMetrics) {
     // It is possible for some request to be queued after a main thread has decided to kill this
     // session; on the next iteration, we'd be processing that request with an irrelevant session.
     if (session.isIrrelevantForWm()) {
@@ -1237,6 +1276,9 @@ private RemoveSessionResult checkAndRemoveSessionFromItsPool(
       PoolState pool = pools.get(poolName);
       session.clearWm();
       if (pool != null && pool.sessions.remove(session)) {
+        if (updateMetrics && pool.metrics != null) {
+          pool.metrics.removeRunningQueries(1);
+        }
         return RemoveSessionResult.OK;
       }
     }
@@ -1255,6 +1297,9 @@ private Boolean checkAndAddSessionToAnotherPool(
 
     PoolState destPool = pools.get(destPoolName);
     if (destPool != null && destPool.sessions.add(session)) {
+      if (destPool.metrics != null) {
+        destPool.metrics.addRunningQuery();
+      }
       session.setPoolName(destPoolName);
       updateTriggers(session);
       poolsToRedistribute.add(destPoolName);
@@ -1675,6 +1720,7 @@ Runnable getTriggerValidatorRunnable() {
     // Note: the list is expected to be a few items; if it's longer we may want an IHM.
     private final LinkedList<WmTezSession> sessions = new LinkedList<>();
     private final LinkedList<GetRequest> queue = new LinkedList<>();
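+    // Metrics for this pool; null when hive.server2.wm.pool.metrics is disabled.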
+    private final WmPoolMetrics metrics;
     private final String fullName;
     private double finalFraction;
@@ -1684,8 +1730,9 @@ Runnable getTriggerValidatorRunnable() {
     private WMPoolSchedulingPolicy schedulingPolicy;
 
     public PoolState(String fullName, int queryParallelism, double fraction,
-        String schedulingPolicy) {
+        String schedulingPolicy, MetricsSystem ms) {
       this.fullName = fullName;
+      this.metrics = ms == null ? null : WmPoolMetrics.create(fullName, ms);
       update(queryParallelism, fraction, null, null, schedulingPolicy);
     }
 
@@ -1697,6 +1744,9 @@ public void update(int queryParallelism, double fraction,
         WmThreadSyncWork syncWork, EventState e, String schedulingPolicy) {
       this.finalFraction = this.finalFractionRemaining = fraction;
       this.queryParallelism = queryParallelism;
+      if (metrics != null) {
+        metrics.setParallelQueries(queryParallelism);
+      }
       try {
         this.schedulingPolicy = MetaStoreUtils.parseSchedulingPolicy(schedulingPolicy);
       } catch (IllegalArgumentException ex) {
@@ -1716,6 +1766,10 @@ public void update(int queryParallelism, double fraction,
       // We will requeue, and not kill, the queries that are not running yet.
       // Insert them all before the get requests from this iteration.
       GetRequest req;
+      if (metrics != null) {
+        metrics.removeQueuedQueries(queue.size());
+      }
+
       while ((req = queue.pollLast()) != null) {
         e.getRequests.addFirst(req);
       }
@@ -1727,6 +1781,10 @@ public void destroy(WmThreadSyncWork syncWork,
       // All the pending get requests should just be requeued elsewhere.
       // Note that we never queue session reuse so sessionToReuse would be null.
       globalQueue.addAll(0, queue);
+      if (metrics != null) {
+        metrics.removeQueuedQueries(queue.size());
+        metrics.destroy();
+      }
       queue.clear();
     }
 
@@ -1774,6 +1832,7 @@ public String toString() {
     private void extractAllSessionsToKill(String killReason,
         IdentityHashMap toReuse, WmThreadSyncWork syncWork) {
+      int totalCount = sessions.size() + initializingSessions.size();
       for (WmTezSession sessionToKill : sessions) {
         resetRemovedSessionToKill(syncWork.toKillQuery,
           new KillQueryContext(sessionToKill, killReason), toReuse);
@@ -1791,6 +1850,9 @@ private void extractAllSessionsToKill(String killReason,
           new KillQueryContext(sessionToKill, killReason), toReuse);
       }
       initializingSessions.clear();
+      if (metrics != null) {
+        metrics.removeRunningQueries(totalCount);
+      }
     }
 
     public void setTriggers(final LinkedList<Trigger> triggers) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java
index a14cdb609a..b0c1659bfe 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java
@@ -94,6 +94,7 @@
       if (progress != null) {
         // Map 1 .......... container  SUCCEEDED      7          7        0        0       0       0
+        // TODO: can we pass custom things thru the progress?
         results.add(
             Arrays.asList(
                 getNameWithProgress(vertexName, progress.succeededTaskCount, progress.totalTaskCount),
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index 20a5947291..fb32e90516 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -107,8 +107,9 @@ public void stop() {
     }
 
     @Override
-    public void updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessions) {
+    public int updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessions) {
       isCalled = true;
+      return 0;
     }
 
     @Override
@@ -123,6 +124,11 @@ void assertWasCalledAndReset() {
     @Override
     public void setClusterChangedCallback(Runnable clusterChangedCallback) {
     }
+
+    @Override
+    public int translateAllocationToCpus(double allocation) {
+      return 0;
+    }
   }
 
   public static WMResourcePlan plan() {