Index: ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java	(revision 1389161)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java	(working copy)
@@ -42,6 +42,7 @@
 import org.apache.hadoop.hive.ql.exec.errors.ErrorAndSolution;
 import org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
+import org.apache.hadoop.hive.ql.plan.ReducersSkewPerJob;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.ql.stats.ClientStatsPublisher;
@@ -696,6 +697,12 @@
     // for special modes. In that case, SessionState.get() is empty.
     if (SessionState.get() != null) {
       SessionState.get().getLastMapRedStatsList().add(mapRedStats);
+
+      // Computes the skew for all the MapReduce jobs, irrespective
+      // of success or failure
+      if (this.task.getQueryPlan() != null) {
+        computeReducersSkewPerJob(rj);
+      }
     }
 
     boolean success = mapRedStats.isSuccess();
@@ -733,6 +740,30 @@
     return returnVal;
   }
 
+  private void computeReducersSkewPerJob(RunningJob rj) throws IOException {
+    TaskCompletionEvent[] taskCompletions = rj.getTaskCompletionEvents(0);
+    List<Integer> reducersRunTimes = new ArrayList<Integer>();
+
+    for (TaskCompletionEvent taskCompletion : taskCompletions) {
+      String[] taskJobIds = ShimLoader.getHadoopShims().getTaskJobIDs(taskCompletion);
+      if (taskJobIds == null) {
+        // Task attempt info is unavailable in this Hadoop version
+        continue;
+      }
+      String taskId = taskJobIds[0];
+      if (!taskCompletion.isMapTask()) {
+        reducersRunTimes.add(new Integer(taskCompletion.getTaskRunTime()));
+      }
+    }
+    // Compute the reducers skew statistics for the job
+    ReducersSkewPerJob reduceSkewStat = new ReducersSkewPerJob(reducersRunTimes, new String(
+        this.jobId));
+    // adding the reducers skew statistics for the job in the QueryPlan
+    this.task.getQueryPlan().getReducersSkewStats().add(reduceSkewStat);
+  }
+
+
   private Map<String, Double> extractAllCounterValues(Counters counters) {
     Map<String, Double> exctractedCounters = new HashMap<String, Double>();
     for (Counters.Group cg : counters) {
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ReducersSkewPerJob.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ReducersSkewPerJob.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReducersSkewPerJob.java	(working copy)
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.util.List;
+
+/*
+ * Stores the Skew in the Run time of all the Reducers
+ * per Job based upon the task completion events of these Reducers.
+ * The values are computed in the HadoopJobExecHelper when the
+ * job completes and then populated inside the QueryPlan for
+ * each job, from where it can be later on accessed.
+ * The skew statistics consist of minimum/maximum/mean/stdv of the
+ * run times of the reducers for a job. All the Run times are
+ * in Milliseconds.
+ */
+public class ReducersSkewPerJob {
+  // The statistics for storing skew in the reducers.
+  // The values are in Milliseconds.
+  private final long minimumSkew;
+  private final long maximumSkew;
+  private final double meanSkew;
+  private final double standardDeviationSkew;
+  private final String jobId;
+
+
+  /*
+   * Computes the skew parameters based upon the run times of the reducers in a job.
+   */
+  public ReducersSkewPerJob(List<Integer> reducersRunTimes, String jobId) {
+    this.jobId = jobId;
+
+    // If no Run times present, then set -1, indicating no values
+    if (!reducersRunTimes.isEmpty()) {
+      long minimumSkew = reducersRunTimes.get(0);
+      long maximumSkew = reducersRunTimes.get(0);
+      long sumSkew = reducersRunTimes.get(0);
+      double standardDeviationSkew = 0.0;
+      double meanSkew;
+
+      for (int i = 1; i < reducersRunTimes.size(); i++) {
+        if (reducersRunTimes.get(i) < minimumSkew) {
+          minimumSkew = reducersRunTimes.get(i);
+        }
+        if (reducersRunTimes.get(i) > maximumSkew) {
+          maximumSkew = reducersRunTimes.get(i);
+        }
+        sumSkew += reducersRunTimes.get(i);
+      }
+      meanSkew = (double) sumSkew / reducersRunTimes.size();
+
+      for (int i = 0; i < reducersRunTimes.size(); i++) {
+        standardDeviationSkew += Math.pow(meanSkew - reducersRunTimes.get(i), 2);
+      }
+      standardDeviationSkew /= reducersRunTimes.size();
+      standardDeviationSkew = Math.sqrt(standardDeviationSkew);
+
+      this.minimumSkew = minimumSkew;
+      this.maximumSkew = maximumSkew;
+      this.meanSkew = meanSkew;
+      this.standardDeviationSkew = standardDeviationSkew;
+      return;
+    }
+    this.minimumSkew = -1;
+    this.maximumSkew = -1;
+    this.meanSkew = -1.0;
+    this.standardDeviationSkew = -1.0;
+  }
+
+  public long getMinimumSkew() {
+    return this.minimumSkew;
+  }
+
+  public long getMaximumSkew() {
+    return this.maximumSkew;
+  }
+
+  public double getMeanSkew() {
+    return this.meanSkew;
+  }
+
+  public double getStandardDeviationSkew() {
+    return this.standardDeviationSkew;
+  }
+
+  public String getJobId() {
+    return this.jobId;
+  }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java	(revision 1389161)
+++ ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java	(working copy)
@@ -45,6 +45,7 @@
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReducersSkewPerJob;
 import org.apache.hadoop.hive.ql.plan.api.AdjacencyType;
 import org.apache.hadoop.hive.ql.plan.api.NodeType;
 import org.apache.hadoop.hive.ql.plan.api.TaskType;
@@ -67,6 +68,7 @@
 
   private ArrayList<Task<? extends Serializable>> rootTasks;
   private FetchTask fetchTask;
+  private final List<ReducersSkewPerJob> reducersSkewStats;
   private HashSet<ReadEntity> inputs;
 
   /**
@@ -94,12 +96,14 @@
 
   private transient Long queryStartTime;
 
   public QueryPlan() {
+    reducersSkewStats = new ArrayList<ReducersSkewPerJob>();
   }
 
   public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime) {
     this.queryString = queryString;
     rootTasks = new ArrayList<Task<? extends Serializable>>();
+    reducersSkewStats = new ArrayList<ReducersSkewPerJob>();
     rootTasks.addAll(sem.getRootTasks());
     fetchTask = sem.getFetchTask();
     // Note that inputs and outputs can be changed when the query gets executed
@@ -706,6 +710,10 @@
     return query;
   }
 
+  public List<ReducersSkewPerJob> getReducersSkewStats() {
+    return this.reducersSkewStats;
+  }
+
   public void setQuery(org.apache.hadoop.hive.ql.plan.api.Query query) {
     this.query = query;
   }