diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 288da8e..fdee6ec 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -182,6 +182,7 @@ public ExecDriver(MapredWork plan, JobConf job, boolean isSilent) throws HiveExc
    *
    * @return true if fatal errors happened during job execution, false otherwise.
    */
+  @Override
   public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
     Counters.Counter cntr = ctrs.findCounter(
         HiveConf.getVar(job, HiveConf.ConfVars.HIVECOUNTERGROUP),
@@ -450,7 +451,7 @@ public int execute(DriverContext driverContext) {
         if (returnVal != 0) {
           rj.killJob();
         }
-        HadoopJobExecHelper.runningJobKillURIs.remove(rj.getJobID());
+        HadoopJobExecHelper.runningJobs.remove(rj);
         jobID = rj.getID().toString();
       }
     } catch (Exception e) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
index a4585de..c2af869 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
@@ -26,6 +26,7 @@
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -50,6 +51,7 @@
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskCompletionEvent;
@@ -67,9 +69,9 @@
   protected transient int mapProgress = 0;
   protected transient int reduceProgress = 0;

-  public transient String jobId;
-  private LogHelper console;
-  private HadoopJobExecHook callBackObj;
+  public transient JobID jobId;
+  private final LogHelper console;
+  private final HadoopJobExecHook callBackObj;

   /**
    * Update counters relevant to this task.
@@ -89,7 +91,7 @@ private void updateCounters(Counters ctrs, RunningJob rj) throws IOException {
    * @param jobId
    * @return
    */
-  private static String getJobStartMsg(String jobId) {
+  private static String getJobStartMsg(JobID jobId) {
     return "Starting Job = " + jobId;
   }

@@ -99,7 +101,7 @@ private static String getJobStartMsg(String jobId) {
    * @param jobId
    * @return the job end message
    */
-  public static String getJobEndMsg(String jobId) {
+  public static String getJobEndMsg(JobID jobId) {
     return "Ended Job = " + jobId;
   }

@@ -120,11 +122,11 @@ public boolean reduceDone() {
   }

-  public String getJobId() {
+  public JobID getJobId() {
     return jobId;
   }

-  public void setJobId(String jobId) {
+  public void setJobId(JobID jobId) {
     this.jobId = jobId;
   }

@@ -148,8 +150,8 @@ public HadoopJobExecHelper(JobConf job, LogHelper console,
    * running jobs in the event of an unexpected shutdown - i.e., the JVM shuts down while there are
    * still jobs running.
    */
-  public static Map<String, String> runningJobKillURIs = Collections
-      .synchronizedMap(new HashMap<String, String>());
+  public static List<RunningJob> runningJobs = Collections
+      .synchronizedList(new LinkedList<RunningJob>());

   /**
@@ -161,32 +163,23 @@ public HadoopJobExecHelper(JobConf job, LogHelper console,
    *
    */
   static {
-    if (new org.apache.hadoop.conf.Configuration()
-        .getBoolean("webinterface.private.actions", false)) {
     Runtime.getRuntime().addShutdownHook(new Thread() {
       @Override
       public void run() {
         killRunningJobs();
       }
     });
-    }
   }

   public static void killRunningJobs() {
-    synchronized (runningJobKillURIs) {
-      for (String uri : runningJobKillURIs.values()) {
+    synchronized (runningJobs) {
+      for (RunningJob rj : runningJobs) {
         try {
-          System.err.println("killing job with: " + uri);
-          java.net.HttpURLConnection conn = (java.net.HttpURLConnection) new java.net.URL(uri)
-              .openConnection();
-          conn.setRequestMethod("POST");
-          int retCode = conn.getResponseCode();
-          if (retCode != 200) {
-            System.err.println("Got an error trying to kill job with URI: " + uri + " = "
-                + retCode);
-          }
+          System.err.println("killing job with: " + rj.getID());
+          rj.killJob();
         } catch (Exception e) {
-          System.err.println("trying to kill job, caught: " + e);
+          LOG.warn(e);
+          System.err.println("Failed to kill job: " + rj.getID());
           // do nothing
         }
       }
@@ -252,7 +245,7 @@ private MapRedStats progress(ExecDriverTaskHandle th) throws IOException {
       String logMapper;
       String logReducer;

-      TaskReport[] mappers = jc.getMapTaskReports(rj.getJobID());
+      TaskReport[] mappers = jc.getMapTaskReports(rj.getID());
       if (mappers == null) {
         logMapper = "no information for number of mappers; ";
       } else {
@@ -264,7 +257,7 @@ private MapRedStats progress(ExecDriverTaskHandle th) throws IOException {
         logMapper = "number of mappers: " + numMap + "; ";
       }

-      TaskReport[] reducers = jc.getReduceTaskReports(rj.getJobID());
+      TaskReport[] reducers = jc.getReduceTaskReports(rj.getID());
       if (reducers == null) {
         logReducer = "no information for number of reducers. ";
       } else {
@@ -281,13 +274,13 @@ private MapRedStats progress(ExecDriverTaskHandle th) throws IOException {
         initOutputPrinted = true;
       }

-      RunningJob newRj = jc.getJob(rj.getJobID());
+      RunningJob newRj = jc.getJob(rj.getID());
       if (newRj == null) {
         // under exceptional load, hadoop may not be able to look up status
         // of finished jobs (because it has purged them from memory). From
         // hive's perspective - it's equivalent to the job having failed.
         // So raise a meaningful exception
-        throw new IOException("Could not find status of job:" + rj.getJobID());
+        throw new IOException("Could not find status of job:" + rj.getID());
       } else {
         th.setRunningJob(newRj);
         rj = newRj;
       }
@@ -428,12 +421,12 @@ public void jobInfo(RunningJob rj) {
     } else {
       if (SessionState.get() != null) {
         SessionState.get().getHiveHistory().setTaskProperty(SessionState.get().getQueryId(),
-            getId(), Keys.TASK_HADOOP_ID, rj.getJobID());
+            getId(), Keys.TASK_HADOOP_ID, rj.getID().toString());
       }
-      console.printInfo(getJobStartMsg(rj.getJobID()) + ", Tracking URL = "
+      console.printInfo(getJobStartMsg(rj.getID()) + ", Tracking URL = "
           + rj.getTrackingURL());
       console.printInfo("Kill Command = " + HiveConf.getVar(job, HiveConf.ConfVars.HADOOPBIN)
-          + " job -kill " + rj.getJobID());
+          + " job -kill " + rj.getID());
     }
   }
@@ -509,7 +502,7 @@ public int progressLocal(Process runningJob, String taskId) {

   public int progress(RunningJob rj, JobClient jc) throws IOException {
-    jobId = rj.getJobID();
+    jobId = rj.getID();

     int returnVal = 0;
@@ -527,7 +520,7 @@ public int progress(RunningJob rj, JobClient jc) throws IOException {

     // add to list of running jobs to kill in case of abnormal shutdown
-    runningJobKillURIs.put(rj.getJobID(), rj.getTrackingURL() + "&action=kill");
+    runningJobs.add(rj);

     ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj);
     jobInfo(rj);
@@ -548,7 +541,7 @@ public int progress(RunningJob rj, JobClient jc) throws IOException {

     boolean success = mapRedStats.isSuccess();

-    String statusMesg = getJobEndMsg(rj.getJobID());
+    String statusMesg = getJobEndMsg(rj.getID());
     if (!success) {
       statusMesg += " with errors";
       returnVal = 2;
@@ -592,8 +585,7 @@ private void computeReducerTimeStatsPerJob(RunningJob rj) throws IOException {
       }
     }
     // Compute the reducers run time statistics for the job
-    ReducerTimeStatsPerJob reducerTimeStatsPerJob = new ReducerTimeStatsPerJob(reducersRunTimes,
-        new String(this.jobId));
+    ReducerTimeStatsPerJob reducerTimeStatsPerJob = new ReducerTimeStatsPerJob(reducersRunTimes);
     // Adding the reducers run time statistics for the job in the QueryPlan
     this.task.getQueryPlan().getReducerTimeStatsPerJobList().add(reducerTimeStatsPerJob);
     return;
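
Note on the section above: the shutdown hook now kills jobs through their RunningJob handles instead of POSTing to per-job kill URIs, and it is registered unconditionally rather than only when webinterface.private.actions is set. Below is a minimal, Hadoop-free sketch of the same pattern; KillableJob is a hypothetical stand-in for org.apache.hadoop.mapred.RunningJob so the snippet compiles on its own. The detail worth noting is that Collections.synchronizedList only makes individual calls such as add() and remove() atomic; iteration is not atomic, which is why killRunningJobs() must hold the list's monitor while traversing it, exactly as the patched code does.

    import java.util.Collections;
    import java.util.LinkedList;
    import java.util.List;

    public class ShutdownKillSketch {

      // Hypothetical stand-in for org.apache.hadoop.mapred.RunningJob.
      interface KillableJob {
        String getID();
        void killJob() throws Exception;
      }

      // add()/remove() are atomic, but iteration is not; see below.
      static final List<KillableJob> runningJobs =
          Collections.synchronizedList(new LinkedList<KillableJob>());

      static {
        // Best-effort cleanup if the JVM exits while jobs are in flight.
        Runtime.getRuntime().addShutdownHook(new Thread() {
          @Override
          public void run() {
            killRunningJobs();
          }
        });
      }

      static void killRunningJobs() {
        // Must lock the synchronized list manually while iterating.
        synchronized (runningJobs) {
          for (KillableJob job : runningJobs) {
            try {
              job.killJob();
            } catch (Exception e) {
              System.err.println("Failed to kill job: " + job.getID());
            }
          }
        }
      }
    }

In the patch, progress() adds the RunningJob before it starts polling and each execute() removes it once the job finishes, so only genuinely in-flight jobs remain registered; removal relies on the same RunningJob instance being passed to remove(), since List.remove() falls back to identity equality here.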
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
index 5a6899c..6ce596a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
@@ -241,7 +241,7 @@ public int execute(DriverContext driverContext) {
         if (returnVal != 0) {
           rj.killJob();
         }
-        HadoopJobExecHelper.runningJobKillURIs.remove(rj.getJobID());
+        HadoopJobExecHelper.runningJobs.remove(rj);
         jobID = rj.getID().toString();
       }
       RCFileMergeMapper.jobClose(outputPath, success, job, console,
@@ -372,5 +372,5 @@ public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
   @Override
   public void logPlanProgress(SessionState ss) throws IOException {
     // no op
-  } 
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
index 4b58d92..ee62ecd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
@@ -248,7 +248,7 @@ public int execute(DriverContext driverContext) {
         if (returnVal != 0) {
           rj.killJob();
         }
-        HadoopJobExecHelper.runningJobKillURIs.remove(rj.getJobID());
+        HadoopJobExecHelper.runningJobs.remove(rj);
         jobID = rj.getID().toString();
       }
     } catch (Exception e) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
index 21b537c..4f9d29e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
@@ -217,7 +217,7 @@ public int execute(DriverContext driverContext) {
         if (returnVal != 0) {
           rj.killJob();
         }
-        HadoopJobExecHelper.runningJobKillURIs.remove(rj.getJobID());
+        HadoopJobExecHelper.runningJobs.remove(rj);
         jobID = rj.getID().toString();
       }
       ColumnTruncateMapper.jobClose(outputPath, success, job, console,
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java
index 40c27e3..9c11000 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java
@@ -32,9 +32,6 @@
  */
 public class ReducerTimeStatsPerJob {

-  // stores the JobId of the job
-  private final String jobId;
-
   // Stores the temporal statistics in milliseconds for reducers
   // specific to a Job
   private final long minimumTime;
@@ -47,8 +44,7 @@
    * Computes the temporal run time statistics of the reducers
    * for a specific JobId.
    */
-  public ReducerTimeStatsPerJob(List reducersRunTimes, String jobId) {
-    this.jobId = jobId;
+  public ReducerTimeStatsPerJob(List reducersRunTimes) {

     // If no Run times present, then set -1, indicating no values
     if (!reducersRunTimes.isEmpty()) {
@@ -103,9 +99,4 @@ public double getMeanTime() {
   public double getStandardDeviationTime() {
     return this.standardDeviationTime;
   }
-
-  public String getJobId() {
-    return this.jobId;
-  }
-
 }
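
End-of-patch note: jobId is now a typed org.apache.hadoop.mapred.JobID rather than a free-form String, which is why call sites convert explicitly where a String is still required (e.g. rj.getID().toString() for HiveHistory), and why computeReducerTimeStatsPerJob() no longer clones a String id at all now that ReducerTimeStatsPerJob dropped the field. The sketch below shows what the typed id buys; JobID.forName() and the accessors are existing Hadoop 1.x APIs, but the id value itself is made up for illustration.

    import org.apache.hadoop.mapred.JobID;

    public class JobIdRoundTrip {
      public static void main(String[] args) {
        // Parse the canonical string form into a typed, validated id
        // (forName throws IllegalArgumentException on malformed input).
        JobID id = JobID.forName("job_201312301234_0001");

        // Typed accessors replace ad-hoc string slicing.
        System.out.println(id.getJtIdentifier()); // 201312301234
        System.out.println(id.getId());           // 1
        System.out.println(id);                   // job_201312301234_0001
      }
    }

A JobID also provides equals(), hashCode(), and compareTo(), so job ids can be compared or used as map keys without relying on the formatting of hand-built strings.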