diff --git webhcat/svr/src/main/java/org/apache/hadoop/mapred/TempletonJobTracker.java webhcat/svr/src/main/java/org/apache/hadoop/mapred/TempletonJobTracker.java index 4fdc2d7..138169c 100644 --- webhcat/svr/src/main/java/org/apache/hadoop/mapred/TempletonJobTracker.java +++ webhcat/svr/src/main/java/org/apache/hadoop/mapred/TempletonJobTracker.java @@ -20,6 +20,7 @@ package org.apache.hadoop.mapred; import java.io.IOException; import java.net.InetSocketAddress; +import java.security.PrivilegedExceptionAction; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; @@ -30,23 +31,29 @@ import org.apache.hadoop.security.UserGroupInformation; * Communicate with the JobTracker as a specific user. */ public class TempletonJobTracker { - private JobSubmissionProtocol cnx; + private final JobSubmissionProtocol cnx; /** * Create a connection to the Job Tracker. */ - public TempletonJobTracker(UserGroupInformation ugi, - InetSocketAddress addr, - Configuration conf) - throws IOException { - cnx = (JobSubmissionProtocol) - RPC.getProxy(JobSubmissionProtocol.class, - JobSubmissionProtocol.versionID, - addr, - ugi, - conf, - NetUtils.getSocketFactory(conf, - JobSubmissionProtocol.class)); + public TempletonJobTracker(final InetSocketAddress addr, + final Configuration conf) + throws IOException, InterruptedException { + + UserGroupInformation ugi = UserGroupInformation.getLoginUser(); + cnx = + ugi.doAs(new PrivilegedExceptionAction() { + public JobSubmissionProtocol run () + throws IOException, InterruptedException { + return (JobSubmissionProtocol) + RPC.getProxy(JobSubmissionProtocol.class, + JobSubmissionProtocol.versionID, + addr, + conf, + NetUtils.getSocketFactory(conf, + JobSubmissionProtocol.class)); + } + }); } /** diff --git webhcat/svr/src/main/java/org/apache/hcatalog/templeton/AppConfig.java webhcat/svr/src/main/java/org/apache/hcatalog/templeton/AppConfig.java index e74c39d..e9f14ad 100644 --- webhcat/svr/src/main/java/org/apache/hcatalog/templeton/AppConfig.java +++ webhcat/svr/src/main/java/org/apache/hcatalog/templeton/AppConfig.java @@ -62,7 +62,7 @@ import org.apache.hcatalog.templeton.tool.ZooKeeperStorage; */ public class AppConfig extends Configuration { public static final String[] HADOOP_CONF_FILENAMES = { - "core-default.xml", "core-site.xml", "mapred-default.xml", "mapred-site.xml" + "core-default.xml", "core-site.xml", "mapred-default.xml", "mapred-site.xml", "hdfs-site.xml" }; public static final String[] HADOOP_PREFIX_VARS = { diff --git webhcat/svr/src/main/java/org/apache/hcatalog/templeton/DeleteDelegator.java webhcat/svr/src/main/java/org/apache/hcatalog/templeton/DeleteDelegator.java index c7eb0ee..56b11ae 100644 --- webhcat/svr/src/main/java/org/apache/hcatalog/templeton/DeleteDelegator.java +++ webhcat/svr/src/main/java/org/apache/hcatalog/templeton/DeleteDelegator.java @@ -34,14 +34,13 @@ public class DeleteDelegator extends TempletonDelegator { } public QueueStatusBean run(String user, String id) - throws NotAuthorizedException, BadParam, IOException + throws NotAuthorizedException, BadParam, IOException, InterruptedException { UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); TempletonJobTracker tracker = null; JobState state = null; try { - tracker = new TempletonJobTracker(ugi, - JobTracker.getAddress(appConf), + tracker = new TempletonJobTracker(JobTracker.getAddress(appConf), appConf); JobID jobid = StatusDelegator.StringToJobID(id); if (jobid == null) diff --git webhcat/svr/src/main/java/org/apache/hcatalog/templeton/HiveDelegator.java webhcat/svr/src/main/java/org/apache/hcatalog/templeton/HiveDelegator.java index 249807b..3c207cb 100644 --- webhcat/svr/src/main/java/org/apache/hcatalog/templeton/HiveDelegator.java +++ webhcat/svr/src/main/java/org/apache/hcatalog/templeton/HiveDelegator.java @@ -23,7 +23,9 @@ import java.io.IOException; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; + import org.apache.commons.exec.ExecuteException; +import org.apache.hcatalog.templeton.tool.TempletonControllerJob; import org.apache.hcatalog.templeton.tool.TempletonUtils; /** @@ -59,8 +61,14 @@ public class HiveDelegator extends LauncherDelegator { args.addAll(makeBasicArgs(execute, srcFile, statusdir, completedUrl)); args.add("--"); args.add(appConf.hivePath()); + args.add("--service"); args.add("cli"); + + //the token file location as initial hiveconf arg + args.add("--hiveconf"); + args.add(TempletonControllerJob.TOKEN_FILE_ARG_PLACEHOLDER); + for (String prop : appConf.getStrings(AppConfig.HIVE_PROPS_NAME)) { args.add("--hiveconf"); args.add(prop); diff --git webhcat/svr/src/main/java/org/apache/hcatalog/templeton/JarDelegator.java webhcat/svr/src/main/java/org/apache/hcatalog/templeton/JarDelegator.java index 0bbb292..a2dc23e 100644 --- webhcat/svr/src/main/java/org/apache/hcatalog/templeton/JarDelegator.java +++ webhcat/svr/src/main/java/org/apache/hcatalog/templeton/JarDelegator.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.commons.exec.ExecuteException; +import org.apache.hcatalog.templeton.tool.TempletonControllerJob; import org.apache.hcatalog.templeton.tool.TempletonUtils; /** @@ -79,7 +80,9 @@ public class JarDelegator extends LauncherDelegator { args.add(TempletonUtils.hadoopFsListAsString(files, appConf, runAs)); } - + //the token file location comes after mainClass, as a -Dprop=val + args.add("-D" + TempletonControllerJob.TOKEN_FILE_ARG_PLACEHOLDER); + for (String d : defines) args.add("-D" + d); diff --git webhcat/svr/src/main/java/org/apache/hcatalog/templeton/ListDelegator.java webhcat/svr/src/main/java/org/apache/hcatalog/templeton/ListDelegator.java index 4ca3e14..e78d9b9 100644 --- webhcat/svr/src/main/java/org/apache/hcatalog/templeton/ListDelegator.java +++ webhcat/svr/src/main/java/org/apache/hcatalog/templeton/ListDelegator.java @@ -37,13 +37,13 @@ public class ListDelegator extends TempletonDelegator { } public List run(String user) - throws NotAuthorizedException, BadParam, IOException { + throws NotAuthorizedException, BadParam, IOException, InterruptedException { + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); TempletonJobTracker tracker = null; try { - tracker = new TempletonJobTracker(ugi, - JobTracker.getAddress(appConf), - appConf); + tracker = new TempletonJobTracker(JobTracker.getAddress(appConf), + appConf); ArrayList ids = new ArrayList(); diff --git webhcat/svr/src/main/java/org/apache/hcatalog/templeton/Main.java webhcat/svr/src/main/java/org/apache/hcatalog/templeton/Main.java index f3df6ea..c074452 100644 --- webhcat/svr/src/main/java/org/apache/hcatalog/templeton/Main.java +++ webhcat/svr/src/main/java/org/apache/hcatalog/templeton/Main.java @@ -147,12 +147,33 @@ public class Main { ServletContextHandler root = new ServletContextHandler(server, "/"); // Add the Auth filter - root.addFilter(makeAuthFilter(), "/*", FilterMapping.REQUEST); + FilterHolder fHolder = makeAuthFilter(); + + /* + * We add filters for each of the URIs supported by templeton. + * If we added the entire sub-structure using '/*', the mapreduce + * notification cannot give the callback to templeton in secure mode. + * This is because mapreduce does not use secure credentials for + * callbacks. So jetty would fail the request as unauthorized. + */ + root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/ddl/*", + FilterMapping.REQUEST); + root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/pig/*", + FilterMapping.REQUEST); + root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/hive/*", + FilterMapping.REQUEST); + root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/queue/*", + FilterMapping.REQUEST); + root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/mapreduce/*", + FilterMapping.REQUEST); + root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/status/*", + FilterMapping.REQUEST); + root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/version/*", + FilterMapping.REQUEST); // Connect Jersey ServletHolder h = new ServletHolder(new ServletContainer(makeJerseyConfig())); root.addServlet(h, "/" + SERVLET_PATH + "/*"); - // Add any redirects addRedirects(server); diff --git webhcat/svr/src/main/java/org/apache/hcatalog/templeton/PigDelegator.java webhcat/svr/src/main/java/org/apache/hcatalog/templeton/PigDelegator.java index 728b191..b318373 100644 --- webhcat/svr/src/main/java/org/apache/hcatalog/templeton/PigDelegator.java +++ webhcat/svr/src/main/java/org/apache/hcatalog/templeton/PigDelegator.java @@ -26,6 +26,7 @@ import java.util.Arrays; import java.util.List; import org.apache.commons.exec.ExecuteException; +import org.apache.hcatalog.templeton.tool.TempletonControllerJob; import org.apache.hcatalog.templeton.tool.TempletonUtils; /** @@ -73,6 +74,9 @@ public class PigDelegator extends LauncherDelegator { args.add("--"); args.add(appConf.pigPath()); + //the token file location should be first argument of pig + args.add("-D" + TempletonControllerJob.TOKEN_FILE_ARG_PLACEHOLDER); + args.addAll(pigArgs); if (TempletonUtils.isset(execute)) { args.add("-execute"); diff --git webhcat/svr/src/main/java/org/apache/hcatalog/templeton/Server.java webhcat/svr/src/main/java/org/apache/hcatalog/templeton/Server.java index f89cf4a..29ac4b3 100644 --- webhcat/svr/src/main/java/org/apache/hcatalog/templeton/Server.java +++ webhcat/svr/src/main/java/org/apache/hcatalog/templeton/Server.java @@ -679,7 +679,8 @@ public class Server { @Path("queue/{jobid}") @Produces({MediaType.APPLICATION_JSON}) public QueueStatusBean showQueueId(@PathParam("jobid") String jobid) - throws NotAuthorizedException, BadParam, IOException { + throws NotAuthorizedException, BadParam, IOException, InterruptedException { + verifyUser(); verifyParam(jobid, ":jobid"); @@ -694,7 +695,8 @@ public class Server { @Path("queue/{jobid}") @Produces({MediaType.APPLICATION_JSON}) public QueueStatusBean deleteQueueId(@PathParam("jobid") String jobid) - throws NotAuthorizedException, BadParam, IOException { + throws NotAuthorizedException, BadParam, IOException, InterruptedException { + verifyUser(); verifyParam(jobid, ":jobid"); @@ -709,7 +711,8 @@ public class Server { @Path("queue") @Produces({MediaType.APPLICATION_JSON}) public List showQueueList() - throws NotAuthorizedException, BadParam, IOException { + throws NotAuthorizedException, BadParam, IOException, InterruptedException { + verifyUser(); ListDelegator d = new ListDelegator(appConf); diff --git webhcat/svr/src/main/java/org/apache/hcatalog/templeton/StatusDelegator.java webhcat/svr/src/main/java/org/apache/hcatalog/templeton/StatusDelegator.java index dd6aa05..2cce126 100644 --- webhcat/svr/src/main/java/org/apache/hcatalog/templeton/StatusDelegator.java +++ webhcat/svr/src/main/java/org/apache/hcatalog/templeton/StatusDelegator.java @@ -27,7 +27,6 @@ import org.apache.hadoop.mapred.JobProfile; import org.apache.hadoop.mapred.JobStatus; import org.apache.hadoop.mapred.JobTracker; import org.apache.hadoop.mapred.TempletonJobTracker; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hcatalog.templeton.tool.JobState; /** @@ -41,14 +40,13 @@ public class StatusDelegator extends TempletonDelegator { } public QueueStatusBean run(String user, String id) - throws NotAuthorizedException, BadParam, IOException { - UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); + throws NotAuthorizedException, BadParam, IOException, InterruptedException + { TempletonJobTracker tracker = null; JobState state = null; try { - tracker = new TempletonJobTracker(ugi, - JobTracker.getAddress(appConf), - appConf); + tracker = new TempletonJobTracker(JobTracker.getAddress(appConf), + appConf); JobID jobid = StatusDelegator.StringToJobID(id); if (jobid == null) throw new BadParam("Invalid jobid: " + id); diff --git webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java index 723bc65..c35f14d 100644 --- webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java +++ webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java @@ -26,6 +26,7 @@ import java.io.OutputStream; import java.io.PrintWriter; import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -48,11 +49,11 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; /** * A Map Reduce job that will start another job. @@ -70,7 +71,6 @@ public class TempletonControllerJob extends Configured implements Tool { static enum ControllerCounters {SIMPLE_COUNTER} ; - public static final String COPY_NAME = "templeton.copy"; public static final String STATUSDIR_NAME = "templeton.statusdir"; public static final String JAR_ARGS_NAME = "templeton.args"; @@ -82,7 +82,11 @@ public class TempletonControllerJob extends Configured implements Tool { public static final int WATCHER_TIMEOUT_SECS = 10; public static final int KEEP_ALIVE_MSEC = 60 * 1000; - + + public static final String TOKEN_FILE_ARG_PLACEHOLDER + = "__WEBHCAT_TOKEN_FILE_LOCATION__"; + + private static TrivialExecService execService = TrivialExecService.getInstance(); private static final Log LOG = LogFactory.getLog(TempletonControllerJob.class); @@ -104,8 +108,26 @@ public class TempletonControllerJob extends Configured implements Tool { overrideClasspath); List jarArgsList = new LinkedList(Arrays.asList(jarArgs)); String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION"); + + if (tokenFile != null) { - jarArgsList.add(1, "-Dmapreduce.job.credentials.binary=" + tokenFile); + //Token is available, so replace the placeholder + String tokenArg = "mapreduce.job.credentials.binary=" + tokenFile; + for(int i=0; i it = jarArgsList.iterator(); + while(it.hasNext()){ + String arg = it.next(); + if(arg.contains(TOKEN_FILE_ARG_PLACEHOLDER)){ + it.remove(); + } + } } return execService.run(jarArgsList, removeEnv, env); } diff --git webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonUtils.java webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonUtils.java index ebf4f17..af4e1cf 100644 --- webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonUtils.java +++ webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonUtils.java @@ -25,6 +25,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.net.URLConnection; +import java.security.PrivilegedExceptionAction; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -35,6 +36,7 @@ import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; /** @@ -214,12 +216,24 @@ public class TempletonUtils { if (fname == null || conf == null) { return null; } - FileSystem defaultFs = FileSystem.get(new URI(fname), conf, user); + + final Configuration fConf = new Configuration(conf); + final String finalFName = new String(fname); + + UserGroupInformation ugi = UserGroupInformation.getLoginUser(); + final FileSystem defaultFs = + ugi.doAs(new PrivilegedExceptionAction() { + public FileSystem run() + throws URISyntaxException, FileNotFoundException, IOException, + InterruptedException { + return FileSystem.get(new URI(finalFName), fConf); + } + }); + URI u = new URI(fname); Path p = new Path(u).makeQualified(defaultFs); - FileSystem fs = p.getFileSystem(conf); - if (hadoopFsIsMissing(fs, p)) + if (hadoopFsIsMissing(defaultFs, p)) throw new FileNotFoundException("File " + fname + " does not exist."); return p;