diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
index 23b1c4f..12ad517 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
@@ -82,6 +82,10 @@ public EnqueueBean run(String user, Map<String, Object> userArgs,
       args.add("--hiveconf");
       args.add(JobSubmissionConstants.TOKEN_FILE_ARG_PLACEHOLDER_TEZ);
 
+      //add mapreduce job tag placeholder
+      args.add("--hiveconf");
+      args.add(TempletonControllerJob.MAPREDUCE_JOB_TAGS_ARG_PLACEHOLDER);
+
       for (String prop : appConf.hiveProps()) {
         args.add("--hiveconf");
         args.add(TempletonUtils.quoteForWindows(prop));
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
index 41b1dc5..ebe1179 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
@@ -98,6 +98,10 @@ public EnqueueBean run(String user, Map<String, Object> userArgs, String jar, St
       args.add("-D");
       args.add(TempletonControllerJob.TOKEN_FILE_ARG_PLACEHOLDER);
 
+      //add mapreduce job tag placeholder
+      args.add("-D");
+      args.add(TempletonControllerJob.MAPREDUCE_JOB_TAGS_ARG_PLACEHOLDER);
+
       for (String d : defines) {
         args.add("-D");
         args.add(TempletonUtils.quoteForWindows(d));
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
index 04a5c6f..4c805c5 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
@@ -28,6 +28,9 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.shims.HadoopShimsSecure;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
@@ -46,6 +49,7 @@ protected String runAs = null;
 
   static public enum JobType {JAR, STREAMING, PIG, HIVE, SQOOP}
   private boolean secureMeatastoreAccess = false;
+  private final String HIVE_SHIMS_FILENAME_PATTERN = ".*hive-shims.*";
 
   public LauncherDelegator(AppConfig appConf) {
     super(appConf);
@@ -118,7 +122,11 @@ public String run() throws Exception {
     ArrayList<String> args = new ArrayList<String>();
 
     args.add("-libjars");
-    args.add(appConf.libJars());
+
+    // Include shim and admin specified libjars
+    String libJars = String.format("%s,%s", getShimLibjars(), appConf.libJars());
+    args.add(libJars);
+
     addCacheFiles(args, appConf);
 
     // Hadoop vars
@@ -136,6 +144,8 @@ public String run() throws Exception {
       Boolean.toString(enablelog));
     addDef(args, TempletonControllerJob.JOB_TYPE, jobType.toString());
+    addDef(args, TempletonControllerJob.TEMPLETON_JOB_LAUNCH_TIME_NAME,
+      Long.toString(System.currentTimeMillis()));
 
     // Hadoop queue information
     addDef(args, "mapred.job.queue.name", appConf.hadoopQueueName());
@@ -147,6 +157,32 @@ public String run() throws Exception {
     return args;
   }
 
+  /**
+   * Dynamically determine the list of hive shim jars that need to be added
+   * to the Templeton launcher job classpath.
+   */
+  private String getShimLibjars() {
+    WebHCatJTShim shim = null;
+    try {
+      shim = ShimLoader.getHadoopShims().getWebHCatShim(appConf, UserGroupInformation.getCurrentUser());
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to get WebHCatShim", e);
+    }
+
+    // Besides the HiveShims jar, which is Hadoop version dependent, we also
+    // always need to include the hive shims common jars.
+    Path shimCommonJar = new Path(
+      TempletonUtils.findContainingJar(ShimLoader.class, HIVE_SHIMS_FILENAME_PATTERN));
+    Path shimCommonSecureJar = new Path(
+      TempletonUtils.findContainingJar(HadoopShimsSecure.class, HIVE_SHIMS_FILENAME_PATTERN));
+    Path shimJar = new Path(
+      TempletonUtils.findContainingJar(shim.getClass(), HIVE_SHIMS_FILENAME_PATTERN));
+
+    return String.format(
+      "%s,%s,%s",
+      shimCommonJar.toString(), shimCommonSecureJar.toString(), shimJar.toString());
+  }
+
   // Storage vars
   private void addStorageVars(List<String> args) {
     addDef(args, TempletonStorage.STORAGE_CLASS,
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
index 04e061d..a07f66a 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
@@ -126,6 +126,9 @@ public EnqueueBean run(String user, Map<String, Object> userArgs,
       //the token file location should be first argument of pig
       args.add("-D" + TempletonControllerJob.TOKEN_FILE_ARG_PLACEHOLDER);
 
+      //add mapreduce job tag placeholder
+      args.add("-D" + TempletonControllerJob.MAPREDUCE_JOB_TAGS_ARG_PLACEHOLDER);
+
       for (String pigArg : pigArgs) {
         args.add(TempletonUtils.quoteForWindows(pigArg));
       }
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
index adcd917..24b45ef 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
@@ -69,9 +69,10 @@ public EnqueueBean run(String user,
       for (int i = 0; i < temArgs.length; i++) {
         args.add(TempletonUtils.quoteForWindows(temArgs[i]));
 
-        // The token file location should be right after the tool argument
+        // The token file location and mapreduce job tag should be right after the tool argument
         if (i == 0 && !temArgs[i].startsWith("--")) {
           args.add("-D" + TempletonControllerJob.TOKEN_FILE_ARG_PLACEHOLDER);
+          args.add("-D" + TempletonControllerJob.MAPREDUCE_JOB_TAGS_ARG_PLACEHOLDER);
         }
       }
     } else if (TempletonUtils.isset(optionsFile)) {
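Each delegator above injects the same literal placeholder (defined in JobSubmissionConstants below) into the child command line; nothing is resolved at submission time. A dependency-free sketch of the before/after shape of the args list, using a made-up jobid, may help reviewers:

import java.util.ArrayList;
import java.util.List;

public class TagPlaceholderDemo {
  public static void main(String[] args) {
    // What JarDelegator builds at submission time:
    List<String> jarArgs = new ArrayList<String>();
    jarArgs.add("-D");
    jarArgs.add("__MR_JOB_TAGS_OPTION=MR_JOB_TAGS_JOBID__");

    // What LaunchMapper later rewrites it to, once the launcher's own
    // MR jobid is known (the jobid below is hypothetical):
    jarArgs.set(1, jarArgs.get(1).replace(
        "__MR_JOB_TAGS_OPTION=MR_JOB_TAGS_JOBID__",
        "mapreduce.job.tags=job_1408112930964_0001"));

    System.out.println(jarArgs); // [-D, mapreduce.job.tags=job_1408112930964_0001]
  }
}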
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java
index a6355a6..ccf6107 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java
@@ -24,6 +24,7 @@
   public static final String ENABLE_LOG = "templeton.enablelog";
   public static final String JOB_TYPE = "templeton.jobtype";
   public static final String JAR_ARGS_NAME = "templeton.args";
+  public static final String TEMPLETON_JOB_LAUNCH_TIME_NAME = "templeton.job.launch.time";
   public static final String OVERRIDE_CLASSPATH = "templeton.override-classpath";
   public static final String STDOUT_FNAME = "stdout";
   public static final String STDERR_FNAME = "stderr";
@@ -40,6 +41,14 @@
     "__MR_JOB_CREDENTIALS_OPTION=WEBHCAT_TOKEN_FILE_LOCATION__";
   public static final String TOKEN_FILE_ARG_PLACEHOLDER_TEZ =
     "__TEZ_CREDENTIALS_OPTION=WEBHCAT_TOKEN_FILE_LOCATION_TEZ__";
+  // MRv2 job tag used to identify Templeton launcher child jobs. Each child job
+  // will be tagged with the parent jobid so that on launcher task restart, all
+  // previously running child jobs can be killed before the child job is launched
+  // again.
+  public static final String MAPREDUCE_JOB_TAGS = "mapreduce.job.tags";
+  public static final String MAPREDUCE_JOB_TAGS_ARG_PLACEHOLDER =
+    "__MR_JOB_TAGS_OPTION=MR_JOB_TAGS_JOBID__";
+
   /**
    * constants needed for Pig job submission
    * The string values here are what Pig expects to see in it's environment
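The new templeton.job.launch.time property is the other half of the handshake: LauncherDelegator stamps the submission time into the job configuration, and LaunchMapper (next file) reads it back to bound its kill query. A minimal round-trip sketch, assuming only hadoop-common on the classpath:

import org.apache.hadoop.conf.Configuration;

public class LaunchTimeRoundTrip {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);

    // LauncherDelegator side: the addDef(...) call ultimately lands here.
    long launchTime = System.currentTimeMillis();
    conf.setLong("templeton.job.launch.time", launchTime);

    // LaunchMapper side: read it back; 0L is the "missing" sentinel.
    long readBack = conf.getLong("templeton.job.launch.time", 0L);
    System.out.println(readBack == launchTime); // true
  }
}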
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java
index 556ee62..fb9d767 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java
@@ -24,10 +24,13 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hive.hcatalog.templeton.BadParam;
@@ -89,6 +92,11 @@ private static void handlePigEnvVars(Configuration conf, Map<String, String> env
   protected Process startJob(Context context, String user, String overrideClasspath)
     throws IOException, InterruptedException {
     Configuration conf = context.getConfiguration();
+
+    // Kill previously launched child MR jobs started by this launcher to prevent
+    // the same jobs from running side-by-side
+    killLauncherChildJobs(conf, context.getJobID().toString());
+
     copyLocal(COPY_NAME, conf);
     String[] jarArgs = TempletonUtils.decodeArray(conf.get(JAR_ARGS_NAME));
 
@@ -103,10 +111,52 @@ protected Process startJob(Context context, String user, String overrideClasspat
     List<String> jarArgsList = new LinkedList<String>(Arrays.asList(jarArgs));
     handleTokenFile(jarArgsList, JobSubmissionConstants.TOKEN_FILE_ARG_PLACEHOLDER, "mapreduce.job.credentials.binary");
     handleTokenFile(jarArgsList, JobSubmissionConstants.TOKEN_FILE_ARG_PLACEHOLDER_TEZ, "tez.credentials.path");
+    handleMapReduceJobTag(jarArgsList, JobSubmissionConstants.MAPREDUCE_JOB_TAGS_ARG_PLACEHOLDER,
+      JobSubmissionConstants.MAPREDUCE_JOB_TAGS, context.getJobID().toString());
     return TrivialExecService.getInstance().run(jarArgsList, removeEnv, env);
   }
 
   /**
+   * Kills child jobs of this launcher that have been tagged with this job's ID.
+   */
+  private void killLauncherChildJobs(Configuration conf, String jobId) throws IOException {
+    // Extract the launcher job submit/start time and use that to scope down
+    // the search interval when we look for child jobs
+    long startTime = getTempletonLaunchTime(conf);
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    WebHCatJTShim tracker = ShimLoader.getHadoopShims().getWebHCatShim(conf, ugi);
+    try {
+      tracker.killJobs(jobId, startTime);
+    } finally {
+      tracker.close();
+    }
+  }
+
+  /**
+   * Retrieves the templeton launcher job submit time from the configuration.
+   * Throws if the value is missing or cannot be parsed.
+   */
+  private long getTempletonLaunchTime(Configuration conf) {
+    long startTime = 0L;
+    try {
+      String launchTimeStr = conf.get(JobSubmissionConstants.TEMPLETON_JOB_LAUNCH_TIME_NAME);
+      LOG.info("Launch time = " + launchTimeStr);
+      if (launchTimeStr != null && launchTimeStr.length() > 0) {
+        startTime = Long.parseLong(launchTimeStr);
+      }
+    } catch (NumberFormatException nfe) {
+      throw new RuntimeException("Could not parse Templeton job launch time", nfe);
+    }
+
+    if (startTime == 0L) {
+      throw new RuntimeException(String.format("Launch time property '%s' not found",
+        JobSubmissionConstants.TEMPLETON_JOB_LAUNCH_TIME_NAME));
+    }
+
+    return startTime;
+  }
+
+  /**
    * Replace placeholder with actual "prop=file". This is done multiple times (possibly) since
    * Tez and MR use different property names
    */
@@ -142,6 +192,27 @@ private static void handleTokenFile(List<String> jarArgsList, String tokenPlaceH
     }
   }
 
+  /**
+   * Replace the placeholder mapreduce tags with our MR jobid so that all child jobs
+   * get tagged with it. This is used on launcher task restart to prevent the same
+   * jobs from running in parallel.
+   */
+  private static void handleMapReduceJobTag(List<String> jarArgsList, String placeholder,
+      String mapReduceJobTagsProp, String currentJobId) throws IOException {
+    String arg = String.format("%s=%s", mapReduceJobTagsProp, currentJobId);
+    for (int i = 0; i < jarArgsList.size(); i++) {
+      if (jarArgsList.get(i).contains(placeholder)) {
+        String newArg = jarArgsList.get(i).replace(placeholder, arg);
+        jarArgsList.set(i, newArg);
+        return;
+      }
+    }
+
+    // Unexpected error, placeholder tag is not found, throw
+    throw new RuntimeException(
+      String.format("Unexpected Error: Tag '%s' not found in the list of launcher args", placeholder));
+  }
+
   private void copyLocal(String var, Configuration conf) throws IOException {
     String[] filenames = TempletonUtils.decodeArray(conf.get(var));
     if (filenames != null) {
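getTempletonLaunchTime treats 0L as "not set" and fails fast, so a launcher restarted without the property cannot silently query an unbounded time range. A trimmed, dependency-free sketch of that validation logic (names shortened; not the patch code verbatim):

public class LaunchTimeCheckDemo {
  static long getLaunchTime(String launchTimeStr) {
    long startTime = 0L;
    if (launchTimeStr != null && launchTimeStr.length() > 0) {
      startTime = Long.parseLong(launchTimeStr); // NumberFormatException on garbage
    }
    if (startTime == 0L) {
      throw new RuntimeException("Launch time property not found");
    }
    return startTime;
  }

  public static void main(String[] args) {
    System.out.println(getLaunchTime("1408112930964")); // parses fine

    try {
      getLaunchTime(null); // property missing: fail fast
    } catch (RuntimeException expected) {
      System.out.println(expected.getMessage());
    }
  }
}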
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
index fff4b68..42008b6 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
@@ -25,9 +25,11 @@
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLConnection;
+import java.net.URLDecoder;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -36,7 +38,6 @@
 
 import javax.ws.rs.core.UriBuilder;
 
-import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -347,4 +348,33 @@ public static void addCmdForWindows(ArrayList<String> args) {
       args.add("call");
     }
   }
+
+  /**
+   * Find a jar that contains a class of the same name and whose
+   * file name matches the given pattern.
+   *
+   * @param clazz the class to find.
+   * @param fileNamePattern regex pattern that must match the jar's full path
+   * @return a jar file that contains the class, or null
+   */
+  public static String findContainingJar(Class<?> clazz, String fileNamePattern) {
+    ClassLoader loader = clazz.getClassLoader();
+    String classFile = clazz.getName().replaceAll("\\.", "/") + ".class";
+    try {
+      for (final Enumeration<URL> itr = loader.getResources(classFile);
+           itr.hasMoreElements();) {
+        final URL url = itr.nextElement();
+        if ("jar".equals(url.getProtocol())) {
+          String toReturn = url.getPath();
+          if (fileNamePattern == null || toReturn.matches(fileNamePattern)) {
+            toReturn = URLDecoder.decode(toReturn, "UTF-8");
+            return toReturn.replaceAll("!.*$", "");
+          }
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return null;
+  }
 }
diff --git hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java
index 8b46d38..8eacda8 100644
--- hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java
+++ hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java
@@ -24,6 +24,8 @@
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.shims.HadoopShimsSecure;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -309,4 +311,14 @@ public void testPropertiesParsing() throws Exception {
         StringUtils.unEscapeString(props[i]), StringUtils.unEscapeString(newProps[i]));
     }
   }
+
+  @Test
+  public void testFindContainingJar() throws Exception {
+    String result = TempletonUtils.findContainingJar(ShimLoader.class, ".*hive-shims.*");
+    Assert.assertNotNull(result);
+    result = TempletonUtils.findContainingJar(HadoopShimsSecure.class, ".*hive-shims.*");
+    Assert.assertNotNull(result);
+    result = TempletonUtils.findContainingJar(HadoopShimsSecure.class, ".*unknownjar.*");
+    Assert.assertNull(result);
+  }
 }
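findContainingJar leans on the shape of "jar:" resource URLs: for a class loaded from a jar, url.getPath() returns something like file:/path/to.jar!/pkg/Cls.class, so stripping everything from the "!" onward leaves the jar location. A sketch with a made-up path:

public class JarUrlDemo {
  public static void main(String[] args) {
    // Hypothetical value of url.getPath() for ShimLoader.class:
    String path = "file:/tmp/hive-shims-common-0.14.0.jar!/org/apache/hadoop/hive/shims/ShimLoader.class";

    String jar = path.replaceAll("!.*$", "");
    System.out.println(jar);                           // file:/tmp/hive-shims-common-0.14.0.jar
    System.out.println(jar.matches(".*hive-shims.*")); // true, so this jar would be picked
  }
}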
diff --git shims/0.20S/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java shims/0.20S/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java
index d3552c1..8b165f3 100644
--- shims/0.20S/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java
+++ shims/0.20S/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java
@@ -102,5 +102,12 @@ private InetSocketAddress getAddress(Configuration conf) {
 
   public void addCacheFile(URI uri, Job job) {
     DistributedCache.addCacheFile(uri, job.getConfiguration());
   }
+  /**
+   * Killing jobs by tag is only supported on Hadoop 2.0+; this is a no-op here.
+   */
+  @Override
+  public void killJobs(String tag, long timestamp) {
+    return;
+  }
 }
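The no-op keeps LaunchMapper version-agnostic: it always calls killJobs through the WebHCatJTShim interface and never checks the Hadoop version itself. A dependency-free sketch of that dispatch pattern (the class names below are stand-ins, not the real shims):

interface JTShim {
  void killJobs(String tag, long timestamp);
}

class Shim20S implements JTShim {
  public void killJobs(String tag, long timestamp) {
    // MRv1 has no tag-based application query; nothing to do.
  }
}

class Shim23 implements JTShim {
  public void killJobs(String tag, long timestamp) {
    System.out.println("would query the RM for apps tagged " + tag + " since " + timestamp);
  }
}

public class ShimDispatchDemo {
  public static void main(String[] args) {
    JTShim shim = Boolean.getBoolean("use.mrv1") ? new Shim20S() : new Shim23();
    shim.killJobs("job_1408112930964_0003", System.currentTimeMillis() - 60000L);
  }
}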
diff --git shims/0.23/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim23.java shims/0.23/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim23.java
index 5a728b2..dd27cce 100644
--- shims/0.23/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim23.java
+++ shims/0.23/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim23.java
@@ -24,13 +24,28 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
 import java.io.IOException;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 public class WebHCatJTShim23 implements WebHCatJTShim {
   private static final Log LOG = LogFactory.getLog(WebHCatJTShim23.class);
   private JobClient jc;
+  private final Configuration conf;
 
   /**
    * Create a connection to the Job Tracker.
@@ -38,6 +53,7 @@ public WebHCatJTShim23(final Configuration conf, final UserGroupInformation ugi)
     throws IOException {
     try {
+      this.conf = conf;
       jc = ugi.doAs(new PrivilegedExceptionAction<JobClient>() {
         public JobClient run() throws IOException, InterruptedException {
           //create this in doAs() so that it gets a security context based passed in 'ugi'
@@ -130,4 +146,65 @@ private RunningJob getJob(JobID jobid) throws IOException {
       throw ex;
     }
   }
+
+  /**
+   * Kills all jobs tagged with the given tag that have been started after the
+   * given timestamp.
+   */
+  @Override
+  public void killJobs(String tag, long timestamp) {
+    try {
+      LOG.info("Looking for jobs to kill...");
+      Set<ApplicationId> childJobs = getYarnChildJobs(tag, timestamp);
+      if (childJobs.isEmpty()) {
+        LOG.info("No child jobs found to kill");
+        return;
+      } else {
+        LOG.info(String.format("Found MR jobs count: %d", childJobs.size()));
+        LOG.info("Killing all found jobs");
+
+        YarnClient yarnClient = YarnClient.createYarnClient();
+        yarnClient.init(conf);
+        yarnClient.start();
+        for (ApplicationId app : childJobs) {
+          LOG.info(String.format("Killing job: %s ...", app));
+          yarnClient.killApplication(app);
+          LOG.info(String.format("Job %s killed", app));
+        }
+      }
+    } catch (YarnException ye) {
+      throw new RuntimeException("Exception occurred while killing child job(s)", ye);
+    } catch (IOException ioe) {
+      throw new RuntimeException("Exception occurred while killing child job(s)", ioe);
+    }
+  }
+
+  /**
+   * Queries RM for the list of applications with the given tag that have started
+   * after the given timestamp.
+   */
+  private Set<ApplicationId> getYarnChildJobs(String tag, long timestamp) {
+    Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>();
+
+    LOG.info(String.format("Querying RM for tag = %s, starting with ts = %s", tag, timestamp));
+
+    GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
+    gar.setScope(ApplicationsRequestScope.OWN);
+    gar.setStartRange(timestamp, System.currentTimeMillis());
+    gar.setApplicationTags(Collections.singleton(tag));
+    try {
+      ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(conf,
+          ApplicationClientProtocol.class);
+      GetApplicationsResponse apps = proxy.getApplications(gar);
+      List<ApplicationReport> appsList = apps.getApplicationList();
+      for (ApplicationReport appReport : appsList) {
+        childYarnJobs.add(appReport.getApplicationId());
+      }
+    } catch (IOException ioe) {
+      throw new RuntimeException("Exception occurred while finding child jobs", ioe);
+    } catch (YarnException ye) {
+      throw new RuntimeException("Exception occurred while finding child jobs", ye);
+    }
+    return childYarnJobs;
+  }
 }
diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 299e918..eefd5e5 100644
--- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -581,6 +581,11 @@ public void setFullFileStatus(Configuration conf, HdfsFileStatus sourceStatus,
      * Assumes that both parameters are not {@code null}.
      */
     public void addCacheFile(URI uri, Job job);
+    /**
+     * Kills all jobs tagged with the given tag that have been started after the
+     * given timestamp.
+     */
+    public void killJobs(String tag, long timestamp);
   }
 
 /**
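From a caller's point of view the whole feature reduces to one shim call. A usage sketch mirroring LaunchMapper (the jobid and the one-hour window are placeholder values):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.security.UserGroupInformation;

public class KillChildJobsDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();

    WebHCatJTShim tracker = ShimLoader.getHadoopShims().getWebHCatShim(conf, ugi);
    try {
      // Kill everything tagged with this jobid that started in the last hour;
      // on the 0.20S shim this is a no-op, on 0.23 it goes through YARN.
      tracker.killJobs("job_1408112930964_0004", System.currentTimeMillis() - 3600000L);
    } finally {
      tracker.close();
    }
  }
}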