diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/DumpingMetricsCollector.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/DumpingMetricsCollector.java new file mode 100644 index 0000000000..efa72da155 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/DumpingMetricsCollector.java @@ -0,0 +1,109 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.daemon.impl; + +import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsTag; + +import java.util.Map; + +public final class DumpingMetricsCollector implements MetricsCollector { + private MetricsRecordBuilder mrb; + + public DumpingMetricsCollector(Map data) { + mrb = new DumpingMetricsRecordBuilder(data); + } + + @Override + public MetricsRecordBuilder addRecord(String s) { + return mrb; + } + + @Override + public MetricsRecordBuilder addRecord(MetricsInfo metricsInfo) { + return mrb; + } + + private final class DumpingMetricsRecordBuilder extends MetricsRecordBuilder { + private Map data; + + private DumpingMetricsRecordBuilder(Map data) { + this.data = data; + } + + @Override + public MetricsCollector parent() { + throw new UnsupportedOperationException(); + } + + @Override + public MetricsRecordBuilder tag(MetricsInfo metricsInfo, String s) { + return this; + } + + @Override + public MetricsRecordBuilder add(MetricsTag metricsTag) { + return this; + } + + @Override + public MetricsRecordBuilder add(AbstractMetric abstractMetric) { + throw new UnsupportedOperationException(); + } + + @Override + public MetricsRecordBuilder setContext(String s) { + return this; + } + + @Override + public MetricsRecordBuilder addCounter(MetricsInfo metricsInfo, int i) { + data.put(metricsInfo.name(), Long.valueOf(i)); + return this; + } + + @Override + public MetricsRecordBuilder addCounter(MetricsInfo metricsInfo, long l) { + data.put(metricsInfo.name(), l); + return this; + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo metricsInfo, int i) { + data.put(metricsInfo.name(), Long.valueOf(i)); + return this; + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo metricsInfo, long l) { + data.put(metricsInfo.name(), l); + return this; + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo metricsInfo, float v) { + data.put(metricsInfo.name(), Long.valueOf(Math.round(v))); + return this; + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo metricsInfo, double v) { + data.put(metricsInfo.name(), Math.round(v)); + return this; + } + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index cbc5336e5d..526fbf66a0 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -284,13 +284,12 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor } } } - this.metrics = LlapDaemonExecutorMetrics.create(displayName, sessionId, numExecutors, + this.metrics = LlapDaemonExecutorMetrics.create(displayName, sessionId, numExecutors, waitQueueSize, Ints.toArray(intervalList), timedWindowAverageDataPoints, timedWindowAverageWindowLength, simpleAverageWindowDataSize); this.metrics.setMemoryPerInstance(executorMemoryPerInstance); this.metrics.setCacheMemoryPerInstance(ioMemoryBytes); this.metrics.setJvmMaxMemory(maxJvmMemory); - this.metrics.setWaitQueueSize(waitQueueSize); this.metrics.getJvmMetrics().setPauseMonitor(pauseMonitor); this.llapDaemonInfoBean = MBeans.register("LlapDaemon", "LlapDaemonInfo", this); LOG.info("Started LlapMetricsSystem with displayName: " + displayName + diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java index bb03727e1d..c2ac583661 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java @@ -31,11 +31,6 @@ import org.apache.hadoop.hive.llap.io.api.LlapIo; import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; -import org.apache.hadoop.metrics2.AbstractMetric; -import org.apache.hadoop.metrics2.MetricsCollector; -import org.apache.hadoop.metrics2.MetricsInfo; -import org.apache.hadoop.metrics2.MetricsRecordBuilder; -import org.apache.hadoop.metrics2.MetricsTag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -348,91 +343,4 @@ private boolean determineIfSigningIsRequired(UserGroupInformation callingUser) { default: throw new AssertionError("Unknown value " + isSigningRequiredConfig); } } - - private final class DumpingMetricsRecordBuilder extends MetricsRecordBuilder { - private Map data; - - private DumpingMetricsRecordBuilder(Map data) { - this.data = data; - } - - @Override - public MetricsCollector parent() { - throw new UnsupportedOperationException(); - } - - @Override - public MetricsRecordBuilder tag(MetricsInfo metricsInfo, String s) { - return this; - } - - @Override - public MetricsRecordBuilder add(MetricsTag metricsTag) { - return this; - } - - @Override - public MetricsRecordBuilder add(AbstractMetric abstractMetric) { - throw new UnsupportedOperationException(); - } - - @Override - public MetricsRecordBuilder setContext(String s) { - return this; - } - - @Override - public MetricsRecordBuilder addCounter(MetricsInfo metricsInfo, int i) { - data.put(metricsInfo.name(), Long.valueOf(i)); - return this; - } - - @Override - public MetricsRecordBuilder addCounter(MetricsInfo metricsInfo, long l) { - data.put(metricsInfo.name(), l); - return this; - } - - @Override - public MetricsRecordBuilder addGauge(MetricsInfo metricsInfo, int i) { - data.put(metricsInfo.name(), Long.valueOf(i)); - return this; - } - - @Override - public MetricsRecordBuilder addGauge(MetricsInfo metricsInfo, long l) { - data.put(metricsInfo.name(), l); - return this; - } - - @Override - public MetricsRecordBuilder addGauge(MetricsInfo metricsInfo, float v) { - data.put(metricsInfo.name(), Long.valueOf(Math.round(v))); - return this; - } - - @Override - public MetricsRecordBuilder addGauge(MetricsInfo metricsInfo, double v) { - data.put(metricsInfo.name(), Math.round(v)); - return this; - } - } - - private final class DumpingMetricsCollector implements MetricsCollector { - private MetricsRecordBuilder mrb; - - DumpingMetricsCollector(Map data) { - mrb = new DumpingMetricsRecordBuilder(data); - } - - @Override - public MetricsRecordBuilder addRecord(String s) { - return mrb; - } - - @Override - public MetricsRecordBuilder addRecord(MetricsInfo metricsInfo) { - return mrb; - } - } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index dd6689113c..246989467c 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -67,6 +67,8 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import static com.google.common.base.Preconditions.checkNotNull; + /** * Task executor service provides method for scheduling tasks. Tasks submitted to executor service * are submitted to wait queue for scheduling. Wait queue tasks are ordered based on the priority @@ -141,6 +143,11 @@ public TaskExecutorService(int numExecutors, int waitQueueSize, String waitQueueComparatorClassName, boolean enablePreemption, ClassLoader classLoader, final LlapDaemonExecutorMetrics metrics, Clock clock) { super(TaskExecutorService.class.getSimpleName()); + + checkNotNull(waitQueueComparatorClassName, "required argument 'waitQueueComparatorClassName' is null"); + checkNotNull(classLoader, "required argument 'classLoader' is null"); + checkNotNull(metrics, "required argument 'metrics' is null"); + LOG.info("TaskExecutorService is being setup with parameters: " + "numExecutors=" + numExecutors + ", waitQueueSize=" + waitQueueSize @@ -164,9 +171,9 @@ public TaskExecutorService(int numExecutors, int waitQueueSize, this.enablePreemption = enablePreemption; this.numSlotsAvailable = new AtomicInteger(numExecutors); this.metrics = metrics; - if (metrics != null) { - metrics.setNumExecutorsAvailable(numSlotsAvailable.get()); - } + this.metrics.setNumExecutorsAvailable(numSlotsAvailable.get()); + this.metrics.setNumExecutors(numExecutors); + this.metrics.setWaitQueueSize(waitQueueSize); // single threaded scheduler for tasks from wait queue to executor threads ExecutorService wes = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() @@ -213,6 +220,8 @@ public synchronized void setCapacity(int newNumExecutors, int newWaitQueueSize) numSlotsAvailable.addAndGet(newNumExecutors - maxParallelExecutors); maxParallelExecutors = newNumExecutors; waitQueue.setWaitQueueSize(newWaitQueueSize); + metrics.setNumExecutors(newNumExecutors); + metrics.setWaitQueueSize(newWaitQueueSize); LOG.info("TaskExecutorService is setting capacity to: numExecutors=" + newNumExecutors + ", waitQueueSize=" + newWaitQueueSize); } @@ -409,9 +418,7 @@ public void run() { // Wait queue could have been re-ordered in the mean time because of concurrent task // submission. So remove the specific task instead of the head task. if (waitQueue.remove(task)) { - if (metrics != null) { - metrics.setExecutorNumQueuedRequests(waitQueue.size()); - } + metrics.setExecutorNumQueuedRequests(waitQueue.size()); } lastKillTimeMs = null; // We have filled the spot we may have killed for (if any). } catch (RejectedExecutionException e) { @@ -531,9 +538,7 @@ public SubmissionState schedule(TaskRunnerCallable task) { if (LOG.isDebugEnabled()) { LOG.debug("{} is {} as wait queue is full", taskWrapper.getRequestId(), result); } - if (metrics != null) { - metrics.incrTotalRejectedRequests(); - } + metrics.incrTotalRejectedRequests(); return result; } @@ -575,17 +580,13 @@ public SubmissionState schedule(TaskRunnerCallable task) { // to go out before the previous submissions has completed. Handled in the AM evictedTask.getTaskRunnerCallable().killTask(); } - if (metrics != null) { - metrics.incrTotalEvictedFromWaitQueue(); - } + metrics.incrTotalEvictedFromWaitQueue(); } synchronized (lock) { lock.notifyAll(); } + metrics.setExecutorNumQueuedRequests(waitQueue.size()); - if (metrics != null) { - metrics.setExecutorNumQueuedRequests(waitQueue.size()); - } return result; } @@ -656,9 +657,7 @@ public void killFragment(String fragmentId) { taskWrapper.setIsInWaitQueue(false); taskWrapper.getTaskRunnerCallable().setWmCountersDone(); if (waitQueue.remove(taskWrapper)) { - if (metrics != null) { - metrics.setExecutorNumQueuedRequests(waitQueue.size()); - } + metrics.setExecutorNumQueuedRequests(waitQueue.size()); } } if (taskWrapper.isInPreemptionQueue()) { @@ -748,9 +747,7 @@ void tryScheduleUnderLock(final TaskWrapper taskWrapper) throws RejectedExecutio } } numSlotsAvailable.decrementAndGet(); - if (metrics != null) { - metrics.setNumExecutorsAvailable(numSlotsAvailable.get()); - } + metrics.setNumExecutorsAvailable(numSlotsAvailable.get()); } private boolean handleScheduleAttemptedRejection(TaskWrapper rejected) { @@ -863,9 +860,7 @@ private void addToPreemptionQueue(TaskWrapper taskWrapper) { synchronized (lock) { insertIntoPreemptionQueueOrFailUnlocked(taskWrapper); taskWrapper.setIsInPreemptableQueue(true); - if (metrics != null) { - metrics.setExecutorNumPreemptableRequests(preemptionQueue.size()); - } + metrics.setExecutorNumPreemptableRequests(preemptionQueue.size()); } } @@ -893,9 +888,8 @@ private boolean removeFromPreemptionQueueUnlocked( TaskWrapper taskWrapper) { boolean removed = preemptionQueue.remove(taskWrapper); taskWrapper.setIsInPreemptableQueue(false); - if (metrics != null) { - metrics.setExecutorNumPreemptableRequests(preemptionQueue.size()); - } + metrics.setExecutorNumPreemptableRequests(preemptionQueue.size()); + return removed; } @@ -913,9 +907,8 @@ private TaskWrapper getSuitableVictimFromPreemptionQueue(TaskWrapper candidate) return null; } taskWrapper.setIsInPreemptableQueue(false); - if (metrics != null) { - metrics.setExecutorNumPreemptableRequests(preemptionQueue.size()); - } + metrics.setExecutorNumPreemptableRequests(preemptionQueue.size()); + return taskWrapper; } } @@ -970,10 +963,8 @@ public void onSuccess(TaskRunner2Result result) { taskWrapper.setIsInPreemptableQueue(false); taskWrapper.maybeUnregisterForFinishedStateNotifications(); taskWrapper.getTaskRunnerCallable().setWmCountersDone(); - if (metrics != null) { - metrics.addMetricsQueueTime(taskWrapper.getTaskRunnerCallable().getQueueTime()); - metrics.addMetricsRunningTime(taskWrapper.getTaskRunnerCallable().getRunningTime()); - } + metrics.addMetricsQueueTime(taskWrapper.getTaskRunnerCallable().getQueueTime()); + metrics.addMetricsRunningTime(taskWrapper.getTaskRunnerCallable().getRunningTime()); updatePreemptionListAndNotify(result.getEndReason()); taskWrapper.getTaskRunnerCallable().getCallback().onSuccess(result); } @@ -1007,9 +998,7 @@ private void updatePreemptionListAndNotify(EndReason reason) { } numSlotsAvailable.incrementAndGet(); - if (metrics != null) { - metrics.setNumExecutorsAvailable(numSlotsAvailable.get()); - } + metrics.setNumExecutorsAvailable(numSlotsAvailable.get()); if (LOG.isDebugEnabled()) { LOG.debug("Task {} complete. WaitQueueSize={}, numSlotsAvailable={}, preemptionQueueSize={}", taskWrapper.getRequestId(), waitQueue.size(), numSlotsAvailable.get(), @@ -1040,19 +1029,13 @@ private void updateFallOffStats( long timeTaken = now - fragmentCompletion.completingTime; switch (fragmentCompletion.state) { case SUCCESS: - if (metrics != null) { - metrics.addMetricsFallOffSuccessTimeLost(timeTaken); - } + metrics.addMetricsFallOffSuccessTimeLost(timeTaken); break; case FAILED: - if (metrics != null) { - metrics.addMetricsFallOffFailedTimeLost(timeTaken); - } + metrics.addMetricsFallOffFailedTimeLost(timeTaken); break; case KILLED: - if (metrics != null) { - metrics.addMetricsFallOffKilledTimeLost(timeTaken); - } + metrics.addMetricsFallOffKilledTimeLost(timeTaken); break; } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java index f06390cc4f..6249efcc9f 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java @@ -28,7 +28,9 @@ public enum LlapDaemonExecutorInfo implements MetricsInfo { ExecutorMetrics("Llap daemon cache related metrics"), ExecutorMaxFreeSlots("Sum of wait queue size and number of executors"), - ExecutorNumExecutorsPerInstance("Total number of executor threads per node"), + ExecutorNumExecutors("Total number of executor threads"), + ExecutorMaxFreeSlotsConfigured("Sum of the configured wait queue size and the configured number of executors"), + ExecutorNumExecutorsConfigured("Total number of executor threads per node"), ExecutorNumExecutorsAvailable("Total number of executor threads per node that are free"), ExecutorNumExecutorsAvailableAverage("Total number of executor threads per node that are free averaged over time"), ExecutorAvailableFreeSlots("Number of free slots available"), @@ -37,7 +39,8 @@ ExecutorMemoryPerInstance("Total memory for executors per node in bytes"), ExecutorCacheMemoryPerInstance("Total Cache memory per node in bytes"), ExecutorJvmMaxMemory("Max memory available for JVM in bytes"), - ExecutorWaitQueueSize("Size of wait queue per node"), + ExecutorWaitQueueSize("Size of wait queue"), + ExecutorWaitQueueSizeConfigured("Size of wait queue configured per node"), ExecutorThreadUserTime("User time in nanoseconds"), ExecutorTotalRequestsHandled("Total number of requests handled by the container"), ExecutorNumQueuedRequests("Number of requests queued by the container for processing"), diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java index 6fee9f06a7..57547c1dff 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java @@ -24,17 +24,19 @@ import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorCacheMemoryPerInstance; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffNumCompletedFragments; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorJvmMaxMemory; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMaxFreeSlotsConfigured; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMaxFreeSlots; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMaxPreemptionTimeLost; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMaxPreemptionTimeToKill; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMemoryPerInstance; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumExecutors; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumExecutorsAvailable; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumExecutorsAvailableAverage; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumExecutorsConfigured; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumPreemptableRequests; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumQueuedRequests; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumQueuedRequestsAverage; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorThreadCPUTime; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumExecutorsPerInstance; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorThreadUserTime; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalEvictedFromWaitQueue; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalFailed; @@ -52,6 +54,7 @@ import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffFailedMaxTimeLost; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffKilledTimeLost; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffKilledMaxTimeLost; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorWaitQueueSizeConfigured; import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; @@ -91,7 +94,8 @@ private final JvmMetrics jvmMetrics; private final String sessionId; private final MetricsRegistry registry; - private final int numExecutors; + private final int numExecutorsConfigured; + private final int waitQueueSizeConfigured; private final ThreadMXBean threadMXBean; private final Map cpuMetricsInfoMap; private final Map userMetricsInfoMap; @@ -165,21 +169,22 @@ MutableGaugeLong fallOffMaxFailedTimeLost; @Metric MutableGaugeLong fallOffMaxKilledTimeLost; - - + @Metric + MutableGaugeInt numExecutors; private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sessionId, - int numExecutors, final int[] intervals, int timedWindowAverageDataPoints, + int numExecutorsConfigured, int waitQueueSizeConfigured, final int[] intervals, int timedWindowAverageDataPoints, long timedWindowAverageWindowLength, int simpleAverageWindowDataSize) { this.name = displayName; this.jvmMetrics = jm; this.sessionId = sessionId; this.registry = new MetricsRegistry("LlapDaemonExecutorRegistry"); this.registry.tag(ProcessName, MetricsUtils.METRICS_PROCESS_NAME).tag(SessionId, sessionId); - this.numExecutors = numExecutors; + this.numExecutorsConfigured = numExecutorsConfigured; + this.waitQueueSizeConfigured = waitQueueSizeConfigured; this.threadMXBean = ManagementFactory.getThreadMXBean(); - this.executorThreadCpuTime = new MutableGaugeLong[numExecutors]; - this.executorThreadUserTime = new MutableGaugeLong[numExecutors]; + this.executorThreadCpuTime = new MutableGaugeLong[numExecutorsConfigured]; + this.executorThreadUserTime = new MutableGaugeLong[numExecutorsConfigured]; this.cpuMetricsInfoMap = new ConcurrentHashMap<>(); this.userMetricsInfoMap = new ConcurrentHashMap<>(); @@ -199,7 +204,7 @@ private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sess } this.executorNames = Maps.newHashMap(); - for (int i = 0; i < numExecutors; i++) { + for (int i = 0; i < numExecutorsConfigured; i++) { MetricsInfo mic = new LlapDaemonCustomMetricsInfo(ExecutorThreadCPUTime.name() + "_" + i, ExecutorThreadCPUTime.description()); MetricsInfo miu = new LlapDaemonCustomMetricsInfo(ExecutorThreadUserTime.name() + "_" + i, @@ -227,13 +232,13 @@ private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sess } public static LlapDaemonExecutorMetrics create(String displayName, String sessionId, - int numExecutors, final int[] intervals, int timedWindowAverageDataPoints, + int numExecutorsConfigured, int waitQueueSizeConfigured, final int[] intervals, int timedWindowAverageDataPoints, long timedWindowAverageWindowLength, int simpleAverageWindowDataSize) { MetricsSystem ms = LlapMetricsSystem.instance(); JvmMetrics jm = JvmMetrics.create(MetricsUtils.METRICS_PROCESS_NAME, sessionId, ms); return ms.register(displayName, "LlapDaemon Executor Metrics", - new LlapDaemonExecutorMetrics(displayName, jm, sessionId, numExecutors, intervals, - timedWindowAverageDataPoints, timedWindowAverageWindowLength, simpleAverageWindowDataSize)); + new LlapDaemonExecutorMetrics(displayName, jm, sessionId, numExecutorsConfigured, waitQueueSizeConfigured, + intervals, timedWindowAverageDataPoints, timedWindowAverageWindowLength, simpleAverageWindowDataSize)); } @Override @@ -367,10 +372,14 @@ public void setJvmMaxMemory(long value) { public void setWaitQueueSize(int size) { waitQueueSize.set(size); } + public void setNumExecutors(int size) { + numExecutors.set(size); + } private void getExecutorStats(MetricsRecordBuilder rb) { updateThreadMetrics(rb); - final int totalSlots = waitQueueSize.value() + numExecutors; + final int totalConfiguredSlots = waitQueueSizeConfigured + numExecutorsConfigured; + final int totalSlots = waitQueueSize.value() + numExecutors.value(); final int slotsAvailableInQueue = waitQueueSize.value() - executorNumQueuedRequests.value(); final int slotsAvailableTotal = slotsAvailableInQueue + numExecutorsAvailable.value(); final float slotsAvailablePercent = totalSlots <= 0 ? 0.0f : @@ -387,8 +396,11 @@ private void getExecutorStats(MetricsRecordBuilder rb) { .addGauge(ExecutorMemoryPerInstance, memoryPerInstance.value()) .addGauge(ExecutorCacheMemoryPerInstance, cacheMemoryPerInstance.value()) .addGauge(ExecutorJvmMaxMemory, jvmMaxMemory.value()) + .addGauge(ExecutorMaxFreeSlotsConfigured, totalConfiguredSlots) .addGauge(ExecutorMaxFreeSlots, totalSlots) - .addGauge(ExecutorNumExecutorsPerInstance, numExecutors) + .addGauge(ExecutorNumExecutors, numExecutors.value()) + .addGauge(ExecutorNumExecutorsConfigured, numExecutorsConfigured) + .addGauge(ExecutorWaitQueueSizeConfigured, waitQueueSizeConfigured) .addGauge(ExecutorWaitQueueSize, waitQueueSize.value()) .addGauge(ExecutorNumExecutorsAvailable, numExecutorsAvailable.value()) .addGauge(ExecutorAvailableFreeSlots, slotsAvailableTotal) @@ -445,7 +457,7 @@ private void updateThreadMetrics(MetricsRecordBuilder rb) { } } - for (int i=0; i(), new AtomicReference(), @@ -157,7 +157,7 @@ public void testSetCapacity() throws ServiceException, IOException { .newBuilder() .build()); LlapDaemonExecutorMetrics executorMetrics = - LlapDaemonExecutorMetrics.create("LLAP", "SessionId", numHandlers, new int[] {30, 60, 300}, 0, 0L, 0); + LlapDaemonExecutorMetrics.create("LLAP", "SessionId", numHandlers, 1, new int[] {30, 60, 300}, 0, 0L, 0); LlapProtocolServerImpl server = new LlapProtocolServerImpl(null, numHandlers, containerRunnerMock, new AtomicReference(), new AtomicReference(), diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index 4e396c6194..948a678f83 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -21,7 +21,11 @@ import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createSubmitWorkRequestProto; import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createTaskWrapper; import static org.junit.Assert.*; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; +import static org.mockito.MockitoAnnotations.initMocks; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.hive.llap.testhelpers.ControlledClock; @@ -48,10 +52,20 @@ import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.MockRequest; import org.apache.hadoop.hive.llap.daemon.impl.comparator.ShortestJobFirstComparator; import org.apache.tez.runtime.task.TaskRunner2Result; +import org.junit.Before; import org.junit.Test; +import org.mockito.Mock; public class TestTaskExecutorService { + @Mock + private LlapDaemonExecutorMetrics mockMetrics; + + @Before + public void setUp() { + initMocks(this); + } + @Test(timeout = 5000) public void testPreemptionQueueComparator() throws InterruptedException { TaskWrapper r1 = createTaskWrapper( @@ -114,7 +128,7 @@ public void testDuckPreemptsNonDuck() throws InterruptedException { private void testPreemptionHelper( MockRequest r1, MockRequest r2, boolean isPreemted) throws InterruptedException { TaskExecutorServiceForTest taskExecutorService = new TaskExecutorServiceForTest(1, 2, - ShortestJobFirstComparator.class.getName(), true); + ShortestJobFirstComparator.class.getName(), true, mockMetrics); taskExecutorService.init(new Configuration()); taskExecutorService.start(); @@ -154,7 +168,7 @@ public void testPreemptionStateOnTaskFlagChanges() throws InterruptedException { MockRequest r2 = createMockRequest(2, 1, 100, 200, true, 2000000l, true); TaskExecutorServiceForTest taskExecutorService = - new TaskExecutorServiceForTest(1, 2, ShortestJobFirstComparator.class.getName(), true); + new TaskExecutorServiceForTest(1, 2, ShortestJobFirstComparator.class.getName(), true, mockMetrics); taskExecutorService.init(new Configuration()); taskExecutorService.start(); @@ -228,7 +242,7 @@ public void testWaitQueueAcceptAfterAMTaskReport() throws InterruptedException { TaskExecutorServiceForTest taskExecutorService = - new TaskExecutorServiceForTest(1, 2, ShortestJobFirstComparator.class.getName(), true); + new TaskExecutorServiceForTest(1, 2, ShortestJobFirstComparator.class.getName(), true, mockMetrics); // Fourth is lower priority as a result of canFinish being set to false. MockRequest r1 = createMockRequest(1, 1, 100, 200, true, 20000l, false); @@ -302,7 +316,7 @@ private void testWaitQueuePreemptionHelper(MockRequest r1, MockRequest r2, MockRequest r3, MockRequest r4, MockRequest r5) throws InterruptedException { TaskExecutorServiceForTest taskExecutorService = - new TaskExecutorServiceForTest(1, 2, ShortestJobFirstComparator.class.getName(), true); + new TaskExecutorServiceForTest(1, 2, ShortestJobFirstComparator.class.getName(), true, mockMetrics); taskExecutorService.init(new Configuration()); taskExecutorService.start(); @@ -394,7 +408,7 @@ public void testDoKillMultiple() throws InterruptedException { @Test(timeout = 10000) public void testSetCapacity() throws InterruptedException { TaskExecutorServiceForTest taskExecutorService = - new TaskExecutorServiceForTest(2, 3, ShortestJobFirstComparator.class.getName(), true); + new TaskExecutorServiceForTest(2, 3, ShortestJobFirstComparator.class.getName(), true, mockMetrics); // Fourth is lower priority as a result of canFinish being set to false. MockRequest r1 = createMockRequest(1, 1, 1, 100, 200, true, 20000L, true); @@ -520,31 +534,50 @@ public void testSetCapacity() throws InterruptedException { @Test(timeout = 1000, expected = IllegalArgumentException.class) public void testSetCapacityHighExecutors() { TaskExecutorServiceForTest taskExecutorService = - new TaskExecutorServiceForTest(2, 3, ShortestJobFirstComparator.class.getName(), true); + new TaskExecutorServiceForTest(2, 3, ShortestJobFirstComparator.class.getName(), true, mockMetrics); taskExecutorService.setCapacity(3, 3); } @Test(timeout = 1000, expected = IllegalArgumentException.class) public void testSetCapacityHighQueueSize() { TaskExecutorServiceForTest taskExecutorService = - new TaskExecutorServiceForTest(2, 3, ShortestJobFirstComparator.class.getName(), true); + new TaskExecutorServiceForTest(2, 3, ShortestJobFirstComparator.class.getName(), true, mockMetrics); taskExecutorService.setCapacity(2, 5); } @Test(timeout = 1000, expected = IllegalArgumentException.class) public void testSetCapacityNegativeExecutors() { TaskExecutorServiceForTest taskExecutorService = - new TaskExecutorServiceForTest(2, 3, ShortestJobFirstComparator.class.getName(), true); + new TaskExecutorServiceForTest(2, 3, ShortestJobFirstComparator.class.getName(), true, mockMetrics); taskExecutorService.setCapacity(-3, 3); } @Test(timeout = 1000, expected = IllegalArgumentException.class) public void testSetCapacityNegativeQueueSize() { TaskExecutorServiceForTest taskExecutorService = - new TaskExecutorServiceForTest(2, 3, ShortestJobFirstComparator.class.getName(), true); + new TaskExecutorServiceForTest(2, 3, ShortestJobFirstComparator.class.getName(), true, mockMetrics); taskExecutorService.setCapacity(2, -5); } + @Test(timeout = 1000) + public void testCapacityMetricsInitial() { + TaskExecutorServiceForTest taskExecutorService = new TaskExecutorServiceForTest(2, 10, + ShortestJobFirstComparator.class.getName(), true, mockMetrics); + + verify(mockMetrics).setNumExecutors(2); + verify(mockMetrics).setWaitQueueSize(10); + } + + @Test(timeout = 1000) + public void testCapacityMetricsModification() { + TaskExecutorServiceForTest taskExecutorService = new TaskExecutorServiceForTest(2, 10, + ShortestJobFirstComparator.class.getName(), true, mockMetrics); + reset(mockMetrics); + taskExecutorService.setCapacity(1, 5); + + verify(mockMetrics).setNumExecutors(1); + verify(mockMetrics).setWaitQueueSize(5); + } private void runPreemptionGraceTest( MockRequest victim1, MockRequest victim2, int time) throws InterruptedException { @@ -555,7 +588,7 @@ private void runPreemptionGraceTest( ControlledClock clock = new ControlledClock(new SystemClock()); clock.setTime(0); TaskExecutorServiceForTest taskExecutorService = new TaskExecutorServiceForTest( - 2, 3, ShortestJobFirstComparator.class.getName(), true, clock); + 2, 3, ShortestJobFirstComparator.class.getName(), true, mockMetrics, clock); taskExecutorService.init(new Configuration()); taskExecutorService.start(); @@ -600,14 +633,14 @@ private void awaitStartAndSchedulerRun(MockRequest mockRequest, private int scheduleAttempts = 0; public TaskExecutorServiceForTest(int numExecutors, int waitQueueSize, - String waitQueueComparatorClassName, boolean enablePreemption) { - this(numExecutors, waitQueueSize, waitQueueComparatorClassName, enablePreemption, null); + String waitQueueComparatorClassName, boolean enablePreemption, LlapDaemonExecutorMetrics metrics) { + this(numExecutors, waitQueueSize, waitQueueComparatorClassName, enablePreemption, metrics, null); } public TaskExecutorServiceForTest(int numExecutors, int waitQueueSize, - String waitQueueComparatorClassName, boolean enablePreemption, Clock clock) { + String waitQueueComparatorClassName, boolean enablePreemption, LlapDaemonExecutorMetrics metrics, Clock clock) { super(numExecutors, waitQueueSize, waitQueueComparatorClassName, enablePreemption, - Thread.currentThread().getContextClassLoader(), null, clock); + Thread.currentThread().getContextClassLoader(), metrics, clock); } private ConcurrentMap completionListeners = diff --git llap-server/src/test/org/apache/hadoop/hive/llap/metrics/TestLlapDaemonExecutorMetrics.java llap-server/src/test/org/apache/hadoop/hive/llap/metrics/TestLlapDaemonExecutorMetrics.java index 4942a32035..19d6584427 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/metrics/TestLlapDaemonExecutorMetrics.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/metrics/TestLlapDaemonExecutorMetrics.java @@ -17,9 +17,23 @@ */ package org.apache.hadoop.hive.llap.metrics; +import com.google.common.collect.Maps; +import org.apache.hadoop.hive.llap.daemon.impl.DumpingMetricsCollector; import org.junit.Test; + +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorAvailableFreeSlots; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMaxFreeSlotsConfigured; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMaxFreeSlots; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumExecutors; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumExecutorsAvailable; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumExecutorsConfigured; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumQueuedRequests; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorWaitQueueSize; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorWaitQueueSizeConfigured; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics.TimedAverageMetrics; +import java.util.Map; /** * Test class for LlapDaemonExecutorMetrics. @@ -142,6 +156,38 @@ public void testTimedAverageMetricsBigData() { assertEquals("Checking the calculated value", 9000, metrics.value(currentTime + 50L * 1000L * 1000L * 20000L)); } + @Test + public void testSimpleAndDerivedMetricsCalculations() { + int numExecutorsConfigured = 4; + int numExecutors = 2; + int numExecutorsAvailable = 1; + int waitQueueSizeConfigured = 10; + int waitQueueSize = 5; + int queuedRequests = 3; + + LlapDaemonExecutorMetrics metrics = LlapDaemonExecutorMetrics.create("test", "test", numExecutorsConfigured, + waitQueueSizeConfigured, new int[]{1}, 1, 1, 1); + metrics.setNumExecutors(numExecutors); + metrics.setNumExecutorsAvailable(numExecutorsAvailable); + metrics.setWaitQueueSize(waitQueueSize); + metrics.setExecutorNumQueuedRequests(queuedRequests); + Map data = Maps.newHashMap(); + metrics.getMetrics(new DumpingMetricsCollector(data), true); + + // Simple values + assertTrue(numExecutorsConfigured == data.get(ExecutorNumExecutorsConfigured.name())); + assertTrue(numExecutors == data.get(ExecutorNumExecutors.name())); + assertTrue(waitQueueSizeConfigured == data.get(ExecutorWaitQueueSizeConfigured.name())); + assertTrue(waitQueueSize == data.get(ExecutorWaitQueueSize.name())); + assertTrue(queuedRequests == data.get(ExecutorNumQueuedRequests.name())); + assertTrue(numExecutorsAvailable == data.get(ExecutorNumExecutorsAvailable.name())); + + // Derived values + assertTrue((waitQueueSizeConfigured + numExecutorsConfigured) == data.get(ExecutorMaxFreeSlotsConfigured.name())); + assertTrue((waitQueueSize + numExecutors) == data.get(ExecutorMaxFreeSlots.name())); + assertTrue((waitQueueSize + numExecutorsAvailable - queuedRequests) == data.get(ExecutorAvailableFreeSlots.name())); + } + private TimedAverageMetrics generateTimedAverageMetrics(int windowDataSize, long windowTimeSize, int dataNum, long firstData, long dataDelta, long firstTime, long timeDelta) { TimedAverageMetrics metrics =