diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
index 23b1c4f..12ad517 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
@@ -82,6 +82,10 @@ public EnqueueBean run(String user, Map<String, Object> userArgs,
       args.add("--hiveconf");
       args.add(JobSubmissionConstants.TOKEN_FILE_ARG_PLACEHOLDER_TEZ);
 
+      //add mapreduce job tag placeholder
+      args.add("--hiveconf");
+      args.add(TempletonControllerJob.MAPREDUCE_JOB_TAGS_ARG_PLACEHOLDER);
+
       for (String prop : appConf.hiveProps()) {
         args.add("--hiveconf");
         args.add(TempletonUtils.quoteForWindows(prop));
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
index 41b1dc5..ebe1179 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
@@ -98,6 +98,10 @@ public EnqueueBean run(String user, Map<String, Object> userArgs, String jar, St
       args.add("-D");
       args.add(TempletonControllerJob.TOKEN_FILE_ARG_PLACEHOLDER);
 
+      //add mapreduce job tag placeholder
+      args.add("-D");
+      args.add(TempletonControllerJob.MAPREDUCE_JOB_TAGS_ARG_PLACEHOLDER);
+
       for (String d : defines) {
         args.add("-D");
         args.add(TempletonUtils.quoteForWindows(d));
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
index 04a5c6f..634bc02 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
@@ -136,6 +136,8 @@ public String run() throws Exception {
         Boolean.toString(enablelog));
       addDef(args, TempletonControllerJob.JOB_TYPE, jobType.toString());
+      addDef(args, TempletonControllerJob.TEMPLETON_JOB_LAUNCH_TIME_NAME,
+        Long.toString(System.currentTimeMillis()));
 
       // Hadoop queue information
       addDef(args, "mapred.job.queue.name", appConf.hadoopQueueName());
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
index 04e061d..a07f66a 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
@@ -126,6 +126,9 @@ public EnqueueBean run(String user, Map<String, Object> userArgs,
 
     //the token file location should be first argument of pig
     args.add("-D" + TempletonControllerJob.TOKEN_FILE_ARG_PLACEHOLDER);
+
+    //add mapreduce job tag placeholder
+    args.add("-D" + TempletonControllerJob.MAPREDUCE_JOB_TAGS_ARG_PLACEHOLDER);
+
     for (String pigArg : pigArgs) {
       args.add(TempletonUtils.quoteForWindows(pigArg));
     }
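Taken together, the delegator changes above only thread an inert placeholder token through to the child CLI invocation; it becomes a real property once the launcher knows its own job ID. To make that concrete, here is a minimal standalone sketch, not part of the patch: the constant value is copied from JobSubmissionConstants below, and the job ID is made up.

    import java.util.ArrayList;
    import java.util.List;

    public class TagPlaceholderDemo {
      // Same value as JobSubmissionConstants.MAPREDUCE_JOB_TAGS_ARG_PLACEHOLDER
      static final String PLACEHOLDER = "__MR_JOB_TAGS_OPTION=MR_JOB_TAGS_JOBID__";

      public static void main(String[] args) {
        // What a delegator builds at submission time, before any job ID exists
        List<String> launcherArgs = new ArrayList<String>();
        launcherArgs.add("-D");
        launcherArgs.add(PLACEHOLDER);

        // What LaunchMapper does later, once it knows the launcher's own job ID
        String jobId = "job_1415051426065_0001"; // hypothetical ID
        for (int i = 0; i < launcherArgs.size(); i++) {
          if (launcherArgs.get(i).contains(PLACEHOLDER)) {
            launcherArgs.set(i,
                launcherArgs.get(i).replace(PLACEHOLDER, "mapreduce.job.tags=" + jobId));
          }
        }
        // Prints: [-D, mapreduce.job.tags=job_1415051426065_0001]
        System.out.println(launcherArgs);
      }
    }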
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
index 645acb3..ac8b57d 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
@@ -71,6 +71,9 @@ public EnqueueBean run(String user,
         //the token file location should be right after the tool argument
         if (i == 0 && !temArgs[i].startsWith("--")) {
           args.add("-D" + TempletonControllerJob.TOKEN_FILE_ARG_PLACEHOLDER);
+
+          //also add mapreduce job tag placeholder
+          args.add("-D" + TempletonControllerJob.MAPREDUCE_JOB_TAGS_ARG_PLACEHOLDER);
         }
       }
     } else if (TempletonUtils.isset(optionsFile)) {
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java
index 5f6c18c..b517d41 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java
@@ -24,6 +24,7 @@
   public static final String ENABLE_LOG = "templeton.enablelog";
   public static final String JOB_TYPE = "templeton.jobtype";
   public static final String JAR_ARGS_NAME = "templeton.args";
+  public static final String TEMPLETON_JOB_LAUNCH_TIME_NAME = "templeton.job.launch.time";
   public static final String OVERRIDE_CLASSPATH = "templeton.override-classpath";
   public static final String OVERRIDE_CONTAINER_LOG4J_PROPS = "override.containerLog4j";
   //name of file
@@ -43,6 +44,11 @@
     "__MR_JOB_CREDENTIALS_OPTION=WEBHCAT_TOKEN_FILE_LOCATION__";
   public static final String TOKEN_FILE_ARG_PLACEHOLDER_TEZ =
     "__TEZ_CREDENTIALS_OPTION=WEBHCAT_TOKEN_FILE_LOCATION_TEZ__";
+  // MRv2 job tag used to identify Templeton launcher child jobs
+  public static final String MAPREDUCE_JOB_TAGS = "mapreduce.job.tags";
+  public static final String MAPREDUCE_JOB_TAGS_ARG_PLACEHOLDER =
+    "__MR_JOB_TAGS_OPTION=MR_JOB_TAGS_JOBID__";
+
   /**
    * constants needed for Pig job submission
    * The string values here are what Pig expects to see in its environment
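The new TEMPLETON_JOB_LAUNCH_TIME_NAME constant pairs with the LauncherDelegator change above: the submit time recorded there becomes the lower bound of the kill-search window read back in LaunchMapper (next file). A minimal round-trip sketch, assuming only hadoop-common on the classpath; the property name is from the patch, everything else is illustrative.

    import org.apache.hadoop.conf.Configuration;

    public class LaunchTimeDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration(false);

        // Producer side (LauncherDelegator): record the submit time
        conf.set("templeton.job.launch.time", Long.toString(System.currentTimeMillis()));

        // Consumer side (LaunchMapper): parse it back, failing fast when missing
        String raw = conf.get("templeton.job.launch.time");
        if (raw == null || raw.isEmpty()) {
          throw new RuntimeException("Launch time property not found");
        }
        long startTime = Long.parseLong(raw);
        System.out.println("Only child jobs started after " + startTime + " are considered");
      }
    }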
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java
index a1dfee8..72922f9 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java
@@ -24,10 +24,13 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hive.hcatalog.templeton.BadParam;
@@ -89,6 +92,11 @@ private static void handlePigEnvVars(Configuration conf, Map<String, String> env
   protected Process startJob(Context context, String user, String overrideClasspath)
     throws IOException, InterruptedException {
     Configuration conf = context.getConfiguration();
+
+    // Kill previously launched child MR jobs started by this launcher to prevent
+    // the same jobs from running side-by-side
+    killLauncherChildJobs(conf, context.getJobID().toString());
+
     copyLocal(COPY_NAME, conf);
     String[] jarArgs = TempletonUtils.decodeArray(conf.get(JAR_ARGS_NAME));
 
@@ -103,12 +111,54 @@ protected Process startJob(Context context, String user, String overrideClasspat
     List<String> jarArgsList = new LinkedList<String>(Arrays.asList(jarArgs));
     handleTokenFile(jarArgsList, JobSubmissionConstants.TOKEN_FILE_ARG_PLACEHOLDER, "mapreduce.job.credentials.binary");
     handleTokenFile(jarArgsList, JobSubmissionConstants.TOKEN_FILE_ARG_PLACEHOLDER_TEZ, "tez.credentials.path");
+    handleMapReduceJobTag(jarArgsList, JobSubmissionConstants.MAPREDUCE_JOB_TAGS_ARG_PLACEHOLDER,
+      JobSubmissionConstants.MAPREDUCE_JOB_TAGS, context.getJobID().toString());
     boolean overrideLog4jProps = conf.get(OVERRIDE_CONTAINER_LOG4J_PROPS) == null ? false :
       Boolean.valueOf(conf.get(OVERRIDE_CONTAINER_LOG4J_PROPS));
     return TrivialExecService.getInstance().run(jarArgsList, removeEnv, env, overrideLog4jProps);
   }
 
   /**
+   * Kills child jobs of this launcher that have been tagged with this job's ID.
+   */
+  private void killLauncherChildJobs(Configuration conf, String jobId) throws IOException {
+    // Extract the launcher job submit/start time and use that to scope down
+    // the search interval when we look for child jobs
+    long startTime = getTempletonLaunchTime(conf);
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    WebHCatJTShim tracker = ShimLoader.getHadoopShims().getWebHCatShim(conf, ugi);
+    try {
+      tracker.killJobs(jobId, startTime);
+    } finally {
+      tracker.close();
+    }
+  }
+
+  /**
+   * Retrieves the Templeton launcher job submit time from the configuration
+   * and throws if it is not available.
+   */
+  private long getTempletonLaunchTime(Configuration conf) {
+    long startTime = 0L;
+    try {
+      String launchTimeStr = conf.get(JobSubmissionConstants.TEMPLETON_JOB_LAUNCH_TIME_NAME);
+      LOG.info("Launch time = " + launchTimeStr);
+      if (launchTimeStr != null && launchTimeStr.length() > 0) {
+        startTime = Long.parseLong(launchTimeStr);
+      }
+    } catch (NumberFormatException nfe) {
+      throw new RuntimeException("Could not parse Templeton job launch time", nfe);
+    }
+
+    if (startTime == 0L) {
+      throw new RuntimeException(String.format("Launch time property '%s' not found",
+        JobSubmissionConstants.TEMPLETON_JOB_LAUNCH_TIME_NAME));
+    }
+
+    return startTime;
+  }
+
+  /**
    * Replace placeholder with actual "prop=file". This is done multiple times (possibly) since
    * Tez and MR use different property names
    */
@@ -144,6 +194,27 @@ private static void handleTokenFile(List<String> jarArgsList, String tokenPlaceH
     }
   }
 
+  /**
+   * Replace the placeholder mapreduce tags with our MR job ID so that all child jobs
+   * get tagged with it. This is used on launcher task restart to prevent the same
+   * jobs from running in parallel.
+   */
+  private static void handleMapReduceJobTag(List<String> jarArgsList, String placeholder,
+      String mapReduceJobTagsProp, String currentJobId) throws IOException {
+    String arg = String.format("%s=%s", mapReduceJobTagsProp, currentJobId);
+    for (int i = 0; i < jarArgsList.size(); i++) {
+      if (jarArgsList.get(i).contains(placeholder)) {
+        String newArg = jarArgsList.get(i).replace(placeholder, arg);
+        jarArgsList.set(i, newArg);
+        return;
+      }
+    }
+
+    // Unexpected error, placeholder tag is not found, throw
+    throw new RuntimeException(
+      String.format("Unexpected Error: Tag '%s' not found in the list of launcher args", placeholder));
+  }
+
   private void copyLocal(String var, Configuration conf) throws IOException {
     String[] filenames = TempletonUtils.decodeArray(conf.get(var));
     if (filenames != null) {
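For reference, the substituted value reaches each child CLI in a slightly different shape: Hive via --hiveconf with a separate key=value token, Pig and Sqoop as a single concatenated -D token, and Jar via a -D flag followed by a separate key=value token. An illustrative sketch (IDs, script, and jar arguments are made up):

    public class SubstitutedArgsDemo {
      public static void main(String[] args) {
        String tag = "mapreduce.job.tags=job_1415051426065_0001"; // hypothetical

        // HiveDelegator: --hiveconf <key=value>
        String[] hiveArgs = { "--hiveconf", tag, "-e", "show tables;" };
        // PigDelegator / SqoopDelegator: one concatenated token
        String[] pigArgs = { "-D" + tag, "script.pig" };
        // JarDelegator: -D followed by a separate <key=value> token
        String[] jarArgs = { "-D", tag, "wordcount", "in", "out" };

        System.out.println(String.join(" ", hiveArgs));
        System.out.println(String.join(" ", pigArgs));
        System.out.println(String.join(" ", jarArgs));
      }
    }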
diff --git shims/0.20S/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java shims/0.20S/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java
index d3552c1..8b165f3 100644
--- shims/0.20S/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java
+++ shims/0.20S/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java
@@ -102,5 +102,12 @@ private InetSocketAddress getAddress(Configuration conf) {
   public void addCacheFile(URI uri, Job job) {
     DistributedCache.addCacheFile(uri, job.getConfiguration());
   }
+  /**
+   * Killing jobs by tag is only supported on Hadoop 2.0+, so this is a no-op here.
+   */
+  @Override
+  public void killJobs(String tag, long timestamp) {
+    return;
+  }
 }
diff --git shims/0.23/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim23.java shims/0.23/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim23.java
index fa8bbc1..0721890 100644
--- shims/0.23/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim23.java
+++ shims/0.23/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim23.java
@@ -24,13 +24,28 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
 import java.io.IOException;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 public class WebHCatJTShim23 implements WebHCatJTShim {
   private static final Log LOG = LogFactory.getLog(WebHCatJTShim23.class);
   private JobClient jc;
+  private final Configuration conf;
 
   /**
    * Create a connection to the Job Tracker.
@@ -38,6 +53,7 @@
   public WebHCatJTShim23(final Configuration conf, final UserGroupInformation ugi)
     throws IOException {
     try {
+      this.conf = conf;
       jc = ugi.doAs(new PrivilegedExceptionAction<JobClient>() {
         public JobClient run() throws IOException, InterruptedException {
           return ugi.doAs(new PrivilegedExceptionAction<JobClient>() {
@@ -134,4 +150,65 @@ private RunningJob getJob(JobID jobid) throws IOException {
       throw ex;
     }
   }
+
+  /**
+   * Kills all jobs tagged with the given tag that have been started after the
+   * given timestamp.
+   */
+  @Override
+  public void killJobs(String tag, long timestamp) {
+    try {
+      LOG.info("Looking for jobs to kill...");
+      Set<ApplicationId> childJobs = getYarnChildJobs(tag, timestamp);
+      if (childJobs.isEmpty()) {
+        LOG.info("No child jobs found to kill");
+        return;
+      } else {
+        LOG.info(String.format("Found MR jobs count: %d", childJobs.size()));
+        LOG.info("Killing all found jobs");
+
+        YarnClient yarnClient = YarnClient.createYarnClient();
+        yarnClient.init(conf);
+        yarnClient.start();
+        for (ApplicationId app : childJobs) {
+          LOG.info(String.format("Killing job: %s ...", app));
+          yarnClient.killApplication(app);
+          LOG.info(String.format("Job %s killed", app));
+        }
+      }
+    } catch (YarnException ye) {
+      throw new RuntimeException("Exception occurred while killing child job(s)", ye);
+    } catch (IOException ioe) {
+      throw new RuntimeException("Exception occurred while killing child job(s)", ioe);
+    }
+  }
+
+  /**
+   * Queries RM for the list of applications with the given tag that have started
+   * after the given timestamp.
+   */
+  private Set<ApplicationId> getYarnChildJobs(String tag, long timestamp) {
+    Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>();
+
+    LOG.info(String.format("Querying RM for tag = %s, starting with ts = %s", tag, timestamp));
+
+    GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
+    gar.setScope(ApplicationsRequestScope.OWN);
+    gar.setStartRange(timestamp, System.currentTimeMillis());
+    gar.setApplicationTags(Collections.singleton(tag));
+    try {
+      ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(conf,
+        ApplicationClientProtocol.class);
+      GetApplicationsResponse apps = proxy.getApplications(gar);
+      List<ApplicationReport> appsList = apps.getApplicationList();
+      for (ApplicationReport appReport : appsList) {
+        childYarnJobs.add(appReport.getApplicationId());
+      }
+    } catch (IOException ioe) {
+      throw new RuntimeException("Exception occurred while finding child jobs", ioe);
+    } catch (YarnException ye) {
+      throw new RuntimeException("Exception occurred while finding child jobs", ye);
+    }
+    return childYarnJobs;
+  }
 }
diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 06eb9c5..a89bdc1 100644
--- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -550,6 +550,11 @@ RecordReader getRecordReader(JobConf job, InputSplitShim split, Reporter reporte
    * Assumes that both parameters are not {@code null}.
    */
   public void addCacheFile(URI uri, Job job);
+  /**
+   * Kills all jobs tagged with the given tag that have been started after the
+   * given timestamp.
+   */
+  public void killJobs(String tag, long timestamp);
 }
 
 /**
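Finally, a hedged standalone sketch of the RM-side kill that WebHCatJTShim23 performs, written against the public YarnClient API rather than the raw ApplicationClientProtocol proxy used in the patch. The tag-query step is elided; assume appIds came from a query like getYarnChildJobs above.

    import java.io.IOException;
    import java.util.Set;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.client.api.YarnClient;
    import org.apache.hadoop.yarn.exceptions.YarnException;

    public class KillTaggedApps {
      /** Ask the RM to kill every application in the given set. */
      public static void killAll(Configuration conf, Set<ApplicationId> appIds)
          throws IOException, YarnException {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(conf);
        yarnClient.start();
        try {
          for (ApplicationId app : appIds) {
            yarnClient.killApplication(app);
          }
        } finally {
          yarnClient.stop();
        }
      }
    }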