From 127202bd02f2898c25b45070fab11f64dbd7e90f Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Tue, 27 Jun 2017 20:03:50 -0400 Subject: [PATCH] HIVE-16973 Fix AccumuloStorageHandler with Kerberos and remove handling for legacy Accumulo versions Tests functionality locally and via HS2 with MapReduce and Tez engines --- .../accumulo/AccumuloConnectionParameters.java | 57 +-- .../hive/accumulo/AccumuloStorageHandler.java | 77 ++-- .../hadoop/hive/accumulo/HiveAccumuloHelper.java | 396 ++++++++++++--------- .../org/apache/hadoop/hive/accumulo/Utils.java | 3 +- .../accumulo/mr/HiveAccumuloTableInputFormat.java | 244 +++---------- .../accumulo/mr/HiveAccumuloTableOutputFormat.java | 84 +---- .../serde/CompositeAccumuloRowIdFactory.java | 3 +- .../serde/DefaultAccumuloRowIdFactory.java | 3 +- .../hive/accumulo/TestAccumuloStorageHandler.java | 6 +- .../hive/accumulo/TestHiveAccumuloHelper.java | 184 ++++++++-- .../mr/TestHiveAccumuloTableInputFormat.java | 37 +- .../mr/TestHiveAccumuloTableOutputFormat.java | 29 +- itests/qtest-accumulo/pom.xml | 3 +- pom.xml | 2 +- 14 files changed, 523 insertions(+), 605 deletions(-) diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java index f34e82094d..58fd89f697 100644 --- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java @@ -17,19 +17,19 @@ package org.apache.hadoop.hive.accumulo; import java.io.File; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; +import java.io.IOException; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.ClientConfiguration; import org.apache.accumulo.core.client.Connector; 
import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.JavaUtils; import com.google.common.base.Preconditions; @@ -37,8 +37,6 @@ * */ public class AccumuloConnectionParameters { - private static final String KERBEROS_TOKEN_CLASS = "org.apache.accumulo.core.client.security.tokens.KerberosToken"; - public static final String USER_NAME = "accumulo.user.name"; public static final String USER_PASS = "accumulo.user.pass"; public static final String ZOOKEEPERS = "accumulo.zookeepers"; @@ -124,8 +122,9 @@ public Instance getInstance() { if (null == zookeepers) { throw new IllegalArgumentException("ZooKeeper quorum string must be provided in hiveconf using " + ZOOKEEPERS); } + ClientConfiguration clientConf = ClientConfiguration.loadDefault().withInstance(instanceName).withZkHosts(zookeepers).withSasl(useSasl()); - return new ZooKeeperInstance(instanceName, zookeepers); + return new ZooKeeperInstance(clientConf); } public Connector getConnector() throws AccumuloException, AccumuloSecurityException { @@ -142,7 +141,7 @@ public Connector getConnector(Instance inst) throws AccumuloException, AccumuloS } if (useSasl()) { - return inst.getConnector(username, getKerberosToken()); + return inst.getConnector(username, getKerberosToken(username)); } else { // Not using SASL/Kerberos -- use the password String password = getAccumuloPassword(); @@ -175,17 +174,10 @@ public AuthenticationToken getKerberosToken() { * Instantiate a KerberosToken in a backwards compatible manner. 
* @param username Kerberos principal */ - AuthenticationToken getKerberosToken(String username) { - // Get the Class - Class krbTokenClz = getKerberosTokenClass(); - + KerberosToken getKerberosToken(String username) { try { - // Invoke the `new KerberosToken(String)` constructor - // Expects that the user is already logged-in - Constructor constructor = krbTokenClz.getConstructor(String.class); - return constructor.newInstance(username); - } catch (NoSuchMethodException | SecurityException | InstantiationException | - IllegalArgumentException | InvocationTargetException | IllegalAccessException e) { + return new KerberosToken(username); + } catch (IOException e) { throw new IllegalArgumentException("Failed to instantiate KerberosToken.", e); } } @@ -195,36 +187,11 @@ AuthenticationToken getKerberosToken(String username) { * @param username Kerberos principal * @param keytab Keytab on local filesystem */ - AuthenticationToken getKerberosToken(String username, String keytab) { - Class krbTokenClz = getKerberosTokenClass(); - - File keytabFile = new File(keytab); - if (!keytabFile.isFile() || !keytabFile.canRead()) { - throw new IllegalArgumentException("Keytab must be a readable file: " + keytab); - } - + KerberosToken getKerberosToken(String username, String keytab) { try { - // Invoke the `new KerberosToken(String, File, boolean)` constructor - // Tries to log in as the provided user with the given keytab, overriding an already logged-in user if present - Constructor constructor = krbTokenClz.getConstructor(String.class, File.class, boolean.class); - return constructor.newInstance(username, keytabFile, true); - } catch (NoSuchMethodException | SecurityException | InstantiationException | - IllegalArgumentException | InvocationTargetException | IllegalAccessException e) { + return new KerberosToken(username, new File(keytab), true); + } catch (IOException e) { throw new IllegalArgumentException("Failed to instantiate KerberosToken.", e); } } - - /** - * Attempt to 
instantiate the KerberosToken class - */ - Class getKerberosTokenClass() { - try { - // Instantiate the class - Class clz = JavaUtils.loadClass(KERBEROS_TOKEN_CLASS); - // Cast it to an AuthenticationToken since Connector will need that - return clz.asSubclass(AuthenticationToken.class); - } catch (ClassNotFoundException e) { - throw new IllegalArgumentException("Could not load KerberosToken class. >=Accumulo 1.7.0 required", e); - } - } } diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java index 62524e87a0..5391a99a16 100644 --- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java @@ -24,14 +24,6 @@ import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.TableOperations; -import org.apache.accumulo.core.client.mapred.AccumuloInputFormat; -import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat; -import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator; -import org.apache.accumulo.core.client.mapreduce.lib.impl.OutputConfigurator; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; -import org.apache.accumulo.fate.Fate; -import org.apache.accumulo.start.Main; -import org.apache.accumulo.trace.instrument.Tracer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.accumulo.mr.HiveAccumuloTableInputFormat; import org.apache.hadoop.hive.accumulo.mr.HiveAccumuloTableOutputFormat; @@ -60,7 +52,6 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.StringUtils; -import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -177,7 +168,6 @@ public void setConf(Configuration conf) { connectionParams = new AccumuloConnectionParameters(conf); } - @SuppressWarnings("deprecation") @Override public Class getSerDeClass() { return AccumuloSerDe.class; @@ -228,6 +218,37 @@ public void configureInputJobProperties(TableDesc tableDesc, Map } LOG.info("Computed input job properties of " + jobProperties); + + Configuration conf = getConf(); + helper.loadDependentJars(conf); + + // When Kerberos is enabled, we have to add the Accumulo delegation token to the + // Job so that it gets passed down to the YARN/Tez task. + if (connectionParams.useSasl()) { + try { + // Open an accumulo connection + Connector conn = connectionParams.getConnector(); + + // Convert the Accumulo token in a Hadoop token + Token accumuloToken = helper.setConnectorInfoForInputAndOutput(connectionParams, conn, conf); + + // Probably don't have a JobConf here, but we can still try... + if (conf instanceof JobConf) { + // Convert the Accumulo token in a Hadoop token + LOG.debug("Adding Hadoop Token for Accumulo to Job's Credentials: " + accumuloToken); + + // Add the Hadoop token to the JobConf + JobConf jobConf = (JobConf) conf; + jobConf.getCredentials().addToken(accumuloToken.getService(), accumuloToken); + LOG.info("All job tokens: " + jobConf.getCredentials().getAllTokens()); + } else { + LOG.info("Don't have a JobConf, so we cannot persist Tokens. 
Have to do it later."); + } + } catch (Exception e) { + throw new RuntimeException("Failed to obtain DelegationToken for " + + connectionParams.getAccumuloUserName(), e); + } + } } @Override @@ -413,7 +434,6 @@ public void rollbackDropTable(Table table) throws MetaException { // do nothing } - @SuppressWarnings("deprecation") @Override public DecomposedPredicate decomposePredicate(JobConf conf, Deserializer deserializer, ExprNodeDesc desc) { @@ -431,15 +451,9 @@ public DecomposedPredicate decomposePredicate(JobConf conf, Deserializer deseria } } - @SuppressWarnings("deprecation") @Override public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { - try { - Utils.addDependencyJars(jobConf, Tracer.class, Fate.class, Connector.class, Main.class, - ZooKeeper.class, AccumuloStorageHandler.class); - } catch (IOException e) { - LOG.error("Could not add necessary Accumulo dependencies to classpath", e); - } + helper.loadDependentJars(jobConf); Properties tblProperties = tableDesc.getProperties(); AccumuloSerDeParameters serDeParams = null; @@ -462,36 +476,17 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { // Job so that it gets passed down to the YARN/Tez task. if (connectionParams.useSasl()) { try { - // Obtain a delegation token from Accumulo + // Open an accumulo connection Connector conn = connectionParams.getConnector(); - AuthenticationToken token = helper.getDelegationToken(conn); - - // Make sure the Accumulo token is set in the Configuration (only a stub of the Accumulo - // AuthentiationToken is serialized, not the entire token). configureJobConf may be - // called multiple times with the same JobConf which results in an error from Accumulo - // MapReduce API. 
Catch the error, log a debug message and just keep going - try { - InputConfigurator.setConnectorInfo(AccumuloInputFormat.class, jobConf, - connectionParams.getAccumuloUserName(), token); - } catch (IllegalStateException e) { - // The implementation balks when this method is invoked multiple times - LOG.debug("Ignoring IllegalArgumentException about re-setting connector information"); - } - try { - OutputConfigurator.setConnectorInfo(AccumuloOutputFormat.class, jobConf, - connectionParams.getAccumuloUserName(), token); - } catch (IllegalStateException e) { - // The implementation balks when this method is invoked multiple times - LOG.debug("Ignoring IllegalArgumentException about re-setting connector information"); - } // Convert the Accumulo token in a Hadoop token - Token accumuloToken = helper.getHadoopToken(token); + Token accumuloToken = helper.setConnectorInfoForInputAndOutput(connectionParams, conn, jobConf); - LOG.info("Adding Hadoop Token for Accumulo to Job's Credentials"); + LOG.debug("Adding Hadoop Token for Accumulo to Job's Credentials"); // Add the Hadoop token to the JobConf helper.mergeTokenIntoJobConf(jobConf, accumuloToken); + LOG.debug("All job tokens: " + jobConf.getCredentials().getAllTokens()); } catch (Exception e) { throw new RuntimeException("Failed to obtain DelegationToken for " + connectionParams.getAccumuloUserName(), e); diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/HiveAccumuloHelper.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/HiveAccumuloHelper.java index 71b8b77352..9fccb499ef 100644 --- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/HiveAccumuloHelper.java +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/HiveAccumuloHelper.java @@ -19,20 +19,35 @@ import static com.google.common.base.Preconditions.checkNotNull; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; +import java.util.ArrayList; 
+import java.util.Arrays; import java.util.Collection; +import java.util.List; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.ClientConfiguration; import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.admin.SecurityOperations; +import org.apache.accumulo.core.client.admin.DelegationTokenConfig; +import org.apache.accumulo.core.client.impl.DelegationTokenImpl; +import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat; +import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator; +import org.apache.accumulo.core.client.mapreduce.lib.impl.OutputConfigurator; +import org.apache.accumulo.core.client.mapred.AccumuloInputFormat; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; -import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.accumulo.fate.Fate; +import org.apache.accumulo.start.Main; +import org.apache.accumulo.trace.instrument.Tracer; +import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,35 +56,9 @@ */ public class HiveAccumuloHelper { private static final Logger log = LoggerFactory.getLogger(HiveAccumuloHelper.class); - // Constant from Accumulo's DelegationTokenImpl + // Constant from Accumulo's AuthenticationTokenIdentifier public static final Text ACCUMULO_SERVICE = new Text("ACCUMULO_AUTH_TOKEN"); - // Constants for 
DelegationToken reflection to continue to support 1.6 - private static final String DELEGATION_TOKEN_CONFIG_CLASS_NAME = - "org.apache.accumulo.core.client.admin.DelegationTokenConfig"; - private static final String DELEGATION_TOKEN_IMPL_CLASS_NAME = - "org.apache.accumulo.core.client.impl.DelegationTokenImpl"; - private static final String GET_DELEGATION_TOKEN_METHOD_NAME = "getDelegationToken"; - private static final String GET_IDENTIFIER_METHOD_NAME = "getIdentifier"; - private static final String GET_PASSWORD_METHOD_NAME = "getPassword"; - private static final String GET_SERVICE_NAME_METHOD_NAME = "getServiceName"; - - // Constants for ClientConfiguration and setZooKeeperInstance reflection - // to continue to support 1.5 - private static final String CLIENT_CONFIGURATION_CLASS_NAME = - "org.apache.accumulo.core.client.ClientConfiguration"; - private static final String LOAD_DEFAULT_METHOD_NAME = "loadDefault"; - private static final String SET_PROPERTY_METHOD_NAME = "setProperty"; - private static final String INSTANCE_ZOOKEEPER_HOST = "instance.zookeeper.host"; - private static final String INSTANCE_NAME = "instance.name"; - private static final String INSTANCE_RPC_SASL_ENABLED = "instance.rpc.sasl.enabled"; - private static final String SET_ZOOKEEPER_INSTANCE_METHOD_NAME = "setZooKeeperInstance"; - - // Constants for unwrapping the DelegationTokenStub into a DelegationTokenImpl - private static final String CONFIGURATOR_BASE_CLASS_NAME = - "org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase"; - private static final String UNWRAP_AUTHENTICATION_TOKEN_METHOD_NAME = "unwrapAuthenticationToken"; - /** * Extract the appropriate Token for Accumulo from the provided {@code user} and add it to the * {@link JobConf}'s credentials. 
@@ -89,18 +78,11 @@ public void addTokenFromUserToJobConf(UserGroupInformation user, JobConf jobConf // Accumulo token already in Configuration, but the Token isn't in the Job credentials like the // AccumuloInputFormat expects - Token accumuloToken = null; - Collection> tokens = user.getTokens(); - for (Token token : tokens) { - if (ACCUMULO_SERVICE.equals(token.getKind())) { - accumuloToken = token; - break; - } - } + Token accumuloToken = getAccumuloToken(user); // If we didn't find the Token, we can't proceed. Log the tokens for debugging. if (null == accumuloToken) { - log.error("Could not find accumulo token in user: " + tokens); + log.error("Could not find accumulo token in user: " + user.getTokens()); throw new IOException("Could not find Accumulo Token in user's tokens"); } @@ -109,6 +91,17 @@ public void addTokenFromUserToJobConf(UserGroupInformation user, JobConf jobConf mergeTokenIntoJobConf(jobConf, accumuloToken); } + public Token getAccumuloToken(UserGroupInformation user) { + checkNotNull(user, "Provided UGI was null"); + Collection> tokens = user.getTokens(); + for (Token token : tokens) { + if (ACCUMULO_SERVICE.equals(token.getKind())) { + return token; + } + } + return null; + } + /** * Merge the provided Token into the JobConf. * @@ -128,7 +121,7 @@ public void mergeTokenIntoJobConf(JobConf jobConf, Token accumuloToken) throw } /** - * Obtain a DelegationToken from Accumulo in a backwards compatible manner. + * Obtain a DelegationToken from Accumulo. 
* * @param conn * The Accumulo connector @@ -138,49 +131,32 @@ public void mergeTokenIntoJobConf(JobConf jobConf, Token accumuloToken) throw */ public AuthenticationToken getDelegationToken(Connector conn) throws IOException { try { - Class clz = JavaUtils.loadClass(DELEGATION_TOKEN_CONFIG_CLASS_NAME); - // DelegationTokenConfig delegationTokenConfig = new DelegationTokenConfig(); - Object delegationTokenConfig = clz.newInstance(); - - SecurityOperations secOps = conn.securityOperations(); - - Method getDelegationTokenMethod = secOps.getClass().getMethod( - GET_DELEGATION_TOKEN_METHOD_NAME, clz); - - // secOps.getDelegationToken(delegationTokenConfig) - return (AuthenticationToken) getDelegationTokenMethod.invoke(secOps, delegationTokenConfig); - } catch (Exception e) { - throw new IOException("Failed to obtain DelegationToken from Accumulo", e); + DelegationTokenConfig config = new DelegationTokenConfig(); + return conn.securityOperations().getDelegationToken(config); + } catch (AccumuloException | AccumuloSecurityException e) { + throw new IOException("Failed to obtain DelegationToken", e); } } - public Token getHadoopToken(AuthenticationToken delegationToken) + public Token getHadoopToken(AuthenticationToken token) throws IOException { + if (!(token instanceof DelegationTokenImpl)) { + throw new IOException("Expected a DelegationTokenImpl but found " + + (token != null ? 
token.getClass() : "null")); + } + DelegationTokenImpl dt = (DelegationTokenImpl) token; try { - // DelegationTokenImpl class - Class delegationTokenClass = JavaUtils.loadClass(DELEGATION_TOKEN_IMPL_CLASS_NAME); - // Methods on DelegationToken - Method getIdentifierMethod = delegationTokenClass.getMethod(GET_IDENTIFIER_METHOD_NAME); - Method getPasswordMethod = delegationTokenClass.getMethod(GET_PASSWORD_METHOD_NAME); - Method getServiceNameMethod = delegationTokenClass.getMethod(GET_SERVICE_NAME_METHOD_NAME); - - // Treat the TokenIdentifier implementation as the abstract class to avoid dependency issues - // AuthenticationTokenIdentifier identifier = delegationToken.getIdentifier(); - TokenIdentifier identifier = (TokenIdentifier) getIdentifierMethod.invoke(delegationToken); - - // new Token(identifier.getBytes(), - // delegationToken.getPassword(), identifier.getKind(), delegationToken.getServiceName()); - return new Token(identifier.getBytes(), (byte[]) - getPasswordMethod.invoke(delegationToken), identifier.getKind(), - (Text) getServiceNameMethod.invoke(delegationToken)); + AuthenticationTokenIdentifier identifier = dt.getIdentifier(); + + return new Token(identifier.getBytes(), + dt.getPassword(), identifier.getKind(), dt.getServiceName()); } catch (Exception e) { throw new IOException("Failed to create Hadoop token from Accumulo DelegationToken", e); } } /** - * Construct a ClientConfiguration instance in a backwards-compatible way. Allows us - * to support Accumulo 1.5 + * Construct a ClientConfiguration instance. 
* * @param zookeepers * ZooKeeper hosts @@ -189,127 +165,199 @@ public AuthenticationToken getDelegationToken(Connector conn) throws IOException * @param useSasl * Is SASL enabled * @return A ClientConfiguration instance - * @throws IOException - * If the instance fails to be created */ - public Object getClientConfiguration(String zookeepers, String instanceName, boolean useSasl) - throws IOException { - try { - // Construct a new instance of ClientConfiguration - Class clientConfigClass = JavaUtils.loadClass(CLIENT_CONFIGURATION_CLASS_NAME); - Method loadDefaultMethod = clientConfigClass.getMethod(LOAD_DEFAULT_METHOD_NAME); - Object clientConfig = loadDefaultMethod.invoke(null); - - // Set instance and zookeeper hosts - Method setPropertyMethod = clientConfigClass.getMethod(SET_PROPERTY_METHOD_NAME, - String.class, Object.class); - setPropertyMethod.invoke(clientConfig, INSTANCE_ZOOKEEPER_HOST, zookeepers); - setPropertyMethod.invoke(clientConfig, INSTANCE_NAME, instanceName); - - if (useSasl) { - // Defaults to not using SASL, set true if SASL is being used - setPropertyMethod.invoke(clientConfig, INSTANCE_RPC_SASL_ENABLED, true); + public ClientConfiguration getClientConfiguration(String zookeepers, String instanceName, boolean useSasl) { + return ClientConfiguration.loadDefault().withInstance(instanceName).withZkHosts(zookeepers).withSasl(useSasl); + } + + public void updateInputFormatConfWithAccumuloToken(JobConf jobConf, UserGroupInformation currentUser, + AccumuloConnectionParameters cnxnParams) throws IOException { + updateConfWithAccumuloToken(jobConf, currentUser, cnxnParams, true); + } + + public void updateOutputFormatConfWithAccumuloToken(JobConf jobConf, UserGroupInformation currentUser, + AccumuloConnectionParameters cnxnParams) throws IOException { + updateConfWithAccumuloToken(jobConf, currentUser, cnxnParams, false); + } + + void updateConfWithAccumuloToken(JobConf jobConf, UserGroupInformation currentUser, + AccumuloConnectionParameters 
cnxnParams, boolean isInputFormat) throws IOException { + if (getAccumuloToken(currentUser) != null) { + addTokenFromUserToJobConf(currentUser, jobConf); + } else { + try { + Connector connector = cnxnParams.getConnector(); + // If we have Kerberos credentials, we should obtain the delegation token + AuthenticationToken token = getDelegationToken(connector); + + // Send the DelegationToken down to the Configuration for Accumulo to use + if (isInputFormat) { + setInputFormatConnectorInfo(jobConf, cnxnParams.getAccumuloUserName(), token); + } else { + setOutputFormatConnectorInfo(jobConf, cnxnParams.getAccumuloUserName(), token); + } + + // Convert the Accumulo token in a Hadoop token + Token accumuloToken = getHadoopToken(token); + + // Add the Hadoop token to the JobConf + mergeTokenIntoJobConf(jobConf, accumuloToken); + + // Make sure the UGI contains the token too for good measure + if (!currentUser.addToken(accumuloToken)) { + throw new IOException("Failed to add Accumulo Token to UGI"); + } + + try { + addTokenFromUserToJobConf(currentUser, jobConf); + } catch (IOException e) { + throw new IOException("Current user did not contain necessary delegation Tokens " + currentUser, e); + } + } catch (AccumuloException | AccumuloSecurityException e) { + throw new IOException("Failed to acquire Accumulo DelegationToken", e); } + } + } - return clientConfig; - } catch (Exception e) { - String msg = "Failed to instantiate and invoke methods on ClientConfiguration"; - log.error(msg, e); - throw new IOException(msg, e); + public boolean hasKerberosCredentials(UserGroupInformation ugi) { + // Allows mocking in testing. + return ugi.getAuthenticationMethod() == AuthenticationMethod.KERBEROS; + } + + /** + * Calls {@link AccumuloInputFormat#setConnectorInfo(JobConf, String, AuthenticationToken)}, + * suppressing exceptions due to setting the configuration multiple times. 
+ */ + public void setInputFormatConnectorInfo(JobConf conf, String username, AuthenticationToken token) + throws AccumuloSecurityException { + try { + AccumuloInputFormat.setConnectorInfo(conf, username, token); + } catch (IllegalStateException e) { + // AccumuloInputFormat complains if you re-set an already set value. We just don't care. + log.debug("Ignoring exception setting Accumulo Connector instance for user " + username, e); } } /** - * Wrapper around setZooKeeperInstance(Configuration, ClientConfiguration) which only - * exists in 1.6.0 and newer. Support backwards compat. - * - * @param jobConf - * The JobConf - * @param inputOrOutputFormatClass - * The InputFormat or OutputFormat class - * @param zookeepers - * ZooKeeper hosts - * @param instanceName - * Accumulo instance name - * @param useSasl - * Is SASL enabled - * @throws IOException - * When invocation of the method fails + * Calls {@link AccumuloOutputFormat#setConnectorInfo(JobConf, String, AuthenticationToken)} + * suppressing exceptions due to setting the configuration multiple times. */ - public void setZooKeeperInstance(JobConf jobConf, Class inputOrOutputFormatClass, String - zookeepers, String instanceName, boolean useSasl) throws IOException { + public void setOutputFormatConnectorInfo(JobConf conf, String username, AuthenticationToken token) + throws AccumuloSecurityException { try { - setZooKeeperInstanceWithReflection(jobConf, inputOrOutputFormatClass, zookeepers, - instanceName, useSasl); - } catch (InvocationTargetException e) { - Throwable cause = e.getCause(); - if (null != cause && cause instanceof IllegalStateException) { - throw (IllegalStateException) cause; - } - throw new IOException("Failed to invoke setZooKeeperInstance method", e); + AccumuloOutputFormat.setConnectorInfo(conf, username, token); } catch (IllegalStateException e) { - // re-throw the ISE so the caller can work around the silly impl that throws this in the - // first place. 
- throw e; - } catch (Exception e) { - throw new IOException("Failed to invoke setZooKeeperInstance method", e); + // AccumuloOutputFormat complains if you re-set an already set value. We just don't care. + log.debug("Ignoring exception setting Accumulo Connector instance for user " + username, e); } } /** - * Wrap the setZooKeeperInstance reflected-call into its own method for testing - * - * @param jobConf - * The JobConf - * @param inputOrOutputFormatClass - * The InputFormat or OutputFormat class - * @param zookeepers - * ZooKeeper hosts - * @param instanceName - * Accumulo instance name - * @param useSasl - * Is SASL enabled - * @throws IOException - * When invocation of the method fails + * Calls {@link AccumuloInputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)}, + * suppressing exceptions due to setting the configuration multiple times. */ - void setZooKeeperInstanceWithReflection(JobConf jobConf, Class inputOrOutputFormatClass, String - zookeepers, String instanceName, boolean useSasl) throws IOException, ClassNotFoundException, - NoSuchMethodException, SecurityException, IllegalAccessException, IllegalArgumentException, - InvocationTargetException { - Class clientConfigClass = JavaUtils.loadClass(CLIENT_CONFIGURATION_CLASS_NAME); - - // get the ClientConfiguration - Object clientConfig = getClientConfiguration(zookeepers, instanceName, useSasl); - - // AccumuloOutputFormat.setZooKeeperInstance(JobConf, ClientConfiguration) or - // AccumuloInputFormat.setZooKeeperInstance(JobConf, ClientConfiguration) - Method setZooKeeperMethod = inputOrOutputFormatClass.getMethod( - SET_ZOOKEEPER_INSTANCE_METHOD_NAME, JobConf.class, clientConfigClass); - setZooKeeperMethod.invoke(null, jobConf, clientConfig); + public void setInputFormatZooKeeperInstance(JobConf conf, String instanceName, String zookeepers, + boolean isSasl) throws IOException { + try { + ClientConfiguration clientConf = getClientConfiguration(zookeepers, instanceName, isSasl); + 
AccumuloInputFormat.setZooKeeperInstance(conf, clientConf); + } catch (IllegalStateException ise) { + // AccumuloInputFormat complains if you re-set an already set value. We just don't care. + log.debug("Ignoring exception setting ZooKeeper instance of " + instanceName + " at " + + zookeepers, ise); + } } - + /** - * Wrapper around ConfiguratorBase.unwrapAuthenticationToken which only exists in - * 1.7.0 and new. Uses reflection to not break compat. - * - * @param jobConf - * JobConf object - * @param token - * The DelegationTokenStub instance - * @return A DelegationTokenImpl created from the Token in the Job's credentials - * @throws IOException - * If the token fails to be unwrapped + * Calls {@link AccumuloOutputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)}, + * suppressing exceptions due to setting the configuration multiple times. */ - public AuthenticationToken unwrapAuthenticationToken(JobConf jobConf, AuthenticationToken token) - throws IOException { + public void setOutputFormatZooKeeperInstance(JobConf conf, String instanceName, String zookeepers, + boolean isSasl) throws IOException { try { - Class configuratorBaseClass = JavaUtils.loadClass(CONFIGURATOR_BASE_CLASS_NAME); - Method unwrapAuthenticationTokenMethod = configuratorBaseClass.getMethod( - UNWRAP_AUTHENTICATION_TOKEN_METHOD_NAME, JobConf.class, AuthenticationToken.class); - // ConfiguratorBase.unwrapAuthenticationToken(conf, token); - return (AuthenticationToken) unwrapAuthenticationTokenMethod.invoke(null, jobConf, token); + ClientConfiguration clientConf = getClientConfiguration(zookeepers, instanceName, isSasl); + AccumuloOutputFormat.setZooKeeperInstance(conf, clientConf); + } catch (IllegalStateException ise) { + // AccumuloOutputFormat complains if you re-set an already set value. We just don't care. 
+ log.debug("Ignoring exception setting ZooKeeper instance of " + instanceName + " at " + + zookeepers, ise); + } + } + + /** + * Calls {@link AccumuloInputFormat#setMockInstance(JobConf, String)}, suppressing exceptions due + * to setting the configuration multiple times. + */ + public void setInputFormatMockInstance(JobConf conf, String instanceName) { + try { + AccumuloInputFormat.setMockInstance(conf, instanceName); + } catch (IllegalStateException e) { + // AccumuloInputFormat complains if you re-set an already set value. We just don't care. + log.debug("Ignoring exception setting mock instance of " + instanceName, e); + } + } + + /** + * Calls {@link AccumuloOutputFormat#setMockInstance(JobConf, String)}, suppressing exceptions + * due to setting the configuration multiple times. + */ + public void setOutputFormatMockInstance(JobConf conf, String instanceName) { + try { + AccumuloOutputFormat.setMockInstance(conf, instanceName); + } catch (IllegalStateException e) { + // AccumuloOutputFormat complains if you re-set an already set value. We just don't care. + log.debug("Ignoring exception setting mock instance of " + instanceName, e); + } + } + + /** + * Sets all jars requried by Accumulo input/output tasks in the configuration to be dynamically + * loaded when the task is executed. 
+ */ + public void loadDependentJars(Configuration conf) { + @SuppressWarnings("deprecation") + List> classesToLoad = new ArrayList<>(Arrays.asList(Tracer.class, Fate.class, Connector.class, Main.class, ZooKeeper.class, AccumuloStorageHandler.class)); + try { + classesToLoad.add(Class.forName("org.apache.htrace.Trace")); } catch (Exception e) { - throw new IOException("Failed to unwrap AuthenticationToken", e); + log.warn("Failed to load class for HTrace jar, trying to continue", e); } + try { + Utils.addDependencyJars(conf, classesToLoad); + } catch (IOException e) { + log.error("Could not add necessary Accumulo dependencies to classpath", e); + } + } + + /** + * Obtains an Accumulo DelegationToken and sets it in the configuration for input and output jobs. + * The Accumulo token is converted into a Hadoop-style token and returned to the caller. + * + * @return A Hadoop-style token which contains the Accumulo DelegationToken + */ + public Token setConnectorInfoForInputAndOutput(AccumuloConnectionParameters params, Connector conn, Configuration conf) throws Exception { + // Obtain a delegation token from Accumulo + AuthenticationToken token = getDelegationToken(conn); + + // Make sure the Accumulo token is set in the Configuration (only a stub of the Accumulo + // AuthenticationToken is serialized, not the entire token). configureJobConf may be + // called multiple times with the same JobConf which results in an error from Accumulo + // MapReduce API. 
Catch the error, log a debug message and just keep going + try { + InputConfigurator.setConnectorInfo(AccumuloInputFormat.class, conf, + params.getAccumuloUserName(), token); + } catch (IllegalStateException e) { + // The implementation balks when this method is invoked multiple times + log.debug("Ignoring IllegalArgumentException about re-setting connector information"); + } + try { + OutputConfigurator.setConnectorInfo(AccumuloOutputFormat.class, conf, + params.getAccumuloUserName(), token); + } catch (IllegalStateException e) { + // The implementation balks when this method is invoked multiple times + log.debug("Ignoring IllegalArgumentException about re-setting connector information"); + } + + return getHadoopToken(token); } } diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/Utils.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/Utils.java index 407ecbdbb2..af9a6f08e9 100644 --- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/Utils.java +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/Utils.java @@ -30,6 +30,7 @@ import java.util.Enumeration; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.jar.JarFile; @@ -57,7 +58,7 @@ private static final Logger log = LoggerFactory.getLogger(Utils.class); // Thanks, HBase - public static void addDependencyJars(Configuration conf, Class... 
classes) throws IOException { + public static void addDependencyJars(Configuration conf, List> classes) throws IOException { FileSystem localFs = FileSystem.getLocal(conf); Set jars = new HashSet(); // Add jars that are already in the tmpjars variable diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java index 083678f802..af64eac1fc 100644 --- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hive.accumulo.mr; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -39,6 +37,7 @@ import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase; import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; @@ -73,9 +72,8 @@ import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.StringUtils; +import org.apache.log4j.Level; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -109,22 +107,41 @@ Path[] tablePaths = FileInputFormat.getInputPaths(context); try { - UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - final 
Connector connector; + Connector connector = null; // Need to get a Connector so we look up the user's authorizations if not otherwise specified - if (accumuloParams.useSasl() && !ugi.hasKerberosCredentials()) { + if (accumuloParams.useSasl()) { + log.info("Current user: " + UserGroupInformation.getCurrentUser()); // In a YARN/Tez job, don't have the Kerberos credentials anymore, use the delegation token AuthenticationToken token = ConfiguratorBase.getAuthenticationToken( AccumuloInputFormat.class, jobConf); - // Convert the stub from the configuration back into a normal Token - // More reflection to support 1.6 - token = helper.unwrapAuthenticationToken(jobConf, token); - connector = instance.getConnector(accumuloParams.getAccumuloUserName(), token); + if (null != token && !jobConf.getCredentials().getAllTokens().isEmpty()) { + // Convert the stub from the configuration back into a normal Token + log.info("Found authentication token in Configuration: " + token); + log.info("Job credential tokens: " + jobConf.getCredentials().getAllTokens()); + AuthenticationToken unwrappedToken = ConfiguratorBase.unwrapAuthenticationToken(jobConf, token); + log.info("Converted authentication token from Configuration into: " + unwrappedToken); + // It's possible that the Job doesn't have the token in its credentials. In this case, unwrapAuthenticationToken + // will return back the original token (which we know is insufficient) + if (unwrappedToken != token) { + log.info("Creating Accumulo Connector with unwrapped delegation token"); + connector = instance.getConnector(accumuloParams.getAccumuloUserName(), unwrappedToken); + } else { + log.info("Job credentials did not contain delegation token, fetching new token"); + } + } + + if (connector == null) { + log.info("Obtaining Accumulo Connector using KerberosToken"); + // Construct a KerberosToken -- relies on ProxyUser configuration. Will be the client making + // the request on top of the HS2's user. 
Accumulo will require proper proxy-user auth configs. + connector = instance.getConnector(accumuloParams.getAccumuloUserName(), new KerberosToken(accumuloParams.getAccumuloUserName())); + } } else { // Still in the local JVM, use the username+password or Kerberos credentials connector = accumuloParams.getConnector(instance); } + final List columnMappings = columnMapper.getColumnMappings(); final List iterators = predicateHandler.getIterators(jobConf, columnMapper); final Collection ranges = predicateHandler.getRanges(jobConf, columnMapper); @@ -153,6 +170,7 @@ HiveAccumuloSplit[] hiveSplits = new HiveAccumuloSplit[splits.length]; for (int i = 0; i < splits.length; i++) { RangeInputSplit ris = (RangeInputSplit) splits[i]; + ris.setLogLevel(Level.DEBUG); hiveSplits[i] = new HiveAccumuloSplit(ris, tablePaths[0]); } @@ -172,12 +190,6 @@ /** * Setup accumulo input format from conf properties. Delegates to final RecordReader from mapred * package. - * - * @param inputSplit - * @param jobConf - * @param reporter - * @return RecordReader - * @throws IOException */ @Override public RecordReader getRecordReader(InputSplit inputSplit, @@ -190,6 +202,8 @@ } try { + final AccumuloConnectionParameters accumuloParams = new AccumuloConnectionParameters( + jobConf); final List iterators = predicateHandler.getIterators(jobConf, columnMapper); HiveAccumuloSplit hiveSplit = (HiveAccumuloSplit) inputSplit; @@ -213,11 +227,14 @@ // ACCUMULO-3015 Like the above, RangeInputSplit should have the table name // but we want it to, so just re-set it if it's null. 
- if (null == getTableName(rangeSplit)) { - final AccumuloConnectionParameters accumuloParams = new AccumuloConnectionParameters( - jobConf); - log.debug("Re-setting table name on InputSplit due to Accumulo bug."); - setTableName(rangeSplit, accumuloParams.getAccumuloTableName()); + if (null == rangeSplit.getTableName()) { + rangeSplit.setTableName(accumuloParams.getAccumuloTableName()); + } + + // ACCUMULO-4670 RangeInputSplit doesn't preserve useSasl on the ClientConfiguration/ZooKeeperInstance + // We have to manually re-set it in the JobConf to make sure it gets picked up. + if (accumuloParams.useSasl()) { + helper.setInputFormatZooKeeperInstance(jobConf, accumuloParams.getAccumuloInstanceName(), accumuloParams.getZooKeepers(), accumuloParams.useSasl()); } final RecordReader>> recordReader = accumuloInputFormat @@ -268,9 +285,6 @@ protected ColumnMapper getColumnMapper(Configuration conf) throws IOException, * Any iterators to be configured server-side * @param ranges * Accumulo ranges on for the query - * @throws AccumuloSecurityException - * @throws AccumuloException - * @throws SerDeException */ protected void configure(JobConf conf, Instance instance, Connector connector, AccumuloConnectionParameters accumuloParams, ColumnMapper columnMapper, @@ -279,44 +293,17 @@ protected void configure(JobConf conf, Instance instance, Connector connector, // Handle implementation of Instance and invoke appropriate InputFormat method if (instance instanceof MockInstance) { - setMockInstance(conf, instance.getInstanceName()); + getHelper().setInputFormatMockInstance(conf, instance.getInstanceName()); } else { - setZooKeeperInstance(conf, instance.getInstanceName(), instance.getZooKeepers(), + getHelper().setInputFormatZooKeeperInstance(conf, instance.getInstanceName(), instance.getZooKeepers(), accumuloParams.useSasl()); } // Set the username/passwd for the Accumulo connection if (accumuloParams.useSasl()) { - UserGroupInformation ugi = 
UserGroupInformation.getCurrentUser(); - - // If we have Kerberos credentials, we should obtain the delegation token - if (ugi.hasKerberosCredentials()) { - Connector conn = accumuloParams.getConnector(); - AuthenticationToken token = helper.getDelegationToken(conn); - - // Send the DelegationToken down to the Configuration for Accumulo to use - setConnectorInfo(conf, accumuloParams.getAccumuloUserName(), token); - - // Convert the Accumulo token in a Hadoop token - Token accumuloToken = helper.getHadoopToken(token); - - log.info("Adding Hadoop Token for Accumulo to Job's Credentials"); - - // Add the Hadoop token to the JobConf - helper.mergeTokenIntoJobConf(conf, accumuloToken); - - if (!ugi.addToken(accumuloToken)) { - throw new IOException("Failed to add Accumulo Token to UGI"); - } - } - - try { - helper.addTokenFromUserToJobConf(ugi, conf); - } catch (IOException e) { - throw new IOException("Current user did not contain necessary delegation Tokens " + ugi, e); - } + getHelper().updateInputFormatConfWithAccumuloToken(conf, UserGroupInformation.getCurrentUser(), accumuloParams); } else { - setConnectorInfo(conf, accumuloParams.getAccumuloUserName(), + getHelper().setInputFormatConnectorInfo(conf, accumuloParams.getAccumuloUserName(), new PasswordToken(accumuloParams.getAccumuloPassword())); } @@ -355,45 +342,6 @@ protected void configure(JobConf conf, Instance instance, Connector connector, // Wrap the static AccumuloInputFormat methods with methods that we can // verify were correctly called via Mockito - protected void setMockInstance(JobConf conf, String instanceName) { - try { - AccumuloInputFormat.setMockInstance(conf, instanceName); - } catch (IllegalStateException e) { - // AccumuloInputFormat complains if you re-set an already set value. We just don't care. 
- log.debug("Ignoring exception setting mock instance of " + instanceName, e); - } - } - - @SuppressWarnings("deprecation") - protected void setZooKeeperInstance(JobConf conf, String instanceName, String zkHosts, - boolean isSasl) throws IOException { - // To support builds against 1.5, we can't use the new 1.6 setZooKeeperInstance which - // takes a ClientConfiguration class that only exists in 1.6 - try { - if (isSasl) { - // Reflection to support Accumulo 1.5. Remove when Accumulo 1.5 support is dropped - // 1.6 works with the deprecated 1.5 method, but must use reflection for 1.7-only SASL support - helper.setZooKeeperInstance(conf, AccumuloInputFormat.class, zkHosts, instanceName, isSasl); - } else { - AccumuloInputFormat.setZooKeeperInstance(conf, instanceName, zkHosts); - } - } catch (IllegalStateException ise) { - // AccumuloInputFormat complains if you re-set an already set value. We just don't care. - log.debug("Ignoring exception setting ZooKeeper instance of " + instanceName + " at " - + zkHosts, ise); - } - } - - protected void setConnectorInfo(JobConf conf, String user, AuthenticationToken token) - throws AccumuloSecurityException { - try { - AccumuloInputFormat.setConnectorInfo(conf, user, token); - } catch (IllegalStateException e) { - // AccumuloInputFormat complains if you re-set an already set value. We just don't care. - log.debug("Ignoring exception setting Accumulo Connector instance for user " + user, e); - } - } - protected void setInputTableName(JobConf conf, String tableName) { AccumuloInputFormat.setInputTableName(conf, tableName); } @@ -454,109 +402,7 @@ protected void fetchColumns(JobConf conf, Set> cfCqPairs) { return pairs; } - /** - * Reflection to work around Accumulo 1.5 and 1.6 incompatibilities. 
Throws an {@link IOException} - * for any reflection related exceptions - * - * @param split - * A RangeInputSplit - * @return The name of the table from the split - * @throws IOException - */ - protected String getTableName(RangeInputSplit split) throws IOException { - // ACCUMULO-3017 shenanigans with method names changing without deprecation - Method getTableName = null; - try { - getTableName = RangeInputSplit.class.getMethod("getTableName"); - } catch (SecurityException e) { - log.debug("Could not get getTableName method from RangeInputSplit", e); - } catch (NoSuchMethodException e) { - log.debug("Could not get getTableName method from RangeInputSplit", e); - } - - if (null != getTableName) { - try { - return (String) getTableName.invoke(split); - } catch (IllegalArgumentException e) { - log.debug("Could not invoke getTableName method from RangeInputSplit", e); - } catch (IllegalAccessException e) { - log.debug("Could not invoke getTableName method from RangeInputSplit", e); - } catch (InvocationTargetException e) { - log.debug("Could not invoke getTableName method from RangeInputSplit", e); - } - } - - Method getTable; - try { - getTable = RangeInputSplit.class.getMethod("getTable"); - } catch (SecurityException e) { - throw new IOException("Could not get table name from RangeInputSplit", e); - } catch (NoSuchMethodException e) { - throw new IOException("Could not get table name from RangeInputSplit", e); - } - - try { - return (String) getTable.invoke(split); - } catch (IllegalArgumentException e) { - throw new IOException("Could not get table name from RangeInputSplit", e); - } catch (IllegalAccessException e) { - throw new IOException("Could not get table name from RangeInputSplit", e); - } catch (InvocationTargetException e) { - throw new IOException("Could not get table name from RangeInputSplit", e); - } - } - - /** - * Sets the table name on a RangeInputSplit, accounting for change in method name. 
Any reflection - * related exception is wrapped in an {@link IOException} - * - * @param split - * The RangeInputSplit to operate on - * @param tableName - * The name of the table to set - * @throws IOException - */ - protected void setTableName(RangeInputSplit split, String tableName) throws IOException { - // ACCUMULO-3017 shenanigans with method names changing without deprecation - Method setTableName = null; - try { - setTableName = RangeInputSplit.class.getMethod("setTableName", String.class); - } catch (SecurityException e) { - log.debug("Could not get getTableName method from RangeInputSplit", e); - } catch (NoSuchMethodException e) { - log.debug("Could not get getTableName method from RangeInputSplit", e); - } - - if (null != setTableName) { - try { - setTableName.invoke(split, tableName); - return; - } catch (IllegalArgumentException e) { - log.debug("Could not invoke getTableName method from RangeInputSplit", e); - } catch (IllegalAccessException e) { - log.debug("Could not invoke getTableName method from RangeInputSplit", e); - } catch (InvocationTargetException e) { - log.debug("Could not invoke getTableName method from RangeInputSplit", e); - } - } - - Method setTable; - try { - setTable = RangeInputSplit.class.getMethod("setTable", String.class); - } catch (SecurityException e) { - throw new IOException("Could not set table name from RangeInputSplit", e); - } catch (NoSuchMethodException e) { - throw new IOException("Could not set table name from RangeInputSplit", e); - } - - try { - setTable.invoke(split, tableName); - } catch (IllegalArgumentException e) { - throw new IOException("Could not set table name from RangeInputSplit", e); - } catch (IllegalAccessException e) { - throw new IOException("Could not set table name from RangeInputSplit", e); - } catch (InvocationTargetException e) { - throw new IOException("Could not set table name from RangeInputSplit", e); - } + HiveAccumuloHelper getHelper() { + return helper; } } diff --git 
a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java index bfa764a286..0414c350b7 100644 --- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java @@ -20,11 +20,8 @@ import java.io.IOException; -import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Mutation; import org.apache.hadoop.fs.FileSystem; @@ -38,8 +35,6 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.Progressable; import com.google.common.base.Preconditions; @@ -78,46 +73,19 @@ protected void configureAccumuloOutputFormat(JobConf job) throws IOException { // Set the necessary Accumulo information try { if (cnxnParams.useMockInstance()) { - setMockInstanceWithErrorChecking(job, cnxnParams.getAccumuloInstanceName()); + getHelper().setOutputFormatMockInstance(job, cnxnParams.getAccumuloInstanceName()); } else { // Accumulo instance name with ZK quorum - setZooKeeperInstanceWithErrorChecking(job, cnxnParams.getAccumuloInstanceName(), + getHelper().setOutputFormatZooKeeperInstance(job, cnxnParams.getAccumuloInstanceName(), cnxnParams.getZooKeepers(), cnxnParams.useSasl()); } // Extract the delegation Token from the UGI and add it to 
the job // The AccumuloOutputFormat will look for it there. if (cnxnParams.useSasl()) { - UserGroupInformation ugi = getCurrentUser(); - if (!hasKerberosCredentials(ugi)) { - getHelper().addTokenFromUserToJobConf(ugi, job); - } else { - // Still in the local JVM, can use Kerberos credentials - try { - Connector connector = cnxnParams.getConnector(); - AuthenticationToken token = getHelper().getDelegationToken(connector); - - // Send the DelegationToken down to the Configuration for Accumulo to use - setConnectorInfoWithErrorChecking(job, cnxnParams.getAccumuloUserName(), token); - - // Convert the Accumulo token in a Hadoop token - Token accumuloToken = getHelper().getHadoopToken(token); - - log.info("Adding Hadoop Token for Accumulo to Job's Credentials"); - - // Add the Hadoop token to the JobConf - getHelper().mergeTokenIntoJobConf(job, accumuloToken); - - // Make sure the UGI contains the token too for good measure - if (!ugi.addToken(accumuloToken)) { - throw new IOException("Failed to add Accumulo Token to UGI"); - } - } catch (AccumuloException | AccumuloSecurityException e) { - throw new IOException("Failed to acquire Accumulo DelegationToken", e); - } - } + getHelper().updateOutputFormatConfWithAccumuloToken(job, getCurrentUser(), cnxnParams); } else { - setConnectorInfoWithErrorChecking(job, cnxnParams.getAccumuloUserName(), + getHelper().setOutputFormatConnectorInfo(job, cnxnParams.getAccumuloUserName(), new PasswordToken(cnxnParams.getAccumuloPassword())); } @@ -141,45 +109,6 @@ protected void configureAccumuloOutputFormat(JobConf job) throws IOException { // Non-static methods to wrap the static AccumuloOutputFormat methods to enable testing - protected void setConnectorInfoWithErrorChecking(JobConf conf, String username, - AuthenticationToken token) throws AccumuloSecurityException { - try { - AccumuloIndexedOutputFormat.setConnectorInfo(conf, username, token); - } catch (IllegalStateException e) { - // AccumuloOutputFormat complains if you re-set an 
already set value. We just don't care. - log.debug("Ignoring exception setting Accumulo Connector instance for user " + username, e); - } - } - - @SuppressWarnings("deprecation") - protected void setZooKeeperInstanceWithErrorChecking(JobConf conf, String instanceName, - String zookeepers, boolean isSasl) throws IOException { - try { - if (isSasl) { - // Reflection to support Accumulo 1.5. Remove when Accumulo 1.5 support is dropped - // 1.6 works with the deprecated 1.5 method, but must use reflection for 1.7-only - // SASL support - getHelper().setZooKeeperInstance(conf, AccumuloOutputFormat.class, zookeepers, instanceName, - isSasl); - } else { - AccumuloIndexedOutputFormat.setZooKeeperInstance(conf, instanceName, zookeepers); - } - } catch (IllegalStateException ise) { - // AccumuloOutputFormat complains if you re-set an already set value. We just don't care. - log.debug("Ignoring exception setting ZooKeeper instance of " + instanceName + " at " - + zookeepers, ise); - } - } - - protected void setMockInstanceWithErrorChecking(JobConf conf, String instanceName) { - try { - AccumuloIndexedOutputFormat.setMockInstance(conf, instanceName); - } catch (IllegalStateException e) { - // AccumuloOutputFormat complains if you re-set an already set value. We just don't care. - log.debug("Ignoring exception setting mock instance of " + instanceName, e); - } - } - protected void setDefaultAccumuloTableName(JobConf conf, String tableName) { AccumuloIndexedOutputFormat.setDefaultTableName(conf, tableName); } @@ -206,11 +135,6 @@ AccumuloConnectionParameters getConnectionParams(JobConf conf) { return new AccumuloConnectionParameters(conf); } - boolean hasKerberosCredentials(UserGroupInformation ugi) { - // Allows mocking in testing. - return ugi.hasKerberosCredentials(); - } - UserGroupInformation getCurrentUser() throws IOException { // Allows mocking in testing. 
return UserGroupInformation.getCurrentUser(); diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/CompositeAccumuloRowIdFactory.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/CompositeAccumuloRowIdFactory.java index 02d9736c00..67e9250dab 100644 --- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/CompositeAccumuloRowIdFactory.java +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/CompositeAccumuloRowIdFactory.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.lang.reflect.Constructor; +import java.util.Collections; import java.util.Properties; import org.apache.hadoop.conf.Configuration; @@ -57,7 +58,7 @@ public CompositeAccumuloRowIdFactory(Class keyClass) throws SecurityException public void addDependencyJars(Configuration jobConf) throws IOException { // Make sure the jar containing the custom CompositeRowId is included // in the mapreduce job's classpath (libjars) - Utils.addDependencyJars(jobConf, keyClass); + Utils.addDependencyJars(jobConf, Collections.> singletonList(keyClass)); } @Override diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/DefaultAccumuloRowIdFactory.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/DefaultAccumuloRowIdFactory.java index ea04d1a1bf..6d96d9b370 100644 --- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/DefaultAccumuloRowIdFactory.java +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/DefaultAccumuloRowIdFactory.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.accumulo.serde; import java.io.IOException; +import java.util.Collections; import java.util.Properties; import org.apache.hadoop.conf.Configuration; @@ -61,7 +62,7 @@ public void init(AccumuloSerDeParameters accumuloSerDeParams, Properties propert @Override public void addDependencyJars(Configuration conf) throws IOException { - Utils.addDependencyJars(conf, getClass()); + 
Utils.addDependencyJars(conf, Collections.> singletonList(getClass())); } @Override diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java index 8d195ee276..58bf4a695d 100644 --- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java +++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java @@ -23,6 +23,7 @@ import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding; import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -43,14 +44,17 @@ */ public class TestAccumuloStorageHandler { - protected AccumuloStorageHandler storageHandler; + private AccumuloStorageHandler storageHandler; + private Configuration conf; @Rule public TestName test = new TestName(); @Before public void setup() { + conf = new Configuration(); storageHandler = new AccumuloStorageHandler(); + storageHandler.setConf(conf); } @Test diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestHiveAccumuloHelper.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestHiveAccumuloHelper.java index 406768a94d..af561e0c3d 100644 --- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestHiveAccumuloHelper.java +++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestHiveAccumuloHelper.java @@ -23,12 +23,16 @@ import java.util.ArrayList; import java.util.Collection; +import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.mapred.AccumuloInputFormat; +import 
org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.log4j.Logger; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -90,51 +94,163 @@ public void testTokenToConfFromUser() throws Exception { assertEquals(service, credTokens.iterator().next().getService()); } - @Test(expected = IllegalStateException.class) - public void testISEIsPropagated() throws Exception { + @Test + public void testInputFormatWithKerberosToken() throws Exception { + final JobConf jobConf = new JobConf(); final HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class); + final AuthenticationToken authToken = Mockito.mock(AuthenticationToken.class); + final Token hadoopToken = Mockito.mock(Token.class); + final AccumuloConnectionParameters cnxnParams = Mockito.mock(AccumuloConnectionParameters.class); + final Connector connector = Mockito.mock(Connector.class); + + final String user = "bob"; + final String instanceName = "accumulo"; + final String zookeepers = "host1:2181,host2:2181,host3:2181"; + UserGroupInformation ugi = UserGroupInformation.createUserForTesting(user, new String[0]); - final JobConf jobConf = Mockito.mock(JobConf.class); - final Class inputOrOutputFormatClass = AccumuloInputFormat.class; - final String zookeepers = "localhost:2181"; - final String instanceName = "accumulo_instance"; - final boolean useSasl = false; + // Call the real methods for these + Mockito.doCallRealMethod().when(helper).updateOutputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams); + Mockito.doCallRealMethod().when(helper).updateInputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams); + Mockito.doCallRealMethod().when(helper).updateConfWithAccumuloToken(jobConf, ugi, 
cnxnParams, true); - // Call the real "public" method - Mockito.doCallRealMethod().when(helper).setZooKeeperInstance(jobConf, inputOrOutputFormatClass, - zookeepers, instanceName, useSasl); + // Return our mocked objects + Mockito.when(cnxnParams.getConnector()).thenReturn(connector); + Mockito.when(helper.getDelegationToken(connector)).thenReturn(authToken); + Mockito.when(helper.getHadoopToken(authToken)).thenReturn(hadoopToken); - // Mock the private one to throw the ISE - Mockito.doThrow(new IllegalStateException()).when(helper). - setZooKeeperInstanceWithReflection(jobConf, inputOrOutputFormatClass, zookeepers, - instanceName, useSasl); + // Stub AccumuloConnectionParameters actions + Mockito.when(cnxnParams.useSasl()).thenReturn(true); + Mockito.when(cnxnParams.getAccumuloUserName()).thenReturn(user); + Mockito.when(cnxnParams.getAccumuloInstanceName()).thenReturn(instanceName); + Mockito.when(cnxnParams.getZooKeepers()).thenReturn(zookeepers); - // Should throw an IllegalStateException - helper.setZooKeeperInstance(jobConf, inputOrOutputFormatClass, zookeepers, instanceName, - useSasl); + // Test the InputFormat execution path + // + Mockito.when(helper.hasKerberosCredentials(ugi)).thenReturn(true); + // Invoke the InputFormat entrypoint + helper.updateInputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams); + Mockito.verify(helper).setInputFormatConnectorInfo(jobConf, user, authToken); + Mockito.verify(helper).mergeTokenIntoJobConf(jobConf, hadoopToken); + Mockito.verify(helper).addTokenFromUserToJobConf(ugi, jobConf); + + // Make sure the token made it into the UGI + Collection> tokens = ugi.getTokens(); + Assert.assertEquals(1, tokens.size()); + Assert.assertEquals(hadoopToken, tokens.iterator().next()); } - @Test(expected = IllegalStateException.class) - public void testISEIsPropagatedWithReflection() throws Exception { + @Test + public void testInputFormatWithoutKerberosToken() throws Exception { + final JobConf jobConf = new JobConf(); + final 
HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class); + final AuthenticationToken authToken = Mockito.mock(AuthenticationToken.class); + final Token hadoopToken = Mockito.mock(Token.class); + final AccumuloConnectionParameters cnxnParams = Mockito.mock(AccumuloConnectionParameters.class); + final Connector connector = Mockito.mock(Connector.class); + + final String user = "bob"; + final String instanceName = "accumulo"; + final String zookeepers = "host1:2181,host2:2181,host3:2181"; + UserGroupInformation ugi = UserGroupInformation.createUserForTesting(user, new String[0]); + + // Call the real methods for these + Mockito.doCallRealMethod().when(helper).updateOutputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams); + Mockito.doCallRealMethod().when(helper).updateInputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams); + Mockito.doCallRealMethod().when(helper).updateConfWithAccumuloToken(jobConf, ugi, cnxnParams, true); + + // Return our mocked objects + Mockito.when(cnxnParams.getConnector()).thenReturn(connector); + Mockito.when(helper.getDelegationToken(connector)).thenReturn(authToken); + Mockito.when(helper.getHadoopToken(authToken)).thenReturn(hadoopToken); + + // Stub AccumuloConnectionParameters actions + Mockito.when(cnxnParams.useSasl()).thenReturn(true); + Mockito.when(cnxnParams.getAccumuloUserName()).thenReturn(user); + Mockito.when(cnxnParams.getAccumuloInstanceName()).thenReturn(instanceName); + Mockito.when(cnxnParams.getZooKeepers()).thenReturn(zookeepers); + + // Verify that when we have no kerberos credentials, we pull the serialized Token + Mockito.when(helper.hasKerberosCredentials(ugi)).thenReturn(false); + helper.updateInputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams); + Mockito.verify(helper).addTokenFromUserToJobConf(ugi, jobConf); + } + + @Test + public void testOutputFormatSaslConfigurationWithoutKerberosToken() throws Exception { + final JobConf jobConf = new JobConf(); final HiveAccumuloHelper helper = 
Mockito.mock(HiveAccumuloHelper.class); + final AuthenticationToken authToken = Mockito.mock(AuthenticationToken.class); + final Token hadoopToken = Mockito.mock(Token.class); + final AccumuloConnectionParameters cnxnParams = Mockito.mock(AccumuloConnectionParameters.class); + final Connector connector = Mockito.mock(Connector.class); + + final String user = "bob"; + final String instanceName = "accumulo"; + final String zookeepers = "host1:2181,host2:2181,host3:2181"; + UserGroupInformation ugi = UserGroupInformation.createUserForTesting(user, new String[0]); + + // Call the real methods for these + Mockito.doCallRealMethod().when(helper).updateOutputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams); + Mockito.doCallRealMethod().when(helper).updateConfWithAccumuloToken(jobConf, ugi, cnxnParams, false); + + // Return our mocked objects + Mockito.when(cnxnParams.getConnector()).thenReturn(connector); + Mockito.when(helper.getDelegationToken(connector)).thenReturn(authToken); + Mockito.when(helper.getHadoopToken(authToken)).thenReturn(hadoopToken); + + // Stub AccumuloConnectionParameters actions + Mockito.when(cnxnParams.useSasl()).thenReturn(true); + Mockito.when(cnxnParams.getAccumuloUserName()).thenReturn(user); + Mockito.when(cnxnParams.getAccumuloInstanceName()).thenReturn(instanceName); + Mockito.when(cnxnParams.getZooKeepers()).thenReturn(zookeepers); + + // Verify that when we have no kerberos credentials, we pull the serialized Token + Mockito.when(helper.hasKerberosCredentials(ugi)).thenReturn(false); + helper.updateOutputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams); + Mockito.verify(helper).addTokenFromUserToJobConf(ugi, jobConf); + } + + @Test + public void testOutputFormatSaslConfigurationWithKerberosToken() throws Exception { + final JobConf jobConf = new JobConf(); + final HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class); + final AuthenticationToken authToken = Mockito.mock(AuthenticationToken.class); + final Token 
hadoopToken = Mockito.mock(Token.class); + final AccumuloConnectionParameters cnxnParams = Mockito.mock(AccumuloConnectionParameters.class); + final Connector connector = Mockito.mock(Connector.class); + + final String user = "bob"; + final String instanceName = "accumulo"; + final String zookeepers = "host1:2181,host2:2181,host3:2181"; + UserGroupInformation ugi = UserGroupInformation.createUserForTesting(user, new String[0]); + + // Call the real methods for these + Mockito.doCallRealMethod().when(helper).updateOutputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams); + Mockito.doCallRealMethod().when(helper).updateConfWithAccumuloToken(jobConf, ugi, cnxnParams, false); + + // Return our mocked objects + Mockito.when(cnxnParams.getConnector()).thenReturn(connector); + Mockito.when(helper.getDelegationToken(connector)).thenReturn(authToken); + Mockito.when(helper.getHadoopToken(authToken)).thenReturn(hadoopToken); - final JobConf jobConf = Mockito.mock(JobConf.class); - final Class inputOrOutputFormatClass = AccumuloInputFormat.class; - final String zookeepers = "localhost:2181"; - final String instanceName = "accumulo_instance"; - final boolean useSasl = false; + // Stub AccumuloConnectionParameters actions + Mockito.when(cnxnParams.useSasl()).thenReturn(true); + Mockito.when(cnxnParams.getAccumuloUserName()).thenReturn(user); + Mockito.when(cnxnParams.getAccumuloInstanceName()).thenReturn(instanceName); + Mockito.when(cnxnParams.getZooKeepers()).thenReturn(zookeepers); - // Call the real "public" method - Mockito.doCallRealMethod().when(helper).setZooKeeperInstance(jobConf, inputOrOutputFormatClass, - zookeepers, instanceName, useSasl); + // We have kerberos credentials - // Mock the private one to throw the IAE - Mockito.doThrow(new InvocationTargetException(new IllegalStateException())).when(helper). 
- setZooKeeperInstanceWithReflection(jobConf, inputOrOutputFormatClass, zookeepers, - instanceName, useSasl); + Mockito.when(helper.hasKerberosCredentials(ugi)).thenReturn(true); + // Invoke the OutputFormat entrypoint + helper.updateOutputFormatConfWithAccumuloToken(jobConf, ugi, cnxnParams); + Mockito.verify(helper).setOutputFormatConnectorInfo(jobConf, user, authToken); + Mockito.verify(helper).mergeTokenIntoJobConf(jobConf, hadoopToken); + Mockito.verify(helper).addTokenFromUserToJobConf(ugi, jobConf); - // Should throw an IllegalStateException - helper.setZooKeeperInstance(jobConf, inputOrOutputFormatClass, zookeepers, instanceName, - useSasl); + // Make sure the token made it into the UGI + Collection> tokens = ugi.getTokens(); + Assert.assertEquals(1, tokens.size()); + Assert.assertEquals(hadoopToken, tokens.iterator().next()); } } diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableInputFormat.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableInputFormat.java index ee5aecff4e..56beb8f12b 100644 --- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableInputFormat.java +++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableInputFormat.java @@ -54,6 +54,7 @@ import org.apache.hadoop.hive.accumulo.AccumuloConnectionParameters; import org.apache.hadoop.hive.accumulo.AccumuloHiveConstants; import org.apache.hadoop.hive.accumulo.AccumuloHiveRow; +import org.apache.hadoop.hive.accumulo.HiveAccumuloHelper; import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding; import org.apache.hadoop.hive.accumulo.columns.ColumnMapper; import org.apache.hadoop.hive.accumulo.columns.ColumnMapping; @@ -472,6 +473,10 @@ public void testConfigureMockAccumuloInputFormat() throws Exception { Set ranges = Collections.singleton(new Range()); HiveAccumuloTableInputFormat mockInputFormat = Mockito.mock(HiveAccumuloTableInputFormat.class); + 
HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class); + + // Stub out a mocked Helper instance + Mockito.when(mockInputFormat.getHelper()).thenReturn(helper); // Call out to the real configure method Mockito.doCallRealMethod().when(mockInputFormat) @@ -485,8 +490,8 @@ public void testConfigureMockAccumuloInputFormat() throws Exception { ranges); // Verify that the correct methods are invoked on AccumuloInputFormat - Mockito.verify(mockInputFormat).setMockInstance(conf, mockInstance.getInstanceName()); - Mockito.verify(mockInputFormat).setConnectorInfo(conf, USER, new PasswordToken(PASS)); + Mockito.verify(helper).setInputFormatMockInstance(conf, mockInstance.getInstanceName()); + Mockito.verify(helper).setInputFormatConnectorInfo(conf, USER, new PasswordToken(PASS)); Mockito.verify(mockInputFormat).setInputTableName(conf, TEST_TABLE); Mockito.verify(mockInputFormat).setScanAuthorizations(conf, con.securityOperations().getUserAuthorizations(USER)); @@ -509,10 +514,13 @@ public void testConfigureAccumuloInputFormat() throws Exception { ZooKeeperInstance zkInstance = Mockito.mock(ZooKeeperInstance.class); HiveAccumuloTableInputFormat mockInputFormat = Mockito.mock(HiveAccumuloTableInputFormat.class); + HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class); // Stub out the ZKI mock Mockito.when(zkInstance.getInstanceName()).thenReturn(instanceName); Mockito.when(zkInstance.getZooKeepers()).thenReturn(zookeepers); + // Stub out a mocked Helper instance + Mockito.when(mockInputFormat.getHelper()).thenReturn(helper); // Call out to the real configure method Mockito.doCallRealMethod().when(mockInputFormat) @@ -526,8 +534,8 @@ public void testConfigureAccumuloInputFormat() throws Exception { ranges); // Verify that the correct methods are invoked on AccumuloInputFormat - Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers, false); - Mockito.verify(mockInputFormat).setConnectorInfo(conf, USER, new 
PasswordToken(PASS)); + Mockito.verify(helper).setInputFormatZooKeeperInstance(conf, instanceName, zookeepers, false); + Mockito.verify(helper).setInputFormatConnectorInfo(conf, USER, new PasswordToken(PASS)); Mockito.verify(mockInputFormat).setInputTableName(conf, TEST_TABLE); Mockito.verify(mockInputFormat).setScanAuthorizations(conf, con.securityOperations().getUserAuthorizations(USER)); @@ -551,10 +559,13 @@ public void testConfigureAccumuloInputFormatWithAuthorizations() throws Exceptio ZooKeeperInstance zkInstance = Mockito.mock(ZooKeeperInstance.class); HiveAccumuloTableInputFormat mockInputFormat = Mockito.mock(HiveAccumuloTableInputFormat.class); + HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class); // Stub out the ZKI mock Mockito.when(zkInstance.getInstanceName()).thenReturn(instanceName); Mockito.when(zkInstance.getZooKeepers()).thenReturn(zookeepers); + // Stub out a mocked Helper instance + Mockito.when(mockInputFormat.getHelper()).thenReturn(helper); // Call out to the real configure method Mockito.doCallRealMethod().when(mockInputFormat) @@ -568,8 +579,8 @@ public void testConfigureAccumuloInputFormatWithAuthorizations() throws Exceptio ranges); // Verify that the correct methods are invoked on AccumuloInputFormat - Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers, false); - Mockito.verify(mockInputFormat).setConnectorInfo(conf, USER, new PasswordToken(PASS)); + Mockito.verify(helper).setInputFormatZooKeeperInstance(conf, instanceName, zookeepers, false); + Mockito.verify(helper).setInputFormatConnectorInfo(conf, USER, new PasswordToken(PASS)); Mockito.verify(mockInputFormat).setInputTableName(conf, TEST_TABLE); Mockito.verify(mockInputFormat).setScanAuthorizations(conf, new Authorizations("foo,bar")); Mockito.verify(mockInputFormat).addIterators(conf, iterators); @@ -605,10 +616,13 @@ public void testConfigureAccumuloInputFormatWithIterators() throws Exception { ZooKeeperInstance zkInstance = 
Mockito.mock(ZooKeeperInstance.class); HiveAccumuloTableInputFormat mockInputFormat = Mockito.mock(HiveAccumuloTableInputFormat.class); + HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class); // Stub out the ZKI mock Mockito.when(zkInstance.getInstanceName()).thenReturn(instanceName); Mockito.when(zkInstance.getZooKeepers()).thenReturn(zookeepers); + // Stub out a mocked Helper instance + Mockito.when(mockInputFormat.getHelper()).thenReturn(helper); // Call out to the real configure method Mockito.doCallRealMethod().when(mockInputFormat) @@ -622,8 +636,8 @@ public void testConfigureAccumuloInputFormatWithIterators() throws Exception { ranges); // Verify that the correct methods are invoked on AccumuloInputFormat - Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers, false); - Mockito.verify(mockInputFormat).setConnectorInfo(conf, USER, new PasswordToken(PASS)); + Mockito.verify(helper).setInputFormatZooKeeperInstance(conf, instanceName, zookeepers, false); + Mockito.verify(helper).setInputFormatConnectorInfo(conf, USER, new PasswordToken(PASS)); Mockito.verify(mockInputFormat).setInputTableName(conf, TEST_TABLE); Mockito.verify(mockInputFormat).setScanAuthorizations(conf, con.securityOperations().getUserAuthorizations(USER)); @@ -659,12 +673,15 @@ public void testConfigureAccumuloInputFormatWithEmptyColumns() throws Exception ZooKeeperInstance zkInstance = Mockito.mock(ZooKeeperInstance.class); HiveAccumuloTableInputFormat mockInputFormat = Mockito.mock(HiveAccumuloTableInputFormat.class); + HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class); // Stub out the ZKI mock Mockito.when(zkInstance.getInstanceName()).thenReturn(instanceName); Mockito.when(zkInstance.getZooKeepers()).thenReturn(zookeepers); Mockito.when(mockInputFormat.getPairCollection(columnMapper.getColumnMappings())).thenReturn( cfCqPairs); + // Stub out a mocked Helper instance + Mockito.when(mockInputFormat.getHelper()).thenReturn(helper); 
// Call out to the real configure method Mockito.doCallRealMethod().when(mockInputFormat) @@ -678,8 +695,8 @@ public void testConfigureAccumuloInputFormatWithEmptyColumns() throws Exception ranges); // Verify that the correct methods are invoked on AccumuloInputFormat - Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers, false); - Mockito.verify(mockInputFormat).setConnectorInfo(conf, USER, new PasswordToken(PASS)); + Mockito.verify(helper).setInputFormatZooKeeperInstance(conf, instanceName, zookeepers, false); + Mockito.verify(helper).setInputFormatConnectorInfo(conf, USER, new PasswordToken(PASS)); Mockito.verify(mockInputFormat).setInputTableName(conf, TEST_TABLE); Mockito.verify(mockInputFormat).setScanAuthorizations(conf, con.securityOperations().getUserAuthorizations(USER)); diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java index 5fdab28e5b..1dd2b8cac5 100644 --- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java +++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java @@ -109,14 +109,17 @@ public void cleanup() { @Test public void testBasicConfiguration() throws IOException, AccumuloSecurityException { HiveAccumuloTableOutputFormat outputFormat = Mockito.mock(HiveAccumuloTableOutputFormat.class); + HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class); + + Mockito.when(outputFormat.getHelper()).thenReturn(helper); Mockito.doCallRealMethod().when(outputFormat).configureAccumuloOutputFormat(conf); Mockito.doCallRealMethod().when(outputFormat).getConnectionParams(conf); outputFormat.configureAccumuloOutputFormat(conf); - Mockito.verify(outputFormat).setConnectorInfoWithErrorChecking(conf, user, new PasswordToken(password)); - 
Mockito.verify(outputFormat).setZooKeeperInstanceWithErrorChecking(conf, instanceName, zookeepers, false); + Mockito.verify(helper).setOutputFormatConnectorInfo(conf, user, new PasswordToken(password)); + Mockito.verify(helper).setOutputFormatZooKeeperInstance(conf, instanceName, zookeepers, false); Mockito.verify(outputFormat).setDefaultAccumuloTableName(conf, outputTable); } @@ -160,38 +163,32 @@ public void testSaslConfiguration() throws IOException, AccumuloException, Accum Mockito.when(cnxnParams.getZooKeepers()).thenReturn(zookeepers); // Stub OutputFormat actions - Mockito.when(outputFormat.hasKerberosCredentials(user1)).thenReturn(true); + Mockito.when(helper.hasKerberosCredentials(user1)).thenReturn(true); // Invoke the method outputFormat.configureAccumuloOutputFormat(conf); - // The AccumuloInputFormat methods - Mockito.verify(outputFormat).setZooKeeperInstanceWithErrorChecking(conf, instanceName, zookeepers, true); - Mockito.verify(outputFormat).setConnectorInfoWithErrorChecking(conf, user, authToken); + // The AccumuloOutputFormat methods + Mockito.verify(helper).setOutputFormatZooKeeperInstance(conf, instanceName, zookeepers, true); + Mockito.verify(helper).updateOutputFormatConfWithAccumuloToken(conf, user1, cnxnParams); Mockito.verify(outputFormat).setDefaultAccumuloTableName(conf, outputTable); - - // Other methods we expect - Mockito.verify(helper).mergeTokenIntoJobConf(conf, hadoopToken); - - // Make sure the token made it into the UGI - Collection> tokens = user1.getTokens(); - Assert.assertEquals(1, tokens.size()); - Assert.assertEquals(hadoopToken, tokens.iterator().next()); } @Test public void testMockInstance() throws IOException, AccumuloSecurityException { HiveAccumuloTableOutputFormat outputFormat = Mockito.mock(HiveAccumuloTableOutputFormat.class); + HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class); conf.setBoolean(AccumuloConnectionParameters.USE_MOCK_INSTANCE, true); 
conf.unset(AccumuloConnectionParameters.ZOOKEEPERS); + Mockito.when(outputFormat.getHelper()).thenReturn(helper); Mockito.doCallRealMethod().when(outputFormat).configureAccumuloOutputFormat(conf); Mockito.doCallRealMethod().when(outputFormat).getConnectionParams(conf); outputFormat.configureAccumuloOutputFormat(conf); - Mockito.verify(outputFormat).setConnectorInfoWithErrorChecking(conf, user, new PasswordToken(password)); - Mockito.verify(outputFormat).setMockInstanceWithErrorChecking(conf, instanceName); + Mockito.verify(helper).setOutputFormatConnectorInfo(conf, user, new PasswordToken(password)); + Mockito.verify(helper).setOutputFormatMockInstance(conf, instanceName); Mockito.verify(outputFormat).setDefaultAccumuloTableName(conf, outputTable); } diff --git a/itests/qtest-accumulo/pom.xml b/itests/qtest-accumulo/pom.xml index b7ce28336a..40d0a749c8 100644 --- a/itests/qtest-accumulo/pom.xml +++ b/itests/qtest-accumulo/pom.xml @@ -38,7 +38,8 @@ true - 0.9.0 + + 0.9.1 -mkdir -p diff --git a/pom.xml b/pom.xml index 1b8963274e..ed11856ac5 100644 --- a/pom.xml +++ b/pom.xml @@ -109,7 +109,7 @@ 1.8 - 1.6.0 + 1.7.3 5.5.0 1.9.1 3.5.2 -- 2.12.2