diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index dd6689113c..1099d35e2d 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -166,6 +166,8 @@ public TaskExecutorService(int numExecutors, int waitQueueSize,
     this.metrics = metrics;
     if (metrics != null) {
       metrics.setNumExecutorsAvailable(numSlotsAvailable.get());
+      metrics.setNumExecutorsEnabled(numExecutors);
+      metrics.setWaitQueueSizeEnabled(waitQueueSize);
     }
 
     // single threaded scheduler for tasks from wait queue to executor threads
@@ -213,6 +215,10 @@ public synchronized void setCapacity(int newNumExecutors, int newWaitQueueSize)
     numSlotsAvailable.addAndGet(newNumExecutors - maxParallelExecutors);
     maxParallelExecutors = newNumExecutors;
     waitQueue.setWaitQueueSize(newWaitQueueSize);
+    if (metrics != null) {
+      metrics.setNumExecutorsEnabled(newNumExecutors);
+      metrics.setWaitQueueSizeEnabled(newWaitQueueSize);
+    }
     LOG.info("TaskExecutorService is setting capacity to: numExecutors=" + newNumExecutors +
         ", waitQueueSize=" + newWaitQueueSize);
   }
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
index f06390cc4f..cd4e165419 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
@@ -64,6 +64,8 @@
   ExecutorFallOffNumCompletedFragments("Number of completed fragments w.r.t falloff values"),
   AverageQueueTime("Average queue time for tasks"),
   AverageResponseTime("Average response time for successful tasks"),
+  NumExecutorsEnabled("The number of executor threads enabled"),
+  WaitQueueSizeEnabled("Size of wait queue enabled"),
   ;
 
   private final String desc;
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java
index 6fee9f06a7..035aff9c9c 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java
@@ -52,6 +52,8 @@
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffFailedMaxTimeLost;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffKilledTimeLost;
 import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffKilledMaxTimeLost;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.NumExecutorsEnabled;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.WaitQueueSizeEnabled;
 import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
 import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
 
@@ -165,7 +167,10 @@
   MutableGaugeLong fallOffMaxFailedTimeLost;
   @Metric
   MutableGaugeLong fallOffMaxKilledTimeLost;
-
+  @Metric
+  MutableGaugeInt numExecutorsEnabled;
+  @Metric
+  MutableGaugeInt waitQueueSizeEnabled;
 
   private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sessionId,
@@ -368,6 +373,15 @@ public void setWaitQueueSize(int size) {
     waitQueueSize.set(size);
   }
 
+  public void setWaitQueueSizeEnabled(int size) {
+    waitQueueSizeEnabled.set(size);
+  }
+
+  public void setNumExecutorsEnabled(int size) {
+    numExecutorsEnabled.set(size);
+  }
+
+
   private void getExecutorStats(MetricsRecordBuilder rb) {
     updateThreadMetrics(rb);
     final int totalSlots = waitQueueSize.value() + numExecutors;
@@ -403,7 +417,9 @@ private void getExecutorStats(MetricsRecordBuilder rb) {
         .addGauge(ExecutorFallOffFailedMaxTimeLost, fallOffMaxFailedTimeLost.value())
         .addCounter(ExecutorFallOffKilledTimeLost, fallOffKilledTimeLost.value())
         .addGauge(ExecutorFallOffKilledMaxTimeLost, fallOffMaxKilledTimeLost.value())
-        .addCounter(ExecutorFallOffNumCompletedFragments, fallOffNumCompletedFragments.value());
+        .addCounter(ExecutorFallOffNumCompletedFragments, fallOffNumCompletedFragments.value())
+        .addGauge(NumExecutorsEnabled, numExecutorsEnabled.value())
+        .addGauge(WaitQueueSizeEnabled, waitQueueSizeEnabled.value());
     if (numExecutorsAvailableAverage != null) {
       rb.addGauge(ExecutorNumExecutorsAvailableAverage, numExecutorsAvailableAverage.value());
     }
diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index 4e396c6194..7f009519c5 100644
--- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -21,7 +21,10 @@
 import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createSubmitWorkRequestProto;
 import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createTaskWrapper;
 import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.hive.llap.testhelpers.ControlledClock;
 
@@ -545,6 +548,28 @@ public void testSetCapacityNegativeQueueSize() {
     taskExecutorService.setCapacity(2, -5);
   }
 
+  @Test(timeout = 1000)
+  public void testCapacityMetricsInitial() {
+    LlapDaemonExecutorMetrics mockMetrics = mock(LlapDaemonExecutorMetrics.class);
+
+    TaskExecutorServiceForTest taskExecutorService = new TaskExecutorServiceForTest(2, 10,
+        ShortestJobFirstComparator.class.getName(), true, mockMetrics, null);
+
+    verify(mockMetrics).setNumExecutorsEnabled(2);
+    verify(mockMetrics).setWaitQueueSizeEnabled(10);
+  }
+
+  @Test(timeout = 1000)
+  public void testCapacityMetricsModification() {
+    LlapDaemonExecutorMetrics mockMetrics = mock(LlapDaemonExecutorMetrics.class);
+
+    TaskExecutorServiceForTest taskExecutorService = new TaskExecutorServiceForTest(2, 10,
+        ShortestJobFirstComparator.class.getName(), true, mockMetrics, null);
+    taskExecutorService.setCapacity(1, 5);
+
+    verify(mockMetrics).setNumExecutorsEnabled(1);
+    verify(mockMetrics).setWaitQueueSizeEnabled(5);
+  }
   private void runPreemptionGraceTest(
       MockRequest victim1, MockRequest victim2, int time) throws InterruptedException {
@@ -601,13 +626,18 @@ private void awaitStartAndSchedulerRun(MockRequest mockRequest,
 
     public TaskExecutorServiceForTest(int numExecutors, int waitQueueSize,
         String waitQueueComparatorClassName, boolean enablePreemption) {
-      this(numExecutors, waitQueueSize, waitQueueComparatorClassName, enablePreemption, null);
+      this(numExecutors, waitQueueSize, waitQueueComparatorClassName, enablePreemption, null, null);
     }
 
     public TaskExecutorServiceForTest(int numExecutors, int waitQueueSize,
         String waitQueueComparatorClassName, boolean enablePreemption, Clock clock) {
+      this(numExecutors, waitQueueSize, waitQueueComparatorClassName, enablePreemption, null, clock);
+    }
+
+    public TaskExecutorServiceForTest(int numExecutors, int waitQueueSize,
+        String waitQueueComparatorClassName, boolean enablePreemption, LlapDaemonExecutorMetrics metrics, Clock clock) {
       super(numExecutors, waitQueueSize, waitQueueComparatorClassName, enablePreemption,
-          Thread.currentThread().getContextClassLoader(), null, clock);
+          Thread.currentThread().getContextClassLoader(), metrics, clock);
     }
 
     private ConcurrentMap completionListeners =