diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0a81259..5efa98a 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -539,6 +539,18 @@
    HIVE_INDEX_COMPACT_QUERY_MAX_SIZE("hive.index.compact.query.max.size",
        (long) 10 * 1024 * 1024 * 1024), // 10G
    HIVE_INDEX_COMPACT_BINARY_SEARCH("hive.index.compact.binary.search", true),

    // Profiler
    HIVEPROFILERDBCLASS("hive.profiler.dbclass", "jdbc:derby"),
    HIVEPROFILERJDBCDRIVER("hive.profiler.jdbcdriver", "org.apache.derby.jdbc.EmbeddedDriver"),
    HIVEPROFILERDBCONNECTIONSTRING("hive.profiler.dbconnectionstring",
        "jdbc:derby:;databaseName=TempProfilerStore;create=true"), // automatically create database
    // default timeout for JDBC connection
    HIVE_PROFILER_JDBC_TIMEOUT("hive.profiler.jdbc.timeout", 30),
    // BUGFIX: these keys were "hive.stats.retries.max" / "hive.stats.retries.wait",
    // which collided with the statistics component's settings and disagreed with
    // the names documented in conf/hive-default.xml.template.
    HIVE_PROFILER_RETRIES_MAX("hive.profiler.retries.max",
        0), // maximum # of retries to insert/select/delete against the profiler DB
    HIVE_PROFILER_RETRIES_WAIT("hive.profiler.retries.wait",
        3000), // # milliseconds to wait before the next retry

    // Statistics
    HIVESTATSAUTOGATHER("hive.stats.autogather", true),
    HIVESTATSDBCLASS("hive.stats.dbclass",
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index 08c5b72..a005f39 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -1067,6 +1067,42 @@

<property>
  <name>hive.profiler.dbclass</name>
  <value>jdbc:derby</value>
  <description>The default database that stores temporary hive profiler stats.</description>
</property>

<property>
  <name>hive.profiler.jdbcdriver</name>
  <value>org.apache.derby.jdbc.EmbeddedDriver</value>
  <description>The JDBC driver for the database that stores temporary hive profiler stats.</description>
</property>

<property>
  <name>hive.profiler.dbconnectionstring</name>
  <!-- BUGFIX: was TempStatsStore, which disagreed with the default in HiveConf
       (TempProfilerStore) and pointed the profiler at the statistics database. -->
  <value>jdbc:derby:;databaseName=TempProfilerStore;create=true</value>
  <description>The default connection string for the database that stores temporary hive profiler stats.</description>
</property>

<property>
  <name>hive.profiler.jdbc.timeout</name>
  <value>30</value>
  <description>Timeout value (number of seconds) used by JDBC connection and statements.</description>
</property>

<property>
  <name>hive.profiler.retries.max</name>
  <value>0</value>
  <description>Maximum number of retries when profiler publisher/aggregator got an exception updating intermediate database. Default is no tries on failures.</description>
</property>

<property>
  <name>hive.profiler.retries.wait</name>
  <value>3000</value>
  <description>The base waiting window (in milliseconds) before the next retry. The actual wait time is calculated by baseWindow * failures + baseWindow * (failures + 1) * (random number between [0.0,1.0]).</description>
</property>

<property>
  <name>hive.stats.dbclass</name>
  <value>jdbc:derby</value>
  <description>The default database that stores temporary hive statistics.</description>
</property>
diff --git ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilePublisher.java ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilePublisher.java
new file mode 100644
index 0000000..e74724d
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilePublisher.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.ql.profiler;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Utilities;

/**
 * Publishes per-operator profiler rows into the intermediate JDBC-backed
 * profiler store. Connection details and the prepared insert statement are
 * managed by {@link HiveProfilePublisherInfo}.
 */
public class HiveProfilePublisher {
  private final Log LOG = LogFactory.getLog(this.getClass().getName());

  // Connection/statement holder; null until initialize() has succeeded.
  private HiveProfilePublisherInfo info;

  /**
   * Closes the prepared insert statement and the JDBC connection.
   *
   * @return true if nothing was open or everything closed cleanly.
   */
  public boolean closeConnection() {
    // BUGFIX: also guard against initialize() never having run (or having
    // failed), in which case info is still null and the original code NPE'd.
    if (info == null || info.getConnection() == null) {
      return true;
    }
    try {
      if (info.getInsertStatement() != null) {
        info.closeInsertStatement();
      }
      info.getConnection().close();
      return true;
    } catch (SQLException e) {
      LOG.error("Error during JDBC termination. ", e);
      return false;
    }
  }

  /**
   * Opens the JDBC connection, creates the profiler table if it does not yet
   * exist, and prepares the insert statement.
   *
   * @return true on success; false on any error (which is logged).
   */
  public boolean initialize(Configuration conf) {
    try {
      info = new HiveProfilePublisherInfo(conf);
      String createTable = getCreate();
      LOG.info(createTable);
      HiveProfilerUtils.createTableIfNonExistent(info, createTable);
      info.prepareInsert();
    } catch (Exception e) {
      LOG.error("Error during HiveProfilePublisher initialization", e);
      return false;
    }
    return true;
  }

  /**
   * DDL for the profiler stats table. The column set must stay in sync with
   * HiveProfilerStats.COLUMN_NAMES (the insert names its columns explicitly,
   * so the ordering here need not match).
   */
  private String getCreate() {
    return "CREATE TABLE " + info.getTableName()
        + " ( "
        + HiveProfilerStats.Columns.QUERY_ID + " VARCHAR(512) NOT NULL, "
        + HiveProfilerStats.Columns.TASK_ID + " VARCHAR(512) NOT NULL, "
        + HiveProfilerStats.Columns.OPERATOR_ID + " INT, "
        + HiveProfilerStats.Columns.OPERATOR_NAME + " VARCHAR(512) NOT NULL, "
        + HiveProfilerStats.Columns.PARENT_OPERATOR_ID + " INT,"
        + HiveProfilerStats.Columns.PARENT_OPERATOR_NAME + " VARCHAR(512), "
        + HiveProfilerStats.Columns.LEVEL_ANNO_NAME + " VARCHAR(512), "
        + HiveProfilerStats.Columns.CALL_COUNT + " BIGINT, "
        + HiveProfilerStats.Columns.INCL_TIME + " BIGINT ) ";
  }

  /**
   * Publishes one row of profiler stats, initializing the connection on
   * first use.
   *
   * @param queryId unused here; the query id travels inside {@code stats}
   * @param stats   column-name to value map (see HiveProfilerStats.COLUMN_NAMES)
   * @return true if the row was published, or there was nothing to publish
   */
  public boolean publishStat(String queryId, Map<String, String> stats,
      Configuration conf) {
    if (stats == null || stats.isEmpty()) {
      return true;
    }
    // BUGFIX: the original if/else only executed the insert when a connection
    // already existed, so the stats passed on the very first call (the one
    // that triggered initialization) were silently dropped.
    if (info == null || info.getConnection() == null) {
      if (!initialize(conf)) {
        return false;
      }
    }
    try {
      Utilities.SQLCommand<Void> execUpdate = new Utilities.SQLCommand<Void>() {
        @Override
        public Void run(PreparedStatement stmt) throws SQLException {
          stmt.executeUpdate();
          return null;
        }
      };
      PreparedStatement insStmt = info.getInsert(stats);
      Utilities.executeWithRetry(execUpdate, insStmt, info.getWaitWindow(),
          info.getMaxRetries());
      return true;
    } catch (SQLException e) {
      LOG.error("ERROR during publishing profiling data. ", e);
      return false;
    }
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.ql.profiler;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Utilities;

/**
 * Holds the JDBC connection, retry policy and prepared insert statement used
 * to write profiler rows. The connection is opened eagerly in the
 * constructor; callers must close it via the statement/connection accessors.
 */
public class HiveProfilePublisherInfo implements HiveProfilerConnectionInfo {
  private final Log LOG = LogFactory.getLog(this.getClass().getName());

  private String statsDbClass;                        // e.g. "jdbc:derby"
  private final String tableName = "PROFILER_STATS";
  private int maxRetries;                             // retry count for DB ops
  private int waitWindow;                             // base retry wait (ms)
  private int timeout;                                // JDBC login/statement timeout (s)
  private Connection conn;
  private String connectionString;
  private PreparedStatement insStmt;

  /**
   * Reads profiler settings from {@code conf}, loads the JDBC driver and
   * opens the connection (with retries).
   *
   * @throws Exception if the driver cannot be loaded or the connection
   *         cannot be established
   */
  public HiveProfilePublisherInfo(Configuration conf) throws Exception {
    maxRetries = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_PROFILER_RETRIES_MAX);
    waitWindow = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_PROFILER_RETRIES_WAIT);
    connectionString = HiveConf.getVar(conf,
        HiveConf.ConfVars.HIVEPROFILERDBCONNECTIONSTRING);
    timeout = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_PROFILER_JDBC_TIMEOUT);
    String driver = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEPROFILERJDBCDRIVER);
    statsDbClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEPROFILERDBCLASS);
    try {
      Class.forName(driver).newInstance();
    } catch (Exception e) {
      // BUGFIX: the failure was previously only logged, and the connect below
      // then failed with a misleading "no suitable driver" error. Fail fast
      // with the real cause instead.
      LOG.error("Error during instantiating JDBC driver " + driver + ". ", e);
      throw e;
    }
    DriverManager.setLoginTimeout(timeout); // stats is non-blocking
    conn = Utilities.connectWithRetry(connectionString, waitWindow, maxRetries);
  }

  public String getDbClass() { return statsDbClass; }

  public int getTimeout() { return timeout; }

  public int getMaxRetries() { return maxRetries; }

  public int getWaitWindow() { return waitWindow; }

  public String getConnectionString() { return connectionString; }

  public String getTableName() { return tableName; }

  public Connection getConnection() { return conn; }

  /** The prepared insert, or null if prepareInsert() has not run. */
  protected PreparedStatement getInsertStatement() {
    return insStmt;
  }

  /** Builds the parameterized INSERT covering every profiler column. */
  private String getInsert() {
    StringBuilder colNames = new StringBuilder();
    StringBuilder vals = new StringBuilder();
    for (int i = 0; i < HiveProfilerStats.COLUMN_NAMES.length; i++) {
      if (i > 0) {
        colNames.append(",");
        vals.append(",");
      }
      colNames.append(HiveProfilerStats.COLUMN_NAMES[i]);
      vals.append("?");
    }
    return "INSERT INTO " + tableName + " (" + colNames + ") VALUES (" + vals + ")";
  }

  /** Prepares the insert statement against the open connection. */
  protected void prepareInsert() throws SQLException {
    insStmt = Utilities.prepareWithRetry(conn, getInsert(), waitWindow, maxRetries);
  }

  protected void closeInsertStatement() throws SQLException {
    insStmt.close();
  }

  /**
   * Binds one row of stats onto the shared prepared insert.
   * All values are bound as strings; the DB coerces numeric columns.
   */
  protected PreparedStatement getInsert(Map<String, String> stats) throws SQLException {
    for (int i = 0; i < HiveProfilerStats.COLUMN_NAMES.length; i++) {
      insStmt.setString(i + 1, stats.get(HiveProfilerStats.COLUMN_NAMES[i]));
    }
    return insStmt;
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.ql.profiler;

import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.OperatorHook;
import org.apache.hadoop.hive.ql.exec.OperatorHookContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;

/**
 * Operator hook that measures per-operator inclusive wall time (via
 * System.nanoTime()) and call counts, aggregates them in memory, and flushes
 * everything to the profiler DB once on close().
 */
public class HiveProfiler implements OperatorHook {
  private final Log LOG = LogFactory.getLog(this.getClass().getName());
  private static final HiveProfilePublisher pub = new HiveProfilePublisher();

  // Stack of currently executing operators: enter() pushes, exit() pops.
  private final LinkedList<HiveProfilerEntry> operatorCallStack =
      new LinkedList<HiveProfilerEntry>();

  // Aggregates stats for each operator in memory so that stats are written to DB
  // all at once - this allows the profiler to be extremely lightweight in
  // communication with the DB. Keyed by operator id.
  private final Map<String, HiveProfilerStats> aggrStats =
      new HashMap<String, HiveProfilerStats>();

  /** Records the entry timestamp for this operator invocation. */
  public void enter(OperatorHookContext opHookContext) throws HiveException {
    operatorCallStack.addFirst(new HiveProfilerEntry(opHookContext));
  }

  /** Folds one completed operator invocation into the in-memory aggregate. */
  private void exit(HiveProfilerEntry curEntry, HiveProfilerEntry parentEntry) {
    OperatorHookContext opHookContext = curEntry.getOperatorHookContext();

    // inclusive wall time of this invocation, in nanoseconds
    long wallTime = System.nanoTime() - curEntry.wallStartTime;

    OperatorHookContext parentContext =
        parentEntry != null ? parentEntry.getOperatorHookContext() : null;
    Configuration conf = opHookContext.getOperator().getConfiguration();

    String opId = opHookContext.getOperatorId();
    HiveProfilerStats stats = aggrStats.get(opId);
    if (stats != null) {
      stats.updateStats(wallTime, 1);
    } else {
      aggrStats.put(opId,
          new HiveProfilerStats(opHookContext, parentContext, 1, wallTime, conf));
    }
  }

  public void exit(OperatorHookContext opHookContext) throws HiveException {
    if (operatorCallStack.isEmpty()) {
      // BUGFIX: the original logged this and then fell through, NPE'ing on
      // the null poll() below.
      LOG.error("Unexpected state: Operator Call Stack is empty on exit.");
      return;
    }

    // grab the top item on the call stack since that should be
    // the first operator to exit.
    HiveProfilerEntry curEntry = operatorCallStack.poll();
    if (!curEntry.getOperatorHookContext().equals(opHookContext)) {
      LOG.error("Expected to exit from: " + curEntry.getOperatorHookContext().toString()
          + " but exit called on " + opHookContext.toString());
    }
    HiveProfilerEntry parentEntry = operatorCallStack.peekFirst();
    exit(curEntry, parentEntry);
  }

  /**
   * Flushes every aggregated stat to the profiler DB.
   * Example row: queryId=..., operatorName=TS, id=3, parentName="" (root),
   * inclTime=1202710, callCount=...
   */
  public void close(OperatorHookContext opHookContext) {
    Configuration conf = opHookContext.getOperator().getConfiguration();

    // BUGFIX: the connection was previously initialized and torn down once
    // per stat; open it once, publish everything, then close it once.
    if (!pub.initialize(conf)) {
      LOG.error("Could not initialize profile publisher; profiler stats dropped.");
      return;
    }
    try {
      Collection<HiveProfilerStats> stats = aggrStats.values();
      for (HiveProfilerStats stat : stats) {
        boolean published = pub.publishStat(null, stat.getStatsMap(), conf);
        LOG.info((published ? "did " : "did not ") + "publish stat for: "
            + stat.toString());
      }
    } finally {
      pub.closeConnection();
    }
    aggrStats.clear();
  }

  /** Call-stack entry: an operator context plus its entry timestamp. */
  private class HiveProfilerEntry {
    OperatorHookContext ctxt;
    protected long wallStartTime; // System.nanoTime() at enter()

    protected HiveProfilerEntry(OperatorHookContext opHookContext) {
      this.ctxt = opHookContext;
      this.wallStartTime = System.nanoTime();
    }

    protected OperatorHookContext getOperatorHookContext() {
      return ctxt;
    }
  }
}
+ */ + +package org.apache.hadoop.hive.ql.profiler; + +public class HiveProfilerAggregateStat { + long wallTime; + long callCount; + public HiveProfilerAggregateStat(long wallTime, long callCount){ + this.wallTime = wallTime; + this.callCount = callCount; + } + + public long getCallCount() { + return callCount; + } + + public long getWallTime() { + return wallTime; + } + + /* + * @param wallTime: inclusive walltime in microseconds + */ + public void update(long wallTime, long callCount) { + this.wallTime += wallTime; + this.callCount += callCount; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerConnectionInfo.java ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerConnectionInfo.java new file mode 100644 index 0000000..cd8f46d --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerConnectionInfo.java @@ -0,0 +1,29 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.ql.profiler;

import java.sql.Connection;

/**
 * Read-only view of a profiler JDBC connection and its retry policy.
 * Implemented by HiveProfilePublisherInfo and consumed by HiveProfilerUtils.
 */
public interface HiveProfilerConnectionInfo {
  /** Database class marker, e.g. "jdbc:derby" (used to special-case shutdown). */
  public String getDbClass();
  /** JDBC login/statement timeout, in seconds. */
  public int getTimeout();
  /** Maximum number of retries for DB operations. */
  public int getMaxRetries();
  /** Base waiting window between retries, in milliseconds. */
  public int getWaitWindow();
  /** The JDBC connection string the connection was opened with. */
  public String getConnectionString();
  /** Name of the profiler stats table. */
  public String getTableName();
  /** The open JDBC connection; callers are responsible for closing it. */
  public Connection getConnection();
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.ql.profiler;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.OperatorHookContext;
import org.apache.hadoop.hive.ql.exec.Utilities;

/**
 * One aggregated profiler row for a single operator: identity columns are
 * captured once at construction, while call count and inclusive time keep
 * accumulating via {@link #updateStats}.
 */
public class HiveProfilerStats {

  /** Column names of the profiler stats table. Constants only. */
  public static final class Columns {
    public static final String QUERY_ID = "queryId";
    public static final String OPERATOR_NAME = "operatorName";
    public static final String OPERATOR_ID = "operatorId";
    public static final String PARENT_OPERATOR_ID = "parentOperatorId";
    public static final String PARENT_OPERATOR_NAME = "parentOperatorName";
    public static final String EXCL_TIME = "exclTime";
    public static final String LEVEL_ANNO_NAME = "levelAnnotatedName";
    public static final String INCL_TIME = "inclTime";
    public static final String CALL_COUNT = "callCount";
    public static final String TASK_ID = "taskId";

    private Columns() {
      // constants holder; not instantiable
    }
  }

  /**
   * Order in which values are bound onto the insert statement
   * (see HiveProfilePublisherInfo.getInsert).
   */
  public static final String[] COLUMN_NAMES = new String[] {
    Columns.QUERY_ID,
    Columns.TASK_ID,
    Columns.OPERATOR_NAME,
    Columns.OPERATOR_ID,
    Columns.PARENT_OPERATOR_ID,
    Columns.PARENT_OPERATOR_NAME,
    Columns.LEVEL_ANNO_NAME,
    Columns.INCL_TIME,
    Columns.CALL_COUNT
  };

  // column name -> value; all values kept as strings, numeric columns are
  // coerced by the DB on insert. NOTE(review): OPERATOR_ID is stored as a
  // string here while the table declares it INT - confirm the id is numeric.
  private final Map<String, String> stats = new HashMap<String, String>();

  long callCount;  // number of invocations folded in so far
  long inclTime;   // accumulated inclusive wall time (nanoseconds, from System.nanoTime)
  String taskId;

  protected HiveProfilerStats(
      OperatorHookContext opHookContext,
      OperatorHookContext parentOpHookContext,
      long callCount, long wallTime, Configuration conf) {
    this.callCount = callCount;
    this.inclTime = wallTime;
    this.taskId = Utilities.getTaskId(conf);
    populateStatsMap(opHookContext, parentOpHookContext, conf);
  }

  /** Captures the identity columns (query/operator/parent names and ids). */
  private void populateStatsMap(OperatorHookContext opHookContext,
      OperatorHookContext parentOpHookContext,
      Configuration conf) {
    String queryId =
        conf == null ? "no conf" : HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID);
    stats.put(Columns.QUERY_ID, queryId);
    String opName = opHookContext.getOperatorName();
    stats.put(Columns.OPERATOR_NAME, opName);
    stats.put(Columns.OPERATOR_ID, opHookContext.getOperatorId());

    String parentOpName =
        parentOpHookContext == null ? "" : parentOpHookContext.getOperatorName();
    stats.put(Columns.PARENT_OPERATOR_NAME, parentOpName);

    String parentOpId =
        parentOpHookContext == null ? "-1" : parentOpHookContext.getOperatorId();
    stats.put(Columns.PARENT_OPERATOR_ID, parentOpId);

    // "parent ==> child" label, e.g. "SEL_2 ==> GBY_1"; roots get "main()".
    String levelAnnoOpName = opName + "_" + opHookContext.getOperatorId();
    String levelAnnoName = parentOpHookContext == null
        ? "main() ==> " + levelAnnoOpName
        : parentOpName + "_" + parentOpId + " ==> " + levelAnnoOpName;
    stats.put(Columns.LEVEL_ANNO_NAME, levelAnnoName);
  }

  /** Accumulates another invocation's wall time and call count. */
  public void updateStats(long wallTime, long count) {
    this.inclTime += wallTime;
    this.callCount += count;
  }

  /**
   * Returns the full column map, refreshing the mutable counters.
   * The returned map is the live internal map, not a copy.
   */
  public Map<String, String> getStatsMap() {
    stats.put(Columns.TASK_ID, taskId);
    stats.put(Columns.INCL_TIME, String.valueOf(inclTime));
    stats.put(Columns.CALL_COUNT, String.valueOf(callCount));
    return stats;
  }

  @Override
  public String toString() {
    return stats.toString();
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.ql.profiler;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Utilities;

/**
 * Reads the raw per-operator rows for the current query back out of the
 * profiler DB and rolls them up by level-annotated name
 * ("parent ==> child"). The DB connection is opened in the constructor and
 * closed once aggregation completes.
 */
public class HiveProfilerStatsAggregator {
  private final Log LOG = LogFactory.getLog(this.getClass().getName());

  // sum of all inclusive times seen, in microseconds
  private long totalTime;
  private HiveProfilePublisherInfo rawProfileConnInfo;

  // levelAnnotatedName -> aggregated (wallTime, callCount)
  private Map<String, HiveProfilerAggregateStat> stats =
      new HashMap<String, HiveProfilerAggregateStat>();

  public HiveProfilerStatsAggregator(HiveConf conf) {
    try {
      // initialize the raw data connection
      rawProfileConnInfo = new HiveProfilePublisherInfo(conf);
      populateAggregateStats(conf);
    } catch (Exception e) {
      LOG.error("Error during initialization", e);
    }
  }

  /** Total inclusive time across all rows, in microseconds. */
  public long getTotalTime() {
    return totalTime;
  }

  /** The aggregated rows; empty if initialization or the query failed. */
  public Map<String, HiveProfilerAggregateStat> getAggregateStats() {
    return stats;
  }

  /** Queries the profiler table for this query's rows and aggregates them. */
  private void populateAggregateStats(HiveConf conf) {
    int waitWindow = rawProfileConnInfo.getWaitWindow();
    int maxRetries = rawProfileConnInfo.getMaxRetries();

    String queryId = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID);
    String getProfileStats =
        "SELECT * FROM " + rawProfileConnInfo.getTableName() + " WHERE queryId = ? ";
    Utilities.SQLCommand<ResultSet> execQuery = new Utilities.SQLCommand<ResultSet>() {
      @Override
      public ResultSet run(PreparedStatement stmt) throws SQLException {
        return stmt.executeQuery();
      }
    };

    PreparedStatement getProfileStatsStmt = null;
    ResultSet result = null;
    try {
      getProfileStatsStmt =
          Utilities.prepareWithRetry(rawProfileConnInfo.getConnection(),
              getProfileStats, waitWindow, maxRetries);
      getProfileStatsStmt.setString(1, queryId);
      result = Utilities.executeWithRetry(execQuery, getProfileStatsStmt,
          waitWindow, maxRetries);
      populateAggregateStats(result);
    } catch (Exception e) {
      LOG.error("executing error: ", e);
    } finally {
      // BUGFIX: the statement, result set and connection were leaked whenever
      // any step above threw; close them unconditionally.
      try {
        if (result != null) {
          result.close();
        }
      } catch (SQLException e) {
        LOG.error("Error closing result set", e);
      }
      try {
        if (getProfileStatsStmt != null) {
          getProfileStatsStmt.close();
        }
      } catch (SQLException e) {
        LOG.error("Error closing statement", e);
      }
      try {
        rawProfileConnInfo.getConnection().close();
      } catch (SQLException e) {
        LOG.error("Error closing connection", e);
      }
    }
  }

  /** Folds each result row into the per-levelAnnoName aggregates. */
  private void populateAggregateStats(ResultSet result) {
    try {
      while (result.next()) {
        // string denoting parent==>child, example: SEL_2==>GBY_1
        String levelAnnoName = result.getString(HiveProfilerStats.Columns.LEVEL_ANNO_NAME);
        // recorded with System.nanoTime(); convert nanoseconds -> microseconds.
        // BUGFIX: removed dead null checks - ResultSet.getLong returns a
        // primitive long, so the boxed values could never be null (and
        // totalTime was updated before the check anyway).
        long curInclTime = result.getLong(HiveProfilerStats.Columns.INCL_TIME) / 1000;
        long curCallCount = result.getLong(HiveProfilerStats.Columns.CALL_COUNT);
        totalTime += curInclTime;
        HiveProfilerAggregateStat curStat = stats.get(levelAnnoName);
        if (curStat != null) {
          curStat.update(curInclTime, curCallCount);
        } else {
          stats.put(levelAnnoName,
              new HiveProfilerAggregateStat(curInclTime, curCallCount));
        }
      }
    } catch (SQLException e) {
      LOG.error("Error Aggregating Stats", e);
    }
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.ql.profiler;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/** Static JDBC helpers shared by the profiler publisher and aggregator. */
public class HiveProfilerUtils {

  private HiveProfilerUtils() {
    // utility class; not instantiable
  }

  /**
   * Creates the profiler table via {@code createTable} DDL unless a table
   * with that name is already visible in the connection's metadata.
   *
   * @throws Exception on any metadata or DDL failure
   */
  public static void createTableIfNonExistent(HiveProfilerConnectionInfo info,
      String createTable) throws Exception {
    Connection conn = info.getConnection();
    Statement stmt = conn.createStatement();
    try {
      stmt.setQueryTimeout(info.getTimeout());
      DatabaseMetaData dbm = conn.getMetaData();
      ResultSet rs = dbm.getTables(null, null, info.getTableName(), null);
      boolean tblExists;
      try {
        tblExists = rs.next();
      } finally {
        // BUGFIX: the metadata result set was never closed.
        rs.close();
      }
      if (!tblExists) {
        stmt.executeUpdate(createTable);
      }
    } finally {
      // BUGFIX: the statement leaked whenever the table already existed
      // (close() was only reached in the !tblExists branch).
      stmt.close();
    }
  }

  /**
   * Closes the connection; for embedded Derby, additionally shuts the
   * database down so the same connection string can be reused later.
   */
  public static boolean closeConnection(HiveProfilerConnectionInfo info) throws SQLException {
    info.getConnection().close();
    // In case of derby, explicitly shutdown the database otherwise it reports error when
    // trying to connect to the same JDBC connection string again.
    if (info.getDbClass().equalsIgnoreCase("jdbc:derby")) {
      try {
        // The following closes the derby connection. It throws an exception that has to be caught
        // and ignored.
        DriverManager.getConnection(info.getConnectionString() + ";shutdown=true");
      } catch (Exception ignored) {
        // Do nothing because we know that an exception is thrown anyway.
      }
    }
    return true;
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.ql.hooks;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.sql.Connection;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.profiler.HiveProfilerStatsAggregator;
import org.apache.hadoop.hive.ql.profiler.HiveProfilerAggregateStat;
import org.apache.hadoop.hive.ql.session.SessionState;
import java.sql.DriverManager;
import org.apache.hadoop.hive.ql.exec.Utilities;

/**
 * Post-execution hook used by the profiler .q tests: aggregates the rows the
 * HiveProfiler wrote for this query and prints "levelAnnoName: callCount"
 * lines so the test's .q.out can assert on them.
 */
public class HiveProfilerResultsHook implements ExecuteWithHookContext {
  final private Log LOG = LogFactory.getLog(this.getClass().getName());
  @Override
  public void run (HookContext hookContext) {
    SessionState sess = SessionState.get();
    HiveConf conf = sess.getConf();
    HiveProfilerStatsAggregator aggr = new HiveProfilerStatsAggregator(conf);
    Map<String, HiveProfilerAggregateStat> stats = aggr.getAggregateStats();
    for (String name: stats.keySet()){
      // printError writes to stderr, which the .q test harness captures;
      // NOTE(review): only callCount is emitted - wall times would make the
      // golden output nondeterministic.
      SessionState.getConsole().printError(name + ": " + stats.get(name).getCallCount());
    }
  }
}
diff --git ql/src/test/queries/clientpositive/hiveprofiler0.q ql/src/test/queries/clientpositive/hiveprofiler0.q
new file mode 100644
index 0000000..d19595d
--- /dev/null
+++ ql/src/test/queries/clientpositive/hiveprofiler0.q
set hive.exec.operator.hooks=org.apache.hadoop.hive.ql.profiler.HiveProfiler;
set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.HiveProfilerResultsHook;
SET hive.exec.mode.local.auto=false;
SET hive.task.progress=true;

select count(1) from src;
diff --git ql/src/test/results/clientpositive/hiveprofiler0.q.out ql/src/test/results/clientpositive/hiveprofiler0.q.out
new file mode 100644
index 0000000..4decef2
--- /dev/null
+++ ql/src/test/results/clientpositive/hiveprofiler0.q.out
PREHOOK: query: select count(1) from src
PREHOOK: type: QUERY
PREHOOK: Input: default@src
#### A masked pattern was here ####
TS_0 ==> SEL_1: 500
main() ==> RS_3: 1
SEL_1 ==> GBY_2: 500
main() ==> SEL_5: 1
main() ==> GBY_4: 1
main() ==> TS_0: 500
SEL_5 ==> FS_6: 1
500