diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/HiveDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/HiveDelegator.java index 3c207cb..22da67c 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/HiveDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/HiveDelegator.java @@ -60,8 +60,10 @@ public EnqueueBean run(String user, try { args.addAll(makeBasicArgs(execute, srcFile, statusdir, completedUrl)); args.add("--"); + addHiveMetaStoreTokenArg(args); + args.add(appConf.hivePath()); - + args.add("--service"); args.add("cli"); @@ -102,9 +104,10 @@ public EnqueueBean run(String user, ArrayList args = new ArrayList(); ArrayList allFiles = new ArrayList(); - if (TempletonUtils.isset(srcFile)) + if (TempletonUtils.isset(srcFile)) { allFiles.add(TempletonUtils.hadoopFsFilename(srcFile, appConf, runAs)); + } args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles)); diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/JarDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/JarDelegator.java index a2dc23e..adf0d4d 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/JarDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/JarDelegator.java @@ -41,13 +41,14 @@ public JarDelegator(AppConfig appConf) { public EnqueueBean run(String user, String jar, String mainClass, String libjars, String files, List jarArgs, List defines, - String statusdir, String callback, String completedUrl) + String statusdir, String callback, + boolean usehcatalog, String completedUrl) throws NotAuthorizedException, BadParam, BusyException, QueueException, ExecuteException, IOException, InterruptedException { runAs = user; List args = makeArgs(jar, mainClass, libjars, files, jarArgs, defines, - statusdir, completedUrl); + statusdir, usehcatalog, completedUrl); return enqueueController(user, callback, args); } @@ -55,7 +56,8 @@ public EnqueueBean run(String user, String jar, String mainClass, private List makeArgs(String jar, String mainClass, String libjars, String files, List jarArgs, List defines, - String statusdir, String completedUrl) + String statusdir, boolean usehcatalog, + String completedUrl) throws BadParam, IOException, InterruptedException { ArrayList args = new ArrayList(); try { @@ -65,11 +67,18 @@ public EnqueueBean run(String user, String jar, String mainClass, args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles)); args.add("--"); + + //check if the rest command specified explicitly to use hcatalog + if(usehcatalog){ + addHiveMetaStoreTokenArg(args); + } + args.add(appConf.clusterHadoop()); args.add("jar"); args.add(TempletonUtils.hadoopFsPath(jar, appConf, runAs).getName()); - if (TempletonUtils.isset(mainClass)) + if (TempletonUtils.isset(mainClass)) { args.add(mainClass); + } if (TempletonUtils.isset(libjars)) { args.add("-libjars"); args.add(TempletonUtils.hadoopFsListAsString(libjars, appConf, @@ -82,9 +91,10 @@ public EnqueueBean run(String user, String jar, String mainClass, } //the token file location comes after mainClass, as a -Dprop=val args.add("-D" + TempletonControllerJob.TOKEN_FILE_ARG_PLACEHOLDER); - - for (String d : defines) + + for (String d : defines) { args.add("-D" + d); + } args.addAll(jarArgs); } catch (FileNotFoundException e) { diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/LauncherDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/LauncherDelegator.java index cb89409..e477019 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/LauncherDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/LauncherDelegator.java @@ -57,8 +57,9 @@ public void registerJob(String id, String user, String callback) state.setUser(user); state.setCallback(callback); } finally { - if (state != null) + if (state != null) { state.close(); + } } } @@ -79,8 +80,9 @@ public EnqueueBean enqueueController(String user, String callback, long elapsed = ((System.nanoTime() - startTime) / ((int) 1e6)); LOG.debug("queued job " + id + " in " + elapsed + " ms"); - if (id == null) + if (id == null) { throw new QueueException("Unable to get job id"); + } registerJob(id, user, callback); @@ -173,8 +175,9 @@ public static void addCacheFiles(List args, AppConfig appConf) { */ public static String makeOverrideClasspath(AppConfig appConf) { String[] overrides = appConf.overrideJars(); - if (overrides == null) + if (overrides == null) { return null; + } ArrayList cp = new ArrayList(); for (String fname : overrides) { @@ -196,4 +199,16 @@ public static void addDef(List args, String name, String val) { } } + /** + * Add argument to ask TempletonControllerJob to get + * metastore delegation token + * @param args + */ + protected void addHiveMetaStoreTokenArg(ArrayList args) { + LOG.debug("Setting argument for controller job " + + TempletonControllerJob.HIVE_MS_DTOKEN_ENABLE_ARG); + + args.add(TempletonControllerJob.HIVE_MS_DTOKEN_ENABLE_ARG); + } + } diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/PigDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/PigDelegator.java index b318373..a20292d 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/PigDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/PigDelegator.java @@ -26,6 +26,8 @@ import java.util.List; import org.apache.commons.exec.ExecuteException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hcatalog.templeton.tool.TempletonControllerJob; import org.apache.hcatalog.templeton.tool.TempletonUtils; @@ -35,6 +37,7 @@ * This is the backend of the pig web service. */ public class PigDelegator extends LauncherDelegator { + private static final Log LOG = LogFactory.getLog(PigDelegator.class); public PigDelegator(AppConfig appConf) { super(appConf); } @@ -42,27 +45,45 @@ public PigDelegator(AppConfig appConf) { public EnqueueBean run(String user, String execute, String srcFile, List pigArgs, String otherFiles, - String statusdir, String callback, String completedUrl) + String statusdir, String callback, + boolean usehcatalog, String completedUrl) throws NotAuthorizedException, BadParam, BusyException, QueueException, ExecuteException, IOException, InterruptedException { runAs = user; List args = makeArgs(execute, srcFile, pigArgs, - otherFiles, statusdir, completedUrl); + otherFiles, statusdir, + usehcatalog, completedUrl); return enqueueController(user, callback, args); } + /** + * @param execute pig query string to be executed + * @param srcFile pig query file to be executed + * @param pigArgs pig command line arguments + * @param otherFiles files to be copied to the map reduce cluster + * @param statusdir status dir location + * @param usehcatalog whether the command uses hcatalog/needs to connect + * to hive metastore server + * @param completedUrl call back url + * @return + * @throws BadParam + * @throws IOException + * @throws InterruptedException + */ private List makeArgs(String execute, String srcFile, List pigArgs, String otherFiles, - String statusdir, String completedUrl) + String statusdir, boolean usehcatalog, + String completedUrl) throws BadParam, IOException, InterruptedException { ArrayList args = new ArrayList(); try { ArrayList allFiles = new ArrayList(); - if (TempletonUtils.isset(srcFile)) + if (TempletonUtils.isset(srcFile)) { allFiles.add(TempletonUtils.hadoopFsFilename - (srcFile, appConf, runAs)); + (srcFile, appConf, runAs)); + } if (TempletonUtils.isset(otherFiles)) { String[] ofs = TempletonUtils.hadoopFsListAsArray(otherFiles, appConf, runAs); allFiles.addAll(Arrays.asList(ofs)); @@ -76,7 +97,13 @@ public EnqueueBean run(String user, args.add(appConf.pigPath()); //the token file location should be first argument of pig args.add("-D" + TempletonControllerJob.TOKEN_FILE_ARG_PLACEHOLDER); - + + //check if the REST command specified explicitly to use hcatalog + // or if it says that implicitly using the pig -useHCatalog arg + if(usehcatalog || hasPigArgUseHcat(pigArgs)){ + addHiveMetaStoreTokenArg(args); + } + args.addAll(pigArgs); if (TempletonUtils.isset(execute)) { args.add("-execute"); @@ -94,4 +121,18 @@ public EnqueueBean run(String user, return args; } + + /** + * Check if the pig arguments has -useHCatalog set + * @param pigArgs + * @return + */ + private boolean hasPigArgUseHcat(List pigArgs) { + for(int i = 0; i < pigArgs.size(); i++){ + if(pigArgs.get(i).equals("-useHCatalog")){ + return true; + } + } + return false; + } } diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/SecureProxySupport.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/SecureProxySupport.java index 7bf78c4..4d6673b 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/SecureProxySupport.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/SecureProxySupport.java @@ -44,7 +44,7 @@ */ public class SecureProxySupport { private Path tokenPath; - private final String HCAT_SERVICE = "hcat"; + public static final String HCAT_SERVICE = "hcat"; private boolean isEnabled; private String user; diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/Server.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/Server.java index 29ac4b3..979dcf1 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/Server.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/Server.java @@ -500,8 +500,9 @@ public Response dropDatabase(@PathParam("db") String db, BadParam, ExecuteException, IOException { verifyUser(); verifyDdlParam(db, ":db"); - if (TempletonUtils.isset(option)) + if (TempletonUtils.isset(option)) { verifyDdlParam(option, "option"); + } HcatDelegator d = new HcatDelegator(appConf, execService); return d.dropDatabase(getUser(), db, ifExists, option, group, permissions); @@ -600,6 +601,24 @@ public EnqueueBean mapReduceStreaming(@FormParam("input") List inputs, /** * Run a MapReduce Jar job. + * Params correspond to the REST api params + * @param jar + * @param mainClass + * @param libjars + * @param files + * @param args + * @param defines + * @param statusdir + * @param callback + * @param usehcatalog + * @return EnqueueBean + * @throws NotAuthorizedException + * @throws BusyException + * @throws BadParam + * @throws QueueException + * @throws ExecuteException + * @throws IOException + * @throws InterruptedException */ @POST @Path("mapreduce/jar") @@ -611,7 +630,8 @@ public EnqueueBean mapReduceJar(@FormParam("jar") String jar, @FormParam("arg") List args, @FormParam("define") List defines, @FormParam("statusdir") String statusdir, - @FormParam("callback") String callback) + @FormParam("callback") String callback, + @FormParam("usehcatalog") boolean usehcatalog) throws NotAuthorizedException, BusyException, BadParam, QueueException, ExecuteException, IOException, InterruptedException { verifyUser(); @@ -622,11 +642,27 @@ public EnqueueBean mapReduceJar(@FormParam("jar") String jar, return d.run(getUser(), jar, mainClass, libjars, files, args, defines, - statusdir, callback, getCompletedUrl()); + statusdir, callback, usehcatalog, getCompletedUrl()); } /** * Run a Pig job. + * Params correspond to the REST api params + * @param execute + * @param srcFile + * @param pigArgs + * @param otherFiles + * @param statusdir + * @param callback + * @param usehcatalog + * @return EnqueueBean + * @throws NotAuthorizedException + * @throws BusyException + * @throws BadParam + * @throws QueueException + * @throws ExecuteException + * @throws IOException + * @throws InterruptedException */ @POST @Path("pig") @@ -636,7 +672,8 @@ public EnqueueBean pig(@FormParam("execute") String execute, @FormParam("arg") List pigArgs, @FormParam("files") String otherFiles, @FormParam("statusdir") String statusdir, - @FormParam("callback") String callback) + @FormParam("callback") String callback, + @FormParam("usehcatalog") boolean usehcatalog) throws NotAuthorizedException, BusyException, BadParam, QueueException, ExecuteException, IOException, InterruptedException { verifyUser(); @@ -647,7 +684,7 @@ public EnqueueBean pig(@FormParam("execute") String execute, return d.run(getUser(), execute, srcFile, pigArgs, otherFiles, - statusdir, callback, getCompletedUrl()); + statusdir, callback, usehcatalog, getCompletedUrl()); } /** @@ -664,8 +701,9 @@ public EnqueueBean hive(@FormParam("execute") String execute, throws NotAuthorizedException, BusyException, BadParam, QueueException, ExecuteException, IOException, InterruptedException { verifyUser(); - if (execute == null && srcFile == null) + if (execute == null && srcFile == null) { throw new BadParam("Either execute or file parameter required"); + } HiveDelegator d = new HiveDelegator(appConf); return d.run(getUser(), execute, srcFile, defines, @@ -680,7 +718,7 @@ public EnqueueBean hive(@FormParam("execute") String execute, @Produces({MediaType.APPLICATION_JSON}) public QueueStatusBean showQueueId(@PathParam("jobid") String jobid) throws NotAuthorizedException, BadParam, IOException, InterruptedException { - + verifyUser(); verifyParam(jobid, ":jobid"); @@ -696,7 +734,7 @@ public QueueStatusBean showQueueId(@PathParam("jobid") String jobid) @Produces({MediaType.APPLICATION_JSON}) public QueueStatusBean deleteQueueId(@PathParam("jobid") String jobid) throws NotAuthorizedException, BadParam, IOException, InterruptedException { - + verifyUser(); verifyParam(jobid, ":jobid"); @@ -712,7 +750,7 @@ public QueueStatusBean deleteQueueId(@PathParam("jobid") String jobid) @Produces({MediaType.APPLICATION_JSON}) public List showQueueList() throws NotAuthorizedException, BadParam, IOException, InterruptedException { - + verifyUser(); ListDelegator d = new ListDelegator(appConf); @@ -738,8 +776,9 @@ public void verifyUser() throws NotAuthorizedException { if (getUser() == null) { String msg = "No user found."; - if (!UserGroupInformation.isSecurityEnabled()) + if (!UserGroupInformation.isSecurityEnabled()) { msg += " Missing " + PseudoAuthenticator.USER_NAME + " parameter."; + } throw new NotAuthorizedException(msg); } } @@ -749,8 +788,9 @@ public void verifyUser() */ public void verifyParam(String param, String name) throws BadParam { - if (param == null) + if (param == null) { throw new BadParam("Missing " + name + " parameter"); + } } /** @@ -758,8 +798,9 @@ public void verifyParam(String param, String name) */ public void verifyParam(List param, String name) throws BadParam { - if (param == null || param.isEmpty()) + if (param == null || param.isEmpty()) { throw new BadParam("Missing " + name + " parameter"); + } } public static final Pattern DDL_ID = Pattern.compile("[a-zA-Z]\\w*"); @@ -774,8 +815,9 @@ public void verifyDdlParam(String param, String name) throws BadParam { verifyParam(param, name); Matcher m = DDL_ID.matcher(param); - if (!m.matches()) + if (!m.matches()) { throw new BadParam("Invalid DDL identifier " + name); + } } /** diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/StreamingDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/StreamingDelegator.java index 85557ba..c616686 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/StreamingDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/StreamingDelegator.java @@ -53,7 +53,7 @@ public EnqueueBean run(String user, return d.run(user, appConf.streamingJar(), null, null, null, args, defines, - statusdir, callback, completedUrl); + statusdir, callback, false, completedUrl); } private List makeArgs(List inputs, @@ -76,12 +76,15 @@ public EnqueueBean run(String user, args.add("-reducer"); args.add(reducer); - for (String f : files) + for (String f : files) { args.add("-file" + f); - for (String d : defines) + } + for (String d : defines) { args.add("-D" + d); - for (String e : cmdenvs) + } + for (String e : cmdenvs) { args.add("-cmdenv" + e); + } args.addAll(jarArgs); return args; diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/UgiFactory.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/UgiFactory.java index f617341..d717771 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/UgiFactory.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/UgiFactory.java @@ -27,7 +27,7 @@ private static ConcurrentHashMap userUgiMap = new ConcurrentHashMap(); - static UserGroupInformation getUgi(String user) throws IOException { + public static UserGroupInformation getUgi(String user) throws IOException { UserGroupInformation ugi = userUgiMap.get(user); if (ugi == null) { //create new ugi and add to map diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java index fc69529..0936eb7 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java @@ -24,6 +24,7 @@ import java.io.InputStreamReader; import java.io.OutputStream; import java.io.PrintWriter; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; @@ -40,6 +41,10 @@ import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; @@ -53,6 +58,10 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.templeton.SecureProxySupport; +import org.apache.hcatalog.templeton.UgiFactory; +import org.apache.thrift.TException; /** * A Map Reduce job that will start another job. @@ -78,11 +87,14 @@ public static final int WATCHER_TIMEOUT_SECS = 10; public static final int KEEP_ALIVE_MSEC = 60 * 1000; - - public static final String TOKEN_FILE_ARG_PLACEHOLDER + + public static final String TOKEN_FILE_ARG_PLACEHOLDER = "__WEBHCAT_TOKEN_FILE_LOCATION__"; - - + + public static final String HIVE_MS_DTOKEN_ENABLE_ARG + = "__TEMPLETON_FETCH_HIVE_METASTORE_DELEGATION_TOKEN__"; + + private static TrivialExecService execService = TrivialExecService.getInstance(); private static final Log LOG = LogFactory.getLog(TempletonControllerJob.class); @@ -110,11 +122,11 @@ protected Process startJob(Context context, String user, //Token is available, so replace the placeholder String tokenArg = "mapreduce.job.credentials.binary=" + tokenFile; for(int i=0; i it = jarArgsList.iterator(); @@ -164,8 +176,9 @@ public void run(Context context) proc.waitFor(); keepAlive.sendReport = false; pool.shutdown(); - if (!pool.awaitTermination(WATCHER_TIMEOUT_SECS, TimeUnit.SECONDS)) - pool.shutdownNow(); + if (!pool.awaitTermination(WATCHER_TIMEOUT_SECS, TimeUnit.SECONDS)) { + pool.shutdownNow(); + } writeExitValue(conf, proc.exitValue(), statusdir); JobState state = new JobState(context.getJobID().toString(), conf); @@ -173,11 +186,12 @@ public void run(Context context) state.setCompleteStatus("done"); state.close(); - if (proc.exitValue() != 0) - System.err.println("templeton: job failed with exit code " - + proc.exitValue()); - else - System.err.println("templeton: job completed with exit code 0"); + if (proc.exitValue() != 0) { + System.err.println("templeton: job failed with exit code " + + proc.exitValue()); + } else { + System.err.println("templeton: job completed with exit code 0"); + } } private void executeWatcher(ExecutorService pool, Configuration conf, @@ -223,10 +237,11 @@ public Watcher(Configuration conf, JobID jobid, InputStream in, this.jobid = jobid; this.in = in; - if (name.equals(STDERR_FNAME)) - out = System.err; - else - out = System.out; + if (name.equals(STDERR_FNAME)) { + out = System.err; + } else { + out = System.out; + } if (TempletonUtils.isset(statusdir)) { Path p = new Path(statusdir, name); @@ -299,10 +314,11 @@ public void run() { private JobID submittedJobId; public String getSubmittedId() { - if (submittedJobId == null) - return null; - else - return submittedJobId.toString(); + if (submittedJobId == null) { + return null; + } else { + return submittedJobId.toString(); + } } /** @@ -310,10 +326,25 @@ public String getSubmittedId() { */ @Override public int run(String[] args) - throws IOException, InterruptedException, ClassNotFoundException { + throws IOException, InterruptedException, ClassNotFoundException, + MetaException, TException { Configuration conf = getConf(); + boolean fetchHMetaStoreToken = checkHMSTokenArg(args); + if(fetchHMetaStoreToken){ + args = removeHMSTokenArg(args); + } + + HiveConf hconf = new HiveConf(); + if(isHMSDelegationNeeded(fetchHMetaStoreToken, hconf)){ + //this util function serializes hive configuration and + //stores it as a property in conf! + //It returns deserialized hiveconf when called in backend + HCatUtil.getHiveConf(conf); + } + conf.set(JAR_ARGS_NAME, TempletonUtils.encodeArray(args)); - conf.set("user.name", UserGroupInformation.getCurrentUser().getShortUserName()); + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + conf.set("user.name", user); Job job = new Job(conf); job.setJarByClass(TempletonControllerJob.class); job.setJobName("TempletonControllerJob"); @@ -321,15 +352,29 @@ public int run(String[] args) job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setInputFormatClass(SingleInputFormat.class); - NullOutputFormat of + + if(isHMSDelegationNeeded(fetchHMetaStoreToken, hconf)){ + //need to cancel the meta store delegation token after the job is done + //setup a OutputFormat that can do that + job.setOutputFormatClass(MSTokenCleanOutputFormat.class); + } else { + NullOutputFormat of = new NullOutputFormat(); - job.setOutputFormatClass(of.getClass()); + job.setOutputFormatClass(of.getClass()); + } job.setNumReduceTasks(0); JobClient jc = new JobClient(new JobConf(job.getConfiguration())); Token mrdt = jc.getDelegationToken(new Text("mr token")); job.getCredentials().addToken(new Text("mr token"), mrdt); + + if(isHMSDelegationNeeded(fetchHMetaStoreToken, hconf)){ + // Get a token for the Hive metastore. + LOG.debug("Getting delegation token from hive metastore"); + addHMSToken(job, user); + } + job.submit(); submittedJobId = job.getJobID(); @@ -338,10 +383,73 @@ public int run(String[] args) } + private boolean isHMSDelegationNeeded(boolean fetchHMetaStoreToken, HiveConf hconf) { + return fetchHMetaStoreToken + && hconf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL); + } + + private void addHMSToken(Job job, String user) + throws MetaException, IOException, InterruptedException, TException { + Token + hiveToken = + new Token(); + hiveToken.decodeFromUrlString(buildHcatDelegationToken(user)); + job.getCredentials().addToken(new + Text(SecureProxySupport.HCAT_SERVICE), hiveToken); + } + + private String[] removeHMSTokenArg(String[] args) { + String[] newArgs = new String[args.length-1]; + int i = 0; + for(String arg : args){ + if(!arg.equals(HIVE_MS_DTOKEN_ENABLE_ARG)){ + newArgs[i++] = arg; + } + } + if(i != newArgs.length){ + //should never happen! + throw new AssertionError("Error creating args list, " + + "while processing -usehcatalog arg. " + " i " + newArgs.length ); + } + return newArgs; + } + + private boolean checkHMSTokenArg(String[] args) { + for(String arg : args){ + if(arg.equals(HIVE_MS_DTOKEN_ENABLE_ARG)){ + return true; + } + } + return false; + } + public static void main(String[] args) throws Exception { int ret = ToolRunner.run(new TempletonControllerJob(), args); - if (ret != 0) + if (ret != 0) { System.err.println("TempletonControllerJob failed!"); + } System.exit(ret); } + + private String buildHcatDelegationToken(String user) + throws IOException, InterruptedException, MetaException, TException { + final HiveConf c = new HiveConf(); + LOG.debug("Creating hive metastore delegation token for user " + user); + final UserGroupInformation ugi = UgiFactory.getUgi(user); + UserGroupInformation real = ugi.getRealUser(); + String s = real.doAs(new PrivilegedExceptionAction() { + public String run() + throws IOException, MetaException, TException, InterruptedException { + final HiveMetaStoreClient client = new HiveMetaStoreClient(c); + return ugi.doAs(new PrivilegedExceptionAction() { + public String run() + throws IOException, MetaException, TException, InterruptedException { + String u = ugi.getUserName(); + return client.getDelegationToken(u); + } + }); + } + }); + return s; + } }