diff --git a/common/pom.xml b/common/pom.xml
index a615c1e..8d4b1ea 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -98,6 +98,26 @@
       <artifactId>json</artifactId>
       <version>${json.version}</version>
     </dependency>
+    <dependency>
+      <groupId>io.dropwizard.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+      <version>${dropwizard.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.dropwizard.metrics</groupId>
+      <artifactId>metrics-jvm</artifactId>
+      <version>${dropwizard.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.dropwizard.metrics</groupId>
+      <artifactId>metrics-json</artifactId>
+      <version>${dropwizard.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>${jackson.new.version}</version>
+    </dependency>
diff --git a/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
new file mode 100644
index 0000000..ad2f66b
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+import org.apache.hadoop.util.Daemon;
+
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Based on the JvmPauseMonitor from Hadoop.
+ */
+public class JvmPauseMonitor {
+ private static final Log LOG = LogFactory.getLog(
+ JvmPauseMonitor.class);
+
+ /** The target sleep time */
+ private static final long SLEEP_INTERVAL_MS = 500;
+
+ /** log WARN if we detect a pause longer than this threshold */
+ private final long warnThresholdMs;
+ private static final String WARN_THRESHOLD_KEY =
+ "jvm.pause.warn-threshold.ms";
+ private static final long WARN_THRESHOLD_DEFAULT = 10000;
+
+ /** log INFO if we detect a pause longer than this threshold */
+ private final long infoThresholdMs;
+ private static final String INFO_THRESHOLD_KEY =
+ "jvm.pause.info-threshold.ms";
+ private static final long INFO_THRESHOLD_DEFAULT = 1000;
+
+ private long numGcWarnThresholdExceeded = 0;
+ private long numGcInfoThresholdExceeded = 0;
+ private long totalGcExtraSleepTime = 0;
+
+ private Thread monitorThread;
+ private volatile boolean shouldRun = true;
+
+ public JvmPauseMonitor(Configuration conf) {
+ this.warnThresholdMs = conf.getLong(WARN_THRESHOLD_KEY, WARN_THRESHOLD_DEFAULT);
+ this.infoThresholdMs = conf.getLong(INFO_THRESHOLD_KEY, INFO_THRESHOLD_DEFAULT);
+ }
+
+ public void start() {
+ Preconditions.checkState(monitorThread == null,
+ "Already started");
+ monitorThread = new Daemon(new Monitor());
+ monitorThread.start();
+ }
+
+ public void stop() {
+ shouldRun = false;
+ monitorThread.interrupt();
+ try {
+ monitorThread.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public boolean isStarted() {
+ return monitorThread != null;
+ }
+
+ public long getNumGcWarnThreadholdExceeded() {
+ return numGcWarnThresholdExceeded;
+ }
+
+ public long getNumGcInfoThresholdExceeded() {
+ return numGcInfoThresholdExceeded;
+ }
+
+ public long getTotalGcExtraSleepTime() {
+ return totalGcExtraSleepTime;
+ }
+
+  private String formatMessage(long extraSleepTime,
+      Map<String, GcTimes> gcTimesAfterSleep,
+      Map<String, GcTimes> gcTimesBeforeSleep) {
+
+    Set<String> gcBeanNames = Sets.intersection(
+        gcTimesAfterSleep.keySet(),
+        gcTimesBeforeSleep.keySet());
+    List<String> gcDiffs = Lists.newArrayList();
+ for (String name : gcBeanNames) {
+ GcTimes diff = gcTimesAfterSleep.get(name).subtract(
+ gcTimesBeforeSleep.get(name));
+ if (diff.gcCount != 0) {
+ gcDiffs.add("GC pool '" + name + "' had collection(s): " +
+ diff.toString());
+ }
+ }
+
+ String ret = "Detected pause in JVM or host machine (eg GC): " +
+ "pause of approximately " + extraSleepTime + "ms\n";
+ if (gcDiffs.isEmpty()) {
+ ret += "No GCs detected";
+ } else {
+ ret += Joiner.on("\n").join(gcDiffs);
+ }
+ return ret;
+ }
+
+  private Map<String, GcTimes> getGcTimes() {
+    Map<String, GcTimes> map = Maps.newHashMap();
+    List<GarbageCollectorMXBean> gcBeans =
+        ManagementFactory.getGarbageCollectorMXBeans();
+ for (GarbageCollectorMXBean gcBean : gcBeans) {
+ map.put(gcBean.getName(), new GcTimes(gcBean));
+ }
+ return map;
+ }
+
+ private static class GcTimes {
+ private GcTimes(GarbageCollectorMXBean gcBean) {
+ gcCount = gcBean.getCollectionCount();
+ gcTimeMillis = gcBean.getCollectionTime();
+ }
+
+ private GcTimes(long count, long time) {
+ this.gcCount = count;
+ this.gcTimeMillis = time;
+ }
+
+ private GcTimes subtract(GcTimes other) {
+ return new GcTimes(this.gcCount - other.gcCount,
+ this.gcTimeMillis - other.gcTimeMillis);
+ }
+
+ @Override
+ public String toString() {
+ return "count=" + gcCount + " time=" + gcTimeMillis + "ms";
+ }
+
+ private long gcCount;
+ private long gcTimeMillis;
+ }
+
+ private class Monitor implements Runnable {
+ @Override
+ public void run() {
+ Stopwatch sw = new Stopwatch();
+      Map<String, GcTimes> gcTimesBeforeSleep = getGcTimes();
+ while (shouldRun) {
+ sw.reset().start();
+ try {
+ Thread.sleep(SLEEP_INTERVAL_MS);
+ } catch (InterruptedException ie) {
+ return;
+ }
+ long extraSleepTime = sw.elapsedMillis() - SLEEP_INTERVAL_MS;
+        Map<String, GcTimes> gcTimesAfterSleep = getGcTimes();
+
+ if (extraSleepTime > warnThresholdMs) {
+ ++numGcWarnThresholdExceeded;
+ LOG.warn(formatMessage(
+ extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
+ incrementMetricsCounter("jvm.pause.warn-threshold", 1);
+ } else if (extraSleepTime > infoThresholdMs) {
+ ++numGcInfoThresholdExceeded;
+ LOG.info(formatMessage(
+ extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
+ incrementMetricsCounter("jvm.pause.info-threshold", 1);
+ }
+ incrementMetricsCounter("jvm.pause.extraSleepTime", extraSleepTime);
+ totalGcExtraSleepTime += extraSleepTime;
+ gcTimesBeforeSleep = gcTimesAfterSleep;
+ }
+ }
+
+ private void incrementMetricsCounter(String name, long count) {
+ try {
+ MetricsFactory.getMetricsInstance().incrementCounter(name, count);
+ } catch (Exception e) {
+ LOG.warn("Error Reporting JvmPauseMonitor to Metrics system", e);
+ }
+ }
+ }
+
+ /**
+ * Simple 'main' to facilitate manual testing of the pause monitor.
+ *
+ * This main function just leaks memory into a list. Running this class
+ * with a 1GB heap will very quickly go into "GC hell" and result in
+ * log messages about the GC pauses.
+ */
+ public static void main(String []args) throws Exception {
+ new JvmPauseMonitor(new Configuration()).start();
+    List<String> list = Lists.newArrayList();
+ int i = 0;
+ while (true) {
+ list.add(String.valueOf(i++));
+ }
+ }
+}
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java
deleted file mode 100644
index 01c9d1d..0000000
--- a/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.common.metrics;
-
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.HashMap;
-
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-/**
- * Metrics Subsystem - allows exposure of a number of named parameters/counters
- * via jmx, intended to be used as a static subsystem
- *
- * Has a couple of primary ways it can be used:
- * (i) Using the set and get methods to set and get named parameters
- * (ii) Using the incrementCounter method to increment and set named
- * parameters in one go, rather than having to make a get and then a set.
- * (iii) Using the startScope and endScope methods to start and end
- * named "scopes" that record the number of times they've been
- * instantiated and amount of time(in milliseconds) spent inside
- * the scopes.
- */
-public class Metrics {
-
- private Metrics() {
- // block
- }
-
- /**
- * MetricsScope : A class that encapsulates an idea of a metered scope.
- * Instantiating a named scope and then closing it exposes two counters:
- * (i) a "number of calls" counter ( <name>.n ), and
- * (ii) a "number of msecs spent between scope open and close" counter. ( <name>.t)
- */
- public static class MetricsScope {
-
- final String name;
- final String numCounter;
- final String timeCounter;
- final String avgTimeCounter;
-
- private boolean isOpen = false;
- private Long startTime = null;
-
- /**
- * Instantiates a named scope - intended to only be called by Metrics, so locally scoped.
- * @param name - name of the variable
- * @throws IOException
- */
- private MetricsScope(String name) throws IOException {
- this.name = name;
- this.numCounter = name + ".n";
- this.timeCounter = name + ".t";
- this.avgTimeCounter = name + ".avg_t";
- open();
- }
-
- public Long getNumCounter() throws IOException {
- return (Long)Metrics.get(numCounter);
- }
-
- public Long getTimeCounter() throws IOException {
- return (Long)Metrics.get(timeCounter);
- }
-
- /**
- * Opens scope, and makes note of the time started, increments run counter
- * @throws IOException
- *
- */
- public void open() throws IOException {
- if (!isOpen) {
- isOpen = true;
- startTime = System.currentTimeMillis();
- } else {
- throw new IOException("Scope named " + name + " is not closed, cannot be opened.");
- }
- }
-
- /**
- * Closes scope, and records the time taken
- * @throws IOException
- */
- public void close() throws IOException {
- if (isOpen) {
- Long endTime = System.currentTimeMillis();
- synchronized(metrics) {
- Long num = Metrics.incrementCounter(numCounter);
- Long time = Metrics.incrementCounter(timeCounter, endTime - startTime);
- if (num != null && time != null) {
- Metrics.set(avgTimeCounter, Double.valueOf(time.doubleValue() / num.doubleValue()));
- }
- }
- } else {
- throw new IOException("Scope named " + name + " is not open, cannot be closed.");
- }
- isOpen = false;
- }
-
-
- /**
- * Closes scope if open, and reopens it
- * @throws IOException
- */
- public void reopen() throws IOException {
- if(isOpen) {
- close();
- }
- open();
- }
-
- }
-
- private static final MetricsMBean metrics = new MetricsMBeanImpl();
-
- private static final ObjectName oname;
- static {
- try {
- oname = new ObjectName(
- "org.apache.hadoop.hive.common.metrics:type=MetricsMBean");
- } catch (MalformedObjectNameException mone) {
- throw new RuntimeException(mone);
- }
- }
-
-
-  private static final ThreadLocal<HashMap<String, MetricsScope>> threadLocalScopes
-      = new ThreadLocal<HashMap<String, MetricsScope>>() {
-    @Override
-    protected HashMap<String, MetricsScope> initialValue() {
-      return new HashMap<String, MetricsScope>();
-    }
-  };
-
- private static boolean initialized = false;
-
- public static void init() throws Exception {
- synchronized (metrics) {
- if (!initialized) {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- mbs.registerMBean(metrics, oname);
- initialized = true;
- }
- }
- }
-
- public static Long incrementCounter(String name) throws IOException{
- if (!initialized) {
- return null;
- }
- return incrementCounter(name,Long.valueOf(1));
- }
-
- public static Long incrementCounter(String name, long increment) throws IOException{
- if (!initialized) {
- return null;
- }
- Long value;
- synchronized(metrics) {
- if (!metrics.hasKey(name)) {
- value = Long.valueOf(increment);
- set(name, value);
- } else {
- value = ((Long)get(name)) + increment;
- set(name, value);
- }
- }
- return value;
- }
-
- public static void set(String name, Object value) throws IOException{
- if (!initialized) {
- return;
- }
- metrics.put(name,value);
- }
-
- public static Object get(String name) throws IOException{
- if (!initialized) {
- return null;
- }
- return metrics.get(name);
- }
-
- public static MetricsScope startScope(String name) throws IOException{
- if (!initialized) {
- return null;
- }
- if (threadLocalScopes.get().containsKey(name)) {
- threadLocalScopes.get().get(name).open();
- } else {
- threadLocalScopes.get().put(name, new MetricsScope(name));
- }
- return threadLocalScopes.get().get(name);
- }
-
- public static MetricsScope getScope(String name) throws IOException {
- if (!initialized) {
- return null;
- }
- if (threadLocalScopes.get().containsKey(name)) {
- return threadLocalScopes.get().get(name);
- } else {
- throw new IOException("No metrics scope named " + name);
- }
- }
-
- public static void endScope(String name) throws IOException{
- if (!initialized) {
- return;
- }
- if (threadLocalScopes.get().containsKey(name)) {
- threadLocalScopes.get().get(name).close();
- }
- }
-
- /**
- * Resets the static context state to initial.
- * Used primarily for testing purposes.
- *
- * Note that threadLocalScopes ThreadLocal is *not* cleared in this call.
- */
- static void uninit() throws Exception {
- synchronized (metrics) {
- if (initialized) {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- if (mbs.isRegistered(oname)) {
- mbs.unregisterMBean(oname);
- }
- metrics.clear();
- initialized = false;
- }
- }
- }
-}
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/MetricsLegacy.java b/common/src/java/org/apache/hadoop/hive/common/metrics/MetricsLegacy.java
new file mode 100644
index 0000000..92c326a
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/MetricsLegacy.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics;
+
+import org.apache.hadoop.hive.common.metrics.common.Metrics;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+/**
+ * This class may eventually get superseded by org.apache.hadoop.hive.common.metrics2.Metrics.
+ *
+ * Metrics Subsystem - allows exposure of a number of named parameters/counters
+ * via jmx, intended to be used as a static subsystem
+ *
+ * Has a couple of primary ways it can be used:
+ * (i) Using the set and get methods to set and get named parameters
+ * (ii) Using the incrementCounter method to increment and set named
+ * parameters in one go, rather than having to make a get and then a set.
+ * (iii) Using the startScope and endScope methods to start and end
+ * named "scopes" that record the number of times they've been
+ * instantiated and amount of time(in milliseconds) spent inside
+ * the scopes.
+ */
+public class MetricsLegacy implements Metrics {
+
+ private MetricsLegacy() {
+ // block
+ }
+
+ /**
+ * MetricsScope : A class that encapsulates an idea of a metered scope.
+ * Instantiating a named scope and then closing it exposes two counters:
+ * (i) a "number of calls" counter ( <name>.n ), and
+ * (ii) a "number of msecs spent between scope open and close" counter. ( <name>.t)
+ */
+ public static class MetricsScope {
+
+ final MetricsLegacy metrics;
+
+ final String name;
+ final String numCounter;
+ final String timeCounter;
+ final String avgTimeCounter;
+
+ private boolean isOpen = false;
+ private Long startTime = null;
+
+ /**
+ * Instantiates a named scope - intended to only be called by Metrics, so locally scoped.
+ * @param name - name of the variable
+ * @throws IOException
+ */
+ private MetricsScope(String name, MetricsLegacy metrics) throws IOException {
+ this.metrics = metrics;
+ this.name = name;
+ this.numCounter = name + ".n";
+ this.timeCounter = name + ".t";
+ this.avgTimeCounter = name + ".avg_t";
+ open();
+ }
+
+ public Long getNumCounter() throws IOException {
+ return (Long) metrics.get(numCounter);
+ }
+
+ public Long getTimeCounter() throws IOException {
+ return (Long) metrics.get(timeCounter);
+ }
+
+ /**
+ * Opens scope, and makes note of the time started, increments run counter
+ * @throws IOException
+ *
+ */
+ public void open() throws IOException {
+ if (!isOpen) {
+ isOpen = true;
+ startTime = System.currentTimeMillis();
+ } else {
+ throw new IOException("Scope named " + name + " is not closed, cannot be opened.");
+ }
+ }
+
+ /**
+ * Closes scope, and records the time taken
+ * @throws IOException
+ */
+ public void close() throws IOException {
+ if (isOpen) {
+ Long endTime = System.currentTimeMillis();
+ synchronized(metrics) {
+ Long num = metrics.incrementCounter(numCounter);
+ Long time = metrics.incrementCounter(timeCounter, endTime - startTime);
+ if (num != null && time != null) {
+ metrics.set(avgTimeCounter, Double.valueOf(time.doubleValue() / num.doubleValue()));
+ }
+ }
+ } else {
+ throw new IOException("Scope named " + name + " is not open, cannot be closed.");
+ }
+ isOpen = false;
+ }
+
+
+ /**
+ * Closes scope if open, and reopens it
+ * @throws IOException
+ */
+ public void reopen() throws IOException {
+ if(isOpen) {
+ close();
+ }
+ open();
+ }
+
+ }
+
+ private static final MetricsMBean metrics = new MetricsMBeanImpl();
+
+ private static final ObjectName oname;
+ static {
+ try {
+ oname = new ObjectName(
+ "org.apache.hadoop.hive.common.metrics:type=MetricsMBean");
+ } catch (MalformedObjectNameException mone) {
+ throw new RuntimeException(mone);
+ }
+ }
+
+
+  private static final ThreadLocal<HashMap<String, MetricsScope>> threadLocalScopes
+      = new ThreadLocal<HashMap<String, MetricsScope>>() {
+    @Override
+    protected HashMap<String, MetricsScope> initialValue() {
+      return new HashMap<String, MetricsScope>();
+    }
+  };
+
+ private boolean initialized = false;
+
+ public void init(HiveConf conf) throws Exception {
+ if (!initialized) {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ mbs.registerMBean(metrics, oname);
+ initialized = true;
+ }
+ }
+
+ public boolean isInitialized() {
+ return initialized;
+ }
+
+ public Long incrementCounter(String name) throws IOException{
+ if (!initialized) {
+ return null;
+ }
+ return incrementCounter(name,Long.valueOf(1));
+ }
+
+ public Long incrementCounter(String name, long increment) throws IOException{
+ if (!initialized) {
+ return null;
+ }
+ Long value;
+ synchronized(metrics) {
+ if (!metrics.hasKey(name)) {
+ value = Long.valueOf(increment);
+ set(name, value);
+ } else {
+ value = ((Long)get(name)) + increment;
+ set(name, value);
+ }
+ }
+ return value;
+ }
+
+ public void set(String name, Object value) throws IOException{
+ if (!initialized) {
+ return;
+ }
+ metrics.put(name,value);
+ }
+
+ public Object get(String name) throws IOException{
+ if (!initialized) {
+ return null;
+ }
+ return metrics.get(name);
+ }
+
+ public void startScope(String name) throws IOException{
+ if (!initialized) {
+ return;
+ }
+ if (threadLocalScopes.get().containsKey(name)) {
+ threadLocalScopes.get().get(name).open();
+ } else {
+ threadLocalScopes.get().put(name, new MetricsScope(name, this));
+ }
+ }
+
+ public MetricsScope getScope(String name) throws IOException {
+ if (!initialized) {
+ return null;
+ }
+ if (threadLocalScopes.get().containsKey(name)) {
+ return threadLocalScopes.get().get(name);
+ } else {
+ throw new IOException("No metrics scope named " + name);
+ }
+ }
+
+ public void endScope(String name) throws IOException{
+ if (!initialized) {
+ return;
+ }
+ if (threadLocalScopes.get().containsKey(name)) {
+ threadLocalScopes.get().get(name).close();
+ }
+ }
+
+ /**
+ * Resets the static context state to initial.
+ * Used primarily for testing purposes.
+ *
+ * Note that threadLocalScopes ThreadLocal is *not* cleared in this call.
+ */
+ public void deInit() throws Exception {
+ synchronized (metrics) {
+ if (initialized) {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ if (mbs.isRegistered(oname)) {
+ mbs.unregisterMBean(oname);
+ }
+ metrics.clear();
+ initialized = false;
+ }
+ }
+ }
+}
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
new file mode 100644
index 0000000..13a5336
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import java.io.IOException;
+
+/**
+ * Generic Metrics interface.
+ */
+public interface Metrics {
+
+ /**
+ * Initialize Metrics system with given Hive configuration.
+ * @param conf
+ */
+ public void init(HiveConf conf) throws Exception;
+
+ /**
+ * Deinitializes the Metrics system.
+ */
+ public void deInit() throws Exception;
+
+ /**
+ * @param name
+ * @throws IOException
+ */
+ public void startScope(String name) throws IOException;
+
+ public void endScope(String name) throws IOException;
+
+ //Counter-related methods
+
+ /**
+ * Increments a counter of the given name by 1.
+ * @param name
+ * @return
+ * @throws IOException
+ */
+ public Long incrementCounter(String name) throws IOException;
+
+ /**
+ * Increments a counter of the given name by "increment"
+ * @param name
+ * @param increment
+ * @return
+ * @throws IOException
+ */
+ public Long incrementCounter(String name, long increment) throws IOException;
+}
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java
new file mode 100644
index 0000000..36cfdca
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics.common;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Class that manages a static Metric instance for this process.
+ */
+public class MetricsFactory {
+
+ private static Metrics metrics;
+ private static Object initLock = new Object();
+
+ public synchronized static void init(HiveConf conf) throws Exception {
+ if (metrics == null) {
+ metrics = (Metrics) ReflectionUtils.newInstance(conf.getClassByName(
+ conf.getVar(HiveConf.ConfVars.HIVE_METRICS_CLASS)), conf);
+ }
+ metrics.init(conf);
+ }
+
+ public static Metrics getMetricsInstance() {
+ return metrics;
+ }
+
+ public synchronized static void uninit() throws Exception {
+ if (metrics != null) {
+ metrics.deInit();
+ }
+ }
+}
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/Metrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/Metrics.java
new file mode 100644
index 0000000..dccefef
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/Metrics.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.metrics.metrics2;
+
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.MetricSet;
+import com.codahale.metrics.Timer;
+import com.codahale.metrics.json.MetricsModule;
+import com.codahale.metrics.jvm.BufferPoolMetricSet;
+import com.codahale.metrics.jvm.ClassLoadingGaugeSet;
+import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
+import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Splitter;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * New Metrics implementation, implemented backed by Codahale.
+ */
+public class Metrics implements org.apache.hadoop.hive.common.metrics.common.Metrics {
+ public static final String API_PREFIX = "api_";
+ public static final Log LOGGER = LogFactory.getLog(Metrics.class);
+
+ public final MetricRegistry metricRegistry = new MetricRegistry();
+ private final Lock timersLock = new ReentrantLock();
+ private final Lock countersLock = new ReentrantLock();
+
+  private LoadingCache<String, Timer> timers;
+  private LoadingCache<String, Counter> counters;
+
+ private boolean initialized = false;
+ private HiveConf conf;
+
+  private final ThreadLocal<HashMap<String, MetricsScope>> threadLocalScopes
+      = new ThreadLocal<HashMap<String, MetricsScope>>() {
+    @Override
+    protected HashMap<String, MetricsScope> initialValue() {
+      return new HashMap<String, MetricsScope>();
+    }
+  };
+
+ private transient ObjectMapper jsonMapper;
+
+ public static class MetricsScope {
+
+ final String name;
+ final Timer timer;
+ Timer.Context timerContext;
+ Metrics metrics;
+
+ private boolean isOpen = false;
+
+ /**
+ * Instantiates a named scope - intended to only be called by Metrics, so locally scoped.
+ * @param name - name of the variable
+ * @throws IOException
+ */
+ private MetricsScope(String name, Metrics metrics) throws IOException {
+ this.name = name;
+ this.metrics = metrics;
+ this.timer = metrics.getTimer(name);
+ open();
+ }
+
+ /**
+ * Opens scope, and makes note of the time started, increments run counter
+ * @throws IOException
+ *
+ */
+ public void open() throws IOException {
+ if (!isOpen) {
+ isOpen = true;
+ this.timerContext = timer.time();
+ } else {
+ throw new IOException("Scope named " + name + " is not closed, cannot be opened.");
+ }
+ }
+
+ /**
+ * Closes scope, and records the time taken
+ * @throws IOException
+ */
+ public void close() throws IOException {
+ if (isOpen) {
+ timerContext.close();
+
+ } else {
+ throw new IOException("Scope named " + name + " is not open, cannot be closed.");
+ }
+ isOpen = false;
+ }
+ }
+
+ public void init(HiveConf conf) throws Exception {
+ synchronized (this) {
+ if (initialized) {
+ return;
+ }
+
+ this.conf = conf;
+ //Codahale artifacts are lazily-created.
+ timers = CacheBuilder.newBuilder().build(
+          new CacheLoader<String, com.codahale.metrics.Timer>() {
+ @Override
+ public com.codahale.metrics.Timer load(String key) throws Exception {
+ com.codahale.metrics.Timer timer
+ = new com.codahale.metrics.Timer(new ExponentiallyDecayingReservoir());
+ metricRegistry.register(key, timer);
+ return timer;
+ }
+ }
+ );
+ counters = CacheBuilder.newBuilder().build(
+          new CacheLoader<String, Counter>() {
+ @Override
+ public Counter load(String key) throws Exception {
+ Counter counter = new Counter();
+ metricRegistry.register(key, counter);
+ return counter;
+ }
+ }
+ );
+
+ //register JVM metrics
+ registerAll("gc", new GarbageCollectorMetricSet());
+ registerAll("buffers", new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer()));
+ registerAll("memory", new MemoryUsageGaugeSet());
+ registerAll("threads", new ThreadStatesGaugeSet());
+ registerAll("classLoading", new ClassLoadingGaugeSet());
+
+ //Metrics reporter
+ List<String> metricsReporterNames = Lists.newArrayList(
+ Splitter.on(",").trimResults().omitEmptyStrings().split(conf.getVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER)));
+ Set<MetricsReporting> finalReporterList = new HashSet<MetricsReporting>();
+ if(metricsReporterNames != null) {
+ for (String metricsReportingName : metricsReporterNames) {
+ try {
+ MetricsReporting reporter = MetricsReporting.valueOf(metricsReportingName.trim().toUpperCase());
+ finalReporterList.add(reporter);
+ } catch (IllegalArgumentException e) {
+ LOGGER.warn("Metrics reporter skipped due to invalid configured reporter: " + metricsReportingName);
+ }
+ }
+ }
+ initReporting(finalReporterList);
+ initialized = true;
+ }
+ }
+
+
+ public void deInit() throws Exception {
+ synchronized (this) {
+ if (initialized) {
+ initialized = false;
+ }
+ }
+ }
+
+ /* Should be only called once to initialize the reporters
+ */
+ public void initReporting(Set<MetricsReporting> reportingSet) throws Exception {
+ for (MetricsReporting reporting : reportingSet) {
+ switch(reporting) {
+ case CONSOLE:
+ final ConsoleReporter consoleReporter = ConsoleReporter.forRegistry(metricRegistry)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .build();
+ consoleReporter.start(1, TimeUnit.SECONDS);
+ break;
+ case JMX:
+ final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .build();
+ jmxReporter.start();
+ break;
+ case JSON_FILE:
+ this.jsonMapper = new ObjectMapper().registerModule(new MetricsModule(TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS, false));
+ long time = conf.getTimeVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, TimeUnit.MILLISECONDS);
+ final String pathString = conf.getVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION);
+ java.util.Timer timer = new java.util.Timer(true);
+ timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(metricRegistry);
+ Path path = new Path(pathString);
+ FileSystem fs = FileSystem.get(conf);
+ fs.delete(path, true);
+ BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(path, true)));
+ br.write(json);
+ br.close();
+ } catch (Exception e) {
+ LOGGER.warn("Error writing JSON Metrics to file", e);
+ }
+ }
+ }, 0, time);
+ }
+ }
+ }
+
+ public void startScope(String name) throws IOException {
+ if (!initialized) {
+ return;
+ }
+ name = API_PREFIX + name;
+ if (threadLocalScopes.get().containsKey(name)) {
+ threadLocalScopes.get().get(name).open();
+ } else {
+ threadLocalScopes.get().put(name, new MetricsScope(name, this));
+ }
+ }
+
+ public void endScope(String name) throws IOException{
+ if (!initialized) {
+ return;
+ }
+ name = API_PREFIX + name;
+ if (threadLocalScopes.get().containsKey(name)) {
+ threadLocalScopes.get().get(name).close();
+ }
+ }
+
+ public Long incrementCounter(String name) throws IOException {
+ return incrementCounter(name, 1);
+ }
+
+ public Long incrementCounter(String name, long increment) throws IOException {
+ String key = name;
+ try {
+ countersLock.lock();
+ counters.get(key).inc(increment);
+ return counters.get(key).getCount();
+ } catch(ExecutionException ee) {
+ throw new RuntimeException(ee);
+ } finally {
+ countersLock.unlock();
+ }
+ }
+
+ // This method is necessary to synchronize lazy-creation to the timers.
+ private Timer getTimer(String name) throws IOException {
+ String key = name;
+ try {
+ timersLock.lock();
+ Timer timer = timers.get(key);
+ return timer;
+ } catch (ExecutionException e) {
+ throw new IOException(e);
+ } finally {
+ timersLock.unlock();
+ }
+ }
+
+ private void registerAll(String prefix, MetricSet metricSet) {
+ for (Map.Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) {
+ if (entry.getValue() instanceof MetricSet) {
+ registerAll(prefix + "." + entry.getKey(), (MetricSet) entry.getValue());
+ } else {
+ metricRegistry.register(prefix + "." + entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public MetricRegistry getMetricRegistry() {
+ return metricRegistry;
+ }
+}
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/MetricsReporting.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/MetricsReporting.java
new file mode 100644
index 0000000..643246f
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/MetricsReporting.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics.metrics2;
+
+/**
+ * Reporting options for org.apache.hadoop.hive.common.metrics.metrics2.Metrics.
+ */
+public enum MetricsReporting {
+ JMX,
+ CONSOLE,
+ JSON_FILE
+}
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 49b8f97..cde460a 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -645,6 +645,7 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
"Maximum cache full % after which the cache cleaner thread kicks in."),
METASTORE_AGGREGATE_STATS_CACHE_CLEAN_UNTIL("hive.metastore.aggregate.stats.cache.clean.until", (float) 0.8,
"The cleaner thread cleans until cache reaches this % full size."),
+ METASTORE_METRICS("hive.metastore.metrics.enabled", false, "Enable metrics on the metastore."),
// Parameters for exporting metadata on table drop (requires the use of the)
// org.apache.hadoop.hive.ql.parse.MetaDataExportListener preevent listener
@@ -1688,6 +1689,7 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
" EXECUTION: Log completion of tasks\n" +
" PERFORMANCE: Execution + Performance logs \n" +
" VERBOSE: All logs" ),
+ HIVE_SERVER2_METRICS_ENABLED("hive.server2.metrics.enabled", false, "Enable metrics on the HiveServer2."),
// logging configuration
HIVE_LOG4J_FILE("hive.log4j.file", "",
"Hive log4j configuration file.\n" +
@@ -1715,7 +1717,21 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
HIVE_AUTOGEN_COLUMNALIAS_PREFIX_INCLUDEFUNCNAME(
"hive.autogen.columnalias.prefix.includefuncname", false,
"Whether to include function name in the column alias auto generated by Hive."),
-
+ HIVE_METRICS_CLASS("hive.service.metrics.class",
+ "org.apache.hadoop.hive.common.metrics.metrics2.Metrics",
+ new StringSet(
+ "org.apache.hadoop.hive.common.metrics.metrics2.Metrics",
+ "org.apache.hadoop.hive.common.metrics.MetricsLegacy"),
+ "Hive metrics subsystem implementation class."),
+ HIVE_METRICS_REPORTER("hive.service.metrics.reporter", "JSON_FILE, JMX",
+ "Reporter type for metric class org.apache.hadoop.hive.common.metrics.metrics2.Metrics, comma separated list of JMX, CONSOLE, JSON_FILE"),
+ HIVE_METRICS_JSON_FILE_LOCATION("hive.service.metrics.json.file.location", "file:///tmp/my-logging.properties",
+ "For metric class org.apache.hadoop.hive.common.metrics.metrics2.Metrics JSON_FILE reporter, the location of JSON metrics file. " +
+ "This file will get overwritten at every interval."),
+ HIVE_METRICS_JSON_FILE_INTERVAL("hive.service.metrics.json.file.interval", "5s",
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "For metric class org.apache.hadoop.hive.common.metrics.metrics2.Metrics JSON_FILE reporter, " +
+ "the frequency of updating JSON metrics file."),
HIVE_PERF_LOGGER("hive.exec.perf.logger", "org.apache.hadoop.hive.ql.log.PerfLogger",
"The class responsible for logging client side performance metrics. \n" +
"Must be a subclass of org.apache.hadoop.hive.ql.log.PerfLogger"),
diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java
deleted file mode 100644
index e85d3f8..0000000
--- a/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.common.metrics;
-
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.management.Attribute;
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanInfo;
-import javax.management.MBeanOperationInfo;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
-import org.apache.hadoop.hive.common.metrics.Metrics.MetricsScope;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-public class TestMetrics {
-
- private static final String scopeName = "foo";
- private static final long periodMs = 50L;
-
- @Before
- public void before() throws Exception {
- Metrics.uninit();
- Metrics.init();
- }
-
- @After
- public void after() throws Exception {
- Metrics.uninit();
- }
-
- @Test
- public void testMetricsMBean() throws Exception {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- final ObjectName oname = new ObjectName(
- "org.apache.hadoop.hive.common.metrics:type=MetricsMBean");
- MBeanInfo mBeanInfo = mbs.getMBeanInfo(oname);
- // check implementation class:
- assertEquals(MetricsMBeanImpl.class.getName(), mBeanInfo.getClassName());
-
- // check reset operation:
- MBeanOperationInfo[] oops = mBeanInfo.getOperations();
- boolean resetFound = false;
- for (MBeanOperationInfo op : oops) {
- if ("reset".equals(op.getName())) {
- resetFound = true;
- break;
- }
- }
- assertTrue(resetFound);
-
- // add metric with a non-null value:
- Attribute attr = new Attribute("fooMetric", Long.valueOf(-77));
- mbs.setAttribute(oname, attr);
-
- mBeanInfo = mbs.getMBeanInfo(oname);
- MBeanAttributeInfo[] attrinuteInfos = mBeanInfo.getAttributes();
- assertEquals(1, attrinuteInfos.length);
- boolean attrFound = false;
- for (MBeanAttributeInfo info : attrinuteInfos) {
- if ("fooMetric".equals(info.getName())) {
- assertEquals("java.lang.Long", info.getType());
- assertTrue(info.isReadable());
- assertTrue(info.isWritable());
- assertFalse(info.isIs());
-
- attrFound = true;
- break;
- }
- }
- assertTrue(attrFound);
-
- // check metric value:
- Object v = mbs.getAttribute(oname, "fooMetric");
- assertEquals(Long.valueOf(-77), v);
-
- // reset the bean:
- Object result = mbs.invoke(oname, "reset", new Object[0], new String[0]);
- assertNull(result);
-
- // the metric value must be zeroed:
- v = mbs.getAttribute(oname, "fooMetric");
- assertEquals(Long.valueOf(0), v);
- }
-
- private <T> void expectIOE(Callable<T> c) throws Exception {
- try {
- T t = c.call();
- fail("IOE expected but ["+t+"] was returned.");
- } catch (IOException ioe) {
- // ok, expected
- }
- }
-
- @Test
- public void testScopeSingleThread() throws Exception {
- final MetricsScope fooScope = Metrics.startScope(scopeName);
- // the time and number counters become available only after the 1st
- // scope close:
- expectIOE(new Callable<Long>() {
- @Override
- public Long call() throws Exception {
- Long num = fooScope.getNumCounter();
- return num;
- }
- });
- expectIOE(new Callable<Long>() {
- @Override
- public Long call() throws Exception {
- Long time = fooScope.getTimeCounter();
- return time;
- }
- });
- // cannot open scope that is already open:
- expectIOE(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- fooScope.open();
- return null;
- }
- });
-
- assertSame(fooScope, Metrics.getScope(scopeName));
- Thread.sleep(periodMs+1);
- // 1st close:
- // closing of open scope should be ok:
- Metrics.endScope(scopeName);
- expectIOE(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- Metrics.endScope(scopeName); // closing of closed scope not allowed
- return null;
- }
- });
-
- assertEquals(Long.valueOf(1), fooScope.getNumCounter());
- final long t1 = fooScope.getTimeCounter().longValue();
- assertTrue(t1 > periodMs);
-
- assertSame(fooScope, Metrics.getScope(scopeName));
-
- // opening allowed after closing:
- Metrics.startScope(scopeName);
- // opening of already open scope not allowed:
- expectIOE(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- Metrics.startScope(scopeName);
- return null;
- }
- });
-
- assertEquals(Long.valueOf(1), fooScope.getNumCounter());
- assertEquals(t1, fooScope.getTimeCounter().longValue());
-
- assertSame(fooScope, Metrics.getScope(scopeName));
- Thread.sleep(periodMs + 1);
- // Reopening (close + open) allowed in opened state:
- fooScope.reopen();
-
- assertEquals(Long.valueOf(2), fooScope.getNumCounter());
- assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs);
-
- Thread.sleep(periodMs + 1);
- // 3rd close:
- fooScope.close();
-
- assertEquals(Long.valueOf(3), fooScope.getNumCounter());
- assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs);
- Double avgT = (Double)Metrics.get("foo.avg_t");
- assertTrue(avgT.doubleValue() > periodMs);
- }
-
- @Test
- public void testScopeConcurrency() throws Exception {
- MetricsScope fooScope = Metrics.startScope(scopeName);
- final int threads = 10;
- ExecutorService executorService = Executors.newFixedThreadPool(threads);
- for (int i=0; i<threads; i++) {
- final int n = i;
- executorService.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- testScopeImpl(n);
- return null;
- }
- });
- }
- executorService.shutdown();
- assertTrue(executorService.awaitTermination(periodMs * 3 * threads, TimeUnit.MILLISECONDS));
-
- fooScope = Metrics.getScope(scopeName);
- assertEquals(Long.valueOf(3 * threads), fooScope.getNumCounter());
- assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs * threads);
- Double avgT = (Double)Metrics.get("foo.avg_t");
- assertTrue(avgT.doubleValue() > periodMs);
- Metrics.endScope(scopeName);
- }
-
- void testScopeImpl(int n) throws Exception {
- final MetricsScope fooScope = Metrics.startScope(scopeName);
- // cannot open scope that is already open:
- expectIOE(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- fooScope.open();
- return null;
- }
- });
-
- assertSame(fooScope, Metrics.getScope(scopeName));
- Thread.sleep(periodMs+1);
- // 1st close:
- Metrics.endScope(scopeName); // closing of open scope should be ok.
-
- assertTrue(fooScope.getNumCounter().longValue() >= 1);
- final long t1 = fooScope.getTimeCounter().longValue();
- assertTrue(t1 > periodMs);
-
- expectIOE(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- Metrics.endScope(scopeName); // closing of closed scope not allowed
- return null;
- }
- });
-
- assertSame(fooScope, Metrics.getScope(scopeName));
-
- // opening allowed after closing:
- Metrics.startScope(scopeName);
-
- assertTrue(fooScope.getNumCounter().longValue() >= 1);
- assertTrue(fooScope.getTimeCounter().longValue() >= t1);
-
- // opening of already open scope not allowed:
- expectIOE(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- Metrics.startScope(scopeName);
- return null;
- }
- });
-
- assertSame(fooScope, Metrics.getScope(scopeName));
- Thread.sleep(periodMs + 1);
- // Reopening (close + open) allowed in opened state:
- fooScope.reopen();
-
- assertTrue(fooScope.getNumCounter().longValue() >= 2);
- assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs);
-
- Thread.sleep(periodMs + 1);
- // 3rd close:
- fooScope.close();
-
- assertTrue(fooScope.getNumCounter().longValue() >= 3);
- assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs);
- Double avgT = (Double)Metrics.get("foo.avg_t");
- assertTrue(avgT.doubleValue() > periodMs);
- }
-}
diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetricsLegacy.java b/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetricsLegacy.java
new file mode 100644
index 0000000..f851319
--- /dev/null
+++ b/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetricsLegacy.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.Attribute;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanOperationInfo;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.hadoop.hive.common.metrics.MetricsLegacy;
+import org.apache.hadoop.hive.common.metrics.MetricsMBeanImpl;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+import org.apache.hadoop.hive.common.metrics.MetricsLegacy.MetricsScope;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestMetricsLegacy {
+
+ private static final String scopeName = "foo";
+ private static final long periodMs = 50L;
+ private static MetricsLegacy metrics;
+
+ @Before
+ public void before() throws Exception {
+ MetricsFactory.uninit();
+ HiveConf conf = new HiveConf();
+ conf.setVar(HiveConf.ConfVars.HIVE_METRICS_CLASS, MetricsLegacy.class.getCanonicalName());
+ MetricsFactory.init(conf);
+ metrics = (MetricsLegacy) MetricsFactory.getMetricsInstance();
+ }
+
+ @After
+ public void after() throws Exception {
+ MetricsFactory.uninit();
+ }
+
+ @Test
+ public void testMetricsMBean() throws Exception {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ final ObjectName oname = new ObjectName(
+ "org.apache.hadoop.hive.common.metrics:type=MetricsMBean");
+ MBeanInfo mBeanInfo = mbs.getMBeanInfo(oname);
+ // check implementation class:
+ assertEquals(MetricsMBeanImpl.class.getName(), mBeanInfo.getClassName());
+
+ // check reset operation:
+ MBeanOperationInfo[] oops = mBeanInfo.getOperations();
+ boolean resetFound = false;
+ for (MBeanOperationInfo op : oops) {
+ if ("reset".equals(op.getName())) {
+ resetFound = true;
+ break;
+ }
+ }
+ assertTrue(resetFound);
+
+ // add metric with a non-null value:
+ Attribute attr = new Attribute("fooMetric", Long.valueOf(-77));
+ mbs.setAttribute(oname, attr);
+
+ mBeanInfo = mbs.getMBeanInfo(oname);
+ MBeanAttributeInfo[] attrinuteInfos = mBeanInfo.getAttributes();
+ assertEquals(1, attrinuteInfos.length);
+ boolean attrFound = false;
+ for (MBeanAttributeInfo info : attrinuteInfos) {
+ if ("fooMetric".equals(info.getName())) {
+ assertEquals("java.lang.Long", info.getType());
+ assertTrue(info.isReadable());
+ assertTrue(info.isWritable());
+ assertFalse(info.isIs());
+
+ attrFound = true;
+ break;
+ }
+ }
+ assertTrue(attrFound);
+
+ // check metric value:
+ Object v = mbs.getAttribute(oname, "fooMetric");
+ assertEquals(Long.valueOf(-77), v);
+
+ // reset the bean:
+ Object result = mbs.invoke(oname, "reset", new Object[0], new String[0]);
+ assertNull(result);
+
+ // the metric value must be zeroed:
+ v = mbs.getAttribute(oname, "fooMetric");
+ assertEquals(Long.valueOf(0), v);
+ }
+
+ private <T> void expectIOE(Callable<T> c) throws Exception {
+ try {
+ T t = c.call();
+ fail("IOE expected but ["+t+"] was returned.");
+ } catch (IOException ioe) {
+ // ok, expected
+ }
+ }
+
+ @Test
+ public void testScopeSingleThread() throws Exception {
+ metrics.startScope(scopeName);
+ final MetricsScope fooScope = metrics.getScope(scopeName);
+ // the time and number counters become available only after the 1st
+ // scope close:
+ expectIOE(new Callable<Long>() {
+ @Override
+ public Long call() throws Exception {
+ Long num = fooScope.getNumCounter();
+ return num;
+ }
+ });
+ expectIOE(new Callable<Long>() {
+ @Override
+ public Long call() throws Exception {
+ Long time = fooScope.getTimeCounter();
+ return time;
+ }
+ });
+ // cannot open scope that is already open:
+ expectIOE(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ fooScope.open();
+ return null;
+ }
+ });
+
+ assertSame(fooScope, metrics.getScope(scopeName));
+ Thread.sleep(periodMs+ 1);
+ // 1st close:
+ // closing of open scope should be ok:
+ metrics.endScope(scopeName);
+ expectIOE(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ metrics.endScope(scopeName); // closing of closed scope not allowed
+ return null;
+ }
+ });
+
+ assertEquals(Long.valueOf(1), fooScope.getNumCounter());
+ final long t1 = fooScope.getTimeCounter().longValue();
+ assertTrue(t1 > periodMs);
+
+ assertSame(fooScope, metrics.getScope(scopeName));
+
+ // opening allowed after closing:
+ metrics.startScope(scopeName);
+ // opening of already open scope not allowed:
+ expectIOE(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ metrics.startScope(scopeName);
+ return null;
+ }
+ });
+
+ assertEquals(Long.valueOf(1), fooScope.getNumCounter());
+ assertEquals(t1, fooScope.getTimeCounter().longValue());
+
+ assertSame(fooScope, metrics.getScope(scopeName));
+ Thread.sleep(periodMs + 1);
+ // Reopening (close + open) allowed in opened state:
+ fooScope.reopen();
+
+ assertEquals(Long.valueOf(2), fooScope.getNumCounter());
+ assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs);
+
+ Thread.sleep(periodMs + 1);
+ // 3rd close:
+ fooScope.close();
+
+ assertEquals(Long.valueOf(3), fooScope.getNumCounter());
+ assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs);
+ Double avgT = (Double) metrics.get("foo.avg_t");
+ assertTrue(avgT.doubleValue() > periodMs);
+ }
+
+ @Test
+ public void testScopeConcurrency() throws Exception {
+ metrics.startScope(scopeName);
+ MetricsScope fooScope = metrics.getScope(scopeName);
+ final int threads = 10;
+ ExecutorService executorService = Executors.newFixedThreadPool(threads);
+ for (int i=0; i<threads; i++) {
+ final int n = i;
+ executorService.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ testScopeImpl(n);
+ return null;
+ }
+ });
+ }
+ executorService.shutdown();
+ assertTrue(executorService.awaitTermination(periodMs * 3 * threads, TimeUnit.MILLISECONDS));
+
+ fooScope = metrics.getScope(scopeName);
+ assertEquals(Long.valueOf(3 * threads), fooScope.getNumCounter());
+ assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs * threads);
+ Double avgT = (Double) metrics.get("foo.avg_t");
+ assertTrue(avgT.doubleValue() > periodMs);
+ metrics.endScope(scopeName);
+ }
+
+ void testScopeImpl(int n) throws Exception {
+ metrics.startScope(scopeName);
+ final MetricsScope fooScope = metrics.getScope(scopeName);
+ // cannot open scope that is already open:
+ expectIOE(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ fooScope.open();
+ return null;
+ }
+ });
+
+ assertSame(fooScope, metrics.getScope(scopeName));
+ Thread.sleep(periodMs+ 1);
+ // 1st close:
+ metrics.endScope(scopeName); // closing of open scope should be ok.
+
+ assertTrue(fooScope.getNumCounter().longValue() >= 1);
+ final long t1 = fooScope.getTimeCounter().longValue();
+ assertTrue(t1 > periodMs);
+
+ expectIOE(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ metrics.endScope(scopeName); // closing of closed scope not allowed
+ return null;
+ }
+ });
+
+ assertSame(fooScope, metrics.getScope(scopeName));
+
+ // opening allowed after closing:
+ metrics.startScope(scopeName);
+
+ assertTrue(fooScope.getNumCounter().longValue() >= 1);
+ assertTrue(fooScope.getTimeCounter().longValue() >= t1);
+
+ // opening of already open scope not allowed:
+ expectIOE(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ metrics.startScope(scopeName);
+ return null;
+ }
+ });
+
+ assertSame(fooScope, metrics.getScope(scopeName));
+ Thread.sleep(periodMs + 1);
+ // Reopening (close + open) allowed in opened state:
+ fooScope.reopen();
+
+ assertTrue(fooScope.getNumCounter().longValue() >= 2);
+ assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs);
+
+ Thread.sleep(periodMs + 1);
+ // 3rd close:
+ fooScope.close();
+
+ assertTrue(fooScope.getNumCounter().longValue() >= 3);
+ assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs);
+ Double avgT = (Double) metrics.get("foo.avg_t");
+ assertTrue(avgT.doubleValue() > periodMs);
+ }
+}
diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestMetrics.java
new file mode 100644
index 0000000..34e7430
--- /dev/null
+++ b/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestMetrics.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for new Metrics subsystem.
+ */
+public class TestMetrics {
+
+ private static File workDir = new File(System.getProperty("test.tmp.dir"));
+ private static File jsonReportFile;
+ public static MetricRegistry metricRegistry;
+
+ @BeforeClass
+ public static void before() throws Exception {
+ HiveConf conf = new HiveConf();
+
+ jsonReportFile = new File(workDir, "json_reporting");
+ jsonReportFile.delete();
+ String defaultFsName = ShimLoader.getHadoopShims().getHadoopConfNames().get("HADOOPFS");
+ conf.set(defaultFsName, "local");
+ conf.setVar(HiveConf.ConfVars.HIVE_METRICS_CLASS, Metrics.class.getCanonicalName());
+ conf.setVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER, MetricsReporting.JSON_FILE.name() + "," + MetricsReporting.JMX.name());
+ conf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION, jsonReportFile.toString());
+ conf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, "50ms");
+
+ MetricsFactory.init(conf);
+ metricRegistry = ((Metrics) MetricsFactory.getMetricsInstance()).getMetricRegistry();
+ }
+
+ @Test
+ public void testScope() throws Exception {
+ int runs = 5;
+ for (int i = 0; i < runs; i++) {
+ MetricsFactory.getMetricsInstance().startScope("method1");
+ MetricsFactory.getMetricsInstance().endScope("method1");
+ }
+
+ Timer timer = metricRegistry.getTimers().get("api_method1");
+ Assert.assertEquals(5, timer.getCount());
+ Assert.assertTrue(timer.getMeanRate() > 0);
+ }
+
+
+ @Test
+ public void testCount() throws Exception {
+ int runs = 5;
+ for (int i = 0; i < runs; i++) {
+ MetricsFactory.getMetricsInstance().incrementCounter("count1");
+ }
+ Counter counter = metricRegistry.getCounters().get("count1");
+ Assert.assertEquals(5L, counter.getCount());
+ }
+
+ @Test
+ public void testConcurrency() throws Exception {
+ int threads = 4;
+ ExecutorService executorService = Executors.newFixedThreadPool(threads);
+ for (int i=0; i< threads; i++) {
+ final int n = i;
+ executorService.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ MetricsFactory.getMetricsInstance().startScope("method2");
+ MetricsFactory.getMetricsInstance().endScope("method2");
+ return null;
+ }
+ });
+ }
+ executorService.shutdown();
+ assertTrue(executorService.awaitTermination(10000, TimeUnit.MILLISECONDS));
+ Timer timer = metricRegistry.getTimers().get("api_method2");
+ Assert.assertEquals(4, timer.getCount());
+ Assert.assertTrue(timer.getMeanRate() > 0);
+ }
+
+ @Test
+ public void testFileReporting() throws Exception {
+ int runs = 5;
+ for (int i = 0; i < runs; i++) {
+ MetricsFactory.getMetricsInstance().incrementCounter("count2");
+ Thread.sleep(100);
+ }
+
+ byte[] jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath()));
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ JsonNode rootNode = objectMapper.readTree(jsonData);
+ JsonNode countersNode = rootNode.path("counters");
+ JsonNode methodCounterNode = countersNode.path("count2");
+ JsonNode countNode = methodCounterNode.path("count");
+ Assert.assertEquals(countNode.asInt(), 5);
+ }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java
new file mode 100644
index 0000000..1758947
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import junit.framework.TestCase;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+/**
+ * Tests Hive Metastore Metrics.
+ */
+public class TestMetaStoreMetrics {
+
+ private static File workDir = new File(System.getProperty("test.tmp.dir"));
+ private static File jsonReportFile;
+
+ private static HiveConf hiveConf;
+ private static Driver driver;
+
+
+ @BeforeClass
+ public static void before() throws Exception {
+
+ int port = MetaStoreUtils.findFreePort();
+
+ jsonReportFile = new File(workDir, "json_reporting");
+ jsonReportFile.delete();
+
+ hiveConf = new HiveConf(TestMetaStoreMetrics.class);
+ hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + port);
+ hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+ hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_METRICS, true);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ hiveConf.setVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER, MetricsReporting.JSON_FILE.name() + "," + MetricsReporting.JMX.name());
+ hiveConf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION, jsonReportFile.toString());
+ hiveConf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, "50ms");
+
+ MetaStoreUtils.startMetaStore(port, ShimLoader.getHadoopThriftAuthBridge(), hiveConf);
+
+ SessionState.start(new CliSessionState(hiveConf));
+ driver = new Driver(hiveConf);
+ }
+
+ @Test
+ public void testMetricsFile() throws Exception {
+ driver.run("show databases");
+
+ //give timer thread a chance to print the metrics
+ Thread.sleep(500);
+
+ byte[] jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath()));
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ JsonNode rootNode = objectMapper.readTree(jsonData);
+ JsonNode countersNode = rootNode.path("timers");
+ JsonNode methodCounterNode = countersNode.path("api_get_all_databases");
+ JsonNode countNode = methodCounterNode.path("count");
+ Assert.assertTrue(countNode.asInt() > 0);
+ }
+}
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index d81c856..5ce85e4 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -18,39 +18,14 @@
package org.apache.hadoop.hive.metastore;
-import static org.apache.commons.lang.StringUtils.join;
-import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_COMMENT;
-import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_NAME;
-import static org.apache.hadoop.hive.metastore.MetaStoreUtils.validateName;
-
-import java.io.IOException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Formatter;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.Timer;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.regex.Pattern;
-
-import javax.jdo.JDOException;
-
+import com.facebook.fb303.FacebookBase;
+import com.facebook.fb303.fb_status;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimaps;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -58,12 +33,13 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.JvmPauseMonitor;
import org.apache.hadoop.hive.common.LogUtils;
import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hadoop.hive.common.cli.CommonCliOptions;
-import org.apache.hadoop.hive.common.metrics.Metrics;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
@@ -221,14 +197,35 @@
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportFactory;
-import com.facebook.fb303.FacebookBase;
-import com.facebook.fb303.fb_status;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimaps;
+import javax.jdo.JDOException;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Formatter;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.Timer;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+
+import static org.apache.commons.lang.StringUtils.join;
+import static org.apache.hadoop.hive.metastore.MetaStoreUtils.*;
/**
* TODO:pc remove application logic to a separate interface.
@@ -464,9 +461,10 @@ public void init() throws MetaException {
}
}
- if (hiveConf.getBoolean("hive.metastore.metrics.enabled", false)) {
+ //Start Metrics for Embedded mode
+ if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) {
try {
- Metrics.init();
+ MetricsFactory.init(hiveConf);
} catch (Exception e) {
// log exception, but ignore inability to start
LOG.error("error in Metrics init: " + e.getClass().getName() + " "
@@ -750,11 +748,13 @@ private String startFunction(String function, String extraLogInfo) {
incrementCounter(function);
logInfo((getIpAddress() == null ? "" : "source:" + getIpAddress() + " ") +
function + extraLogInfo);
- try {
- Metrics.startScope(function);
- } catch (IOException e) {
- LOG.debug("Exception when starting metrics scope"
+ if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) {
+ try {
+ MetricsFactory.getMetricsInstance().startScope(function);
+ } catch (IOException e) {
+ LOG.debug("Exception when starting metrics scope"
+ + e.getClass().getName() + " " + e.getMessage(), e);
+ }
}
return function;
}
@@ -792,10 +792,12 @@ private void endFunction(String function, boolean successful, Exception e,
}
private void endFunction(String function, MetaStoreEndFunctionContext context) {
- try {
- Metrics.endScope(function);
- } catch (IOException e) {
- LOG.debug("Exception when closing metrics scope" + e);
+ if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) {
+ try {
+ MetricsFactory.getMetricsInstance().endScope(function);
+ } catch (IOException e) {
+ LOG.debug("Exception when closing metrics scope" + e);
+ }
}
for (MetaStoreEndFunctionListener listener : endFunctionListeners) {
@@ -5901,6 +5903,16 @@ public void run() {
}
});
+ //Start Metrics for Standalone (Remote) Mode
+ if (conf.getBoolVar(ConfVars.METASTORE_METRICS)) {
+ try {
+ MetricsFactory.init(conf);
+ } catch (Exception e) {
+ // log exception, but ignore inability to start
+ LOG.error("error in Metrics init: " + e.getClass().getName() + " "
+ + e.getMessage(), e);
+ }
+ }
Lock startLock = new ReentrantLock();
Condition startCondition = startLock.newCondition();
@@ -6091,7 +6103,13 @@ public void run() {
// Wrap the start of the threads in a catch Throwable loop so that any failures
// don't doom the rest of the metastore.
startLock.lock();
- ShimLoader.getHadoopShims().startPauseMonitor(conf);
+ try {
+ JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(conf);
+ pauseMonitor.start();
+ } catch (Throwable t) {
+ LOG.warn("Could not initiate the JvmPauseMonitor thread." + " GCs and Pauses may not be " +
+ "warned upon.", t);
+ }
try {
// Per the javadocs on Condition, do not depend on the condition alone as a start gate
diff --git a/pom.xml b/pom.xml
index b21d894..35133f2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -116,6 +116,7 @@
1.5.4
1.4
10.11.1.1
+ 3.1.0
14.0.1
2.1.6
1.2.1
@@ -128,6 +129,8 @@
4.4
2.4.0
1.9.2
+
+ 2.4.2
0.3.2
5.5.1
3.0.1
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 58e8e49..afbd407 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -42,14 +42,15 @@
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hadoop.hive.common.JvmPauseMonitor;
import org.apache.hadoop.hive.common.LogUtils;
import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
-import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.common.util.HiveStringUtils;
@@ -344,7 +345,18 @@ private static void startHiveServer2() throws Throwable {
server = new HiveServer2();
server.init(hiveConf);
server.start();
- ShimLoader.getHadoopShims().startPauseMonitor(hiveConf);
+
+ if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_METRICS_ENABLED)) {
+ MetricsFactory.init(hiveConf);
+ }
+ try {
+ JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(hiveConf);
+ pauseMonitor.start();
+ } catch (Throwable t) {
+ LOG.warn("Could not initiate the JvmPauseMonitor thread." + " GCs and Pauses may not be " +
+ "warned upon.", t);
+ }
+
// If we're supporting dynamic service discovery, we'll add the service uri for this
// HiveServer2 instance to Zookeeper as a znode.
if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
diff --git a/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java b/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
index 6d8166c..ffffcb7 100644
--- a/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
+++ b/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
@@ -159,11 +159,6 @@ public TaskAttemptID newTaskAttemptID(JobID jobId, boolean isMap, int taskId, in
}
@Override
- public void startPauseMonitor(Configuration conf) {
- /* no supported */
- }
-
- @Override
public boolean isLocalMode(Configuration conf) {
return "local".equals(getJobLauncherRpcAddress(conf));
}
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 19324b8..5ddab98 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -212,19 +212,6 @@ public TaskAttemptID newTaskAttemptID(JobID jobId, boolean isMap, int taskId, in
}
@Override
- public void startPauseMonitor(Configuration conf) {
- try {
- Class.forName("org.apache.hadoop.util.JvmPauseMonitor");
- org.apache.hadoop.util.JvmPauseMonitor pauseMonitor = new org.apache.hadoop.util
- .JvmPauseMonitor(conf);
- pauseMonitor.start();
- } catch (Throwable t) {
- LOG.warn("Could not initiate the JvmPauseMonitor thread." + " GCs and Pauses may not be " +
- "warned upon.", t);
- }
- }
-
- @Override
public boolean isLocalMode(Configuration conf) {
return "local".equals(conf.get("mapreduce.framework.name"));
}
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 5a6bc44..5b7e7f6 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -146,8 +146,6 @@ MiniDFSShim getMiniDfs(Configuration conf,
public JobContext newJobContext(Job job);
- public void startPauseMonitor(Configuration conf);
-
/**
* Check wether MR is configured to run in local-mode
* @param conf