From 9d7ad418dc635672aa47b792f3f35bf00508ebfa Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Tue, 11 Aug 2015 09:10:29 -0700 Subject: [PATCH] HBASE-14208 Remove yarn dependencies on -common and -client --- hbase-client/pom.xml | 46 --- .../hadoop/hbase/security/token/TokenUtil.java | 374 --------------------- hbase-common/pom.xml | 4 - .../org/apache/hadoop/hbase/security/User.java | 75 ----- .../hadoop/hbase/security/token/TokenUtil.java | 374 +++++++++++++++++++++ 5 files changed, 374 insertions(+), 499 deletions(-) delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml index dc0da77..425bd05 100644 --- a/hbase-client/pom.xml +++ b/hbase-client/pom.xml @@ -291,52 +291,6 @@ - - org.apache.hadoop - hadoop-mapreduce-client-core - - - com.sun.jersey.jersey-test-framework - jersey-test-framework-grizzly2 - - - javax.servlet - servlet-api - - - com.sun.jersey - jersey-server - - - com.sun.jersey - jersey-core - - - com.sun.jersey - jersey-json - - - com.sun.jersey.contribs - jersey-guice - - - com.google.inject - guice - - - com.google.inject.extensions - guice-servlet - - - org.codehaus.jackson - jackson-jaxrs - - - org.codehaus.jackson - jackson-xc - - - diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java deleted file mode 100644 index 9be33d7..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java +++ /dev/null @@ -1,374 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.security.token; - -import java.io.IOException; -import java.lang.reflect.UndeclaredThrowableException; -import java.security.PrivilegedExceptionAction; - -import com.google.protobuf.ServiceException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.security.UserProvider; -import org.apache.hadoop.hbase.zookeeper.ZKClusterId; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.zookeeper.KeeperException; - -/** - * Utility methods for obtaining authentication tokens. - */ -@InterfaceAudience.Public -@InterfaceStability.Evolving -public class TokenUtil { - // This class is referenced indirectly by User out in common; instances are created by reflection - private static final Log LOG = LogFactory.getLog(TokenUtil.class); - - /** - * Obtain and return an authentication token for the current user. - * @param conf the configuration for connecting to the cluster - * @return the authentication token instance - * @deprecated Replaced by {@link #obtainToken(Connection)} - */ - @Deprecated - public static Token obtainToken( - Configuration conf) throws IOException { - try (Connection connection = ConnectionFactory.createConnection(conf)) { - return obtainToken(connection); - } - } - - /** - * Obtain and return an authentication token for the current user. - * @param conn The HBase cluster connection - * @return the authentication token instance - */ - public static Token obtainToken( - Connection conn) throws IOException { - Table meta = null; - try { - meta = conn.getTable(TableName.META_TABLE_NAME); - CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW); - AuthenticationProtos.AuthenticationService.BlockingInterface service = - AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel); - AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null, - AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance()); - - return ProtobufUtil.toToken(response.getToken()); - } catch (ServiceException se) { - ProtobufUtil.toIOException(se); - } finally { - if (meta != null) { - meta.close(); - } - } - // dummy return for ServiceException block - return null; - } - - /** - * Obtain and return an authentication token for the current user. - * @param conn The HBase cluster connection - * @return the authentication token instance - */ - public static Token obtainToken( - final Connection conn, User user) throws IOException, InterruptedException { - return user.runAs(new PrivilegedExceptionAction>() { - @Override - public Token run() throws Exception { - return obtainToken(conn); - } - }); - } - - - private static Text getClusterId(Token token) - throws IOException { - return token.getService() != null - ? token.getService() : new Text("default"); - } - - /** - * Obtain an authentication token for the given user and add it to the - * user's credentials. - * @param conf The configuration for connecting to the cluster - * @param user The user for whom to obtain the token - * @throws IOException If making a remote call to the authentication service fails - * @throws InterruptedException If executing as the given user is interrupted - * @deprecated Replaced by {@link #obtainAndCacheToken(Connection,User)} - */ - @Deprecated - public static void obtainAndCacheToken(final Configuration conf, - UserGroupInformation user) - throws IOException, InterruptedException { - Connection conn = ConnectionFactory.createConnection(conf); - try { - UserProvider userProvider = UserProvider.instantiate(conf); - obtainAndCacheToken(conn, userProvider.create(user)); - } finally { - conn.close(); - } - } - - /** - * Obtain an authentication token for the given user and add it to the - * user's credentials. - * @param conn The HBase cluster connection - * @param user The user for whom to obtain the token - * @throws IOException If making a remote call to the authentication service fails - * @throws InterruptedException If executing as the given user is interrupted - */ - public static void obtainAndCacheToken(final Connection conn, - User user) - throws IOException, InterruptedException { - try { - Token token = obtainToken(conn, user); - - if (token == null) { - throw new IOException("No token returned for user " + user.getName()); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Obtained token " + token.getKind().toString() + " for user " + - user.getName()); - } - user.addToken(token); - } catch (IOException ioe) { - throw ioe; - } catch (InterruptedException ie) { - throw ie; - } catch (RuntimeException re) { - throw re; - } catch (Exception e) { - throw new UndeclaredThrowableException(e, - "Unexpected exception obtaining token for user " + user.getName()); - } - } - - /** - * Obtain an authentication token on behalf of the given user and add it to - * the credentials for the given map reduce job. - * @param conf The configuration for connecting to the cluster - * @param user The user for whom to obtain the token - * @param job The job instance in which the token should be stored - * @throws IOException If making a remote call to the authentication service fails - * @throws InterruptedException If executing as the given user is interrupted - * @deprecated Replaced by {@link #obtainTokenForJob(Connection,User,Job)} - */ - @Deprecated - public static void obtainTokenForJob(final Configuration conf, - UserGroupInformation user, Job job) - throws IOException, InterruptedException { - Connection conn = ConnectionFactory.createConnection(conf); - try { - UserProvider userProvider = UserProvider.instantiate(conf); - obtainTokenForJob(conn, userProvider.create(user), job); - } finally { - conn.close(); - } - } - - /** - * Obtain an authentication token on behalf of the given user and add it to - * the credentials for the given map reduce job. - * @param conn The HBase cluster connection - * @param user The user for whom to obtain the token - * @param job The job instance in which the token should be stored - * @throws IOException If making a remote call to the authentication service fails - * @throws InterruptedException If executing as the given user is interrupted - */ - public static void obtainTokenForJob(final Connection conn, - User user, Job job) - throws IOException, InterruptedException { - try { - Token token = obtainToken(conn, user); - - if (token == null) { - throw new IOException("No token returned for user " + user.getName()); - } - Text clusterId = getClusterId(token); - if (LOG.isDebugEnabled()) { - LOG.debug("Obtained token " + token.getKind().toString() + " for user " + - user.getName() + " on cluster " + clusterId.toString()); - } - job.getCredentials().addToken(clusterId, token); - } catch (IOException ioe) { - throw ioe; - } catch (InterruptedException ie) { - throw ie; - } catch (RuntimeException re) { - throw re; - } catch (Exception e) { - throw new UndeclaredThrowableException(e, - "Unexpected exception obtaining token for user " + user.getName()); - } - } - - /** - * Obtain an authentication token on behalf of the given user and add it to - * the credentials for the given map reduce job. - * @param user The user for whom to obtain the token - * @param job The job configuration in which the token should be stored - * @throws IOException If making a remote call to the authentication service fails - * @throws InterruptedException If executing as the given user is interrupted - * @deprecated Replaced by {@link #obtainTokenForJob(Connection,JobConf,User)} - */ - @Deprecated - public static void obtainTokenForJob(final JobConf job, - UserGroupInformation user) - throws IOException, InterruptedException { - Connection conn = ConnectionFactory.createConnection(job); - try { - UserProvider userProvider = UserProvider.instantiate(job); - obtainTokenForJob(conn, job, userProvider.create(user)); - } finally { - conn.close(); - } - } - - /** - * Obtain an authentication token on behalf of the given user and add it to - * the credentials for the given map reduce job. - * @param conn The HBase cluster connection - * @param user The user for whom to obtain the token - * @param job The job configuration in which the token should be stored - * @throws IOException If making a remote call to the authentication service fails - * @throws InterruptedException If executing as the given user is interrupted - */ - public static void obtainTokenForJob(final Connection conn, final JobConf job, User user) - throws IOException, InterruptedException { - try { - Token token = obtainToken(conn, user); - - if (token == null) { - throw new IOException("No token returned for user " + user.getName()); - } - Text clusterId = getClusterId(token); - if (LOG.isDebugEnabled()) { - LOG.debug("Obtained token " + token.getKind().toString() + " for user " + - user.getName() + " on cluster " + clusterId.toString()); - } - job.getCredentials().addToken(clusterId, token); - } catch (IOException ioe) { - throw ioe; - } catch (InterruptedException ie) { - throw ie; - } catch (RuntimeException re) { - throw re; - } catch (Exception e) { - throw new UndeclaredThrowableException(e, - "Unexpected exception obtaining token for user "+user.getName()); - } - } - - /** - * Checks for an authentication token for the given user, obtaining a new token if necessary, - * and adds it to the credentials for the given map reduce job. - * - * @param conn The HBase cluster connection - * @param user The user for whom to obtain the token - * @param job The job configuration in which the token should be stored - * @throws IOException If making a remote call to the authentication service fails - * @throws InterruptedException If executing as the given user is interrupted - */ - public static void addTokenForJob(final Connection conn, final JobConf job, User user) - throws IOException, InterruptedException { - - Token token = getAuthToken(conn.getConfiguration(), user); - if (token == null) { - token = obtainToken(conn, user); - } - job.getCredentials().addToken(token.getService(), token); - } - - /** - * Checks for an authentication token for the given user, obtaining a new token if necessary, - * and adds it to the credentials for the given map reduce job. - * - * @param conn The HBase cluster connection - * @param user The user for whom to obtain the token - * @param job The job instance in which the token should be stored - * @throws IOException If making a remote call to the authentication service fails - * @throws InterruptedException If executing as the given user is interrupted - */ - public static void addTokenForJob(final Connection conn, User user, Job job) - throws IOException, InterruptedException { - Token token = getAuthToken(conn.getConfiguration(), user); - if (token == null) { - token = obtainToken(conn, user); - } - job.getCredentials().addToken(token.getService(), token); - } - - /** - * Checks if an authentication tokens exists for the connected cluster, - * obtaining one if needed and adding it to the user's credentials. - * - * @param conn The HBase cluster connection - * @param user The user for whom to obtain the token - * @throws IOException If making a remote call to the authentication service fails - * @throws InterruptedException If executing as the given user is interrupted - * @return true if the token was added, false if it already existed - */ - public static boolean addTokenIfMissing(Connection conn, User user) - throws IOException, InterruptedException { - Token token = getAuthToken(conn.getConfiguration(), user); - if (token == null) { - token = obtainToken(conn, user); - user.getUGI().addToken(token.getService(), token); - return true; - } - return false; - } - - /** - * Get the authentication token of the user for the cluster specified in the configuration - * @return null if the user does not have the token, otherwise the auth token for the cluster. - */ - private static Token getAuthToken(Configuration conf, User user) - throws IOException, InterruptedException { - ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "TokenUtil-getAuthToken", null); - try { - String clusterId = ZKClusterId.readClusterIdZNode(zkw); - if (clusterId == null) { - throw new IOException("Failed to get cluster ID"); - } - return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getTokens()); - } catch (KeeperException e) { - throw new IOException(e); - } finally { - zkw.close(); - } - } -} diff --git a/hbase-common/pom.xml b/hbase-common/pom.xml index d782c6c..3e315e4 100644 --- a/hbase-common/pom.xml +++ b/hbase-common/pom.xml @@ -357,10 +357,6 @@ org.apache.hadoop hadoop-common - - org.apache.hadoop - hadoop-mapreduce-client-core - diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java index 0efb402..8248326 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java @@ -29,8 +29,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.util.Methods; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -100,30 +98,6 @@ public abstract class User { throws IOException, InterruptedException; /** - * Requests an authentication token for this user and stores it in the - * user's credentials. - * - * @throws IOException - * @deprecated Use {@code TokenUtil.obtainAuthTokenForJob(Connection,User,Job)} - * instead. - */ - @Deprecated - public abstract void obtainAuthTokenForJob(Configuration conf, Job job) - throws IOException, InterruptedException; - - /** - * Requests an authentication token for this user and stores it in the - * user's credentials. - * - * @throws IOException - * @deprecated Use {@code TokenUtil.obtainAuthTokenForJob(Connection,JobConf,User)} - * instead. - */ - @Deprecated - public abstract void obtainAuthTokenForJob(JobConf job) - throws IOException, InterruptedException; - - /** * Returns the Token of the specified kind associated with this user, * or null if the Token is not present. * @@ -312,55 +286,6 @@ public abstract class User { return ugi.doAs(action); } - @Override - public void obtainAuthTokenForJob(Configuration conf, Job job) - throws IOException, InterruptedException { - try { - Class c = Class.forName( - "org.apache.hadoop.hbase.security.token.TokenUtil"); - Methods.call(c, null, "obtainTokenForJob", - new Class[]{Configuration.class, UserGroupInformation.class, - Job.class}, - new Object[]{conf, ugi, job}); - } catch (ClassNotFoundException cnfe) { - throw new RuntimeException("Failure loading TokenUtil class, " - +"is secure RPC available?", cnfe); - } catch (IOException ioe) { - throw ioe; - } catch (InterruptedException ie) { - throw ie; - } catch (RuntimeException re) { - throw re; - } catch (Exception e) { - throw new UndeclaredThrowableException(e, - "Unexpected error calling TokenUtil.obtainAndCacheToken()"); - } - } - - @Override - public void obtainAuthTokenForJob(JobConf job) - throws IOException, InterruptedException { - try { - Class c = Class.forName( - "org.apache.hadoop.hbase.security.token.TokenUtil"); - Methods.call(c, null, "obtainTokenForJob", - new Class[]{JobConf.class, UserGroupInformation.class}, - new Object[]{job, ugi}); - } catch (ClassNotFoundException cnfe) { - throw new RuntimeException("Failure loading TokenUtil class, " - +"is secure RPC available?", cnfe); - } catch (IOException ioe) { - throw ioe; - } catch (InterruptedException ie) { - throw ie; - } catch (RuntimeException re) { - throw re; - } catch (Exception e) { - throw new UndeclaredThrowableException(e, - "Unexpected error calling TokenUtil.obtainAndCacheToken()"); - } - } - /** @see User#createUserForTesting(org.apache.hadoop.conf.Configuration, String, String[]) */ public static User createUserForTesting(Configuration conf, String name, String[] groups) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java new file mode 100644 index 0000000..9be33d7 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.security.token; + +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.security.PrivilegedExceptionAction; + +import com.google.protobuf.ServiceException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.zookeeper.ZKClusterId; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.zookeeper.KeeperException; + +/** + * Utility methods for obtaining authentication tokens. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class TokenUtil { + // This class is referenced indirectly by User out in common; instances are created by reflection + private static final Log LOG = LogFactory.getLog(TokenUtil.class); + + /** + * Obtain and return an authentication token for the current user. + * @param conf the configuration for connecting to the cluster + * @return the authentication token instance + * @deprecated Replaced by {@link #obtainToken(Connection)} + */ + @Deprecated + public static Token obtainToken( + Configuration conf) throws IOException { + try (Connection connection = ConnectionFactory.createConnection(conf)) { + return obtainToken(connection); + } + } + + /** + * Obtain and return an authentication token for the current user. + * @param conn The HBase cluster connection + * @return the authentication token instance + */ + public static Token obtainToken( + Connection conn) throws IOException { + Table meta = null; + try { + meta = conn.getTable(TableName.META_TABLE_NAME); + CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW); + AuthenticationProtos.AuthenticationService.BlockingInterface service = + AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel); + AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null, + AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance()); + + return ProtobufUtil.toToken(response.getToken()); + } catch (ServiceException se) { + ProtobufUtil.toIOException(se); + } finally { + if (meta != null) { + meta.close(); + } + } + // dummy return for ServiceException block + return null; + } + + /** + * Obtain and return an authentication token for the current user. + * @param conn The HBase cluster connection + * @return the authentication token instance + */ + public static Token obtainToken( + final Connection conn, User user) throws IOException, InterruptedException { + return user.runAs(new PrivilegedExceptionAction>() { + @Override + public Token run() throws Exception { + return obtainToken(conn); + } + }); + } + + + private static Text getClusterId(Token token) + throws IOException { + return token.getService() != null + ? token.getService() : new Text("default"); + } + + /** + * Obtain an authentication token for the given user and add it to the + * user's credentials. + * @param conf The configuration for connecting to the cluster + * @param user The user for whom to obtain the token + * @throws IOException If making a remote call to the authentication service fails + * @throws InterruptedException If executing as the given user is interrupted + * @deprecated Replaced by {@link #obtainAndCacheToken(Connection,User)} + */ + @Deprecated + public static void obtainAndCacheToken(final Configuration conf, + UserGroupInformation user) + throws IOException, InterruptedException { + Connection conn = ConnectionFactory.createConnection(conf); + try { + UserProvider userProvider = UserProvider.instantiate(conf); + obtainAndCacheToken(conn, userProvider.create(user)); + } finally { + conn.close(); + } + } + + /** + * Obtain an authentication token for the given user and add it to the + * user's credentials. + * @param conn The HBase cluster connection + * @param user The user for whom to obtain the token + * @throws IOException If making a remote call to the authentication service fails + * @throws InterruptedException If executing as the given user is interrupted + */ + public static void obtainAndCacheToken(final Connection conn, + User user) + throws IOException, InterruptedException { + try { + Token token = obtainToken(conn, user); + + if (token == null) { + throw new IOException("No token returned for user " + user.getName()); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Obtained token " + token.getKind().toString() + " for user " + + user.getName()); + } + user.addToken(token); + } catch (IOException ioe) { + throw ioe; + } catch (InterruptedException ie) { + throw ie; + } catch (RuntimeException re) { + throw re; + } catch (Exception e) { + throw new UndeclaredThrowableException(e, + "Unexpected exception obtaining token for user " + user.getName()); + } + } + + /** + * Obtain an authentication token on behalf of the given user and add it to + * the credentials for the given map reduce job. + * @param conf The configuration for connecting to the cluster + * @param user The user for whom to obtain the token + * @param job The job instance in which the token should be stored + * @throws IOException If making a remote call to the authentication service fails + * @throws InterruptedException If executing as the given user is interrupted + * @deprecated Replaced by {@link #obtainTokenForJob(Connection,User,Job)} + */ + @Deprecated + public static void obtainTokenForJob(final Configuration conf, + UserGroupInformation user, Job job) + throws IOException, InterruptedException { + Connection conn = ConnectionFactory.createConnection(conf); + try { + UserProvider userProvider = UserProvider.instantiate(conf); + obtainTokenForJob(conn, userProvider.create(user), job); + } finally { + conn.close(); + } + } + + /** + * Obtain an authentication token on behalf of the given user and add it to + * the credentials for the given map reduce job. + * @param conn The HBase cluster connection + * @param user The user for whom to obtain the token + * @param job The job instance in which the token should be stored + * @throws IOException If making a remote call to the authentication service fails + * @throws InterruptedException If executing as the given user is interrupted + */ + public static void obtainTokenForJob(final Connection conn, + User user, Job job) + throws IOException, InterruptedException { + try { + Token token = obtainToken(conn, user); + + if (token == null) { + throw new IOException("No token returned for user " + user.getName()); + } + Text clusterId = getClusterId(token); + if (LOG.isDebugEnabled()) { + LOG.debug("Obtained token " + token.getKind().toString() + " for user " + + user.getName() + " on cluster " + clusterId.toString()); + } + job.getCredentials().addToken(clusterId, token); + } catch (IOException ioe) { + throw ioe; + } catch (InterruptedException ie) { + throw ie; + } catch (RuntimeException re) { + throw re; + } catch (Exception e) { + throw new UndeclaredThrowableException(e, + "Unexpected exception obtaining token for user " + user.getName()); + } + } + + /** + * Obtain an authentication token on behalf of the given user and add it to + * the credentials for the given map reduce job. + * @param user The user for whom to obtain the token + * @param job The job configuration in which the token should be stored + * @throws IOException If making a remote call to the authentication service fails + * @throws InterruptedException If executing as the given user is interrupted + * @deprecated Replaced by {@link #obtainTokenForJob(Connection,JobConf,User)} + */ + @Deprecated + public static void obtainTokenForJob(final JobConf job, + UserGroupInformation user) + throws IOException, InterruptedException { + Connection conn = ConnectionFactory.createConnection(job); + try { + UserProvider userProvider = UserProvider.instantiate(job); + obtainTokenForJob(conn, job, userProvider.create(user)); + } finally { + conn.close(); + } + } + + /** + * Obtain an authentication token on behalf of the given user and add it to + * the credentials for the given map reduce job. + * @param conn The HBase cluster connection + * @param user The user for whom to obtain the token + * @param job The job configuration in which the token should be stored + * @throws IOException If making a remote call to the authentication service fails + * @throws InterruptedException If executing as the given user is interrupted + */ + public static void obtainTokenForJob(final Connection conn, final JobConf job, User user) + throws IOException, InterruptedException { + try { + Token token = obtainToken(conn, user); + + if (token == null) { + throw new IOException("No token returned for user " + user.getName()); + } + Text clusterId = getClusterId(token); + if (LOG.isDebugEnabled()) { + LOG.debug("Obtained token " + token.getKind().toString() + " for user " + + user.getName() + " on cluster " + clusterId.toString()); + } + job.getCredentials().addToken(clusterId, token); + } catch (IOException ioe) { + throw ioe; + } catch (InterruptedException ie) { + throw ie; + } catch (RuntimeException re) { + throw re; + } catch (Exception e) { + throw new UndeclaredThrowableException(e, + "Unexpected exception obtaining token for user "+user.getName()); + } + } + + /** + * Checks for an authentication token for the given user, obtaining a new token if necessary, + * and adds it to the credentials for the given map reduce job. + * + * @param conn The HBase cluster connection + * @param user The user for whom to obtain the token + * @param job The job configuration in which the token should be stored + * @throws IOException If making a remote call to the authentication service fails + * @throws InterruptedException If executing as the given user is interrupted + */ + public static void addTokenForJob(final Connection conn, final JobConf job, User user) + throws IOException, InterruptedException { + + Token token = getAuthToken(conn.getConfiguration(), user); + if (token == null) { + token = obtainToken(conn, user); + } + job.getCredentials().addToken(token.getService(), token); + } + + /** + * Checks for an authentication token for the given user, obtaining a new token if necessary, + * and adds it to the credentials for the given map reduce job. + * + * @param conn The HBase cluster connection + * @param user The user for whom to obtain the token + * @param job The job instance in which the token should be stored + * @throws IOException If making a remote call to the authentication service fails + * @throws InterruptedException If executing as the given user is interrupted + */ + public static void addTokenForJob(final Connection conn, User user, Job job) + throws IOException, InterruptedException { + Token token = getAuthToken(conn.getConfiguration(), user); + if (token == null) { + token = obtainToken(conn, user); + } + job.getCredentials().addToken(token.getService(), token); + } + + /** + * Checks if an authentication tokens exists for the connected cluster, + * obtaining one if needed and adding it to the user's credentials. + * + * @param conn The HBase cluster connection + * @param user The user for whom to obtain the token + * @throws IOException If making a remote call to the authentication service fails + * @throws InterruptedException If executing as the given user is interrupted + * @return true if the token was added, false if it already existed + */ + public static boolean addTokenIfMissing(Connection conn, User user) + throws IOException, InterruptedException { + Token token = getAuthToken(conn.getConfiguration(), user); + if (token == null) { + token = obtainToken(conn, user); + user.getUGI().addToken(token.getService(), token); + return true; + } + return false; + } + + /** + * Get the authentication token of the user for the cluster specified in the configuration + * @return null if the user does not have the token, otherwise the auth token for the cluster. + */ + private static Token getAuthToken(Configuration conf, User user) + throws IOException, InterruptedException { + ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "TokenUtil-getAuthToken", null); + try { + String clusterId = ZKClusterId.readClusterIdZNode(zkw); + if (clusterId == null) { + throw new IOException("Failed to get cluster ID"); + } + return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getTokens()); + } catch (KeeperException e) { + throw new IOException(e); + } finally { + zkw.close(); + } + } +} -- 2.4.3