diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java
index d3dc3f7..0977d9c 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java
@@ -31,6 +31,7 @@
   public static final String EXIT_FNAME = "exit";
   public static final int WATCHER_TIMEOUT_SECS = 10;
   public static final int KEEP_ALIVE_MSEC = 60 * 1000;
+  public static final int POLL_JOBPROGRESS_MSEC = 30 * 1000;
   /**
    * A comma-separated list of files to be added to HADOOP_CLASSPATH in
    * {@link org.apache.hive.hcatalog.templeton.tool.LaunchMapper}. Used to localize additional
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java
index 91fe247..89a647b 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java
@@ -51,6 +51,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -150,13 +151,8 @@
     else if(TempletonUtils.isset(System.getenv(pathVarName))) {
       env.put(pathVarName, paths);
     }
   }
-  protected Process startJob(Context context, String user, String overrideClasspath)
+  protected Process startJob(Configuration conf, String jobId, String user, String overrideClasspath)
     throws IOException, InterruptedException {
-    Configuration conf = context.getConfiguration();
-
-    // Kill previously launched child MR jobs started by this launcher to prevent having
-    // same jobs running side-by-side
-    killLauncherChildJobs(conf, context.getJobID().toString());
     copyLocal(COPY_NAME, conf);
     String[] jarArgs = TempletonUtils.decodeArray(conf.get(JAR_ARGS_NAME));
@@ -175,7 +171,7 @@
     handleTokenFile(jarArgsList, JobSubmissionConstants.TOKEN_FILE_ARG_PLACEHOLDER, "mapreduce.job.credentials.binary");
     handleTokenFile(jarArgsList, JobSubmissionConstants.TOKEN_FILE_ARG_PLACEHOLDER_TEZ, "tez.credentials.path");
     handleMapReduceJobTag(jarArgsList, JobSubmissionConstants.MAPREDUCE_JOB_TAGS_ARG_PLACEHOLDER,
-      JobSubmissionConstants.MAPREDUCE_JOB_TAGS, context.getJobID().toString());
+      JobSubmissionConstants.MAPREDUCE_JOB_TAGS, jobId);

     return TrivialExecService.getInstance().run(jarArgsList, removeEnv, env);
   }
@@ -289,17 +285,96 @@ private void copyLocal(String var, Configuration conf) throws IOException {
     }
   }
+  /**
+   * Checks if reconnection to an already running job is supported for a given
+   * job type.
+   * @param jobType the type of the submitted job
+   * @return true if reconnect is supported for the given job type
+   */
+  private boolean reconnectToRunningJobSupported(LauncherDelegator.JobType jobType) {
+    return jobType.equals(LauncherDelegator.JobType.JAR)
+      || jobType.equals(LauncherDelegator.JobType.STREAMING);
+  }
+
+  /**
+   * Attempts to reconnect to an already running child job of the templeton launcher. This
+   * is used in cases where the templeton launcher task has failed and is retried by the
+   * MR framework.
+   * If reconnecting to the child job is possible, the method will continue
+   * tracking its progress until completion.
+   * @return true if the reconnect was successful; false if reconnect is not
+   * supported for this job type or no child jobs were found.
+   */
+  private boolean tryReconnectToRunningJob(Configuration conf, Context context,
+      LauncherDelegator.JobType jobType, String statusdir) throws IOException, InterruptedException {
+    if (!reconnectToRunningJobSupported(jobType)) {
+      return false;
+    }
+
+    long startTime = getTempletonLaunchTime(conf);
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    WebHCatJTShim tracker = ShimLoader.getHadoopShims().getWebHCatShim(conf, ugi);
+    try {
+      Set<String> childJobs = tracker.getJobs(context.getJobID().toString(), startTime);
+      if (childJobs.size() == 0) {
+        LOG.info("No child jobs found to reconnect with");
+        return false;
+      }
+
+      if (childJobs.size() > 1) {
+        LOG.warn(String.format(
+          "Found more than one child job to reconnect with: %s, skipping reconnect",
+          Arrays.toString(childJobs.toArray())));
+        return false;
+      }
+
+      String childJobIdString = childJobs.iterator().next();
+      org.apache.hadoop.mapred.JobID childJobId =
+        org.apache.hadoop.mapred.JobID.forName(childJobIdString);
+      LOG.info(String.format("Reconnecting to an existing job %s", childJobIdString));
+
+      // Update the job state with the child job id
+      updateJobStatePercentAndChildId(conf, context.getJobID().toString(), null, childJobIdString);
+
+      do {
+        org.apache.hadoop.mapred.JobStatus jobStatus = tracker.getJobStatus(childJobId);
+        if (jobStatus.isJobComplete()) {
+          LOG.info(String.format("Child job %s completed", childJobIdString));
+          int exitCode = 0;
+          if (jobStatus.getState().getValue() != org.apache.hadoop.mapred.JobStatus.SUCCEEDED) {
+            exitCode = 1;
+          }
+          updateJobStateToDoneAndWriteExitValue(conf, statusdir, context.getJobID().toString(),
+            exitCode);
+          break;
+        }
+
+        String percent = String.format("map %s%%, reduce %s%%",
+          jobStatus.getMapProgress() * 100, jobStatus.getReduceProgress() * 100);
+        updateJobStatePercentAndChildId(conf, context.getJobID().toString(), percent, null);
+
+        LOG.info("KeepAlive Heart beat");
+
+        context.progress();
+        Thread.sleep(POLL_JOBPROGRESS_MSEC);
+      } while (true);
+
+      // Reconnect was successful
+      return true;
+    }
+    catch (IOException ex) {
+      LOG.error("Exception encountered in tryReconnectToRunningJob", ex);
+      throw ex;
+    } finally {
+      tracker.close();
+    }
+  }
+
   @Override
   public void run(Context context) throws IOException, InterruptedException {
     Configuration conf = context.getConfiguration();
-
-    Process proc = startJob(context,
-      conf.get("user.name"),
-      conf.get(OVERRIDE_CLASSPATH));
-
+    LauncherDelegator.JobType jobType = LauncherDelegator.JobType.valueOf(conf.get(JOB_TYPE));
     String statusdir = conf.get(STATUSDIR_NAME);
-
     if (statusdir != null) {
       try {
         statusdir = TempletonUtils.addUserHomeDirectoryIfApplicable(statusdir,
@@ -311,8 +386,20 @@ public void run(Context context) throws IOException, InterruptedException {
       }
     }

-    Boolean enablelog = Boolean.parseBoolean(conf.get(ENABLE_LOG));
-    LauncherDelegator.JobType jobType = LauncherDelegator.JobType.valueOf(conf.get(JOB_TYPE));
+    // Try to reconnect to a child job if one is found
+    if (tryReconnectToRunningJob(conf, context, jobType, statusdir)) {
+      return;
+    }
+
+    // Kill previously launched child MR jobs started by this launcher to prevent having
+    // the same jobs running side-by-side
+    killLauncherChildJobs(conf, context.getJobID().toString());
+
+    // Start the job
+    Process proc = startJob(conf,
+      context.getJobID().toString(),
+      conf.get("user.name"),
+      conf.get(OVERRIDE_CLASSPATH));

     ExecutorService pool = Executors.newCachedThreadPool();
     executeWatcher(pool, conf, context.getJobID(),
@@ -328,27 +415,67 @@ public void run(Context context) throws IOException, InterruptedException {
       pool.shutdownNow();
     }
-    writeExitValue(conf, proc.exitValue(), statusdir);
-    JobState state = new JobState(context.getJobID().toString(), conf);
-    state.setExitValue(proc.exitValue());
-    state.setCompleteStatus("done");
-    state.close();
+    updateJobStateToDoneAndWriteExitValue(conf, statusdir, context.getJobID().toString(),
+      proc.exitValue());
+    Boolean enablelog = Boolean.parseBoolean(conf.get(ENABLE_LOG));
     if (enablelog && TempletonUtils.isset(statusdir)) {
       LOG.info("templeton: collecting logs for " + context.getJobID().toString()
-        + " to " + statusdir + "/logs");
+               + " to " + statusdir + "/logs");
       LogRetriever logRetriever = new LogRetriever(statusdir, jobType, conf);
       logRetriever.run();
     }
+  }

-    if (proc.exitValue() != 0) {
+  private void updateJobStateToDoneAndWriteExitValue(Configuration conf,
+      String statusdir, String jobId, int exitCode) throws IOException {
+    writeExitValue(conf, exitCode, statusdir);
+    JobState state = new JobState(jobId, conf);
+    state.setExitValue(exitCode);
+    state.setCompleteStatus("done");
+    state.close();
+
+    if (exitCode != 0) {
       LOG.info("templeton: job failed with exit code "
-        + proc.exitValue());
+        + exitCode);
     } else {
       LOG.info("templeton: job completed with exit code 0");
     }
   }

+  /**
+   * Updates the job state percent and child id in the templeton storage. An update
+   * only takes place for non-null values.
+   */
+  private static void updateJobStatePercentAndChildId(Configuration conf,
+      String jobId, String percent, String childid) {
+    JobState state = null;
+    try {
+      if (percent != null || childid != null) {
+        state = new JobState(jobId, conf);
+        if (percent != null) {
+          state.setPercentComplete(percent);
+        }
+        if (childid != null) {
+          JobState childState = new JobState(childid, conf);
+          childState.setParent(jobId);
+          state.addChild(childid);
+          state.close();
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("templeton: state error: ", e);
+    } finally {
+      if (state != null) {
+        try {
+          state.close();
+        } catch (IOException e) {
+          LOG.warn(e);
+        }
+      }
+    }
+  }
+
   private void executeWatcher(ExecutorService pool, Configuration conf, JobID jobid,
     InputStream in, String statusdir, String name) throws IOException {
     Watcher w = new Watcher(conf, jobid, in, statusdir, name);
@@ -414,34 +541,9 @@ public void run() {
         String line;
         while ((line = reader.readLine()) != null) {
           writer.println(line);
-          JobState state = null;
-          try {
-            String percent = TempletonUtils.extractPercentComplete(line);
-            String childid = TempletonUtils.extractChildJobId(line);
-
-            if (percent != null || childid != null) {
-              state = new JobState(jobid.toString(), conf);
-              if (percent != null) {
-                state.setPercentComplete(percent);
-              }
-              if (childid != null) {
-                JobState childState = new JobState(childid, conf);
-                childState.setParent(jobid.toString());
-                state.addChild(childid);
-                state.close();
-              }
-            }
-          } catch (IOException e) {
-            LOG.error("templeton: state error: ", e);
-          } finally {
-            if (state != null) {
-              try {
-                state.close();
-              } catch (IOException e) {
-                LOG.warn(e);
-              }
-            }
-          }
+          String percent = TempletonUtils.extractPercentComplete(line);
+          String childid = TempletonUtils.extractChildJobId(line);
+          updateJobStatePercentAndChildId(conf, jobid.toString(), percent, childid);
         }
         writer.flush();
         if(out != System.err && out != System.out) {
diff --git a/shims/0.20S/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java b/shims/0.20S/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java
index 8b165f3..367ea60 100644
--- a/shims/0.20S/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java
+++ b/shims/0.20S/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java
@@ -28,6 +28,8 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.HashSet;
+import java.util.Set;

 /**
  * This is in org.apache.hadoop.mapred package because it relies on
@@ -109,5 +111,13 @@ public void addCacheFile(URI uri, Job job) {
   public void killJobs(String tag, long timestamp) {
     return;
   }
+  /**
+   * Getting jobs is only supported on Hadoop 2.0+, so this shim returns an
+   * empty set.
+   */
+  @Override
+  public Set<String> getJobs(String tag, long timestamp) {
+    return new HashSet<String>();
+  }
 }
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim23.java b/shims/0.23/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim23.java
index dd27cce..c85a739 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim23.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim23.java
@@ -21,9 +21,9 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
-
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
@@ -180,6 +180,22 @@ public void killJobs(String tag, long timestamp) {
   }

   /**
+   * Returns all jobs tagged with the given tag that have been started after the
+   * given timestamp. Returned jobIds are MapReduce JobIds.
+   */
+  @Override
+  public Set<String> getJobs(String tag, long timestamp) {
+    Set<ApplicationId> childYarnJobs = getYarnChildJobs(tag, timestamp);
+    Set<String> childJobs = new HashSet<String>();
+    for (ApplicationId id : childYarnJobs) {
+      // Convert the YARN ApplicationId to a MapReduce job id
+      String childJobId = TypeConverter.fromYarn(id).toString();
+      childJobs.add(childJobId);
+    }
+    return childJobs;
+  }
+
+  /**
    * Queries RM for the list of applications with the given tag that have started
    * after the given timestamp.
    */
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 5b7e7f6..74785e5 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -393,6 +393,11 @@ public void setFullFileStatus(Configuration conf, HdfsFileStatus sourceStatus,
      * given timestamp.
      */
     public void killJobs(String tag, long timestamp);
+    /**
+     * Returns all jobs tagged with the given tag that have been started after the
+     * given timestamp. Returned jobIds are MapReduce JobIds.
+     */
+    public Set<String> getJobs(String tag, long timestamp);
   }

 /**
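
Note for reviewers: below is a minimal sketch, not part of the patch, of how the new WebHCatJTShim.getJobs() method (together with the pre-existing getJobStatus()) might be exercised. It assumes the Hive shims jar is on the classpath; the class name, job tag, and timestamp are hypothetical placeholder values, whereas LaunchMapper uses the launcher's own job id and templeton launch time.

import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.security.UserGroupInformation;

// Illustration only: mirrors the lookup that tryReconnectToRunningJob() performs,
// i.e. find child MR jobs carrying the launcher's job id as a tag, then check status.
public class GetJobsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    WebHCatJTShim tracker = ShimLoader.getHadoopShims().getWebHCatShim(conf, ugi);
    try {
      // "job_..." tag and timestamp 0L are hypothetical placeholders.
      Set<String> childJobs = tracker.getJobs("job_201504221021_0001", 0L);
      for (String childJobIdString : childJobs) {
        org.apache.hadoop.mapred.JobID childJobId =
            org.apache.hadoop.mapred.JobID.forName(childJobIdString);
        System.out.println(childJobIdString + " complete: "
            + tracker.getJobStatus(childJobId).isJobComplete());
      }
    } finally {
      tracker.close();
    }
  }
}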