From 9299c8b123eaa3dd8bbf455624930c6ce9be2b09 Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Sat, 30 May 2015 00:37:00 -0400 Subject: [PATCH] HIVE-10857 SASL/Kerberos support for AccumuloStorageHandler Leverage existing SASL/Kerberos support in Accumulo 1.7. Implement in a backwards compatible way so as to not break 1.6 and earlier -- lots of reflection. Use KerberosToken instead of PasswordToken and fetch delegation tokens. --- .../accumulo/AccumuloConnectionParameters.java | 111 +++++++- .../hive/accumulo/AccumuloStorageHandler.java | 38 +++ .../hadoop/hive/accumulo/HiveAccumuloHelper.java | 280 +++++++++++++++++++++ .../accumulo/mr/HiveAccumuloTableInputFormat.java | 74 +++++- .../accumulo/mr/HiveAccumuloTableOutputFormat.java | 63 ++++- .../accumulo/TestAccumuloConnectionParameters.java | 19 ++ .../hive/accumulo/TestHiveAccumuloHelper.java | 75 ++++++ .../mr/TestHiveAccumuloTableInputFormat.java | 8 +- .../mr/TestHiveAccumuloTableOutputFormat.java | 2 +- 9 files changed, 647 insertions(+), 23 deletions(-) create mode 100644 accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/HiveAccumuloHelper.java create mode 100644 accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestHiveAccumuloHelper.java diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java index 2b11f84..f34e820 100644 --- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java @@ -16,14 +16,20 @@ */ package org.apache.hadoop.hive.accumulo; +import java.io.File; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import 
org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.JavaUtils; import com.google.common.base.Preconditions; @@ -31,12 +37,18 @@ * */ public class AccumuloConnectionParameters { + private static final String KERBEROS_TOKEN_CLASS = "org.apache.accumulo.core.client.security.tokens.KerberosToken"; + public static final String USER_NAME = "accumulo.user.name"; public static final String USER_PASS = "accumulo.user.pass"; public static final String ZOOKEEPERS = "accumulo.zookeepers"; public static final String INSTANCE_NAME = "accumulo.instance.name"; public static final String TABLE_NAME = "accumulo.table.name"; + // SASL/Kerberos properties + public static final String SASL_ENABLED = "accumulo.sasl.enabled"; + public static final String USER_KEYTAB = "accumulo.user.keytab"; + public static final String USE_MOCK_INSTANCE = "accumulo.mock.instance"; protected Configuration conf; @@ -84,6 +96,16 @@ public boolean useMockInstance() { return conf.getBoolean(USE_MOCK_INSTANCE, false); } + public boolean useSasl() { + Preconditions.checkNotNull(conf); + return conf.getBoolean(SASL_ENABLED, false); + } + + public String getAccumuloKeytab() { + Preconditions.checkNotNull(conf); + return conf.get(USER_KEYTAB); + } + public Instance getInstance() { String instanceName = getAccumuloInstanceName(); @@ -112,16 +134,97 @@ public Connector getConnector() throws AccumuloException, AccumuloSecurityExcept } public Connector getConnector(Instance inst) throws AccumuloException, AccumuloSecurityException { - String username = getAccumuloUserName(), password = getAccumuloPassword(); + String username = 
getAccumuloUserName(); // Fail with a good message if (null == username) { throw new IllegalArgumentException("Accumulo user name must be provided in hiveconf using " + USER_NAME); } - if (null == password) { - throw new IllegalArgumentException("Accumulo password must be provided in hiveconf using " + USER_PASS); + + if (useSasl()) { + return inst.getConnector(username, getKerberosToken()); + } else { + // Not using SASL/Kerberos -- use the password + String password = getAccumuloPassword(); + + if (null == password) { + throw new IllegalArgumentException("Accumulo password must be provided in hiveconf using " + USER_PASS); + } + + return inst.getConnector(username, new PasswordToken(password)); + } + } + + public AuthenticationToken getKerberosToken() { + if (!useSasl()) { + throw new IllegalArgumentException("Cannot construct KerberosToken when SASL is disabled"); + } + + final String keytab = getAccumuloKeytab(), username = getAccumuloUserName(); + + if (null != keytab) { + // Use the keytab if one was provided + return getKerberosToken(username, keytab); + } else { + // Otherwise, expect the user is already logged in + return getKerberosToken(username); + } + } + + /** + * Instantiate a KerberosToken in a backwards compatible manner. + * @param username Kerberos principal + */ + AuthenticationToken getKerberosToken(String username) { + // Get the Class + Class krbTokenClz = getKerberosTokenClass(); + + try { + // Invoke the `new KerberosToken(String)` constructor + // Expects that the user is already logged-in + Constructor constructor = krbTokenClz.getConstructor(String.class); + return constructor.newInstance(username); + } catch (NoSuchMethodException | SecurityException | InstantiationException | + IllegalArgumentException | InvocationTargetException | IllegalAccessException e) { + throw new IllegalArgumentException("Failed to instantiate KerberosToken.", e); + } + } + + /** + * Instantiate a KerberosToken in a backwards compatible manner. 
+ * @param username Kerberos principal + * @param keytab Keytab on local filesystem + */ + AuthenticationToken getKerberosToken(String username, String keytab) { + Class krbTokenClz = getKerberosTokenClass(); + + File keytabFile = new File(keytab); + if (!keytabFile.isFile() || !keytabFile.canRead()) { + throw new IllegalArgumentException("Keytab must be a readable file: " + keytab); } - return inst.getConnector(username, new PasswordToken(password)); + try { + // Invoke the `new KerberosToken(String, File, boolean)` constructor + // Tries to log in as the provided user with the given keytab, overriding an already logged-in user if present + Constructor constructor = krbTokenClz.getConstructor(String.class, File.class, boolean.class); + return constructor.newInstance(username, keytabFile, true); + } catch (NoSuchMethodException | SecurityException | InstantiationException | + IllegalArgumentException | InvocationTargetException | IllegalAccessException e) { + throw new IllegalArgumentException("Failed to instantiate KerberosToken.", e); + } + } + + /** + * Attempt to instantiate the KerberosToken class + */ + Class getKerberosTokenClass() { + try { + // Instantiate the class + Class clz = JavaUtils.loadClass(KERBEROS_TOKEN_CLASS); + // Cast it to an AuthenticationToken since Connector will need that + return clz.asSubclass(AuthenticationToken.class); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException("Could not load KerberosToken class. 
>=Accumulo 1.7.0 required", e); + } } } diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java index 64eb18b..af98b6f 100644 --- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java @@ -27,6 +27,11 @@ import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.client.mapred.AccumuloInputFormat; +import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat; +import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator; +import org.apache.accumulo.core.client.mapreduce.lib.impl.OutputConfigurator; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.fate.Fate; import org.apache.accumulo.start.Main; import org.apache.accumulo.trace.instrument.Tracer; @@ -53,6 +58,9 @@ import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.StringUtils; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; @@ -69,6 +77,7 @@ protected AccumuloPredicateHandler predicateHandler = AccumuloPredicateHandler.getInstance(); protected AccumuloConnectionParameters connectionParams; protected Configuration conf; + protected HiveAccumuloHelper helper = new HiveAccumuloHelper(); /** * Push down table properties into the JobConf. 
@@ -314,6 +323,7 @@ public void rollbackDropTable(Table table) throws MetaException { // do nothing } + @SuppressWarnings("deprecation") @Override public DecomposedPredicate decomposePredicate(JobConf conf, Deserializer deserializer, ExprNodeDesc desc) { @@ -331,6 +341,7 @@ public DecomposedPredicate decomposePredicate(JobConf conf, Deserializer deseria } } + @SuppressWarnings("deprecation") @Override public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { try { @@ -354,5 +365,32 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { } catch (IOException e) { log.error("Could not add necessary dependencies for " + serDeParams.getRowIdFactory().getClass(), e); } + + // When Kerberos is enabled, we have to add the Accumulo delegation token to the + // Job so that it gets passed down to the YARN/Tez task. + if (connectionParams.useSasl()) { + try { + // Obtain a delegation token from Accumulo + Connector conn = connectionParams.getConnector(); + AuthenticationToken token = helper.getDelegationToken(conn); + + // Make sure the Accumulo token is set in the Configuration (only a stub of the Accumulo + // AuthentiationToken is serialized, not the entire token) + InputConfigurator.setConnectorInfo(AccumuloInputFormat.class, jobConf, + connectionParams.getAccumuloUserName(), token); + OutputConfigurator.setConnectorInfo(AccumuloOutputFormat.class, jobConf, + connectionParams.getAccumuloUserName(), token); + + // Convert the Accumulo token in a Hadoop token + Token accumuloToken = helper.getHadoopToken(token); + + log.info("Adding Hadoop Token for Accumulo to Job's Credentials"); + + // Add the Hadoop token to the JobConf + helper.mergeTokenIntoJobConf(jobConf, accumuloToken); + } catch (Exception e) { + throw new RuntimeException("Failed to obtain DelegationToken for " + connectionParams.getAccumuloUserName(), e); + } + } } } diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/HiveAccumuloHelper.java 
b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/HiveAccumuloHelper.java new file mode 100644 index 0000000..dfc5d03 --- /dev/null +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/HiveAccumuloHelper.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.accumulo; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.Collection; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.admin.SecurityOperations; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Helper class to hold common methods across the InputFormat, OutputFormat and StorageHandler. 
+ */ +public class HiveAccumuloHelper { + private static final Logger log = LoggerFactory.getLogger(HiveAccumuloHelper.class); + // Constant from Accumulo's DelegationTokenImpl + public static final Text ACCUMULO_SERVICE = new Text("ACCUMULO_AUTH_TOKEN"); + + // Constants for DelegationToken reflection to continue to support 1.6 + private static final String DELEGATION_TOKEN_CONFIG_CLASS_NAME = + "org.apache.accumulo.core.client.admin.DelegationTokenConfig"; + private static final String DELEGATION_TOKEN_IMPL_CLASS_NAME = + "org.apache.accumulo.core.client.impl.DelegationTokenImpl"; + private static final String GET_DELEGATION_TOKEN_METHOD_NAME = "getDelegationToken"; + private static final String GET_IDENTIFIER_METHOD_NAME = "getIdentifier"; + private static final String GET_PASSWORD_METHOD_NAME = "getPassword"; + private static final String GET_SERVICE_NAME_METHOD_NAME = "getServiceName"; + + // Constants for ClientConfiguration and setZooKeeperInstance reflection + // to continue to support 1.5 + private static final String CLIENT_CONFIGURATION_CLASS_NAME = + "org.apache.accumulo.core.client.ClientConfiguration"; + private static final String LOAD_DEFAULT_METHOD_NAME = "loadDefault"; + private static final String SET_PROPERTY_METHOD_NAME = "setProperty"; + private static final String INSTANCE_ZOOKEEPER_HOST = "instance.zookeeper.host"; + private static final String INSTANCE_NAME = "instance.name"; + private static final String INSTANCE_RPC_SASL_ENABLED = "instance.rpc.sasl.enabled"; + private static final String SET_ZOOKEEPER_INSTANCE_METHOD_NAME = "setZooKeeperInstance"; + + // Constants for unwrapping the DelegationTokenStub into a DelegationTokenImpl + private static final String CONFIGURATOR_BASE_CLASS_NAME = + "org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase"; + private static final String UNWRAP_AUTHENTICATION_TOKEN_METHOD_NAME = "unwrapAuthenticationToken"; + + /** + * Extract the appropriate Token for Accumulo from the provided {@code 
user} and add it to the + * {@link JobConf}'s credentials. + * + * @param user + * User containing tokens + * @param jobConf + * The configuration for the job + * @throws IOException + * If the correct token is not found or the Token fails to be merged with the + * configuration + */ + public void addTokenFromUserToJobConf(UserGroupInformation user, JobConf jobConf) + throws IOException { + checkNotNull(user, "Provided UGI was null"); + checkNotNull(jobConf, "JobConf was null"); + + // Accumulo token already in Configuration, but the Token isn't in the Job credentials like the + // AccumuloInputFormat expects + Token accumuloToken = null; + Collection> tokens = user.getTokens(); + for (Token token : tokens) { + if (ACCUMULO_SERVICE.equals(token.getKind())) { + accumuloToken = token; + break; + } + } + + // If we didn't find the Token, we can't proceed. Log the tokens for debugging. + if (null == accumuloToken) { + log.error("Could not find accumulo token in user: " + tokens); + throw new IOException("Could not find Accumulo Token in user's tokens"); + } + + // Add the Hadoop token back to the Job, the configuration still has the necessary + // Accumulo token information. + mergeTokenIntoJobConf(jobConf, accumuloToken); + } + + /** + * Merge the provided Token into the JobConf. + * + * @param jobConf + * JobConf to merge token into + * @param accumuloToken + * The Token + * @throws IOException + * If the merging fails + */ + public void mergeTokenIntoJobConf(JobConf jobConf, Token accumuloToken) throws IOException { + JobConf accumuloJobConf = new JobConf(jobConf); + accumuloJobConf.getCredentials().addToken(accumuloToken.getService(), accumuloToken); + + // Merge them together. + ShimLoader.getHadoopShims().mergeCredentials(jobConf, accumuloJobConf); + } + + /** + * Obtain a DelegationToken from Accumulo in a backwards compatible manner. 
+ * + * @param conn + * The Accumulo connector + * @return The DelegationToken instance + * @throws IOException + * If the token cannot be obtained + */ + public AuthenticationToken getDelegationToken(Connector conn) throws IOException { + try { + Class clz = JavaUtils.loadClass(DELEGATION_TOKEN_CONFIG_CLASS_NAME); + // DelegationTokenConfig delegationTokenConfig = new DelegationTokenConfig(); + Object delegationTokenConfig = clz.newInstance(); + + SecurityOperations secOps = conn.securityOperations(); + + Method getDelegationTokenMethod = secOps.getClass().getMethod( + GET_DELEGATION_TOKEN_METHOD_NAME, clz); + + // secOps.getDelegationToken(delegationTokenConfig) + return (AuthenticationToken) getDelegationTokenMethod.invoke(secOps, delegationTokenConfig); + } catch (Exception e) { + throw new IOException("Failed to obtain DelegationToken from Accumulo", e); + } + } + + public Token getHadoopToken(AuthenticationToken delegationToken) + throws IOException { + try { + // DelegationTokenImpl class + Class delegationTokenClass = JavaUtils.loadClass(DELEGATION_TOKEN_IMPL_CLASS_NAME); + // Methods on DelegationToken + Method getIdentifierMethod = delegationTokenClass.getMethod(GET_IDENTIFIER_METHOD_NAME); + Method getPasswordMethod = delegationTokenClass.getMethod(GET_PASSWORD_METHOD_NAME); + Method getServiceNameMethod = delegationTokenClass.getMethod(GET_SERVICE_NAME_METHOD_NAME); + + // Treat the TokenIdentifier implementation as the abstract class to avoid dependency issues + // AuthenticationTokenIdentifier identifier = delegationToken.getIdentifier(); + TokenIdentifier identifier = (TokenIdentifier) getIdentifierMethod.invoke(delegationToken); + + // new Token(identifier.getBytes(), + // delegationToken.getPassword(), identifier.getKind(), delegationToken.getServiceName()); + return new Token(identifier.getBytes(), (byte[]) + getPasswordMethod.invoke(delegationToken), identifier.getKind(), + (Text) getServiceNameMethod.invoke(delegationToken)); + } catch 
(Exception e) { + throw new IOException("Failed to create Hadoop token from Accumulo DelegationToken", e); + } + } + + /** + * Construct a ClientConfiguration instance in a backwards-compatible way. Allows us + * to support Accumulo 1.5 + * + * @param zookeepers + * ZooKeeper hosts + * @param instanceName + * Instance name + * @param useSasl + * Is SASL enabled + * @return A ClientConfiguration instance + * @throws IOException + * If the instance fails to be created + */ + public Object getClientConfiguration(String zookeepers, String instanceName, boolean useSasl) + throws IOException { + try { + // Construct a new instance of ClientConfiguration + Class clientConfigClass = JavaUtils.loadClass(CLIENT_CONFIGURATION_CLASS_NAME); + Method loadDefaultMethod = clientConfigClass.getMethod(LOAD_DEFAULT_METHOD_NAME); + Object clientConfig = loadDefaultMethod.invoke(null); + + // Set instance and zookeeper hosts + Method setPropertyMethod = clientConfigClass.getMethod(SET_PROPERTY_METHOD_NAME, + String.class, Object.class); + setPropertyMethod.invoke(clientConfig, INSTANCE_ZOOKEEPER_HOST, zookeepers); + setPropertyMethod.invoke(clientConfig, INSTANCE_NAME, instanceName); + + if (useSasl) { + // Defaults to not using SASL, set true if SASL is being used + setPropertyMethod.invoke(clientConfig, INSTANCE_RPC_SASL_ENABLED, true); + } + + return clientConfig; + } catch (Exception e) { + String msg = "Failed to instantiate and invoke methods on ClientConfiguration"; + log.error(msg, e); + throw new IOException(msg, e); + } + } + + /** + * Wrapper around setZooKeeperInstance(Configuration, ClientConfiguration) which only + * exists in 1.6.0 and newer. Support backwards compat. 
+ * @param jobConf + * The JobConf + * @param inputOrOutputFormatClass + * The InputFormat or OutputFormat class + * @param zookeepers + * ZooKeeper hosts + * @param instanceName + * Accumulo instance name + * @param useSasl + * Is SASL enabled + * @throws IOException + * When invocation of the method fails + */ + public void setZooKeeperInstance(JobConf jobConf, Class inputOrOutputFormatClass, String + zookeepers, String instanceName, boolean useSasl) throws IOException { + try { + Class clientConfigClass = JavaUtils.loadClass(CLIENT_CONFIGURATION_CLASS_NAME); + + // get the ClientConfiguration + Object clientConfig = getClientConfiguration(zookeepers, instanceName, useSasl); + + // AccumuloOutputFormat.setZooKeeperInstance(JobConf, ClientConfiguration) or + // AccumuloInputFormat.setZooKeeperInstance(JobConf, ClientConfiguration) + Method setZooKeeperMethod = inputOrOutputFormatClass.getMethod( + SET_ZOOKEEPER_INSTANCE_METHOD_NAME, JobConf.class, clientConfigClass); + setZooKeeperMethod.invoke(null, jobConf, clientConfig); + } catch (Exception e) { + throw new IOException("Failed to invoke setZooKeeperInstance method", e); + } + } + + /** + * Wrapper around ConfiguratorBase.unwrapAuthenticationToken which only exists in + * 1.7.0 and newer. Uses reflection to not break compat.
+ * + * @param jobConf + * JobConf object + * @param token + * The DelegationTokenStub instance + * @return A DelegationTokenImpl created from the Token in the Job's credentials + * @throws IOException + * If the token fails to be unwrapped + */ + public AuthenticationToken unwrapAuthenticationToken(JobConf jobConf, AuthenticationToken token) + throws IOException { + try { + Class configuratorBaseClass = JavaUtils.loadClass(CONFIGURATOR_BASE_CLASS_NAME); + Method unwrapAuthenticationTokenMethod = configuratorBaseClass.getMethod( + UNWRAP_AUTHENTICATION_TOKEN_METHOD_NAME, JobConf.class, AuthenticationToken.class); + // ConfiguratorBase.unwrapAuthenticationToken(conf, token); + return (AuthenticationToken) unwrapAuthenticationTokenMethod.invoke(null, jobConf, token); + } catch (Exception e) { + throw new IOException("Failed to unwrap AuthenticationToken", e); + } + } +} diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java index 08d396e..083678f 100644 --- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java @@ -36,6 +36,7 @@ import org.apache.accumulo.core.client.mapred.AccumuloInputFormat; import org.apache.accumulo.core.client.mapred.AccumuloRowInputFormat; import org.apache.accumulo.core.client.mapred.RangeInputSplit; +import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase; import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; @@ -49,6 +50,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.accumulo.AccumuloConnectionParameters; import org.apache.hadoop.hive.accumulo.AccumuloHiveRow; 
+import org.apache.hadoop.hive.accumulo.HiveAccumuloHelper; import org.apache.hadoop.hive.accumulo.columns.ColumnMapper; import org.apache.hadoop.hive.accumulo.columns.ColumnMapping; import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloColumnMapping; @@ -70,6 +72,9 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,6 +92,7 @@ // Visible for testing protected AccumuloRowInputFormat accumuloInputFormat = new AccumuloRowInputFormat(); protected AccumuloPredicateHandler predicateHandler = AccumuloPredicateHandler.getInstance(); + protected HiveAccumuloHelper helper = new HiveAccumuloHelper(); @Override public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException { @@ -103,7 +109,22 @@ Path[] tablePaths = FileInputFormat.getInputPaths(context); try { - final Connector connector = accumuloParams.getConnector(instance); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + final Connector connector; + + // Need to get a Connector so we look up the user's authorizations if not otherwise specified + if (accumuloParams.useSasl() && !ugi.hasKerberosCredentials()) { + // In a YARN/Tez job, don't have the Kerberos credentials anymore, use the delegation token + AuthenticationToken token = ConfiguratorBase.getAuthenticationToken( + AccumuloInputFormat.class, jobConf); + // Convert the stub from the configuration back into a normal Token + // More reflection to support 1.6 + token = helper.unwrapAuthenticationToken(jobConf, token); + connector = instance.getConnector(accumuloParams.getAccumuloUserName(), token); + } else { + // Still in the local JVM, use the 
username+password or Kerberos credentials + connector = accumuloParams.getConnector(instance); + } final List columnMappings = columnMapper.getColumnMappings(); final List iterators = predicateHandler.getIterators(jobConf, columnMapper); final Collection ranges = predicateHandler.getRanges(jobConf, columnMapper); @@ -254,18 +275,50 @@ protected ColumnMapper getColumnMapper(Configuration conf) throws IOException, protected void configure(JobConf conf, Instance instance, Connector connector, AccumuloConnectionParameters accumuloParams, ColumnMapper columnMapper, List iterators, Collection ranges) throws AccumuloSecurityException, - AccumuloException, SerDeException { + AccumuloException, SerDeException, IOException { // Handle implementation of Instance and invoke appropriate InputFormat method if (instance instanceof MockInstance) { setMockInstance(conf, instance.getInstanceName()); } else { - setZooKeeperInstance(conf, instance.getInstanceName(), instance.getZooKeepers()); + setZooKeeperInstance(conf, instance.getInstanceName(), instance.getZooKeepers(), + accumuloParams.useSasl()); } // Set the username/passwd for the Accumulo connection - setConnectorInfo(conf, accumuloParams.getAccumuloUserName(), - new PasswordToken(accumuloParams.getAccumuloPassword())); + if (accumuloParams.useSasl()) { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + + // If we have Kerberos credentials, we should obtain the delegation token + if (ugi.hasKerberosCredentials()) { + Connector conn = accumuloParams.getConnector(); + AuthenticationToken token = helper.getDelegationToken(conn); + + // Send the DelegationToken down to the Configuration for Accumulo to use + setConnectorInfo(conf, accumuloParams.getAccumuloUserName(), token); + + // Convert the Accumulo token in a Hadoop token + Token accumuloToken = helper.getHadoopToken(token); + + log.info("Adding Hadoop Token for Accumulo to Job's Credentials"); + + // Add the Hadoop token to the JobConf + 
helper.mergeTokenIntoJobConf(conf, accumuloToken); + + if (!ugi.addToken(accumuloToken)) { + throw new IOException("Failed to add Accumulo Token to UGI"); + } + } + + try { + helper.addTokenFromUserToJobConf(ugi, conf); + } catch (IOException e) { + throw new IOException("Current user did not contain necessary delegation Tokens " + ugi, e); + } + } else { + setConnectorInfo(conf, accumuloParams.getAccumuloUserName(), + new PasswordToken(accumuloParams.getAccumuloPassword())); + } // Read from the given Accumulo table setInputTableName(conf, accumuloParams.getAccumuloTableName()); @@ -312,11 +365,18 @@ protected void setMockInstance(JobConf conf, String instanceName) { } @SuppressWarnings("deprecation") - protected void setZooKeeperInstance(JobConf conf, String instanceName, String zkHosts) { + protected void setZooKeeperInstance(JobConf conf, String instanceName, String zkHosts, + boolean isSasl) throws IOException { // To support builds against 1.5, we can't use the new 1.6 setZooKeeperInstance which // takes a ClientConfiguration class that only exists in 1.6 try { - AccumuloInputFormat.setZooKeeperInstance(conf, instanceName, zkHosts); + if (isSasl) { + // Reflection to support Accumulo 1.5. Remove when Accumulo 1.5 support is dropped + // 1.6 works with the deprecated 1.5 method, but must use reflection for 1.7-only SASL support + helper.setZooKeeperInstance(conf, AccumuloInputFormat.class, zkHosts, instanceName, isSasl); + } else { + AccumuloInputFormat.setZooKeeperInstance(conf, instanceName, zkHosts); + } } catch (IllegalStateException ise) { // AccumuloInputFormat complains if you re-set an already set value. We just don't care. 
log.debug("Ignoring exception setting ZooKeeper instance of " + instanceName + " at " diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java index ce6da89..0189c07 100644 --- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java @@ -18,17 +18,23 @@ import java.io.IOException; +import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Mutation; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.accumulo.AccumuloConnectionParameters; +import org.apache.hadoop.hive.accumulo.HiveAccumuloHelper; import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.Progressable; import com.google.common.base.Preconditions; @@ -38,6 +44,8 @@ */ public class HiveAccumuloTableOutputFormat extends AccumuloOutputFormat { + protected final HiveAccumuloHelper helper = new HiveAccumuloHelper(); + @Override public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException { configureAccumuloOutputFormat(job); @@ -63,16 +71,48 @@ protected void configureAccumuloOutputFormat(JobConf job) 
throws IOException { // Set the necessary Accumulo information try { - // Username/passwd for Accumulo - setAccumuloConnectorInfo(job, cnxnParams.getAccumuloUserName(), - new PasswordToken(cnxnParams.getAccumuloPassword())); - if (cnxnParams.useMockInstance()) { setAccumuloMockInstance(job, cnxnParams.getAccumuloInstanceName()); } else { // Accumulo instance name with ZK quorum setAccumuloZooKeeperInstance(job, cnxnParams.getAccumuloInstanceName(), - cnxnParams.getZooKeepers()); + cnxnParams.getZooKeepers(), cnxnParams.useSasl()); + } + + // Extract the delegation Token from the UGI and add it to the job + // The AccumuloOutputFormat will look for it there. + if (cnxnParams.useSasl()) { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + if (!ugi.hasKerberosCredentials()) { + helper.addTokenFromUserToJobConf(ugi, job); + } else { + // Still in the local JVM, can use Kerberos credentials + try { + Connector connector = cnxnParams.getConnector(); + AuthenticationToken token = helper.getDelegationToken(connector); + + // Send the DelegationToken down to the Configuration for Accumulo to use + setConnectorInfo(job, cnxnParams.getAccumuloUserName(), token); + + // Convert the Accumulo token in a Hadoop token + Token accumuloToken = helper.getHadoopToken(token); + + log.info("Adding Hadoop Token for Accumulo to Job's Credentials"); + + // Add the Hadoop token to the JobConf + helper.mergeTokenIntoJobConf(job, accumuloToken); + + // Make sure the UGI contains the token too for good measure + if (!ugi.addToken(accumuloToken)) { + throw new IOException("Failed to add Accumulo Token to UGI"); + } + } catch (AccumuloException | AccumuloSecurityException e) { + throw new IOException("Failed to acquire Accumulo DelegationToken", e); + } + } + } else { + setAccumuloConnectorInfo(job, cnxnParams.getAccumuloUserName(), + new PasswordToken(cnxnParams.getAccumuloPassword())); } // Set the table where we're writing this data @@ -96,9 +136,18 @@ protected void 
setAccumuloConnectorInfo(JobConf conf, String username, Authentic } @SuppressWarnings("deprecation") - protected void setAccumuloZooKeeperInstance(JobConf conf, String instanceName, String zookeepers) { + protected void setAccumuloZooKeeperInstance(JobConf conf, String instanceName, String zookeepers, + boolean isSasl) throws IOException { try { - AccumuloOutputFormat.setZooKeeperInstance(conf, instanceName, zookeepers); + if (isSasl) { + // Reflection to support Accumulo 1.5. Remove when Accumulo 1.5 support is dropped + // 1.6 works with the deprecated 1.5 method, but must use reflection for 1.7-only + // SASL support + helper.setZooKeeperInstance(conf, AccumuloOutputFormat.class, zookeepers, instanceName, + isSasl); + } else { + AccumuloOutputFormat.setZooKeeperInstance(conf, instanceName, zookeepers); + } } catch (IllegalStateException ise) { // AccumuloOutputFormat complains if you re-set an already set value. We just don't care. log.debug("Ignoring exception setting ZooKeeper instance of " + instanceName + " at " diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloConnectionParameters.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloConnectionParameters.java index 8b4c9ff..23be5f1 100644 --- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloConnectionParameters.java +++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloConnectionParameters.java @@ -16,6 +16,8 @@ */ package org.apache.hadoop.hive.accumulo; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Instance; @@ -97,4 +99,22 @@ public void testMissingPassword() throws AccumuloException, AccumuloSecurityExce // with null password cnxnParams.getConnector(instance); } + + @Test + public void testSasl() { + 
Configuration conf = new Configuration(false); + + // Default is false + AccumuloConnectionParameters cnxnParams = new AccumuloConnectionParameters(conf); + assertFalse(cnxnParams.useSasl()); + + conf.set(AccumuloConnectionParameters.SASL_ENABLED, "true"); + + cnxnParams = new AccumuloConnectionParameters(conf); + + assertTrue(cnxnParams.useSasl()); + + conf.set(AccumuloConnectionParameters.SASL_ENABLED, "false"); + assertFalse(cnxnParams.useSasl()); + } } diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestHiveAccumuloHelper.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestHiveAccumuloHelper.java new file mode 100644 index 0000000..88544f0 --- /dev/null +++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestHiveAccumuloHelper.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.accumulo; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.Collection; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestHiveAccumuloHelper { + + private HiveAccumuloHelper helper; + + @Before + public void setup() { + helper = new HiveAccumuloHelper(); + } + + @Test + public void testTokenMerge() throws Exception { + final Text service = new Text("service"); + Token<?> token = Mockito.mock(Token.class); + JobConf jobConf = new JobConf(); + + Mockito.when(token.getService()).thenReturn(service); + + helper.mergeTokenIntoJobConf(jobConf, token); + + Collection<Token<? extends TokenIdentifier>> tokens = jobConf.getCredentials().getAllTokens(); + assertEquals(1, tokens.size()); + assertEquals(service, tokens.iterator().next().getService()); + } + + @Test + public void testTokenToConfFromUser() throws Exception { + UserGroupInformation ugi = Mockito.mock(UserGroupInformation.class); + JobConf jobConf = new JobConf(); + ArrayList<Token<? extends TokenIdentifier>> tokens = new ArrayList<>(); + Text service = new Text("service"); + Token<? extends TokenIdentifier> token = Mockito.mock(Token.class); + tokens.add(token); + + Mockito.when(ugi.getTokens()).thenReturn(tokens); + Mockito.when(token.getKind()).thenReturn(HiveAccumuloHelper.ACCUMULO_SERVICE); + Mockito.when(token.getService()).thenReturn(service); + + helper.addTokenFromUserToJobConf(ugi, jobConf); + + Collection<Token<? extends TokenIdentifier>> credTokens = jobConf.getCredentials().getAllTokens(); + assertEquals(1, credTokens.size()); + assertEquals(service, credTokens.iterator().next().getService()); + } +} diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableInputFormat.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableInputFormat.java index 
e8beeb6..ee5aecf 100644 --- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableInputFormat.java +++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableInputFormat.java @@ -526,7 +526,7 @@ public void testConfigureAccumuloInputFormat() throws Exception { ranges); // Verify that the correct methods are invoked on AccumuloInputFormat - Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers); + Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers, false); Mockito.verify(mockInputFormat).setConnectorInfo(conf, USER, new PasswordToken(PASS)); Mockito.verify(mockInputFormat).setInputTableName(conf, TEST_TABLE); Mockito.verify(mockInputFormat).setScanAuthorizations(conf, @@ -568,7 +568,7 @@ public void testConfigureAccumuloInputFormatWithAuthorizations() throws Exceptio ranges); // Verify that the correct methods are invoked on AccumuloInputFormat - Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers); + Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers, false); Mockito.verify(mockInputFormat).setConnectorInfo(conf, USER, new PasswordToken(PASS)); Mockito.verify(mockInputFormat).setInputTableName(conf, TEST_TABLE); Mockito.verify(mockInputFormat).setScanAuthorizations(conf, new Authorizations("foo,bar")); @@ -622,7 +622,7 @@ public void testConfigureAccumuloInputFormatWithIterators() throws Exception { ranges); // Verify that the correct methods are invoked on AccumuloInputFormat - Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers); + Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers, false); Mockito.verify(mockInputFormat).setConnectorInfo(conf, USER, new PasswordToken(PASS)); Mockito.verify(mockInputFormat).setInputTableName(conf, TEST_TABLE); Mockito.verify(mockInputFormat).setScanAuthorizations(conf, @@ -678,7 
+678,7 @@ public void testConfigureAccumuloInputFormatWithEmptyColumns() throws Exception ranges); // Verify that the correct methods are invoked on AccumuloInputFormat - Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers); + Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers, false); Mockito.verify(mockInputFormat).setConnectorInfo(conf, USER, new PasswordToken(PASS)); Mockito.verify(mockInputFormat).setInputTableName(conf, TEST_TABLE); Mockito.verify(mockInputFormat).setScanAuthorizations(conf, diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java index 093245d..5d3f15b 100644 --- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java +++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java @@ -98,7 +98,7 @@ public void testBasicConfiguration() throws IOException, AccumuloSecurityExcepti outputFormat.configureAccumuloOutputFormat(conf); Mockito.verify(outputFormat).setAccumuloConnectorInfo(conf, user, new PasswordToken(password)); - Mockito.verify(outputFormat).setAccumuloZooKeeperInstance(conf, instanceName, zookeepers); + Mockito.verify(outputFormat).setAccumuloZooKeeperInstance(conf, instanceName, zookeepers, false); Mockito.verify(outputFormat).setDefaultAccumuloTableName(conf, outputTable); } -- 2.1.2