diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 49d748c..fb0bd02 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -330,6 +330,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION.varname); + llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_TASK_PREEMPTION_METRICS_INTERVALS.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_WEB_PORT.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_WEB_SSL.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_CONTAINER_ID.varname); @@ -2763,6 +2764,13 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal " to a location other than the ones requested. Set to -1 for an infinite delay, 0" + "for a no delay. Currently these are the only two supported values" ), + LLAP_DAEMON_TASK_PREEMPTION_METRICS_INTERVALS( + "hive.llap.daemon.task.preemption.metrics.intervals", "30,60,300", + "Comma-delimited set of integers denoting the desired rollover intervals (in seconds)\n" + + " for percentile latency metrics. Used by LLAP daemon task scheduler metrics for\n" + + " time taken to kill task (due to pre-emption) and useful time wasted by the task that\n" + + " is about to be preempted." + ), LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE("hive.llap.daemon.task.scheduler.wait.queue.size", 10, "LLAP scheduler maximum queue size.", "llap.daemon.task.scheduler.wait.queue.size"), LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME( diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java index 9f7e5c9..d78c1e0 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java @@ -40,7 +40,7 @@ private final long maxSize; private final boolean isDirect; private final LlapDaemonCacheMetrics metrics; - + // We don't know the acceptable size for Java array, so we'll use 1Gb boundary. // That is guaranteed to fit any maximum allocation. private static final int MAX_ARENA_SIZE = 1024*1024*1024; @@ -113,9 +113,6 @@ public BuddyAllocator(boolean isDirectVal, int minAllocVal, int maxAllocVal, int this.metrics = metrics; metrics.incrAllocatedArena(); - metrics.setArenaSize(arenaSize); - metrics.setMinAllocationSize(minAllocation); - metrics.setMaxAllocationSize(maxAllocation); } // TODO: would it make sense to return buffers asynchronously? diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index e80fb15..3d45c7a 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -106,7 +106,7 @@ public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSi String waitQueueSchedulerClassName = HiveConf.getVar( conf, ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME); this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, - waitQueueSchedulerClassName, enablePreemption, classLoader); + waitQueueSchedulerClassName, enablePreemption, classLoader, metrics); addIfService(executorService); @@ -218,8 +218,9 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws .setSubmissionState(SubmissionStateProto.valueOf(submissionState.name())) .build(); } - metrics.incrExecutorTotalRequestsHandled(); - metrics.incrExecutorNumQueuedRequests(); + if (metrics != null) { + metrics.incrExecutorTotalRequestsHandled(); + } } finally { NDC.pop(); } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 33b41e8..63cb16b 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -20,7 +20,9 @@ import java.lang.management.MemoryType; import java.net.InetSocketAddress; import java.net.URL; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -68,6 +70,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import com.google.common.primitives.Ints; public class LlapDaemon extends CompositeService implements ContainerRunner, LlapDaemonMXBean { @@ -193,12 +196,25 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor String displayName = "LlapDaemonExecutorMetrics-" + MetricsUtils.getHostName(); String sessionId = MetricsUtils.getUUID(); daemonConf.set("llap.daemon.metrics.sessionid", sessionId); - this.metrics = LlapDaemonExecutorMetrics.create(displayName, sessionId, numExecutors); + String[] strIntervals = HiveConf.getTrimmedStringsVar(daemonConf, + HiveConf.ConfVars.LLAP_DAEMON_TASK_PREEMPTION_METRICS_INTERVALS); + List intervalList = new ArrayList<>(); + if (strIntervals != null) { + for (String strInterval : strIntervals) { + try { + intervalList.add(Integer.valueOf(strInterval)); + } catch (NumberFormatException e) { + LOG.warn("Ignoring task pre-emption metrics interval {} from {} as it is invalid", + strInterval, Arrays.toString(strIntervals)); + } + } + } + this.metrics = LlapDaemonExecutorMetrics.create(displayName, sessionId, numExecutors, + Ints.toArray(intervalList)); this.metrics.setMemoryPerInstance(executorMemoryBytes); this.metrics.setCacheMemoryPerInstance(ioMemoryBytes); this.metrics.setJvmMaxMemory(maxJvmMemory); this.metrics.setWaitQueueSize(waitQueueSize); - this.metrics.setRpcNumHandlers(numHandlers); metrics.getJvmMetrics().setPauseMonitor(pauseMonitor); this.llapDaemonInfoBean = MBeans.register("LlapDaemon", "LlapDaemonInfo", this); LOG.info("Started LlapMetricsSystem with displayName: " + displayName + diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index 57dd828..f621af2 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.service.AbstractService; import org.apache.tez.runtime.task.EndReason; import org.apache.tez.runtime.task.TaskRunner2Result; @@ -104,10 +105,11 @@ final ConcurrentMap knownTasks = new ConcurrentHashMap<>(); private final Object lock = new Object(); + private final LlapDaemonExecutorMetrics metrics; public TaskExecutorService(int numExecutors, int waitQueueSize, String waitQueueComparatorClassName, boolean enablePreemption, - ClassLoader classLoader) { + ClassLoader classLoader, final LlapDaemonExecutorMetrics metrics) { super(TaskExecutorService.class.getSimpleName()); LOG.info("TaskExecutorService is being setup with parameters: " + "numExecutors=" + numExecutors @@ -127,6 +129,10 @@ public TaskExecutorService(int numExecutors, int waitQueueSize, new PreemptionQueueComparator()); this.enablePreemption = enablePreemption; this.numSlotsAvailable = new AtomicInteger(numExecutors); + this.metrics = metrics; + if (metrics != null) { + metrics.setNumExecutorsAvailable(numSlotsAvailable.get()); + } // single threaded scheduler for tasks from wait queue to executor threads ExecutorService wes = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() @@ -267,7 +273,11 @@ public void run() { trySchedule(task); // wait queue could have been re-ordered in the mean time because of concurrent task // submission. So remove the specific task instead of the head task. - waitQueue.remove(task); + if (waitQueue.remove(task)) { + if (metrics != null) { + metrics.setExecutorNumQueuedRequests(waitQueue.size()); + } + } } catch (RejectedExecutionException e) { rejectedException = e; } @@ -361,6 +371,9 @@ public SubmissionState schedule(TaskRunnerCallable task) { if (isDebugEnabled) { LOG.debug("{} is {} as wait queue is full", taskWrapper.getRequestId(), result); } + if (metrics != null) { + metrics.incrTotalRejectedRequests(); + } return result; } } @@ -392,11 +405,17 @@ public SubmissionState schedule(TaskRunnerCallable task) { LOG.info("{} evicted from wait queue in favor of {} because of lower priority", evictedTask.getRequestId(), task.getRequestId()); } + if (metrics != null) { + metrics.incrTotalEvictedFromWaitQueue(); + } } synchronized (lock) { lock.notify(); } + if (metrics != null) { + metrics.setExecutorNumQueuedRequests(waitQueue.size()); + } return result; } @@ -411,7 +430,11 @@ public void killFragment(String fragmentId) { LOG.debug("Removing {} from waitQueue", fragmentId); } taskWrapper.setIsInWaitQueue(false); - waitQueue.remove(taskWrapper); + if (waitQueue.remove(taskWrapper)) { + if (metrics != null) { + metrics.setExecutorNumQueuedRequests(waitQueue.size()); + } + } } if (taskWrapper.isInPreemptionQueue()) { if (isDebugEnabled) { @@ -419,6 +442,9 @@ public void killFragment(String fragmentId) { } taskWrapper.setIsInPreemptableQueue(false); preemptionQueue.remove(taskWrapper); + if (metrics != null) { + metrics.setExecutorNumPreemptableRequests(preemptionQueue.size()); + } } taskWrapper.getTaskRunnerCallable().killTask(); } else { @@ -460,6 +486,9 @@ private void trySchedule(final TaskWrapper taskWrapper) throws RejectedExecution } } numSlotsAvailable.decrementAndGet(); + if (metrics != null) { + metrics.setNumExecutorsAvailable(numSlotsAvailable.get()); + } } private void handleScheduleAttemptedRejection(TaskWrapper taskWrapper) { @@ -511,11 +540,17 @@ private void finishableStateUpdated(TaskWrapper taskWrapper, boolean newFinishab LOG.debug("Removing {} from preemption queue because it's state changed to {}", taskWrapper.getRequestId(), newFinishableState); preemptionQueue.remove(taskWrapper.getTaskRunnerCallable()); + if (metrics != null) { + metrics.setExecutorNumPreemptableRequests(preemptionQueue.size()); + } } else if (newFinishableState == false && !taskWrapper.isInPreemptionQueue() && !taskWrapper.isInWaitQueue()) { LOG.debug("Adding {} to preemption queue since finishable state changed to {}", taskWrapper.getRequestId(), newFinishableState); preemptionQueue.offer(taskWrapper); + if (metrics != null) { + metrics.setExecutorNumPreemptableRequests(preemptionQueue.size()); + } } lock.notify(); } @@ -525,6 +560,9 @@ private void addToPreemptionQueue(TaskWrapper taskWrapper) { synchronized (lock) { preemptionQueue.add(taskWrapper); taskWrapper.setIsInPreemptableQueue(true); + if (metrics != null) { + metrics.setExecutorNumPreemptableRequests(preemptionQueue.size()); + } } } @@ -534,6 +572,9 @@ private TaskWrapper removeAndGetFromPreemptionQueue() { taskWrapper = preemptionQueue.remove(); if (taskWrapper != null) { taskWrapper.setIsInPreemptableQueue(false); + if (metrics != null) { + metrics.setExecutorNumPreemptableRequests(preemptionQueue.size()); + } } } return taskWrapper; @@ -582,9 +623,15 @@ private void updatePreemptionListAndNotify(EndReason reason) { .getTaskIdentifierString(taskWrapper.getTaskRunnerCallable().getRequest()) + " request " + state + "! Removed from preemption list."); } + if (metrics != null) { + metrics.setExecutorNumPreemptableRequests(preemptionQueue.size()); + } } numSlotsAvailable.incrementAndGet(); + if (metrics != null) { + metrics.setNumExecutorsAvailable(numSlotsAvailable.get()); + } if (isDebugEnabled) { LOG.debug("Task {} complete. WaitQueueSize={}, numSlotsAvailable={}, preemptionQueueSize={}", taskWrapper.getRequestId(), waitQueue.size(), numSlotsAvailable.get(), diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 2a60123..fcfa940 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -413,10 +413,15 @@ public void onSuccess(TaskRunner2Result result) { // Only the KILLED case requires a message to be sent out to the AM. case SUCCESS: LOG.debug("Successfully finished {}", requestId); - metrics.incrExecutorTotalSuccess(); + if (metrics != null) { + metrics.incrExecutorTotalSuccess(); + } break; case CONTAINER_STOP_REQUESTED: LOG.info("Received container stop request (AM preemption) for {}", requestId); + if (metrics != null) { + metrics.incrExecutorTotalKilled(); + } break; case KILL_REQUESTED: LOG.info("Killed task {}", requestId); @@ -424,17 +429,26 @@ public void onSuccess(TaskRunner2Result result) { killtimerWatch.stop(); long elapsed = killtimerWatch.elapsedMillis(); LOG.info("Time to die for task {}", elapsed); + if (metrics != null) { + metrics.addMetricsPreemptionTimeToKill(elapsed); + } + } + if (metrics != null) { + metrics.addMetricsPreemptionTimeLost(runtimeWatch.elapsedMillis()); + metrics.incrExecutorTotalKilled(); } - metrics.incrPreemptionTimeLost(runtimeWatch.elapsedMillis()); - metrics.incrExecutorTotalKilled(); break; case COMMUNICATION_FAILURE: LOG.info("Failed to run {} due to communication failure", requestId); - metrics.incrExecutorTotalExecutionFailed(); + if (metrics != null) { + metrics.incrExecutorTotalExecutionFailed(); + } break; case TASK_ERROR: LOG.info("Failed to run {} due to task error", requestId); - metrics.incrExecutorTotalExecutionFailed(); + if (metrics != null) { + metrics.incrExecutorTotalExecutionFailed(); + } break; } fragmentCompletionHanler.fragmentComplete(fragmentInfo); @@ -448,7 +462,6 @@ public void onSuccess(TaskRunner2Result result) { request.getFragmentSpec().getFragmentNumber(), request.getFragmentSpec().getAttemptNumber(), taskRunnerCallable.threadName, taskRunnerCallable.startTime, true); - metrics.decrExecutorNumQueuedRequests(); } @Override @@ -466,9 +479,6 @@ public void onFailure(Throwable t) { request.getFragmentSpec().getFragmentNumber(), request.getFragmentSpec().getAttemptNumber(), taskRunnerCallable.threadName, taskRunnerCallable.startTime, false); - if (metrics != null) { - metrics.decrExecutorNumQueuedRequests(); - } } } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java index 6a72b4c..fea3dc7 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java @@ -139,7 +139,6 @@ private LlapIoImpl(Configuration conf) throws IOException { int numThreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE); executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numThreads, new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true).build())); - ioMetrics.setIoThreadPoolSize(numThreads); // TODO: this should depends on input format and be in a map, or something. this.cvp = new OrcColumnVectorProducer( metadataCache, orcCache, bufferManager, conf, cacheMetrics, ioMetrics); diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java index 0ba7c09..6b54b30 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java @@ -53,7 +53,6 @@ public void resetBeforeOffer(ColumnVectorBatch t) { // Don't reset anything, we are reusing column vectors. } }); - this.ioMetrics.setColumnVectorBatchPoolSize(cvbPool.size()); } public void init(ConsumerFeedback upstreamFeedback, @@ -62,10 +61,6 @@ public void init(ConsumerFeedback upstreamFeedback, this.readCallable = readCallable; } - public LlapDaemonIOMetrics getIOMetrics() { - return ioMetrics; - } - @Override public Callable getReadCallable() { return readCallable; diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index 83011fb..7effe69 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -182,9 +182,6 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager buff } catch (IOException e) { throw new RuntimeException(e); } - LlapDaemonIOMetrics ioMetrics = consumer.getIOMetrics(); - ioMetrics.setColumnStreamDataPoolSize(CSD_POOL.size()); - ioMetrics.setEncodedColumnBatchPoolSize(ECB_POOL.size()); } @Override diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheInfo.java index 191345e..427a0b1 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheInfo.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheInfo.java @@ -26,6 +26,7 @@ */ public enum LlapDaemonCacheInfo implements MetricsInfo { CacheMetrics("Llap daemon cache related metrics"), + CacheCapacityRemainingPercentage("Percentage of memory available in cache"), CacheCapacityRemaining("Amount of memory available in cache in bytes"), CacheCapacityTotal("Total amount of memory allocated for cache in bytes"), CacheCapacityUsed("Amount of memory used in cache in bytes"), @@ -34,10 +35,7 @@ CacheHitRatio("Ratio of disk ranges cached vs requested"), CacheReadRequests("Number of disk range requests to cache"), CacheAllocatedArena("Number of arenas allocated"), - CacheNumLockedBuffers("Number of locked buffers in cache"), - CacheArenaSize("Size of arena used by allocator"), - CacheMinAllocationSize("Minimum allocation size used by allocator"), - CacheMaxAllocationSize("Maximum allocation size used by allocator"); + CacheNumLockedBuffers("Number of locked buffers in cache"); private final String desc; diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java index bb76da5..5f30b2d 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java @@ -18,18 +18,16 @@ package org.apache.hadoop.hive.llap.metrics; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheAllocatedArena; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheArenaSize; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheCapacityRemaining; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheCapacityRemainingPercentage; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheCapacityTotal; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheCapacityUsed; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheHitBytes; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheHitRatio; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheMaxAllocationSize; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheMinAllocationSize; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheMetrics; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheNumLockedBuffers; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheReadRequests; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheRequestedBytes; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheInfo.CacheMetrics; import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; @@ -68,12 +66,6 @@ MutableCounterLong cacheAllocatedArena; @Metric MutableCounterLong cacheNumLockedBuffers; - @Metric - MutableGaugeLong arenaSize; - @Metric - MutableGaugeLong minAllocationSize; - @Metric - MutableGaugeLong maxAllocationSize; private LlapDaemonCacheMetrics(String name, String sessionId) { this.name = name; @@ -115,18 +107,6 @@ public void incrCacheNumLockedBuffers() { cacheNumLockedBuffers.incr(); } - public void setArenaSize(long value) { - arenaSize.set(value); - } - - public void setMinAllocationSize(long value) { - minAllocationSize.set(value); - } - - public void setMaxAllocationSize(long value) { - maxAllocationSize.set(value); - } - public void decrCacheNumLockedBuffers() { cacheNumLockedBuffers.incr(-1); } @@ -158,7 +138,11 @@ private void getCacheStats(MetricsRecordBuilder rb) { float cacheHitRatio = cacheRequestedBytes.value() == 0 ? 0.0f : (float) cacheHitBytes.value() / (float) cacheRequestedBytes.value(); - rb.addCounter(CacheCapacityRemaining, cacheCapacityTotal.value() - cacheCapacityUsed.value()) + long cacheCapacityRemaining = cacheCapacityTotal.value() - cacheCapacityUsed.value(); + float cacheRemainingPercent = cacheCapacityTotal.value() == 0 ? 0.0f : + (float) cacheCapacityRemaining / (float) cacheCapacityTotal.value(); + rb.addCounter(CacheCapacityRemaining, cacheCapacityRemaining) + .addGauge(CacheCapacityRemainingPercentage, cacheRemainingPercent) .addCounter(CacheCapacityTotal, cacheCapacityTotal.value()) .addCounter(CacheCapacityUsed, cacheCapacityUsed.value()) .addCounter(CacheReadRequests, cacheReadRequests.value()) @@ -166,10 +150,7 @@ private void getCacheStats(MetricsRecordBuilder rb) { .addCounter(CacheHitBytes, cacheHitBytes.value()) .addCounter(CacheAllocatedArena, cacheAllocatedArena.value()) .addCounter(CacheNumLockedBuffers, cacheNumLockedBuffers.value()) - .addGauge(CacheHitRatio, cacheHitRatio) - .addGauge(CacheArenaSize, arenaSize.value()) - .addGauge(CacheMinAllocationSize, minAllocationSize.value()) - .addGauge(CacheMaxAllocationSize, maxAllocationSize.value()); + .addGauge(CacheHitRatio, cacheHitRatio); } } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java index 941d926..db5fd4f 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java @@ -26,21 +26,32 @@ */ public enum LlapDaemonExecutorInfo implements MetricsInfo { ExecutorMetrics("Llap daemon cache related metrics"), - ExecutorThreadCountPerInstance("Total number of executor threads per node"), + ExecutorMaxFreeSlots("Sum of wait queue size and number of executors"), + ExecutorNumExecutorsPerInstance("Total number of executor threads per node"), + ExecutorNumExecutorsAvailable("Total number of executor threads per node that are free"), + ExecutorAvailableFreeSlots("Number of free slots available"), + ExecutorAvailableFreeSlotsPercent("Percent of free slots available"), + ExecutorThreadCPUTime("Cpu time in nanoseconds"), ExecutorMemoryPerInstance("Total memory for executors per node in bytes"), ExecutorCacheMemoryPerInstance("Total Cache memory per node in bytes"), ExecutorJvmMaxMemory("Max memory available for JVM in bytes"), ExecutorWaitQueueSize("Size of wait queue per node"), - ExecutorRpcNumHandlers("Number of RPC handlers per node"), - ExecutorThreadCPUTime("Cpu time in nanoseconds"), ExecutorThreadUserTime("User time in nanoseconds"), ExecutorTotalRequestsHandled("Total number of requests handled by the container"), ExecutorNumQueuedRequests("Number of requests queued by the container for processing"), + ExecutorNumPreemptableRequests("Number of queued requests that are pre-emptable"), + ExecutorTotalRejectedRequests("Total number of requests rejected as wait queue being full"), ExecutorTotalSuccess("Total number of requests handled by the container that succeeded"), - ExecutorTotalExecutionFailure("Total number of requests handled by the container that failed execution"), - ExecutorTotalInterrupted("Total number of requests handled by the container that got interrupted"), + ExecutorTotalFailed("Total number of requests handled by the container that failed execution"), + ExecutorTotalKilled("Total number of requests handled by the container that got interrupted"), ExecutorTotalAskedToDie("Total number of requests handled by the container that were asked to die"), - PreemptionTimeLost("Total time lost due to task preemptions"); + ExecutorTotalPreemptionTimeToKill("Total amount of time taken for killing tasks due to pre-emption"), + ExecutorTotalPreemptionTimeLost("Total useful cluster time lost because of pre-emption"), + ExecutorPercentileTimeToKill("Percentile time to kill for pre-empted tasks"), + ExecutorPercentileTimeLost("Percentile cluster time wasted due to pre-emption"), + ExecutorMaxPreemptionTimeToKill("Max time for killing pre-empted task"), + ExecutorMaxPreemptionTimeLost("Max cluster time lost due to pre-emption"), + ExecutorTotalEvictedFromWaitQueue("Total number of tasks evicted from wait queue because of low priority"); private final String desc; diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java index 894880f..1110683 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java @@ -17,22 +17,30 @@ */ package org.apache.hadoop.hive.llap.metrics; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorAvailableFreeSlots; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorAvailableFreeSlotsPercent; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorCacheMemoryPerInstance; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorJvmMaxMemory; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMaxFreeSlots; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMaxPreemptionTimeLost; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMaxPreemptionTimeToKill; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMemoryPerInstance; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumExecutorsAvailable; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumPreemptableRequests; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumQueuedRequests; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorRpcNumHandlers; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorThreadCPUTime; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorThreadCountPerInstance; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorNumExecutorsPerInstance; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorThreadUserTime; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalAskedToDie; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalExecutionFailure; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalInterrupted; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalEvictedFromWaitQueue; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalFailed; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalKilled; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalRejectedRequests; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalRequestsHandled; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalSuccess; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMetrics; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalPreemptionTimeLost; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalPreemptionTimeToKill; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorWaitQueueSize; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.PreemptionTimeLost; import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; @@ -54,6 +62,7 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.metrics2.lib.MutableQuantiles; import org.apache.hadoop.metrics2.source.JvmMetrics; /** @@ -70,13 +79,23 @@ private final ThreadMXBean threadMXBean; private final Map cpuMetricsInfoMap; private final Map userMetricsInfoMap; + private long maxTimeLost = Long.MIN_VALUE; + private long maxTimeToKill = Long.MIN_VALUE; final MutableGaugeLong[] executorThreadCpuTime; final MutableGaugeLong[] executorThreadUserTime; @Metric MutableCounterLong executorTotalRequestHandled; @Metric - MutableCounterLong executorNumQueuedRequests; + MutableGaugeInt executorNumQueuedRequests; + @Metric + MutableGaugeInt executorNumPreemptableRequests; + @Metric + MutableGaugeInt numExecutorsAvailable; + @Metric + MutableCounterLong totalRejectedRequests; + @Metric + MutableCounterLong totalEvictedFromWaitQueue; @Metric MutableCounterLong executorTotalSuccess; @Metric @@ -84,8 +103,6 @@ @Metric MutableCounterLong executorTotalExecutionFailed; @Metric - MutableCounterLong preemptionTimeLost; - @Metric MutableGaugeLong cacheMemoryPerInstance; @Metric MutableGaugeLong memoryPerInstance; @@ -94,10 +111,20 @@ @Metric MutableGaugeInt waitQueueSize; @Metric - MutableGaugeInt rpcNumHandlers; + MutableCounterLong totalPreemptionTimeToKill; + @Metric + MutableCounterLong totalPreemptionTimeLost; + @Metric + MutableGaugeLong maxPreemptionTimeToKill; + @Metric + MutableGaugeLong maxPreemptionTimeLost; + @Metric + final MutableQuantiles[] percentileTimeToKill; + @Metric + final MutableQuantiles[] percentileTimeLost; private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sessionId, - int numExecutors) { + int numExecutors, final int[] intervals) { this.name = displayName; this.jvmMetrics = jm; this.sessionId = sessionId; @@ -110,6 +137,21 @@ private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sess this.cpuMetricsInfoMap = new ConcurrentHashMap<>(); this.userMetricsInfoMap = new ConcurrentHashMap<>(); + final int len = intervals == null ? 0 : intervals.length; + this.percentileTimeToKill = new MutableQuantiles[len]; + this.percentileTimeLost = new MutableQuantiles[len]; + for (int i=0; i maxTimeLost) { + maxTimeLost = value; + maxPreemptionTimeLost.set(maxTimeLost); + } + + for (MutableQuantiles q : percentileTimeLost) { + q.add(value); + } + } + + public void addMetricsPreemptionTimeToKill(long value) { + totalPreemptionTimeToKill.incr(value); + + if (value > maxTimeToKill) { + maxTimeToKill = value; + maxPreemptionTimeToKill.set(maxTimeToKill); + } + + for (MutableQuantiles q : percentileTimeToKill) { + q.add(value); + } } public void incrExecutorTotalKilled() { @@ -183,25 +259,43 @@ public void setWaitQueueSize(int size) { waitQueueSize.set(size); } - public void setRpcNumHandlers(int numHandlers) { - rpcNumHandlers.set(numHandlers); - } - private void getExecutorStats(MetricsRecordBuilder rb) { updateThreadMetrics(rb); + final int totalSlots = waitQueueSize.value() + numExecutors; + final int slotsAvailableInQueue = waitQueueSize.value() - executorNumQueuedRequests.value(); + final int slotsAvailableTotal = slotsAvailableInQueue + numExecutorsAvailable.value(); + final float slotsAvailablePercent = totalSlots <= 0 ? 0.0f : + (float) slotsAvailableTotal / (float) totalSlots; rb.addCounter(ExecutorTotalRequestsHandled, executorTotalRequestHandled.value()) - .addCounter(ExecutorNumQueuedRequests, executorNumQueuedRequests.value()) .addCounter(ExecutorTotalSuccess, executorTotalSuccess.value()) - .addCounter(ExecutorTotalExecutionFailure, executorTotalExecutionFailed.value()) - .addCounter(ExecutorTotalInterrupted, executorTotalIKilled.value()) - .addCounter(PreemptionTimeLost, preemptionTimeLost.value()) - .addGauge(ExecutorThreadCountPerInstance, numExecutors) + .addCounter(ExecutorTotalFailed, executorTotalExecutionFailed.value()) + .addCounter(ExecutorTotalKilled, executorTotalIKilled.value()) + .addCounter(ExecutorTotalEvictedFromWaitQueue, totalEvictedFromWaitQueue.value()) + .addCounter(ExecutorTotalRejectedRequests, totalRejectedRequests.value()) + .addGauge(ExecutorNumQueuedRequests, executorNumQueuedRequests.value()) + .addGauge(ExecutorNumPreemptableRequests, executorNumPreemptableRequests.value()) .addGauge(ExecutorMemoryPerInstance, memoryPerInstance.value()) .addGauge(ExecutorCacheMemoryPerInstance, cacheMemoryPerInstance.value()) .addGauge(ExecutorJvmMaxMemory, jvmMaxMemory.value()) + .addGauge(ExecutorMaxFreeSlots, totalSlots) + .addGauge(ExecutorNumExecutorsPerInstance, numExecutors) .addGauge(ExecutorWaitQueueSize, waitQueueSize.value()) - .addGauge(ExecutorRpcNumHandlers, rpcNumHandlers.value()); + .addGauge(ExecutorNumExecutorsAvailable, numExecutorsAvailable.value()) + .addGauge(ExecutorAvailableFreeSlots, slotsAvailableTotal) + .addGauge(ExecutorAvailableFreeSlotsPercent, slotsAvailablePercent) + .addCounter(ExecutorTotalPreemptionTimeToKill, totalPreemptionTimeToKill.value()) + .addCounter(ExecutorTotalPreemptionTimeLost, totalPreemptionTimeLost.value()) + .addGauge(ExecutorMaxPreemptionTimeToKill, maxPreemptionTimeToKill.value()) + .addGauge(ExecutorMaxPreemptionTimeLost, maxPreemptionTimeLost.value()); + + for (MutableQuantiles q : percentileTimeToKill) { + q.snapshot(rb, true); + } + + for (MutableQuantiles q : percentileTimeLost) { + q.snapshot(rb, true); + } } private void updateThreadMetrics(MetricsRecordBuilder rb) { diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonIOInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonIOInfo.java index 79f004b..f0fde62 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonIOInfo.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonIOInfo.java @@ -26,10 +26,6 @@ */ public enum LlapDaemonIOInfo implements MetricsInfo { IOMetrics("Llap daemon I/O elevator metrics"), - IoThreadPoolSize("Size of the thread pool used by IO elevator"), - EncodedColumnBatchPoolSize("Size of the object pool that stores encoded column batches"), - ColumnStreamDataPoolSize("Size of the object pool that stores column stream data"), - ColumnVectorBatchPoolSize("Size of the object pool that stores column vector batches"), PercentileDecodingTime("Percentile decoding time for encoded column batch"), MaxDecodingTime("Max time for decoding an encoded column batch"); diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonIOMetrics.java b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonIOMetrics.java index f3def75..36eb0e5 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonIOMetrics.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonIOMetrics.java @@ -17,12 +17,8 @@ */ package org.apache.hadoop.hive.llap.metrics; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonIOInfo.ColumnStreamDataPoolSize; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonIOInfo.ColumnVectorBatchPoolSize; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonIOInfo.EncodedColumnBatchPoolSize; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonIOInfo.IoThreadPoolSize; -import static org.apache.hadoop.hive.llap.metrics.LlapDaemonIOInfo.MaxDecodingTime; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonIOInfo.IOMetrics; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonIOInfo.MaxDecodingTime; import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; @@ -33,7 +29,6 @@ import org.apache.hadoop.metrics2.annotation.Metric; import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.lib.MetricsRegistry; -import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.metrics2.lib.MutableQuantiles; import org.apache.hadoop.metrics2.lib.MutableRate; @@ -52,14 +47,6 @@ private long maxTime = Long.MIN_VALUE; @Metric - MutableGaugeInt encodedColumnBatchPoolSize; - @Metric - MutableGaugeInt columnStreamDataPoolSize; - @Metric - MutableGaugeInt columnVectorBatchPool; - @Metric - MutableGaugeInt ioThreadPoolSize; - @Metric MutableRate rateOfDecoding; final MutableQuantiles[] decodingTimes; @Metric @@ -101,22 +88,6 @@ public String getName() { return name; } - public void setEncodedColumnBatchPoolSize(int size) { - encodedColumnBatchPoolSize.set(size); - } - - public void setColumnStreamDataPoolSize(int size) { - columnStreamDataPoolSize.set(size); - } - - public void setColumnVectorBatchPoolSize(int size) { - columnVectorBatchPool.set(size); - } - - public void setIoThreadPoolSize(int size) { - ioThreadPoolSize.set(size); - } - public void addDecodeBatchTime(long latency) { rateOfDecoding.add(latency); if (latency > maxTime) { @@ -129,11 +100,7 @@ public void addDecodeBatchTime(long latency) { } private void getIoStats(MetricsRecordBuilder rb) { - rb.addGauge(EncodedColumnBatchPoolSize, encodedColumnBatchPoolSize.value()) - .addGauge(ColumnStreamDataPoolSize, columnStreamDataPoolSize.value()) - .addGauge(ColumnVectorBatchPoolSize, columnVectorBatchPool.value()) - .addGauge(IoThreadPoolSize, ioThreadPoolSize.value()) - .addGauge(MaxDecodingTime, maxDecodingTime.value()); + rb.addGauge(MaxDecodingTime, maxDecodingTime.value()); rateOfDecoding.snapshot(rb, true); for (MutableQuantiles q : decodingTimes) { diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index d1edd12..506f611 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -190,7 +190,7 @@ public void testWaitQueuePreemption() throws InterruptedException { public TaskExecutorServiceForTest(int numExecutors, int waitQueueSize, String waitQueueComparatorClassName, boolean enablePreemption) { super(numExecutors, waitQueueSize, waitQueueComparatorClassName, enablePreemption, - Thread.currentThread().getContextClassLoader()); + Thread.currentThread().getContextClassLoader(), null); } private ConcurrentMap completionListeners = new ConcurrentHashMap<>(); diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 5ecbf79..c3d3a1d 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -56,10 +56,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem; +import org.apache.hadoop.hive.llap.metrics.MetricsUtils; import org.apache.hadoop.hive.llap.registry.ServiceInstance; import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerMetrics; +import org.apache.hadoop.util.JvmPauseMonitor; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -173,12 +177,16 @@ public int compare(Priority o1, Priority o2) { @VisibleForTesting StatsPerDag dagStats = new StatsPerDag(); + private final LlapTaskSchedulerMetrics metrics; + private final JvmPauseMonitor pauseMonitor; + public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) { - this(taskSchedulerContext, new SystemClock()); + this(taskSchedulerContext, new SystemClock(), true); } @VisibleForTesting - public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock clock) { + public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock clock, + boolean initMetrics) { super(taskSchedulerContext); this.clock = clock; try { @@ -236,6 +244,24 @@ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapScheduler").build()); schedulerExecutor = MoreExecutors.listeningDecorator(schedulerExecutorServiceRaw); + if (initMetrics) { + // Initialize the metrics system + LlapMetricsSystem.initialize("LlapDaemon"); + this.pauseMonitor = new JvmPauseMonitor(conf); + pauseMonitor.start(); + String displayName = "LlapTaskSchedulerMetrics-" + MetricsUtils.getHostName(); + String sessionId = conf.get("llap.daemon.metrics.sessionid"); + // TODO: Not sure about the use of this. Should we instead use workerIdentity as sessionId? + this.metrics = LlapTaskSchedulerMetrics.create(displayName, sessionId); + this.metrics.setNumExecutors(executorsPerInstance); + this.metrics.setMemoryPerInstance(memoryPerInstance * 1024L * 1024L); + this.metrics.setCpuCoresPerInstance(coresPerExecutor); + this.metrics.getJvmMetrics().setPauseMonitor(pauseMonitor); + } else { + this.metrics = null; + this.pauseMonitor = null; + } + LOG.info("Running with configuration: " + "memoryPerInstance=" + memoryPerInstance + ", vCoresPerInstance=" + coresPerInstance + ", executorsPerInstance=" + executorsPerInstance + ", resourcePerInstanceInferred=" + resourcePerExecutor @@ -280,7 +306,8 @@ public void onFailure(Throwable t) { registry.registerStateChangeListener(new NodeStateChangeListener()); activeInstances = registry.getInstances(); for (ServiceInstance inst : activeInstances.getAll().values()) { - addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode)); + addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode, + metrics)); } } finally { writeLock.unlock(); @@ -293,14 +320,14 @@ public void onFailure(Throwable t) { @Override public void onCreate(final ServiceInstance serviceInstance) { addNode(serviceInstance, new NodeInfo(serviceInstance, nodeBlacklistConf, clock, - numSchedulableTasksPerNode)); + numSchedulableTasksPerNode, metrics)); LOG.info("Added node with identity: {}", serviceInstance.getWorkerIdentity()); } @Override public void onUpdate(final ServiceInstance serviceInstance) { instanceToNodeMap.put(serviceInstance.getWorkerIdentity(), new NodeInfo(serviceInstance, - nodeBlacklistConf, clock, numSchedulableTasksPerNode)); + nodeBlacklistConf, clock, numSchedulableTasksPerNode, metrics)); LOG.info("Updated node with identity: {}", serviceInstance.getWorkerIdentity()); } @@ -309,6 +336,9 @@ public void onRemove(final ServiceInstance serviceInstance) { // FIXME: disabling this for now // instanceToNodeMap.remove(serviceInstance.getWorkerIdentity()); LOG.info("Removed node with identity: {}", serviceInstance.getWorkerIdentity()); + if (metrics != null) { + metrics.setClusterNodeCount(activeInstances.size()); + } // if there are no more nodes. Signal timeout monitor to start timer if (activeInstances.size() == 0) { LOG.info("No node found. Signalling scheduler timeout monitor thread to start timer."); @@ -378,6 +408,15 @@ public void shutdown() { if (registry != null) { registry.stop(); } + + if (pauseMonitor != null) { + pauseMonitor.stop(); + } + + if (metrics != null) { + LlapMetricsSystem.shutdown(); + } + } } finally { writeLock.unlock(); @@ -454,6 +493,9 @@ public void dagComplete() { // This is effectively DAG completed, and can be used to reset statistics being tracked. LOG.info("DAG: " + dagCounter.get() + " completed. Scheduling stats: " + dagStats); dagCounter.incrementAndGet(); + if (metrics != null) { + metrics.incrCompletedDagCount(); + } dagStats = new StatsPerDag(); } @@ -544,9 +586,7 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd // Also reset commFailures since a task was able to communicate back and indicate success. nodeInfo.enableNode(); // Re-insert into the queue to force the poll thread to remove the element. - if ( disabledNodesQueue.remove(nodeInfo)) { - disabledNodesQueue.add(nodeInfo); - } + reinsertNodeInfo(nodeInfo); } // In case of success, trigger a scheduling run for pending tasks. trySchedulingPendingTasks(); @@ -562,9 +602,7 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd // Also reset commFailures since a task was able to communicate back and indicate success. nodeInfo.enableNode(); // Re-insert into the queue to force the poll thread to remove the element. - if (disabledNodesQueue.remove(nodeInfo)) { - disabledNodesQueue.add(nodeInfo); - } + reinsertNodeInfo(nodeInfo); } // In case of success, trigger a scheduling run for pending tasks. trySchedulingPendingTasks(); @@ -599,6 +637,15 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd return true; } + private void reinsertNodeInfo(final NodeInfo nodeInfo) { + if ( disabledNodesQueue.remove(nodeInfo)) { + disabledNodesQueue.add(nodeInfo); + } + if (metrics != null) { + metrics.setDisabledNodeCount(disabledNodesQueue.size()); + } + } + @Override public Object deallocateContainer(ContainerId containerId) { LOG.debug("Ignoring deallocateContainer for containerId: " + containerId); @@ -704,7 +751,8 @@ private void scanForNodeChanges() { if (inst.isAlive() && instanceToNodeMap.containsKey(inst.getWorkerIdentity()) == false) { /* that's a good node, not added to the allocations yet */ LOG.info("Found a new node: " + inst + "."); - addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode)); + addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode, + metrics)); } } } finally { @@ -720,6 +768,9 @@ private void addNode(ServiceInstance inst, NodeInfo node) { stopTimeoutMonitor(); } instanceToNodeMap.put(inst.getWorkerIdentity(), node); + if (metrics != null) { + metrics.setClusterNodeCount(activeInstances.size()); + } // Trigger scheduling since a new node became available. trySchedulingPendingTasks(); } @@ -752,6 +803,9 @@ private void disableInstance(ServiceInstance instance, boolean isCommFailure) { nodeInfo.disableNode(isCommFailure); // TODO: handle task to container map events in case of hard failures disabledNodesQueue.add(nodeInfo); + if (metrics != null) { + metrics.setDisabledNodeCount(disabledNodesQueue.size()); + } } } finally { writeLock.unlock(); @@ -768,6 +822,9 @@ private void addPendingTask(TaskInfo taskInfo) { } tasksAtPriority.add(taskInfo); knownTasks.putIfAbsent(taskInfo.task, taskInfo); + if (metrics != null) { + metrics.incrPendingTasksCount(); + } } finally { writeLock.unlock(); } @@ -799,6 +856,9 @@ private void registerRunningTask(TaskInfo taskInfo) { runningTasks.put(priority, tasksAtpriority); } tasksAtpriority.add(taskInfo); + if (metrics != null) { + metrics.decrPendingTasksCount(); + } } finally { writeLock.unlock(); } @@ -1034,6 +1094,9 @@ private void registerPendingPreemption(String host) { writeLock.lock(); try { pendingPreemptions.incrementAndGet(); + if (metrics != null) { + metrics.incrPendingPreemptionTasksCount(); + } MutableInt val = pendingPreemptionsPerHost.get(host); if (val == null) { val = new MutableInt(1); @@ -1049,6 +1112,9 @@ private void unregisterPendingPreemption(String host) { writeLock.lock(); try { pendingPreemptions.decrementAndGet(); + if (metrics != null) { + metrics.decrPendingPreemptionTasksCount(); + } MutableInt val = pendingPreemptionsPerHost.get(host); Preconditions.checkNotNull(val); val.decrement(); @@ -1199,23 +1265,24 @@ public void shutdown() { private int numPreemptedTasks = 0; private int numScheduledTasks = 0; private final int numSchedulableTasks; - + private final LlapTaskSchedulerMetrics metrics; /** * Create a NodeInfo bound to a service instance - * - * @param serviceInstance the associated serviceInstance + * @param serviceInstance the associated serviceInstance * @param blacklistConf blacklist configuration * @param clock clock to use to obtain timing information * @param numSchedulableTasksConf number of schedulable tasks on the node. 0 represents auto - * detect based on the serviceInstance, -1 indicates indicates - * unlimited capacity +* detect based on the serviceInstance, -1 indicates indicates + * @param metrics */ - NodeInfo(ServiceInstance serviceInstance, NodeBlacklistConf blacklistConf, Clock clock, int numSchedulableTasksConf) { + NodeInfo(ServiceInstance serviceInstance, NodeBlacklistConf blacklistConf, Clock clock, + int numSchedulableTasksConf, final LlapTaskSchedulerMetrics metrics) { Preconditions.checkArgument(numSchedulableTasksConf >= -1, "NumSchedulableTasks must be >=-1"); this.serviceInstance = serviceInstance; this.blacklistConf = blacklistConf; this.clock = clock; + this.metrics = metrics; if (numSchedulableTasksConf == 0) { int pendingQueueuCapacity = 0; @@ -1234,6 +1301,9 @@ public void shutdown() { } else { this.numSchedulableTasks = numSchedulableTasksConf; } + if (metrics != null) { + metrics.incrSchedulableTasksCount(numSchedulableTasks); + } LOG.info("Setting up node: " + serviceInstance + " with schedulableCapacity=" + this.numSchedulableTasks); } @@ -1275,17 +1345,33 @@ void disableNode(boolean commFailure) { void registerTaskScheduled() { numScheduledTasks++; + if (metrics != null) { + metrics.incrRunningTasksCount(); + metrics.decrSchedulableTasksCount(); + } } void registerTaskSuccess() { numSuccessfulTasks++; numScheduledTasks--; + if (metrics != null) { + metrics.incrSuccessfulTasksCount(); + metrics.decrRunningTasksCount(); + metrics.incrSchedulableTasksCount(); + } } void registerUnsuccessfulTaskEnd(boolean wasPreempted) { numScheduledTasks--; + if (metrics != null) { + metrics.decrRunningTasksCount(); + metrics.incrSchedulableTasksCount(); + } if (wasPreempted) { numPreemptedTasks++; + if (metrics != null) { + metrics.incrPreemptedTasksCount(); + } } } diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java new file mode 100644 index 0000000..c190be8 --- /dev/null +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.tezplugins.metrics; + +import org.apache.hadoop.metrics2.MetricsInfo; + +import com.google.common.base.Objects; + +/** + * Metrics information for llap task scheduler. + */ +public enum LlapTaskSchedulerInfo implements MetricsInfo { + SchedulerMetrics("Llap task scheduler related metrics"), + SchedulerClusterNodeCount("Number of nodes in the cluster"), + SchedulerExecutorsPerInstance("Total number of executor threads per node"), + SchedulerMemoryPerInstance("Total memory for executors per node in bytes"), + SchedulerCpuCoresPerInstance("Total CPU vCores per node"), + SchedulerDisabledNodeCount("Number of nodes disabled temporarily"), + SchedulerPendingTaskCount("Number of pending tasks"), + SchedulerSchedulableTaskCount("Current slots available for scheduling tasks"), + SchedulerSuccessfulTaskCount("Total number of successful tasks"), + SchedulerRunningTaskCount("Total number of running tasks"), + SchedulerPendingPreemptionTaskCount("Total number of tasks pending for pre-emption"), + SchedulerPreemptedTaskCount("Total number of tasks pre-empted"), + SchedulerCompletedDagCount("Number of DAGs completed"); + + private final String desc; + + LlapTaskSchedulerInfo(String desc) { + this.desc = desc; + } + + @Override + public String description() { + return desc; + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("name", name()).add("description", desc) + .toString(); + } +} \ No newline at end of file diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java new file mode 100644 index 0000000..b3230e2 --- /dev/null +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.tezplugins.metrics; + +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerClusterNodeCount; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerCompletedDagCount; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerCpuCoresPerInstance; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerDisabledNodeCount; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerExecutorsPerInstance; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerMemoryPerInstance; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerMetrics; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerPendingPreemptionTaskCount; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerPendingTaskCount; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerPreemptedTaskCount; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerRunningTaskCount; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerSchedulableTaskCount; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerSuccessfulTaskCount; +import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; +import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; + +import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem; +import org.apache.hadoop.hive.llap.metrics.MetricsUtils; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterInt; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.metrics2.source.JvmMetrics; + +/** + * Metrics about the llap daemon task scheduler. + */ +@Metrics(about = "LlapDaemon Task Scheduler Metrics", context = "scheduler") +public class LlapTaskSchedulerMetrics implements MetricsSource { + + private final String name; + private final JvmMetrics jvmMetrics; + private final String sessionId; + private final MetricsRegistry registry; + @Metric + MutableGaugeInt numExecutors; + @Metric + MutableGaugeLong memoryPerInstance; + @Metric + MutableGaugeInt cpuCoresPerInstance; + @Metric + MutableGaugeInt clusterNodeCount; + @Metric + MutableGaugeInt disabledNodeCount; + @Metric + MutableCounterInt pendingTasksCount; + @Metric + MutableCounterInt schedulableTasksCount; + @Metric + MutableCounterInt runningTasksCount; + @Metric + MutableCounterInt successfulTasksCount; + @Metric + MutableCounterInt preemptedTasksCount; + @Metric + MutableCounterInt completedDagcount; + @Metric + MutableCounterInt pendingPreemptionTasksCount; + + private LlapTaskSchedulerMetrics(String displayName, JvmMetrics jm, String sessionId) { + this.name = displayName; + this.jvmMetrics = jm; + this.sessionId = sessionId; + this.registry = new MetricsRegistry("LlapTaskSchedulerMetricsRegistry"); + this.registry.tag(ProcessName, MetricsUtils.METRICS_PROCESS_NAME).tag(SessionId, sessionId); + } + + public static LlapTaskSchedulerMetrics create(String displayName, String sessionId) { + MetricsSystem ms = LlapMetricsSystem.instance(); + JvmMetrics jm = JvmMetrics.create(MetricsUtils.METRICS_PROCESS_NAME, sessionId, ms); + return ms.register(displayName, "Llap Task Scheduler Metrics", + new LlapTaskSchedulerMetrics(displayName, jm, sessionId)); + } + + @Override + public void getMetrics(MetricsCollector collector, boolean b) { + MetricsRecordBuilder rb = collector.addRecord(SchedulerMetrics) + .setContext("scheduler") + .tag(ProcessName, MetricsUtils.METRICS_PROCESS_NAME) + .tag(SessionId, sessionId); + getTaskSchedulerStats(rb); + } + + public void setNumExecutors(int value) { + numExecutors.set(value); + } + + public void setMemoryPerInstance(long value) { + memoryPerInstance.set(value); + } + + public void setCpuCoresPerInstance(int value) { + cpuCoresPerInstance.set(value); + } + + public void setClusterNodeCount(int value) { + clusterNodeCount.set(value); + } + + public void setDisabledNodeCount(int value) { + disabledNodeCount.set(value); + } + + public void incrPendingTasksCount() { + pendingTasksCount.incr(); + } + + public void decrPendingTasksCount() { + pendingTasksCount.incr(-1); + } + + public void incrSchedulableTasksCount(int delta) { + schedulableTasksCount.incr(delta); + } + + public void incrSchedulableTasksCount() { + schedulableTasksCount.incr(); + } + + public void decrSchedulableTasksCount() { + schedulableTasksCount.incr(-1); + } + + public void incrSuccessfulTasksCount() { + successfulTasksCount.incr(); + } + + public void incrRunningTasksCount() { + runningTasksCount.incr(); + } + + public void decrRunningTasksCount() { + runningTasksCount.incr(-1); + } + + public void incrPreemptedTasksCount() { + preemptedTasksCount.incr(); + } + + public void incrCompletedDagCount() { + completedDagcount.incr(); + } + + public void incrPendingPreemptionTasksCount() { + pendingPreemptionTasksCount.incr(); + } + + public void decrPendingPreemptionTasksCount() { + pendingPreemptionTasksCount.incr(-1); + } + + private void getTaskSchedulerStats(MetricsRecordBuilder rb) { + rb.addGauge(SchedulerClusterNodeCount, clusterNodeCount.value()) + .addGauge(SchedulerExecutorsPerInstance, numExecutors.value()) + .addGauge(SchedulerMemoryPerInstance, memoryPerInstance.value()) + .addGauge(SchedulerCpuCoresPerInstance, cpuCoresPerInstance.value()) + .addGauge(SchedulerDisabledNodeCount, disabledNodeCount.value()) + .addCounter(SchedulerPendingTaskCount, pendingTasksCount.value()) + .addCounter(SchedulerSchedulableTaskCount, schedulableTasksCount.value()) + .addCounter(SchedulerRunningTaskCount, runningTasksCount.value()) + .addCounter(SchedulerSuccessfulTaskCount, successfulTasksCount.value()) + .addCounter(SchedulerPendingPreemptionTaskCount, pendingPreemptionTasksCount.value()) + .addCounter(SchedulerPreemptedTaskCount, preemptedTasksCount.value()) + .addCounter(SchedulerCompletedDagCount, completedDagcount.value()); + } + + public JvmMetrics getJvmMetrics() { + return jvmMetrics; + } + + public String getName() { + return name; + } +} diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java index 36d8ffd..b2cd55e 100644 --- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java +++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java @@ -627,7 +627,7 @@ void rejectExecution(Object task) { public LlapTaskSchedulerServiceForTest( TaskSchedulerContext appClient, Clock clock) { - super(appClient, clock); + super(appClient, clock, false); } @Override