diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 440d761f03..58f719a5f8 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4358,6 +4358,40 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "The listener which is called when new Llap Daemon statistics is received on AM side.\n" + "The listener should implement the " + "org.apache.hadoop.hive.llap.tezplugins.metrics.LlapMetricsListener interface."), + LLAP_TASK_SCHEDULER_BLACKLISTING_METRICS_LISTENER_MIN_SERVED_TASKS( + "hive.llap.task.scheduler.blacklisting.metrics.listener.min.served.tasks", 2000, + "If the number of tasks served by a node is below this number then we will ignore the node\n" + + "when calculating the status of the cluster.\n" + + "Only used if hive.llap.task.scheduler.am.collect.daemon.metrics.listener is set to\n" + + "org.apache.hadoop.hive.llap.tezplugins.metrics.BlacklistingLlapMetricsListener"), + LLAP_TASK_SCHEDULER_BLACKLISTING_METRICS_LISTENER_MIN_CHANGE_DELAY( + "hive.llap.task.scheduler.blacklisting.metrics.listener.min.change.delay", "300s", + new TimeValidator(TimeUnit.SECONDS), + "The minimum time which should elapse between blacklisting nodes, in seconds.\n" + + "Only used if hive.llap.task.scheduler.am.collect.daemon.metrics.listener is set to\n" + + "org.apache.hadoop.hive.llap.tezplugins.metrics.BlacklistingLlapMetricsListener"), + LLAP_TASK_SCHEDULER_BLACKLISTING_METRICS_LISTENER_TIME_THRESHOLD( + "hive.llap.task.scheduler.blacklisting.metrics.listener.time.threshold", 1.5f, + "If the average response time of this node divided by the average response time of all the other nodes\n" + + "is greater than this threshold and the other conditions are satisfied too,\n" + + "then this node should be blacklisted.\n" + + "Only used if hive.llap.task.scheduler.am.collect.daemon.metrics.listener is set to\n" + + "org.apache.hadoop.hive.llap.tezplugins.metrics.BlacklistingLlapMetricsListener"), + LLAP_TASK_SCHEDULER_BLACKLISTING_METRICS_LISTENER_EMPTY_EXECUTORS( + "hive.llap.task.scheduler.blacklisting.metrics.listener.empty.executors.threshold", 2.0f, + "If a node is slow (hive.llap.task.scheduler.blacklisting.metrics.listener.time.threshold)\n" + + "and the other nodes have\n" + + "hive.llap.task.scheduler.blacklisting.metrics.listener.empty.executors.threshold times more free executors\n" + + "than the configured executors of the slow node and the other conditions are satisfied too,\n" + + "then the node should be blacklisted\n" + + "Only used if hive.llap.task.scheduler.am.collect.daemon.metrics.listener is set to\n" + + "org.apache.hadoop.hive.llap.tezplugins.metrics.BlacklistingLlapMetricsListener"), + LLAP_TASK_SCHEDULER_BLACKLISTING_METRICS_LISTENER_MAX_LISTED_NODES( + "hive.llap.task.scheduler.blacklisting.metrics.listener.max.listed.nodes", 1, + "The maximum number of blacklisted nodes. If there are at least this number of blacklisted nodes\n" + + "the listener will not blacklist further nodes even if all the conditions are met.\n" + + "Only used if hive.llap.task.scheduler.am.collect.daemon.metrics.listener is set to\n" + + "org.apache.hadoop.hive.llap.tezplugins.metrics.BlacklistingLlapMetricsListener"), LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME("hive.llap.task.scheduler.am.registry", "llap", "AM registry name for LLAP task scheduler plugin to register with."), LLAP_TASK_SCHEDULER_AM_REGISTRY_PRINCIPAL("hive.llap.task.scheduler.am.registry.principal", "", diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java index ea824a1617..2d05bdac4e 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java @@ -1,16 +1,21 @@ /* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ + package org.apache.hadoop.hive.llap.registry.impl; import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; @@ -19,11 +24,13 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceRegistry; +import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl.ConfigChangeLockResult; import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.service.AbstractService; @@ -142,6 +149,25 @@ public void updateRegistration(Iterable> attributes) t } } + /** + * Locks the Llap Cluster for configuration change for the given time window. + * @param windowStart The beginning of the time window when no other configuration change is allowed. + * @param windowEnd The end of the time window when no other configuration change is allowed. + * @return The result of the change (success if the lock is succeeded, and the next possible + * configuration change time + */ + public ConfigChangeLockResult lockForConfigChange(long windowStart, long windowEnd) { + if (this.registry == null) { + throw new IllegalStateException("Not allowed to call lockForConfigChange before serviceInit"); + } + if (isDynamic) { + LlapZookeeperRegistryImpl zkRegisty = (LlapZookeeperRegistryImpl)registry; + return zkRegisty.lockForConfigChange(windowStart, windowEnd); + } else { + throw new UnsupportedOperationException("Acquiring config lock is only allowed for dynamic registries"); + } + } + public LlapServiceInstanceSet getInstances() throws IOException { return getInstances(0); } diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java index 58a99f4cf7..0ba446df89 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java @@ -1,18 +1,26 @@ /* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ + package org.apache.hadoop.hive.llap.registry.impl; +import com.google.common.annotations.VisibleForTesting; +import org.apache.curator.framework.recipes.atomic.AtomicValue; +import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong; import org.apache.hadoop.registry.client.binding.RegistryUtils; import com.google.common.collect.Sets; @@ -67,6 +75,8 @@ private final static String NAMESPACE_PREFIX = "llap-"; private static final String SLOT_PREFIX = "slot-"; private static final String SASL_LOGIN_CONTEXT_NAME = "LlapZooKeeperClient"; + private static final String CONFIG_CHANGE_PATH = "config-change"; + private static final String CONFIG_CHANGE_NODE = "window-end"; private SlotZnode slotZnode; @@ -75,6 +85,8 @@ // to be used by clients of ServiceRegistry TODO: this is unnecessary private DynamicServiceInstanceSet instances; + private DistributedAtomicLong lockWindowEnd; + public LlapZookeeperRegistryImpl(String instanceName, Configuration conf) { super(instanceName, conf, HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_NAMESPACE), NAMESPACE_PREFIX, @@ -433,4 +445,70 @@ protected String getZkPathUser(Configuration conf) { // rather than relying on RegistryUtils.currentUser(). return HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_USER, RegistryUtils.currentUser()); } + + /** + * Locks the Llap Cluster for configuration change for the given time window. + * @param windowStart The beginning of the time window when no other configuration change is allowed. + * @param windowEnd The end of the time window when no other configuration change is allowed. + * @return The result of the change (success if the lock is succeeded, and the next possible + * configuration change time + */ + public ConfigChangeLockResult lockForConfigChange(long windowStart, long windowEnd) { + if (windowEnd < windowStart) { + throw new IllegalArgumentException( + "WindowStart=" + windowStart + " can not be smaller than WindowEnd=" + windowEnd); + } + try { + if (lockWindowEnd == null) { + // Create the node with the /llap-sasl/hiveuser/hostname/config-change/next-change path without retry + lockWindowEnd = new DistributedAtomicLong(zooKeeperClient, + String.join("/", workersPath.substring(0, workersPath.lastIndexOf('/')), CONFIG_CHANGE_PATH, + CONFIG_CHANGE_NODE), (i, j, sleeper) -> false); + lockWindowEnd.initialize(0L); + } + AtomicValue current = lockWindowEnd.get(); + if (!current.succeeded()) { + LOG.debug("Can not get the current configuration lock time"); + return new ConfigChangeLockResult(false, -1L); + } + if (current.postValue() > windowStart) { + LOG.debug("Can not lock window {}-{}. Current value is {}.", windowStart, windowEnd, current.postValue()); + return new ConfigChangeLockResult(false, current.postValue()); + } + current = lockWindowEnd.compareAndSet(current.postValue(), windowEnd); + if (!current.succeeded()) { + LOG.debug("Can not lock window {}-{}. Current value is changed to {}.", windowStart, windowEnd, + current.postValue()); + return new ConfigChangeLockResult(false, current.postValue()); + } + return new ConfigChangeLockResult(true, current.postValue()); + } catch (Throwable t) { + LOG.info("Can not reserve configuration change lock", t); + return new ConfigChangeLockResult(false, -1L); + } + } + + public static class ConfigChangeLockResult { + private final boolean success; + private final long nextConfigChangeTime; + + @VisibleForTesting + public ConfigChangeLockResult(boolean success, long nextConfigChangeTime) { + this.success = success; + this.nextConfigChangeTime = nextConfigChangeTime; + } + + public boolean isSuccess() { + return success; + } + + public long getNextConfigChangeTime() { + return nextConfigChangeTime; + } + + @Override + public String toString() { + return "ConfigChangeLockResult [" + success + "," + nextConfigChangeTime + "]"; + } + } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java llap-common/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java similarity index 100% rename from llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java rename to llap-common/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java diff --git llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/TestLlapRegistryService.java llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/TestLlapRegistryService.java new file mode 100644 index 0000000000..ce5c92071e --- /dev/null +++ llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/TestLlapRegistryService.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.registry.impl; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster; +import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl.ConfigChangeLockResult; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; + +import java.io.IOException; + +public class TestLlapRegistryService { + private static MiniLlapCluster cluster = null; + private static HiveConf conf = new HiveConf(); + + @BeforeClass + public static void setUp() throws Exception { + cluster = MiniLlapCluster.create("llap01", null, 1, 2L, false, false, 1L, 1); + HiveConf.setVar(conf, HiveConf.ConfVars.LLAP_DAEMON_XMX_HEADROOM, "1"); + cluster.serviceInit(conf); + cluster.serviceStart(); + } + + @AfterClass + public static void tearDown() throws IOException { + if (cluster != null) { + cluster.serviceStop(); + } + } + + @Test + public void testLockForConfigChange() throws IOException { + LlapRegistryService client1 = null; + LlapRegistryService client2 = null; + ConfigChangeLockResult result; + + try { + client1 = new LlapRegistryService(false); + client1.init(conf); + client1.start(); + + client2 = new LlapRegistryService(false); + client2.init(conf); + client2.start(); + + assertTrue(client1.lockForConfigChange(10000, 20000).isSuccess()); + assertTrue(client2.lockForConfigChange(30000, 40000).isSuccess()); + + // Can not set to before + result = client1.lockForConfigChange(20000, 30000); + assertFalse(result.isSuccess()); + assertEquals(result.getNextConfigChangeTime(), 40000); + + result = client1.lockForConfigChange(30000, 40000); + assertFalse(result.isSuccess()); + assertEquals(result.getNextConfigChangeTime(), 40000); + + result = client1.lockForConfigChange(35000, 45000); + assertFalse(result.isSuccess()); + assertEquals(result.getNextConfigChangeTime(), 40000); + + // Can start from the previous end timestamp + result = client1.lockForConfigChange(40000, 50000); + assertTrue(result.isSuccess()); + assertEquals(result.getNextConfigChangeTime(), 50000); + } finally { + if (client1 != null) { + client1.close(); + } + if (client2 != null) { + client2.close(); + } + } + } + + @Test(expected = IllegalArgumentException.class) + public void testLockForConfigChangeInvalid() throws IOException{ + LlapRegistryService client = null; + + try { + client = new LlapRegistryService(false); + client.init(conf); + client.start(); + + client.lockForConfigChange(20000, 10000); + } finally { + if (client != null) { + client.close(); + } + } + } +} diff --git llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/package-info.java llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/package-info.java new file mode 100644 index 0000000000..d2acc5d054 --- /dev/null +++ llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Test classes for registry implementations. + */ + +package org.apache.hadoop.hive.llap.registry.impl; diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/BlacklistingLlapMetricsListener.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/BlacklistingLlapMetricsListener.java new file mode 100644 index 0000000000..7b7efdcd1e --- /dev/null +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/BlacklistingLlapMetricsListener.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.tezplugins.metrics; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SetCapacityRequestProto; +import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; +import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Implementation of MetricsListener which blacklists slow nodes based on the statistics. + */ +public class BlacklistingLlapMetricsListener implements LlapMetricsListener { + private static final Logger LOG = LoggerFactory.getLogger(BlacklistingLlapMetricsListener.class); + private LlapRegistryService registry; + private LlapManagementProtocolClientImplFactory clientFactory; + private int minServedTasksNumber; + private int maxBlacklistedNodes; + private long minConfigChangeDelayMs; + private float timeThreshold; + private float emptyExecutorsThreshold; + @VisibleForTesting + long nextCheckTime = Long.MIN_VALUE; + + @VisibleForTesting + void init(Configuration conf, LlapRegistryService registry, LlapManagementProtocolClientImplFactory clientFactory) { + this.registry = registry; + this.clientFactory = clientFactory; + this.minServedTasksNumber = + HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_TASK_SCHEDULER_BLACKLISTING_METRICS_LISTENER_MIN_SERVED_TASKS); + this.minConfigChangeDelayMs = + HiveConf.getTimeVar(conf, HiveConf.ConfVars.LLAP_TASK_SCHEDULER_BLACKLISTING_METRICS_LISTENER_MIN_CHANGE_DELAY, + TimeUnit.MILLISECONDS); + this.timeThreshold = + HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_TASK_SCHEDULER_BLACKLISTING_METRICS_LISTENER_TIME_THRESHOLD); + this.emptyExecutorsThreshold = + HiveConf.getFloatVar(conf, + HiveConf.ConfVars.LLAP_TASK_SCHEDULER_BLACKLISTING_METRICS_LISTENER_EMPTY_EXECUTORS); + this.maxBlacklistedNodes = + HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_TASK_SCHEDULER_BLACKLISTING_METRICS_LISTENER_MAX_LISTED_NODES); + + Preconditions.checkArgument(minServedTasksNumber > 0, + "Minimum served tasks should be greater than 0"); + Preconditions.checkArgument(minConfigChangeDelayMs > 0, + "Minimum config change delay should be greater than 0"); + Preconditions.checkArgument(timeThreshold > 1.0f, + "The time threshold should be greater than 1"); + Preconditions.checkArgument(maxBlacklistedNodes > 0, + "The maximum number of blacklisted node should be greater than 1"); + Preconditions.checkNotNull(registry, "Registry should not be null"); + Preconditions.checkNotNull(clientFactory, "ClientFactory should not be null"); + + LOG.info("BlacklistingLlapMetricsListener initialized with " + + "minServedTasksNumber={}, " + + "minConfigChangeDelayMs={}, " + + "timeThreshold={}, " + + "emptyExecutorsThreshold={}, " + + "maxBlacklistedNodes={}", + minServedTasksNumber, minConfigChangeDelayMs, timeThreshold, emptyExecutorsThreshold, maxBlacklistedNodes); + } + + @Override + public void init(Configuration conf, LlapRegistryService registry) { + init(conf, registry, LlapManagementProtocolClientImplFactory.basicInstance(conf)); + } + + @Override + public void newDaemonMetrics(String workerIdentity, LlapMetricsCollector.LlapMetrics newMetrics) { + // no op + } + + @Override + public void newClusterMetrics(Map newMetrics) { + long sumAverageTime = 0; + long sumEmptyExecutors = 0; + long maxAverageTime = 0; + long maxAverageTimeEmptyExecutors = 0; + long maxAverageTimeMaxExecutors = 0; + long workerNum = 0; + int blacklistedNodes = 0; + String maxAverageTimeIdentity = null; + for (String workerIdentity : newMetrics.keySet()) { + Map metrics = newMetrics.get(workerIdentity).getMetrics(); + long requestHandled = metrics.get(LlapDaemonExecutorInfo.ExecutorTotalRequestsHandled.name()); + long averageTime = metrics.get(LlapDaemonExecutorInfo.AverageResponseTime.name()); + long emptyExecutor = + metrics.get(LlapDaemonExecutorInfo.ExecutorNumExecutorsAvailableAverage.name()); + long maxExecutors = metrics.get(LlapDaemonExecutorInfo.ExecutorNumExecutors.name()); + + LOG.debug("Checking node {} with data: " + + "requestHandled={}, " + + "averageTime={}, " + + "emptyExecutors={}, " + + "maxExecutors={}", + workerIdentity, requestHandled, averageTime, emptyExecutor, maxExecutors); + + if (maxExecutors == 0) { + blacklistedNodes++; + if (blacklistedNodes >= this.maxBlacklistedNodes) { + LOG.info("Already enough blacklisted nodes {}. Skipping.", blacklistedNodes); + return; + } else { + // We do not interested in the data for the blacklisted nodes + continue; + } + } + + if (requestHandled > this.minServedTasksNumber) { + workerNum++; + sumAverageTime += averageTime; + if (averageTime > maxAverageTime) { + maxAverageTime = averageTime; + maxAverageTimeEmptyExecutors = emptyExecutor; + maxAverageTimeMaxExecutors = maxExecutors; + maxAverageTimeIdentity = workerIdentity; + } + sumEmptyExecutors += emptyExecutor; + } + } + + // If we do not have enough data then return. + if (workerNum < 2) { + return; + } + + LOG.debug("Found slowest node {} with data: " + + "sumAverageTime={}, " + + "sumEmptyExecutors={}, " + + "maxAverageTime={}, " + + "maxAverageTimeEmptyExecutors={}, " + + "maxAverageTimeMaxExecutors={}, " + + "workerNum={}, " + + "maxAverageTimeIdentity={}, " + + "blacklistedNodes={}", + sumAverageTime, sumEmptyExecutors, maxAverageTime, maxAverageTimeEmptyExecutors, + maxAverageTimeMaxExecutors, workerNum, maxAverageTimeIdentity, blacklistedNodes); + // Check if the slowest node is at least timeThreshold times slower than the average + double averageTimeWithoutSlowest = (double)(sumAverageTime - maxAverageTime) / (workerNum - 1); + if (averageTimeWithoutSlowest * this.timeThreshold < maxAverageTime) { + // We have a candidate, let's see if we have enough empty executors. + long emptyExecutorsWithoutSlowest = sumEmptyExecutors - maxAverageTimeEmptyExecutors; + if (emptyExecutorsWithoutSlowest > maxAverageTimeMaxExecutors * this.emptyExecutorsThreshold) { + // Seems like a good candidate, let's try to blacklist + try { + LOG.debug("Trying to blacklist node: " + maxAverageTimeIdentity); + setCapacity(maxAverageTimeIdentity, 0, 0); + } catch (Throwable t) { + LOG.debug("Can not blacklist node: " + maxAverageTimeIdentity, t); + } + } + } + } + + protected void setCapacity(String workerIdentity, int newExecutorNum, int newWaitQueueSize) + throws IOException, ServiceException { + long currentTime = System.currentTimeMillis(); + if (currentTime > nextCheckTime) { + LlapZookeeperRegistryImpl.ConfigChangeLockResult lockResult = + registry.lockForConfigChange(currentTime, currentTime + this.minConfigChangeDelayMs); + + LOG.debug("Got result for lock check: {}", lockResult); + if (lockResult.isSuccess()) { + LOG.info("Setting capacity for workerIdentity={} to newExecutorNum={}, newWaitQueueSize={}", + workerIdentity, newExecutorNum, newWaitQueueSize); + LlapServiceInstance serviceInstance = registry.getInstances().getInstance(workerIdentity); + LlapManagementProtocolClientImpl client = clientFactory.create(serviceInstance); + client.setCapacity(null, + SetCapacityRequestProto.newBuilder() + .setExecutorNum(newExecutorNum) + .setQueueSize(newWaitQueueSize) + .build()); + } + if (lockResult.getNextConfigChangeTime() > -1L) { + nextCheckTime = lockResult.getNextConfigChangeTime(); + } + } else { + LOG.debug("Skipping check. Current time {} and we are waiting for {}.", currentTime, nextCheckTime); + } + } +} diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapManagementProtocolClientImplFactory.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapManagementProtocolClientImplFactory.java new file mode 100644 index 0000000000..22f08246c9 --- /dev/null +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapManagementProtocolClientImplFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.tezplugins.metrics; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.net.NetUtils; + +import javax.net.SocketFactory; +import java.util.concurrent.TimeUnit; + +/** + * Creates a LlapManagementProtocolClientImpl from a given LlapServiceInstance. + */ +public class LlapManagementProtocolClientImplFactory { + private final Configuration conf; + private final RetryPolicy retryPolicy; + private final SocketFactory socketFactory; + + public LlapManagementProtocolClientImplFactory(Configuration conf, RetryPolicy retryPolicy, + SocketFactory socketFactory) { + this.conf = conf; + this.retryPolicy = retryPolicy; + this.socketFactory = socketFactory; + } + + public static LlapManagementProtocolClientImplFactory basicInstance(Configuration conf) { + return new LlapManagementProtocolClientImplFactory( + conf, + RetryPolicies.retryUpToMaximumCountWithFixedSleep(5, 3000L, TimeUnit.MILLISECONDS), + NetUtils.getDefaultSocketFactory(conf)); + } + + public LlapManagementProtocolClientImpl create(LlapServiceInstance serviceInstance) { + return new LlapManagementProtocolClientImpl(conf, serviceInstance.getHost(), + serviceInstance.getManagementPort(), retryPolicy, + socketFactory); + } +} diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java index 2ca7ed69e0..a1c02df8db 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java @@ -29,16 +29,11 @@ import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; -import org.apache.hadoop.io.retry.RetryPolicies; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.service.Service; import org.apache.hadoop.service.ServiceStateChangeListener; -import org.apache.logging.log4j.util.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.SocketFactory; import java.io.IOException; import java.util.Collections; import java.util.HashMap; @@ -95,7 +90,7 @@ public LlapMetricsCollector(Configuration conf, LlapRegistryService registry) { HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_MS, TimeUnit.MILLISECONDS); String listenerClass = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_LISTENER); - if (Strings.isBlank(listenerClass)) { + if (listenerClass == null || listenerClass.isEmpty()) { listener = null; } else { try { @@ -210,53 +205,35 @@ public LlapMetrics getMetrics(String workerIdentity) { return Collections.unmodifiableMap(instanceStatisticsMap); } - /** - * Creates a LlapManagementProtocolClientImpl from a given LlapServiceInstance. - */ - public static class LlapManagementProtocolClientImplFactory { - private final Configuration conf; - private final RetryPolicy retryPolicy; - private final SocketFactory socketFactory; - - public LlapManagementProtocolClientImplFactory(Configuration conf, RetryPolicy retryPolicy, - SocketFactory socketFactory) { - this.conf = conf; - this.retryPolicy = retryPolicy; - this.socketFactory = socketFactory; - } - - private static LlapManagementProtocolClientImplFactory basicInstance(Configuration conf) { - return new LlapManagementProtocolClientImplFactory( - conf, - RetryPolicies.retryUpToMaximumCountWithFixedSleep(5, 3000L, TimeUnit.MILLISECONDS), - NetUtils.getDefaultSocketFactory(conf)); - } - - public LlapManagementProtocolClientImpl create(LlapServiceInstance serviceInstance) { - LlapManagementProtocolClientImpl client = new LlapManagementProtocolClientImpl(conf, serviceInstance.getHost(), - serviceInstance.getManagementPort(), retryPolicy, - socketFactory); - return client; - } - } - /** * Stores the metrics retrieved from the llap daemons, along with the retrieval timestamp. */ public static class LlapMetrics { private final long timestamp; - private final LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto metrics; + private final Map metrics; + + @VisibleForTesting + LlapMetrics(long timestamp, Map metrics) { + this.timestamp = timestamp; + this.metrics = metrics; + } public LlapMetrics(LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto metrics) { this.timestamp = System.currentTimeMillis(); - this.metrics = metrics; + this.metrics = new HashMap(metrics.getMetricsCount()); + metrics.getMetricsList().forEach(entry -> this.metrics.put(entry.getKey(), entry.getValue())); } public long getTimestamp() { return timestamp; } - public LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto getMetrics() { + /** + * The metric values in the map. The keys are the enum names (See: LlapDaemonExecutorInfo), and + * the values are the actual values. + * @return + */ + public Map getMetrics() { return metrics; } } diff --git llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestBlacklistingLlapMetricsListener.java llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestBlacklistingLlapMetricsListener.java new file mode 100644 index 0000000000..dec75865c9 --- /dev/null +++ llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestBlacklistingLlapMetricsListener.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.tezplugins.metrics; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SetCapacityRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SetCapacityResponseProto; +import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo; +import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl.ConfigChangeLockResult; +import org.apache.hadoop.hive.llap.tezplugins.metrics.LlapMetricsCollector.LlapMetrics; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; +import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +/** + * Test class to test BlacklistingLlapMetricsListener object. + */ +public class TestBlacklistingLlapMetricsListener { + private static final SetCapacityResponseProto TEST_RESPONSE = + SetCapacityResponseProto.getDefaultInstance(); + + private BlacklistingLlapMetricsListener listener; + + private Configuration conf; + + @Mock + private LlapRegistryService mockRegistry; + + @Mock + private LlapManagementProtocolClientImplFactory mockClientFactory; + + @Mock + private LlapManagementProtocolClientImpl mockClient; + + @Mock + private LlapServiceInstanceSet mockInstanceSet; + + @Before + public void setUp() throws Exception { + initMocks(this); + + conf = new HiveConf(); + when(mockRegistry.getInstances()).thenReturn(mockInstanceSet); + when(mockRegistry.lockForConfigChange(anyLong(), anyLong())).thenReturn( + new ConfigChangeLockResult(true, Long.MIN_VALUE)); + when(mockClientFactory.create(any(LlapServiceInstance.class))).thenReturn(mockClient); + when(mockClient.setCapacity( + any(RpcController.class), + any(SetCapacityRequestProto.class))).thenReturn(TEST_RESPONSE); + + listener = new BlacklistingLlapMetricsListener(); + listener.init(conf, mockRegistry, mockClientFactory); + } + + @Test(timeout = 2000) + public void testBlacklist() throws ServiceException { + Map data = generateClusterMetrics(); + listener.newClusterMetrics(data); + + // Then + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(String.class); + + verify(mockClient, times(1)).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class)); + verify(mockInstanceSet, times(1)).getInstance(argumentCaptor.capture()); + assertEquals("3", argumentCaptor.getValue()); + } + + @Test(timeout = 2000) + public void testNotEnoughCapacity() throws ServiceException { + Map data = generateClusterMetrics(); + data.get("0").getMetrics().put(LlapDaemonExecutorInfo.ExecutorNumExecutorsAvailableAverage.name(), 5L); + listener.newClusterMetrics(data); + + verify(mockClient, never()).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class)); + } + + @Test(timeout = 2000) + public void testNoOutstandingResponseTime() throws ServiceException { + Map data = generateClusterMetrics(); + data.get("3").getMetrics().put(LlapDaemonExecutorInfo.AverageResponseTime.name(), 1500L); + listener.newClusterMetrics(data); + + verify(mockClient, never()).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class)); + } + + @Test(timeout = 2000) + public void testAlreadyBlacklisted() throws ServiceException { + Map data = generateClusterMetrics(); + data.get("3").getMetrics().put(LlapDaemonExecutorInfo.ExecutorNumExecutors.name(), 0L); + listener.newClusterMetrics(data); + + verify(mockClient, never()).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class)); + } + + @Test(timeout = 2000) + public void testCheckTime() throws Exception { + Map data = generateClusterMetrics(); + + // Return that we can not yet blacklist a node + long targetTime = System.currentTimeMillis() + 10000; + when(mockRegistry.lockForConfigChange(anyLong(), anyLong())).thenReturn( + new ConfigChangeLockResult(false, targetTime)); + listener.newClusterMetrics(data); + + verify(mockClient, never()).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class)); + assertEquals(targetTime, listener.nextCheckTime); + + // reset mock stuff + reset(mockRegistry); + when(mockRegistry.getInstances()).thenReturn(mockInstanceSet); + + // We will not try to set the capacity, or even lock until the time is reached + listener.newClusterMetrics(data); + verify(mockRegistry, never()).lockForConfigChange(anyLong(), anyLong()); + verify(mockClient, never()).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class)); + + // If the time is reached, then we lock and blacklist + listener.nextCheckTime = System.currentTimeMillis() - 1; + when(mockRegistry.lockForConfigChange(anyLong(), anyLong())).thenReturn( + new ConfigChangeLockResult(true, targetTime)); + listener.newClusterMetrics(data); + + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(String.class); + verify(mockClient, times(1)).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class)); + verify(mockInstanceSet, times(1)).getInstance(argumentCaptor.capture()); + assertEquals("3", argumentCaptor.getValue()); + } + + private Map generateClusterMetrics() { + Map data = new HashMap<>(4); + data.put("0", generateSingleNodeMetrics(3000, 1000, 7, 10)); + data.put("1", generateSingleNodeMetrics(3000, 1000, 7, 10)); + data.put("2", generateSingleNodeMetrics(3000, 1000, 7, 10)); + data.put("3", generateSingleNodeMetrics(3000, 2000, 5, 10)); + return data; + } + + private LlapMetrics generateSingleNodeMetrics(long totalRequests, long averageResponseTime, + long availableExecutors, long allExecutors) { + Map metricsMap = new HashMap<>(4); + metricsMap.put(LlapDaemonExecutorInfo.ExecutorTotalRequestsHandled.name(), totalRequests); + metricsMap.put(LlapDaemonExecutorInfo.AverageResponseTime.name(), averageResponseTime); + metricsMap.put(LlapDaemonExecutorInfo.ExecutorNumExecutorsAvailableAverage.name(), availableExecutors); + metricsMap.put(LlapDaemonExecutorInfo.ExecutorNumExecutors.name(), allExecutors); + return new LlapMetrics(0, metricsMap); + } +} diff --git llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java index d9e1f713ec..f212ac6638 100644 --- llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java +++ llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java @@ -63,7 +63,7 @@ private Configuration mockConf; @Mock - private LlapMetricsCollector.LlapManagementProtocolClientImplFactory mockClientFactory; + private LlapManagementProtocolClientImplFactory mockClientFactory; @Mock private LlapManagementProtocolClientImpl mockClient;