diff --git common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java index 9dc96f9..c9d4087 100644 --- common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java +++ common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java @@ -61,4 +61,11 @@ // The number of Hive operations that are waiting to enter the compile block public static final String WAITING_COMPILE_OPS = "waiting_compile_ops"; + // The number of map reduce tasks executed by the HiveServer2 since the last restart + public static final String HIVE_MR_TASKS = "hive_mapred_tasks"; + // The number of spark tasks executed by the HiveServer2 since the last restart + public static final String HIVE_SPARK_TASKS = "hive_spark_tasks"; + // The number of tez tasks executed by the HiveServer2 since the last restart + public static final String HIVE_TEZ_TASKS = "hive_tez_tasks"; + } \ No newline at end of file diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 183ed82..f5519b7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -1662,6 +1662,11 @@ public int execute() throws CommandNeedRetryException { // incorrect results. assert tsk.getParentTasks() == null || tsk.getParentTasks().isEmpty(); driverCxt.addToRunnable(tsk); + + Metrics metrics = MetricsFactory.getInstance(); + if (metrics != null) { + tsk.updateTaskMetrics(metrics); + } } perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java index eeaa543..e1bd291 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java @@ -27,6 +27,8 @@ import java.util.LinkedList; import java.util.List; +import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.DriverContext; @@ -534,6 +536,13 @@ protected void cloneConf() { } } + /** + * Provide metrics on the type and number of tasks executed by the HiveServer + * @param metrics + */ + public void updateTaskMetrics(Metrics metrics) { + // no metrics gathered by default + } public int getTaskTag() { return taskTag; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java index ce1106d9..f48d511 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java @@ -33,6 +33,8 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.io.CachingPrintStream; +import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.Context; @@ -371,6 +373,15 @@ public boolean reduceDone() { return runningViaChild ? done() : b; } + @Override + public void updateTaskMetrics(Metrics metrics) { + try { + metrics.incrementCounter(MetricsConstant.HIVE_MR_TASKS); + } catch (IOException ex) { + LOG.warn("Could not increment metrics for " + this, ex); + } + } + /** * Set the number of reducers for the mapred work. */ diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index ac922ce..ff5dd17 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -42,6 +42,8 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.io.CachingPrintStream; +import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.CompilationOpContext; @@ -122,6 +124,15 @@ public void setExecContext(ExecMapperContext execContext) { } @Override + public void updateTaskMetrics(Metrics metrics) { + try { + metrics.incrementCounter(MetricsConstant.HIVE_MR_TASKS); + } catch (IOException ex) { + LOG.warn("Could not increment metrics for " + this, ex); + } + } + + @Override public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext driverContext, CompilationOpContext opContext) { super.initialize(queryState, queryPlan, driverContext, opContext); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index 0b494aa..72c8bf7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -26,6 +26,8 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; @@ -180,6 +182,15 @@ private int close(int rc) { } @Override + public void updateTaskMetrics(Metrics metrics) { + try { + metrics.incrementCounter(MetricsConstant.HIVE_SPARK_TASKS); + } catch (IOException ex) { + LOG.warn("Could not increment metrics for " + this, ex); + } + } + + @Override public boolean isMapRedTask() { return true; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 25c4514..c51c92f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -35,6 +35,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; @@ -529,6 +531,15 @@ private void closeDagClientWithoutEx(){ } @Override + public void updateTaskMetrics(Metrics metrics) { + try { + metrics.incrementCounter(MetricsConstant.HIVE_TEZ_TASKS); + } catch (IOException ex) { + LOG.warn("Could not increment metrics for " + this, ex); + } + } + + @Override public boolean isMapRedTask() { return true; } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/mr/TestMapRedTask.java ql/src/test/org/apache/hadoop/hive/ql/exec/mr/TestMapRedTask.java new file mode 100644 index 0000000..5ec7c0d --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/exec/mr/TestMapRedTask.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.mr; + +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; + +import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; +import org.apache.hadoop.hive.ql.exec.spark.SparkTask; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestMapRedTask { + + @Test + public void mrTask_updates_Metrics() throws IOException { + + Metrics mockMetrics = Mockito.mock(Metrics.class); + + MapRedTask mapRedTask = new MapRedTask(); + mapRedTask.updateTaskMetrics(mockMetrics); + + verify(mockMetrics, times(1)).incrementCounter(MetricsConstant.HIVE_MR_TASKS); + verify(mockMetrics, never()).incrementCounter(MetricsConstant.HIVE_TEZ_TASKS); + verify(mockMetrics, never()).incrementCounter(MetricsConstant.HIVE_SPARK_TASKS); + } + +} diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/mr/TestMapredLocalTask.java ql/src/test/org/apache/hadoop/hive/ql/exec/mr/TestMapredLocalTask.java new file mode 100644 index 0000000..4a0fafe --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/exec/mr/TestMapredLocalTask.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.mr; + +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; + +import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestMapredLocalTask { + + @Test + public void localMRTask_updates_Metrics() throws IOException { + + Metrics mockMetrics = Mockito.mock(Metrics.class); + + MapredLocalTask localMrTask = new MapredLocalTask(); + localMrTask.updateTaskMetrics(mockMetrics); + + verify(mockMetrics, times(1)).incrementCounter(MetricsConstant.HIVE_MR_TASKS); + verify(mockMetrics, never()).incrementCounter(MetricsConstant.HIVE_TEZ_TASKS); + verify(mockMetrics, never()).incrementCounter(MetricsConstant.HIVE_SPARK_TASKS); + } + +} diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java new file mode 100644 index 0000000..4c7ec76 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark; + +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; + +import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestSparkTask { + + @Test + public void sparkTask_updates_Metrics() throws IOException { + + Metrics mockMetrics = Mockito.mock(Metrics.class); + + SparkTask sparkTask = new SparkTask(); + sparkTask.updateTaskMetrics(mockMetrics); + + verify(mockMetrics, times(1)).incrementCounter(MetricsConstant.HIVE_SPARK_TASKS); + verify(mockMetrics, never()).incrementCounter(MetricsConstant.HIVE_TEZ_TASKS); + verify(mockMetrics, never()).incrementCounter(MetricsConstant.HIVE_MR_TASKS); + } + +} diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index 53672a9..5c012f3 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.never; import java.io.IOException; import java.util.ArrayList; @@ -39,6 +40,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.Operator; @@ -67,6 +70,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -326,4 +330,17 @@ public void testParseRightmostXmx() throws Exception { heapSize = DagUtils.parseRightmostXmx(javaOpts); assertEquals("Unexpected maximum heap size", -1, heapSize); } + + @Test + public void tezTask_updates_Metrics() throws IOException { + + Metrics mockMetrics = Mockito.mock(Metrics.class); + + TezTask tezTask = new TezTask(); + tezTask.updateTaskMetrics(mockMetrics); + + verify(mockMetrics, times(1)).incrementCounter(MetricsConstant.HIVE_TEZ_TASKS); + verify(mockMetrics, never()).incrementCounter(MetricsConstant.HIVE_SPARK_TASKS); + verify(mockMetrics, never()).incrementCounter(MetricsConstant.HIVE_MR_TASKS); + } }