From 430465b1d919b4ecb12443fff5727965fd0fb618 Mon Sep 17 00:00:00 2001 From: thiruvel Date: Wed, 30 Oct 2013 16:56:57 -0700 Subject: [PATCH] Enable metrics for HiveServer2 --- common/ivy.xml | 2 + .../metrics/HiveThriftMetricsEventHandler.java | 86 ++++++++++++++++++++++ .../apache/hadoop/hive/common/metrics/Metrics.java | 22 +++++- .../thrift/HiveThriftChainedEventHandler.java | 74 +++++++++++++++++++ .../java/org/apache/hadoop/hive/conf/HiveConf.java | 1 + .../hadoop/hive/metastore/HiveMetaStore.java | 2 +- .../hive/service/auth/KerberosSaslHelper.java | 13 +++- .../apache/hive/service/auth/PlainSaslHelper.java | 8 +- .../service/cli/thrift/ThriftBinaryCLIService.java | 11 ++- .../cli/thrift/ThriftCLIMetricsProcessor.java | 74 +++++++++++++++++++ .../hive/service/cli/thrift/ThriftCLIService.java | 19 +++++ 11 files changed, 306 insertions(+), 6 deletions(-) create mode 100644 common/src/java/org/apache/hadoop/hive/common/metrics/HiveThriftMetricsEventHandler.java create mode 100644 common/src/java/org/apache/hadoop/hive/common/thrift/HiveThriftChainedEventHandler.java create mode 100644 service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIMetricsProcessor.java diff --git a/common/ivy.xml b/common/ivy.xml index 95cee84..13d2f1c 100644 --- a/common/ivy.xml +++ b/common/ivy.xml @@ -44,5 +44,7 @@ + + diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/HiveThriftMetricsEventHandler.java b/common/src/java/org/apache/hadoop/hive/common/metrics/HiveThriftMetricsEventHandler.java new file mode 100644 index 0000000..5004c05 --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/metrics/HiveThriftMetricsEventHandler.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common.metrics; + +import java.io.IOException; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.server.ServerContext; +import org.apache.thrift.server.TServerEventHandler; +import org.apache.thrift.transport.TTransport; + +/** + * Event handler for thrift to maintain metrics about connections/requests etc. + */ +public class HiveThriftMetricsEventHandler implements TServerEventHandler { + private static final Log LOG = LogFactory.getLog(HiveThriftMetricsEventHandler.class); + + public static final String METRICS_TOTAL_CONNECTIONS = "TotalConnections"; + public static final String METRICS_ACTIVE_CONNECTIONS = "ActiveConnections"; + public static final String METRICS_TOTAL_REQUESTS = "TotalRequests"; + protected final String serviceName; + + public HiveThriftMetricsEventHandler(final String serviceName) throws IOException { + this.serviceName = serviceName; + + Metrics.set(getMetricsName(METRICS_ACTIVE_CONNECTIONS), 0L); + Metrics.set(getMetricsName(METRICS_TOTAL_CONNECTIONS), 0L); + Metrics.set(getMetricsName(METRICS_TOTAL_REQUESTS), 0L); + } + + @Override + public void preServe() { + } + + @Override + public ServerContext createContext(TProtocol input, TProtocol output) { + try { + Metrics.incrementCounter(getMetricsName(METRICS_ACTIVE_CONNECTIONS)); + Metrics.incrementCounter(getMetricsName(METRICS_TOTAL_CONNECTIONS)); + } catch (IOException e) { + LOG.error("Couldn't increment counter for active connections", e); + } + return null; + } + + @Override + public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) { + try { + Metrics.incrementCounter(getMetricsName(METRICS_ACTIVE_CONNECTIONS), -1L); + } catch (IOException e) { + LOG.error("Couldn't increment counter for active connections", e); + } + } + + @Override + public void processContext(ServerContext serverContext, TTransport input, TTransport output) { + try { + Metrics.incrementCounter(getMetricsName(METRICS_TOTAL_REQUESTS)); + } catch (IOException e) { + LOG.error("Couldn't increment counter for active connections", e); + } + } + + private String getMetricsName(final String metricsKey) { + return serviceName + "." + metricsKey; + } +} \ No newline at end of file diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java index 01c9d1d..d52ff00 100644 --- a/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java +++ b/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java @@ -70,7 +70,18 @@ public class Metrics { this.numCounter = name + ".n"; this.timeCounter = name + ".t"; this.avgTimeCounter = name + ".avg_t"; - open(); + } + + /** + * Resets all metrics associated for a scope to 0, could be used to initialize scope + * @throws IOException + */ + public void resetMetrics() throws IOException { + synchronized (metrics) { + Metrics.set(numCounter, new Long(0)); + Metrics.set(timeCounter, new Long(0)); + Metrics.set(avgTimeCounter, 0.0); + } } public Long getNumCounter() throws IOException { @@ -200,6 +211,14 @@ public class Metrics { return metrics.get(name); } + /** + * Initialize all metrics associated for a scope to 0 + * @param name + */ + public static void initializeScope(String name) throws IOException { + new MetricsScope(name).resetMetrics(); + } + public static MetricsScope startScope(String name) throws IOException{ if (!initialized) { return null; @@ -208,6 +227,7 @@ public class Metrics { threadLocalScopes.get().get(name).open(); } else { threadLocalScopes.get().put(name, new MetricsScope(name)); + threadLocalScopes.get().get(name).open(); } return threadLocalScopes.get().get(name); } diff --git a/common/src/java/org/apache/hadoop/hive/common/thrift/HiveThriftChainedEventHandler.java b/common/src/java/org/apache/hadoop/hive/common/thrift/HiveThriftChainedEventHandler.java new file mode 100644 index 0000000..1e44d5c --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/thrift/HiveThriftChainedEventHandler.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common.thrift; + +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.server.ServerContext; +import org.apache.thrift.server.TServerEventHandler; +import org.apache.thrift.transport.TTransport; + +import java.util.ArrayList; +import java.util.List; + +public class HiveThriftChainedEventHandler implements TServerEventHandler { + + protected List eventHandlers = new ArrayList(); + + public HiveThriftChainedEventHandler() { + } + + public void addEventHandler(TServerEventHandler eventHandler) { + eventHandlers.add(eventHandler); + } + + @Override + public void preServe() { + for (TServerEventHandler eventHandler : eventHandlers) { + eventHandler.preServe(); + } + } + + @Override + public ServerContext createContext(TProtocol input, TProtocol output) { + ServerContextList serverContexts = new ServerContextList(); + for (TServerEventHandler eventHandler : eventHandlers) { + serverContexts.add(eventHandler.createContext(input, output)); + } + return serverContexts; + } + + @Override + public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) { + ServerContextList serverContexts = (ServerContextList) serverContext; + for (int i = 0; i < eventHandlers.size(); i++) { + eventHandlers.get(i).deleteContext(serverContexts.get(i), input, output); + } + } + + @Override + public void processContext(ServerContext serverContext, TTransport input, TTransport output) { + ServerContextList serverContexts = (ServerContextList) serverContext; + for (int i = 0; i < eventHandlers.size(); i++) { + eventHandlers.get(i).processContext(serverContexts.get(i), input, output); + } + } + + public static class ServerContextList extends ArrayList implements ServerContext { + } +} diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 7dcce04..16eb68a 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -226,6 +226,7 @@ public class HiveConf extends Configuration { // if true, DROP TABLE/VIEW does not fail if table/view doesn't exist and IF EXISTS is // not specified DROPIGNORESNONEXISTENT("hive.exec.drop.ignorenonexistent", true), + METASTORE_METRICS_ENABLED("hive.metastore.metrics.enabled", false), // ignore the mapjoin hint HIVEIGNOREMAPJOINHINT("hive.ignore.mapjoin.hint", true), diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index e7104db..c39400c 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -325,7 +325,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { createDefaultDB(); - if (hiveConf.getBoolean("hive.metastore.metrics.enabled", false)) { + if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS_ENABLED)) { try { Metrics.init(); } catch (Exception e) { diff --git a/service/src/java/org/apache/hive/service/auth/KerberosSaslHelper.java b/service/src/java/org/apache/hive/service/auth/KerberosSaslHelper.java index 519556c..ea866be 100644 --- a/service/src/java/org/apache/hive/service/auth/KerberosSaslHelper.java +++ b/service/src/java/org/apache/hive/service/auth/KerberosSaslHelper.java @@ -22,10 +22,12 @@ import java.util.Map; import javax.security.sasl.SaslException; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge; import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server; import org.apache.hive.service.cli.thrift.TCLIService; +import org.apache.hive.service.cli.thrift.ThriftCLIMetricsProcessor; import org.apache.hive.service.cli.thrift.TCLIService.Iface; import org.apache.hive.service.cli.thrift.ThriftCLIService; import org.apache.thrift.TProcessor; @@ -46,8 +48,15 @@ public class KerberosSaslHelper { @Override public TProcessor getProcessor(TTransport trans) { - TProcessor sqlProcessor = new TCLIService.Processor(service); - return saslServer.wrapNonAssumingProcessor(sqlProcessor); + TProcessor sqlProcessor; + if (service.getHiveConf().getBoolVar(HiveConf.ConfVars.METASTORE_METRICS_ENABLED)) { + sqlProcessor = new ThriftCLIMetricsProcessor(service); + } else { + sqlProcessor = new TCLIService.Processor(service); + } + return service.getHiveConf().getBoolean("hive.server2.enable.doAs", false) ? + saslServer.wrapProcessor(sqlProcessor) : saslServer.wrapNonAssumingProcessor(sqlProcessor); + } } diff --git a/service/src/java/org/apache/hive/service/auth/PlainSaslHelper.java b/service/src/java/org/apache/hive/service/auth/PlainSaslHelper.java index 15b1675..4d011df 100644 --- a/service/src/java/org/apache/hive/service/auth/PlainSaslHelper.java +++ b/service/src/java/org/apache/hive/service/auth/PlainSaslHelper.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hive.service.auth.PlainSaslServer.ExternalAuthenticationCallback; import org.apache.hive.service.auth.PlainSaslServer.SaslPlainProvider; import org.apache.hive.service.cli.thrift.TCLIService; +import org.apache.hive.service.cli.thrift.ThriftCLIMetricsProcessor; import org.apache.hive.service.cli.thrift.TCLIService.Iface; import org.apache.hive.service.cli.thrift.ThriftCLIService; import org.apache.thrift.TProcessor; @@ -108,7 +109,12 @@ public class PlainSaslHelper { @Override public TProcessor getProcessor(TTransport trans) { - TProcessor baseProcessor = new TCLIService.Processor(service); + TProcessor baseProcessor; + if (service.getHiveConf().getBoolVar(HiveConf.ConfVars.METASTORE_METRICS_ENABLED)) { + baseProcessor = new ThriftCLIMetricsProcessor(service); + } else { + baseProcessor = new TCLIService.Processor(service); + } return doAsEnabled ? new TUGIContainingProcessor(baseProcessor, conf) : new TSetIpAddressProcessor(service); } diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java index 9c8f5c1..17f586a 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java @@ -20,11 +20,14 @@ package org.apache.hive.service.cli.thrift; import java.net.InetSocketAddress; +import org.apache.hadoop.hive.common.metrics.HiveThriftMetricsEventHandler; +import org.apache.hadoop.hive.common.thrift.HiveThriftChainedEventHandler; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.CLIService; import org.apache.thrift.TProcessorFactory; import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.server.TServerEventHandler; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TTransportFactory; @@ -72,6 +75,12 @@ public class ThriftBinaryCLIService extends ThriftCLIService { .maxWorkerThreads(maxWorkerThreads); server = new TThreadPoolServer(sargs); + HiveThriftChainedEventHandler eventHandlers = new HiveThriftChainedEventHandler(); + if(hiveConf.getBoolVar(ConfVars.METASTORE_METRICS_ENABLED)) { + TServerEventHandler eventHandler = new HiveThriftMetricsEventHandler(HS_SERVICE_METRICS_PREFIX); + eventHandlers.addEventHandler(eventHandler); + } + server.setServerEventHandler(eventHandlers); LOG.info("ThriftBinaryCLIService listening on " + serverAddress); @@ -82,4 +91,4 @@ public class ThriftBinaryCLIService extends ThriftCLIService { } } -} \ No newline at end of file +} diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIMetricsProcessor.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIMetricsProcessor.java new file mode 100644 index 0000000..61b8092 --- /dev/null +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIMetricsProcessor.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.cli.thrift; + +import org.apache.hadoop.hive.common.metrics.Metrics; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TProtocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; + +public class ThriftCLIMetricsProcessor extends TCLIService.Processor { + private static final Logger LOG = LoggerFactory.getLogger(ThriftCLIMetricsProcessor.class); + public static String METRICS_ACTIVE_REQUESTS = "HS.ActiveRequests"; + + public ThriftCLIMetricsProcessor(I iface) { + super(iface); + } + + public static void initMetrics() { + try { + Metrics.set(METRICS_ACTIVE_REQUESTS, 0L); + } catch (IOException e) { + LOG.error("Couldn't initialize " + METRICS_ACTIVE_REQUESTS + " metrics", e); + } + } + + protected ThriftCLIMetricsProcessor(I iface, Map> processMap) { + super(iface, processMap); + } + + @Override + public boolean process(TProtocol in, TProtocol out) throws TException { + boolean metricsIncremented = false; + try { + try { + if(Metrics.incrementCounter(METRICS_ACTIVE_REQUESTS) != null) { + metricsIncremented = true; + } else{ + LOG.error("Couldn't increment counter for " + METRICS_ACTIVE_REQUESTS); + } + } catch (IOException e) { + LOG.error("Error incrementing counter " + METRICS_ACTIVE_REQUESTS, e); + } + return super.process(in, out); + } finally { + if(metricsIncremented) { + try { + Metrics.incrementCounter(METRICS_ACTIVE_REQUESTS, -1L); + } catch (IOException e) { + LOG.error("Error decrementing counter " + METRICS_ACTIVE_REQUESTS, e); + } + } + } + } +} diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 9b78fd1..8d7e34d 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -97,6 +97,8 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe FetchSplit } + public static final String HS_SERVICE_METRICS_PREFIX = "HS"; + public ThriftCLIService(CLIService cliService, String serviceName) { super(serviceName); this.cliService = cliService; @@ -106,6 +108,23 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe public synchronized void init(HiveConf hiveConf) { this.hiveConf = hiveConf; super.init(hiveConf); + if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS_ENABLED)) { + try { + Metrics.init(); + // initialize all metrics to 0, otherwise they become available lazily + // and a system probing for it may fail if metrics are absent initially + for (ThriftCliFunctions function : ThriftCliFunctions.values()) { + Metrics.initializeScope(function.name()); + } + ThriftCLIMetricsProcessor.initMetrics(); + } catch (Exception e) { + // log exception, but ignore inability to start + LOG.error("error in Metrics init: " + e.getClass().getName() + " " + + e.getMessage()); + MetaStoreUtils.printStackTrace(e); + + } + } } @Override -- 1.8.5-rc3