diff --git a/common/pom.xml b/common/pom.xml
index 2292fdf..cd14581 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -239,6 +239,23 @@
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ test-jar
+
+
+
+
+ *.xml
+
+
+
+
+
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
index 52d99e4..9be9b50 100644
--- a/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.common.metrics;
import org.apache.hadoop.hive.common.metrics.common.Metrics;
+import org.apache.hadoop.hive.common.metrics.common.MetricsScope;
import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -56,7 +57,7 @@ private LegacyMetrics() {
* (i) a "number of calls" counter ( <name>.n ), and
* (ii) a "number of msecs spent between scope open and close" counter. ( <name>.t)
*/
- public static class MetricsScope {
+ public static class LegacyMetricsScope implements MetricsScope {
final LegacyMetrics metrics;
@@ -73,7 +74,7 @@ private LegacyMetrics() {
* @param name - name of the variable
* @throws IOException
*/
- private MetricsScope(String name, LegacyMetrics metrics) throws IOException {
+ private LegacyMetricsScope(String name, LegacyMetrics metrics) throws IOException {
this.metrics = metrics;
this.name = name;
this.numCounter = name + ".n";
@@ -150,11 +151,11 @@ public void reopen() throws IOException {
}
}
- private static final ThreadLocal> threadLocalScopes
- = new ThreadLocal>() {
+ private static final ThreadLocal> threadLocalScopes
+ = new ThreadLocal>() {
@Override
- protected HashMap initialValue() {
- return new HashMap();
+ protected HashMap initialValue() {
+ return new HashMap();
}
};
@@ -212,15 +213,15 @@ public Object get(String name) throws IOException{
return metrics.get(name);
}
- public void startScope(String name) throws IOException{
+ public void startStoredScope(String name) throws IOException{
if (threadLocalScopes.get().containsKey(name)) {
threadLocalScopes.get().get(name).open();
} else {
- threadLocalScopes.get().put(name, new MetricsScope(name, this));
+ threadLocalScopes.get().put(name, new LegacyMetricsScope(name, this));
}
}
- public MetricsScope getScope(String name) throws IOException {
+ public MetricsScope getStoredScope(String name) throws IOException {
if (threadLocalScopes.get().containsKey(name)) {
return threadLocalScopes.get().get(name);
} else {
@@ -228,13 +229,19 @@ public MetricsScope getScope(String name) throws IOException {
}
}
- public void endScope(String name) throws IOException{
+ public void endStoredScope(String name) throws IOException{
if (threadLocalScopes.get().containsKey(name)) {
threadLocalScopes.get().get(name).close();
}
}
+ public MetricsScope createScope(String name) throws IOException {
+ return new LegacyMetricsScope(name, this);
+ }
+ public void endScope(MetricsScope scope) throws IOException {
+ ((LegacyMetricsScope) scope).close();
+ }
/**
* Resets the static context state to initial.
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
index 49b2b32..4297233 100644
--- a/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
@@ -28,20 +28,40 @@
*/
public interface Metrics {
- //Must declare CTOR taking in HiveConf.
-
/**
* Deinitializes the Metrics system.
*/
public void close() throws Exception;
/**
+ *
+ * @param name starts a scope of a given name. Scopes is stored as thread-local variable.
+ * @throws IOException
+ */
+ public void startStoredScope(String name) throws IOException;
+
+ /**
+ * Closes the stored scope of a given name.
+ * Note that this must be called on the same thread as where the scope was started.
+ * @param name
+ * @throws IOException
+ */
+ public void endStoredScope(String name) throws IOException;
+
+ /**
+ * Create scope with given name and returns it.
* @param name
+ * @return
* @throws IOException
*/
- public void startScope(String name) throws IOException;
+ public MetricsScope createScope(String name) throws IOException;
- public void endScope(String name) throws IOException;
+ /**
+ * Close the given scope.
+ * @param scope
+ * @throws IOException
+ */
+ public void endScope(MetricsScope scope) throws IOException;
//Counter-related methods
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java
index a5aa995..f18aa6e 100644
--- a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java
@@ -38,4 +38,10 @@
public static final String ZOOKEEPER_HIVE_SHAREDLOCKS = "zookeeper_hive_sharedlocks";
public static final String ZOOKEEPER_HIVE_EXCLUSIVELOCKS = "zookeeper_hive_exclusivelocks";
public static final String ZOOKEEPER_HIVE_SEMISHAREDLOCKS = "zookeeper_hive_semisharedlocks";
+
+ public static final String EXEC_ASYNC_QUEUE_SIZE = "exec_async_queue_size";
+ public static final String EXEC_ASYNC_POOL_SIZE = "exec_async_pool_size";
+
+ public static final String OPERATION_PREFIX = "hs2_operation_";
+ public static final String COMPLETED_OPERATION_PREFIX = "hs2_completed_operation_";
}
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsScope.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsScope.java
new file mode 100644
index 0000000..3d6a23e
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsScope.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics.common;
+
+/**
+ * Metrics Scope to represent duration of an event.
+ *
+ * Implementation can capture information like the average duration of open scopes,
+ * number of open scopes, number of completed scopes.
+ *
+ * Scopes are created via the Metrics framework (see Metrics#createScope or Metrics$createStoredScope)
+ *
+ * Scope may be stored by the Metrics framework via 'storedScope' concept for further reference.
+ *
+ * In either case, it is the caller's responsibility to end the scope via the Metrics framework (see Metrics#endScope)
+ */
+public interface MetricsScope {
+}
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
index 3db26af..cba1c5a 100644
--- a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
@@ -44,6 +44,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.common.metrics.common.MetricsScope;
import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
import org.apache.hadoop.hive.conf.HiveConf;
import org.slf4j.Logger;
@@ -72,6 +73,7 @@
*/
public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.common.Metrics {
public static final String API_PREFIX = "api_";
+ public static final String ACTIVE_CALLS = "active_calls_";
public static final Logger LOGGER = LoggerFactory.getLogger(CodahaleMetrics.class);
public final MetricRegistry metricRegistry = new MetricRegistry();
@@ -86,15 +88,15 @@
private HiveConf conf;
private final Set reporters = new HashSet();
- private final ThreadLocal> threadLocalScopes
- = new ThreadLocal>() {
+ private final ThreadLocal> threadLocalScopes
+ = new ThreadLocal>() {
@Override
- protected HashMap initialValue() {
- return new HashMap();
+ protected HashMap initialValue() {
+ return new HashMap();
}
};
- public static class MetricsScope {
+ public static class CodahaleMetricsScope implements MetricsScope {
final String name;
final Timer timer;
@@ -108,7 +110,7 @@
* @param name - name of the variable
* @throws IOException
*/
- private MetricsScope(String name, CodahaleMetrics metrics) throws IOException {
+ private CodahaleMetricsScope(String name, CodahaleMetrics metrics) throws IOException {
this.name = name;
this.metrics = metrics;
this.timer = metrics.getTimer(name);
@@ -124,6 +126,7 @@ public void open() throws IOException {
if (!isOpen) {
isOpen = true;
this.timerContext = timer.time();
+ metrics.incrementCounter(ACTIVE_CALLS + name);
} else {
throw new IOException("Scope named " + name + " is not closed, cannot be opened.");
}
@@ -136,7 +139,7 @@ public void open() throws IOException {
public void close() throws IOException {
if (isOpen) {
timerContext.close();
-
+ metrics.decrementCounter(ACTIVE_CALLS + name);
} else {
throw new IOException("Scope named " + name + " is not open, cannot be closed.");
}
@@ -210,23 +213,41 @@ public void close() throws Exception {
}
@Override
- public void startScope(String name) throws IOException {
+ public void startStoredScope(String name) throws IOException {
name = API_PREFIX + name;
if (threadLocalScopes.get().containsKey(name)) {
threadLocalScopes.get().get(name).open();
} else {
- threadLocalScopes.get().put(name, new MetricsScope(name, this));
+ threadLocalScopes.get().put(name, new CodahaleMetricsScope(name, this));
}
}
@Override
- public void endScope(String name) throws IOException {
+ public void endStoredScope(String name) throws IOException {
name = API_PREFIX + name;
if (threadLocalScopes.get().containsKey(name)) {
threadLocalScopes.get().get(name).close();
+ threadLocalScopes.get().remove(name);
+ }
+ }
+
+ public MetricsScope getStoredScope(String name) throws IOException {
+ if (threadLocalScopes.get().containsKey(name)) {
+ return threadLocalScopes.get().get(name);
+ } else {
+ throw new IOException("No metrics scope named " + name);
}
}
+ public MetricsScope createScope(String name) throws IOException {
+ name = API_PREFIX + name;
+ return new CodahaleMetricsScope(name, this);
+ }
+
+ public void endScope(MetricsScope scope) throws IOException {
+ ((CodahaleMetricsScope) scope).close();
+ }
+
@Override
public Long incrementCounter(String name) throws IOException {
return incrementCounter(name, 1L);
diff --git a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
index 548d7db..1ef636c 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
@@ -18,11 +18,14 @@
package org.apache.hadoop.hive.ql.log;
+import org.apache.hadoop.hive.common.metrics.common.Metrics;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -116,6 +119,7 @@ public void PerfLogBegin(String callerName, String method) {
long startTime = System.currentTimeMillis();
LOG.info("");
startTimes.put(method, new Long(startTime));
+ beginMetrics(method);
}
/**
* Call this function in correspondence of PerfLogBegin to mark the end of the measurement.
@@ -156,6 +160,8 @@ public long PerfLogEnd(String callerName, String method, String additionalInfo)
sb.append(">");
LOG.info(sb.toString());
+ endMetrics(method);
+
return duration;
}
@@ -193,4 +199,25 @@ public Long getDuration(String method) {
return duration;
}
+ private void beginMetrics(String method) {
+ Metrics metrics = MetricsFactory.getInstance();
+ try {
+ if (metrics != null) {
+ metrics.startStoredScope(method);
+ }
+ } catch (IOException e) {
+ LOG.warn("Error recording metrics", e);
+ }
+ }
+
+ private void endMetrics(String method) {
+ Metrics metrics = MetricsFactory.getInstance();
+ try {
+ if (metrics != null) {
+ metrics.endStoredScope(method);
+ }
+ } catch (IOException e) {
+ LOG.warn("Error recording metrics", e);
+ }
+ }
}
diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/MetricsTestUtils.java b/common/src/test/org/apache/hadoop/hive/common/metrics/MetricsTestUtils.java
new file mode 100644
index 0000000..fd420f7
--- /dev/null
+++ b/common/src/test/org/apache/hadoop/hive/common/metrics/MetricsTestUtils.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+/**
+ * Utilities for codahale metrics verification.
+ */
+public class MetricsTestUtils {
+
+ public static final MetricsCategory COUNTER = new MetricsCategory("counters", "count");
+ public static final MetricsCategory TIMER = new MetricsCategory("timers", "count");
+ public static final MetricsCategory GAUGE = new MetricsCategory("gauges", "value");
+
+ static class MetricsCategory {
+ String category;
+ String metricsHandle;
+ MetricsCategory(String category, String metricsHandle) {
+ this.category = category;
+ this.metricsHandle = metricsHandle;
+ }
+ }
+
+ public static void verifyMetricFile(File jsonReportFile, MetricsCategory category, String metricsName,
+ Object value) throws Exception {
+ JsonNode jsonNode = getJsonNode(jsonReportFile, category, metricsName);
+ Assert.assertEquals(jsonNode.asText(), value.toString());
+ }
+
+ private static JsonNode getJsonNode(File jsonReportFile, MetricsCategory category, String metricsName) throws Exception {
+ byte[] jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath()));
+ ObjectMapper objectMapper = new ObjectMapper();
+ JsonNode rootNode = objectMapper.readTree(jsonData);
+ JsonNode categoryNode = rootNode.path(category.category);
+ JsonNode metricsNode = categoryNode.path(metricsName);
+ return metricsNode.path(category.metricsHandle);
+ }
+}
diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java
index c3e8282..a3fb04f 100644
--- a/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java
+++ b/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java
@@ -32,7 +32,7 @@
import javax.management.ObjectName;
import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
-import org.apache.hadoop.hive.common.metrics.LegacyMetrics.MetricsScope;
+import org.apache.hadoop.hive.common.metrics.common.MetricsScope;
import org.apache.hadoop.hive.conf.HiveConf;
import org.junit.After;
import org.junit.Before;
@@ -124,8 +124,8 @@ public void testMetricsMBean() throws Exception {
@Test
public void testScopeSingleThread() throws Exception {
- metrics.startScope(scopeName);
- final MetricsScope fooScope = metrics.getScope(scopeName);
+ metrics.startStoredScope(scopeName);
+ final LegacyMetrics.LegacyMetricsScope fooScope = (LegacyMetrics.LegacyMetricsScope) metrics.getStoredScope(scopeName);
// the time and number counters become available only after the 1st
// scope close:
expectIOE(new Callable() {
@@ -151,15 +151,15 @@ public Void call() throws Exception {
}
});
- assertSame(fooScope, metrics.getScope(scopeName));
+ assertSame(fooScope, metrics.getStoredScope(scopeName));
Thread.sleep(periodMs+ 1);
// 1st close:
// closing of open scope should be ok:
- metrics.endScope(scopeName);
+ metrics.endStoredScope(scopeName);
expectIOE(new Callable() {
@Override
public Void call() throws Exception {
- metrics.endScope(scopeName); // closing of closed scope not allowed
+ metrics.endStoredScope(scopeName); // closing of closed scope not allowed
return null;
}
});
@@ -168,15 +168,15 @@ public Void call() throws Exception {
final long t1 = fooScope.getTimeCounter().longValue();
assertTrue(t1 > periodMs);
- assertSame(fooScope, metrics.getScope(scopeName));
+ assertSame(fooScope, metrics.getStoredScope(scopeName));
// opening allowed after closing:
- metrics.startScope(scopeName);
+ metrics.startStoredScope(scopeName);
// opening of already open scope not allowed:
expectIOE(new Callable() {
@Override
public Void call() throws Exception {
- metrics.startScope(scopeName);
+ metrics.startStoredScope(scopeName);
return null;
}
});
@@ -184,7 +184,7 @@ public Void call() throws Exception {
assertEquals(Long.valueOf(1), fooScope.getNumCounter());
assertEquals(t1, fooScope.getTimeCounter().longValue());
- assertSame(fooScope, metrics.getScope(scopeName));
+ assertSame(fooScope, metrics.getStoredScope(scopeName));
Thread.sleep(periodMs + 1);
// Reopening (close + open) allowed in opened state:
fooScope.reopen();
@@ -204,8 +204,8 @@ public Void call() throws Exception {
@Test
public void testScopeConcurrency() throws Exception {
- metrics.startScope(scopeName);
- MetricsScope fooScope = metrics.getScope(scopeName);
+ metrics.startStoredScope(scopeName);
+ LegacyMetrics.LegacyMetricsScope fooScope = (LegacyMetrics.LegacyMetricsScope) metrics.getStoredScope(scopeName);
final int threads = 10;
ExecutorService executorService = Executors.newFixedThreadPool(threads);
for (int i=0; i 3 * periodMs * threads);
Double avgT = (Double) metrics.get("foo.avg_t");
assertTrue(avgT.doubleValue() > periodMs);
- metrics.endScope(scopeName);
+ metrics.endStoredScope(scopeName);
}
void testScopeImpl(int n) throws Exception {
- metrics.startScope(scopeName);
- final MetricsScope fooScope = metrics.getScope(scopeName);
+ metrics.startStoredScope(scopeName);
+ final LegacyMetrics.LegacyMetricsScope fooScope = (LegacyMetrics.LegacyMetricsScope) metrics.getStoredScope(scopeName);
// cannot open scope that is already open:
expectIOE(new Callable() {
@Override
@@ -241,10 +241,10 @@ public Void call() throws Exception {
}
});
- assertSame(fooScope, metrics.getScope(scopeName));
+ assertSame(fooScope, metrics.getStoredScope(scopeName));
Thread.sleep(periodMs+ 1);
// 1st close:
- metrics.endScope(scopeName); // closing of open scope should be ok.
+ metrics.endStoredScope(scopeName); // closing of open scope should be ok.
assertTrue(fooScope.getNumCounter().longValue() >= 1);
final long t1 = fooScope.getTimeCounter().longValue();
@@ -253,15 +253,15 @@ public Void call() throws Exception {
expectIOE(new Callable() {
@Override
public Void call() throws Exception {
- metrics.endScope(scopeName); // closing of closed scope not allowed
+ metrics.endStoredScope(scopeName); // closing of closed scope not allowed
return null;
}
});
- assertSame(fooScope, metrics.getScope(scopeName));
+ assertSame(fooScope, metrics.getStoredScope(scopeName));
// opening allowed after closing:
- metrics.startScope(scopeName);
+ metrics.startStoredScope(scopeName);
assertTrue(fooScope.getNumCounter().longValue() >= 1);
assertTrue(fooScope.getTimeCounter().longValue() >= t1);
@@ -270,12 +270,12 @@ public Void call() throws Exception {
expectIOE(new Callable() {
@Override
public Void call() throws Exception {
- metrics.startScope(scopeName);
+ metrics.startStoredScope(scopeName);
return null;
}
});
- assertSame(fooScope, metrics.getScope(scopeName));
+ assertSame(fooScope, metrics.getStoredScope(scopeName));
Thread.sleep(periodMs + 1);
// Reopening (close + open) allowed in opened state:
fooScope.reopen();
diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java
index a3aa549..27825b1 100644
--- a/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java
+++ b/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java
@@ -77,8 +77,8 @@ public void after() throws Exception {
public void testScope() throws Exception {
int runs = 5;
for (int i = 0; i < runs; i++) {
- MetricsFactory.getInstance().startScope("method1");
- MetricsFactory.getInstance().endScope("method1");
+ MetricsFactory.getInstance().startStoredScope("method1");
+ MetricsFactory.getInstance().endStoredScope("method1");
}
Timer timer = metricRegistry.getTimers().get("api_method1");
@@ -106,8 +106,8 @@ public void testConcurrency() throws Exception {
executorService.submit(new Callable() {
@Override
public Void call() throws Exception {
- MetricsFactory.getInstance().startScope("method2");
- MetricsFactory.getInstance().endScope("method2");
+ MetricsFactory.getInstance().startStoredScope("method2");
+ MetricsFactory.getInstance().endStoredScope("method2");
return null;
}
});
diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml
index c202001..1809cd8 100644
--- a/itests/hive-unit/pom.xml
+++ b/itests/hive-unit/pom.xml
@@ -146,6 +146,13 @@
${project.version}
tests
+
+ org.apache.hive
+ hive-common
+ ${project.version}
+ test-jar
+ test
+
junit
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHs2Metrics.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHs2Metrics.java
new file mode 100644
index 0000000..8fc8841
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHs2Metrics.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.jdbc.miniHS2;
+
+import org.apache.hadoop.hive.common.metrics.MetricsTestUtils;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHook;
+import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hive.service.cli.CLIServiceClient;
+import org.apache.hive.service.cli.SessionHandle;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests HiveServer2 metrics.
+ */
+public class TestHs2Metrics {
+
+ private static MiniHS2 miniHS2;
+ private static Map confOverlay;
+ private static File jsonReportFile;
+ private static Object monitor = new Object();
+
+ //Check metrics during semantic analysis.
+ public static class MetricCheckingHook implements HiveSemanticAnalyzerHook {
+ @Override
+ public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context,
+ ASTNode ast) throws SemanticException {
+ try {
+ //Pre-analyze hook is fired in the middle of these calls
+ MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.COUNTER, "active_calls_api_semanticAnalyze", 1);
+ MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.COUNTER, "active_calls_api_compile", 1);
+ MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.COUNTER, "active_calls_api_hs2_operation_RUNNING", 1);
+ } catch (Exception e) {
+ throw new SemanticException("metrics verification failed", e);
+ }
+ return ast;
+ }
+
+ @Override
+ public void postAnalyze(HiveSemanticAnalyzerHookContext context,
+ List> rootTasks) throws SemanticException {
+ }
+ }
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ miniHS2 = new MiniHS2(new HiveConf());
+ confOverlay = new HashMap();
+ confOverlay.put(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+ confOverlay.put(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname, MetricCheckingHook.class.getName());
+ miniHS2.start(confOverlay);
+
+ //for Metrics. MiniHS2 init code-path doesn't go through HiveServer2.startHiveServer2().
+ File workDir = new File(System.getProperty("test.tmp.dir"));
+ jsonReportFile = new File(workDir, "json_reporting");
+ jsonReportFile.delete();
+ HiveConf conf = new HiveConf();
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED, true);
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ conf.setVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER, MetricsReporting.JSON_FILE.name() + "," + MetricsReporting.JMX.name());
+ conf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION, jsonReportFile.toString());
+ conf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, "100ms");
+ MetricsFactory.init(conf);
+ }
+
+ @Test
+ public void testMetrics() throws Exception {
+ String tableName = "testMetrics";
+ CLIServiceClient serviceClient = miniHS2.getServiceClient();
+ SessionHandle sessHandle = serviceClient.openSession("foo", "bar");
+
+ //Block on semantic analysis to check 'active_calls'
+ serviceClient.executeStatement(sessHandle, "CREATE TABLE " + tableName + " (id INT)", confOverlay);
+ Thread.sleep(2000);
+
+ //check that all calls were recorded.
+ MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.TIMER, "api_hs2_operation_INITIALIZED", 1);
+ MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.TIMER, "api_hs2_operation_PENDING", 1);
+ MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.TIMER, "api_hs2_operation_RUNNING", 1);
+ MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.COUNTER, "hs2_completed_operation_FINISHED", 1);
+ MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.TIMER, "api_Driver.run", 1);
+
+ //but there should be no more active calls.
+ MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.COUNTER, "active_calls_api_semanticAnalyze", 0);
+ MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.COUNTER, "active_calls_api_compile", 0);
+ MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.COUNTER, "active_calls_api_hs2_operation_RUNNING", 0);
+
+ }
+
+}
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 3c40d6e..a835f6a 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -676,7 +676,7 @@ private String startFunction(String function, String extraLogInfo) {
function + extraLogInfo);
if (MetricsFactory.getInstance() != null) {
try {
- MetricsFactory.getInstance().startScope(function);
+ MetricsFactory.getInstance().startStoredScope(function);
} catch (IOException e) {
LOG.debug("Exception when starting metrics scope"
+ e.getClass().getName() + " " + e.getMessage(), e);
@@ -720,7 +720,7 @@ private void endFunction(String function, boolean successful, Exception e,
private void endFunction(String function, MetaStoreEndFunctionContext context) {
if (MetricsFactory.getInstance() != null) {
try {
- MetricsFactory.getInstance().endScope(function);
+ MetricsFactory.getInstance().endStoredScope(function);
} catch (IOException e) {
LOG.debug("Exception when closing metrics scope" + e);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 93c7a54..75e9027 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1548,7 +1548,6 @@ public int execute() throws CommandNeedRetryException {
// Launch upto maxthreads tasks
Task extends Serializable> task;
while ((task = driverCxt.getRunnable(maxthreads)) != null) {
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TASK + task.getName() + "." + task.getId());
TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
if (!runner.isRunning()) {
break;
diff --git a/service/pom.xml b/service/pom.xml
index 22234d9..afa52cf 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -121,6 +121,13 @@
test
tests
+
+ org.apache.hive
+ hive-common
+ ${project.version}
+ test
+ test-jar
+
junit
diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
index 4ca0561..ade913e 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
@@ -19,13 +19,17 @@
import java.io.File;
import java.io.FileNotFoundException;
+import java.io.IOException;
import java.util.EnumSet;
+import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Sets;
import org.apache.hadoop.hive.common.metrics.common.Metrics;
import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+import org.apache.hadoop.hive.common.metrics.common.MetricsScope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -45,6 +49,7 @@
public abstract class Operation {
protected final HiveSession parentSession;
private OperationState state = OperationState.INITIALIZED;
+ private MetricsScope currentStateScope;
private final OperationHandle opHandle;
private HiveConf configuration;
public static final FetchOrientation DEFAULT_FETCH_ORIENTATION = FetchOrientation.FETCH_NEXT;
@@ -70,6 +75,7 @@ protected Operation(HiveSession parentSession, OperationType opType, boolean run
lastAccessTime = System.currentTimeMillis();
operationTimeout = HiveConf.getTimeVar(parentSession.getHiveConf(),
HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT, TimeUnit.MILLISECONDS);
+ setMetrics(state);
}
public Future> getBackgroundHandle() {
@@ -128,6 +134,7 @@ public OperationLog getOperationLog() {
protected final OperationState setState(OperationState newState) throws HiveSQLException {
state.validateTransition(newState);
this.state = newState;
+ setMetrics(state);
this.lastAccessTime = System.currentTimeMillis();
return this.state;
}
@@ -330,4 +337,40 @@ protected HiveSQLException toSQLException(String prefix, CommandProcessorRespons
}
return ex;
}
+
+ //list of operation states to measure duration of.
+ protected static Set scopeStates = Sets.immutableEnumSet(
+ OperationState.INITIALIZED,
+ OperationState.PENDING,
+ OperationState.RUNNING
+ );
+
+ //list of termainal operation states. We measure only completed counts for operations in these states.
+ protected static Set terminalStates = Sets.immutableEnumSet(
+ OperationState.CLOSED,
+ OperationState.CANCELED,
+ OperationState.FINISHED,
+ OperationState.ERROR,
+ OperationState.UNKNOWN
+ );
+
+ protected void setMetrics(OperationState state) {
+ Metrics metrics = MetricsFactory.getInstance();
+ if (metrics != null) {
+ try {
+ if (currentStateScope != null) {
+ metrics.endScope(currentStateScope);
+ currentStateScope = null;
+ }
+ if (scopeStates.contains(state)) {
+ currentStateScope = metrics.createScope(MetricsConstant.OPERATION_PREFIX + state.toString());
+ }
+ if (terminalStates.contains(state)) {
+ metrics.incrementCounter(MetricsConstant.COMPLETED_OPERATION_PREFIX + state.toString());
+ }
+ } catch (IOException e) {
+ LOG.warn("Error metrics", e);
+ }
+ }
+ }
}
diff --git a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
index a9b4334..d11cf3d 100644
--- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
+++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
@@ -25,13 +25,19 @@
import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hive.common.metrics.common.Metrics;
+import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -108,8 +114,9 @@ private void createBackgroundOperationPool() {
// Threads terminate when they are idle for more than the keepAliveTime
// A bounded blocking queue is used to queue incoming operations, if #operations > poolSize
String threadPoolName = "HiveServer2-Background-Pool";
+ final BlockingQueue queue = new LinkedBlockingQueue(poolQueueSize);
backgroundOperationPool = new ThreadPoolExecutor(poolSize, poolSize,
- keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue(poolQueueSize),
+ keepAliveTime, TimeUnit.SECONDS, queue,
new ThreadFactoryWithGarbageCleanup(threadPoolName));
backgroundOperationPool.allowCoreThreadTimeOut(true);
@@ -119,6 +126,22 @@ private void createBackgroundOperationPool() {
hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
checkOperation = HiveConf.getBoolVar(hiveConf,
ConfVars.HIVE_SERVER2_IDLE_SESSION_CHECK_OPERATION);
+
+ Metrics m = MetricsFactory.getInstance();
+ if (m != null) {
+ m.addGauge(MetricsConstant.EXEC_ASYNC_QUEUE_SIZE, new MetricsVariable() {
+ @Override
+ public Object getValue() {
+ return queue.size();
+ }
+ });
+ m.addGauge(MetricsConstant.EXEC_ASYNC_POOL_SIZE, new MetricsVariable() {
+ @Override
+ public Object getValue() {
+ return backgroundOperationPool.getPoolSize();
+ }
+ });
+ }
}
private void initOperationLogRootDir() {
diff --git a/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java b/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java
new file mode 100644
index 0000000..aaeecbe
--- /dev/null
+++ b/service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.session;
+
+import org.apache.hadoop.hive.common.metrics.MetricsTestUtils;
+import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.server.HiveServer2;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+
+/**
+ * Test metrics from SessionManager.
+ */
+public class TestSessionManagerMetrics {
+
+ private static SessionManager sm;
+ private static File jsonReportFile;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ HiveConf conf = new HiveConf();
+ conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS, 2);
+ conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE, 10);
+ conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME, "1000000s");
+
+ File workDir = new File(System.getProperty("test.tmp.dir"));
+ jsonReportFile = new File(workDir, "json_reporting");
+ jsonReportFile.delete();
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED, true);
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ conf.setVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER, MetricsReporting.JSON_FILE.name() + "," + MetricsReporting.JMX.name());
+ conf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION, jsonReportFile.toString());
+ conf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, "100ms");
+ MetricsFactory.init(conf);
+
+ HiveServer2 hs2 = new HiveServer2();
+ sm = new SessionManager(hs2);
+ sm.init(conf);
+ }
+
+ final Object barrier = new Object();
+
+ class BarrierRunnable implements Runnable {
+ @Override
+ public void run() {
+ synchronized (barrier) {
+ try {
+ barrier.wait();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Tests metrics regarding async thread pool.
+ */
+ @Test
+ public void testThreadPoolMetrics() throws Exception {
+
+ sm.submitBackgroundOperation(new BarrierRunnable());
+ sm.submitBackgroundOperation(new BarrierRunnable());
+ sm.submitBackgroundOperation(new BarrierRunnable());
+ sm.submitBackgroundOperation(new BarrierRunnable());
+
+ Thread.sleep(2000);
+
+ MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.GAUGE, MetricsConstant.EXEC_ASYNC_POOL_SIZE, 2);
+ MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.GAUGE, MetricsConstant.EXEC_ASYNC_QUEUE_SIZE, 2);
+
+ synchronized (barrier) {
+ barrier.notifyAll();
+ }
+ Thread.sleep(2000);
+ MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.GAUGE, MetricsConstant.EXEC_ASYNC_POOL_SIZE, 2);
+ MetricsTestUtils.verifyMetricFile(jsonReportFile, MetricsTestUtils.GAUGE, MetricsConstant.EXEC_ASYNC_QUEUE_SIZE, 0);
+ }
+}