diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java index 2b37e7f..8244274 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java @@ -96,6 +96,7 @@ public static final String EXEC_MAX_PROCS_NAME = "templeton.exec.max-procs"; public static final String EXEC_TIMEOUT_NAME = "templeton.exec.timeout"; public static final String HADOOP_QUEUE_NAME = "templeton.hadoop.queue.name"; + public static final String ENABLE_JOB_RECONNECT_DEFAULT = "templeton.enable.job.reconnect.default"; public static final String HADOOP_NAME = "templeton.hadoop"; public static final String HADOOP_CONF_DIR = "templeton.hadoop.conf.dir"; public static final String HCAT_NAME = "templeton.hcat"; @@ -306,6 +307,7 @@ private boolean loadOneClasspathConfig(String fname) { public String libJars() { return get(LIB_JARS_NAME); } public String hadoopQueueName() { return get(HADOOP_QUEUE_NAME); } + public String enableJobReconnectDefault() { return get(ENABLE_JOB_RECONNECT_DEFAULT); } public String clusterHadoop() { return get(HADOOP_NAME); } public String clusterHcat() { return get(HCAT_NAME); } public String clusterPython() { return get(PYTHON_NAME); } diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java index 7b09b8a..0ea964f 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java @@ -46,25 +46,28 @@ public HiveDelegator(AppConfig appConf) { public EnqueueBean run(String user, Map userArgs, String execute, String srcFile, List defines, List hiveArgs, String otherFiles, - String statusdir, String callback, String completedUrl, boolean enablelog) + String statusdir, String callback, String completedUrl, boolean enablelog, + Boolean enableJobReconnect) throws NotAuthorizedException, BadParam, BusyException, QueueException, ExecuteException, IOException, InterruptedException { runAs = user; List args = makeArgs(execute, srcFile, defines, hiveArgs, otherFiles, statusdir, - completedUrl, enablelog); + completedUrl, enablelog, enableJobReconnect); return enqueueController(user, userArgs, callback, args); } private List makeArgs(String execute, String srcFile, List defines, List hiveArgs, String otherFiles, - String statusdir, String completedUrl, boolean enablelog) + String statusdir, String completedUrl, boolean enablelog, + Boolean enableJobReconnect) throws BadParam, IOException, InterruptedException { ArrayList args = new ArrayList(); try { - args.addAll(makeBasicArgs(execute, srcFile, otherFiles, statusdir, completedUrl, enablelog)); + args.addAll(makeBasicArgs(execute, srcFile, otherFiles, statusdir, completedUrl, + enablelog, enableJobReconnect)); args.add("--"); TempletonUtils.addCmdForWindows(args); addHiveMetaStoreTokenArg(); @@ -117,7 +120,7 @@ public EnqueueBean run(String user, Map userArgs, private List makeBasicArgs(String execute, String srcFile, String otherFiles, String statusdir, String completedUrl, - boolean enablelog) + boolean enablelog, Boolean enableJobReconnect) throws URISyntaxException, IOException, InterruptedException { @@ -135,7 +138,7 @@ public EnqueueBean run(String user, Map userArgs, } args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles, - enablelog, JobType.HIVE)); + enablelog, enableJobReconnect, JobType.HIVE)); if (appConf.hiveArchive() != null && !appConf.hiveArchive().equals("")) { diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java index e5832a8..10ff2c0 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java @@ -44,13 +44,13 @@ public EnqueueBean run(String user, Map userArgs, String jar, St List jarArgs, List defines, String statusdir, String callback, boolean usesHcatalog, String completedUrl, - boolean enablelog, JobType jobType) + boolean enablelog, Boolean enableJobReconnect, JobType jobType) throws NotAuthorizedException, BadParam, BusyException, QueueException, ExecuteException, IOException, InterruptedException { runAs = user; List args = makeArgs(jar, mainClass, libjars, files, jarArgs, defines, - statusdir, usesHcatalog, completedUrl, enablelog, jobType); + statusdir, usesHcatalog, completedUrl, enablelog, enableJobReconnect, jobType); return enqueueController(user, userArgs, callback, args); } @@ -59,7 +59,7 @@ public EnqueueBean run(String user, Map userArgs, String jar, St String libjars, String files, List jarArgs, List defines, String statusdir, boolean usesHcatalog, String completedUrl, - boolean enablelog, JobType jobType) + boolean enablelog, Boolean enableJobReconnect, JobType jobType) throws BadParam, IOException, InterruptedException { ArrayList args = new ArrayList(); try { @@ -67,7 +67,7 @@ public EnqueueBean run(String user, Map userArgs, String jar, St allFiles.add(TempletonUtils.hadoopFsFilename(jar, appConf, runAs)); args.addAll(makeLauncherArgs(appConf, statusdir, - completedUrl, allFiles, enablelog, jobType)); + completedUrl, allFiles, enablelog, enableJobReconnect, jobType)); args.add("--"); TempletonUtils.addCmdForWindows(args); diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java index 71a3107..82e5cb8 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java @@ -118,6 +118,7 @@ public String run() throws Exception { String completedUrl, List copyFiles, boolean enablelog, + Boolean enableJobReconnect, JobType jobType) { ArrayList args = new ArrayList(); @@ -150,6 +151,19 @@ public String run() throws Exception { addDef(args, TempletonControllerJob.TEMPLETON_JOB_LAUNCH_TIME_NAME, Long.toString(System.currentTimeMillis())); + if (enableJobReconnect == null) { + // If enablejobreconnect param was not passed by a user, use a cluster + // wide default + if (appConf.enableJobReconnectDefault() != null) { + enableJobReconnect = Boolean.parseBoolean(appConf.enableJobReconnectDefault()); + } else { + // default is false + enableJobReconnect = false; + } + } + addDef(args, TempletonControllerJob.ENABLE_JOB_RECONNECT, + Boolean.toString(enableJobReconnect)); + // Hadoop queue information addDef(args, "mapred.job.queue.name", appConf.hadoopQueueName()); diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java index a07f66a..2679a97 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java @@ -48,13 +48,14 @@ public EnqueueBean run(String user, Map userArgs, String execute, String srcFile, List pigArgs, String otherFiles, String statusdir, String callback, - boolean usesHcatalog, String completedUrl, boolean enablelog) + boolean usesHcatalog, String completedUrl, boolean enablelog, + Boolean enableJobReconnect) throws NotAuthorizedException, BadParam, BusyException, QueueException, ExecuteException, IOException, InterruptedException { runAs = user; List args = makeArgs(execute, srcFile, pigArgs, - otherFiles, statusdir, usesHcatalog, completedUrl, enablelog); + otherFiles, statusdir, usesHcatalog, completedUrl, enablelog, enableJobReconnect); return enqueueController(user, userArgs, callback, args); } @@ -68,6 +69,8 @@ public EnqueueBean run(String user, Map userArgs, * @param usesHcatalog whether the command uses hcatalog/needs to connect * to hive metastore server * @param completedUrl call back url + * @param enablelog + * @param enableJobReconnect * @return list of arguments * @throws BadParam * @throws IOException @@ -76,7 +79,8 @@ public EnqueueBean run(String user, Map userArgs, private List makeArgs(String execute, String srcFile, List pigArgs, String otherFiles, String statusdir, boolean usesHcatalog, - String completedUrl, boolean enablelog) + String completedUrl, boolean enablelog, + Boolean enableJobReconnect) throws BadParam, IOException, InterruptedException { ArrayList args = new ArrayList(); //check if the REST command specified explicitly to use hcatalog @@ -93,7 +97,8 @@ public EnqueueBean run(String user, Map userArgs, allFiles.addAll(Arrays.asList(ofs)); } - args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles, enablelog, JobType.PIG)); + args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles, enablelog, + enableJobReconnect, JobType.PIG)); boolean shipPigTar = appConf.pigArchive() != null && !appConf.pigArchive().equals(""); boolean shipHiveTar = needsMetastoreAccess && appConf.hiveArchive() != null && !appConf.hiveArchive().equals(""); diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java index 80d9d2c..27b8e38 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java @@ -647,7 +647,8 @@ public EnqueueBean mapReduceStreaming(@FormParam("input") List inputs, @FormParam("arg") List args, @FormParam("statusdir") String statusdir, @FormParam("callback") String callback, - @FormParam("enablelog") boolean enablelog) + @FormParam("enablelog") boolean enablelog, + @FormParam("enablejobreconnect") Boolean enablejobreconnect) throws NotAuthorizedException, BusyException, BadParam, QueueException, ExecuteException, IOException, InterruptedException { verifyUser(); @@ -671,12 +672,13 @@ public EnqueueBean mapReduceStreaming(@FormParam("input") List inputs, userArgs.put("statusdir", statusdir); userArgs.put("callback", callback); userArgs.put("enablelog", Boolean.toString(enablelog)); + userArgs.put("enablejobreconnect", enablejobreconnect); checkEnableLogPrerequisite(enablelog, statusdir); StreamingDelegator d = new StreamingDelegator(appConf); return d.run(getDoAsUser(), userArgs, inputs, inputreader, output, mapper, reducer, combiner, fileList, files, defines, cmdenvs, args, - statusdir, callback, getCompletedUrl(), enablelog, JobType.STREAMING); + statusdir, callback, getCompletedUrl(), enablelog, enablejobreconnect, JobType.STREAMING); } /** @@ -699,7 +701,8 @@ public EnqueueBean mapReduceJar(@FormParam("jar") String jar, @FormParam("statusdir") String statusdir, @FormParam("callback") String callback, @FormParam("usehcatalog") boolean usesHcatalog, - @FormParam("enablelog") boolean enablelog) + @FormParam("enablelog") boolean enablelog, + @FormParam("enablejobreconnect") Boolean enablejobreconnect) throws NotAuthorizedException, BusyException, BadParam, QueueException, ExecuteException, IOException, InterruptedException { verifyUser(); @@ -717,6 +720,7 @@ public EnqueueBean mapReduceJar(@FormParam("jar") String jar, userArgs.put("statusdir", statusdir); userArgs.put("callback", callback); userArgs.put("enablelog", Boolean.toString(enablelog)); + userArgs.put("enablejobreconnect", enablejobreconnect); checkEnableLogPrerequisite(enablelog, statusdir); @@ -724,7 +728,7 @@ public EnqueueBean mapReduceJar(@FormParam("jar") String jar, return d.run(getDoAsUser(), userArgs, jar, mainClass, libjars, files, args, defines, - statusdir, callback, usesHcatalog, getCompletedUrl(), enablelog, JobType.JAR); + statusdir, callback, usesHcatalog, getCompletedUrl(), enablelog, enablejobreconnect, JobType.JAR); } /** @@ -747,7 +751,8 @@ public EnqueueBean pig(@FormParam("execute") String execute, @FormParam("statusdir") String statusdir, @FormParam("callback") String callback, @FormParam("usehcatalog") boolean usesHcatalog, - @FormParam("enablelog") boolean enablelog) + @FormParam("enablelog") boolean enablelog, + @FormParam("enablejobreconnect") Boolean enablejobreconnect) throws NotAuthorizedException, BusyException, BadParam, QueueException, ExecuteException, IOException, InterruptedException { verifyUser(); @@ -765,6 +770,7 @@ public EnqueueBean pig(@FormParam("execute") String execute, userArgs.put("statusdir", statusdir); userArgs.put("callback", callback); userArgs.put("enablelog", Boolean.toString(enablelog)); + userArgs.put("enablejobreconnect", enablejobreconnect); checkEnableLogPrerequisite(enablelog, statusdir); @@ -772,7 +778,7 @@ public EnqueueBean pig(@FormParam("execute") String execute, return d.run(getDoAsUser(), userArgs, execute, srcFile, pigArgs, otherFiles, - statusdir, callback, usesHcatalog, getCompletedUrl(), enablelog); + statusdir, callback, usesHcatalog, getCompletedUrl(), enablelog, enablejobreconnect); } /** @@ -784,6 +790,8 @@ public EnqueueBean pig(@FormParam("execute") String execute, * @param statusdir where the stderr/stdout of templeton controller job goes * @param callback URL which WebHCat will call when the sqoop job finishes * @param enablelog whether to collect mapreduce log into statusdir/logs + * @param enablejobreconnect whether to reconnect to a running child job on templeton + * controller job retry */ @POST @Path("sqoop") @@ -794,7 +802,8 @@ public EnqueueBean sqoop(@FormParam("command") String command, @FormParam("files") String otherFiles, @FormParam("statusdir") String statusdir, @FormParam("callback") String callback, - @FormParam("enablelog") boolean enablelog) + @FormParam("enablelog") boolean enablelog, + @FormParam("enablejobreconnect") Boolean enablejobreconnect) throws NotAuthorizedException, BusyException, BadParam, QueueException, IOException, InterruptedException { verifyUser(); @@ -814,9 +823,10 @@ public EnqueueBean sqoop(@FormParam("command") String command, userArgs.put("statusdir", statusdir); userArgs.put("callback", callback); userArgs.put("enablelog", Boolean.toString(enablelog)); + userArgs.put("enablejobreconnect", enablejobreconnect); SqoopDelegator d = new SqoopDelegator(appConf); return d.run(getDoAsUser(), userArgs, command, optionsFile, otherFiles, - statusdir, callback, getCompletedUrl(), enablelog, libdir); + statusdir, callback, getCompletedUrl(), enablelog, enablejobreconnect, libdir); } /** @@ -833,6 +843,8 @@ public EnqueueBean sqoop(@FormParam("command") String command, * @param statusdir where the stderr/stdout of templeton controller job goes * @param callback URL which WebHCat will call when the hive job finishes * @param enablelog whether to collect mapreduce log into statusdir/logs + * @param enablejobreconnect whether to reconnect to a running child job on templeton + * controller job retry */ @POST @Path("hive") @@ -844,7 +856,8 @@ public EnqueueBean hive(@FormParam("execute") String execute, @FormParam("define") List defines, @FormParam("statusdir") String statusdir, @FormParam("callback") String callback, - @FormParam("enablelog") boolean enablelog) + @FormParam("enablelog") boolean enablelog, + @FormParam("enablejobreconnect") Boolean enablejobreconnect) throws NotAuthorizedException, BusyException, BadParam, QueueException, ExecuteException, IOException, InterruptedException { verifyUser(); @@ -862,12 +875,13 @@ public EnqueueBean hive(@FormParam("execute") String execute, userArgs.put("statusdir", statusdir); userArgs.put("callback", callback); userArgs.put("enablelog", Boolean.toString(enablelog)); + userArgs.put("enablejobreconnect", enablejobreconnect); checkEnableLogPrerequisite(enablelog, statusdir); HiveDelegator d = new HiveDelegator(appConf); return d.run(getDoAsUser(), userArgs, execute, srcFile, defines, hiveArgs, otherFiles, - statusdir, callback, getCompletedUrl(), enablelog); + statusdir, callback, getCompletedUrl(), enablelog, enablejobreconnect); } /** diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java index b205bda..9002482 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java @@ -45,9 +45,10 @@ public SqoopDelegator(AppConfig appConf) { } public EnqueueBean run(String user, - Map userArgs, String command, - String optionsFile, String otherFiles, String statusdir, - String callback, String completedUrl, boolean enablelog, String libdir) + Map userArgs, String command, + String optionsFile, String otherFiles, String statusdir, + String callback, String completedUrl, boolean enablelog, + Boolean enableJobReconnect, String libdir) throws NotAuthorizedException, BadParam, BusyException, QueueException, IOException, InterruptedException { @@ -59,17 +60,19 @@ public EnqueueBean run(String user, } runAs = user; List args = makeArgs(command, optionsFile, otherFiles, statusdir, - completedUrl, enablelog, libdir); + completedUrl, enablelog, enableJobReconnect, libdir); return enqueueController(user, userArgs, callback, args); } private List makeArgs(String command, String optionsFile, String otherFiles, - String statusdir, String completedUrl, boolean enablelog, String libdir) + String statusdir, String completedUrl, boolean enablelog, + Boolean enableJobReconnect, String libdir) throws BadParam, IOException, InterruptedException { ArrayList args = new ArrayList(); try { - args.addAll(makeBasicArgs(optionsFile, otherFiles, statusdir, completedUrl, enablelog, libdir)); + args.addAll(makeBasicArgs(optionsFile, otherFiles, statusdir, completedUrl, enablelog, + enableJobReconnect, libdir)); args.add("--"); TempletonUtils.addCmdForWindows(args); args.add(appConf.sqoopPath()); @@ -112,7 +115,8 @@ public EnqueueBean run(String user, } private List makeBasicArgs(String optionsFile, String otherFiles, - String statusdir, String completedUrl, boolean enablelog, String libdir) + String statusdir, String completedUrl, boolean enablelog, + Boolean enableJobReconnect, String libdir) throws URISyntaxException, FileNotFoundException, IOException, InterruptedException { @@ -152,7 +156,7 @@ public EnqueueBean run(String user, } } args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles, - enablelog, JobType.SQOOP)); + enablelog, enableJobReconnect, JobType.SQOOP)); if(TempletonUtils.isset(appConf.sqoopArchive())) { args.add("-archives"); args.add(appConf.sqoopArchive()); diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java index 622440b..f487d51 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java @@ -48,6 +48,7 @@ public EnqueueBean run(String user, Map userArgs, String callback, String completedUrl, boolean enableLog, + Boolean enableJobReconnect, JobType jobType) throws NotAuthorizedException, BadParam, BusyException, QueueException, ExecuteException, IOException, InterruptedException { @@ -58,7 +59,7 @@ public EnqueueBean run(String user, Map userArgs, return d.run(user, userArgs, appConf.streamingJar(), null, null, files, args, defines, - statusdir, callback, false, completedUrl, enableLog, jobType); + statusdir, callback, false, completedUrl, enableLog, enableJobReconnect, jobType); } private List makeArgs(List inputs, diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java index d3dc3f7..4e4fcd4 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java @@ -22,6 +22,7 @@ public static final String COPY_NAME = "templeton.copy"; public static final String STATUSDIR_NAME = "templeton.statusdir"; public static final String ENABLE_LOG = "templeton.enablelog"; + public static final String ENABLE_JOB_RECONNECT = "templeton.enablejobreconnect"; public static final String JOB_TYPE = "templeton.jobtype"; public static final String JAR_ARGS_NAME = "templeton.args"; public static final String TEMPLETON_JOB_LAUNCH_TIME_NAME = "templeton.job.launch.time"; @@ -31,6 +32,7 @@ public static final String EXIT_FNAME = "exit"; public static final int WATCHER_TIMEOUT_SECS = 10; public static final int KEEP_ALIVE_MSEC = 60 * 1000; + public static final int POLL_JOBPROGRESS_MSEC = 30 * 1000; /** * A comma-separated list of files to be added to HADOOP_CLASSPATH in * {@link org.apache.hive.hcatalog.templeton.tool.LaunchMapper}. Used to localize additional diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java index 91fe247..422e75e 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java @@ -51,6 +51,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -150,13 +151,8 @@ else if(TempletonUtils.isset(System.getenv(pathVarName))) { env.put(pathVarName, paths); } } - protected Process startJob(Context context, String user, String overrideClasspath) + protected Process startJob(Configuration conf, String jobId, String user, String overrideClasspath) throws IOException, InterruptedException { - Configuration conf = context.getConfiguration(); - - // Kill previously launched child MR jobs started by this launcher to prevent having - // same jobs running side-by-side - killLauncherChildJobs(conf, context.getJobID().toString()); copyLocal(COPY_NAME, conf); String[] jarArgs = TempletonUtils.decodeArray(conf.get(JAR_ARGS_NAME)); @@ -175,7 +171,7 @@ protected Process startJob(Context context, String user, String overrideClasspat handleTokenFile(jarArgsList, JobSubmissionConstants.TOKEN_FILE_ARG_PLACEHOLDER, "mapreduce.job.credentials.binary"); handleTokenFile(jarArgsList, JobSubmissionConstants.TOKEN_FILE_ARG_PLACEHOLDER_TEZ, "tez.credentials.path"); handleMapReduceJobTag(jarArgsList, JobSubmissionConstants.MAPREDUCE_JOB_TAGS_ARG_PLACEHOLDER, - JobSubmissionConstants.MAPREDUCE_JOB_TAGS, context.getJobID().toString()); + JobSubmissionConstants.MAPREDUCE_JOB_TAGS, jobId); return TrivialExecService.getInstance().run(jarArgsList, removeEnv, env); } @@ -289,17 +285,105 @@ private void copyLocal(String var, Configuration conf) throws IOException { } } + /** + * Checks if reconnection to an already running job is enabled and supported for a given + * job type. + */ + private boolean reconnectToRunningJobEnabledAndSupported(Configuration conf, + LauncherDelegator.JobType jobType) { + if (conf.get(ENABLE_JOB_RECONNECT) == null) { + return false; + } + + Boolean enableJobReconnect = Boolean.parseBoolean(conf.get(ENABLE_JOB_RECONNECT)); + if (!enableJobReconnect) { + return false; + } + + // Reconnect is only supported for MR and Streaming jobs at this time + return jobType.equals(LauncherDelegator.JobType.JAR) + || jobType.equals(LauncherDelegator.JobType.STREAMING); + } + + /** + * Attempts to reconnect to an already running child job of the templeton launcher. This + * is used in cases where the templeton launcher task has failed and is retried by the + * MR framework. If reconnect to the child job is possible, the method will continue + * tracking its progress until completion. + * @return Returns true if reconnect was successful, false if not supported or + * no child jobs were found. + */ + private boolean tryReconnectToRunningJob(Configuration conf, Context context, + LauncherDelegator.JobType jobType, String statusdir) throws IOException, InterruptedException { + if (!reconnectToRunningJobEnabledAndSupported(conf, jobType)) { + return false; + } + + long startTime = getTempletonLaunchTime(conf); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + WebHCatJTShim tracker = ShimLoader.getHadoopShims().getWebHCatShim(conf, ugi); + try { + Set childJobs = tracker.getJobs(context.getJobID().toString(), startTime); + if (childJobs.size() == 0) { + LOG.info("No child jobs found to reconnect with"); + return false; + } + + if (childJobs.size() > 1) { + LOG.warn(String.format( + "Found more than one child job to reconnect with: %s, skipping reconnect", + Arrays.toString(childJobs.toArray()))); + return false; + } + + String childJobIdString = childJobs.iterator().next(); + org.apache.hadoop.mapred.JobID childJobId = + org.apache.hadoop.mapred.JobID.forName(childJobIdString); + LOG.info(String.format("Reconnecting to an existing job %s", childJobIdString)); + + // Update job state with the childJob id + updateJobStatePercentAndChildId(conf, context.getJobID().toString(), null, childJobIdString); + + do { + org.apache.hadoop.mapred.JobStatus jobStatus = tracker.getJobStatus(childJobId); + if (jobStatus.isJobComplete()) { + LOG.info(String.format("Child job %s completed", childJobIdString)); + int exitCode = 0; + if (jobStatus.getRunState() != org.apache.hadoop.mapred.JobStatus.SUCCEEDED) { + exitCode = 1; + } + updateJobStateToDoneAndWriteExitValue(conf, statusdir, context.getJobID().toString(), + exitCode); + break; + } + + String percent = String.format("map %s%%, reduce %s%%", + jobStatus.mapProgress()*100, jobStatus.reduceProgress()*100); + updateJobStatePercentAndChildId(conf, context.getJobID().toString(), percent, null); + + LOG.info("KeepAlive Heart beat"); + + context.progress(); + Thread.sleep(POLL_JOBPROGRESS_MSEC); + } while (true); + + // Reconnect was successful + return true; + } + catch (IOException ex) { + LOG.error("Exception encountered in tryReconnectToRunningJob", ex); + throw ex; + } finally { + tracker.close(); + } + } + @Override public void run(Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); - - Process proc = startJob(context, - conf.get("user.name"), - conf.get(OVERRIDE_CLASSPATH)); - + LauncherDelegator.JobType jobType = LauncherDelegator.JobType.valueOf(conf.get(JOB_TYPE)); String statusdir = conf.get(STATUSDIR_NAME); - if (statusdir != null) { try { statusdir = TempletonUtils.addUserHomeDirectoryIfApplicable(statusdir, @@ -311,8 +395,20 @@ public void run(Context context) throws IOException, InterruptedException { } } - Boolean enablelog = Boolean.parseBoolean(conf.get(ENABLE_LOG)); - LauncherDelegator.JobType jobType = LauncherDelegator.JobType.valueOf(conf.get(JOB_TYPE)); + // Try to reconnect to a child job if one is found + if (tryReconnectToRunningJob(conf, context, jobType, statusdir)) { + return; + } + + // Kill previously launched child MR jobs started by this launcher to prevent having + // same jobs running side-by-side + killLauncherChildJobs(conf, context.getJobID().toString()); + + // Start the job + Process proc = startJob(conf, + context.getJobID().toString(), + conf.get("user.name"), + conf.get(OVERRIDE_CLASSPATH)); ExecutorService pool = Executors.newCachedThreadPool(); executeWatcher(pool, conf, context.getJobID(), @@ -328,27 +424,67 @@ public void run(Context context) throws IOException, InterruptedException { pool.shutdownNow(); } - writeExitValue(conf, proc.exitValue(), statusdir); - JobState state = new JobState(context.getJobID().toString(), conf); - state.setExitValue(proc.exitValue()); - state.setCompleteStatus("done"); - state.close(); + updateJobStateToDoneAndWriteExitValue(conf, statusdir, context.getJobID().toString(), + proc.exitValue()); + Boolean enablelog = Boolean.parseBoolean(conf.get(ENABLE_LOG)); if (enablelog && TempletonUtils.isset(statusdir)) { LOG.info("templeton: collecting logs for " + context.getJobID().toString() - + " to " + statusdir + "/logs"); + + " to " + statusdir + "/logs"); LogRetriever logRetriever = new LogRetriever(statusdir, jobType, conf); logRetriever.run(); } + } - if (proc.exitValue() != 0) { + private void updateJobStateToDoneAndWriteExitValue(Configuration conf, + String statusdir, String jobId, int exitCode) throws IOException { + writeExitValue(conf, exitCode, statusdir); + JobState state = new JobState(jobId, conf); + state.setExitValue(exitCode); + state.setCompleteStatus("done"); + state.close(); + + if (exitCode != 0) { LOG.info("templeton: job failed with exit code " - + proc.exitValue()); + + exitCode); } else { LOG.info("templeton: job completed with exit code 0"); } } + /** + * Updates the job state percent and childid in the templeton storage. Update only + * takes place for non-null values. + */ + private static void updateJobStatePercentAndChildId(Configuration conf, + String jobId, String percent, String childid) { + JobState state = null; + try { + if (percent != null || childid != null) { + state = new JobState(jobId, conf); + if (percent != null) { + state.setPercentComplete(percent); + } + if (childid != null) { + JobState childState = new JobState(childid, conf); + childState.setParent(jobId); + state.addChild(childid); + state.close(); + } + } + } catch (IOException e) { + LOG.error("templeton: state error: ", e); + } finally { + if (state != null) { + try { + state.close(); + } catch (IOException e) { + LOG.warn(e); + } + } + } + } + private void executeWatcher(ExecutorService pool, Configuration conf, JobID jobid, InputStream in, String statusdir, String name) throws IOException { Watcher w = new Watcher(conf, jobid, in, statusdir, name); @@ -371,6 +507,7 @@ private void writeExitValue(Configuration conf, int exitValue, String statusdir) PrintWriter writer = new PrintWriter(out); writer.println(exitValue); writer.close(); + LOG.info("templeton: Exit value successfully written"); } } @@ -414,34 +551,9 @@ public void run() { String line; while ((line = reader.readLine()) != null) { writer.println(line); - JobState state = null; - try { - String percent = TempletonUtils.extractPercentComplete(line); - String childid = TempletonUtils.extractChildJobId(line); - - if (percent != null || childid != null) { - state = new JobState(jobid.toString(), conf); - if (percent != null) { - state.setPercentComplete(percent); - } - if (childid != null) { - JobState childState = new JobState(childid, conf); - childState.setParent(jobid.toString()); - state.addChild(childid); - state.close(); - } - } - } catch (IOException e) { - LOG.error("templeton: state error: ", e); - } finally { - if (state != null) { - try { - state.close(); - } catch (IOException e) { - LOG.warn(e); - } - } - } + String percent = TempletonUtils.extractPercentComplete(line); + String childid = TempletonUtils.extractChildJobId(line); + updateJobStatePercentAndChildId(conf, jobid.toString(), percent, childid); } writer.flush(); if(out != System.err && out != System.out) { diff --git a/shims/0.20S/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java b/shims/0.20S/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java index 8b165f3..367ea60 100644 --- a/shims/0.20S/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java +++ b/shims/0.20S/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java @@ -28,6 +28,8 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; +import java.util.HashSet; +import java.util.Set; /** * This is in org.apache.hadoop.mapred package because it relies on @@ -109,5 +111,13 @@ public void addCacheFile(URI uri, Job job) { public void killJobs(String tag, long timestamp) { return; } + /** + * Get jobs is only supported on hadoop 2.0+. + */ + @Override + public Set getJobs(String tag, long timestamp) + { + return new HashSet(); + } } diff --git a/shims/0.23/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim23.java b/shims/0.23/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim23.java index dd27cce..c85a739 100644 --- a/shims/0.23/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim23.java +++ b/shims/0.23/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim23.java @@ -21,9 +21,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim; - import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; @@ -180,6 +180,22 @@ public void killJobs(String tag, long timestamp) { } /** + * Returns all jobs tagged with the given tag that have been started after the + * given timestamp. Returned jobIds are MapReduce JobIds. + */ + @Override + public Set getJobs(String tag, long timestamp) { + Set childYarnJobs = getYarnChildJobs(tag, timestamp); + Set childJobs = new HashSet(); + for(ApplicationId id : childYarnJobs) { + // Convert to a MapReduce job id + String childJobId = TypeConverter.fromYarn(id).toString(); + childJobs.add(childJobId); + } + return childJobs; + } + + /** * Queries RM for the list of applications with the given tag that have started * after the given timestamp. */ diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java index 5b7e7f6..74785e5 100644 --- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java @@ -393,6 +393,11 @@ public void setFullFileStatus(Configuration conf, HdfsFileStatus sourceStatus, * given timestamp. */ public void killJobs(String tag, long timestamp); + /** + * Returns all jobs tagged with the given tag that have been started after the + * given timestamp. Returned jobIds are MapReduce JobIds. + */ + public Set getJobs(String tag, long timestamp); } /**