diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java index d1530dc3ddc1f4d574df7bb99d688e168425e79c..32186391e7e4cfc9b4d06d7376663e82ec08d9e6 100644 --- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java @@ -42,9 +42,13 @@ import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier; +import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping; +import org.apache.hadoop.hbase.zookeeper.ZKClusterId; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hive.metastore.HiveMetaHook; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -60,11 +64,14 @@ import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.StringUtils; +import org.apache.zookeeper.KeeperException; /** * HBaseStorageHandler provides a HiveStorageHandler implementation for @@ -450,13 +457,37 @@ private void addHBaseResources(Configuration jobConf, private void addHBaseDelegationToken(Configuration conf) throws IOException { if (User.isHBaseSecurityEnabled(conf)) { try { - User.getCurrent().obtainAuthTokenForJob(conf,new Job(conf)); + User curUser = User.getCurrent(); + Token authToken = getAuthToken(conf, curUser); + Job job = new Job(conf); + if (authToken == null) { + curUser.obtainAuthTokenForJob(conf,job); + } else { + job.getCredentials().addToken(authToken.getService(), authToken); + } } catch (InterruptedException e) { throw new IOException("Error while obtaining hbase delegation token", e); } } } + /** + * Get the authentication token of the user for the cluster specified in the configuration + * @return null if the user does not have the token, otherwise the auth token for the cluster. + */ + private static Token getAuthToken(Configuration conf, User user) + throws IOException, InterruptedException { + ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "mr-init-credentials", null); + try { + String clusterId = ZKClusterId.readClusterIdZNode(zkw); + return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getUGI().getTokens()); + } catch (KeeperException e) { + throw new IOException(e); + } finally { + zkw.close(); + } + } + @Override public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { try {