diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index e50ff997a9..916549b753 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4342,6 +4342,10 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Number of threads to use in LLAP task plugin client."), LLAP_DAEMON_DOWNLOAD_PERMANENT_FNS("hive.llap.daemon.download.permanent.fns", false, "Whether LLAP daemon should localize the resources for permanent UDFs."), + LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_MS("hive.llap.task.scheduler.am.collect.daemon.metrics.ms", "0ms", + new TimeValidator(TimeUnit.MILLISECONDS), "Collect llap daemon metrics in the AM every given milliseconds,\n" + + "so that the AM can use this information, to make better scheduling decisions.\n" + + "If it's set to 0, then the feature is disabled."), LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME("hive.llap.task.scheduler.am.registry", "llap", "AM registry name for LLAP task scheduler plugin to register with."), LLAP_TASK_SCHEDULER_AM_REGISTRY_PRINCIPAL("hive.llap.task.scheduler.am.registry.principal", "", diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index cdf767f1db..37e2fcd8da 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -16,6 +16,7 @@ import com.google.common.io.ByteArrayDataOutput; +import org.apache.hadoop.hive.llap.tezplugins.metrics.LlapMetricsCollector; import org.apache.hadoop.io.Text; import org.apache.hadoop.metrics2.MetricsSource; import org.apache.hadoop.metrics2.MetricsSystem; @@ -313,6 +314,7 @@ public void setError(Void v, Throwable t) { private final Object outputsLock = 
new Object(); private TezDAGID depsDagId = null; private Map> transitiveOutputs; + private LlapMetricsCollector llapMetricsCollector; public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) { this(taskSchedulerContext, new MonotonicClock(), true); @@ -400,6 +402,12 @@ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapTaskSchedulerTimedLogThread") .build()); + if (HiveConf.getTimeVar(conf, + HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_MS, TimeUnit.MILLISECONDS) > 0) { + this.llapMetricsCollector = new LlapMetricsCollector(conf); + this.registry.registerServiceListener(llapMetricsCollector); + } + String instanceId = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS); Preconditions.checkNotNull(instanceId, ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java new file mode 100644 index 0000000000..99a25214f5 --- /dev/null +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java @@ -0,0 +1,220 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap.tezplugins.metrics; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; +import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; +import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.service.ServiceStateChangeListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.SocketFactory; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Collects metrics from the LLAP daemons periodically, once every configured number of milliseconds.
+ * <p>
+ * Collection runs on the scheduled executor, while daemon instances are added and removed through
+ * the registry listener callbacks ({@link #onCreate} / {@link #onRemove}). Both internal maps are
+ * concurrent so that these two paths can touch them at the same time.
+ */
+public class LlapMetricsCollector implements ServiceStateChangeListener,
+    ServiceInstanceStateChangeListener<LlapServiceInstance> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LlapMetricsCollector.class);
+  private static final String THREAD_NAME = "LlapTaskSchedulerMetricsCollectorThread";
+  private static final long INITIAL_DELAY_MSEC = 10000L;
+
+  private final ScheduledExecutorService scheduledMetricsExecutor;
+  private final LlapManagementProtocolClientImplFactory clientFactory;
+  // Worker identity -> management client of that daemon. Must be a concurrent map: the listener
+  // callbacks modify it while the executor thread iterates over it in collectMetrics().
+  private final Map<String, LlapManagementProtocolClientImpl> llapClients;
+  // Worker identity -> most recently collected metrics of that daemon.
+  private final Map<String, LlapMetrics> instanceStatisticsMap;
+  // Collection period in milliseconds; start() schedules nothing unless this is positive.
+  private final long metricsCollectionMs;
+
+  /**
+   * Creates a collector backed by a single daemon thread and the default client factory.
+   *
+   * @param conf configuration providing the collection interval and the client settings
+   */
+  public LlapMetricsCollector(Configuration conf) {
+    this(
+        conf,
+        Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat(THREAD_NAME).build()),
+        LlapManagementProtocolClientImplFactory.basicInstance(conf));
+  }
+
+  /**
+   * Creates a collector with an explicit executor and client factory (used by tests).
+   *
+   * @param conf configuration providing the collection interval
+   * @param scheduledMetricsExecutor executor on which the periodic collection is scheduled
+   * @param clientFactory factory creating a management client for each registered daemon
+   */
+  @VisibleForTesting
+  LlapMetricsCollector(Configuration conf, ScheduledExecutorService scheduledMetricsExecutor,
+      LlapManagementProtocolClientImplFactory clientFactory) {
+    this.scheduledMetricsExecutor = scheduledMetricsExecutor;
+    this.clientFactory = clientFactory;
+    this.llapClients = new ConcurrentHashMap<>();
+    this.instanceStatisticsMap = new ConcurrentHashMap<>();
+    this.metricsCollectionMs = HiveConf.getTimeVar(conf,
+        HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_MS, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Schedules the periodic metrics collection. Does nothing when the configured interval is not
+   * positive, i.e. when the feature is disabled.
+   */
+  public void start() {
+    if (metricsCollectionMs > 0) {
+      scheduledMetricsExecutor.scheduleAtFixedRate(this::collectMetrics,
+          INITIAL_DELAY_MSEC, metricsCollectionMs, TimeUnit.MILLISECONDS);
+    }
+  }
+
+  /**
+   * Stops the metrics collection by shutting the executor down immediately.
+   */
+  public void shutdown() {
+    scheduledMetricsExecutor.shutdownNow();
+  }
+
+  /**
+   * Queries every known daemon once and stores the response. If a daemon cannot be queried, its
+   * previously stored metrics are dropped so that callers never see outdated values.
+   */
+  @VisibleForTesting
+  void collectMetrics() {
+    for (Map.Entry<String, LlapManagementProtocolClientImpl> entry : llapClients.entrySet()) {
+      String identity = entry.getKey();
+      LlapManagementProtocolClientImpl client = entry.getValue();
+      try {
+        LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto metrics =
+            client.getDaemonMetrics(null,
+                LlapDaemonProtocolProtos.GetDaemonMetricsRequestProto.newBuilder().build());
+        instanceStatisticsMap.put(identity, new LlapMetrics(metrics));
+        // onRemove() may have run while the request was in flight. It removes the client first,
+        // so a missing client here means the metrics just stored belong to a removed daemon.
+        if (!llapClients.containsKey(identity)) {
+          instanceStatisticsMap.remove(identity);
+        }
+      } catch (ServiceException | RuntimeException ex) {
+        // Unchecked exceptions are caught as well: an exception escaping a task scheduled with
+        // scheduleAtFixedRate suppresses every subsequent execution of that task.
+        LOG.error("Failed to collect metrics from LLAP daemon {}: {}", identity, ex.getMessage(), ex);
+        instanceStatisticsMap.remove(identity);
+      }
+    }
+  }
+
+  /**
+   * Follows the lifecycle of the observed service. When it is started, the collection is started
+   * too (after subscribing to instance changes if the service is a {@link LlapRegistryService});
+   * when it is stopped, the collection is shut down.
+   *
+   * @param service the service whose state has changed
+   */
+  @Override
+  public void stateChanged(Service service) {
+    if (service.getServiceState() == Service.STATE.STARTED) {
+      if (service instanceof LlapRegistryService) {
+        setupLlapRegistryService((LlapRegistryService) service);
+      }
+      start();
+    } else if (service.getServiceState() == Service.STATE.STOPPED) {
+      shutdown();
+    }
+  }
+
+  /**
+   * Registers the instances already known to the registry, then subscribes to further changes.
+   * An {@link IOException} is logged and otherwise ignored.
+   */
+  private void setupLlapRegistryService(LlapRegistryService service) {
+    try {
+      consumeInitialInstances(service);
+      service.registerStateChangeListener(this);
+    } catch (IOException ex) {
+      LOG.error(ex.getMessage(), ex);
+    }
+  }
+
+  /**
+   * Registers every instance the registry currently reports, as if each had just been created.
+   *
+   * @param service the registry to read the instances from
+   * @throws IOException if the instances cannot be retrieved from the registry
+   */
+  @VisibleForTesting
+  void consumeInitialInstances(LlapRegistryService service) throws IOException {
+    LlapServiceInstanceSet serviceInstances = service.getInstances();
+    for (LlapServiceInstance serviceInstance : serviceInstances.getAll()) {
+      onCreate(serviceInstance, -1);
+    }
+  }
+
+  /**
+   * Creates a management client for the new daemon instance and stores it under the worker
+   * identity, replacing any client previously stored for that identity.
+   */
+  @Override
+  public void onCreate(LlapServiceInstance serviceInstance, int ephSeqVersion) {
+    LlapManagementProtocolClientImpl client = clientFactory.create(serviceInstance);
+    llapClients.put(serviceInstance.getWorkerIdentity(), client);
+  }
+
+  @Override
+  public void onUpdate(LlapServiceInstance serviceInstance, int ephSeqVersion) {
+    //NOOP
+  }
+
+  /**
+   * Forgets the removed daemon instance: first its client, then its stored metrics. The order
+   * matters, see the check in {@link #collectMetrics()}.
+   */
+  @Override
+  public void onRemove(LlapServiceInstance serviceInstance, int ephSeqVersion) {
+    String workerIdentity = serviceInstance.getWorkerIdentity();
+    llapClients.remove(workerIdentity);
+    instanceStatisticsMap.remove(workerIdentity);
+  }
+
+  /**
+   * Returns the most recently collected metrics of one daemon.
+   *
+   * @param workerIdentity the worker identity of the daemon
+   * @return the stored metrics, or {@code null} if none are stored for this identity
+   */
+  public LlapMetrics getMetrics(String workerIdentity) {
+    return instanceStatisticsMap.get(workerIdentity);
+  }
+
+  /**
+   * Returns the most recently collected metrics of all daemons, keyed by worker identity.
+   *
+   * @return a read-only view of the internal map; later collections are visible through it
+   */
+  public Map<String, LlapMetrics> getMetrics() {
+    return Collections.unmodifiableMap(instanceStatisticsMap);
+  }
+
+  /**
+   * Creates a LlapManagementProtocolClientImpl from a given LlapServiceInstance.
+   */
+  public static class LlapManagementProtocolClientImplFactory {
+    private final Configuration conf;
+    private final RetryPolicy retryPolicy;
+    private final SocketFactory socketFactory;
+
+    /**
+     * @param conf configuration handed to every created client
+     * @param retryPolicy retry policy handed to every created client
+     * @param socketFactory socket factory handed to every created client
+     */
+    public LlapManagementProtocolClientImplFactory(Configuration conf, RetryPolicy retryPolicy,
+        SocketFactory socketFactory) {
+      this.conf = conf;
+      this.retryPolicy = retryPolicy;
+      this.socketFactory = socketFactory;
+    }
+
+    /**
+     * Builds the default factory: up to 5 retries with a fixed 3000 ms sleep between them, and
+     * the default socket factory of the given configuration.
+     */
+    private static LlapManagementProtocolClientImplFactory basicInstance(Configuration conf) {
+      return new LlapManagementProtocolClientImplFactory(
+          conf,
+          RetryPolicies.retryUpToMaximumCountWithFixedSleep(5, 3000L, TimeUnit.MILLISECONDS),
+          NetUtils.getDefaultSocketFactory(conf));
+    }
+
+    /**
+     * Creates a client addressing the host and management port of the given instance.
+     *
+     * @param serviceInstance the daemon instance to connect to
+     * @return a new client for that instance
+     */
+    public LlapManagementProtocolClientImpl create(LlapServiceInstance serviceInstance) {
+      return new LlapManagementProtocolClientImpl(conf, serviceInstance.getHost(),
+          serviceInstance.getManagementPort(), retryPolicy, socketFactory);
+    }
+  }
+
+  /**
+   * Stores the metrics retrieved from the llap daemons, along with the retrieval timestamp.
+   */
+  public static class LlapMetrics {
+    private final long timestamp;
+    private final LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto metrics;
+
+    /**
+     * Wraps a daemon response and stamps it with the current wall-clock time.
+     *
+     * @param metrics the response received from the daemon
+     */
+    public LlapMetrics(LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto metrics) {
+      this.timestamp = System.currentTimeMillis();
+      this.metrics = metrics;
+    }
+
+    /**
+     * @return the time this object was created, as given by {@link System#currentTimeMillis()}
+     */
+    public long getTimestamp() {
+      return timestamp;
+    }
+
+    /**
+     * @return the daemon response wrapped by this object
+     */
+    public LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto getMetrics() {
+      return metrics;
+    }
+  }
+}
diff --git llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java
new file mode 100644
index 0000000000..6da4d8c65a
--- /dev/null
+++ llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins.metrics;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+/**
+ * Unit tests of {@link LlapMetricsCollector}. The configuration, the executor, the client factory
+ * and the daemon client are all mocks, so no daemon is contacted and nothing is really scheduled.
+ */
+public class TestLlapMetricsCollector {
+
+  private static final long DEFAULT_TIMEOUT = 1000;
+  private static final int TEST_SEQ_VERSION = -1;
+  private static final String TEST_IDENTITY_1 = "testInstance_1";
+  private static final String TEST_IDENTITY_2 = "testInstance_2";
+
+  private static final LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto TEST_RESPONSE =
+      LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto.getDefaultInstance();
+
+  private LlapMetricsCollector collector;
+
+  @Mock
+  private Configuration mockConf;
+
+  @Mock
+  private LlapMetricsCollector.LlapManagementProtocolClientImplFactory mockClientFactory;
+
+  @Mock
+  private LlapManagementProtocolClientImpl mockClient;
+
+  @Mock
+  private ScheduledExecutorService mockExecutor;
+
+  @Before
+  public void setUp() throws ServiceException {
+    initMocks(this);
+
+    // Collection is enabled by default; every instance gets the same mock client,
+    // which answers each metrics request with TEST_RESPONSE.
+    stubCollectionInterval("30000ms");
+    when(mockClientFactory.create(any(LlapServiceInstance.class))).thenReturn(mockClient);
+    when(mockClient.getDaemonMetrics(
+        any(RpcController.class),
+        any(LlapDaemonProtocolProtos.GetDaemonMetricsRequestProto.class))).thenReturn(TEST_RESPONSE);
+    collector = new LlapMetricsCollector(mockConf, mockExecutor, mockClientFactory);
+  }
+
+  @Test(timeout = DEFAULT_TIMEOUT)
+  public void testAddService() {
+    // Given
+    LlapServiceInstance instance = instanceWithIdentity(TEST_IDENTITY_1);
+
+    // When
+    collector.onCreate(instance, TEST_SEQ_VERSION);
+    collector.collectMetrics();
+
+    // Then
+    assertEquals(1, collector.getMetrics().size());
+  }
+
+  @Test(timeout = DEFAULT_TIMEOUT)
+  public void testRemoveService() {
+    // Given
+    LlapServiceInstance instance = instanceWithIdentity(TEST_IDENTITY_1);
+
+    // When
+    collector.onCreate(instance, TEST_SEQ_VERSION);
+    collector.collectMetrics();
+    collector.onRemove(instance, TEST_SEQ_VERSION);
+
+    // Then
+    assertEquals(0, collector.getMetrics().size());
+  }
+
+  @Test(timeout = DEFAULT_TIMEOUT)
+  public void testMultipleCollectOnSameInstance() {
+    // Given
+    LlapServiceInstance instance = instanceWithIdentity(TEST_IDENTITY_1);
+
+    // When
+    collector.onCreate(instance, TEST_SEQ_VERSION);
+    collector.collectMetrics();
+    LlapMetricsCollector.LlapMetrics firstRound = collector.getMetrics(TEST_IDENTITY_1);
+    collector.collectMetrics();
+    LlapMetricsCollector.LlapMetrics secondRound = collector.getMetrics(TEST_IDENTITY_1);
+
+    // Then: still one entry, but it was replaced by the second collection
+    assertEquals(1, collector.getMetrics().size());
+    assertNotEquals(firstRound, secondRound);
+  }
+
+  @Test(timeout = DEFAULT_TIMEOUT)
+  public void testCollectOnMultipleInstances() {
+    // Given
+    LlapServiceInstance first = instanceWithIdentity(TEST_IDENTITY_1);
+    LlapServiceInstance second = instanceWithIdentity(TEST_IDENTITY_2);
+
+    // When
+    collector.onCreate(first, TEST_SEQ_VERSION);
+    collector.onCreate(second, TEST_SEQ_VERSION);
+    collector.collectMetrics();
+
+    // Then
+    assertEquals(2, collector.getMetrics().size());
+  }
+
+  @Test(timeout = DEFAULT_TIMEOUT)
+  public void testMultipleCollectOnMultipleInstances() {
+    // Given
+    LlapServiceInstance first = instanceWithIdentity(TEST_IDENTITY_1);
+    LlapServiceInstance second = instanceWithIdentity(TEST_IDENTITY_2);
+
+    // When
+    collector.onCreate(first, TEST_SEQ_VERSION);
+    collector.onCreate(second, TEST_SEQ_VERSION);
+    for (int round = 0; round < 3; round++) {
+      collector.collectMetrics();
+    }
+
+    // Then
+    assertEquals(2, collector.getMetrics().size());
+  }
+
+  @Test(timeout = DEFAULT_TIMEOUT)
+  public void testStartStartsScheduling() {
+    // When
+    collector.start();
+
+    // Then
+    verify(mockExecutor, times(1))
+        .scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
+  }
+
+  @Test(timeout = DEFAULT_TIMEOUT)
+  public void testShutdown() {
+    // When
+    collector.shutdown();
+
+    // Then
+    verify(mockExecutor, times(1)).shutdownNow();
+  }
+
+  @Test(timeout = DEFAULT_TIMEOUT)
+  public void testConsumeInitialInstances() throws IOException {
+    // Given: a registry that already knows one instance
+    LlapServiceInstance instance = instanceWithIdentity(TEST_IDENTITY_1);
+    LlapServiceInstanceSet knownInstances = mock(LlapServiceInstanceSet.class);
+    when(knownInstances.getAll()).thenReturn(Lists.newArrayList(instance));
+    LlapRegistryService registry = mock(LlapRegistryService.class);
+    when(registry.getInstances()).thenReturn(knownInstances);
+
+    // When
+    collector.consumeInitialInstances(registry);
+    collector.collectMetrics();
+
+    // Then
+    assertEquals(1, collector.getMetrics().size());
+  }
+
+  @Test(timeout = DEFAULT_TIMEOUT)
+  public void testStartWontStartSchedulingIfTheConfigValueIsZeroMs() {
+    // Given: a collector created while the interval is configured as zero
+    stubCollectionInterval("0ms");
+    collector = new LlapMetricsCollector(mockConf, mockExecutor, mockClientFactory);
+
+    // When
+    collector.start();
+
+    // Then
+    verify(mockExecutor, never())
+        .scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
+  }
+
+  /** Makes the mock configuration return the given value for the collection interval. */
+  private void stubCollectionInterval(String interval) {
+    when(mockConf.get(HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_MS.varname,
+        HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_MS.defaultStrVal)).thenReturn(interval);
+  }
+
+  /** Creates a mock daemon instance reporting the given worker identity. */
+  private static LlapServiceInstance instanceWithIdentity(String identity) {
+    LlapServiceInstance instance = mock(LlapServiceInstance.class);
+    when(instance.getWorkerIdentity()).thenReturn(identity);
+    return instance;
+  }
+}