diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java index 92ab0cc..1f782db 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java @@ -70,17 +70,22 @@ protected transient JobConf job; protected Task task; - protected transient int mapProgress = 0; - protected transient int reduceProgress = 0; - public transient JobID jobId; - private final LogHelper console; - private final HadoopJobExecHook callBackObj; + protected transient int mapProgress = -1; + protected transient int reduceProgress = -1; + + protected transient int lastMapProgress; + protected transient int lastReduceProgress; + public transient JobID jobId; + private LogHelper console; + private HadoopJobExecHook callBackObj; /** * Update counters relevant to this task. */ private void updateCounters(Counters ctrs, RunningJob rj) throws IOException { + lastMapProgress = mapProgress; + lastReduceProgress = reduceProgress; mapProgress = Math.round(rj.mapProgress() * 100); mapProgress = mapProgress == 100 ? (int)Math.floor(rj.mapProgress() * 100) : mapProgress; reduceProgress = Math.round(rj.reduceProgress() * 100); @@ -212,7 +217,6 @@ public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) { private MapRedStats progress(ExecDriverTaskHandle th) throws IOException { JobClient jc = th.getJobClient(); RunningJob rj = th.getRunningJob(); - String lastReport = ""; SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS"); //DecimalFormat longFormatter = new DecimalFormat("###,###"); long reportTime = System.currentTimeMillis(); @@ -324,45 +328,47 @@ private MapRedStats progress(ExecDriverTaskHandle th) throws IOException { } } - String report = " " + getId() + " map = " + mapProgress + "%, reduce = " + reduceProgress - + "%"; - - - if (!report.equals(lastReport) - || System.currentTimeMillis() >= reportTime + maxReportInterval) { - // find out CPU msecs - // In the case that we can't find out this number, we just skip the step to print - // it out. - if (ctrs != null) { - Counter counterCpuMsec = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter", - "CPU_MILLISECONDS"); - if (counterCpuMsec != null) { - long newCpuMSec = counterCpuMsec.getValue(); - if (newCpuMSec > 0) { - cpuMsec = newCpuMSec; - report += ", Cumulative CPU " - + (cpuMsec / 1000D) + " sec"; - } + if (mapProgress == lastMapProgress && reduceProgress == lastReduceProgress && + System.currentTimeMillis() < reportTime + maxReportInterval) { + continue; + } + StringBuilder report = new StringBuilder(); + report.append(dateFormat.format(Calendar.getInstance().getTime())); + + report.append(' ').append(getId()); + report.append(" map = ").append(mapProgress).append("%, "); + report.append(" reduce = ").append(reduceProgress).append('%'); + + // find out CPU msecs + // In the case that we can't find out this number, we just skip the step to print + // it out. + if (ctrs != null) { + Counter counterCpuMsec = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter", + "CPU_MILLISECONDS"); + if (counterCpuMsec != null) { + long newCpuMSec = counterCpuMsec.getValue(); + if (newCpuMSec > 0) { + cpuMsec = newCpuMSec; + report.append(", Cumulative CPU ").append((cpuMsec / 1000D)).append(" sec"); } } + } - // write out serialized plan with counters to log file - // LOG.info(queryPlan); - String output = dateFormat.format(Calendar.getInstance().getTime()) + report; - SessionState ss = SessionState.get(); - if (ss != null) { - ss.getHiveHistory().setTaskCounters(SessionState.get().getQueryId(), getId(), ctrs); - ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(), getId(), - Keys.TASK_HADOOP_PROGRESS, output); - if (ss.getConf().getBoolVar(HiveConf.ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS)) { - ss.getHiveHistory().progressTask(SessionState.get().getQueryId(), this.task); - this.callBackObj.logPlanProgress(ss); - } + // write out serialized plan with counters to log file + // LOG.info(queryPlan); + String output = report.toString(); + SessionState ss = SessionState.get(); + if (ss != null) { + ss.getHiveHistory().setTaskCounters(SessionState.get().getQueryId(), getId(), ctrs); + ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(), getId(), + Keys.TASK_HADOOP_PROGRESS, output); + if (ss.getConf().getBoolVar(HiveConf.ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS)) { + ss.getHiveHistory().progressTask(SessionState.get().getQueryId(), this.task); + this.callBackObj.logPlanProgress(ss); } - console.printInfo(output); - lastReport = report; - reportTime = System.currentTimeMillis(); } + console.printInfo(output); + reportTime = System.currentTimeMillis(); } if (cpuMsec > 0) {