Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java	(revision 1150811)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java	(working copy)
@@ -2257,4 +2257,44 @@
     }
     return sb.toString();
   }
+
+  /**
+   * Format a number of milliseconds as a human-readable string.
+   *
+   * @param msec milliseconds
+   * @return a formatted string like "x days y hours z minutes a seconds b msec"
+   */
+  public static String formatMsecToStr(long msec) {
+    long day = -1, hour = -1, minute = -1, second = -1;
+    long ms = msec % 1000;
+    long timeLeft = msec / 1000;
+    if (timeLeft > 0) {
+      second = timeLeft % 60;
+      timeLeft /= 60;
+      if (timeLeft > 0) {
+        minute = timeLeft % 60;
+        timeLeft /= 60;
+        if (timeLeft > 0) {
+          hour = timeLeft % 24;
+          day = timeLeft / 24;
+        }
+      }
+    }
+    StringBuilder sb = new StringBuilder();
+    if (day != -1) {
+      sb.append(day + " days ");
+    }
+    if (hour != -1) {
+      sb.append(hour + " hours ");
+    }
+    if (minute != -1) {
+      sb.append(minute + " minutes ");
+    }
+    if (second != -1) {
+      sb.append(second + " seconds ");
+    }
+    sb.append(ms + " msec");
+
+    return sb.toString();
+  }
 }
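For reference, a standalone sketch (not part of the patch) showing what formatMsecToStr produces for a few inputs; the expected outputs follow directly from the logic above. Note that once the hour branch is taken, day is always assigned, so a "0 days" prefix is printed even for durations under a day but over an hour.

    import org.apache.hadoop.hive.ql.exec.Utilities;

    public class FormatMsecDemo {
      public static void main(String[] args) {
        // Under one second: only the msec component is printed.
        System.out.println(Utilities.formatMsecToStr(999));       // 999 msec
        // 1 second and 234 msec: minute/hour/day components are omitted.
        System.out.println(Utilities.formatMsecToStr(1234));      // 1 seconds 234 msec
        // 1 day, 1 hour, 1 minute, 1 second: every component is printed.
        System.out.println(Utilities.formatMsecToStr(90061000));  // 1 days 1 hours 1 minutes 1 seconds 0 msec
      }
    }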
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java	(revision 1150811)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java	(working copy)
@@ -30,6 +30,7 @@
 import java.util.Set;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.MapRedStats;
 import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
 import org.apache.hadoop.hive.ql.exec.errors.ErrorAndSolution;
 import org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor;
@@ -43,12 +44,13 @@
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapred.Counters.Counter;
 
 public class HadoopJobExecHelper {
-  
+
   protected transient JobConf job;
   protected Task<? extends Serializable> task;
-  
+
   protected transient int mapProgress = 0;
   protected transient int reduceProgress = 0;
   public transient String jobId;
@@ -69,10 +71,10 @@
       return;
     }
     if(callBackObj != null) {
-      callBackObj.updateCounters(ctrs, rj); 
+      callBackObj.updateCounters(ctrs, rj);
     }
   }
-  
+
   /**
    * This msg pattern is used to track when a job is started.
    *
@@ -113,7 +115,7 @@
     return reduceProgress == 100;
   }
-  
+
   public String getJobId() {
     return jobId;
   }
@@ -122,10 +124,10 @@
     this.jobId = jobId;
   }
-  
+
   public HadoopJobExecHelper() {
   }
-  
+
   public HadoopJobExecHelper(JobConf job, LogHelper console,
       Task<? extends Serializable> task, HadoopJobExecHook hookCallBack) {
     this.job = job;
@@ -134,7 +136,7 @@
     this.callBackObj = hookCallBack;
   }
-  
+
   /**
   * A list of the currently running jobs spawned in this Hive instance that is used to kill all
   * running jobs in the event of an unexpected shutdown - i.e., the JVM shuts down while there are
@@ -143,7 +145,7 @@
   public static Map<String, String> runningJobKillURIs = Collections
      .synchronizedMap(new HashMap<String, String>());
-  
+
   /**
   * In Hive, when the user control-c's the command line, any running jobs spawned from that command
   * line are best-effort killed.
@@ -200,12 +202,13 @@
     }
     return this.callBackObj.checkFatalErrors(ctrs, errMsg);
   }
-  
-  private boolean progress(ExecDriverTaskHandle th) throws IOException {
+
+  private MapRedStats progress(ExecDriverTaskHandle th) throws IOException {
     JobClient jc = th.getJobClient();
     RunningJob rj = th.getRunningJob();
     String lastReport = "";
     SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+    //DecimalFormat longFormatter = new DecimalFormat("###,###");
    long reportTime = System.currentTimeMillis();
    long maxReportInterval = 60 * 1000; // One minute
    boolean fatal = false;
@@ -213,6 +216,10 @@
     long pullInterval = HiveConf.getLongVar(job, HiveConf.ConfVars.HIVECOUNTERSPULLINTERVAL);
     boolean initializing = true;
     boolean initOutputPrinted = false;
+    long cpuMsec = -1;
+    int numMap = -1;
+    int numReduce = -1;
+
     while (!rj.isComplete()) {
       try {
         Thread.sleep(pullInterval);
@@ -233,24 +240,24 @@
         String logMapper;
         String logReducer;
-        
+
         TaskReport[] mappers = jc.getMapTaskReports(rj.getJobID());
         if (mappers == null) {
           logMapper = "no information for number of mappers; ";
         } else {
-          int numMap = mappers.length;
+          numMap = mappers.length;
           if (ss != null) {
             ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(), getId(),
               Keys.TASK_NUM_MAPPERS, Integer.toString(numMap));
           }
           logMapper = "number of mappers: " + numMap + "; ";
         }
-        
+
         TaskReport[] reducers = jc.getReduceTaskReports(rj.getJobID());
         if (reducers == null) {
           logReducer = "no information for number of reducers. ";
         } else {
-          int numReduce = reducers.length;
+          numReduce = reducers.length;
           if (ss != null) {
             ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(), getId(),
               Keys.TASK_NUM_REDUCERS, Integer.toString(numReduce));
@@ -295,8 +302,22 @@
       String report = " " + getId() + " map = " + mapProgress + "%,  reduce = "
          + reduceProgress + "%";
+
       if (!report.equals(lastReport)
          || System.currentTimeMillis() >= reportTime + maxReportInterval) {
+        // find out CPU msecs
+        // If we can't find out this number, we just skip the step that
+        // prints it out.
+        Counter counterCpuMsec = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
+            "CPU_MILLISECONDS");
+        if (counterCpuMsec != null) {
+          long newCpuMSec = counterCpuMsec.getValue();
+          if (newCpuMSec > 0) {
+            cpuMsec = newCpuMSec;
+            report += ", Cumulative CPU "
+                + (cpuMsec / 1000D) + " sec";
+          }
+        }
 
         // write out serialized plan with counters to log file
         // LOG.info(queryPlan);
@@ -315,9 +336,14 @@
       }
     }
 
+    if (cpuMsec > 0) {
+      console.printInfo("MapReduce Total cumulative CPU time: "
+          + Utilities.formatMsecToStr(cpuMsec));
+    }
+
     boolean success;
+    Counters ctrs = th.getCounters();
-
     if (fatal) {
       success = false;
     } else {
@@ -331,6 +357,61 @@
       }
     }
 
+    Counter counterCpuMsec = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
+        "CPU_MILLISECONDS");
+    if (counterCpuMsec != null) {
+      long newCpuMSec = counterCpuMsec.getValue();
+      if (newCpuMSec > cpuMsec) {
+        cpuMsec = newCpuMSec;
+      }
+    }
+
+    MapRedStats mapRedStats = new MapRedStats(numMap, numReduce, cpuMsec, success);
+
+    Counter ctr;
+
+    ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
+        "REDUCE_SHUFFLE_BYTES");
+    if (ctr != null) {
+      mapRedStats.setReduceShuffleBytes(ctr.getValue());
+    }
+
+    ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
+        "MAP_INPUT_RECORDS");
+    if (ctr != null) {
+      mapRedStats.setMapInputRecords(ctr.getValue());
+    }
+
+    ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
+        "MAP_OUTPUT_RECORDS");
+    if (ctr != null) {
+      mapRedStats.setMapOutputRecords(ctr.getValue());
+    }
+
+    ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
+        "REDUCE_INPUT_RECORDS");
+    if (ctr != null) {
+      mapRedStats.setReduceInputRecords(ctr.getValue());
+    }
+
+    ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
+        "REDUCE_OUTPUT_RECORDS");
+    if (ctr != null) {
+      mapRedStats.setReduceOutputRecords(ctr.getValue());
+    }
+
+    ctr = ctrs.findCounter("FileSystemCounters",
+        "HDFS_BYTES_READ");
+    if (ctr != null) {
+      mapRedStats.setHdfsRead(ctr.getValue());
+    }
+
+    ctr = ctrs.findCounter("FileSystemCounters",
+        "HDFS_BYTES_WRITTEN");
+    if (ctr != null) {
+      mapRedStats.setHdfsWrite(ctr.getValue());
+    }
+
     this.task.setDone();
     // update based on the final value of the counters
     updateCounters(ctrs, rj);
@@ -340,9 +421,9 @@
       this.callBackObj.logPlanProgress(ss);
     }
     // LOG.info(queryPlan);
-    return (success);
+    return mapRedStats;
   }
-  
+
   private String getId() {
     return this.task.getId();
   }
@@ -396,7 +477,7 @@
       return rj.getCounters();
     }
   }
-  
+
   // Used for showJobFailDebugInfo
   private static class TaskInfo {
     String jobId;
@@ -419,7 +500,7 @@
       return jobId;
     }
   }
-  
+
   @SuppressWarnings("deprecation")
   private void showJobFailDebugInfo(JobConf conf, RunningJob rj) throws IOException {
     // Mapping from task ID to the number of failures
@@ -549,9 +630,9 @@
 
   public int progress(RunningJob rj, JobClient jc) throws IOException {
     jobId = rj.getJobID();
-    
+
     int returnVal = 0;
-    
+
     // remove the pwd from conf file so that job tracker doesn't show this
     // logs
     String pwd = HiveConf.getVar(job, HiveConf.ConfVars.METASTOREPWD);
@@ -570,8 +651,15 @@
     ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj);
     jobInfo(rj);
-    boolean success = progress(th);
+    MapRedStats mapRedStats = progress(th);
+    // There is not always a SessionState; ExecDriver is sometimes invoked directly
+    // in special modes, in which case SessionState.get() returns null.
+    if (SessionState.get() != null) {
+      SessionState.get().getLastMapRedStatsList().add(mapRedStats);
+    }
+    boolean success = mapRedStats.isSuccess();
+
     String statusMesg = getJobEndMsg(rj.getJobID());
     if (!success) {
       statusMesg += " with errors";
@@ -583,7 +671,7 @@
     } else {
       console.printInfo(statusMesg);
     }
-    
+
     return returnVal;
   }
 }
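The per-counter null checks above are deliberate: a counter can be absent (for example, a map-only job reports no reduce counters, and older task trackers may not emit CPU_MILLISECONDS at all), and findCounter then returns null. A condensed sketch of the same pattern; the readCounter helper is hypothetical and not part of the patch, but it mirrors exactly the findCounter/getValue calls the patch makes:

    import org.apache.hadoop.mapred.Counters;
    import org.apache.hadoop.mapred.Counters.Counter;

    public final class CounterUtil {
      // Returns the counter's value, or dflt when the counter is missing.
      static long readCounter(Counters ctrs, String group, String name, long dflt) {
        Counter c = ctrs.findCounter(group, name);
        return (c != null) ? c.getValue() : dflt;
      }
    }

    // Usage mirroring the patch, where -1 means "unknown" and matches the
    // MapRedStats field defaults:
    //   mapRedStats.setHdfsRead(
    //       CounterUtil.readCounter(ctrs, "FileSystemCounters", "HDFS_BYTES_READ", -1));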
Index: ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java	(revision 0)
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+/**
+ * MapRedStats.
+ *
+ * A data structure that keeps the stats of one MapReduce job:
+ * the number of mappers, the number of reducers, the cumulative CPU time, and
+ * whether the job succeeded.
+ *
+ */
+public class MapRedStats {
+  int numMap;
+  int numReduce;
+  long cpuMSec;
+  long hdfsRead = -1;
+  long hdfsWrite = -1;
+  long mapInputRecords = -1;
+  long mapOutputRecords = -1;
+  long reduceInputRecords = -1;
+  long reduceOutputRecords = -1;
+  long reduceShuffleBytes = -1;
+  boolean success;
+
+  public MapRedStats(int numMap, int numReduce, long cpuMSec, boolean ifSuccess) {
+    this.numMap = numMap;
+    this.numReduce = numReduce;
+    this.cpuMSec = cpuMSec;
+    this.success = ifSuccess;
+  }
+
+  public boolean isSuccess() {
+    return success;
+  }
+
+  public long getCpuMSec() {
+    return cpuMSec;
+  }
+
+  public int getNumMap() {
+    return numMap;
+  }
+
+  public void setNumMap(int numMap) {
+    this.numMap = numMap;
+  }
+
+  public int getNumReduce() {
+    return numReduce;
+  }
+
+  public void setNumReduce(int numReduce) {
+    this.numReduce = numReduce;
+  }
+
+  public long getHdfsRead() {
+    return hdfsRead;
+  }
+
+  public void setHdfsRead(long hdfsRead) {
+    this.hdfsRead = hdfsRead;
+  }
+
+  public long getHdfsWrite() {
+    return hdfsWrite;
+  }
+
+  public void setHdfsWrite(long hdfsWrite) {
+    this.hdfsWrite = hdfsWrite;
+  }
+
+  public long getMapInputRecords() {
+    return mapInputRecords;
+  }
+
+  public void setMapInputRecords(long mapInputRecords) {
+    this.mapInputRecords = mapInputRecords;
+  }
+
+  public long getMapOutputRecords() {
+    return mapOutputRecords;
+  }
+
+  public void setMapOutputRecords(long mapOutputRecords) {
+    this.mapOutputRecords = mapOutputRecords;
+  }
+
+  public long getReduceInputRecords() {
+    return reduceInputRecords;
+  }
+
+  public void setReduceInputRecords(long reduceInputRecords) {
+    this.reduceInputRecords = reduceInputRecords;
+  }
+
+  public long getReduceOutputRecords() {
+    return reduceOutputRecords;
+  }
+
+  public void setReduceOutputRecords(long reduceOutputRecords) {
+    this.reduceOutputRecords = reduceOutputRecords;
+  }
+
+  public long getReduceShuffleBytes() {
+    return reduceShuffleBytes;
+  }
+
+  public void setReduceShuffleBytes(long reduceShuffleBytes) {
+    this.reduceShuffleBytes = reduceShuffleBytes;
+  }
+
+  public void setCpuMSec(long cpuMSec) {
+    this.cpuMSec = cpuMSec;
+  }
+
+  public void setSuccess(boolean success) {
+    this.success = success;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    if (numMap > 0) {
+      sb.append("Map: " + numMap + " ");
+    }
+
+    if (numReduce > 0) {
+      sb.append("Reduce: " + numReduce + " ");
+    }
+
+    if (cpuMSec > 0) {
+      sb.append(" Cumulative CPU: " + (cpuMSec / 1000D) + " sec ");
+    }
+
+    if (hdfsRead >= 0) {
+      sb.append(" HDFS Read: " + hdfsRead);
+    }
+
+    if (hdfsWrite >= 0) {
+      sb.append(" HDFS Write: " + hdfsWrite);
+    }
+
+    sb.append(" " + (success ? "SUCCESS" : "FAIL"));
+
+    return sb.toString();
+  }
+}
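A short usage sketch of the new class (illustrative values, not from a real job), showing how toString() renders only the populated fields; hdfsRead and hdfsWrite default to -1 and would be omitted if never set:

    MapRedStats stats = new MapRedStats(10, 2, 5640, true);
    stats.setHdfsRead(1048576);
    stats.setHdfsWrite(2048);
    System.out.println(stats);
    // prints: Map: 10 Reduce: 2  Cumulative CPU: 5.64 sec  HDFS Read: 1048576 HDFS Write: 2048 SUCCESS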
Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java	(revision 1150811)
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java	(working copy)
@@ -1036,6 +1036,8 @@
 
       DriverContext driverCxt = new DriverContext(runnable, ctx);
 
+      SessionState.get().setLastMapRedStatsList(new ArrayList<MapRedStats>());
+
       // Add root Tasks to runnable
       for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
@@ -1180,6 +1182,17 @@
         conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, "");
       }
       Utilities.PerfLogEnd(LOG, "Driver.execute");
+
+      if (SessionState.get().getLastMapRedStatsList() != null
+          && SessionState.get().getLastMapRedStatsList().size() > 0) {
+        long totalCpu = 0;
+        console.printInfo("MapReduce Jobs Launched: ");
+        for (int i = 0; i < SessionState.get().getLastMapRedStatsList().size(); i++) {
+          console.printInfo("Job " + i + ": " + SessionState.get().getLastMapRedStatsList().get(i));
+          totalCpu += SessionState.get().getLastMapRedStatsList().get(i).getCpuMSec();
+        }
+        console.printInfo("Total MapReduce CPU Time Spent: " + Utilities.formatMsecToStr(totalCpu));
+      }
     }
 
     plan.setDone();
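Taken together with MapRedStats.toString(), the block above makes every query end with a per-job summary. For a hypothetical two-job query the console output would look along these lines (values are illustrative):

    MapReduce Jobs Launched: 
    Job 0: Map: 4 Reduce: 1  Cumulative CPU: 5.64 sec  HDFS Read: 1048576 HDFS Write: 2048 SUCCESS
    Job 1: Map: 1 Reduce: 1  Cumulative CPU: 1.36 sec  HDFS Read: 2048 HDFS Write: 512 SUCCESS
    Total MapReduce CPU Time Spent: 7 seconds 0 msec

One caveat: getCpuMSec() returns -1 when the CPU counter was unavailable, which would skew the total, so the summary is most meaningful on clusters that report CPU_MILLISECONDS.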
Index: ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java	(revision 1150811)
+++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java	(working copy)
@@ -37,6 +37,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.MapRedStats;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.history.HiveHistory;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -107,6 +108,8 @@
 
   private CreateTableAutomaticGrant createTableGrants;
 
+  private List<MapRedStats> lastMapRedStatsList;
+
   /**
    * Lineage state.
    */
@@ -641,4 +644,12 @@
   public void setCreateTableGrants(CreateTableAutomaticGrant createTableGrants) {
     this.createTableGrants = createTableGrants;
   }
+
+  public List<MapRedStats> getLastMapRedStatsList() {
+    return lastMapRedStatsList;
+  }
+
+  public void setLastMapRedStatsList(List<MapRedStats> lastMapRedStatsList) {
+    this.lastMapRedStatsList = lastMapRedStatsList;
+  }
 }
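With the accessors above in place, code that runs queries through Driver can inspect the per-job stats afterwards. A minimal sketch (not part of the patch; assumes an active SessionState, as in the CLI):

    import java.util.List;
    import org.apache.hadoop.hive.ql.MapRedStats;
    import org.apache.hadoop.hive.ql.session.SessionState;

    List<MapRedStats> statsList = SessionState.get().getLastMapRedStatsList();
    if (statsList != null) {
      int failed = 0;
      for (MapRedStats stats : statsList) {
        if (!stats.isSuccess()) {
          failed++;
        }
      }
      System.out.println(statsList.size() + " job(s), " + failed + " failed");
    }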