diff --git llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java llap-common/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java similarity index 100% rename from llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java rename to llap-common/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapManagementProtocolClientImplFactory.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapManagementProtocolClientImplFactory.java new file mode 100644 index 0000000000..586748f3dc --- /dev/null +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapManagementProtocolClientImplFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.tezplugins.metrics; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.net.NetUtils; + +import javax.net.SocketFactory; +import java.util.concurrent.TimeUnit; + +/** + * Creates a LlapManagementProtocolClientImpl from a given LlapServiceInstance. 
+ */ +public class LlapManagementProtocolClientImplFactory { + private final Configuration conf; + private final RetryPolicy retryPolicy; + private final SocketFactory socketFactory; + + /** Stores the configuration, retry policy and socket factory that create() hands to every client it builds. */ public LlapManagementProtocolClientImplFactory(Configuration conf, RetryPolicy retryPolicy, + SocketFactory socketFactory) { + this.conf = conf; + this.retryPolicy = retryPolicy; + this.socketFactory = socketFactory; + } + + /** Returns a factory that uses RetryPolicies.retryUpToMaximumCountWithFixedSleep(5, 3000L, TimeUnit.MILLISECONDS) as its retry policy and NetUtils.getDefaultSocketFactory(conf) as its socket factory. */ public static LlapManagementProtocolClientImplFactory basicInstance(Configuration conf) { + return new LlapManagementProtocolClientImplFactory( + conf, + RetryPolicies.retryUpToMaximumCountWithFixedSleep(5, 3000L, TimeUnit.MILLISECONDS), + NetUtils.getDefaultSocketFactory(conf)); + } + + /** Constructs a new LlapManagementProtocolClientImpl for the host and management port reported by the given service instance; a fresh client object is built on every call. */ public LlapManagementProtocolClientImpl create(LlapServiceInstance serviceInstance) { + LlapManagementProtocolClientImpl client = new LlapManagementProtocolClientImpl(conf, serviceInstance.getHost(), + serviceInstance.getManagementPort(), retryPolicy, + socketFactory); + return client; + } +} \ No newline at end of file diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java index a050b5b3ff..0f6bcf9508 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java @@ -202,53 +202,24 @@ public LlapMetrics getMetrics(String workerIdentity) { return Collections.unmodifiableMap(instanceStatisticsMap); } - /** - * Creates a LlapManagementProtocolClientImpl from a given LlapServiceInstance. 
- */ - public static class LlapManagementProtocolClientImplFactory { - private final Configuration conf; - private final RetryPolicy retryPolicy; - private final SocketFactory socketFactory; - - public LlapManagementProtocolClientImplFactory(Configuration conf, RetryPolicy retryPolicy, - SocketFactory socketFactory) { - this.conf = conf; - this.retryPolicy = retryPolicy; - this.socketFactory = socketFactory; - } - - private static LlapManagementProtocolClientImplFactory basicInstance(Configuration conf) { - return new LlapManagementProtocolClientImplFactory( - conf, - RetryPolicies.retryUpToMaximumCountWithFixedSleep(5, 3000L, TimeUnit.MILLISECONDS), - NetUtils.getDefaultSocketFactory(conf)); - } - - public LlapManagementProtocolClientImpl create(LlapServiceInstance serviceInstance) { - LlapManagementProtocolClientImpl client = new LlapManagementProtocolClientImpl(conf, serviceInstance.getHost(), - serviceInstance.getManagementPort(), retryPolicy, - socketFactory); - return client; - } - } - /** * Stores the metrics retrieved from the llap daemons, along with the retrieval timestamp. 
*/ public static class LlapMetrics { private final long timestamp; - private final LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto metrics; + /** Key/value entries copied out of the daemon metrics response by the constructor. */ private final Map metrics; public LlapMetrics(LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto metrics) { this.timestamp = System.currentTimeMillis(); - this.metrics = metrics; + /* Size the map from the number of entries carried by the response. */ this.metrics = new HashMap(metrics.getMetricsCount()); + /* Copy each entry of the response into the map under its key. */ metrics.getMetricsList().forEach(entry -> this.metrics.put(entry.getKey(), entry.getValue())); } public long getTimestamp() { return timestamp; } - public LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto getMetrics() { + /** Returns the internal map itself, not a copy, so modifications made by a caller are visible to this object. */ public Map getMetrics() { return metrics; } } diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/ZooKeeperLlapMetricsListener.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/ZooKeeperLlapMetricsListener.java new file mode 100644 index 0000000000..3addb82cb1 --- /dev/null +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/ZooKeeperLlapMetricsListener.java @@ -0,0 +1,129 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap.tezplugins.metrics; + +import com.google.protobuf.ServiceException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SetCapacityRequestProto; +import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo; +import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; +import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; + +/** Metrics listener that, on every cluster-wide metrics update, looks for the daemon with the highest average response time and, when the checks in newClusterMetrics pass, sets the executor count and wait queue size of that daemon to 0. */ public class ZooKeeperLlapMetricsListener implements LlapMetricsListener { + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLlapMetricsListener.class); + private Configuration conf; + private LlapRegistryService registry; + private LlapManagementProtocolClientImplFactory clientFactory; + /** A daemon is only taken into account once its ExecutorTotalRequestsHandled value is greater than this. */ private static final long MIN_DATA_POINT = 2000; + + /** Stores the configuration and the registry, and creates the management client factory through LlapManagementProtocolClientImplFactory.basicInstance(conf). */ @Override + public void init(Configuration conf, LlapRegistryService registry) { + this.conf = conf; + this.registry = registry; + this.clientFactory = LlapManagementProtocolClientImplFactory.basicInstance(conf); + } + + @Override + public void newDaemonMetrics(String workerIdentity, LlapMetricsCollector.LlapMetrics newMetrics) { + // no op + } + + /** Scans the metrics of every daemon, counting only daemons with more than MIN_DATA_POINT handled requests. When at least two daemons qualify, the slowest one is more than 1.5 times slower than the mean of the others, and the others have in total more than twice as many free executors as the slowest one has executors, tries to set the capacity of the slowest daemon to 0. Failures of that call are logged and not rethrown. */ @Override + public void newClusterMetrics(Map newMetrics) { + long sumAverageTime = 0; + long sumEmptyExecutors = 0; + long maxAverageTime = 0; + long maxAverageTimeEmptyExecutors = 0; + long maxAverageTimeMaxExecutors = 0; + long workerNum = 0; + String maxAverageTimeIdentity = null; + for (String workerIdentity : newMetrics.keySet()) { + Map metrics = newMetrics.get(workerIdentity).getMetrics(); + long requestHandled = metrics.get(LlapDaemonExecutorInfo.ExecutorTotalRequestsHandled.name()); + long averageTime = metrics.get(LlapDaemonExecutorInfo.AverageResponseTime.name()); + long emptyExecutor = + 
+ metrics.get(LlapDaemonExecutorInfo.ExecutorNumExecutorsAvailableAverage.name()); + long maxExecutors = metrics.get(LlapDaemonExecutorInfo.ExecutorNumExecutorsPerInstance.name()); + + LOG.debug("Checking node {} with data: " + + "requestHandled={}, " + + "averageTime={}, " + + "emptyExecutors={}, " + + "maxExecutors={}", + workerIdentity, requestHandled, averageTime, emptyExecutor, maxExecutors); + + if (requestHandled > MIN_DATA_POINT) { + workerNum++; + sumAverageTime += averageTime; + if (averageTime > maxAverageTime) { + maxAverageTime = averageTime; + maxAverageTimeEmptyExecutors = emptyExecutor; + maxAverageTimeMaxExecutors = maxExecutors; + maxAverageTimeIdentity = workerIdentity; + } + sumEmptyExecutors += emptyExecutor; + } + } + + // If we do not have enough data then return. + if (workerNum < 2) { + return; + } + + LOG.debug("Found slowest node {} with data: " + + "sumAverageTime={}, " + + "sumEmptyExecutors={}, " + + "maxAverageTime={}, " + + "maxAverageTimeEmptyExecutors={}, " + + "maxAverageTimeMaxExecutors={}, " + + "workerNum={}, " + + "maxAverageTimeIdentity={}", + /* fills the leading node placeholder; without it every value is logged under the wrong label */ maxAverageTimeIdentity, sumAverageTime, sumEmptyExecutors, maxAverageTime, maxAverageTimeEmptyExecutors, + maxAverageTimeMaxExecutors, workerNum, maxAverageTimeIdentity); + // Check if the slowest node is at least 50% slower than the average of the other nodes + long averageTimeWithoutSlowest = (sumAverageTime - maxAverageTime) / (workerNum - 1); + if (averageTimeWithoutSlowest * 1.5 < maxAverageTime) { + // We have a candidate, let's see if we have enough empty executors. 
+ // We aim for at least 2 times more free executors on average than we plan to remove + long emptyExecutorsWithoutSlowest = sumEmptyExecutors - maxAverageTimeEmptyExecutors; + if (emptyExecutorsWithoutSlowest > maxAverageTimeMaxExecutors * 2) { + // Seems like a good candidate, let's try to blacklist + try { + LOG.warn("Trying to blacklist node: {}", maxAverageTimeIdentity); + setCapacity(maxAverageTimeIdentity, 0, 0); + } catch (Throwable t) { + LOG.warn("Can not blacklist node: {}", maxAverageTimeIdentity, t); + } + } + } + } + + /** Looks up the service instance registered under workerIdentity, creates a management client for it and sends a SetCapacityRequestProto carrying the given executor number and queue size. */ protected void setCapacity(String workerIdentity, int newExecutorNum, int newWaitQueueSize) + throws IOException, ServiceException { + LlapServiceInstance serviceInstance = registry.getInstances().getInstance(workerIdentity); + LlapManagementProtocolClientImpl client = clientFactory.create(serviceInstance); + client.setCapacity(null, + SetCapacityRequestProto.newBuilder() + .setExecutorNum(newExecutorNum) + .setQueueSize(newWaitQueueSize) + .build()); + } +} diff --git llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java index e24c979baa..48eb2cdbf7 100644 --- llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java +++ llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java @@ -63,7 +63,7 @@ private Configuration mockConf; @Mock - private LlapMetricsCollector.LlapManagementProtocolClientImplFactory mockClientFactory; + private LlapManagementProtocolClientImplFactory mockClientFactory; @Mock private LlapManagementProtocolClientImpl mockClient;