diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index 53e6b28..c39c46b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe; +import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; @@ -44,6 +45,8 @@ public class MapJoinOperator extends AbstractMapJoinOperator implements Serializable { private static final long serialVersionUID = 1L; private static final Log LOG = LogFactory.getLog(MapJoinOperator.class.getName()); + private static final String CLASS_NAME = MapJoinOperator.class.getName(); + private final PerfLogger perfLogger = PerfLogger.getPerfLogger(); private transient String tableKey; private transient String serdeKey; @@ -148,11 +151,12 @@ private void loadHashTable() throws HiveException { hashTblInitedOnce = true; } } - + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.LOAD_HASHTABLE); loader.load(this.getExecContext(), hconf, this.getConf(), posBigTable, mapJoinTables, mapJoinTableSerdes); cache.cache(tableKey, mapJoinTables); cache.cache(serdeKey, mapJoinTableSerdes); + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.LOAD_HASHTABLE); } // Load the hash table diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index f643c0f..1e27ef1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; +import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.io.Writable; @@ -62,6 +63,7 @@ @Override void init(JobConf jconf, MRTaskReporter mrReporter, Map inputs, OutputCollector out){ + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); super.init(jconf, mrReporter, inputs, out); //Update JobConf using MRInput, info like filename comes via this @@ -121,6 +123,7 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Map in throw new RuntimeException("Map operator initialization failed", e); } } + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); } private MRInput getMRInput(Map inputs) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java index 70be9dd..e9dc16d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.tez.mapreduce.processor.MRTaskReporter; @@ -50,6 +51,8 @@ private long numRows = 0; private long nextUpdateCntr = 1; + protected PerfLogger perfLogger = PerfLogger.getPerfLogger(); + protected String CLASS_NAME = RecordProcessor.class.getName(); /** @@ -84,7 +87,6 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Map in } catch (Exception e) { l4j.info("cannot get classpath: " + e.getMessage()); } - } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java index e1b8321..5db7e52 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.tez.tools.InputMerger; +import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; @@ -88,6 +89,7 @@ @Override void init(JobConf jconf, MRTaskReporter mrReporter, Map inputs, OutputCollector out){ + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); super.init(jconf, mrReporter, inputs, out); ObjectCache cache = ObjectCacheFactory.getCache(jconf); @@ -166,7 +168,7 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Map in reducer.setReporter(reporter); MapredContext.get().setReporter(reporter); - + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java index 9391ad7..aef8f0a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java @@ -18,38 +18,37 @@ package org.apache.hadoop.hive.ql.exec.tez; -import static org.apache.tez.dag.api.client.DAGStatus.State.*; +import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING; import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; -import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.tez.client.TezClient; +import org.apache.hadoop.hive.ql.log.PerfLogger; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.Progress; /** - * TezJobMonitor keeps track of a tez job while it's being executed. It will - * print status to the console and retrieve final status of the job after + * TezJobMonitor keeps track of a tez job while it's being executed. It will + * print status to the console and retrieve final status of the job after * completion. */ public class TezJobMonitor { - - static final private Log LOG = LogFactory.getLog(TezJobMonitor.class.getName()); + + private static final Log LOG = LogFactory.getLog(TezJobMonitor.class.getName()); + private static final String CLASS_NAME = TezJobMonitor.class.getName(); private transient LogHelper console; + private final PerfLogger perfLogger = PerfLogger.getPerfLogger(); + private Set completed; public TezJobMonitor() { console = new LogHelper(LOG); @@ -64,6 +63,7 @@ public TezJobMonitor() { */ public int monitorExecution(DAGClient dagClient) throws InterruptedException { DAGStatus status = null; + completed = new HashSet(); boolean running = false; boolean done = false; @@ -75,14 +75,16 @@ public int monitorExecution(DAGClient dagClient) throws InterruptedException { int rc = 0; DAGStatus.State lastState = null; String lastReport = null; - + console.printInfo("\n"); + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_DAG); + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING); while(true) { ++counter; try { - status = dagClient.getDAGStatus(); + status = dagClient.getDAGStatus(); Map progressMap = status.getVertexProgress(); failedCounter = 0; DAGStatus.State state = status.getState(); @@ -99,11 +101,12 @@ public int monitorExecution(DAGClient dagClient) throws InterruptedException { break; case RUNNING: if (!running) { + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING); console.printInfo("Status: Running\n"); printTaskNumbers(progressMap, console); running = true; } - + if (counter % printInterval/checkInterval == 0) { lastReport = printStatus(progressMap, lastReport, console); } @@ -155,6 +158,7 @@ public int monitorExecution(DAGClient dagClient) throws InterruptedException { } Thread.sleep(500); } + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_DAG); return rc; } @@ -165,6 +169,10 @@ private String printStatus(Map progressMap, String lastReport, for (String s: keys) { Progress progress = progressMap.get(s); int percentComplete = (int) (100 * progress.getSucceededTaskCount() / (float) progress.getTotalTaskCount()); + if (percentComplete == 100 && !completed.contains(s)) { + completed.add(s); + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); + } reportBuffer.append(String.format("%s: %3d%% complete\t", s, percentComplete)); } @@ -178,9 +186,10 @@ private String printStatus(Map progressMap, String lastReport, private void printTaskNumbers(Map progressMap, LogHelper console) { StringBuffer reportBuffer = new StringBuffer(); - + SortedSet keys = new TreeSet(progressMap.keySet()); for (String s: keys) { + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); Progress progress = progressMap.get(s); int numTasks = progress.getTotalTaskCount(); if (numTasks == 1) { @@ -189,7 +198,7 @@ private void printTaskNumbers(Map progressMap, LogHelper conso reportBuffer.append(String.format("%s: %7d tasks\t", s, numTasks)); } } - + String report = reportBuffer.toString(); console.printInfo(report); console.printInfo(""); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java index d119d70..90826db 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.tez.common.TezUtils; @@ -46,6 +47,9 @@ private JobConf jobConf; + private static final String CLASS_NAME = TezProcessor.class.getName(); + private final PerfLogger perfLogger = PerfLogger.getPerfLogger(); + private TezProcessorContext processorContext; public TezProcessor(boolean isMap) { @@ -67,16 +71,19 @@ public void handleEvents(List arg0) { @Override public void initialize(TezProcessorContext processorContext) throws IOException { + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR); this.processorContext = processorContext; //get the jobconf byte[] userPayload = processorContext.getUserPayload(); Configuration conf = TezUtils.createConfFromUserPayload(userPayload); this.jobConf = new JobConf(conf); + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR); } @Override public void run(Map inputs, Map outputs) throws Exception { + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR); // in case of broadcast-join read the broadcast edge inputs // (possibly asynchronously) @@ -106,6 +113,7 @@ public void run(Map inputs, Map out rproc.run(); //done - output does not need to be committed as hive does not use outputcommitter + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR); } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index e8a35ed..c82d794 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.TezWork; import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType; @@ -57,6 +58,9 @@ @SuppressWarnings({"serial", "deprecation"}) public class TezTask extends Task { + private static final String CLASS_NAME = TezTask.class.getName(); + private final PerfLogger perfLogger = PerfLogger.getPerfLogger(); + public TezTask() { super(); } @@ -85,6 +89,7 @@ public int execute(DriverContext driverContext) { if (!session.isOpen()) { // can happen if the user sets the tez flag after the session was // established + LOG.info("Tez session hasn't been created yet. Opening session"); session.open(ss.getSessionId(), conf); } @@ -138,6 +143,7 @@ private DAG build(JobConf conf, TezWork work, Path scratchDir, LocalResource appJarLr, Context ctx) throws Exception { + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_BUILD_DAG); Map workToVertex = new HashMap(); Map workToConf = new HashMap(); @@ -162,10 +168,12 @@ private DAG build(JobConf conf, TezWork work, Path scratchDir, boolean isFinal = work.getLeaves().contains(w); // translate work to vertex + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName()); JobConf wxConf = DagUtils.initializeVertexConf(conf, w); Vertex wx = DagUtils.createVertex(wxConf, w, tezDir, appJarLr, additionalLr, fs, ctx, !isFinal); dag.addVertex(wx); + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName()); workToVertex.put(w, wx); workToConf.put(w, wxConf); @@ -180,7 +188,7 @@ private DAG build(JobConf conf, TezWork work, Path scratchDir, dag.addEdge(e); } } - + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_BUILD_DAG); return dag; } @@ -188,9 +196,12 @@ private DAGClient submit(JobConf conf, DAG dag, Path scratchDir, LocalResource appJarLr, TezSession session) throws IOException, TezException, InterruptedException { + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG); + // ready to start execution on the cluster DAGClient dagClient = session.submitDAG(dag); + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG); return dagClient; } diff --git ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java index e0261fe..f4e8ff9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java +++ ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java @@ -53,6 +53,17 @@ public static final String FAILURE_HOOK = "FailureHook."; public static final String DRIVER_RUN = "Driver.run"; public static final String TIME_TO_SUBMIT = "TimeToSubmit"; + public static final String TEZ_SUBMIT_TO_RUNNING = "TezSubmitToRunningDag"; + public static final String TEZ_BUILD_DAG = "TezBuildDag"; + public static final String TEZ_SUBMIT_DAG = "TezSubmitDag"; + public static final String TEZ_RUN_DAG = "TezRunDag"; + public static final String TEZ_CREATE_VERTEX = "TezCreateVertex."; + public static final String TEZ_RUN_VERTEX = "TezRunVertex."; + public static final String TEZ_INITIALIZE_PROCESSOR = "TezInitializeProcessor"; + public static final String TEZ_RUN_PROCESSOR = "TezRunProcessor"; + public static final String TEZ_INIT_OPERATORS = "TezInitializeOperators"; + public static final String LOAD_HASHTABLE = "LoadHashtable"; + protected static final ThreadLocal perfLogger = new ThreadLocal();