diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java index 010e8af..32df44f 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java @@ -20,11 +20,15 @@ import java.io.IOException; import java.net.URL; -import java.net.MalformedURLException; import java.util.Date; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hive.hcatalog.common.HCatUtil; +import org.apache.hive.hcatalog.templeton.tool.DelegationTokenCache; import org.apache.hive.hcatalog.templeton.tool.JobState; import org.apache.hive.hcatalog.templeton.tool.TempletonUtils; @@ -39,11 +43,12 @@ * this at the same time. That should never happen. * * We use a Hadoop config var to notify this class on the completion - * of a job. Hadoop will call use multiple times in the event of + * of a job. Hadoop will call us multiple times in the event of * failure. Even if the failure is that the client callback failed. * * See LauncherDelegator for the HADOOP_END_RETRY* vars that are set. */ +@InterfaceAudience.Private public class CompleteDelegator extends TempletonDelegator { private static final Log LOG = LogFactory.getLog(CompleteDelegator.class); @@ -51,28 +56,36 @@ public CompleteDelegator(AppConfig appConf) { super(appConf); } - public CompleteBean run(String id) + public CompleteBean run(String id, String jobStatus) throws CallbackFailedException, IOException { if (id == null) acceptWithError("No jobid given"); JobState state = null; + /* we don't want to cancel the delegation token if we think the callback is going to + to be retried, for example, because the job is not complete yet */ + boolean cancelMetastoreToken = false; try { state = new JobState(id, Main.getAppConfigInstance()); if (state.getCompleteStatus() == null) - failed("Job not yet complete. jobId=" + id, null); + failed("Job not yet complete. jobId=" + id + " Status from JT=" + jobStatus, null); Long notified = state.getNotifiedTime(); - if (notified != null) + if (notified != null) { + cancelMetastoreToken = true; return acceptWithError("Callback already run for jobId=" + id + " at " + new Date(notified)); + } String callback = state.getCallback(); - if (callback == null) + if (callback == null) { + cancelMetastoreToken = true; return new CompleteBean("No callback registered"); + } try { doCallback(state.getId(), callback); + cancelMetastoreToken = true; } catch (Exception e) { failed("Callback failed " + callback + " for " + id, e); } @@ -80,8 +93,26 @@ public CompleteBean run(String id) state.setNotifiedTime(System.currentTimeMillis()); return new CompleteBean("Callback sent"); } finally { - if (state != null) - state.close(); + state.close(); + HiveMetaStoreClient client = null; + try { + if(cancelMetastoreToken) { + String metastoreTokenStrForm = + DelegationTokenCache.getStringFormTokenCache().getDelegationToken(id); + if(metastoreTokenStrForm != null) { + client = HCatUtil.getHiveClient(new HiveConf()); + client.cancelDelegationToken(metastoreTokenStrForm); + LOG.debug("Cancelled token for jobId=" + id + " status from JT=" + jobStatus); + DelegationTokenCache.getStringFormTokenCache().removeDelegationToken(id); + } + } + } + catch(Exception ex) { + LOG.warn("Failed to cancel metastore delegation token for jobId=" + id, ex); + } + finally { + HCatUtil.closeHiveClientQuietly(client); + } } } @@ -90,8 +121,7 @@ public CompleteBean run(String id) * finished. If the url has the string $jobId in it, it will be * replaced with the completed jobid. */ - public static void doCallback(String jobid, String url) - throws MalformedURLException, IOException { + public static void doCallback(String jobid, String url) throws IOException { if (url.contains("$jobId")) url = url.replace("$jobId", jobid); TempletonUtils.fetchUrl(new URL(url)); diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java index f558fc5..cb9bdcf 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java @@ -63,7 +63,7 @@ public EnqueueBean run(String user, try { args.addAll(makeBasicArgs(execute, srcFile, otherFiles, statusdir, completedUrl)); args.add("--"); - addHiveMetaStoreTokenArg(args); + addHiveMetaStoreTokenArg(); args.add(appConf.hivePath()); diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java index e5282a7..dfaae32 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java @@ -70,7 +70,7 @@ public EnqueueBean run(String user, String jar, String mainClass, //check if the rest command specified explicitly to use hcatalog if(usehcatalog){ - addHiveMetaStoreTokenArg(args); + addHiveMetaStoreTokenArg(); } args.add(appConf.clusterHadoop()); diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java index 7ba483b..161c78c 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java @@ -23,10 +23,10 @@ import java.util.ArrayList; import java.util.List; -import org.apache.commons.exec.ExecuteException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ToolRunner; @@ -43,6 +43,7 @@ public class LauncherDelegator extends TempletonDelegator { private static final Log LOG = LogFactory.getLog(LauncherDelegator.class); protected String runAs = null; + private boolean secureMeatastoreAccess = false; public LauncherDelegator(AppConfig appConf) { super(appConf); @@ -67,7 +68,7 @@ public void registerJob(String id, String user, String callback) */ public EnqueueBean enqueueController(String user, String callback, List args) - throws NotAuthorizedException, BusyException, ExecuteException, + throws NotAuthorizedException, BusyException, IOException, QueueException { try { UserGroupInformation ugi = UgiFactory.getUgi(user); @@ -93,16 +94,14 @@ public EnqueueBean enqueueController(String user, String callback, private String queueAsUser(UserGroupInformation ugi, final List args) throws IOException, InterruptedException { - String id = ugi.doAs(new PrivilegedExceptionAction() { + return ugi.doAs(new PrivilegedExceptionAction() { public String run() throws Exception { String[] array = new String[args.size()]; - TempletonControllerJob ctrl = new TempletonControllerJob(); + TempletonControllerJob ctrl = new TempletonControllerJob(secureMeatastoreAccess); ToolRunner.run(ctrl, args.toArray(array)); return ctrl.getSubmittedId(); } }); - - return id; } public List makeLauncherArgs(AppConfig appConf, String statusdir, @@ -199,15 +198,17 @@ public static void addDef(List args, String name, String val) { } /** - * Add argument to ask TempletonControllerJob to get - * metastore delegation token - * @param args + * This is called by subclasses when they determined that the sumbmitted job requires + * metastore access (e.g. Pig job that uses HCatalog). This then determines if + * secure access is required and causes TempletonControllerJob to set up a delegation token. */ - protected void addHiveMetaStoreTokenArg(ArrayList args) { - LOG.debug("Setting argument for controller job " - + TempletonControllerJob.HIVE_MS_DTOKEN_ENABLE_ARG); - - args.add(TempletonControllerJob.HIVE_MS_DTOKEN_ENABLE_ARG); + void addHiveMetaStoreTokenArg() { + //todo: in order for this to work hive-site.xml must be on the classpath + HiveConf hiveConf = new HiveConf(); + if(!hiveConf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL)) { + return; + } + secureMeatastoreAccess = true; } } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java index 4ac35ae..7698e42 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java @@ -81,8 +81,7 @@ public EnqueueBean run(String user, try { ArrayList allFiles = new ArrayList(); if (TempletonUtils.isset(srcFile)) { - allFiles.add(TempletonUtils.hadoopFsFilename - (srcFile, appConf, runAs)); + allFiles.add(TempletonUtils.hadoopFsFilename(srcFile, appConf, runAs)); } if (TempletonUtils.isset(otherFiles)) { String[] ofs = TempletonUtils.hadoopFsListAsArray(otherFiles, appConf, runAs); @@ -101,7 +100,7 @@ public EnqueueBean run(String user, //check if the REST command specified explicitly to use hcatalog // or if it says that implicitly using the pig -useHCatalog arg if(usehcatalog || hasPigArgUseHcat(pigArgs)){ - addHiveMetaStoreTokenArg(args); + addHiveMetaStoreTokenArg(); } args.addAll(pigArgs); diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java index 116e10e..aea723b 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java @@ -852,10 +852,12 @@ public QueueStatusBean deleteJobId(@PathParam("jobid") String jobid) @GET @Path("internal/complete/{jobid}") @Produces({MediaType.APPLICATION_JSON}) - public CompleteBean completeJob(@PathParam("jobid") String jobid) + public CompleteBean completeJob(@PathParam("jobid") String jobid, + @QueryParam("status") String jobStatus) throws CallbackFailedException, IOException { + LOG.debug("Received callback " + theUriInfo.getRequestUri()); CompleteDelegator d = new CompleteDelegator(appConf); - return d.run(jobid); + return d.run(jobid, jobStatus); } /** @@ -952,7 +954,7 @@ public String getCompletedUrl() { return null; } return theUriInfo.getBaseUri() + VERSION - + "/internal/complete/$jobId"; + + "/internal/complete/$jobId?status=$jobStatus"; } /** * Returns canonical host name from which the request is made; used for doAs validation diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/DelegationTokenCache.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/DelegationTokenCache.java new file mode 100644 index 0000000..78789e0 --- /dev/null +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/DelegationTokenCache.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton.tool; + +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.hive.common.classification.InterfaceStability; + +import java.util.concurrent.ConcurrentHashMap; + +/* + * Cache of delegation tokens. When {@link TempletonControllerJob} submits a job that requires + * metastore access and this access should be secure, TCJ will add a delegation token to the + * submitted job. When the job completes we need to cancel the token since by default the token + * lives for 7 days and over time can cause OOM (if not cancelled). Cancelling from + * TempletonControllerJob.LauchMapper mapper (via custom OutputCommitter for example) requires + * the jar containing HiveMetastoreClient (and any dependent jars) to be available on the node + * running LaunchMapper. Specifying transitive closure of the necessary jars is + * configuration/maintenance headache for each release. Caching the token means cancellation is + * done from WebHCat server and thus has Hive jars on the classpath. + * + * While it's possible that WebHCat crashes and looses this in-memory state, but this would be an + * exceptional condition and since tokens will automatically be cancelled after 7 days, + * the fact that this info is not persisted is OK. (Persisting it also complicates things + * because that needs to be done securely) + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class DelegationTokenCache { + private ConcurrentHashMap tokenCache = + new ConcurrentHashMap(); + private static final DelegationTokenCache stringFormTokenCache = + new DelegationTokenCache(); + + /* + * Returns the singleton instance of jobId->delegation-token-in-string-form cache + */ + public static DelegationTokenCache getStringFormTokenCache() { + return stringFormTokenCache; + } + TokenObject storeDelegationToken(JobId jobId, TokenObject token) { + return tokenCache.put(jobId, token); + } + public TokenObject getDelegationToken(JobId jobId) { + return tokenCache.get(jobId); + } + public void removeDelegationToken(JobId jobId) { + tokenCache.remove(jobId); + } +} diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/MSTokenCleanOutputFormat.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/MSTokenCleanOutputFormat.java deleted file mode 100644 index 8d517ae..0000000 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/MSTokenCleanOutputFormat.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hive.hcatalog.templeton.tool; - -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.JobStatus; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; -import org.apache.hcatalog.common.HCatUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Outputformat that cancels hcatalog delegation token once job is done. - */ -public class MSTokenCleanOutputFormat - extends NullOutputFormat { - @Override - public OutputCommitter getOutputCommitter(TaskAttemptContext context) { - return new CleanupOutputCommitter(); - } - - static class CleanupOutputCommitter extends OutputCommitter { - private static final Logger LOG - = LoggerFactory.getLogger(CleanupOutputCommitter.class); - - @Override - public void abortTask(TaskAttemptContext taskContext) { } - - @Override - public void cleanupJob(JobContext jobContext) { } - - @Override - public void commitTask(TaskAttemptContext taskContext) { } - - @Override - public boolean needsTaskCommit(TaskAttemptContext taskContext) { - return false; - } - - @Override - public void setupJob(JobContext jobContext) { } - - @Override - public void setupTask(TaskAttemptContext taskContext) { } - - @Override - public void commitJob(JobContext jobContext) { - cancelHcatDelegationTokens(jobContext); - } - - @Override - public void abortJob(JobContext jobContext, JobStatus.State state) { - cancelHcatDelegationTokens(jobContext); - } - - private void cancelHcatDelegationTokens(JobContext context) { - try{ - docancelHcatDelegationTokens(context); - }catch(Throwable t){ - LOG.warn("Error cancelling delegation token", t); - } - } - - private void docancelHcatDelegationTokens(JobContext context) { - //Cancel hive metastore token - HiveMetaStoreClient client = null; - try { - HiveConf hiveConf = HCatUtil.getHiveConf(context.getConfiguration()); - client = HCatUtil.getHiveClient(hiveConf); - String tokenStrForm = client.getTokenStrForm(); - if (tokenStrForm != null) { - client.cancelDelegationToken(tokenStrForm); - } - } catch (Exception e) { - LOG.warn("Failed to cancel delegation token", e); - } finally { - HCatUtil.closeHiveClientQuietly(client); - } - } - - - }; - -} diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java index cfc6b2b..abb689c 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java @@ -42,9 +42,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; @@ -57,8 +55,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.apache.hcatalog.common.HCatUtil; import org.apache.hive.hcatalog.templeton.SecureProxySupport; import org.apache.hive.hcatalog.templeton.UgiFactory; import org.apache.thrift.TException; @@ -89,17 +85,21 @@ public static final int KEEP_ALIVE_MSEC = 60 * 1000; public static final String TOKEN_FILE_ARG_PLACEHOLDER - = "__WEBHCAT_TOKEN_FILE_LOCATION__"; - - public static final String HIVE_MS_DTOKEN_ENABLE_ARG - = "__TEMPLETON_FETCH_HIVE_METASTORE_DELEGATION_TOKEN__"; - + = "__WEBHCAT_TOKEN_FILE_LOCATION__"; private static TrivialExecService execService = TrivialExecService.getInstance(); private static final Log LOG = LogFactory.getLog(TempletonControllerJob.class); + private final boolean secureMetastoreAccess; - + /** + * @param secureMetastoreAccess - if true, a delegation token will be created + * and added to the job + */ + public TempletonControllerJob(boolean secureMetastoreAccess) { + super(); + this.secureMetastoreAccess = secureMetastoreAccess; + } public static class LaunchMapper extends Mapper { protected Process startJob(Context context, String user, @@ -327,28 +327,16 @@ public String getSubmittedId() { /** * Enqueue the job and print out the job id for later collection. + * @see org.apache.hive.hcatalog.templeton.CompleteDelegator */ @Override public int run(String[] args) - throws IOException, InterruptedException, ClassNotFoundException, - MetaException, TException { + throws IOException, InterruptedException, ClassNotFoundException, TException { Configuration conf = getConf(); - boolean fetchHMetaStoreToken = checkHMSTokenArg(args); - if(fetchHMetaStoreToken){ - args = removeHMSTokenArg(args); - } - - HiveConf hconf = new HiveConf(); - if(isHMSDelegationNeeded(fetchHMetaStoreToken, hconf)){ - //this util function serializes hive configuration and - //stores it as a property in conf! - //It returns deserialized hiveconf when called in backend - HCatUtil.getHiveConf(conf); - } conf.set(JAR_ARGS_NAME, TempletonUtils.encodeArray(args)); - String user = UserGroupInformation.getCurrentUser().getShortUserName(); - conf.set("user.name", user); + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + conf.set("user.name", user); Job job = new Job(conf); job.setJarByClass(TempletonControllerJob.class); job.setJobName("TempletonControllerJob"); @@ -357,15 +345,8 @@ public int run(String[] args) job.setMapOutputValueClass(Text.class); job.setInputFormatClass(SingleInputFormat.class); - if(isHMSDelegationNeeded(fetchHMetaStoreToken, hconf)){ - //need to cancel the meta store delegation token after the job is done - //setup a OutputFormat that can do that - job.setOutputFormatClass(MSTokenCleanOutputFormat.class); - } else { - NullOutputFormat of - = new NullOutputFormat(); - job.setOutputFormatClass(of.getClass()); - } + NullOutputFormat of = new NullOutputFormat(); + job.setOutputFormatClass(of.getClass()); job.setNumReduceTasks(0); JobClient jc = new JobClient(new JobConf(job.getConfiguration())); @@ -373,87 +354,49 @@ public int run(String[] args) Token mrdt = jc.getDelegationToken(new Text("mr token")); job.getCredentials().addToken(new Text("mr token"), mrdt); - if(isHMSDelegationNeeded(fetchHMetaStoreToken, hconf)){ - // Get a token for the Hive metastore. - LOG.debug("Getting delegation token from hive metastore"); - addHMSToken(job, user); - } + String metastoreTokenStrForm = addHMSToken(job, user); job.submit(); submittedJobId = job.getJobID(); - + if(metastoreTokenStrForm != null) { + //so that it can be cancelled later from CompleteDelegator + DelegationTokenCache.getStringFormTokenCache().storeDelegationToken( + submittedJobId.toString(), metastoreTokenStrForm); + LOG.debug("Added metastore delegation token for jobId=" + submittedJobId.toString() + " " + + "user=" + user); + } return 0; } - - - private boolean isHMSDelegationNeeded(boolean fetchHMetaStoreToken, HiveConf hconf) { - return fetchHMetaStoreToken - && hconf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL); - } - - private void addHMSToken(Job job, String user) - throws MetaException, IOException, InterruptedException, TException { - Token - hiveToken = - new Token(); - hiveToken.decodeFromUrlString(buildHcatDelegationToken(user)); + private String addHMSToken(Job job, String user) throws IOException, InterruptedException, + TException { + if(!secureMetastoreAccess) { + return null; + } + Token hiveToken = + new Token(); + String metastoreTokenStrForm = buildHcatDelegationToken(user); + hiveToken.decodeFromUrlString(metastoreTokenStrForm); job.getCredentials().addToken(new Text(SecureProxySupport.HCAT_SERVICE), hiveToken); + return metastoreTokenStrForm; } - - private String[] removeHMSTokenArg(String[] args) { - String[] newArgs = new String[args.length-1]; - int i = 0; - for(String arg : args){ - if(!arg.equals(HIVE_MS_DTOKEN_ENABLE_ARG)){ - newArgs[i++] = arg; + private String buildHcatDelegationToken(String user) throws IOException, InterruptedException, + TException { + final HiveConf c = new HiveConf(); + LOG.debug("Creating hive metastore delegation token for user " + user); + final UserGroupInformation ugi = UgiFactory.getUgi(user); + UserGroupInformation real = ugi.getRealUser(); + return real.doAs(new PrivilegedExceptionAction() { + public String run() throws IOException, TException, InterruptedException { + final HiveMetaStoreClient client = new HiveMetaStoreClient(c); + return ugi.doAs(new PrivilegedExceptionAction() { + public String run() throws IOException, TException, InterruptedException { + String u = ugi.getUserName(); + return client.getDelegationToken(u); + } + }); } - } - if(i != newArgs.length){ - //should never happen! - throw new AssertionError("Error creating args list, " + - "while processing -usehcatalog arg. " + " i " + newArgs.length ); - } - return newArgs; + }); } - - private boolean checkHMSTokenArg(String[] args) { - for(String arg : args){ - if(arg.equals(HIVE_MS_DTOKEN_ENABLE_ARG)){ - return true; - } - } - return false; - } - - public static void main(String[] args) throws Exception { - int ret = ToolRunner.run(new TempletonControllerJob(), args); - if (ret != 0) { - System.err.println("TempletonControllerJob failed!"); - } - System.exit(ret); - } - - private String buildHcatDelegationToken(String user) - throws IOException, InterruptedException, MetaException, TException { - final HiveConf c = new HiveConf(); - LOG.debug("Creating hive metastore delegation token for user " + user); - final UserGroupInformation ugi = UgiFactory.getUgi(user); - UserGroupInformation real = ugi.getRealUser(); - String s = real.doAs(new PrivilegedExceptionAction() { - public String run() - throws IOException, MetaException, TException, InterruptedException { - final HiveMetaStoreClient client = new HiveMetaStoreClient(c); - return ugi.doAs(new PrivilegedExceptionAction() { - public String run() - throws IOException, MetaException, TException, InterruptedException { - String u = ugi.getUserName(); - return client.getDelegationToken(u); - } - }); - } - }); - return s; - } }