diff --git a/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
index ec5ac4a..6ffaf94 100644
--- a/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
+++ b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
@@ -27,6 +27,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.metrics.common.Metrics;
+import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.util.Daemon;
@@ -186,14 +187,14 @@ public void run() {
++numGcWarnThresholdExceeded;
LOG.warn(formatMessage(
extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
- incrementMetricsCounter("jvm.pause.warn-threshold", 1);
+ incrementMetricsCounter(MetricsConstant.JVM_PAUSE_WARN, 1);
} else if (extraSleepTime > infoThresholdMs) {
++numGcInfoThresholdExceeded;
LOG.info(formatMessage(
extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
- incrementMetricsCounter("jvm.pause.info-threshold", 1);
+ incrementMetricsCounter(MetricsConstant.JVM_PAUSE_INFO, 1);
}
- incrementMetricsCounter("jvm.pause.extraSleepTime", extraSleepTime);
+ incrementMetricsCounter(MetricsConstant.JVM_EXTRA_SLEEP, extraSleepTime);
totalGcExtraSleepTime += extraSleepTime;
gcTimesBeforeSleep = gcTimesAfterSleep;
}
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
index e811339..2722ea9 100644
--- a/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.common.metrics;
import org.apache.hadoop.hive.common.metrics.common.Metrics;
+import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
import org.apache.hadoop.hive.conf.HiveConf;
import java.io.IOException;
@@ -162,11 +163,11 @@ public LegacyMetrics(HiveConf conf) throws Exception {
mbs.registerMBean(metrics, oname);
}
- public Long incrementCounter(String name) throws IOException{
+ public Long incrementCounter(String name) throws IOException {
return incrementCounter(name,Long.valueOf(1));
}
- public Long incrementCounter(String name, long increment) throws IOException{
+ public Long incrementCounter(String name, long increment) throws IOException {
Long value;
synchronized(metrics) {
if (!metrics.hasKey(name)) {
@@ -180,6 +181,29 @@ public Long incrementCounter(String name, long increment) throws IOException{
return value;
}
+ public Long decrementCounter(String name) throws IOException {
+ return decrementCounter(name, Long.valueOf(1));
+ }
+
+ public Long decrementCounter(String name, long decrement) throws IOException {
+ Long value;
+ synchronized(metrics) {
+ if (!metrics.hasKey(name)) {
+ // First decrement of an unknown key creates the counter at the negative value.
+ value = Long.valueOf(-decrement);
+ set(name, value);
+ } else {
+ value = ((Long)get(name)) - decrement;
+ set(name, value);
+ }
+ }
+ return value;
+ }
+
+ @Override
+ public void addGauge(String name, MetricsVariable variable) {
+ //Not implemented.
+ }
+
public void set(String name, Object value) throws IOException{
metrics.put(name,value);
}
@@ -210,6 +234,8 @@ public void endScope(String name) throws IOException{
}
}
+
+
/**
* Resets the static context state to initial.
* Used primarily for testing purposes.
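
A minimal usage sketch of the counter semantics added above, assuming the metrics subsystem has been initialized via MetricsFactory (the counter names here are illustrative, not part of the patch):

    Metrics metrics = MetricsFactory.getInstance();
    metrics.incrementCounter("requests");      // created on first use, value 1
    metrics.decrementCounter("requests");      // value 0
    metrics.decrementCounter("cold.counter");  // created by the decrement, value -1
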
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
index 27b69cc..757ee35 100644
--- a/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
@@ -61,4 +61,31 @@
* @throws IOException
*/
public Long incrementCounter(String name, long increment) throws IOException;
+
+ /**
+ * Decrements a counter of the given name by 1.
+ * @param name counter name
+ * @return the new value of the counter
+ * @throws IOException
+ */
+ public Long decrementCounter(String name) throws IOException;
+
+ /**
+ * Decrements a counter of the given name by "decrement".
+ * @param name counter name
+ * @param decrement the amount to subtract from the counter
+ * @return the new value of the counter
+ * @throws IOException
+ */
+ public Long decrementCounter(String name, long decrement) throws IOException;
+
+ /**
+ * Adds a metrics gauge to track a variable: for example, the number of open database connections.
+ * @param name name of the gauge
+ * @param variable the variable to track
+ */
+ public void addGauge(String name, final MetricsVariable variable);
}
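
A hedged sketch of registering a gauge against the extended Metrics interface; the AtomicInteger wiring is illustrative and not part of this patch:

    final java.util.concurrent.atomic.AtomicInteger openConnections =
        new java.util.concurrent.atomic.AtomicInteger();
    Metrics metrics = MetricsFactory.getInstance();
    if (metrics != null) {
      metrics.addGauge(MetricsConstant.OPEN_CONNECTIONS, new MetricsVariable() {
        @Override
        public Object getValue() {
          // Gauges are polled at report time, so this reflects the live value.
          return openConnections.get();
        }
      });
    }
    openConnections.incrementAndGet();  // the next report shows 1
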
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java
new file mode 100644
index 0000000..d1ebe12
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics.common;
+
+/**
+ * This class defines constants for the names of metrics generated by Hive processes.
+ */
+public class MetricsConstant {
+
+ public static final String JVM_PAUSE_INFO = "jvm.pause.info-threshold";
+ public static final String JVM_PAUSE_WARN = "jvm.pause.warn-threshold";
+ public static final String JVM_EXTRA_SLEEP = "jvm.pause.extraSleepTime";
+
+ public static final String OPEN_CONNECTIONS = "open_connections";
+
+ public static final String JDO_ACTIVE_TRANSACTIONS = "active_jdo_transactions";
+ public static final String JDO_ROLLBACK_TRANSACTIONS = "rollbacked_jdo_transactions";
+ public static final String JDO_COMMIT_TRANSACTIONS = "committed_jdo_transactions";
+ public static final String JDO_OPEN_TRANSACTIONS = "opened_jdo_transactions";
+}
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsVariable.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsVariable.java
new file mode 100644
index 0000000..8cf6608
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsVariable.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics.common;
+
+/**
+ * Interface for metrics variables. For example, the database service could
+ * expose the number of currently active connections.
+ */
+public interface MetricsVariable<T> {
+ public T getValue();
+}
\ No newline at end of file
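
With the type parameter restored on the interface, a typed implementation looks like this short sketch (the class and field names are hypothetical):

    public class ActiveTransactionCount implements MetricsVariable<Integer> {
      private volatile int count;

      @Override
      public Integer getValue() {
        return count;
      }

      public void set(int count) {
        this.count = count;
      }
    }
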
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
index ae353d0..abc3ded 100644
--- a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
@@ -21,6 +21,7 @@
import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Counter;
import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.codahale.metrics.Gauge;
import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
@@ -44,6 +45,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
import org.apache.hadoop.hive.conf.HiveConf;
import java.io.BufferedReader;
@@ -52,12 +54,14 @@
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.lang.management.ManagementFactory;
+import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@@ -73,9 +77,11 @@
public final MetricRegistry metricRegistry = new MetricRegistry();
private final Lock timersLock = new ReentrantLock();
private final Lock countersLock = new ReentrantLock();
+ private final Lock gaugesLock = new ReentrantLock();
private LoadingCache<String, Timer> timers;
private LoadingCache<String, Counter> counters;
+ private ConcurrentHashMap<String, Gauge> gauges;
private HiveConf conf;
private final Set<Closeable> reporters = new HashSet<Closeable>();
@@ -161,6 +167,7 @@ public Counter load(String key) throws Exception {
}
}
);
+ gauges = new ConcurrentHashMap<String, Gauge>();
//register JVM metrics
registerAll("gc", new GarbageCollectorMetricSet());
@@ -218,7 +225,7 @@ public void endScope(String name) throws IOException {
}
public Long incrementCounter(String name) throws IOException {
- return incrementCounter(name, 1);
+ return incrementCounter(name, 1L);
}
public Long incrementCounter(String name, long increment) throws IOException {
@@ -234,6 +241,45 @@ public Long incrementCounter(String name, long increment) throws IOException {
}
}
+ public Long decrementCounter(String name) throws IOException {
+ return decrementCounter(name, 1L);
+ }
+
+ public Long decrementCounter(String name, long decrement) throws IOException {
+ String key = name;
+ try {
+ countersLock.lock();
+ counters.get(key).dec(decrement);
+ return counters.get(key).getCount();
+ } catch (ExecutionException ee) {
+ throw new RuntimeException(ee);
+ } finally {
+ countersLock.unlock();
+ }
+ }
+
+ public void addGauge(String name, final MetricsVariable variable) {
+ Gauge<Object> gauge = new Gauge<Object>() {
+ @Override
+ public Object getValue() {
+ return variable.getValue();
+ }
+ };
+ try {
+ gaugesLock.lock();
+ gauges.put(name, gauge);
+ // The Codahale registry throws an exception when a name is re-registered, so remove any stale gauge first.
+ if (metricRegistry.getGauges().containsKey(name)) {
+ LOGGER.warn("A Gauge with name [" + name + "] already exists. "
+ + "The old gauge will be overwritten, but this is not recommended");
+ metricRegistry.remove(name);
+ }
+ metricRegistry.register(name, gauge);
+ } finally {
+ gaugesLock.unlock();
+ }
+ }
+
// This method is necessary to synchronize lazy-creation to the timers.
private Timer getTimer(String name) throws IOException {
String key = name;
@@ -312,11 +358,19 @@ public void run() {
try {
String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(metricRegistry);
Path tmpPath = new Path(pathString + ".tmp");
- FileSystem fs = FileSystem.get(conf);
+ URI tmpPathURI = tmpPath.toUri();
+ FileSystem fs;
+ if (tmpPathURI.getScheme() == null && tmpPathURI.getAuthority() == null) {
+ // No scheme or authority in the path: default to the local filesystem.
+ fs = FileSystem.getLocal(conf);
+ } else {
+ fs = FileSystem.get(tmpPathURI, conf);
+ }
fs.delete(tmpPath, true);
bw = new BufferedWriter(new OutputStreamWriter(fs.create(tmpPath, true)));
bw.write(json);
bw.close();
+ fs.setPermission(tmpPath, FsPermission.createImmutable((short) 0644));
Path path = new Path(pathString);
fs.rename(tmpPath, path);
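
To illustrate the scheme-based dispatch in the JSON reporter, a small sketch of what each location resolves to (the paths are examples):

    // No scheme or authority: handled by FileSystem.getLocal(conf).
    new Path("/tmp/report.json").toUri().getScheme();               // null
    // Explicit scheme or authority: handled by FileSystem.get(uri, conf).
    new Path("file:///tmp/report.json").toUri().getScheme();        // "file"
    new Path("hdfs://nn:8020/tmp/report.json").toUri().getScheme(); // "hdfs"
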
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 6d0cf15..4549105 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1725,8 +1725,8 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
"Hive metrics subsystem implementation class."),
HIVE_METRICS_REPORTER("hive.service.metrics.reporter", "JSON_FILE, JMX",
"Reporter type for metric class org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics, comma separated list of JMX, CONSOLE, JSON_FILE"),
- HIVE_METRICS_JSON_FILE_LOCATION("hive.service.metrics.file.location", "file:///tmp/report.json",
- "For metric class org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics JSON_FILE reporter, the location of JSON metrics file. " +
+ HIVE_METRICS_JSON_FILE_LOCATION("hive.service.metrics.file.location", "/tmp/report.json",
+ "For metric class org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics JSON_FILE reporter, the location of local JSON metrics file. " +
"This file will get overwritten at every interval."),
HIVE_METRICS_JSON_FILE_INTERVAL("hive.service.metrics.file.frequency", "5s",
new TimeValidator(TimeUnit.MILLISECONDS),
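
A hedged configuration sketch exercising the ConfVars touched here (the values are examples only):

    HiveConf conf = new HiveConf();
    conf.setVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER, "JSON_FILE, JMX");
    conf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION, "/tmp/report.json");
    conf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, "5s");
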
diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java
index 954b388..a3aa549 100644
--- a/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java
+++ b/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java
@@ -22,7 +22,9 @@
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.junit.After;
@@ -135,4 +137,44 @@ public void testFileReporting() throws Exception {
JsonNode countNode = methodCounterNode.path("count");
Assert.assertEquals(countNode.asInt(), 5);
}
+
+ class TestMetricsVariable implements MetricsVariable {
+ private int gaugeVal;
+
+ @Override
+ public Object getValue() {
+ return gaugeVal;
+ }
+ public void setValue(int gaugeVal) {
+ this.gaugeVal = gaugeVal;
+ }
+ }
+
+ @Test
+ public void testGauge() throws Exception {
+ TestMetricsVariable testVar = new TestMetricsVariable();
+ testVar.setValue(20);
+
+ MetricsFactory.getInstance().addGauge("gauge1", testVar);
+ Thread.sleep(2000);
+ byte[] jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath()));
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ JsonNode rootNode = objectMapper.readTree(jsonData);
+ JsonNode gaugesNode = rootNode.path("gauges");
+ JsonNode methodGaugeNode = gaugesNode.path("gauge1");
+ JsonNode valueNode = methodGaugeNode.path("value");
+ Assert.assertEquals(testVar.getValue(), valueNode.asInt());
+
+ testVar.setValue(40);
+ Thread.sleep(2000);
+
+ jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath()));
+
+ rootNode = objectMapper.readTree(jsonData);
+ gaugesNode = rootNode.path("gauges");
+ methodGaugeNode = gaugesNode.path("gauge1");
+ valueNode = methodGaugeNode.path("value");
+ Assert.assertEquals(testVar.getValue(), valueNode.asInt());
+ }
}
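
For orientation when reading the assertions above: the JSON file written by the reporter has roughly this shape (abridged; the values are examples):

    {
      "gauges" :   { "gauge1" : { "value" : 20 } },
      "counters" : { "open_connections" : { "count" : 1 } },
      "timers" :   { "api_get_all_databases" : { "count" : 5 } }
    }
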
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java
index 25f34d1..c9da95a 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java
@@ -24,8 +24,10 @@
import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hive.service.server.HiveServer2;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -37,9 +39,11 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.Map;
/**
* Tests Hive Metastore Metrics.
*/
public class TestMetaStoreMetrics {
@@ -49,9 +53,8 @@
private static HiveConf hiveConf;
private static Driver driver;
-
- @Before
- public void before() throws Exception {
+ @BeforeClass
+ public static void before() throws Exception {
int port = MetaStoreUtils.findFreePort();
@@ -86,9 +89,58 @@ public void testMetricsFile() throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
JsonNode rootNode = objectMapper.readTree(jsonData);
- JsonNode countersNode = rootNode.path("timers");
- JsonNode methodCounterNode = countersNode.path("api_get_all_databases");
- JsonNode countNode = methodCounterNode.path("count");
- Assert.assertTrue(countNode.asInt() > 0);
+ JsonNode timersNode = rootNode.path("timers");
+ JsonNode methodCounterNode = timersNode.path("api_get_all_databases");
+ JsonNode methodCountNode = methodCounterNode.path("count");
+ Assert.assertTrue(methodCountNode.asInt() > 0);
+
+ JsonNode countersNode = rootNode.path("counters");
+ JsonNode committedJdoTxNode = countersNode.path("committed_jdo_transactions");
+ JsonNode committedCountNode = committedJdoTxNode.path("count");
+ Assert.assertTrue(committedCountNode.asInt() > 0);
+ }
+
+
+ @Test
+ public void testConnections() throws Exception {
+ byte[] jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath()));
+ ObjectMapper objectMapper = new ObjectMapper();
+ JsonNode rootNode = objectMapper.readTree(jsonData);
+ JsonNode countersNode = rootNode.path("counters");
+ JsonNode openCnxNode = countersNode.path("open_connections");
+ JsonNode openCnxCountNode = openCnxNode.path("count");
+ Assert.assertEquals(1, openCnxCountNode.asInt());
+
+ //create two more connections
+ HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf);
+ HiveMetaStoreClient msc2 = new HiveMetaStoreClient(hiveConf);
+ Thread.sleep(2000);
+
+ jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath()));
+ rootNode = objectMapper.readTree(jsonData);
+ countersNode = rootNode.path("counters");
+ openCnxNode = countersNode.path("open_connections");
+ openCnxCountNode = openCnxNode.path("count");
+ Assert.assertEquals(3, openCnxCountNode.asInt());
+
+ msc.close();
+ Thread.sleep(2000);
+
+ jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath()));
+ rootNode = objectMapper.readTree(jsonData);
+ countersNode = rootNode.path("counters");
+ openCnxNode = countersNode.path("open_connections");
+ openCnxCountNode = openCnxNode.path("count");
+ Assert.assertEquals(2, openCnxCountNode.asInt());
+
+ msc2.close();
+ Thread.sleep(2000);
+
+ jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath()));
+ rootNode = objectMapper.readTree(jsonData);
+ countersNode = rootNode.path("counters");
+ openCnxNode = countersNode.path("open_connections");
+ openCnxCountNode = openCnxNode.path("count");
+ Assert.assertEquals(1, openCnxCountNode.asInt());
}
}
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 0bcd053..5aafbaf 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -39,6 +39,8 @@
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hadoop.hive.common.cli.CommonCliOptions;
+import org.apache.hadoop.hive.common.metrics.common.Metrics;
+import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -188,8 +190,11 @@
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TServerSocket;
@@ -821,14 +826,6 @@ public void shutdown() {
threadLocalMS.remove();
}
}
- if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) {
- try {
- MetricsFactory.close();
- } catch (Exception e) {
- LOG.error("error in Metrics deinit: " + e.getClass().getName() + " "
- + e.getMessage(), e);
- }
- }
logInfo("Metastore shutdown complete.");
}
@@ -5878,7 +5875,7 @@ public int getPort() {
*/
public static void main(String[] args) throws Throwable {
HiveConf.setLoadMetastoreConfig(true);
- HiveConf conf = new HiveConf(HMSHandler.class);
+ final HiveConf conf = new HiveConf(HMSHandler.class);
HiveMetastoreCli cli = new HiveMetastoreCli(conf);
cli.parse(args);
@@ -5921,6 +5918,14 @@ public void run() {
if (isCliVerbose) {
System.err.println(shutdownMsg);
}
+ if (conf.getBoolVar(ConfVars.METASTORE_METRICS)) {
+ try {
+ MetricsFactory.close();
+ } catch (Exception e) {
+ LOG.error("error in Metrics deinit: " + e.getClass().getName() + " "
+ + e.getMessage(), e);
+ }
+ }
}
});
@@ -6057,6 +6062,42 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge,
.maxWorkerThreads(maxWorkerThreads);
TServer tServer = new TThreadPoolServer(args);
+ TServerEventHandler tServerEventHandler = new TServerEventHandler() {
+ @Override
+ public void preServe() {
+ }
+
+ @Override
+ public ServerContext createContext(TProtocol tProtocol, TProtocol tProtocol1) {
+ try {
+ Metrics metrics = MetricsFactory.getInstance();
+ if (metrics != null) {
+ metrics.incrementCounter(MetricsConstant.OPEN_CONNECTIONS);
+ }
+ } catch (Exception e) {
+ LOG.warn("Error reporting Metastore open connection to Metrics system", e);
+ }
+ return null;
+ }
+
+ @Override
+ public void deleteContext(ServerContext serverContext, TProtocol tProtocol, TProtocol tProtocol1) {
+ try {
+ Metrics metrics = MetricsFactory.getInstance();
+ if (metrics != null) {
+ metrics.decrementCounter(MetricsConstant.OPEN_CONNECTIONS);
+ }
+ } catch (Exception e) {
+ LOG.warn("Error reporting Metastore close connection to Metrics system", e);
+ }
+ }
+
+ @Override
+ public void processContext(ServerContext serverContext, TTransport tTransport, TTransport tTransport1) {
+ }
+ };
+
+ tServer.setServerEventHandler(tServerEventHandler);
HMSHandler.LOG.info("Started the new metaserver on port [" + port
+ "]...");
HMSHandler.LOG.info("Options.minWorkerThreads = "
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 4273c0b..c74544f 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -62,6 +62,10 @@
import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.hive.common.metrics.common.Metrics;
+import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.AggrStats;
@@ -257,6 +261,17 @@ public void setConf(Configuration conf) {
initialize(propsFromConf);
+ //Add metric for number of active JDO transactions.
+ Metrics metrics = MetricsFactory.getInstance();
+ if (metrics != null) {
+ metrics.addGauge(MetricsConstant.JDO_ACTIVE_TRANSACTIONS, new MetricsVariable() {
+ @Override
+ public Object getValue() {
+ return openTrasactionCalls;
+ }
+ });
+ }
+
String partitionValidationRegex =
hiveConf.get(HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN.name());
if (partitionValidationRegex != null && partitionValidationRegex.equals("")) {
@@ -430,6 +445,7 @@ public boolean openTransaction() {
boolean result = currentTransaction.isActive();
debugLog("Open transaction: count = " + openTrasactionCalls + ", isActive = " + result);
+ incrementMetricsCount(MetricsConstant.JDO_OPEN_TRANSACTIONS);
return result;
}
@@ -468,6 +484,7 @@ public boolean commitTransaction() {
currentTransaction.commit();
}
+ incrementMetricsCount(MetricsConstant.JDO_COMMIT_TRANSACTIONS);
return true;
}
@@ -505,6 +522,7 @@ public void rollbackTransaction() {
// from reattaching in future transactions
pm.evictAll();
}
+ incrementMetricsCount(MetricsConstant.JDO_ROLLBACK_TRANSACTIONS);
}
@Override
@@ -6807,6 +6825,16 @@ public boolean doesPartitionExist(String dbName, String tableName, List
}
}
+ private void incrementMetricsCount(String name) {
+ try {
+ Metrics metrics = MetricsFactory.getInstance();
+ if (metrics != null) {
+ metrics.incrementCounter(name);
+ }
+ } catch (Exception e) {
+ LOG.warn("Error Reporting JDO operation to Metrics system", e);
+ }
+ }
private void debugLog(String message) {
if (LOG.isDebugEnabled()) {
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index dfb7faa..67bc778 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -30,6 +30,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.metrics.common.Metrics;
+import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hive.service.AbstractService;
@@ -108,13 +111,29 @@ public ThriftCLIService(CLIService service, String serviceName) {
@Override
public ServerContext createContext(
TProtocol input, TProtocol output) {
+ Metrics metrics = MetricsFactory.getInstance();
+ if (metrics != null) {
+ try {
+ metrics.incrementCounter(MetricsConstant.OPEN_CONNECTIONS);
+ } catch (Exception e) {
+ LOG.warn("Error Reporting JDO operation to Metrics system", e);
+ }
+ }
return new ThriftCLIServerContext();
}
@Override
public void deleteContext(ServerContext serverContext,
TProtocol input, TProtocol output) {
- ThriftCLIServerContext context = (ThriftCLIServerContext)serverContext;
+ Metrics metrics = MetricsFactory.getInstance();
+ if (metrics != null) {
+ try {
+ metrics.decrementCounter(MetricsConstant.OPEN_CONNECTIONS);
+ } catch (Exception e) {
+ LOG.warn("Error Reporting JDO operation to Metrics system", e);
+ }
+ }
+ ThriftCLIServerContext context = (ThriftCLIServerContext) serverContext;
SessionHandle sessionHandle = context.getSessionHandle();
if (sessionHandle != null) {
LOG.info("Session disconnected without closing properly, close it now");