diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 33dd63e..fca9907 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -61,13 +61,13 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRunner { + public static final String THREAD_NAME_FORMAT_PREFIX = "ContainerExecutor "; + public static final String THREAD_NAME_FORMAT = THREAD_NAME_FORMAT_PREFIX + "%d"; private static final Logger LOG = Logger.getLogger(ContainerRunnerImpl.class); - private final int numExecutors; private final ListeningExecutorService executorService; private final AtomicReference localAddress; private final String[] localDirsBase; - private final int localShufflePort; private final Map localEnv = new HashMap(); private volatile FileSystem localFs; private final long memoryPerExecutor; @@ -80,13 +80,11 @@ public ContainerRunnerImpl(int numExecutors, String[] localDirsBase, int localSh super("ContainerRunnerImpl"); Preconditions.checkState(numExecutors > 0, "Invalid number of executors: " + numExecutors + ". Must be > 0"); - this.numExecutors = numExecutors; this.localDirsBase = localDirsBase; - this.localShufflePort = localShufflePort; this.localAddress = localAddress; ExecutorService raw = Executors.newFixedThreadPool(numExecutors, - new ThreadFactoryBuilder().setNameFormat("ContainerExecutor %d").build()); + new ThreadFactoryBuilder().setNameFormat(THREAD_NAME_FORMAT).build()); this.executorService = MoreExecutors.listeningDecorator(raw); AuxiliaryServiceHelper.setServiceDataIntoEnv( TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID, diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java index 80e78ea..1aabcbf 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java @@ -68,6 +68,7 @@ private LlapDaemonCacheMetrics(String name, String sessionId) { this.name = name; this.sessionId = sessionId; this.registry = new MetricsRegistry("LlapDaemonCacheRegistry"); + this.registry.tag(ProcessName, "LlapDaemon").tag(SessionId, sessionId); } public static LlapDaemonCacheMetrics create(String displayName, String sessionId) { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCustomMetricsInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCustomMetricsInfo.java new file mode 100644 index 0000000..964ef47 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCustomMetricsInfo.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.metrics; + +import org.apache.hadoop.metrics2.MetricsInfo; + +/** + * + */ +public class LlapDaemonCustomMetricsInfo implements MetricsInfo { + private String name; + private String desc; + + public LlapDaemonCustomMetricsInfo(String name, String desc) { + this.name = name; + this.desc = desc; + } + + @Override + public String name() { + return name; + } + + @Override + public String description() { + return desc; + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java index b7e06d5..154e723 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java @@ -28,7 +28,6 @@ LLAP_DAEMON_EXECUTOR_METRICS("Llap daemon cache related metrics"), EXECUTOR_THREAD_CPU_TIME("Cpu time in nanoseconds"), EXECUTOR_THREAD_USER_TIME("User time in nanoseconds"), - EXECUTOR_THREAD_SYSTEM_TIME("System time in nanoseconds"), EXECUTOR_TOTAL_REQUESTS_HANDLED("Total number of requests handled by the container"), EXECUTOR_NUM_QUEUED_REQUESTS("Number of requests queued by the container for processing"), EXECUTOR_TOTAL_SUCCESS("Total number of requests handled by the container that succeeded"), diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java index e376495..80ee7a1 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.llap.metrics; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.EXECUTOR_NUM_QUEUED_REQUESTS; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.EXECUTOR_THREAD_CPU_TIME; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.EXECUTOR_THREAD_USER_TIME; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.EXECUTOR_TOTAL_ASKED_TO_DIE; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.EXECUTOR_TOTAL_EXECUTION_FAILURE; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.EXECUTOR_TOTAL_INTERRUPTED; @@ -27,7 +29,15 @@ import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.hive.llap.daemon.impl.ContainerRunnerImpl; import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.MetricsSource; import org.apache.hadoop.metrics2.MetricsSystem; @@ -35,6 +45,7 @@ import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.lib.MetricsRegistry; import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.metrics2.source.JvmMetrics; /** @@ -48,13 +59,12 @@ private final String sessionId; private final MetricsRegistry registry; private final int numExecutors; + private final ThreadMXBean threadMXBean; + private final Map cpuMetricsInfoMap; + private final Map userMetricsInfoMap; - @Metric - MutableCounterLong[] executorThreadCpuTime; - @Metric - MutableCounterLong[] executorThreadUserTime; - @Metric - MutableCounterLong[] executorThreadSystemTime; + final MutableGaugeLong[] executorThreadCpuTime; + final MutableGaugeLong[] executorThreadUserTime; @Metric MutableCounterLong executorTotalRequestHandled; @Metric @@ -74,10 +84,24 @@ private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sess this.jvmMetrics = jm; this.sessionId = sessionId; this.registry = new MetricsRegistry("LlapDaemonExecutorRegistry"); + this.registry.tag(ProcessName, "LlapDaemon").tag(SessionId, sessionId); this.numExecutors = numExecutors; - this.executorThreadCpuTime = new MutableCounterLong[numExecutors]; - this.executorThreadUserTime = new MutableCounterLong[numExecutors]; - this.executorThreadSystemTime = new MutableCounterLong[numExecutors]; + this.threadMXBean = ManagementFactory.getThreadMXBean(); + this.executorThreadCpuTime = new MutableGaugeLong[numExecutors]; + this.executorThreadUserTime = new MutableGaugeLong[numExecutors]; + this.cpuMetricsInfoMap = new ConcurrentHashMap<>(); + this.userMetricsInfoMap = new ConcurrentHashMap<>(); + + for (int i = 0; i < numExecutors; i++) { + MetricsInfo mic = new LlapDaemonCustomMetricsInfo(EXECUTOR_THREAD_CPU_TIME.name() + "_" + i, + EXECUTOR_THREAD_CPU_TIME.description()); + MetricsInfo miu = new LlapDaemonCustomMetricsInfo(EXECUTOR_THREAD_USER_TIME.name() + "_" + i, + EXECUTOR_THREAD_USER_TIME.description()); + this.cpuMetricsInfoMap.put(i, mic); + this.userMetricsInfoMap.put(i, miu); + this.executorThreadCpuTime[i] = registry.newGauge(mic, 0L); + this.executorThreadUserTime[i] = registry.newGauge(miu, 0L); + } } public static LlapDaemonExecutorMetrics create(String displayName, String sessionId, @@ -96,19 +120,6 @@ public void getMetrics(MetricsCollector collector, boolean b) { getExecutorStats(rb); } - // Assumption here is threadId is from 0 to numExecutors - 1 - public void incrExecutorThreadCpuTime(int threadId, int delta) { - executorThreadCpuTime[threadId].incr(delta); - } - - public void incrExecutorThreadUserTime(int threadId, int delta) { - executorThreadUserTime[threadId].incr(delta); - } - - public void incrExecutorThreadSystemTime(int threadId, int delta) { - executorThreadSystemTime[threadId].incr(delta); - } - public void incrExecutorTotalRequestsHandled() { executorTotalRequestHandled.incr(); } @@ -138,12 +149,7 @@ public void incrExecutorTotalAskedToDie() { } private void getExecutorStats(MetricsRecordBuilder rb) { - // TODO: Enable this after adding InstrumentedThreadPool executor -// for (int i = 0; i < numExecutors; i++) { -// rb.addCounter(EXECUTOR_THREAD_CPU_TIME, executorThreadCpuTime[i].value()) -// .addCounter(EXECUTOR_THREAD_USER_TIME, executorThreadUserTime[i].value()) -// .addCounter(EXECUTOR_THREAD_SYSTEM_TIME, executorThreadSystemTime[i].value()); -// } + updateThreadMetrics(rb); rb.addCounter(EXECUTOR_TOTAL_REQUESTS_HANDLED, executorTotalRequestHandled.value()) .addCounter(EXECUTOR_NUM_QUEUED_REQUESTS, executorNumQueuedRequests.value()) @@ -153,6 +159,29 @@ private void getExecutorStats(MetricsRecordBuilder rb) { .addCounter(EXECUTOR_TOTAL_ASKED_TO_DIE, executorTotalAskedToDie.value()); } + private void updateThreadMetrics(MetricsRecordBuilder rb) { + if (threadMXBean.isThreadCpuTimeSupported() && threadMXBean.isThreadCpuTimeEnabled()) { + final long[] ids = threadMXBean.getAllThreadIds(); + final ThreadInfo[] infos = threadMXBean.getThreadInfo(ids); + for (int i = 0; i < ids.length; i++) { + ThreadInfo threadInfo = infos[i]; + String threadName = threadInfo.getThreadName(); + long threadId = ids[i]; + for (int j = 0; j < numExecutors; j++) { + if (threadName.equals(ContainerRunnerImpl.THREAD_NAME_FORMAT_PREFIX + j)) { + executorThreadCpuTime[j].set(threadMXBean.getThreadCpuTime(threadId)); + executorThreadUserTime[j].set(threadMXBean.getThreadUserTime(threadId)); + } + } + } + + for (int i=0; i