diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java
index 0189c07..3ae5431 100644
--- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java
+++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java
@@ -61,7 +61,7 @@ public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException
   }
 
   protected void configureAccumuloOutputFormat(JobConf job) throws IOException {
-    AccumuloConnectionParameters cnxnParams = new AccumuloConnectionParameters(job);
+    AccumuloConnectionParameters cnxnParams = getConnectionParams(job);
 
     final String tableName = job.get(AccumuloSerDeParameters.TABLE_NAME);
 
@@ -72,35 +72,35 @@ protected void configureAccumuloOutputFormat(JobConf job) throws IOException {
     // Set the necessary Accumulo information
     try {
       if (cnxnParams.useMockInstance()) {
-        setAccumuloMockInstance(job, cnxnParams.getAccumuloInstanceName());
+        setMockInstanceWithErrorChecking(job, cnxnParams.getAccumuloInstanceName());
       } else {
         // Accumulo instance name with ZK quorum
-        setAccumuloZooKeeperInstance(job, cnxnParams.getAccumuloInstanceName(),
+        setZooKeeperInstanceWithErrorChecking(job, cnxnParams.getAccumuloInstanceName(),
             cnxnParams.getZooKeepers(), cnxnParams.useSasl());
       }
 
       // Extract the delegation Token from the UGI and add it to the job
       // The AccumuloOutputFormat will look for it there.
       if (cnxnParams.useSasl()) {
-        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-        if (!ugi.hasKerberosCredentials()) {
-          helper.addTokenFromUserToJobConf(ugi, job);
+        UserGroupInformation ugi = getCurrentUser();
+        if (!hasKerberosCredentials(ugi)) {
+          getHelper().addTokenFromUserToJobConf(ugi, job);
         } else {
           // Still in the local JVM, can use Kerberos credentials
           try {
             Connector connector = cnxnParams.getConnector();
-            AuthenticationToken token = helper.getDelegationToken(connector);
+            AuthenticationToken token = getHelper().getDelegationToken(connector);
 
             // Send the DelegationToken down to the Configuration for Accumulo to use
-            setConnectorInfo(job, cnxnParams.getAccumuloUserName(), token);
+            setConnectorInfoWithErrorChecking(job, cnxnParams.getAccumuloUserName(), token);
 
             // Convert the Accumulo token in a Hadoop token
-            Token accumuloToken = helper.getHadoopToken(token);
+            Token accumuloToken = getHelper().getHadoopToken(token);
 
             log.info("Adding Hadoop Token for Accumulo to Job's Credentials");
 
             // Add the Hadoop token to the JobConf
-            helper.mergeTokenIntoJobConf(job, accumuloToken);
+            getHelper().mergeTokenIntoJobConf(job, accumuloToken);
 
             // Make sure the UGI contains the token too for good measure
             if (!ugi.addToken(accumuloToken)) {
@@ -111,7 +111,7 @@ protected void configureAccumuloOutputFormat(JobConf job) throws IOException {
           }
         }
       } else {
-        setAccumuloConnectorInfo(job, cnxnParams.getAccumuloUserName(),
+        setConnectorInfoWithErrorChecking(job, cnxnParams.getAccumuloUserName(),
            new PasswordToken(cnxnParams.getAccumuloPassword()));
       }
 
@@ -125,7 +125,7 @@ protected void configureAccumuloOutputFormat(JobConf job) throws IOException {
 
   // Non-static methods to wrap the static AccumuloOutputFormat methods to enable testing
 
-  protected void setAccumuloConnectorInfo(JobConf conf, String username, AuthenticationToken token)
+  protected void setConnectorInfoWithErrorChecking(JobConf conf, String username, AuthenticationToken token)
       throws AccumuloSecurityException {
     try {
       AccumuloOutputFormat.setConnectorInfo(conf, username, token);
@@ -136,14 +136,14 @@ protected void setAccumuloConnectorInfo(JobConf conf, String username, Authentic
   }
 
   @SuppressWarnings("deprecation")
-  protected void setAccumuloZooKeeperInstance(JobConf conf, String instanceName, String zookeepers,
+  protected void setZooKeeperInstanceWithErrorChecking(JobConf conf, String instanceName, String zookeepers,
       boolean isSasl) throws IOException {
     try {
       if (isSasl) {
         // Reflection to support Accumulo 1.5. Remove when Accumulo 1.5 support is dropped
         // 1.6 works with the deprecated 1.5 method, but must use reflection for 1.7-only
         // SASL support
-        helper.setZooKeeperInstance(conf, AccumuloOutputFormat.class, zookeepers, instanceName,
+        getHelper().setZooKeeperInstance(conf, AccumuloOutputFormat.class, zookeepers, instanceName,
            isSasl);
       } else {
         AccumuloOutputFormat.setZooKeeperInstance(conf, instanceName, zookeepers);
@@ -155,7 +155,7 @@ protected void setAccumuloZooKeeperInstance(JobConf conf, String instanceName, S
     }
   }
 
-  protected void setAccumuloMockInstance(JobConf conf, String instanceName) {
+  protected void setMockInstanceWithErrorChecking(JobConf conf, String instanceName) {
     try {
       AccumuloOutputFormat.setMockInstance(conf, instanceName);
     } catch (IllegalStateException e) {
@@ -167,4 +167,24 @@ protected void setAccumuloMockInstance(JobConf conf, String instanceName) {
 
   protected void setDefaultAccumuloTableName(JobConf conf, String tableName) {
     AccumuloOutputFormat.setDefaultTableName(conf, tableName);
   }
+
+  HiveAccumuloHelper getHelper() {
+    // Allows mocking in testing.
+    return helper;
+  }
+
+  AccumuloConnectionParameters getConnectionParams(JobConf conf) {
+    // Allows mocking in testing.
+    return new AccumuloConnectionParameters(conf);
+  }
+
+  boolean hasKerberosCredentials(UserGroupInformation ugi) {
+    // Allows mocking in testing.
+    return ugi.hasKerberosCredentials();
+  }
+
+  UserGroupInformation getCurrentUser() throws IOException {
+    // Allows mocking in testing.
+    return UserGroupInformation.getCurrentUser();
+  }
 }
diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestHiveAccumuloHelper.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestHiveAccumuloHelper.java
index 88544f0..3f2122d 100644
--- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestHiveAccumuloHelper.java
+++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestHiveAccumuloHelper.java
@@ -18,6 +18,7 @@
 
 import static org.junit.Assert.assertEquals;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 
@@ -25,11 +26,13 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.log4j.Logger;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
 public class TestHiveAccumuloHelper {
+  private static final Logger log = Logger.getLogger(TestHiveAccumuloHelper.class);
 
   private HiveAccumuloHelper helper;
 
@@ -46,7 +49,13 @@ public void testTokenMerge() throws Exception {
 
     Mockito.when(token.getService()).thenReturn(service);
 
-    helper.mergeTokenIntoJobConf(jobConf, token);
+    try {
+      helper.mergeTokenIntoJobConf(jobConf, token);
+    } catch (IOException e) {
+      // Hadoop 1 doesn't support credential merging, so this will fail.
+      log.info("Ignoring exception, likely coming from Hadoop 1", e);
+      return;
+    }
 
     Collection<Token<? extends TokenIdentifier>> tokens = jobConf.getCredentials().getAllTokens();
     assertEquals(1, tokens.size());
@@ -66,7 +75,13 @@ public void testTokenToConfFromUser() throws Exception {
     Mockito.when(token.getKind()).thenReturn(HiveAccumuloHelper.ACCUMULO_SERVICE);
     Mockito.when(token.getService()).thenReturn(service);
 
-    helper.addTokenFromUserToJobConf(ugi, jobConf);
+    try {
+      helper.addTokenFromUserToJobConf(ugi, jobConf);
+    } catch (IOException e) {
+      // Hadoop 1 doesn't support credential merging, so this will fail.
+      log.info("Ignoring exception, likely coming from Hadoop 1", e);
+      return;
+    }
 
     Collection<Token<? extends TokenIdentifier>> credTokens = jobConf.getCredentials().getAllTokens();
     assertEquals(1, credTokens.size());
diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java
index 5d3f15b..5fdab28 100644
--- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java
+++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java
@@ -18,15 +18,18 @@
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Properties;
 
+import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
@@ -36,6 +39,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.accumulo.AccumuloConnectionParameters;
+import org.apache.hadoop.hive.accumulo.HiveAccumuloHelper;
 import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding;
 import org.apache.hadoop.hive.accumulo.serde.AccumuloRowSerializer;
 import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDe;
@@ -54,6 +58,10 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -87,6 +95,15 @@ public void setup() throws IOException {
     conf.set(AccumuloConnectionParameters.INSTANCE_NAME, instanceName);
     conf.set(AccumuloConnectionParameters.ZOOKEEPERS, zookeepers);
     conf.set(AccumuloConnectionParameters.TABLE_NAME, outputTable);
+
+    System.setProperty("java.security.krb5.realm", "accumulo");
+    System.setProperty("java.security.krb5.kdc", "fake");
+  }
+
+  @After
+  public void cleanup() {
+    System.setProperty("java.security.krb5.realm", "");
+    System.setProperty("java.security.krb5.kdc", "");
   }
 
   @Test
@@ -94,12 +111,72 @@ public void testBasicConfiguration() throws IOException, AccumuloSecurityExcepti
     HiveAccumuloTableOutputFormat outputFormat = Mockito.mock(HiveAccumuloTableOutputFormat.class);
 
     Mockito.doCallRealMethod().when(outputFormat).configureAccumuloOutputFormat(conf);
+    Mockito.doCallRealMethod().when(outputFormat).getConnectionParams(conf);
+
+    outputFormat.configureAccumuloOutputFormat(conf);
+
+    Mockito.verify(outputFormat).setConnectorInfoWithErrorChecking(conf, user, new PasswordToken(password));
+    Mockito.verify(outputFormat).setZooKeeperInstanceWithErrorChecking(conf, instanceName, zookeepers, false);
+    Mockito.verify(outputFormat).setDefaultAccumuloTableName(conf, outputTable);
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  @Test
+  public void testSaslConfiguration() throws IOException, AccumuloException, AccumuloSecurityException {
+    final HiveAccumuloTableOutputFormat outputFormat = Mockito.mock(HiveAccumuloTableOutputFormat.class);
+    final AuthenticationToken authToken = Mockito.mock(AuthenticationToken.class);
+    final Token hadoopToken = Mockito.mock(Token.class);
+    final HiveAccumuloHelper helper = Mockito.mock(HiveAccumuloHelper.class);
+    final AccumuloConnectionParameters cnxnParams = Mockito.mock(AccumuloConnectionParameters.class);
+    final Connector connector = Mockito.mock(Connector.class);
+
+    // Set UGI to use Kerberos
+    // Have to use the string constant to support hadoop 1
+    conf.set("hadoop.security.authentication", "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+
+    // Set the current UGI to a fake user
+    UserGroupInformation user1 = UserGroupInformation.createUserForTesting(user, new String[0]);
+    // Use that as the "current user"
+    Mockito.when(outputFormat.getCurrentUser()).thenReturn(user1);
+
+    // Turn off passwords; SASL is enabled via the mocked connection parameters below
+    conf.unset(AccumuloConnectionParameters.USER_PASS);
+
+    // Call the real method instead of the mock
+    Mockito.doCallRealMethod().when(outputFormat).configureAccumuloOutputFormat(conf);
+
+    // Return our mocked objects
+    Mockito.when(outputFormat.getHelper()).thenReturn(helper);
+    Mockito.when(outputFormat.getConnectionParams(conf)).thenReturn(cnxnParams);
+    Mockito.when(cnxnParams.getConnector()).thenReturn(connector);
+    Mockito.when(helper.getDelegationToken(connector)).thenReturn(authToken);
+    Mockito.when(helper.getHadoopToken(authToken)).thenReturn(hadoopToken);
+    // Stub AccumuloConnectionParameters actions
+    Mockito.when(cnxnParams.useSasl()).thenReturn(true);
+    Mockito.when(cnxnParams.getAccumuloUserName()).thenReturn(user);
+    Mockito.when(cnxnParams.getAccumuloInstanceName()).thenReturn(instanceName);
+    Mockito.when(cnxnParams.getZooKeepers()).thenReturn(zookeepers);
+
+    // Stub OutputFormat actions
+    Mockito.when(outputFormat.hasKerberosCredentials(user1)).thenReturn(true);
+
+    // Invoke the method
     outputFormat.configureAccumuloOutputFormat(conf);
 
-    Mockito.verify(outputFormat).setAccumuloConnectorInfo(conf, user, new PasswordToken(password));
-    Mockito.verify(outputFormat).setAccumuloZooKeeperInstance(conf, instanceName, zookeepers, false);
+    // The AccumuloOutputFormat methods
+    Mockito.verify(outputFormat).setZooKeeperInstanceWithErrorChecking(conf, instanceName, zookeepers, true);
+    Mockito.verify(outputFormat).setConnectorInfoWithErrorChecking(conf, user, authToken);
     Mockito.verify(outputFormat).setDefaultAccumuloTableName(conf, outputTable);
+
+    // Other methods we expect
+    Mockito.verify(helper).mergeTokenIntoJobConf(conf, hadoopToken);
+
+    // Make sure the token made it into the UGI
+    Collection<Token<? extends TokenIdentifier>> tokens = user1.getTokens();
+    Assert.assertEquals(1, tokens.size());
+    Assert.assertEquals(hadoopToken, tokens.iterator().next());
   }
 
   @Test
@@ -109,11 +186,12 @@ public void testMockInstance() throws IOException, AccumuloSecurityException {
     conf.unset(AccumuloConnectionParameters.ZOOKEEPERS);
 
     Mockito.doCallRealMethod().when(outputFormat).configureAccumuloOutputFormat(conf);
+    Mockito.doCallRealMethod().when(outputFormat).getConnectionParams(conf);
 
     outputFormat.configureAccumuloOutputFormat(conf);
 
-    Mockito.verify(outputFormat).setAccumuloConnectorInfo(conf, user, new PasswordToken(password));
-    Mockito.verify(outputFormat).setAccumuloMockInstance(conf, instanceName);
+    Mockito.verify(outputFormat).setConnectorInfoWithErrorChecking(conf, user, new PasswordToken(password));
+    Mockito.verify(outputFormat).setMockInstanceWithErrorChecking(conf, instanceName);
 
     Mockito.verify(outputFormat).setDefaultAccumuloTableName(conf, outputTable);
   }
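Note on the pattern above (not part of the patch): the new package-private accessors on HiveAccumuloTableOutputFormat (getHelper(), getConnectionParams(), getCurrentUser(), hasKerberosCredentials()) are test seams. A Mockito partial mock runs the real configureAccumuloOutputFormat() while those seams hand back mocked collaborators. The following self-contained sketch shows the same technique with hypothetical Widget/Helper names; it assumes only JUnit 4 and Mockito on the classpath, and is an illustration rather than code from this change.

// Hypothetical sketch of the test-seam / partial-mock pattern used by the patch.
import org.junit.Test;
import org.mockito.Mockito;

public class SeamPatternSketch {

  static class Helper {
    void doWork() {}
  }

  static class Widget {
    protected void configure() {
      // Real production logic delegates through the seam.
      getHelper().doWork();
    }

    Helper getHelper() {
      // Package-private seam; tests stub it on a partial mock.
      return new Helper();
    }
  }

  @Test
  public void realConfigureRunsAgainstMockedHelper() {
    Widget widget = Mockito.mock(Widget.class);
    Helper helper = Mockito.mock(Helper.class);

    // Run the real configure(), but hand back the mocked Helper from the seam.
    Mockito.doCallRealMethod().when(widget).configure();
    Mockito.when(widget.getHelper()).thenReturn(helper);

    widget.configure();

    Mockito.verify(helper).doWork();
  }
}

The patch's own tests (testBasicConfiguration, testSaslConfiguration, testMockInstance) apply exactly this partial-mock technique to the output format, which is why getHelper() and friends were introduced instead of using the fields and static calls directly.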