Index: src/java/org/apache/hcatalog/mapreduce/Security.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/Security.java	(revision 0)
+++ src/java/org/apache/hcatalog/mapreduce/Security.java	(revision 0)
@@ -0,0 +1,138 @@
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.thrift.DelegationTokenSelector;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.thrift.TException;
+
+final class Security {
+
+  // making sure this is not initialized unless needed
+  private static final class LazyHolder {
+    public static final Security INSTANCE = new Security();
+  }
+
+  private static Map<String, Token<? extends AbstractDelegationTokenIdentifier>> tokenMap = new HashMap<String, Token<? extends AbstractDelegationTokenIdentifier>>();
+
+  public static Security getInstance() {
+    return LazyHolder.INSTANCE;
+  }
+
+  // a signature string to associate with a HCatTableInfo - essentially
+  // a concatenation of dbname, tablename and partition keyvalues.
+  private String getTokenSignature(OutputJobInfo outputJobInfo) {
+    StringBuilder result = new StringBuilder("");
+    String dbName = outputJobInfo.getDatabaseName();
+    if(dbName != null) {
+      result.append(dbName);
+    }
+    String tableName = outputJobInfo.getTableName();
+    if(tableName != null) {
+      result.append("+" + tableName);
+    }
+    Map<String, String> partValues = outputJobInfo.getPartitionValues();
+    if(partValues != null) {
+      for(Entry<String, String> entry : partValues.entrySet()) {
+        result.append("+" + entry.getKey() + "=" + entry.getValue());
+      }
+    }
+    return result.toString();
+  }
+
+  void handleSecurity(
+      Job job,
+      OutputJobInfo outputJobInfo,
+      HiveMetaStoreClient client,
+      Configuration conf,
+      boolean harRequested)
+      throws IOException, MetaException, TException, Exception {
+    if(UserGroupInformation.isSecurityEnabled()){
+      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      // check if oozie has set up a hcat deleg. token - if so use it
+      TokenSelector<? extends TokenIdentifier> tokenSelector = new DelegationTokenSelector();
+      // TODO: will oozie use a "service" called "oozie" - then instead of
+      // new Text() do new Text("oozie") below - if this change is made also
+      // remember to do:
+      // job.getConfiguration().set(HCAT_KEY_TOKEN_SIGNATURE, "oozie");
+      // Also change code in OutputCommitter.cleanupJob() to cancel the
+      // token only if token.service is not "oozie" - remove the condition of
+      // HCAT_KEY_TOKEN_SIGNATURE != null in that code.
+      Token<? extends TokenIdentifier> token = tokenSelector.selectToken(
+          new Text(), ugi.getTokens());
+      if(token != null) {
+
+        job.getCredentials().addToken(new Text(ugi.getUserName()),token);
+
+      } else {
+
+        // we did not get token set up by oozie, let's get them ourselves here.
+        // we essentially get a token per unique Output HCatTableInfo - this is
+        // done because through Pig, setOutput() method is called multiple times
+        // We want to only get the token once per unique output HCatTableInfo -
+        // we cannot just get one token since in multi-query case (> 1 store in 1 job)
+        // or the case when a single pig script results in > 1 jobs, the single
+        // token will get cancelled by the output committer and the subsequent
+        // stores will fail - by tying the token with the concatenation of
+        // dbname, tablename and partition keyvalues of the output
+        // TableInfo, we can have as many tokens as there are stores and the TokenSelector
+        // will correctly pick the right tokens which the committer will use and
+        // cancel.
+
+        String tokenSignature = getTokenSignature(outputJobInfo);
+        if(tokenMap.get(tokenSignature) == null) {
+          // get delegation tokens from hcat server and store them into the "job"
+          // These will be used to publish partitions to
+          // hcat, normally in OutputCommitter.commitJob()
+          // when the JobTracker in Hadoop MapReduce starts supporting renewal of
+          // arbitrary tokens, the renewer should be the principal of the JobTracker
+          tokenMap.put(tokenSignature, HCatUtil.extractThriftToken(
+              client.getDelegationToken(ugi.getUserName()),
+              tokenSignature));
+        }
+
+        String jcTokenSignature = "jc."+tokenSignature;
+        if (harRequested){
+          if(tokenMap.get(jcTokenSignature) == null) {
+            tokenMap.put(jcTokenSignature,
+                HCatUtil.getJobTrackerDelegationToken(conf,ugi.getUserName()));
+          }
+        }
+
+        job.getCredentials().addToken(new Text(ugi.getUserName() + tokenSignature),
+            tokenMap.get(tokenSignature));
+        // this will be used by the outputcommitter to pass on to the metastore client
+        // which in turn will pass on to the TokenSelector so that it can select
+        // the right token.
+        job.getConfiguration().set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE, tokenSignature);
+
+        if (harRequested){
+          job.getCredentials().addToken(new Text(ugi.getUserName() + jcTokenSignature),
+              tokenMap.get(jcTokenSignature));
+
+          job.getConfiguration().set(
+              HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE, jcTokenSignature);
+          job.getConfiguration().set(
+              HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM,
+              tokenMap.get(jcTokenSignature).encodeToUrlString());
+          // LOG.info("Set hive dt["+tokenSignature+"]");
+          // LOG.info("Set jt dt["+jcTokenSignature+"]");
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
Index: src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java	(revision 1208851)
+++ src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java	(working copy)
@@ -24,7 +24,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,18 +37,12 @@
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.thrift.DelegationTokenSelector;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenSelector;
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatException;
@@ -61,10 +54,8 @@
  * and should be given as null. The value is the HCatRecord to write.*/
 public class HCatOutputFormat extends HCatBaseOutputFormat {
 
-//  static final private Log LOG = LogFactory.getLog(HCatOutputFormat.class);
+    static final private Log LOG = LogFactory.getLog(HCatOutputFormat.class);
 
-    private static Map<String, Token<? extends AbstractDelegationTokenIdentifier>> tokenMap = new HashMap<String, Token<? extends AbstractDelegationTokenIdentifier>>();
-
     private static int maxDynamicPartitions;
     private static boolean harRequested;
 
@@ -88,12 +79,12 @@
       if (table.getPartitionKeysSize() == 0 ){
         if ((outputJobInfo.getPartitionValues() != null) && (!outputJobInfo.getPartitionValues().isEmpty())){
           // attempt made to save partition values in non-partitioned table - throw error.
-          throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES, 
+          throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,
              "Partition values specified for non-partitioned table");
         }
         // non-partitioned table
         outputJobInfo.setPartitionValues(new HashMap<String, String>());
-        
+
       } else {
         // partitioned table, we expect partition values
        // convert user specified map to have lower case key names
@@ -117,12 +108,12 @@
             dynamicPartitioningKeys.add(fs.getName().toLowerCase());
           }
         }
-        
+
         if (valueMap.size() + dynamicPartitioningKeys.size() != table.getPartitionKeysSize()){
          // If this isn't equal, then bogus key values have been inserted, error out.
          throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,"Invalid partition keys specified");
         }
-        
+
         outputJobInfo.setDynamicPartitioningKeys(dynamicPartitioningKeys);
         String dynHash;
         if ((dynHash = conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID)) == null){
@@ -177,79 +168,12 @@
       FsPermission.setUMask(conf, FsPermission.getDefault().applyUMask(
           tblPath.getFileSystem(conf).getFileStatus(tblPath).getPermission()));
 
-      if(UserGroupInformation.isSecurityEnabled()){
-        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-        // check if oozie has set up a hcat deleg. token - if so use it
-        TokenSelector<? extends TokenIdentifier> tokenSelector = new DelegationTokenSelector();
-        // TODO: will oozie use a "service" called "oozie" - then instead of
-        // new Text() do new Text("oozie") below - if this change is made also
-        // remember to do:
-        // job.getConfiguration().set(HCAT_KEY_TOKEN_SIGNATURE, "oozie");
-        // Also change code in OutputCommitter.cleanupJob() to cancel the
-        // token only if token.service is not "oozie" - remove the condition of
-        // HCAT_KEY_TOKEN_SIGNATURE != null in that code.
-        Token<? extends TokenIdentifier> token = tokenSelector.selectToken(
-            new Text(), ugi.getTokens());
-        if(token != null) {
-
-          job.getCredentials().addToken(new Text(ugi.getUserName()),token);
-
-        } else {
-
-          // we did not get token set up by oozie, let's get them ourselves here.
-          // we essentially get a token per unique Output HCatTableInfo - this is
-          // done because through Pig, setOutput() method is called multiple times
-          // We want to only get the token once per unique output HCatTableInfo -
-          // we cannot just get one token since in multi-query case (> 1 store in 1 job)
-          // or the case when a single pig script results in > 1 jobs, the single
-          // token will get cancelled by the output committer and the subsequent
-          // stores will fail - by tying the token with the concatenation of
-          // dbname, tablename and partition keyvalues of the output
-          // TableInfo, we can have as many tokens as there are stores and the TokenSelector
-          // will correctly pick the right tokens which the committer will use and
-          // cancel.
-
-          String tokenSignature = getTokenSignature(outputJobInfo);
-          if(tokenMap.get(tokenSignature) == null) {
-            // get delegation tokens from hcat server and store them into the "job"
-            // These will be used in to publish partitions to
-            // hcat normally in OutputCommitter.commitJob()
-            // when the JobTracker in Hadoop MapReduce starts supporting renewal of
-            // arbitrary tokens, the renewer should be the principal of the JobTracker
-            tokenMap.put(tokenSignature, HCatUtil.extractThriftToken(
-                client.getDelegationToken(ugi.getUserName()),
-                tokenSignature));
-          }
-
-          String jcTokenSignature = "jc."+tokenSignature;
-          if (harRequested){
-            if(tokenMap.get(jcTokenSignature) == null) {
-              tokenMap.put(jcTokenSignature,
-                  HCatUtil.getJobTrackerDelegationToken(conf,ugi.getUserName()));
-            }
-          }
-
-          job.getCredentials().addToken(new Text(ugi.getUserName() + tokenSignature),
-              tokenMap.get(tokenSignature));
-          // this will be used by the outputcommitter to pass on to the metastore client
-          // which in turn will pass on to the TokenSelector so that it can select
-          // the right token.
-          job.getConfiguration().set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE, tokenSignature);
-
-          if (harRequested){
-            job.getCredentials().addToken(new Text(ugi.getUserName() + jcTokenSignature),
-                tokenMap.get(jcTokenSignature));
-
-            job.getConfiguration().set(
-                HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE, jcTokenSignature);
-            job.getConfiguration().set(
-                HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM,
-                tokenMap.get(jcTokenSignature).encodeToUrlString());
-            // LOG.info("Set hive dt["+tokenSignature+"]");
-            // LOG.info("Set jt dt["+jcTokenSignature+"]");
-          }
-        }
-      }
+      try {
+        UserGroupInformation.class.getMethod("isSecurityEnabled");
+        Security.getInstance().handleSecurity(job, outputJobInfo, client, conf, harRequested);
+      } catch (NoSuchMethodException e) {
+        LOG.info("Security is not supported by this version of hadoop.");
+      }
     } catch(Exception e) {
       if( e instanceof HCatException ) {
         throw (HCatException) e;
@@ -264,27 +188,6 @@
     }
   }
 
-    // a signature string to associate with a HCatTableInfo - essentially
-    // a concatenation of dbname, tablename and partition keyvalues.
-    private static String getTokenSignature(OutputJobInfo outputJobInfo) {
-      StringBuilder result = new StringBuilder("");
-      String dbName = outputJobInfo.getDatabaseName();
-      if(dbName != null) {
-        result.append(dbName);
-      }
-      String tableName = outputJobInfo.getTableName();
-      if(tableName != null) {
-        result.append("+" + tableName);
-      }
-      Map<String, String> partValues = outputJobInfo.getPartitionValues();
-      if(partValues != null) {
-        for(Entry<String, String> entry: partValues.entrySet()) {
-          result.append("+" + entry.getKey() + "=" + entry.getValue());
-        }
-      }
-      return result.toString();
-    }
-
     /**
      * Set the schema for the data being written out to the partition. The
      * table schema is used by default for the partition if this is not called.
@@ -331,7 +234,13 @@
     static HiveMetaStoreClient createHiveClient(String url, Configuration conf) throws IOException, MetaException {
       HiveConf hiveConf = getHiveConf(url, conf);
 //      HCatUtil.logHiveConf(LOG, hiveConf);
-      return new HiveMetaStoreClient(hiveConf);
+      try {
+        return new HiveMetaStoreClient(hiveConf);
+      } catch (MetaException e) {
+        LOG.error("Error connecting to the metastore (conf follows): "+e.getMessage(), e);
+        HCatUtil.logHiveConf(LOG, hiveConf);
+        throw e;
+      }
     }
 
@@ -343,7 +252,7 @@
       hiveConf.set("hive.metastore.local", "false");
       hiveConf.set(ConfVars.METASTOREURIS.varname, url);
-      
+
       String kerberosPrincipal = conf.get(HCatConstants.HCAT_METASTORE_PRINCIPAL);
       if (kerberosPrincipal == null){
         kerberosPrincipal = conf.get(ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname);
@@ -351,7 +260,7 @@
       if (kerberosPrincipal != null){
         hiveConf.setBoolean(ConfVars.METASTORE_USE_THRIFT_SASL.varname, true);
         hiveConf.set(ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, kerberosPrincipal);
-      } 
+      }
       if(conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
         hiveConf.set("hive.metastore.token.signature", conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE));
       }
@@ -381,12 +290,12 @@
         }
       }
-      
+
       // figure out what the maximum number of partitions allowed is, so we can pass it on to our outputinfo
       if (HCatConstants.HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED){
         maxDynamicPartitions = hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS);
       }else{
-        maxDynamicPartitions = -1; // disables bounds checking for maximum number of dynamic partitions 
+        maxDynamicPartitions = -1; // disables bounds checking for maximum number of dynamic partitions
       }
       harRequested = hiveConf.getBoolVar(HiveConf.ConfVars.HIVEARCHIVEENABLED);
       return hiveConf;
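
For reference, a minimal standalone sketch of the reflection-based capability check that setOutput() now relies on: the secure path is only entered when the running Hadoop exposes UserGroupInformation.isSecurityEnabled(), so HCatalog can still run against pre-security Hadoop versions where that method (and the token APIs referenced by Security) does not exist. The class and method names below are illustrative, not part of the patch.

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.security.UserGroupInformation;

// Illustrative only: mirrors the try/catch added to HCatOutputFormat.setOutput().
public class SecuritySupportCheck {

    private static final Log LOG = LogFactory.getLog(SecuritySupportCheck.class);

    // Returns true if this Hadoop version has the security API. Looking the
    // method up reflectively (instead of calling it directly) keeps older,
    // pre-security Hadoop versions from failing at class-load/link time.
    public static boolean isSecuritySupported() {
        try {
            UserGroupInformation.class.getMethod("isSecurityEnabled");
            return true;
        } catch (NoSuchMethodException e) {
            LOG.info("Security is not supported by this version of hadoop.");
            return false;
        }
    }
}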
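
Similarly, a small self-contained illustration of the token-signature scheme implemented by Security.getTokenSignature() (the database, table, and partition values here are made-up examples): because each distinct output maps to its own signature, a Pig multi-query job with several stores holds one delegation token per store, and the output committer cancelling one token cannot break the others.

import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical example, not part of the patch: the signature is the
// concatenation of dbname, tablename and partition key/values, as in
// Security.getTokenSignature().
public class TokenSignatureExample {

    static String signature(String db, String table, Map<String, String> partValues) {
        StringBuilder result = new StringBuilder();
        if (db != null) {
            result.append(db);
        }
        if (table != null) {
            result.append("+" + table);
        }
        if (partValues != null) {
            for (Map.Entry<String, String> entry : partValues.entrySet()) {
                result.append("+" + entry.getKey() + "=" + entry.getValue());
            }
        }
        return result.toString();
    }

    public static void main(String[] args) {
        Map<String, String> p1 = new LinkedHashMap<String, String>();
        p1.put("ds", "2011-11-30");
        Map<String, String> p2 = new LinkedHashMap<String, String>();
        p2.put("ds", "2011-12-01");
        // Prints "default+clicks+ds=2011-11-30" and "default+clicks+ds=2011-12-01":
        // distinct signatures, hence distinct delegation tokens for the two stores.
        System.out.println(signature("default", "clicks", p1));
        System.out.println(signature("default", "clicks", p2));
    }
}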