Index: ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java	(revision 1138748)
+++ ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java	(working copy)
@@ -19,11 +19,12 @@
 package org.apache.hadoop.hive.ql;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Queue;
 
 import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.mapred.JobConf;
 
 /**
  * DriverContext.
@@ -38,14 +39,16 @@
 
   Context ctx;
 
+  List<MapRedStats> mapRedStatsList;
+
   public DriverContext() {
-    this.runnable = null;
-    this.ctx = null;
+    this(null, null);
   }
 
   public DriverContext(Queue<Task<? extends Serializable>> runnable, Context ctx) {
     this.runnable = runnable;
     this.ctx = ctx;
+    this.mapRedStatsList = new ArrayList<MapRedStats>();
   }
 
   public Queue<Task<? extends Serializable>> getRunnable() {
@@ -82,5 +85,13 @@
   public void incCurJobNo(int amount) {
     this.curJobNo = this.curJobNo + amount;
   }
-
+
+  public List<MapRedStats> getMapRedStatsList() {
+    return mapRedStatsList;
+  }
+
+  public void setMapRedStatsList(List<MapRedStats> mapRedStatsList) {
+    this.mapRedStatsList = mapRedStatsList;
+  }
+
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java	(revision 1138748)
+++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java	(working copy)
@@ -38,6 +38,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.MapRedStats;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.history.HiveHistory;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -110,6 +111,8 @@
 
   private CreateTableAutomaticGrant createTableGrants;
 
+  private List<MapRedStats> lastMapRedStatsList;
+
   /**
    * Lineage state.
   */
@@ -667,4 +670,12 @@
   public void setCreateTableGrants(CreateTableAutomaticGrant createTableGrants) {
     this.createTableGrants = createTableGrants;
   }
+
+  public List<MapRedStats> getLastMapRedStatsList() {
+    return lastMapRedStatsList;
+  }
+
+  public void setLastMapRedStatsList(List<MapRedStats> lastMapRedStatsList) {
+    this.lastMapRedStatsList = lastMapRedStatsList;
+  }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java	(revision 1138748)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java	(working copy)
@@ -30,6 +30,8 @@
 import java.util.Set;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.MapRedStats;
 import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
 import org.apache.hadoop.hive.ql.exec.errors.ErrorAndSolution;
 import org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor;
@@ -43,12 +45,13 @@
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapred.Counters.Counter;
 
 public class HadoopJobExecHelper {
-  
+
   protected transient JobConf job;
   protected Task<? extends Serializable> task;
-  
+
   protected transient int mapProgress = 0;
   protected transient int reduceProgress = 0;
   public transient String jobId;
@@ -69,10 +72,10 @@
       return;
     }
     if(callBackObj != null) {
-      callBackObj.updateCounters(ctrs, rj); 
+      callBackObj.updateCounters(ctrs, rj);
     }
   }
-  
+
   /**
    * This msg pattern is used to track when a job is started.
    *
@@ -113,7 +116,7 @@
     return reduceProgress == 100;
   }
 
-  
+
   public String getJobId() {
     return jobId;
   }
@@ -122,10 +125,10 @@
     this.jobId = jobId;
   }
 
-  
+
   public HadoopJobExecHelper() {
   }
 
-  
+
   public HadoopJobExecHelper(JobConf job, LogHelper console, Task<? extends Serializable> task, HadoopJobExecHook hookCallBack) {
     this.job = job;
@@ -134,7 +137,7 @@
     this.callBackObj = hookCallBack;
   }
 
-  
+
   /**
    * A list of the currently running jobs spawned in this Hive instance that is used to kill all
    * running jobs in the event of an unexpected shutdown - i.e., the JVM shuts down while there are
@@ -143,7 +146,7 @@
   public static Map<String, String> runningJobKillURIs = Collections
       .synchronizedMap(new HashMap<String, String>());
 
-  
+
   /**
    * In Hive, when the user control-c's the command line, any running jobs spawned from that command
    * line are best-effort killed.
@@ -200,12 +203,13 @@
     }
     return this.callBackObj.checkFatalErrors(ctrs, errMsg);
   }
-  
-  private boolean progress(ExecDriverTaskHandle th) throws IOException {
+
+  private MapRedStats progress(ExecDriverTaskHandle th) throws IOException {
    JobClient jc = th.getJobClient();
     RunningJob rj = th.getRunningJob();
     String lastReport = "";
     SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+    //DecimalFormat longFormatter = new DecimalFormat("###,###");
     long reportTime = System.currentTimeMillis();
     long maxReportInterval = 60 * 1000; // One minute
     boolean fatal = false;
@@ -213,6 +217,10 @@
     long pullInterval = HiveConf.getLongVar(job, HiveConf.ConfVars.HIVECOUNTERSPULLINTERVAL);
     boolean initializing = true;
     boolean initOutputPrinted = false;
+    long cpuMsec = -1;
+    int numMap = -1;
+    int numReduce = -1;
+
     while (!rj.isComplete()) {
       try {
         Thread.sleep(pullInterval);
@@ -233,24 +241,24 @@
 
         String logMapper;
         String logReducer;
-        
+
         TaskReport[] mappers = jc.getMapTaskReports(rj.getJobID());
         if (mappers == null) {
           logMapper = "no information for number of mappers; ";
         } else {
-          int numMap = mappers.length;
+          numMap = mappers.length;
           if (ss != null) {
             ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(), getId(),
               Keys.TASK_NUM_MAPPERS, Integer.toString(numMap));
           }
           logMapper = "number of mappers: " + numMap + "; ";
         }
-        
+
         TaskReport[] reducers = jc.getReduceTaskReports(rj.getJobID());
         if (reducers == null) {
           logReducer = "no information for number of reducers. ";
         } else {
-          int numReduce = reducers.length;
+          numReduce = reducers.length;
           if (ss != null) {
             ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(), getId(),
               Keys.TASK_NUM_REDUCERS, Integer.toString(numReduce));
@@ -295,8 +303,20 @@
 
       String report = " " + getId() + " map = " + mapProgress + "%,  reduce = "
           + reduceProgress + "%";
+
       if (!report.equals(lastReport)
           || System.currentTimeMillis() >= reportTime + maxReportInterval) {
+        // find out CPU msecs
+        Counter counterCpuMsec = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
+            "CPU_MILLISECONDS");
+        if (counterCpuMsec != null) {
+          long newCpuMSec = counterCpuMsec.getValue();
+          if (newCpuMSec > 0) {
+            cpuMsec = newCpuMSec;
+            report += ", Cumulative CPU "
+              + (cpuMsec / 1000D) + " sec";
+          }
+        }
 
         // write out serialized plan with counters to log file
         // LOG.info(queryPlan);
@@ -315,8 +335,21 @@
       }
     }
 
+    if (cpuMsec > 0) {
+      console.printInfo("MapReduce Total cumulative CPU time: "
+          + Utilities.formatMsecToStr(cpuMsec));
+    }
+
     boolean success;
     Counters ctrs = th.getCounters();
+    Counter counterCpuMsec = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
+        "CPU_MILLISECONDS");
+    if (counterCpuMsec != null) {
+      long newCpuMSec = counterCpuMsec.getValue();
+      if (newCpuMSec > cpuMsec) {
+        cpuMsec = newCpuMSec;
+      }
+    }
 
     if (fatal) {
       success = false;
@@ -331,6 +364,8 @@
       }
     }
 
+    MapRedStats mapRedStats = new MapRedStats(numMap, numReduce, cpuMsec, success);
+
     this.task.setDone();
     // update based on the final value of the counters
     updateCounters(ctrs, rj);
@@ -340,9 +375,9 @@
       this.callBackObj.logPlanProgress(ss);
     }
     // LOG.info(queryPlan);
-    return (success);
+    return mapRedStats;
   }
-  
+
   private String getId() {
     return this.task.getId();
   }
@@ -396,7 +431,7 @@
       return rj.getCounters();
     }
   }
-  
+
   // Used for showJobFailDebugInfo
   private static class TaskInfo {
     String jobId;
@@ -419,7 +454,7 @@
       return jobId;
     }
   }
-  
+
   @SuppressWarnings("deprecation")
   private void showJobFailDebugInfo(JobConf conf, RunningJob rj) throws IOException {
     // Mapping from task ID to the number of failures
@@ -547,11 +582,11 @@
 
   }
 
-  public int progress(RunningJob rj, JobClient jc) throws IOException {
+  public int progress(RunningJob rj, JobClient jc, DriverContext ctx) throws IOException {
     jobId = rj.getJobID();
-    
+
     int returnVal = 0;
-    
+
     // remove the pwd from conf file so that job tracker doesn't show this
     // logs
     String pwd = HiveConf.getVar(job, HiveConf.ConfVars.METASTOREPWD);
@@ -570,7 +605,9 @@
 
     ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj);
     jobInfo(rj);
-    boolean success = progress(th);
+    MapRedStats mapRedStats = progress(th);
+    ctx.getMapRedStatsList().add(mapRedStats);
+    boolean success = mapRedStats.isSuccess();
 
     String statusMesg = getJobEndMsg(rj.getJobID());
     if (!success) {
@@ -583,7 +620,7 @@
     } else {
       console.printInfo(statusMesg);
     }
-    
+
     return returnVal;
   }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(revision 1138748)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(working copy)
@@ -108,6 +108,7 @@
     this.jobExecHelper = new HadoopJobExecHelper(job, console, this, this);
   }
 
+  @Override
   public boolean requireLock() {
     return true;
   }
@@ -412,7 +413,7 @@
         HiveConf.setVar(job, HiveConf.ConfVars.METASTOREPWD, pwd);
       }
 
-      returnVal = jobExecHelper.progress(rj, jc);
+      returnVal = jobExecHelper.progress(rj, jc, driverContext);
       success = (returnVal == 0);
     } catch (Exception e) {
       e.printStackTrace();
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java	(revision 1138748)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java	(working copy)
@@ -117,8 +117,8 @@
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
@@ -133,8 +133,8 @@
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.mapred.FileOutputFormat;
@@ -2243,4 +2243,44 @@
     }
     return sb.toString();
   }
+
+  /**
+   * Format a number of milliseconds as a string.
+   *
+   * @param msec milliseconds
+   * @return a formatted string like "x days y hours z minutes a seconds b msec"
+   */
+  public static String formatMsecToStr(long msec) {
+    long day = -1, hour = -1, minute = -1, second = -1;
+    long ms = msec % 1000;
+    long timeLeft = msec / 1000;
+    if (timeLeft > 0) {
+      second = timeLeft % 60;
+      timeLeft /= 60;
+      if (timeLeft > 0) {
+        minute = timeLeft % 60;
+        timeLeft /= 60;
+        if (timeLeft > 0) {
+          hour = timeLeft % 24;
+          day = timeLeft / 24;
+        }
+      }
+    }
+    StringBuilder sb = new StringBuilder();
+    if (day != -1) {
+      sb.append(day + " days ");
+    }
+    if (hour != -1) {
+      sb.append(hour + " hours ");
+    }
+    if (minute != -1) {
+      sb.append(minute + " minutes ");
+    }
"); + } + if (second != -1) { + sb.append(second + " seconds "); + } + sb.append(ms + " msec"); + + return sb.toString(); + } } Index: ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java (revision 0) @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.hive.ql; + +/** + * MapRedStats. + * + * A data structure to keep one mapreduce's stats: + * number of mappers, number of reducers, accumulative CPU time and whether it + * succeeds. + * + */ +public class MapRedStats { + int numMap; + int numReduce; + long cpuMSec; + boolean success; + + public MapRedStats(int numMap, int numReduce, long cpuMSec, boolean ifSuccess) { + this.numMap = numMap; + this.numReduce = numReduce; + this.cpuMSec = cpuMSec; + this.success = ifSuccess; + } + + public boolean isSuccess() { + return success; + } + + public long getCpuMSec() { + return cpuMSec; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + if (numMap > 0) { + sb.append("Map: " + numMap + " "); + } + + if (numReduce > 0) { + sb.append("Reduce: " + numReduce + " "); + } + + if (cpuMSec > 0) { + sb.append("Accumulative CPU: " + (cpuMSec / 1000D) + " sec "); + } + sb.append(success ? 
"SUCESS" : "FAIL"); + + return sb.toString(); + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java (revision 1138748) +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java (working copy) @@ -981,6 +981,8 @@ conf.setVar(HiveConf.ConfVars.HIVEQUERYSTRING, queryStr); maxthreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.EXECPARALLETHREADNUMBER); + DriverContext driverCxt = null; + try { LOG.info("Starting command: " + queryStr); @@ -1034,8 +1036,10 @@ Queue> runnable = new ConcurrentLinkedQueue>(); Map running = new HashMap(); - DriverContext driverCxt = new DriverContext(runnable, ctx); + driverCxt = new DriverContext(runnable, ctx); + SessionState.get().setLastMapRedStatsList(driverCxt.getMapRedStatsList()); + // Add root Tasks to runnable for (Task tsk : plan.getRootTasks()) { @@ -1180,6 +1184,16 @@ conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, ""); } Utilities.PerfLogEnd(LOG, "Driver.execute"); + + if (driverCxt != null && driverCxt.getMapRedStatsList().size() > 0) { + long totalCpu = 0; + console.printInfo("MapReduce Jobs Launched: "); + for (int i = 0; i < driverCxt.getMapRedStatsList().size(); i++) { + console.printInfo("Job " + i + ": " + driverCxt.getMapRedStatsList().get(i)); + totalCpu += driverCxt.getMapRedStatsList().get(i).getCpuMSec(); + } + console.printInfo("Total MapReduce CPU Time Spent: " + Utilities.formatMsecToStr(totalCpu)); + } } plan.setDone();