diff --git a/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf b/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf
index b14c991..f394731 100644
--- a/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf
+++ b/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf
@@ -241,6 +241,28 @@ $cfg =
     },
+    {
+     'num' => 9,
+     'setup' => [
+       {
+        'method' => 'POST',
+        'url' => ':TEMPLETON_URL:/templeton/v1/ddl',
+        'status_code' => 200,
+        'post_options' => ['user.name=:UNAME:','exec=drop table if exists hcattest_pig; create table hcattest_pig(i int, j int) STORED AS textfile;'],
+        'json_field_substr_match' => {'stderr' => 'OK'}
+       }
+     ],
+     'method' => 'POST',
+     'url' => ':TEMPLETON_URL:/templeton/v1/pig',
+     'post_options' => ['user.name=:UNAME:', 'arg=-useHCatalog', 'arg=-p', 'arg=INPDIR=:INPDIR_HDFS:', 'arg=-p', 'arg= OUTDIR=:OUTDIR:', 'file=:INPDIR_HDFS:/hcatloadstore.pig'],
+     'json_field_substr_match' => { 'id' => '\d+'},
+     'status_code' => 200,
+     'check_job_created' => 1,
+     'check_job_complete' => 'SUCCESS',
+     'check_job_exit_value' => 0,
+     'check_call_back' => 1,
+
+    }, #test 9
 
 #TODO jython test
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
index f472c47..81be627 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
@@ -60,6 +60,8 @@ public EnqueueBean run(String user,
     try {
       args.addAll(makeBasicArgs(execute, srcFile, statusdir, completedUrl));
       args.add("--");
+      addHiveMetaStoreTokenArg(args);
+
       args.add(appConf.hivePath());
 
       args.add("--service");
@@ -102,9 +104,10 @@ public EnqueueBean run(String user,
     ArrayList<String> args = new ArrayList<String>();
     ArrayList<String> allFiles = new ArrayList<String>();
 
-    if (TempletonUtils.isset(srcFile))
+    if (TempletonUtils.isset(srcFile)) {
       allFiles.add(TempletonUtils.hadoopFsFilename(srcFile, appConf, runAs));
+    }
 
     args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles));
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
index 663da94..e5282a7 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
@@ -41,13 +41,14 @@ public JarDelegator(AppConfig appConf) {
   public EnqueueBean run(String user, String jar, String mainClass,
               String libjars, String files,
               List<String> jarArgs, List<String> defines,
-              String statusdir, String callback, String completedUrl)
+              String statusdir, String callback,
+              boolean usehcatalog, String completedUrl)
     throws NotAuthorizedException, BadParam, BusyException, QueueException,
     ExecuteException, IOException, InterruptedException {
     runAs = user;
     List<String> args = makeArgs(jar, mainClass,
       libjars, files, jarArgs, defines,
-      statusdir, completedUrl);
+      statusdir, usehcatalog, completedUrl);
 
     return enqueueController(user, callback, args);
   }
@@ -55,7 +56,8 @@ public EnqueueBean run(String user, String jar, String mainClass,
   private List<String> makeArgs(String jar, String mainClass,
                   String libjars, String files,
                   List<String> jarArgs, List<String> defines,
-                  String statusdir, String completedUrl)
+                  String statusdir, boolean usehcatalog,
+                  String completedUrl)
     throws BadParam, IOException, InterruptedException {
     ArrayList<String> args = new ArrayList<String>();
     try {
@@ -65,11 +67,18 @@ public EnqueueBean run(String user, String jar, String mainClass,
       args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles));
       args.add("--");
+
+      //check if the REST command explicitly asked to use hcatalog
+      if(usehcatalog){
+        addHiveMetaStoreTokenArg(args);
+      }
+
       args.add(appConf.clusterHadoop());
       args.add("jar");
       args.add(TempletonUtils.hadoopFsPath(jar, appConf, runAs).getName());
-      if (TempletonUtils.isset(mainClass))
+      if (TempletonUtils.isset(mainClass)) {
         args.add(mainClass);
+      }
       if (TempletonUtils.isset(libjars)) {
         args.add("-libjars");
         args.add(TempletonUtils.hadoopFsListAsString(libjars, appConf,
@@ -83,8 +92,9 @@ public EnqueueBean run(String user, String jar, String mainClass,
       //the token file location comes after mainClass, as a -Dprop=val
       args.add("-D" + TempletonControllerJob.TOKEN_FILE_ARG_PLACEHOLDER);
 
-      for (String d : defines)
+      for (String d : defines) {
         args.add("-D" + d);
+      }
 
       args.addAll(jarArgs);
     } catch (FileNotFoundException e) {
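The delegators above only consume the new usehcatalog flag; it is set from the REST layer in Server.java further down in this patch. As a usage illustration, here is a minimal, self-contained sketch of a client POSTing a jar job with the flag enabled. The host/port, user name, and jar/class values are hypothetical placeholders, not values taken from this patch:

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class UseHcatalogSubmitSketch {
  public static void main(String[] args) throws Exception {
    // hypothetical WebHCat endpoint; substitute your own server and port
    URL url = new URL("http://localhost:50111/templeton/v1/mapreduce/jar");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    conn.setDoOutput(true);
    conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
    // usehcatalog=true asks the controller job to fetch (and later cancel)
    // a hive metastore delegation token on the job's behalf
    String body = "user.name=someuser&jar=myapp.jar&class=com.example.Main"
      + "&usehcatalog=true";
    try (OutputStream os = conn.getOutputStream()) {
      os.write(body.getBytes(StandardCharsets.UTF_8));
    }
    System.out.println("HTTP " + conn.getResponseCode());
  }
}

Pig submissions can either pass usehcatalog=true the same way, or rely on the -useHCatalog pig argument, which PigDelegator detects below.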
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
index 75d57eb..7ba483b 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
@@ -56,8 +56,9 @@ public void registerJob(String id, String user, String callback)
       state.setUser(user);
       state.setCallback(callback);
     } finally {
-      if (state != null)
+      if (state != null) {
         state.close();
+      }
     }
   }
 
@@ -78,8 +79,9 @@ public EnqueueBean enqueueController(String user, String callback,
       long elapsed = ((System.nanoTime() - startTime) / ((int) 1e6));
       LOG.debug("queued job " + id + " in " + elapsed + " ms");
 
-      if (id == null)
+      if (id == null) {
         throw new QueueException("Unable to get job id");
+      }
 
       registerJob(id, user, callback);
 
@@ -172,8 +174,9 @@ public static void addCacheFiles(List<String> args, AppConfig appConf) {
    */
   public static String makeOverrideClasspath(AppConfig appConf) {
     String[] overrides = appConf.overrideJars();
-    if (overrides == null)
+    if (overrides == null) {
       return null;
+    }
 
     ArrayList<String> cp = new ArrayList<String>();
     for (String fname : overrides) {
@@ -195,4 +198,16 @@ public static void addDef(List<String> args, String name, String val) {
     }
   }
 
+  /**
+   * Add the argument that asks TempletonControllerJob to get a
+   * metastore delegation token.
+   * @param args
+   */
+  protected void addHiveMetaStoreTokenArg(ArrayList<String> args) {
+    LOG.debug("Setting argument for controller job "
+      + TempletonControllerJob.HIVE_MS_DTOKEN_ENABLE_ARG);
+
+    args.add(TempletonControllerJob.HIVE_MS_DTOKEN_ENABLE_ARG);
+  }
+
 }
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
index 14c3fbd..4ac35ae 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
@@ -26,6 +26,8 @@
 import java.util.List;
 
 import org.apache.commons.exec.ExecuteException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob;
 import org.apache.hive.hcatalog.templeton.tool.TempletonUtils;
 
@@ -35,6 +37,7 @@
  * This is the backend of the pig web service.
  */
 public class PigDelegator extends LauncherDelegator {
+  private static final Log LOG = LogFactory.getLog(PigDelegator.class);
   public PigDelegator(AppConfig appConf) {
     super(appConf);
   }
@@ -42,27 +45,45 @@ public PigDelegator(AppConfig appConf) {
   public EnqueueBean run(String user,
               String execute, String srcFile,
               List<String> pigArgs, String otherFiles,
-              String statusdir, String callback, String completedUrl)
+              String statusdir, String callback,
+              boolean usehcatalog, String completedUrl)
     throws NotAuthorizedException, BadParam, BusyException, QueueException,
     ExecuteException, IOException, InterruptedException {
     runAs = user;
     List<String> args = makeArgs(execute,
       srcFile, pigArgs,
-      otherFiles, statusdir, completedUrl);
+      otherFiles, statusdir,
+      usehcatalog, completedUrl);
 
     return enqueueController(user, callback, args);
   }
 
+  /**
+   * @param execute pig query string to be executed
+   * @param srcFile pig query file to be executed
+   * @param pigArgs pig command line arguments
+   * @param otherFiles files to be copied to the map reduce cluster
+   * @param statusdir status dir location
+   * @param usehcatalog whether the command uses hcatalog/needs to connect
+   *          to the hive metastore server
+   * @param completedUrl callback url
+   * @return list of arguments for the controller job
+   * @throws BadParam
+   * @throws IOException
+   * @throws InterruptedException
+   */
   private List<String> makeArgs(String execute, String srcFile,
                   List<String> pigArgs, String otherFiles,
-                  String statusdir, String completedUrl)
+                  String statusdir, boolean usehcatalog,
+                  String completedUrl)
     throws BadParam, IOException, InterruptedException {
     ArrayList<String> args = new ArrayList<String>();
     try {
       ArrayList<String> allFiles = new ArrayList<String>();
-      if (TempletonUtils.isset(srcFile))
+      if (TempletonUtils.isset(srcFile)) {
         allFiles.add(TempletonUtils.hadoopFsFilename
-          (srcFile, appConf, runAs));
+            (srcFile, appConf, runAs));
+      }
       if (TempletonUtils.isset(otherFiles)) {
         String[] ofs = TempletonUtils.hadoopFsListAsArray(otherFiles, appConf, runAs);
         allFiles.addAll(Arrays.asList(ofs));
@@ -77,6 +98,12 @@ public EnqueueBean run(String user,
       //the token file location should be first argument of pig
       args.add("-D" + TempletonControllerJob.TOKEN_FILE_ARG_PLACEHOLDER);
 
+      //check if the REST command explicitly asked to use hcatalog,
+      //or implied it via the pig -useHCatalog argument
+      if(usehcatalog || hasPigArgUseHcat(pigArgs)){
+        addHiveMetaStoreTokenArg(args);
+      }
+
       args.addAll(pigArgs);
       if (TempletonUtils.isset(execute)) {
         args.add("-execute");
@@ -94,4 +121,18 @@ public EnqueueBean run(String user,
 
     return args;
   }
+
+  /**
+   * Check if the pig arguments have -useHCatalog set.
+   * @param pigArgs
+   * @return true if -useHCatalog is present
+   */
+  private boolean hasPigArgUseHcat(List<String> pigArgs) {
+    for(int i = 0; i < pigArgs.size(); i++){
+      if(pigArgs.get(i).equals("-useHCatalog")){
+        return true;
+      }
+    }
+    return false;
+  }
 }
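A side note on hasPigArgUseHcat(): the index loop is an exact, case-sensitive membership test, so a behaviorally equivalent form is List.contains. A small sketch under that assumption (the class name here is illustrative only):

import java.util.Arrays;
import java.util.List;

public class UseHcatArgCheckSketch {
  // same semantics as hasPigArgUseHcat above: exact, case-sensitive match,
  // so variants such as "-usehcatalog" are deliberately not detected
  static boolean hasPigArgUseHcat(List<String> pigArgs) {
    return pigArgs.contains("-useHCatalog");
  }

  public static void main(String[] args) {
    System.out.println(hasPigArgUseHcat(
      Arrays.asList("-p", "INPDIR=/in", "-useHCatalog"))); // true
    System.out.println(hasPigArgUseHcat(
      Arrays.asList("-p", "INPDIR=/in")));                 // false
  }
}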
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java
index 663aa58..d47f97e 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java
@@ -33,10 +33,10 @@
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.io.Text;
-import org.apache.thrift.TException;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.thrift.TException;
 
 /**
  * Helper class to run jobs using Kerberos security. Always safe to
@@ -44,8 +44,8 @@
  */
 public class SecureProxySupport {
   private Path tokenPath;
-  private final String HCAT_SERVICE = "hcat";
-  private boolean isEnabled;
+  public static final String HCAT_SERVICE = "hcat";
+  private final boolean isEnabled;
   private String user;
 
   public SecureProxySupport() {
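Making HCAT_SERVICE public and static lets TempletonControllerJob, later in this patch, store the metastore token under the same credential alias that the rest of WebHCat uses. A toy sketch of that alias round trip through hadoop's Credentials, using an empty placeholder token rather than a real metastore token:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hive.hcatalog.templeton.SecureProxySupport;

public class TokenAliasSketch {
  public static void main(String[] args) {
    Credentials creds = new Credentials();
    // empty placeholder token; the real code stores the metastore token here
    Token<TokenIdentifier> hiveToken = new Token<TokenIdentifier>();
    creds.addToken(new Text(SecureProxySupport.HCAT_SERVICE), hiveToken);
    // any consumer that knows the alias can retrieve the same token
    System.out.println(creds.getToken(new Text("hcat")) != null); // true
  }
}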
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
index 3fcae2b..3a67159 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
@@ -29,6 +29,7 @@
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.FormParam;
@@ -185,8 +186,9 @@ public Response listTables(@PathParam("db") String db,
     verifyDdlParam(db, ":db");
 
     HcatDelegator d = new HcatDelegator(appConf, execService);
-    if (!TempletonUtils.isset(tablePattern))
+    if (!TempletonUtils.isset(tablePattern)) {
       tablePattern = "*";
+    }
     return d.listTables(getDoAsUser(), db, tablePattern);
   }
 
@@ -251,10 +253,11 @@ public Response descTable(@PathParam("db") String db,
     verifyDdlParam(table, ":table");
 
     HcatDelegator d = new HcatDelegator(appConf, execService);
-    if ("extended".equals(format))
+    if ("extended".equals(format)) {
       return d.descExtendedTable(getDoAsUser(), db, table);
-    else
+    } else {
       return d.descTable(getDoAsUser(), db, table, false);
+    }
   }
 
   /**
@@ -454,8 +457,9 @@ public Response listDatabases(@QueryParam("like") String dbPattern)
     verifyUser();
 
     HcatDelegator d = new HcatDelegator(appConf, execService);
-    if (!TempletonUtils.isset(dbPattern))
+    if (!TempletonUtils.isset(dbPattern)) {
       dbPattern = "*";
+    }
     return d.listDatabases(getDoAsUser(), dbPattern);
   }
 
@@ -507,8 +511,9 @@ public Response dropDatabase(@PathParam("db") String db,
     BadParam, ExecuteException, IOException {
     verifyUser();
     verifyDdlParam(db, ":db");
-    if (TempletonUtils.isset(option))
+    if (TempletonUtils.isset(option)) {
       verifyDdlParam(option, "option");
+    }
     HcatDelegator d = new HcatDelegator(appConf, execService);
     return d.dropDatabase(getDoAsUser(), db, ifExists, option,
       group, permissions);
@@ -607,6 +612,24 @@ public EnqueueBean mapReduceStreaming(@FormParam("input") List<String> inputs,
 
   /**
    * Run a MapReduce Jar job.
+   * Params correspond to the REST API params.
+   * @param jar
+   * @param mainClass
+   * @param libjars
+   * @param files
+   * @param args
+   * @param defines
+   * @param statusdir
+   * @param callback
+   * @param usehcatalog
+   * @return EnqueueBean
+   * @throws NotAuthorizedException
+   * @throws BusyException
+   * @throws BadParam
+   * @throws QueueException
+   * @throws ExecuteException
+   * @throws IOException
+   * @throws InterruptedException
    */
   @POST
   @Path("mapreduce/jar")
@@ -618,7 +641,8 @@ public EnqueueBean mapReduceJar(@FormParam("jar") String jar,
                   @FormParam("arg") List<String> args,
                   @FormParam("define") List<String> defines,
                   @FormParam("statusdir") String statusdir,
-                  @FormParam("callback") String callback)
+                  @FormParam("callback") String callback,
+                  @FormParam("usehcatalog") boolean usehcatalog)
     throws NotAuthorizedException, BusyException, BadParam, QueueException,
     ExecuteException, IOException, InterruptedException {
     verifyUser();
@@ -627,13 +651,29 @@ public EnqueueBean mapReduceJar(@FormParam("jar") String jar,
 
     JarDelegator d = new JarDelegator(appConf);
     return d.run(getDoAsUser(),
-      jar, mainClass,
-      libjars, files, args, defines,
-      statusdir, callback, getCompletedUrl());
+      jar, mainClass,
+      libjars, files, args, defines,
+      statusdir, callback, usehcatalog, getCompletedUrl());
   }
 
   /**
    * Run a Pig job.
+   * Params correspond to the REST API params.
+   * @param execute
+   * @param srcFile
+   * @param pigArgs
+   * @param otherFiles
+   * @param statusdir
+   * @param callback
+   * @param usehcatalog
+   * @return EnqueueBean
+   * @throws NotAuthorizedException
+   * @throws BusyException
+   * @throws BadParam
+   * @throws QueueException
+   * @throws ExecuteException
+   * @throws IOException
+   * @throws InterruptedException
    */
   @POST
   @Path("pig")
@@ -643,18 +683,20 @@ public EnqueueBean pig(@FormParam("execute") String execute,
             @FormParam("arg") List<String> pigArgs,
             @FormParam("files") String otherFiles,
             @FormParam("statusdir") String statusdir,
-            @FormParam("callback") String callback)
+            @FormParam("callback") String callback,
+            @FormParam("usehcatalog") boolean usehcatalog)
     throws NotAuthorizedException, BusyException, BadParam, QueueException,
     ExecuteException, IOException, InterruptedException {
     verifyUser();
-    if (execute == null && srcFile == null)
+    if (execute == null && srcFile == null) {
       throw new BadParam("Either execute or file parameter required");
+    }
 
     PigDelegator d = new PigDelegator(appConf);
     return d.run(getDoAsUser(),
       execute, srcFile,
       pigArgs, otherFiles,
-      statusdir, callback, getCompletedUrl());
+      statusdir, callback, usehcatalog, getCompletedUrl());
   }
 
   /**
@@ -671,8 +713,9 @@ public EnqueueBean hive(@FormParam("execute") String execute,
     throws NotAuthorizedException, BusyException, BadParam, QueueException,
     ExecuteException, IOException, InterruptedException {
     verifyUser();
-    if (execute == null && srcFile == null)
+    if (execute == null && srcFile == null) {
       throw new BadParam("Either execute or file parameter required");
+    }
 
     HiveDelegator d = new HiveDelegator(appConf);
     return d.run(getDoAsUser(), execute, srcFile, defines,
@@ -745,8 +788,9 @@ public void verifyUser() throws NotAuthorizedException {
     String requestingUser = getRequestingUser();
     if (requestingUser == null) {
       String msg = "No user found.";
-      if (!UserGroupInformation.isSecurityEnabled())
+      if (!UserGroupInformation.isSecurityEnabled()) {
         msg += " Missing " + PseudoAuthenticator.USER_NAME + " parameter.";
+      }
       throw new NotAuthorizedException(msg);
     }
     if(doAs != null && !doAs.equals(requestingUser)) {
@@ -755,9 +799,10 @@ public void verifyUser() throws NotAuthorizedException {
       ProxyUserSupport.validate(requestingUser, getRequestingHost(requestingUser, request), doAs);
     }
   }
+
   /**
    * All 'tasks' spawned by WebHCat should be run as this user. W/o doAs query parameter
-   * this is just the user making the request (or
+   * this is just the user making the request (or
    * {@link org.apache.hadoop.security.authentication.client.PseudoAuthenticator#USER_NAME}
    * query param).
    * @return value of doAs query parameter or {@link #getRequestingUser()}
@@ -770,8 +815,9 @@ private String getDoAsUser() {
    */
   public void verifyParam(String param, String name)
     throws BadParam {
-    if (param == null)
+    if (param == null) {
       throw new BadParam("Missing " + name + " parameter");
+    }
   }
 
   /**
@@ -779,8 +825,9 @@ public void verifyParam(String param, String name)
    */
   public void verifyParam(List<String> param, String name)
     throws BadParam {
-    if (param == null || param.isEmpty())
+    if (param == null || param.isEmpty()) {
       throw new BadParam("Missing " + name + " parameter");
+    }
   }
 
   public static final Pattern DDL_ID = Pattern.compile("[a-zA-Z]\\w*");
@@ -795,8 +842,9 @@ public void verifyDdlParam(String param, String name)
     throws BadParam {
     verifyParam(param, name);
     Matcher m = DDL_ID.matcher(param);
-    if (!m.matches())
+    if (!m.matches()) {
       throw new BadParam("Invalid DDL identifier " + name);
+    }
   }
   /**
    * Get the user name from the security context, i.e. the user making the HTTP request.
@@ -804,11 +852,13 @@ public void verifyDdlParam(String param, String name)
    * value of user.name query param, in kerberos mode it's the kinit'ed user.
    */
   private String getRequestingUser() {
-    if (theSecurityContext == null)
+    if (theSecurityContext == null) {
       return null;
-    if (theSecurityContext.getUserPrincipal() == null)
+    }
+    if (theSecurityContext.getUserPrincipal() == null) {
       return null;
-    //map hue/foo.bar@something.com->hue since user group checks
+    }
+    //map hue/foo.bar@something.com->hue since user group checks
     // and config files are in terms of short name
     return UserGroupInformation.createRemoteUser(
       theSecurityContext.getUserPrincipal().getName()).getShortUserName();
@@ -818,15 +868,17 @@ private String getRequestingUser() {
    * The callback url on this server when a task is completed.
    */
   public String getCompletedUrl() {
-    if (theUriInfo == null)
+    if (theUriInfo == null) {
       return null;
-    if (theUriInfo.getBaseUri() == null)
+    }
+    if (theUriInfo.getBaseUri() == null) {
       return null;
+    }
     return theUriInfo.getBaseUri() + VERSION + "/internal/complete/$jobId";
   }
 
   /**
-   * Returns canonical host name from which the request is made; used for doAs validation
+   * Returns canonical host name from which the request is made; used for doAs validation
    */
   private static String getRequestingHost(String requestingUser, HttpServletRequest request) {
     final String unkHost = "???";
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
index f9f6c94..4021a5d 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
@@ -51,9 +51,9 @@ public EnqueueBean run(String user,
 
     JarDelegator d = new JarDelegator(appConf);
     return d.run(user,
-      appConf.streamingJar(), null,
-      null, null, args, defines,
-      statusdir, callback, completedUrl);
+      appConf.streamingJar(), null,
+      null, null, args, defines,
+      statusdir, callback, false, completedUrl);
   }
 
   private List<String> makeArgs(List<String> inputs,
@@ -76,12 +76,15 @@ public EnqueueBean run(String user,
     args.add("-reducer");
     args.add(reducer);
 
-    for (String f : files)
+    for (String f : files) {
       args.add("-file" + f);
-    for (String d : defines)
+    }
+    for (String d : defines) {
       args.add("-D" + d);
-    for (String e : cmdenvs)
+    }
+    for (String e : cmdenvs) {
       args.add("-cmdenv" + e);
+    }
     args.addAll(jarArgs);
 
     return args;
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/MSTokenCleanOutputFormat.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/MSTokenCleanOutputFormat.java
new file mode 100644
index 0000000..8d517ae
--- /dev/null
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/MSTokenCleanOutputFormat.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton.tool;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hcatalog.common.HCatUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * OutputFormat that cancels the hcatalog delegation token once the job is done.
+ */
+public class MSTokenCleanOutputFormat
+  extends NullOutputFormat<NullWritable, NullWritable> {
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
+    return new CleanupOutputCommitter();
+  }
+
+  static class CleanupOutputCommitter extends OutputCommitter {
+    private static final Logger LOG
+      = LoggerFactory.getLogger(CleanupOutputCommitter.class);
+
+    @Override
+    public void abortTask(TaskAttemptContext taskContext) { }
+
+    @Override
+    public void cleanupJob(JobContext jobContext) { }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskContext) { }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+      return false;
+    }
+
+    @Override
+    public void setupJob(JobContext jobContext) { }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskContext) { }
+
+    @Override
+    public void commitJob(JobContext jobContext) {
+      cancelHcatDelegationTokens(jobContext);
+    }
+
+    @Override
+    public void abortJob(JobContext jobContext, JobStatus.State state) {
+      cancelHcatDelegationTokens(jobContext);
+    }
+
+    private void cancelHcatDelegationTokens(JobContext context) {
+      try{
+        docancelHcatDelegationTokens(context);
+      }catch(Throwable t){
+        LOG.warn("Error cancelling delegation token", t);
+      }
+    }
+
+    private void docancelHcatDelegationTokens(JobContext context) {
+      //Cancel hive metastore token
+      HiveMetaStoreClient client = null;
+      try {
+        HiveConf hiveConf = HCatUtil.getHiveConf(context.getConfiguration());
+        client = HCatUtil.getHiveClient(hiveConf);
+        String tokenStrForm = client.getTokenStrForm();
+        if (tokenStrForm != null) {
+          client.cancelDelegationToken(tokenStrForm);
+        }
+      } catch (Exception e) {
+        LOG.warn("Failed to cancel delegation token", e);
+      } finally {
+        HCatUtil.closeHiveClientQuietly(client);
+      }
+    }
+
+  };
+
+}
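The new output format only takes effect once it is wired into the controller job, which TempletonControllerJob does below. A minimal sketch of that wiring (the job name and the bare Configuration are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.templeton.tool.MSTokenCleanOutputFormat;

public class WireCleanupFormatSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "templeton-controller");
    // the committer returned by this format cancels the metastore token in
    // both commitJob() and abortJob(), so cleanup runs on success and failure
    job.setOutputFormatClass(MSTokenCleanOutputFormat.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(NullWritable.class);
    job.setNumReduceTasks(0);
  }
}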
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
index 4deafbb..cfc6b2b 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
@@ -24,6 +24,7 @@
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.PrintWriter;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -40,6 +41,10 @@
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobClient;
@@ -53,6 +58,10 @@
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.templeton.SecureProxySupport;
+import org.apache.hive.hcatalog.templeton.UgiFactory;
+import org.apache.thrift.TException;
 
 /**
  * A Map Reduce job that will start another job.
@@ -79,8 +88,11 @@
   public static final int WATCHER_TIMEOUT_SECS = 10;
   public static final int KEEP_ALIVE_MSEC = 60 * 1000;
 
-  public static final String TOKEN_FILE_ARG_PLACEHOLDER
-    = "__WEBHCAT_TOKEN_FILE_LOCATION__";
+  public static final String TOKEN_FILE_ARG_PLACEHOLDER
+    = "__WEBHCAT_TOKEN_FILE_LOCATION__";
+
+  public static final String HIVE_MS_DTOKEN_ENABLE_ARG
+    = "__TEMPLETON_FETCH_HIVE_METASTORE_DELEGATION_TOKEN__";
 
   private static TrivialExecService execService = TrivialExecService.getInstance();
 
@@ -110,8 +122,8 @@ protected Process startJob(Context context, String user,
     //Token is available, so replace the placeholder
     String tokenArg = "mapreduce.job.credentials.binary=" + tokenFile;
     for(int i=0; i<args.length; i++){
@@ ... @@ public int run(String[] args)
-    NullOutputFormat<NullWritable, NullWritable> of
-      = new NullOutputFormat<NullWritable, NullWritable>();
-    job.setOutputFormatClass(of.getClass());
+
+    if(isHMSDelegationNeeded(fetchHMetaStoreToken, hconf)){
+      //need to cancel the meta store delegation token after the job is done
+      //setup an OutputFormat that can do that
+      job.setOutputFormatClass(MSTokenCleanOutputFormat.class);
+    } else {
+      NullOutputFormat<NullWritable, NullWritable>
+        of = new NullOutputFormat<NullWritable, NullWritable>();
+      job.setOutputFormatClass(of.getClass());
+    }
     job.setNumReduceTasks(0);
 
     JobClient jc = new JobClient(new JobConf(job.getConfiguration()));
 
     Token<DelegationTokenIdentifier> mrdt = jc.getDelegationToken(new Text("mr token"));
     job.getCredentials().addToken(new Text("mr token"), mrdt);
+
+    if(isHMSDelegationNeeded(fetchHMetaStoreToken, hconf)){
+      // Get a token for the Hive metastore.
+      LOG.debug("Getting delegation token from hive metastore");
+      addHMSToken(job, user);
+    }
+
     job.submit();
 
     submittedJobId = job.getJobID();
@@ -342,10 +387,73 @@ public int run(String[] args)
   }
 
+  private boolean isHMSDelegationNeeded(boolean fetchHMetaStoreToken, HiveConf hconf) {
+    return fetchHMetaStoreToken
+      && hconf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL);
+  }
+
+  private void addHMSToken(Job job, String user)
+    throws MetaException, IOException, InterruptedException, TException {
+    Token<org.apache.hadoop.hive.thrift.DelegationTokenIdentifier>
+      hiveToken =
+      new Token<org.apache.hadoop.hive.thrift.DelegationTokenIdentifier>();
+    hiveToken.decodeFromUrlString(buildHcatDelegationToken(user));
+    job.getCredentials().addToken(new
+      Text(SecureProxySupport.HCAT_SERVICE), hiveToken);
+  }
+
+  private String[] removeHMSTokenArg(String[] args) {
+    String[] newArgs = new String[args.length-1];
+    int i = 0;
+    for(String arg : args){
+      if(!arg.equals(HIVE_MS_DTOKEN_ENABLE_ARG)){
+        newArgs[i++] = arg;
+      }
+    }
+    if(i != newArgs.length){
+      //should never happen!
+      throw new AssertionError("Error creating args list while processing "
" + " i " + newArgs.length ); + } + return newArgs; + } + + private boolean checkHMSTokenArg(String[] args) { + for(String arg : args){ + if(arg.equals(HIVE_MS_DTOKEN_ENABLE_ARG)){ + return true; + } + } + return false; + } + public static void main(String[] args) throws Exception { int ret = ToolRunner.run(new TempletonControllerJob(), args); - if (ret != 0) + if (ret != 0) { System.err.println("TempletonControllerJob failed!"); + } System.exit(ret); } + + private String buildHcatDelegationToken(String user) + throws IOException, InterruptedException, MetaException, TException { + final HiveConf c = new HiveConf(); + LOG.debug("Creating hive metastore delegation token for user " + user); + final UserGroupInformation ugi = UgiFactory.getUgi(user); + UserGroupInformation real = ugi.getRealUser(); + String s = real.doAs(new PrivilegedExceptionAction() { + public String run() + throws IOException, MetaException, TException, InterruptedException { + final HiveMetaStoreClient client = new HiveMetaStoreClient(c); + return ugi.doAs(new PrivilegedExceptionAction() { + public String run() + throws IOException, MetaException, TException, InterruptedException { + String u = ugi.getUserName(); + return client.getDelegationToken(u); + } + }); + } + }); + return s; + } }