diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 7f6e359..0eddbce 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -523,7 +523,7 @@ public void process(Object row, int tag) throws HiveException {
     if (fatalError) {
       return;
     }
-    OperatorHookContext opHookContext = new OperatorHookContext(this, row);
+    OperatorHookContext opHookContext = new OperatorHookContext(this, row, tag);
     preProcessCounter();
     enterOperatorHooks(opHookContext);
     processOp(row, tag);
@@ -610,7 +610,7 @@ public void close(boolean abort) throws HiveException {
 
     LOG.info(id + " forwarded " + cntr + " rows");
 
-    closeOperatorHooks(new OperatorHookContext(this, null));
+    closeOperatorHooks(new OperatorHookContext(this));
     // call the operator specific close routine
     closeOp(abort);
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookContext.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookContext.java
index bb40f29..fd9d07c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookContext.java
@@ -18,26 +18,42 @@
 package org.apache.hadoop.hive.ql.exec;
 
+import java.util.List;
+
 public class OperatorHookContext {
-  private String operatorName;
-  private String operatorId;
-  private Object currentRow;
+  private final String operatorName;
+  private final String operatorId;
+  private final Object currentRow;
+  private final int parentTag;
   private Operator operator;
 
-  public OperatorHookContext(Operator op, Object row) {
-    this(op.getName(), op.getIdentifier(), row);
+  public OperatorHookContext(Operator op) {
+    this(op, null, -1);
+  }
+
+  public OperatorHookContext(Operator op, Object row, int tag) {
+    this(op.getName(), op.getIdentifier(), row, tag);
     this.operator = op;
   }
 
-  private OperatorHookContext(String opName, String opId, Object row) {
+  private OperatorHookContext(String opName, String opId, Object row, int tag) {
     operatorName = opName;
     operatorId = opId;
     currentRow = row;
+    parentTag = tag;
   }
 
   public Operator getOperator() {
     return operator;
   }
 
+  public Operator getParentOperator() {
+    List parents = this.operator.getParentOperators();
+    if (parents == null || parents.isEmpty()
+        || parentTag < 0 || parentTag >= parents.size()) {
+      return null;
+    }
+    return (Operator) parents.get(parentTag);
+  }
+
   public String getOperatorName() {
     return operatorName;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilePublisher.java ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilePublisher.java
index d87e8d3..6c46bac 100644
--- ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilePublisher.java
+++ ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilePublisher.java
@@ -45,7 +45,7 @@ public boolean closeConnection() {
       }
       info.getConnection().close();
       return true;
-    } catch (SQLException e) {
+    } catch (Exception e) {
       LOG.error("Error during JDBC termination. ", e);
       return false;
     }
@@ -100,7 +100,7 @@ public Void run(PreparedStatement stmt) throws SQLException {
       };
       PreparedStatement insStmt = info.getInsert(stats);
       Utilities.executeWithRetry(execUpdate, insStmt, info.getWaitWindow(), info.getMaxRetries());
-    } catch (SQLException e) {
+    } catch (Exception e) {
       LOG.error("ERROR during publishing profiling data. ", e);
", e); return false; } diff --git ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfiler.java ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfiler.java index f187635..e5274c8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfiler.java +++ ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfiler.java @@ -17,18 +17,14 @@ */ package org.apache.hadoop.hive.ql.profiler; -import java.lang.System; -import java.util.LinkedList; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import java.util.Iterator; import java.util.Collection; -import java.util.List; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; -import org.apache.hadoop.conf.Configuration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.exec.OperatorHook; import org.apache.hadoop.hive.ql.exec.OperatorHookContext; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -37,41 +33,43 @@ private final Log LOG = LogFactory.getLog(this.getClass().getName()); private static final HiveProfilePublisher pub = new HiveProfilePublisher(); - private LinkedList operatorCallStack = - new LinkedList(); + private final Map operatorCallStack = + new ConcurrentHashMap(); // Aggregates stats for each operator in memory so that stats are written to DB // all at once - this allows the profiler to be extremely lightweight in // communication with the DB - private Map aggrStats = - new HashMap(); + private final Map aggrStats = + new ConcurrentHashMap(); public void enter(OperatorHookContext opHookContext) throws HiveException { + String opLevelAnnoName = HiveProfilerUtils.getLevelAnnotatedName(opHookContext); HiveProfilerEntry curEntry = new HiveProfilerEntry(opHookContext); - operatorCallStack.addFirst(curEntry); + if (operatorCallStack.get(opLevelAnnoName) != null) { + LOG.error(opLevelAnnoName + " is already in the dictionary"); + throw new HiveException(); + } + operatorCallStack.put(opLevelAnnoName, curEntry); } - private void exit(HiveProfilerEntry curEntry, HiveProfilerEntry parentEntry) { + private void exit(HiveProfilerEntry curEntry) { OperatorHookContext opHookContext = curEntry.getOperatorHookContext(); - // update the metrics we are long exitTime = System.nanoTime(); long wallTime = exitTime - curEntry.wallStartTime; String opName = opHookContext.getOperatorName(); - OperatorHookContext parentContext = - parentEntry != null ? 
     Configuration conf = opHookContext.getOperator().getConfiguration();
-    String opId = opHookContext.getOperatorId();
-    if (aggrStats.containsKey(opId)) {
-      aggrStats.get(opId).updateStats(wallTime, 1);
+    String opLevelAnnoName = HiveProfilerUtils.getLevelAnnotatedName(opHookContext);
+
+    if (aggrStats.containsKey(opLevelAnnoName)) {
+      aggrStats.get(opLevelAnnoName).updateStats(wallTime, 1);
     } else {
       HiveProfilerStats stats =
-        new HiveProfilerStats(opHookContext, parentContext, 1, wallTime, conf);
-      aggrStats.put(opId, stats);
+        new HiveProfilerStats(opHookContext, 1, wallTime, conf);
+      aggrStats.put(opLevelAnnoName, stats);
     }
   }
@@ -79,16 +77,21 @@
   public void exit(OperatorHookContext opHookContext) throws HiveException {
     if (operatorCallStack.isEmpty()) {
       LOG.error("Unexpected state: Operator Call Stack is empty on exit.");
     }
+    String opLevelAnnoName = HiveProfilerUtils.getLevelAnnotatedName(opHookContext);
+
+    HiveProfilerEntry curEntry = operatorCallStack.get(opLevelAnnoName);
+    if (curEntry == null) {
+      LOG.error("No profiler entry found for " + opLevelAnnoName + " on exit.");
+      return;
+    }
 
-    // grab the top item on the call stack since that should be
-    // the first operator to exit.
-    HiveProfilerEntry curEntry = operatorCallStack.poll();
     if (!curEntry.getOperatorHookContext().equals(opHookContext)) {
       LOG.error("Expected to exit from: " + curEntry.getOperatorHookContext().toString()
         + " but exit called on " + opHookContext.toString());
     }
-    HiveProfilerEntry parentEntry = operatorCallStack.peekFirst();
-    exit(curEntry, parentEntry);
+
+    exit(curEntry);
+    operatorCallStack.remove(opLevelAnnoName);
   }
 
   public void close(OperatorHookContext opHookContext) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerStats.java ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerStats.java
index 8b820c7..e4ee89c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerStats.java
+++ ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerStats.java
@@ -21,6 +21,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorHookContext;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 
@@ -50,7 +51,7 @@
     Columns.CALL_COUNT
   };
 
-  private Map<String, String> stats = new HashMap<String, String>();
+  private final Map<String, String> stats = new HashMap<String, String>();
 
   long callCount;
   long inclTime;
@@ -58,16 +59,14 @@
   protected HiveProfilerStats(
     OperatorHookContext opHookContext,
-    OperatorHookContext parentOpHookContext,
     long callCount,
     long wallTime,
     Configuration conf) {
     this.callCount = callCount;
     this.inclTime = wallTime;
     this.taskId = Utilities.getTaskId(conf);
-    populateStatsMap(opHookContext, parentOpHookContext, conf);
+    populateStatsMap(opHookContext, conf);
   }
 
-  private void populateStatsMap(OperatorHookContext opHookContext,
-    OperatorHookContext parentOpHookContext, Configuration conf) {
+  private void populateStatsMap(OperatorHookContext opHookContext, Configuration conf) {
     String queryId = conf == null ? "no conf" :
       HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID);
@@ -78,17 +77,14 @@ private void populateStatsMap(OperatorHookContext opHookContext,
     stats.put(
       Columns.OPERATOR_ID, opHookContext.getOperatorId());
-    String parentOpName = parentOpHookContext == null ? "" : parentOpHookContext.getOperatorName();
+    Operator parent = opHookContext.getParentOperator();
+    String parentOpName = parent == null ? "" : parent.getName();
     stats.put(Columns.PARENT_OPERATOR_NAME, parentOpName);
-
-    String parentOpId = parentOpHookContext == null ? "-1" : parentOpHookContext.getOperatorId();
+    String parentOpId = parent == null ? "-1" : parent.getIdentifier();
"-1" : parent.getIdentifier(); stats.put(Columns.PARENT_OPERATOR_ID, parentOpId); - String levelAnnoOpName = opName + "_" + opHookContext.getOperatorId(); - String levelAnnoName = parentOpHookContext == null ? "main() ==> " + levelAnnoOpName : - parentOpName + "_" + parentOpId + " ==> " + levelAnnoOpName; - stats.put(Columns.LEVEL_ANNO_NAME, levelAnnoName); + stats.put(Columns.LEVEL_ANNO_NAME, HiveProfilerUtils.getLevelAnnotatedName(opHookContext)); } diff --git ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerStatsAggregator.java ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerStatsAggregator.java index b4382f1..d640056 100644 --- ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerStatsAggregator.java +++ ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerStatsAggregator.java @@ -110,7 +110,7 @@ private void populateAggregateStats(ResultSet result) { stats.put(levelAnnoName, curStat); } } - } catch (SQLException e) { + } catch (Exception e) { LOG.error("Error Aggregating Stats", e); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerUtils.java ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerUtils.java index 42984d9..ddd35ce 100644 --- ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/profiler/HiveProfilerUtils.java @@ -18,14 +18,14 @@ package org.apache.hadoop.hive.ql.profiler; import java.sql.Connection; -import java.sql.Statement; import java.sql.DatabaseMetaData; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorHookContext; public class HiveProfilerUtils { public static void createTableIfNonExistent(HiveProfilerConnectionInfo info, @@ -57,4 +57,13 @@ public static boolean closeConnection(HiveProfilerConnectionInfo info) throws SQ } return true; } + + public static String getLevelAnnotatedName(OperatorHookContext opHookContext) { + Operator parent = opHookContext.getParentOperator(); + Operator op = opHookContext.getOperator(); + String parentOpName = parent == null ? "" : parent.getName(); + String parentOpId = parent == null ? "main()" : parent.getOperatorId(); + String levelAnnoName = parentOpId + " ==> " + op.getOperatorId(); + return levelAnnoName; + } } diff --git ql/src/test/results/clientpositive/hiveprofiler0.q.out ql/src/test/results/clientpositive/hiveprofiler0.q.out index 4decef2..414a1cc 100644 --- ql/src/test/results/clientpositive/hiveprofiler0.q.out +++ ql/src/test/results/clientpositive/hiveprofiler0.q.out @@ -2,11 +2,11 @@ PREHOOK: query: select count(1) from src PREHOOK: type: QUERY PREHOOK: Input: default@src #### A masked pattern was here #### +GBY_2 ==> RS_3: 1 TS_0 ==> SEL_1: 500 -main() ==> RS_3: 1 +null ==> TS_0: 500 SEL_1 ==> GBY_2: 500 -main() ==> SEL_5: 1 +GBY_4 ==> SEL_5: 1 main() ==> GBY_4: 1 -main() ==> TS_0: 500 SEL_5 ==> FS_6: 1 500