Index: ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java (revision 1389161)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java (working copy)
@@ -38,6 +38,7 @@
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.MapRedStats;
+import org.apache.hadoop.hive.ql.ReducerSkewStat;
 import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
 import org.apache.hadoop.hive.ql.exec.errors.ErrorAndSolution;
 import org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor;
@@ -696,6 +697,11 @@
     // for special modes. In that case, SessionState.get() is empty.
     if (SessionState.get() != null) {
       SessionState.get().getLastMapRedStatsList().add(mapRedStats);
+      // Compute reducer skew for every MapReduce job, regardless of
+      // whether the job succeeded or failed.
+      if (this.task.getQueryPlan() != null) {
+        computeReduceSkew(rj);
+      }
     }
 
     boolean success = mapRedStats.isSuccess();
@@ -733,6 +739,67 @@
     return returnVal;
   }
 
+  /**
+   * Computes min/max/mean/standard deviation of the given reducer run
+   * times (milliseconds) and records them on the query plan. If the
+   * list is empty (map-only job), the -1 sentinel defaults of
+   * ReducerSkewStat are kept.
+   */
+  private void computeStatistics(List<Long> reduceTimes) {
+    ReducerSkewStat reduceSkewStat = new ReducerSkewStat();
+    reduceSkewStat.jobId = this.jobId;
+
+    if (!reduceTimes.isEmpty()) {
+      long minimum = Long.MAX_VALUE;
+      long maximum = Long.MIN_VALUE;
+      long sum = 0;
+      for (long time : reduceTimes) {
+        minimum = Math.min(minimum, time);
+        maximum = Math.max(maximum, time);
+        sum += time;
+      }
+      double mean = (double) sum / reduceTimes.size();
+
+      // Population standard deviation of the reducer run times.
+      double sumSquaredDiffs = 0.0;
+      for (long time : reduceTimes) {
+        double diff = mean - time;
+        sumSquaredDiffs += diff * diff;
+      }
+      double standardDeviation = Math.sqrt(sumSquaredDiffs / reduceTimes.size());
+
+      reduceSkewStat.minimum = minimum;
+      reduceSkewStat.maximum = maximum;
+      reduceSkewStat.mean = mean;
+      reduceSkewStat.standardDeviation = standardDeviation;
+    }
+    this.task.getQueryPlan().getReducerSkewStats().add(reduceSkewStat);
+  }
+
+  /**
+   * Collects the run times of all reduce tasks of the given job and
+   * computes their skew statistics onto the query plan.
+   */
+  private void computeReduceSkew(RunningJob rj) throws IOException {
+    if (this.task.getQueryPlan().getReducerSkewStats() == null) {
+      this.task.getQueryPlan().setReducerSkewStats(new ArrayList<ReducerSkewStat>());
+    }
+    List<Long> reduceTimes = new ArrayList<Long>();
+    // getTaskCompletionEvents returns at most a small batch per call;
+    // keep fetching from the next offset until no more events come back,
+    // otherwise the statistics silently cover only the first few tasks.
+    int startIndex = 0;
+    TaskCompletionEvent[] taskCompletions;
+    while ((taskCompletions = rj.getTaskCompletionEvents(startIndex)).length > 0) {
+      startIndex += taskCompletions.length;
+      for (TaskCompletionEvent taskCompletion : taskCompletions) {
+        if (!taskCompletion.isMapTask()) {
+          reduceTimes.add(Long.valueOf(taskCompletion.getTaskRunTime()));
+        }
+      }
+    }
+    // Compute the statistics now
+    computeStatistics(reduceTimes);
+  }
+
   private Map extractAllCounterValues(Counters counters) {
     Map exctractedCounters = new HashMap();
     for (Counters.Group cg : counters) {
Index: ql/src/java/org/apache/hadoop/hive/ql/ReducerSkewStat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/ReducerSkewStat.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/ReducerSkewStat.java (working copy)
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql;
+
+/**
+ * Value holder for the skew statistics of the reduce phase of a single
+ * MapReduce job. The -1 defaults mean "no reduce tasks observed".
+ */
+public class ReducerSkewStat {
+  public long minimum = -1;
+  public long maximum = -1;
+  public double mean = -1.0;
+  public double standardDeviation = -1.0;
+  public String jobId = "";
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java (revision 1389161)
+++ ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java (working copy)
@@ -67,6 +67,7 @@
 
   private ArrayList<Task<? extends Serializable>> rootTasks;
   private FetchTask fetchTask;
+  private List<ReducerSkewStat> reducerSkewStats;
   private HashSet<ReadEntity> inputs;
 
   /**
@@ -706,6 +707,14 @@
     return query;
   }
 
+  public List<ReducerSkewStat> getReducerSkewStats() {
+    return this.reducerSkewStats;
+  }
+
+  public void setReducerSkewStats(List<ReducerSkewStat> reducerSkewStats) {
+    this.reducerSkewStats = reducerSkewStats;
+  }
+
   public void setQuery(org.apache.hadoop.hive.ql.plan.api.Query query) {
     this.query = query;
   }