diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index e50ff997a9..3e79dc4981 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4342,6 +4342,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Number of threads to use in LLAP task plugin client."), LLAP_DAEMON_DOWNLOAD_PERMANENT_FNS("hive.llap.daemon.download.permanent.fns", false, "Whether LLAP daemon should localize the resources for permanent UDFs."), + LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_MS("hive.llap.task.scheduler.am.collect.daemon.metrics", "0ms", + new TimeValidator(TimeUnit.MILLISECONDS), "Collect llap daemon metrics in the AM every given milliseconds. " + + "If it's set to 0, then the feature is disabled."), LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME("hive.llap.task.scheduler.am.registry", "llap", "AM registry name for LLAP task scheduler plugin to register with."), LLAP_TASK_SCHEDULER_AM_REGISTRY_PRINCIPAL("hive.llap.task.scheduler.am.registry.principal", "", diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index cdf767f1db..1b9c157855 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -16,6 +16,7 @@ import com.google.common.io.ByteArrayDataOutput; +import org.apache.hadoop.hive.llap.tezplugins.metrics.LlapMetricsCollector; import org.apache.hadoop.io.Text; import org.apache.hadoop.metrics2.MetricsSource; import org.apache.hadoop.metrics2.MetricsSystem; @@ -292,6 +293,7 @@ public void setError(Void v, Throwable t) { private final LlapTaskSchedulerMetrics metrics; private final JvmPauseMonitor pauseMonitor; private final Random random = new 
Random(); + private final LlapMetricsCollector llapMetricsCollector; private int totalGuaranteed = 0, unusedGuaranteed = 0; @@ -400,6 +402,8 @@ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapTaskSchedulerTimedLogThread") .build()); + this.llapMetricsCollector = new LlapMetricsCollector(conf); + String instanceId = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS); Preconditions.checkNotNull(instanceId, ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname @@ -790,6 +794,8 @@ public void run() { } }, 0, 10000L, TimeUnit.MILLISECONDS); + llapMetricsCollector.start(); + nodeEnablerFuture = nodeEnabledExecutor.submit(nodeEnablerCallable); Futures.addCallback(nodeEnablerFuture, new LoggingFutureCallback("NodeEnablerThread", LOG)); @@ -803,6 +809,7 @@ public void run() { registry.start(); registry.registerStateChangeListener(new NodeStateChangeListener()); + registry.registerStateChangeListener(llapMetricsCollector); activeInstances = registry.getInstances(); for (LlapServiceInstance inst : activeInstances.getAll()) { registerAndAddNode(new NodeInfo(inst, nodeBlacklistConf, clock, @@ -903,6 +910,7 @@ public void shutdown() { try { if (!this.isStopped.getAndSet(true)) { scheduledLoggingExecutor.shutdownNow(); + llapMetricsCollector.shutdown(); nodeEnablerCallable.shutdown(); if (nodeEnablerFuture != null) { diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java new file mode 100644 index 0000000000..f9d6cd1dab --- /dev/null +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java @@ -0,0 +1,194 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.net.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.SocketFactory;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Collects metrics from the LLAP daemons at a fixed, configurable interval.
+ * Collection is disabled unless the configured interval is greater than zero.
+ */
+public class LlapMetricsCollector implements ServiceInstanceStateChangeListener<LlapServiceInstance> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LlapMetricsCollector.class);
+  private static final String THREAD_NAME = "LlapTaskSchedulerMetricsCollectorThread";
+  private static final long INITIAL_DELAY_MSEC = 60000L;
+
+  private final Configuration conf;
+  private final ScheduledExecutorService scheduledMetricsExecutor;
+  private final LlapManagementProtocolClientImplFactory clientFactory;
+  // Both maps are concurrent: listener callbacks update them while the collector task reads them.
+  private final Map<String, LlapManagementProtocolClientImpl> llapClients;
+  private final Map<String, LlapMetrics> instanceStatisticsMap;
+  private final long metricsCollectionMs;
+
+  public LlapMetricsCollector(Configuration conf) {
+    this(
+            conf,
+            Executors.newSingleThreadScheduledExecutor(
+                    new ThreadFactoryBuilder().setDaemon(true).setNameFormat(THREAD_NAME)
+                            .build()),
+            LlapManagementProtocolClientImplFactory.basicInstance(conf));
+  }
+
+  @VisibleForTesting
+  LlapMetricsCollector(Configuration conf, ScheduledExecutorService scheduledMetricsExecutor,
+                       LlapManagementProtocolClientImplFactory clientFactory) {
+    this.conf = conf;
+    this.scheduledMetricsExecutor = scheduledMetricsExecutor;
+    this.clientFactory = clientFactory;
+    this.llapClients = new ConcurrentHashMap<>();
+    this.instanceStatisticsMap = new ConcurrentHashMap<>();
+    this.metricsCollectionMs = HiveConf.getTimeVar(conf,
+            HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_MS, TimeUnit.MILLISECONDS);
+  }
+
+  public void start() {
+    if (enabled()) {
+      // The first run waits INITIAL_DELAY_MSEC; later runs follow the configured interval.
+      scheduledMetricsExecutor.scheduleAtFixedRate(() -> {
+        collectMetrics();
+      }, INITIAL_DELAY_MSEC, metricsCollectionMs, TimeUnit.MILLISECONDS);
+    }
+  }
+
+  private boolean enabled() {
+    return metricsCollectionMs > 0;
+  }
+
+  public void shutdown() {
+    scheduledMetricsExecutor.shutdownNow();
+  }
+
+  @VisibleForTesting
+  void collectMetrics() {
+    if (enabled()) {
+      for (Map.Entry<String, LlapManagementProtocolClientImpl> entry : llapClients.entrySet()) {
+        String identity = entry.getKey();
+        LlapManagementProtocolClientImpl client = entry.getValue();
+        try {
+          LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto metrics =
+                  client.getDaemonMetrics(null,
+                          LlapDaemonProtocolProtos.GetDaemonMetricsRequestProto.newBuilder().build());
+          instanceStatisticsMap.put(identity, new LlapMetrics(metrics));
+        } catch (ServiceException | RuntimeException ex) {
+          // Keep polling: an exception escaping a fixed-rate task cancels all of its later runs.
+          LOG.error("Failed to collect metrics from LLAP daemon {}", identity, ex);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void onCreate(LlapServiceInstance serviceInstance, int ephSeqVersion) {
+    if (enabled()) {
+      LlapManagementProtocolClientImpl client = clientFactory.create(serviceInstance);
+      llapClients.put(serviceInstance.getWorkerIdentity(), client);
+    }
+  }
+
+  @Override
+  public void onUpdate(LlapServiceInstance serviceInstance, int ephSeqVersion) {
+    // NOOP: the client created in onCreate is kept as is.
+  }
+
+  @Override
+  public void onRemove(LlapServiceInstance serviceInstance, int ephSeqVersion) {
+    if (enabled()) {
+      String workerIdentity = serviceInstance.getWorkerIdentity();
+      llapClients.remove(workerIdentity);
+      instanceStatisticsMap.remove(workerIdentity);
+    }
+  }
+
+  public LlapMetrics getMetrics(String workerIdentity) {
+    return instanceStatisticsMap.get(workerIdentity);
+  }
+
+  public Map<String, LlapMetrics> getMetrics() {
+    // Read-only live view of the latest snapshot per worker identity, not a copy.
+    return Collections.unmodifiableMap(instanceStatisticsMap);
+  }
+
+  /**
+   * Creates a LlapManagementProtocolClientImpl from a given LlapServiceInstance.
+   */
+  public static class LlapManagementProtocolClientImplFactory {
+    private final Configuration conf;
+    private final RetryPolicy retryPolicy;
+    private final SocketFactory socketFactory;
+
+    public LlapManagementProtocolClientImplFactory(Configuration conf, RetryPolicy retryPolicy,
+                                                   SocketFactory socketFactory) {
+      this.conf = conf;
+      this.retryPolicy = retryPolicy;
+      this.socketFactory = socketFactory;
+    }
+
+    private static LlapManagementProtocolClientImplFactory basicInstance(Configuration conf) {
+      return new LlapManagementProtocolClientImplFactory(
+              conf,
+              RetryPolicies.retryUpToMaximumCountWithFixedSleep(5, 3000L, TimeUnit.MILLISECONDS),
+              NetUtils.getDefaultSocketFactory(conf));
+    }
+
+    public LlapManagementProtocolClientImpl create(LlapServiceInstance serviceInstance) {
+      return new LlapManagementProtocolClientImpl(conf, serviceInstance.getHost(),
+              serviceInstance.getManagementPort(), retryPolicy, socketFactory);
+    }
+  }
+
+  /**
+   * Stores the metrics retrieved from the llap daemons, along with the retrieval timestamp.
+   */
+  public static class LlapMetrics {
+    private final long timestamp;
+    private final LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto metrics;
+
+    public LlapMetrics(LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto metrics) {
+      this.timestamp = System.currentTimeMillis();
+      this.metrics = metrics;
+    }
+
+    public long getTimestamp() {
+      return timestamp;
+    }
+
+    public LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto getMetrics() {
+      return metrics;
+    }
+  }
+}
diff --git llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java
new file mode 100644
index 0000000000..68cbadd2b6
--- /dev/null
+++ llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins.metrics;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+/**
+ * Tests LlapMetricsCollector against a mocked configuration, executor and management client.
+ */
+public class TestLlapMetricsCollector {
+
+  private static final long DEFAULT_TIMEOUT = 1000;
+  private static final int TEST_SEQ_VERSION = -1;
+  private static final String TEST_IDENTITY_1 = "testInstance_1";
+  private static final String TEST_IDENTITY_2 = "testInstance_2";
+
+  private static final LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto TEST_RESPONSE =
+      LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto.getDefaultInstance();
+
+  private LlapMetricsCollector collector;
+
+  @Mock
+  private Configuration mockConf;
+
+  @Mock
+  private LlapMetricsCollector.LlapManagementProtocolClientImplFactory mockClientFactory;
+
+  @Mock
+  private LlapManagementProtocolClientImpl mockClient;
+
+  @Mock
+  private ScheduledExecutorService mockExecutor;
+
+  @Before
+  public void setUp() throws ServiceException {
+    // Stub a positive interval so collection is enabled (presumably read via HiveConf.getTimeVar).
+    initMocks(this);
+
+    when(mockConf.get(HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_MS.varname,
+        HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_MS.defaultStrVal)).thenReturn("30000ms");
+    when(mockClientFactory.create(any(LlapServiceInstance.class))).thenReturn(mockClient);
+    // Every stubbed client call answers with the same default-instance response.
+    when(mockClient.getDaemonMetrics(
+        any(RpcController.class),
+        any(LlapDaemonProtocolProtos.GetDaemonMetricsRequestProto.class))).thenReturn(TEST_RESPONSE);
+    collector = new LlapMetricsCollector(mockConf, mockExecutor, mockClientFactory);
+  }
+
+  @Test(timeout = DEFAULT_TIMEOUT)
+  public void testAddService() {
+    // Given
+    LlapServiceInstance mockService = mock(LlapServiceInstance.class);
+    when(mockService.getWorkerIdentity()).thenReturn(TEST_IDENTITY_1);
+
+    // When
+    collector.onCreate(mockService, TEST_SEQ_VERSION);
+    collector.collectMetrics();
+
+    // Then
+    assertEquals(1, collector.getMetrics().size());
+  }
+
+  @Test(timeout = DEFAULT_TIMEOUT)
+  public void testRemoveService() {
+    // Given
+    LlapServiceInstance mockService = mock(LlapServiceInstance.class);
+    when(mockService.getWorkerIdentity()).thenReturn(TEST_IDENTITY_1);
+
+    // When
+    collector.onCreate(mockService, TEST_SEQ_VERSION);
+    collector.collectMetrics();
+    collector.onRemove(mockService, TEST_SEQ_VERSION);
+
+    // Then
+    assertEquals(0, collector.getMetrics().size());
+  }
+
+  @Test(timeout = DEFAULT_TIMEOUT)
+  public void testMultipleCollectOnSameInstance() {
+    // Given
+    LlapServiceInstance mockService = mock(LlapServiceInstance.class);
+    when(mockService.getWorkerIdentity()).thenReturn(TEST_IDENTITY_1);
+
+    // When
+    collector.onCreate(mockService, TEST_SEQ_VERSION);
+    collector.collectMetrics();
+    LlapMetricsCollector.LlapMetrics metrics1 = collector.getMetrics(mockService.getWorkerIdentity());
+    collector.collectMetrics();
+    LlapMetricsCollector.LlapMetrics metrics2 = collector.getMetrics(mockService.getWorkerIdentity());
+
+    // Then: LlapMetrics has no equals() override, so this checks a new snapshot object was stored
+    assertEquals(1, collector.getMetrics().size());
+    assertNotEquals(metrics1, metrics2);
+  }
+
+  @Test(timeout = DEFAULT_TIMEOUT)
+  public void testCollectOnMultipleInstances() {
+    // Given
+    LlapServiceInstance mockService1 = mock(LlapServiceInstance.class);
+    when(mockService1.getWorkerIdentity()).thenReturn(TEST_IDENTITY_1);
+    LlapServiceInstance mockService2 = mock(LlapServiceInstance.class);
+    when(mockService2.getWorkerIdentity()).thenReturn(TEST_IDENTITY_2);
+
+    // When
+    collector.onCreate(mockService1, TEST_SEQ_VERSION);
+    collector.onCreate(mockService2, TEST_SEQ_VERSION);
+    collector.collectMetrics();
+
+    // Then
+    assertEquals(2, collector.getMetrics().size());
+  }
+
+  @Test(timeout = DEFAULT_TIMEOUT)
+  public void testMultipleCollectOnMultipleInstances() {
+    // Given
+    LlapServiceInstance mockService1 = mock(LlapServiceInstance.class);
+    when(mockService1.getWorkerIdentity()).thenReturn(TEST_IDENTITY_1);
+    LlapServiceInstance mockService2 = mock(LlapServiceInstance.class);
+    when(mockService2.getWorkerIdentity()).thenReturn(TEST_IDENTITY_2);
+
+    // When
+    collector.onCreate(mockService1, TEST_SEQ_VERSION);
+    collector.onCreate(mockService2, TEST_SEQ_VERSION);
+    collector.collectMetrics();
+    collector.collectMetrics();
+    collector.collectMetrics();
+
+    // Then
+    assertEquals(2, collector.getMetrics().size());
+  }
+
+  @Test(timeout = DEFAULT_TIMEOUT)
+  public void testStartStartsScheduling() {
+    // When
+    collector.start();
+
+    // Then
+    verify(mockExecutor, times(1))
+        .scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
+  }
+
+  @Test(timeout = DEFAULT_TIMEOUT)
+  public void testShutdown() {
+    // When
+    collector.shutdown();
+
+    // Then
+    verify(mockExecutor, times(1)).shutdownNow();
+  }
+}