diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 8d762c9fe4..71927f2e0b 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4356,6 +4356,36 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_LISTENER("hive.llap.task.scheduler.am.collect.daemon.metrics.listener", "", "The listener which is called when new Llap Daemon statistics is received on AM side.\n" + "The listener should implement the org.apache.hadoop.hive.llap.tezplugins.metrics.LlapMetricsListener interface."), + LLAP_TASK_SCHEDULER_BLACKLISTING_METRICS_LISTENER_MIN_SERVED_TASKS( + "hive.llap.task.scheduler.blacklisting.metrics.listener.min.served.tasks", 2000, + "The minimum number of served tasks served by a node before starting to consider a it in decision making.\n" + + "Only used if hive.llap.task.scheduler.am.collect.daemon.metrics.listener is set to\n" + + "org.apache.hadoop.hive.llap.tezplugins.metrics.BlacklistingLlapMetricsListener"), + LLAP_TASK_SCHEDULER_BLACKLISTING_METRICS_LISTENER_MIN_CHANGE_DELAY( + "hive.llap.task.scheduler.blacklisting.metrics.listener.min.change.delay", "300s", + new TimeValidator(TimeUnit.SECONDS), + "The minimum time which should be elapsed between configuration changes in seconds.\n" + + "Only used if hive.llap.task.scheduler.am.collect.daemon.metrics.listener is set to\n" + + "org.apache.hadoop.hive.llap.tezplugins.metrics.BlacklistingLlapMetricsListener"), + LLAP_TASK_SCHEDULER_BLACKLISTING_METRICS_LISTENER_TIME_THRESHOLD( + "hive.llap.task.scheduler.blacklisting.metrics.listener.time.threshold", 1.5f, + "If the average response time of this node divided by the average response time of all the other nodes\n" + + "is greater than this threshold this node could be blacklisted.\n" + + "Only used if hive.llap.task.scheduler.am.collect.daemon.metrics.listener is set to\n" + + "org.apache.hadoop.hive.llap.tezplugins.metrics.BlacklistingLlapMetricsListener"), + LLAP_TASK_SCHEDULER_BLACKLISTING_METRICS_LISTENER_EMPTY_EXECUTORS( + "hive.llap.task.scheduler.blacklisting.metrics.listener.empty.executors.threshold", 2.0f, + "If a node is slow (time.threshold) and the other nodes have the\n" + + "hive.llap.task.scheduler.blacklisting.metrics.listener.empty.executors.threshold time more executors\n" + + "than the slow node then the node could be blacklisted\n" + + "Only used if hive.llap.task.scheduler.am.collect.daemon.metrics.listener is set to\n" + + "org.apache.hadoop.hive.llap.tezplugins.metrics.BlacklistingLlapMetricsListener"), + LLAP_TASK_SCHEDULER_BLACKLISTING_METRICS_LISTENER_MAX_LISTED_NODES( + "hive.llap.task.scheduler.blacklisting.metrics.listener.max.listed.nodes", 1, + "The maximum number of blacklisted nodes. If there are at least this number of blacklisted nodes\n" + + "the listener will not balcklist further nodes even if all the conditions are met.\n" + + "Only used if hive.llap.task.scheduler.am.collect.daemon.metrics.listener is set to\n" + + "org.apache.hadoop.hive.llap.tezplugins.metrics.BlacklistingLlapMetricsListener"), LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME("hive.llap.task.scheduler.am.registry", "llap", "AM registry name for LLAP task scheduler plugin to register with."), LLAP_TASK_SCHEDULER_AM_REGISTRY_PRINCIPAL("hive.llap.task.scheduler.am.registry.principal", "", diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java index 3bda40b7e9..f3c0075671 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java @@ -19,12 +19,13 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceRegistry; -import org.apache.hadoop.hive.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl.ConfigChangeLockResult; import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.service.AbstractService; @@ -132,6 +133,18 @@ private void unregisterWorker() throws IOException { } } + public ConfigChangeLockResult lockForConfigChange(long nextMinConfigChangeTime) { + if (this.registry == null) { + throw new IllegalStateException("Not allowed to call lockForConfigChange before serviceInit"); + } + if (isDynamic) { + LlapZookeeperRegistryImpl zkRegisty = (LlapZookeeperRegistryImpl)registry; + return zkRegisty.lockForConfigChange(nextMinConfigChangeTime); + } else { + throw new UnsupportedOperationException("Acquiring config lock is only allowed for dynamic registries"); + } + } + public LlapServiceInstanceSet getInstances() throws IOException { return getInstances(0); } diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java index f5d6202e6f..bf3fcc3ee0 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java @@ -13,6 +13,9 @@ */ package org.apache.hadoop.hive.llap.registry.impl; +import com.google.common.annotations.VisibleForTesting; +import org.apache.curator.framework.recipes.atomic.AtomicValue; +import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong; import org.apache.hadoop.registry.client.binding.RegistryUtils; import com.google.common.collect.Sets; @@ -67,6 +70,8 @@ private final static String NAMESPACE_PREFIX = "llap-"; private static final String SLOT_PREFIX = "slot-"; private static final String SASL_LOGIN_CONTEXT_NAME = "LlapZooKeeperClient"; + private static final String CONFIG_CHANGE_PATH = "config-change"; + private static final String CONFIG_CHANGE_NODE = "next-change"; private SlotZnode slotZnode; @@ -74,6 +79,8 @@ // to be used by clients of ServiceRegistry TODO: this is unnecessary private DynamicServiceInstanceSet instances; + private DistributedAtomicLong nextChangeTime; + public LlapZookeeperRegistryImpl(String instanceName, Configuration conf) { super(instanceName, conf, HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_NAMESPACE), NAMESPACE_PREFIX, @@ -416,4 +423,65 @@ protected String getZkPathUser(Configuration conf) { // rather than relying on RegistryUtils.currentUser(). return HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_USER, RegistryUtils.currentUser()); } + + /** + * Locks the Llap Cluster for configuration change by setting the next possible configuration + * change time. Until this time is reached the configuration should not be changed. + * @param nextMinConfigChangeTime The next time when the cluster can be reconfigured + * @return The result of the change (success if the lock is succeeded, and the next possible + * configuration change time + */ + public ConfigChangeLockResult lockForConfigChange(long nextMinConfigChangeTime) { + try { + if (nextChangeTime == null) { + // Create the node with the /llap-sasl/hiveuser/hostname/config-change/next-change path without retry + nextChangeTime = new DistributedAtomicLong(zooKeeperClient, + String.join("/", workersPath.substring(0, workersPath.lastIndexOf('/')), CONFIG_CHANGE_PATH, + CONFIG_CHANGE_NODE), (i, j, sleeper) -> false); + nextChangeTime.initialize(0L); + } + AtomicValue current = nextChangeTime.get(); + if (!current.succeeded()) { + LOG.debug("Can not get the current configuration lock time"); + return new ConfigChangeLockResult(false, -1L); + } + if (current.postValue() >= nextMinConfigChangeTime) { + LOG.debug("Can not set {}. Current value is {}.", nextMinConfigChangeTime, current.postValue()); + return new ConfigChangeLockResult(false, current.postValue()); + } + current = nextChangeTime.compareAndSet(current.postValue(), nextMinConfigChangeTime); + if (!current.succeeded()) { + LOG.debug("Can not set {}. Current value is changed to {}.", nextMinConfigChangeTime, current.postValue()); + return new ConfigChangeLockResult(false, current.postValue()); + } + return new ConfigChangeLockResult(true, current.postValue()); + } catch (Throwable t) { + LOG.info("Can not reserve configuration change lock", t); + return new ConfigChangeLockResult(false, -1L); + } + } + + public static class ConfigChangeLockResult { + boolean success; + long nextConfigChangeTime; + + @VisibleForTesting + public ConfigChangeLockResult(boolean success, long nextConfigChangeTime) { + this.success = success; + this.nextConfigChangeTime = nextConfigChangeTime; + } + + public boolean isSuccess() { + return success; + } + + public long getNextConfigChangeTime() { + return nextConfigChangeTime; + } + + @Override + public String toString() { + return "ConfigChangeLockResult [" + success + "," + nextConfigChangeTime + "]"; + } + } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java llap-common/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java similarity index 100% rename from llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java rename to llap-common/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java diff --git llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/TestLlapRegistryService.java llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/TestLlapRegistryService.java new file mode 100644 index 0000000000..31688dca86 --- /dev/null +++ llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/TestLlapRegistryService.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.registry.impl; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster; +import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl.ConfigChangeLockResult; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; + +import java.io.IOException; + +public class TestLlapRegistryService { + private static MiniLlapCluster cluster = null; + private static HiveConf conf = new HiveConf(); + + @BeforeClass + public static void setUp() throws Exception { + cluster = MiniLlapCluster.create("llap01", null, 1, 2L, false, false, 1L, 1); + HiveConf.setVar(conf, HiveConf.ConfVars.LLAP_DAEMON_XMX_HEADROOM, "1"); + cluster.serviceInit(conf); + cluster.serviceStart(); + } + + @AfterClass + public static void tearDown() throws IOException { + if (cluster != null) { + cluster.serviceStop(); + } + } + + @Test + public void testLockForConfigChange() { + LlapRegistryService client1 = new LlapRegistryService(false); + client1.init(conf); + client1.start(); + + LlapRegistryService client2 = new LlapRegistryService(false); + client2.init(conf); + client2.start(); + ConfigChangeLockResult result = null; + + assertTrue(client1.lockForConfigChange(10000).success); + assertTrue(client2.lockForConfigChange(30000).success); + // Can not set to before + result = client1.lockForConfigChange(20000); + assertFalse(result.success); + assertEquals(result.nextConfigChangeTime, 30000); + // Can not set to the same timestamp + result = client1.lockForConfigChange(30000); + assertFalse(result.success); + assertEquals(result.nextConfigChangeTime, 30000); + // Check return value in case of success + result = client1.lockForConfigChange(40000); + assertTrue(result.success); + assertEquals(result.nextConfigChangeTime, 40000); + } +} diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/BlacklistingLlapMetricsListener.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/BlacklistingLlapMetricsListener.java new file mode 100644 index 0000000000..a540212030 --- /dev/null +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/BlacklistingLlapMetricsListener.java @@ -0,0 +1,200 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.tezplugins.metrics; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SetCapacityRequestProto; +import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; +import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Implementation of MetricsListener which blacklists slow nodes based on the statistics. + */ +public class BlacklistingLlapMetricsListener implements LlapMetricsListener { + private static final Logger LOG = LoggerFactory.getLogger(BlacklistingLlapMetricsListener.class); + private LlapRegistryService registry; + private LlapManagementProtocolClientImplFactory clientFactory; + private int minServedTasksNumber; + private int maxBlacklistedNodes; + private long minConfigChangeDelayMs; + private float timeThreshold; + private float emptyExecutorsThreshold; + @VisibleForTesting + long nextCheckTime = Long.MIN_VALUE; + + @VisibleForTesting + void init(Configuration conf, LlapRegistryService registry, LlapManagementProtocolClientImplFactory clientFactory) { + this.registry = registry; + this.clientFactory = clientFactory; + this.minServedTasksNumber = + HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_TASK_SCHEDULER_BLACKLISTING_METRICS_LISTENER_MIN_SERVED_TASKS); + this.minConfigChangeDelayMs = + HiveConf.getTimeVar(conf, HiveConf.ConfVars.LLAP_TASK_SCHEDULER_BLACKLISTING_METRICS_LISTENER_MIN_CHANGE_DELAY, + TimeUnit.MILLISECONDS); + this.timeThreshold = + HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_TASK_SCHEDULER_BLACKLISTING_METRICS_LISTENER_TIME_THRESHOLD); + this.emptyExecutorsThreshold = + HiveConf.getFloatVar(conf, + HiveConf.ConfVars.LLAP_TASK_SCHEDULER_BLACKLISTING_METRICS_LISTENER_EMPTY_EXECUTORS); + this.maxBlacklistedNodes = + HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_TASK_SCHEDULER_BLACKLISTING_METRICS_LISTENER_MAX_LISTED_NODES); + + Preconditions.checkArgument(minServedTasksNumber > 0, + "Minimum served tasks should be greater than 0"); + Preconditions.checkArgument(minConfigChangeDelayMs > 0, + "Minimum config change delay should be greater than 0"); + Preconditions.checkArgument(timeThreshold > 1.0f, + "The time threshold should be greater than 1"); + Preconditions.checkArgument(maxBlacklistedNodes > 0, + "The maximum number of blacklisted node should be greater than 1"); + + LOG.debug("BlacklistingLlapMetricsListener initialized with " + + "minServedTasksNumber={}, " + + "minConfigChangeDelayMs={}, " + + "timeThreshold={}, " + + "emptyExecutorsThreshold={}, " + + "maxBlacklistedNodes={}", + minServedTasksNumber, minConfigChangeDelayMs, timeThreshold, emptyExecutorsThreshold, maxBlacklistedNodes); + } + + @Override + public void init(Configuration conf, LlapRegistryService registry) { + init(conf, registry, LlapManagementProtocolClientImplFactory.basicInstance(conf)); + } + + @Override + public void newDaemonMetrics(String workerIdentity, LlapMetricsCollector.LlapMetrics newMetrics) { + // no op + } + + @Override + public void newClusterMetrics(Map newMetrics) { + long sumAverageTime = 0; + long sumEmptyExecutors = 0; + long maxAverageTime = 0; + long maxAverageTimeEmptyExecutors = 0; + long maxAverageTimeMaxExecutors = 0; + long workerNum = 0; + int blacklistedNodes = 0; + String maxAverageTimeIdentity = null; + for (String workerIdentity : newMetrics.keySet()) { + Map metrics = newMetrics.get(workerIdentity).getMetrics(); + long requestHandled = metrics.get(LlapDaemonExecutorInfo.ExecutorTotalRequestsHandled.name()); + long averageTime = metrics.get(LlapDaemonExecutorInfo.AverageResponseTime.name()); + long emptyExecutor = + metrics.get(LlapDaemonExecutorInfo.ExecutorNumExecutorsAvailableAverage.name()); + long maxExecutors = metrics.get(LlapDaemonExecutorInfo.ExecutorNumExecutorsPerInstance.name()); + + LOG.debug("Checking node {} with data: " + + "requestHandled={}, " + + "averageTime={}, " + + "emptyExecutors={}, " + + "maxExecutors={}", + workerIdentity, requestHandled, averageTime, emptyExecutor, maxExecutors); + + if (maxExecutors == 0) { + blacklistedNodes++; + if (blacklistedNodes >= this.maxBlacklistedNodes) { + LOG.debug("Already too many blacklisted nodes. Skipping."); + return; + } + } + + if (requestHandled > this.minServedTasksNumber) { + workerNum++; + sumAverageTime += averageTime; + if ( averageTime > maxAverageTime) { + maxAverageTime = averageTime; + maxAverageTimeEmptyExecutors = emptyExecutor; + maxAverageTimeMaxExecutors = maxExecutors; + maxAverageTimeIdentity = workerIdentity; + } + sumEmptyExecutors += emptyExecutor; + } + } + + // If we do not have enough data then return. + if (workerNum < 2) { + return; + } + + LOG.debug("Found slowest node {} with data: " + + "sumAverageTime={}, " + + "sumEmptyExecutors={}, " + + "maxAverageTime={}, " + + "maxAverageTimeEmptyExecutors={}, " + + "maxAverageTimeMaxExecutors={}, " + + "workerNum={}, " + + "maxAverageTimeIdentity={}, " + + "blacklistedNodes={}", + sumAverageTime, sumEmptyExecutors, maxAverageTime, maxAverageTimeEmptyExecutors, + maxAverageTimeMaxExecutors, workerNum, maxAverageTimeIdentity, blacklistedNodes); + // Check if the slowest node is at least timeThreshold times slower than the average + long averageTimeWithoutSlowest = (sumAverageTime - maxAverageTime) / (workerNum - 1); + if (averageTimeWithoutSlowest * this.timeThreshold < maxAverageTime) { + // We have a candidate, let's see if we have enough empty executors. + long emptyExecutorsWithoutSlowest = sumEmptyExecutors - maxAverageTimeEmptyExecutors; + if (emptyExecutorsWithoutSlowest > maxAverageTimeMaxExecutors * this.emptyExecutorsThreshold) { + // Seems like a good candidate, let's try to blacklist + try { + LOG.debug("Trying to blacklist node: " + maxAverageTimeIdentity); + setCapacity(maxAverageTimeIdentity, 0, 0); + } catch (Throwable t) { + LOG.debug("Can not blacklist node: " + maxAverageTimeIdentity, t); + } + } + } + } + + protected void setCapacity(String workerIdentity, int newExecutorNum, int newWaitQueueSize) + throws IOException, ServiceException { + long currentTime = System.currentTimeMillis(); + if (currentTime > nextCheckTime) { + LlapZookeeperRegistryImpl.ConfigChangeLockResult lockResult = + registry.lockForConfigChange(currentTime + this.minConfigChangeDelayMs); + + LOG.debug("Got result for lock check: {}", lockResult); + if (lockResult.isSuccess()) { + LOG.info("Setting capacity for workerIdentity={} to newExecutorNum={}, newWaitQueueSize={}", + workerIdentity, newExecutorNum, newWaitQueueSize); + LlapServiceInstance serviceInstance = registry.getInstances().getInstance(workerIdentity); + LlapManagementProtocolClientImpl client = clientFactory.create(serviceInstance); + client.setCapacity(null, + SetCapacityRequestProto.newBuilder() + .setExecutorNum(newExecutorNum) + .setQueueSize(newWaitQueueSize) + .build()); + } + if (lockResult.getNextConfigChangeTime() > -1L) { + nextCheckTime = lockResult.getNextConfigChangeTime(); + } + } else { + LOG.debug("Skipping check. Current time {} and we are waiting for {}.", currentTime, nextCheckTime); + } + } +} diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapManagementProtocolClientImplFactory.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapManagementProtocolClientImplFactory.java new file mode 100644 index 0000000000..586748f3dc --- /dev/null +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapManagementProtocolClientImplFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.tezplugins.metrics; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.net.NetUtils; + +import javax.net.SocketFactory; +import java.util.concurrent.TimeUnit; + +/** + * Creates a LlapManagementProtocolClientImpl from a given LlapServiceInstance. + */ +public class LlapManagementProtocolClientImplFactory { + private final Configuration conf; + private final RetryPolicy retryPolicy; + private final SocketFactory socketFactory; + + public LlapManagementProtocolClientImplFactory(Configuration conf, RetryPolicy retryPolicy, + SocketFactory socketFactory) { + this.conf = conf; + this.retryPolicy = retryPolicy; + this.socketFactory = socketFactory; + } + + public static LlapManagementProtocolClientImplFactory basicInstance(Configuration conf) { + return new LlapManagementProtocolClientImplFactory( + conf, + RetryPolicies.retryUpToMaximumCountWithFixedSleep(5, 3000L, TimeUnit.MILLISECONDS), + NetUtils.getDefaultSocketFactory(conf)); + } + + public LlapManagementProtocolClientImpl create(LlapServiceInstance serviceInstance) { + LlapManagementProtocolClientImpl client = new LlapManagementProtocolClientImpl(conf, serviceInstance.getHost(), + serviceInstance.getManagementPort(), retryPolicy, + socketFactory); + return client; + } +} \ No newline at end of file diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java index a050b5b3ff..8f5b6748af 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java @@ -25,21 +25,15 @@ import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; -import org.apache.hadoop.io.retry.RetryPolicies; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.service.Service; import org.apache.hadoop.service.ServiceStateChangeListener; import org.apache.logging.log4j.util.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.SocketFactory; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; @@ -202,53 +196,30 @@ public LlapMetrics getMetrics(String workerIdentity) { return Collections.unmodifiableMap(instanceStatisticsMap); } - /** - * Creates a LlapManagementProtocolClientImpl from a given LlapServiceInstance. - */ - public static class LlapManagementProtocolClientImplFactory { - private final Configuration conf; - private final RetryPolicy retryPolicy; - private final SocketFactory socketFactory; - - public LlapManagementProtocolClientImplFactory(Configuration conf, RetryPolicy retryPolicy, - SocketFactory socketFactory) { - this.conf = conf; - this.retryPolicy = retryPolicy; - this.socketFactory = socketFactory; - } - - private static LlapManagementProtocolClientImplFactory basicInstance(Configuration conf) { - return new LlapManagementProtocolClientImplFactory( - conf, - RetryPolicies.retryUpToMaximumCountWithFixedSleep(5, 3000L, TimeUnit.MILLISECONDS), - NetUtils.getDefaultSocketFactory(conf)); - } - - public LlapManagementProtocolClientImpl create(LlapServiceInstance serviceInstance) { - LlapManagementProtocolClientImpl client = new LlapManagementProtocolClientImpl(conf, serviceInstance.getHost(), - serviceInstance.getManagementPort(), retryPolicy, - socketFactory); - return client; - } - } - /** * Stores the metrics retrieved from the llap daemons, along with the retrieval timestamp. */ public static class LlapMetrics { private final long timestamp; - private final LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto metrics; + private final Map metrics; + + @VisibleForTesting + LlapMetrics(long timestamp, Map metrics) { + this.timestamp = timestamp; + this.metrics = metrics; + } public LlapMetrics(LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto metrics) { this.timestamp = System.currentTimeMillis(); - this.metrics = metrics; + this.metrics = new HashMap(metrics.getMetricsCount()); + metrics.getMetricsList().forEach(entry -> this.metrics.put(entry.getKey(), entry.getValue())); } public long getTimestamp() { return timestamp; } - public LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto getMetrics() { + public Map getMetrics() { return metrics; } } diff --git llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestBlacklistingLlapMetricsListener.java llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestBlacklistingLlapMetricsListener.java new file mode 100644 index 0000000000..535dfbcff8 --- /dev/null +++ llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestBlacklistingLlapMetricsListener.java @@ -0,0 +1,173 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.tezplugins.metrics; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SetCapacityRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SetCapacityResponseProto; +import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo; +import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl.ConfigChangeLockResult; +import org.apache.hadoop.hive.llap.tezplugins.metrics.LlapMetricsCollector.LlapMetrics; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; +import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +public class TestBlacklistingLlapMetricsListener { + private static final SetCapacityResponseProto TEST_RESPONSE = + SetCapacityResponseProto.getDefaultInstance(); + + private BlacklistingLlapMetricsListener listener; + + private Configuration conf; + + @Mock + private LlapRegistryService mockRegistry; + + @Mock + private LlapManagementProtocolClientImplFactory mockClientFactory; + + @Mock + private LlapManagementProtocolClientImpl mockClient; + + @Mock + private LlapServiceInstanceSet mockInstanceSet; + + @Before + public void setUp() throws Exception { + initMocks(this); + + conf = new HiveConf(); + when(mockRegistry.getInstances()).thenReturn(mockInstanceSet); + when(mockRegistry.lockForConfigChange(anyLong())).thenReturn(new ConfigChangeLockResult(true, Long.MIN_VALUE)); + when(mockClientFactory.create(any(LlapServiceInstance.class))).thenReturn(mockClient); + when(mockClient.setCapacity( + any(RpcController.class), + any(SetCapacityRequestProto.class))).thenReturn(TEST_RESPONSE); + + listener = new BlacklistingLlapMetricsListener(); + listener.init(conf, mockRegistry, mockClientFactory); + } + + @Test(timeout = 2000) + public void testBlacklist() throws ServiceException { + Map data = generateClusterMetrics(); + listener.newClusterMetrics(data); + + // Then + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(String.class); + + verify(mockClient, times(1)).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class)); + verify(mockInstanceSet, times(1)).getInstance(argumentCaptor.capture()); + assertEquals("3", argumentCaptor.getValue()); + } + + @Test(timeout = 2000) + public void testNotEnoughCapacity() throws ServiceException { + Map data = generateClusterMetrics(); + data.get("0").getMetrics().put(LlapDaemonExecutorInfo.ExecutorNumExecutorsAvailableAverage.name(), 5L); + listener.newClusterMetrics(data); + + verify(mockClient, never()).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class)); + } + + @Test(timeout = 2000) + public void testNoOutstandingResponseTime() throws ServiceException { + Map data = generateClusterMetrics(); + data.get("3").getMetrics().put(LlapDaemonExecutorInfo.AverageResponseTime.name(), 1500L); + listener.newClusterMetrics(data); + + verify(mockClient, never()).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class)); + } + + @Test(timeout = 2000) + public void testAlreadyBlacklisted() throws ServiceException { + Map data = generateClusterMetrics(); + data.get("3").getMetrics().put(LlapDaemonExecutorInfo.ExecutorNumExecutorsPerInstance.name(), 0L); + listener.newClusterMetrics(data); + + verify(mockClient, never()).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class)); + } + + @Test(timeout = 2000) + public void testCheckTime() throws Exception { + Map data = generateClusterMetrics(); + + // Return that we can not yet blacklist a node + long targetTime = System.currentTimeMillis() + 10000; + when(mockRegistry.lockForConfigChange(anyLong())).thenReturn(new ConfigChangeLockResult(false, targetTime)); + listener.newClusterMetrics(data); + + verify(mockClient, never()).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class)); + assertEquals(targetTime, listener.nextCheckTime); + + // reset mock stuff + reset(mockRegistry); + when(mockRegistry.getInstances()).thenReturn(mockInstanceSet); + + // We will not try to set the capacity, or even lock until the time is reached + listener.newClusterMetrics(data); + verify(mockRegistry, never()).lockForConfigChange(anyLong()); + verify(mockClient, never()).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class)); + + // If the time is reached, then we lock and blacklist + listener.nextCheckTime = System.currentTimeMillis() - 1; + when(mockRegistry.lockForConfigChange(anyLong())).thenReturn(new ConfigChangeLockResult(true, targetTime)); + listener.newClusterMetrics(data); + + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(String.class); + verify(mockClient, times(1)).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class)); + verify(mockInstanceSet, times(1)).getInstance(argumentCaptor.capture()); + assertEquals("3", argumentCaptor.getValue()); + } + + private Map generateClusterMetrics() { + Map data = new HashMap<>(4); + data.put("0", generateSingleNodeMetrics(3000, 1000, 7, 10)); + data.put("1", generateSingleNodeMetrics(3000, 1000, 7, 10)); + data.put("2", generateSingleNodeMetrics(3000, 1000, 7, 10)); + data.put("3", generateSingleNodeMetrics(3000, 2000, 5, 10)); + return data; + } + + private LlapMetrics generateSingleNodeMetrics(long totalRequests, long averageResponseTime, + long availableExecutors, long allExecutors) { + Map metricsMap = new HashMap<>(4); + metricsMap.put(LlapDaemonExecutorInfo.ExecutorTotalRequestsHandled.name(), totalRequests); + metricsMap.put(LlapDaemonExecutorInfo.AverageResponseTime.name(), averageResponseTime); + metricsMap.put(LlapDaemonExecutorInfo.ExecutorNumExecutorsAvailableAverage.name(), availableExecutors); + metricsMap.put(LlapDaemonExecutorInfo.ExecutorNumExecutorsPerInstance.name(), allExecutors); + return new LlapMetrics(0, metricsMap); + } +} diff --git llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java index e24c979baa..48eb2cdbf7 100644 --- llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java +++ llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java @@ -63,7 +63,7 @@ private Configuration mockConf; @Mock - private LlapMetricsCollector.LlapManagementProtocolClientImplFactory mockClientFactory; + private LlapManagementProtocolClientImplFactory mockClientFactory; @Mock private LlapManagementProtocolClientImpl mockClient;