diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0a81259..5efa98a 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -539,6 +539,18 @@
HIVE_INDEX_COMPACT_QUERY_MAX_SIZE("hive.index.compact.query.max.size", (long) 10 * 1024 * 1024 * 1024), // 10G
HIVE_INDEX_COMPACT_BINARY_SEARCH("hive.index.compact.binary.search", true),
+    // Profiler: settings for the intermediate JDBC store that holds per-operator profile rows.
+    HIVEPROFILERDBCLASS("hive.profiler.dbclass", "jdbc:derby"),
+    HIVEPROFILERJDBCDRIVER("hive.profiler.jdbcdriver", "org.apache.derby.jdbc.EmbeddedDriver"),
+    HIVEPROFILERDBCONNECTIONSTRING("hive.profiler.dbconnectionstring",
+        "jdbc:derby:;databaseName=TempProfilerStore;create=true"), // automatically create database
+    // default timeout for JDBC connection
+    HIVE_PROFILER_JDBC_TIMEOUT("hive.profiler.jdbc.timeout", 30),
+    // NOTE: keys renamed from "hive.stats.retries.*" — those collide with the existing
+    // HIVESTATS retry settings and the template documents "hive.profiler.retries.*".
+    HIVE_PROFILER_RETRIES_MAX("hive.profiler.retries.max",
+        0), // maximum # of retries to insert/select/delete the profiler DB
+    HIVE_PROFILER_RETRIES_WAIT("hive.profiler.retries.wait",
+        3000), // # milliseconds to wait before the next retry
+
// Statistics
HIVESTATSAUTOGATHER("hive.stats.autogather", true),
HIVESTATSDBCLASS("hive.stats.dbclass",
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index 08c5b72..a005f39 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -1067,6 +1067,42 @@
+<property>
+  <name>hive.profiler.dbclass</name>
+  <value>jdbc:derby</value>
+  <description>The default database that stores temporary hive profiler statistics.</description>
+</property>
+
+<property>
+  <name>hive.profiler.jdbcdriver</name>
+  <value>org.apache.derby.jdbc.EmbeddedDriver</value>
+  <description>The JDBC driver for the database that stores temporary hive profiler statistics.</description>
+</property>
+
+<property>
+  <name>hive.profiler.dbconnectionstring</name>
+  <value>jdbc:derby:;databaseName=TempProfilerStore;create=true</value>
+  <description>The default connection string for the database that stores temporary hive profiler statistics.</description>
+</property>
+
+<property>
+  <name>hive.profiler.jdbc.timeout</name>
+  <value>30</value>
+  <description>Timeout value (number of seconds) used by JDBC connection and statements.</description>
+</property>
+
+<property>
+  <name>hive.profiler.retries.max</name>
+  <value>0</value>
+  <description>Maximum number of retries when profiler publisher/aggregator got an exception updating intermediate database. Default is no retries on failures.</description>
+</property>
+
+<property>
+  <name>hive.profiler.retries.wait</name>
+  <value>3000</value>
+  <description>The base waiting window (in milliseconds) before the next retry. The actual wait time is calculated by baseWindow * failures + baseWindow * (failure + 1) * (random number between [0.0,1.0]).</description>
+</property>
+
hive.stats.dbclass
jdbc:derby
The default database that stores temporary hive statistics.
diff --git ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilePublisher.java ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilePublisher.java
new file mode 100644
index 0000000..e74724d
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilePublisher.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.profiler;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.util.Map;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+
+/**
+ * Writes aggregated per-operator profiler rows into the intermediate
+ * JDBC database (PROFILER_STATS) configured via hive.profiler.* settings.
+ */
+public class HiveProfilePublisher {
+  private final Log LOG = LogFactory.getLog(this.getClass().getName());
+  private HiveProfilePublisherInfo info;
+
+  /**
+   * Closes the prepared insert statement (if any) and the JDBC connection.
+   *
+   * @return true when nothing was open or everything closed cleanly,
+   *         false if a SQLException occurred while closing.
+   */
+  public boolean closeConnection() {
+    // Also guard against initialize() never having been called.
+    if (info == null || info.getConnection() == null) {
+      return true;
+    }
+    try {
+      if (info.getInsertStatement() != null) {
+        info.closeInsertStatement();
+      }
+      info.getConnection().close();
+      return true;
+    } catch (SQLException e) {
+      LOG.error("Error during JDBC termination. ", e);
+      return false;
+    }
+  }
+
+  /**
+   * Opens the connection, creates the PROFILER_STATS table if missing and
+   * prepares the insert statement.
+   *
+   * @return false if any step failed (the error is logged, not thrown).
+   */
+  public boolean initialize(Configuration conf) {
+    try {
+      info = new HiveProfilePublisherInfo(conf);
+      String createTable = getCreate();
+      LOG.info(createTable);
+      HiveProfilerUtils.createTableIfNonExistent(info, createTable);
+      info.prepareInsert();
+    } catch (Exception e) {
+      LOG.error("Error during HiveProfilePublisher initialization", e);
+      return false;
+    }
+    return true;
+  }
+
+  /** DDL for the profiler stats table; column set must match HiveProfilerStats.COLUMN_NAMES. */
+  private String getCreate() {
+    return "CREATE TABLE " + info.getTableName() +
+      " ( " +
+      HiveProfilerStats.Columns.QUERY_ID + " VARCHAR(512) NOT NULL, " +
+      HiveProfilerStats.Columns.TASK_ID + " VARCHAR(512) NOT NULL, " +
+      HiveProfilerStats.Columns.OPERATOR_ID + " INT, " +
+      HiveProfilerStats.Columns.OPERATOR_NAME + " VARCHAR(512) NOT NULL, " +
+      HiveProfilerStats.Columns.PARENT_OPERATOR_ID + " INT," +
+      HiveProfilerStats.Columns.PARENT_OPERATOR_NAME + " VARCHAR(512), " +
+      HiveProfilerStats.Columns.LEVEL_ANNO_NAME + " VARCHAR(512), " +
+      HiveProfilerStats.Columns.CALL_COUNT + " BIGINT, " +
+      HiveProfilerStats.Columns.INCL_TIME + " BIGINT ) ";
+  }
+
+  /**
+   * Publishes one row of stats (column name -> value). Lazily initializes
+   * the connection if needed, then always attempts the insert.
+   *
+   * BUG FIX: the original if/else returned without publishing whenever the
+   * connection had just been initialized; now it falls through to the insert.
+   *
+   * @param queryId unused; the query id is carried inside the stats map
+   * @return false on initialization or SQL failure.
+   */
+  public boolean publishStat(String queryId, Map<String, String> stats,
+      Configuration conf) {
+    if (info == null || info.getConnection() == null) {
+      if (!initialize(conf)) {
+        return false;
+      }
+    }
+    if (stats == null || stats.isEmpty()) {
+      return true;
+    }
+    try {
+      Utilities.SQLCommand<Void> execUpdate = new Utilities.SQLCommand<Void>() {
+        @Override
+        public Void run(PreparedStatement stmt) throws SQLException {
+          stmt.executeUpdate();
+          return null;
+        }
+      };
+      PreparedStatement insStmt = info.getInsert(stats);
+      Utilities.executeWithRetry(execUpdate, insStmt, info.getWaitWindow(), info.getMaxRetries());
+    } catch (SQLException e) {
+      LOG.error("ERROR during publishing profiling data. ", e);
+      return false;
+    }
+    return true;
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilePublisherInfo.java ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilePublisherInfo.java
new file mode 100644
index 0000000..0b6baa0
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilePublisherInfo.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.profiler;
+
+import java.util.Map;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Connection;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+
+/**
+ * Holds the JDBC connection and configuration (retries, timeout, driver)
+ * for the profiler's intermediate database, and builds/prepares the
+ * INSERT statement for the PROFILER_STATS table.
+ */
+public class HiveProfilePublisherInfo implements HiveProfilerConnectionInfo {
+  final private Log LOG = LogFactory.getLog(this.getClass().getName());
+  private String statsDbClass;
+  private final String tableName = "PROFILER_STATS";
+  private int maxRetries, waitWindow, timeout;
+  private Connection conn;
+  private String connectionString;
+  private PreparedStatement insStmt;
+
+  /**
+   * Reads hive.profiler.* settings from conf, loads the JDBC driver and
+   * opens the connection (with retries).
+   *
+   * @throws Exception if the connection cannot be established
+   */
+  public HiveProfilePublisherInfo(Configuration conf) throws Exception {
+    maxRetries = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_PROFILER_RETRIES_MAX);
+    waitWindow = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_PROFILER_RETRIES_WAIT);
+    connectionString = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEPROFILERDBCONNECTIONSTRING);
+    timeout = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_PROFILER_JDBC_TIMEOUT);
+    String driver = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEPROFILERJDBCDRIVER);
+    statsDbClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEPROFILERDBCLASS);
+    try {
+      Class.forName(driver).newInstance();
+    } catch (Exception e) {
+      // connectWithRetry below will surface the failure if the driver is truly missing.
+      LOG.error("Error during instantiating JDBC driver " + driver + ". ", e);
+    }
+    DriverManager.setLoginTimeout(timeout); // profiling must not block the query
+    conn = Utilities.connectWithRetry(connectionString, waitWindow, maxRetries);
+  }
+
+  public String getDbClass() { return statsDbClass; }
+
+  public int getTimeout() { return timeout; }
+
+  public int getMaxRetries() { return maxRetries; }
+
+  public int getWaitWindow() { return waitWindow; }
+
+  public String getConnectionString() { return connectionString; }
+
+  public String getTableName() { return tableName; }
+
+  public Connection getConnection() { return conn; }
+
+  protected PreparedStatement getInsertStatement() {
+    return insStmt;
+  }
+
+  /** Builds "INSERT INTO PROFILER_STATS (c1,...,cn) VALUES (?,...,?)" over COLUMN_NAMES. */
+  private String getInsert() {
+    StringBuilder colNames = new StringBuilder();
+    StringBuilder vals = new StringBuilder();
+    for (int i = 0; i < HiveProfilerStats.COLUMN_NAMES.length; i++) {
+      if (i > 0) {
+        colNames.append(",");
+        vals.append(",");
+      }
+      colNames.append(HiveProfilerStats.COLUMN_NAMES[i]);
+      vals.append("?");
+    }
+    return "INSERT INTO " + tableName + " (" + colNames + ") VALUES (" + vals + ")";
+  }
+
+  protected void prepareInsert() throws SQLException {
+    insStmt = Utilities.prepareWithRetry(conn, getInsert(), waitWindow, maxRetries);
+  }
+
+  protected void closeInsertStatement() throws SQLException {
+    insStmt.close();
+  }
+
+  /** Binds one row (column name -> value) onto the shared prepared statement. */
+  protected PreparedStatement getInsert(Map<String, String> stats) throws SQLException {
+    for (int i = 0; i < HiveProfilerStats.COLUMN_NAMES.length; i++) {
+      insStmt.setString(i + 1, stats.get(HiveProfilerStats.COLUMN_NAMES[i]));
+    }
+    return insStmt;
+  }
+}
+
diff --git ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfiler.java ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfiler.java
new file mode 100644
index 0000000..f187635
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfiler.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.profiler;
+
+import java.lang.System;
+import java.util.LinkedList;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Iterator;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.OperatorHook;
+import org.apache.hadoop.hive.ql.exec.OperatorHookContext;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+/**
+ * Operator hook that measures per-operator inclusive wall time and call
+ * counts, aggregates them in memory, and publishes them to the profiler
+ * DB when the task closes.
+ */
+public class HiveProfiler implements OperatorHook {
+  private final Log LOG = LogFactory.getLog(this.getClass().getName());
+  private static final HiveProfilePublisher pub = new HiveProfilePublisher();
+
+  // Stack of currently-executing operators: enter() pushes, exit() pops.
+  private final LinkedList<HiveProfilerEntry> operatorCallStack =
+    new LinkedList<HiveProfilerEntry>();
+
+  // Aggregates stats for each operator in memory so that stats are written to DB
+  // all at once - this allows the profiler to be extremely lightweight in
+  // communication with the DB
+  private final Map<String, HiveProfilerStats> aggrStats =
+    new HashMap<String, HiveProfilerStats>();
+
+  public void enter(OperatorHookContext opHookContext) throws HiveException {
+    operatorCallStack.addFirst(new HiveProfilerEntry(opHookContext));
+  }
+
+  /** Folds one completed operator invocation into aggrStats. */
+  private void exit(HiveProfilerEntry curEntry, HiveProfilerEntry parentEntry) {
+    OperatorHookContext opHookContext = curEntry.getOperatorHookContext();
+
+    // Inclusive wall time for this invocation, in nanoseconds.
+    long wallTime = System.nanoTime() - curEntry.wallStartTime;
+
+    OperatorHookContext parentContext =
+      parentEntry != null ? parentEntry.getOperatorHookContext() : null;
+    Configuration conf = opHookContext.getOperator().getConfiguration();
+
+    String opId = opHookContext.getOperatorId();
+    HiveProfilerStats stats = aggrStats.get(opId);
+    if (stats != null) {
+      stats.updateStats(wallTime, 1);
+    } else {
+      aggrStats.put(opId,
+        new HiveProfilerStats(opHookContext, parentContext, 1, wallTime, conf));
+    }
+  }
+
+  public void exit(OperatorHookContext opHookContext) throws HiveException {
+    if (operatorCallStack.isEmpty()) {
+      // BUG FIX: previously execution fell through and NPE'd on the null poll().
+      LOG.error("Unexpected state: Operator Call Stack is empty on exit.");
+      return;
+    }
+
+    // The top of the stack should be the first operator to exit.
+    HiveProfilerEntry curEntry = operatorCallStack.poll();
+    if (!curEntry.getOperatorHookContext().equals(opHookContext)) {
+      LOG.error("Expected to exit from: " + curEntry.getOperatorHookContext().toString() +
+        " but exit called on " + opHookContext.toString());
+    }
+    exit(curEntry, operatorCallStack.peekFirst());
+  }
+
+  public void close(OperatorHookContext opHookContext) {
+    Configuration conf = opHookContext.getOperator().getConfiguration();
+
+    // Publish every aggregated stat over a single connection. The original
+    // code re-initialized and closed the connection once per operator.
+    // example stat:
+    // queryId=..._20130115163838_4a1cb4ae-..., operatorName=TS, id=3,
+    // parentName="" (root), inclTime=1202710, callCount=...
+    if (pub.initialize(conf)) {
+      for (HiveProfilerStats stat : aggrStats.values()) {
+        boolean published = pub.publishStat(null, stat.getStatsMap(), conf);
+        LOG.info((published ? "did " : "did not ") + "publish stat for: " + stat.toString());
+      }
+      pub.closeConnection();
+    }
+    aggrStats.clear();
+  }
+
+  /** Pairs an operator context with the nanoTime at which it was entered. */
+  private class HiveProfilerEntry {
+    OperatorHookContext ctxt;
+    protected long wallStartTime;
+
+    protected HiveProfilerEntry(OperatorHookContext opHookContext) {
+      this.ctxt = opHookContext;
+      this.wallStartTime = System.nanoTime();
+    }
+
+    protected OperatorHookContext getOperatorHookContext() {
+      return ctxt;
+    }
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerAggregateStat.java ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerAggregateStat.java
new file mode 100644
index 0000000..151ea4a
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerAggregateStat.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.profiler;
+
+/**
+ * Mutable accumulator for one operator edge: total inclusive wall time
+ * and total number of calls.
+ */
+public class HiveProfilerAggregateStat {
+  long wallTime;
+  long callCount;
+
+  public HiveProfilerAggregateStat(long wallTime, long callCount) {
+    this.wallTime = wallTime;
+    this.callCount = callCount;
+  }
+
+  /** @return accumulated number of calls */
+  public long getCallCount() {
+    return callCount;
+  }
+
+  /** @return accumulated inclusive wall time */
+  public long getWallTime() {
+    return wallTime;
+  }
+
+  /**
+   * Adds another measurement to this accumulator.
+   *
+   * @param wallTime inclusive wall time to add
+   * @param callCount number of calls to add
+   */
+  public void update(long wallTime, long callCount) {
+    this.wallTime = this.wallTime + wallTime;
+    this.callCount = this.callCount + callCount;
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerConnectionInfo.java ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerConnectionInfo.java
new file mode 100644
index 0000000..cd8f46d
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerConnectionInfo.java
@@ -0,0 +1,29 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.hive.ql.profiler;
+
+import java.sql.Connection;
+/**
+ * Read-only view of the JDBC settings and open connection used by the
+ * profiler to reach its intermediate stats database.
+ */
+public interface HiveProfilerConnectionInfo {
+ // DB class string such as "jdbc:derby"; used to pick DB-specific shutdown logic.
+ public String getDbClass();
+ // JDBC login/statement timeout in seconds.
+ public int getTimeout();
+ // Max retry count for connect/prepare/execute.
+ public int getMaxRetries();
+ // Base retry wait window in milliseconds.
+ public int getWaitWindow();
+ // Full JDBC connection string.
+ public String getConnectionString();
+ // Name of the profiler stats table (e.g. PROFILER_STATS).
+ public String getTableName();
+ // The open JDBC connection (may be null if connecting failed).
+ public Connection getConnection();
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerStats.java ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerStats.java
new file mode 100644
index 0000000..8b820c7
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerStats.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.profiler;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.OperatorHookContext;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+
+/**
+ * One row of profiler data for a single operator: identity (query, task,
+ * operator, parent) plus accumulated inclusive wall time and call count.
+ */
+public class HiveProfilerStats {
+  /** Column names of the PROFILER_STATS table. */
+  public static final class Columns {
+    public static final String QUERY_ID = "queryId";
+    public static final String OPERATOR_NAME = "operatorName";
+    public static final String OPERATOR_ID = "operatorId";
+    public static final String PARENT_OPERATOR_ID = "parentOperatorId";
+    public static final String PARENT_OPERATOR_NAME = "parentOperatorName";
+    public static final String EXCL_TIME = "exclTime";
+    public static final String LEVEL_ANNO_NAME = "levelAnnotatedName";
+    public static final String INCL_TIME = "inclTime";
+    public static final String CALL_COUNT = "callCount";
+    public static final String TASK_ID = "taskId";
+  }
+
+  // Order must match the INSERT built by HiveProfilePublisherInfo.
+  public static final String[] COLUMN_NAMES = new String[] {
+    Columns.QUERY_ID,
+    Columns.TASK_ID,
+    Columns.OPERATOR_NAME,
+    Columns.OPERATOR_ID,
+    Columns.PARENT_OPERATOR_ID,
+    Columns.PARENT_OPERATOR_NAME,
+    Columns.LEVEL_ANNO_NAME,
+    Columns.INCL_TIME,
+    Columns.CALL_COUNT
+  };
+
+  // Column name -> value for this operator's row.
+  private final Map<String, String> stats = new HashMap<String, String>();
+
+  long callCount;  // number of times the operator was entered
+  long inclTime;   // accumulated inclusive wall time (units as supplied by caller)
+  String taskId;
+
+  protected HiveProfilerStats(
+      OperatorHookContext opHookContext,
+      OperatorHookContext parentOpHookContext,
+      long callCount, long wallTime, Configuration conf) {
+    this.callCount = callCount;
+    this.inclTime = wallTime;
+    this.taskId = Utilities.getTaskId(conf);
+    populateStatsMap(opHookContext, parentOpHookContext, conf);
+  }
+
+  /** Fills the identity columns (query, operator, parent, annotated name). */
+  private void populateStatsMap(OperatorHookContext opHookContext,
+      OperatorHookContext parentOpHookContext,
+      Configuration conf) {
+    String queryId =
+      conf == null ? "no conf" : HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID);
+    stats.put(Columns.QUERY_ID, queryId);
+    String opName = opHookContext.getOperatorName();
+    stats.put(Columns.OPERATOR_NAME, opName);
+    stats.put(Columns.OPERATOR_ID, opHookContext.getOperatorId());
+
+    String parentOpName = parentOpHookContext == null ? "" : parentOpHookContext.getOperatorName();
+    stats.put(Columns.PARENT_OPERATOR_NAME, parentOpName);
+
+    // Roots get a sentinel parent id of -1.
+    String parentOpId = parentOpHookContext == null ? "-1" : parentOpHookContext.getOperatorId();
+    stats.put(Columns.PARENT_OPERATOR_ID, parentOpId);
+
+    // "parent_id ==> child_id" edge label; roots are attributed to main().
+    String levelAnnoOpName = opName + "_" + opHookContext.getOperatorId();
+    String levelAnnoName = parentOpHookContext == null ? "main() ==> " + levelAnnoOpName :
+      parentOpName + "_" + parentOpId + " ==> " + levelAnnoOpName;
+    stats.put(Columns.LEVEL_ANNO_NAME, levelAnnoName);
+  }
+
+  /** Accumulates another invocation's wall time and call count. */
+  public void updateStats(long wallTime, long count) {
+    this.inclTime += wallTime;
+    this.callCount += count;
+  }
+
+  /** @return the full column-name -> value row, with the counters rendered as strings. */
+  public Map<String, String> getStatsMap() {
+    stats.put(Columns.TASK_ID, taskId);
+    stats.put(Columns.INCL_TIME, String.valueOf(inclTime));
+    stats.put(Columns.CALL_COUNT, String.valueOf(callCount));
+    return stats;
+  }
+
+  @Override
+  public String toString() {
+    return stats.toString();
+  }
+}
+
diff --git ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerStatsAggregator.java ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerStatsAggregator.java
new file mode 100644
index 0000000..b4382f1
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerStatsAggregator.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.profiler;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+
+public class HiveProfilerStatsAggregator {
+ final private Log LOG = LogFactory.getLog(this.getClass().getName());
+ private long totalTime;
+ private HiveProfilePublisherInfo rawProfileConnInfo;
+
+ private Map stats =
+ new HashMap();
+
+ public HiveProfilerStatsAggregator(HiveConf conf) {
+ try {
+ // initialize the raw data connection
+ rawProfileConnInfo = new HiveProfilePublisherInfo(conf);
+ populateAggregateStats(conf);
+ } catch (Exception e) {
+
+ LOG.error("Error during initialization", e);
+ }
+ }
+
+ public long getTotalTime() {
+ return totalTime;
+ }
+
+ public Map getAggregateStats() {
+ return stats;
+ }
+
+
+ private void populateAggregateStats(HiveConf conf) {
+ int waitWindow = rawProfileConnInfo.getWaitWindow();
+ int maxRetries = rawProfileConnInfo.getMaxRetries();
+
+ String queryId = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID);
+ String profilerStatsTable = rawProfileConnInfo.getTableName();
+ String getProfileStats =
+ "SELECT * FROM " + profilerStatsTable + " WHERE queryId = ? ";
+ Utilities.SQLCommand execQuery = new Utilities.SQLCommand() {
+ @Override
+ public ResultSet run(PreparedStatement stmt) throws SQLException {
+ return stmt.executeQuery();
+ }
+ };
+
+ try {
+ PreparedStatement getProfileStatsStmt =
+ Utilities.prepareWithRetry(rawProfileConnInfo.getConnection(),
+ getProfileStats, waitWindow, maxRetries);
+ getProfileStatsStmt.setString(1, queryId);
+ ResultSet result = Utilities.executeWithRetry(execQuery, getProfileStatsStmt,
+ waitWindow, maxRetries);
+
+ populateAggregateStats(result);
+ getProfileStatsStmt.close();
+ rawProfileConnInfo.getConnection().close();
+ } catch(Exception e) {
+ LOG.error("executing error: ", e);
+ }
+ }
+
+ private void populateAggregateStats(ResultSet result) {
+ try {
+ while(result.next()){
+ // string denoting parent==>child
+ // example:SEL_2==>GBY_1
+ String levelAnnoName = result.getString(HiveProfilerStats.Columns.LEVEL_ANNO_NAME);
+ // Microseconds
+ Long curInclTime = result.getLong(HiveProfilerStats.Columns.INCL_TIME) / 1000;
+ Long curCallCount = result.getLong(HiveProfilerStats.Columns.CALL_COUNT);
+ totalTime += curInclTime;
+ if(curInclTime != null && curCallCount != null) {
+ HiveProfilerAggregateStat curStat;
+ if (stats.containsKey(levelAnnoName)) {
+ curStat = stats.get(levelAnnoName);
+ curStat.update(curInclTime, curCallCount);
+ } else {
+ curStat = new HiveProfilerAggregateStat(curInclTime, curCallCount);
+ }
+ stats.put(levelAnnoName, curStat);
+ }
+ }
+ } catch (SQLException e) {
+ LOG.error("Error Aggregating Stats", e);
+ }
+ }
+
+}
+
diff --git ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerUtils.java ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerUtils.java
new file mode 100644
index 0000000..42984d9
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerUtils.java
@@ -0,0 +1,60 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.hive.ql.profiler;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/** Static helpers for the profiler's JDBC table/connection management. */
+public class HiveProfilerUtils {
+
+  // Utility class; not meant to be instantiated.
+  private HiveProfilerUtils() {
+  }
+
+  /**
+   * Creates the profiler stats table via createTable DDL unless a table of
+   * that name already exists (checked through DatabaseMetaData).
+   */
+  public static void createTableIfNonExistent(HiveProfilerConnectionInfo info,
+      String createTable) throws Exception {
+    Connection conn = info.getConnection();
+    Statement stmt = conn.createStatement();
+    try {
+      stmt.setQueryTimeout(info.getTimeout());
+      DatabaseMetaData dbm = conn.getMetaData();
+      ResultSet rs = dbm.getTables(null, null, info.getTableName(), null);
+      boolean tblExists;
+      try {
+        tblExists = rs.next();
+      } finally {
+        rs.close();
+      }
+      if (!tblExists) {
+        stmt.executeUpdate(createTable);
+      }
+    } finally {
+      // BUG FIX: the statement previously leaked whenever the table already
+      // existed (close() was only reached on the create path).
+      stmt.close();
+    }
+  }
+
+  /** Closes the connection; for embedded Derby also issues the shutdown URL. */
+  public static boolean closeConnection(HiveProfilerConnectionInfo info) throws SQLException {
+    info.getConnection().close();
+    // In case of derby, explicitly shutdown the database otherwise it reports error when
+    // trying to connect to the same JDBC connection string again.
+    if (info.getDbClass().equalsIgnoreCase("jdbc:derby")) {
+      try {
+        // The following closes the derby connection. It throws an exception that has to be caught
+        // and ignored.
+        DriverManager.getConnection(info.getConnectionString() + ";shutdown=true");
+      } catch (Exception ignored) {
+        // Do nothing because derby always signals a successful shutdown via SQLException.
+      }
+    }
+    return true;
+  }
+}
diff --git ql/src/test/org/apache/hadoop/hive/ql/hooks/HiveProfilerResultsHook.java ql/src/test/org/apache/hadoop/hive/ql/hooks/HiveProfilerResultsHook.java
new file mode 100644
index 0000000..a3df0cd
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/hooks/HiveProfilerResultsHook.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.hooks;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import java.sql.Connection;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.profiler.HiveProfilerStatsAggregator;
+import org.apache.hadoop.hive.ql.profiler.HiveProfilerAggregateStat;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import java.sql.DriverManager;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+
+/**
+ * Post-execution hook used by profiler .q tests: aggregates the profiler
+ * rows for the finished query and prints "edge: callCount" lines.
+ */
+public class HiveProfilerResultsHook implements ExecuteWithHookContext {
+  final private Log LOG = LogFactory.getLog(this.getClass().getName());
+
+  @Override
+  public void run(HookContext hookContext) {
+    SessionState sess = SessionState.get();
+    HiveConf conf = sess.getConf();
+    HiveProfilerStatsAggregator aggr = new HiveProfilerStatsAggregator(conf);
+    Map<String, HiveProfilerAggregateStat> stats = aggr.getAggregateStats();
+    // entrySet avoids the redundant per-key lookup of keySet() + get().
+    for (Map.Entry<String, HiveProfilerAggregateStat> e : stats.entrySet()) {
+      // printError writes to stderr, which the test harness captures.
+      SessionState.getConsole().printError(e.getKey() + ": " + e.getValue().getCallCount());
+    }
+  }
+}
diff --git ql/src/test/queries/clientpositive/hiveprofiler0.q ql/src/test/queries/clientpositive/hiveprofiler0.q
new file mode 100644
index 0000000..d19595d
--- /dev/null
+++ ql/src/test/queries/clientpositive/hiveprofiler0.q
@@ -0,0 +1,6 @@
+set hive.exec.operator.hooks=org.apache.hadoop.hive.ql.profiler.HiveProfiler;
+set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.HiveProfilerResultsHook;
+SET hive.exec.mode.local.auto=false;
+SET hive.task.progress=true;
+
+select count(1) from src;
diff --git ql/src/test/results/clientpositive/hiveprofiler0.q.out ql/src/test/results/clientpositive/hiveprofiler0.q.out
new file mode 100644
index 0000000..4decef2
--- /dev/null
+++ ql/src/test/results/clientpositive/hiveprofiler0.q.out
@@ -0,0 +1,12 @@
+PREHOOK: query: select count(1) from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+TS_0 ==> SEL_1: 500
+main() ==> RS_3: 1
+SEL_1 ==> GBY_2: 500
+main() ==> SEL_5: 1
+main() ==> GBY_4: 1
+main() ==> TS_0: 500
+SEL_5 ==> FS_6: 1
+500