Index: ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java	(revision 1389161)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java	(working copy)
@@ -38,6 +38,7 @@
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.MapRedStats;
+import org.apache.hadoop.hive.ql.ReducerSkewStat;
 import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
 import org.apache.hadoop.hive.ql.exec.errors.ErrorAndSolution;
 import org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor;
@@ -696,6 +697,11 @@
     // for special modes. In that case, SessionState.get() is empty.
     if (SessionState.get() != null) {
       SessionState.get().getLastMapRedStatsList().add(mapRedStats);
+      // Record reducer run-time statistics for every MapReduce job,
+      // whether it succeeded or failed.
+      if (this.task.getQueryPlan() != null) {
+        computeReduceSkew(rj);
+      }
     }
 
     boolean success = mapRedStats.isSuccess();
@@ -733,6 +739,92 @@
     return returnVal;
   }
 
+  /**
+   * Summarizes reducer run times into a {@link ReducerSkewStat} and appends
+   * it to the query plan's reducer skew statistics (which must be non-null).
+   *
+   * <p>The statistics are the minimum, maximum, arithmetic mean and
+   * population standard deviation of the run times. With an empty list the
+   * stat keeps the -1 defaults set by {@link ReducerSkewStat}.
+   *
+   * @param reduceTimes run times of the reduce task attempts of one job
+   */
+  private void computeStatistics(List<Integer> reduceTimes) {
+    ReducerSkewStat reduceSkewStat = new ReducerSkewStat();
+    // Strings are immutable, so no copy is needed; a null id keeps the
+    // stat's "" default instead of throwing a NullPointerException.
+    if (this.jobId != null) {
+      reduceSkewStat.setJobId(this.jobId);
+    }
+
+    if (!reduceTimes.isEmpty()) {
+      long minimum = Long.MAX_VALUE;
+      long maximum = Long.MIN_VALUE;
+      long sum = 0;
+      for (int reduceTime : reduceTimes) {
+        minimum = Math.min(minimum, reduceTime);
+        maximum = Math.max(maximum, reduceTime);
+        sum += reduceTime;
+      }
+      double mean = (double) sum / reduceTimes.size();
+
+      double squaredDeviationSum = 0.0;
+      for (int reduceTime : reduceTimes) {
+        double deviation = reduceTime - mean;
+        squaredDeviationSum += deviation * deviation;
+      }
+      // Population (not sample) standard deviation.
+      double standardDeviation = Math.sqrt(squaredDeviationSum / reduceTimes.size());
+
+      reduceSkewStat.setMinSkew(minimum);
+      reduceSkewStat.setMaxSkew(maximum);
+      reduceSkewStat.setMeanSkew(mean);
+      reduceSkewStat.setStandardDeviationSkew(standardDeviation);
+    }
+    this.task.getQueryPlan().getReducerSkewStats().add(reduceSkewStat);
+  }
+
+  /**
+   * Collects the run time of every reduce task attempt of the given job and
+   * records the resulting statistics in the query plan.
+   *
+   * @param rj the job whose task completion events are inspected
+   * @throws IOException if the task completion events cannot be fetched
+   */
+  private void computeReduceSkew(RunningJob rj) throws IOException {
+    if (this.task.getQueryPlan().getReducerSkewStats() == null) {
+      this.task.getQueryPlan().setReducerSkewStats(new ArrayList<ReducerSkewStat>());
+    }
+    List<Integer> reduceTimes = new ArrayList<Integer>();
+
+    // getTaskCompletionEvents(startFrom) returns a single batch of events
+    // (the Hadoop 1.x JobClient fetches at most 10 per call), so keep paging
+    // until an empty batch comes back; otherwise only the first few reduce
+    // attempts of a large job would be counted.
+    int startFrom = 0;
+    while (true) {
+      TaskCompletionEvent[] taskCompletions = rj.getTaskCompletionEvents(startFrom);
+      if (taskCompletions == null || taskCompletions.length == 0) {
+        break;
+      }
+      startFrom += taskCompletions.length;
+
+      for (TaskCompletionEvent taskCompletion : taskCompletions) {
+        if (ShimLoader.getHadoopShims().getTaskJobIDs(taskCompletion) == null) {
+          // Task attempt info is unavailable in this Hadoop version.
+          continue;
+        }
+        // NOTE: every reduce attempt is counted, including failed, killed
+        // and speculative ones; filter on getTaskStatus() if only
+        // successful attempts should contribute.
+        if (!taskCompletion.isMapTask()) {
+          reduceTimes.add(taskCompletion.getTaskRunTime());
+        }
+      }
+    }
+    computeStatistics(reduceTimes);
+  }
+
   private Map<String, Double> extractAllCounterValues(Counters counters) {
     Map<String, Double> exctractedCounters = new HashMap<String, Double>();
     for (Counters.Group cg : counters) {
Index: ql/src/java/org/apache/hadoop/hive/ql/ReducerSkewStat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/ReducerSkewStat.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/ReducerSkewStat.java	(working copy)
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+/**
+ * Run-time statistics of the reduce task attempts of one MapReduce job,
+ * used to detect skew between reducers.
+ *
+ * <p>The values are computed by {@code HadoopJobExecHelper} from the job's
+ * task completion events and stored in the {@link QueryPlan}. Despite the
+ * "Skew" in the accessor names, minimum, maximum and mean are reduce task
+ * run times as reported by {@code TaskCompletionEvent.getTaskRunTime()},
+ * and the standard deviation is the population standard deviation of those
+ * run times. Every numeric value is -1 until set, which is also what a job
+ * without reduce task attempts reports.
+ */
+public class ReducerSkewStat {
+  private long minimum;
+  private long maximum;
+  private double mean;
+  private double standardDeviation;
+  private String jobId;
+
+  /** Creates a stat whose numeric values are all -1 and whose job id is empty. */
+  public ReducerSkewStat() {
+    this.minimum = -1;
+    this.maximum = -1;
+    this.mean = -1.0;
+    this.standardDeviation = -1.0;
+    this.jobId = "";
+  }
+
+  /** Returns the shortest reduce task run time, or -1 if unset. */
+  public long getMinSkew() {
+    return this.minimum;
+  }
+
+  /** Returns the longest reduce task run time, or -1 if unset. */
+  public long getMaxSkew() {
+    return this.maximum;
+  }
+
+  /** Returns the mean reduce task run time, or -1.0 if unset. */
+  public double getMeanSkew() {
+    return this.mean;
+  }
+
+  /** Returns the population standard deviation of the run times, or -1.0 if unset. */
+  public double getStandardDeviationSkew() {
+    return this.standardDeviation;
+  }
+
+  /** Returns the id of the job these statistics belong to ("" if unset). */
+  public String getJobId() {
+    return this.jobId;
+  }
+
+  /** Sets the value returned by {@link #getMinSkew()}. */
+  public void setMinSkew(long minimum) {
+    this.minimum = minimum;
+  }
+
+  /** Sets the value returned by {@link #getMaxSkew()}. */
+  public void setMaxSkew(long maximum) {
+    this.maximum = maximum;
+  }
+
+  /** Sets the value returned by {@link #getMeanSkew()}. */
+  public void setMeanSkew(double mean) {
+    this.mean = mean;
+  }
+
+  /** Sets the value returned by {@link #getStandardDeviationSkew()}. */
+  public void setStandardDeviationSkew(double standardDeviation) {
+    this.standardDeviation = standardDeviation;
+  }
+
+  /** Sets the value returned by {@link #getJobId()}. */
+  public void setJobId(String jobId) {
+    this.jobId = jobId;
+  }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java	(revision 1389161)
+++ ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java	(working copy)
@@ -67,6 +67,7 @@
 
   private ArrayList<Task<? extends Serializable>> rootTasks;
   private FetchTask fetchTask;
+  private List<ReducerSkewStat> reducerSkewStats;
 
   private HashSet<ReadEntity> inputs;
   /**
@@ -706,6 +707,19 @@
     return query;
   }
 
+  /**
+   * Returns the reducer run-time statistics recorded for this query, one
+   * entry per tracked MapReduce job, or null until the first one is recorded.
+   */
+  public List<ReducerSkewStat> getReducerSkewStats() {
+    return this.reducerSkewStats;
+  }
+
+  /** Replaces the reducer run-time statistics recorded for this query. */
+  public void setReducerSkewStats(List<ReducerSkewStat> reducerSkewStats) {
+    this.reducerSkewStats = reducerSkewStats;
+  }
+
   public void setQuery(org.apache.hadoop.hive.ql.plan.api.Query query) {
     this.query = query;
   }