From 70e7815b37a28a90542b3911b8238b304390e8e7 Mon Sep 17 00:00:00 2001 From: jackbearden Date: Mon, 20 Aug 2018 15:56:21 -0700 Subject: [PATCH] HBASE-20993. Fix IPC client fallback to simple auth --- .../hadoop/hbase/ipc/BlockingRpcConnection.java | 26 +++ .../org/apache/hadoop/hbase/ipc/RpcConnection.java | 2 +- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 29 +-- .../hadoop/hbase/security/TestInsecureIPC.java | 207 +++++++++++++++++++++ 4 files changed, 251 insertions(+), 13 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestInsecureIPC.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java index d5cf6a2748..c5e47fe528 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java @@ -66,6 +66,8 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader; import org.apache.hadoop.hbase.security.HBaseSaslRpcClient; +import org.apache.hadoop.hbase.security.SaslStatus; +import org.apache.hadoop.hbase.security.SaslUtil; import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; @@ -440,6 +442,7 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { OutputStream outStream = NetUtils.getOutputStream(socket, this.rpcClient.writeTO); // Write out the preamble -- MAGIC, version, and auth to use. writeConnectionHeaderPreamble(outStream); + readPreambleResponse(inStream); if (useSasl) { final InputStream in2 = inStream; final OutputStream out2 = outStream; @@ -499,6 +502,29 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { thread.start(); } + private void readPreambleResponse(InputStream inStream) throws IOException { + DataInputStream resultCode = new DataInputStream(new BufferedInputStream(inStream)); + int state = resultCode.readInt(); + int fallback = resultCode.readInt(); + if (state == SaslStatus.SUCCESS.state) { + if (fallback == 0) { + return; + } + if (fallback == SaslUtil.SWITCH_TO_SIMPLE_AUTH) { + if (this.rpcClient.fallbackAllowed) { + useSasl = false; + return; + } else { + throw new DoNotRetryIOException("Server asks client fall back to SIMPLE auth, " + + "but client doesn't allow."); + } + } + } + if (state == SaslStatus.ERROR.state) { + readResponse(); + } + } + /** * Write the RPC header: {@code } */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java index 5e9e97e219..93881dd464 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java @@ -58,7 +58,7 @@ abstract class RpcConnection { protected final AuthMethod authMethod; - protected final boolean useSasl; + protected boolean useSasl; protected final Token token; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 395093904f..9d43465507 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -1296,7 +1296,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private AuthMethod authMethod; private boolean saslContextEstablished; - private boolean skipInitialSaslHandshake; private ByteBuffer unwrappedData; // When is this set? FindBugs wants to know! Says NP private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4); @@ -1573,6 +1572,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { preambleBuffer.flip(); for (int i = 0; i < HConstants.RPC_HEADER.length; i++) { if (HConstants.RPC_HEADER[i] != preambleBuffer.get(i)) { + doRawSaslReply(SaslStatus.ERROR, null, null, null); return doBadPreambleHandling("Expected HEADER=" + Bytes.toStringBinary(HConstants.RPC_HEADER) + " but received HEADER=" + Bytes.toStringBinary(preambleBuffer.array(), 0, HConstants.RPC_HEADER.length) + @@ -1583,18 +1583,24 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { byte authbyte = preambleBuffer.get(HConstants.RPC_HEADER.length + 1); this.authMethod = AuthMethod.valueOf(authbyte); if (version != CURRENT_VERSION) { + doRawSaslReply(SaslStatus.ERROR, null, null, null); String msg = getFatalConnectionString(version, authbyte); return doBadPreambleHandling(msg, new WrongVersionException(msg)); } if (authMethod == null) { + doRawSaslReply(SaslStatus.ERROR, null, null, null); String msg = getFatalConnectionString(version, authbyte); return doBadPreambleHandling(msg, new BadAuthException(msg)); } if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) { + // Case: (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) + // server side uses non-simple auth, client side uses simple auth. if (allowFallbackToSimpleAuth) { + doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(0), null, null); metrics.authenticationFallback(); authenticatedWithFallback = true; } else { + doRawSaslReply(SaslStatus.ERROR, null, null, null); AccessDeniedException ae = new AccessDeniedException("Authentication is required"); setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage()); responder.doRespond(authFailedCall); @@ -1602,16 +1608,20 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } } if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) { + // Case: (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) + // server side uses simple auth, client side uses non-simple auth. doRawSaslReply(SaslStatus.SUCCESS, new IntWritable( SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null); authMethod = AuthMethod.SIMPLE; - // client has already sent the initial Sasl message and we - // should ignore it. Both client and server should fall back - // to simple auth from now on. - skipInitialSaslHandshake = true; - } - if (authMethod != AuthMethod.SIMPLE) { + } else if (authMethod != AuthMethod.SIMPLE) { + // Case: (isSecurityEnabled && authMethod != AuthMethod.SIMPLE) + // both server and client side use non-simple auth. useSasl = true; + doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(0), null, null); + } else if (!isSecurityEnabled && authMethod == AuthMethod.SIMPLE) { + // Case: (!isSecurityEnabled && authMethod == AuthMethod.SIMPLE) + // both server and client side use simple auth. + doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(0), null, null); } preambleBuffer = null; // do not need it anymore @@ -1753,11 +1763,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private void process() throws IOException, InterruptedException { data.flip(); try { - if (skipInitialSaslHandshake) { - skipInitialSaslHandshake = false; - return; - } - if (useSasl) { saslReadAndProcess(data); } else { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestInsecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestInsecureIPC.java new file mode 100644 index 0000000000..c7894af320 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestInsecureIPC.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.security; + +import com.google.common.collect.Lists; +import com.google.protobuf.BlockingService; +import com.google.protobuf.ServiceException; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ipc.*; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; +import org.apache.hadoop.hbase.testclassification.SecurityTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.minikdc.MiniKdc; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.*; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; +import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting; +import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting; +import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; + +@RunWith(Parameterized.class) +@Category({ SecurityTests.class, SmallTests.class }) +public class TestInsecureIPC { + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final File KEYTAB_FILE = + new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath()); + + private static MiniKdc KDC; + private static String HOST = "localhost"; + private static String PRINCIPAL; + + String krbKeytab; + String krbPrincipal; + + Configuration clientConf; + Configuration serverConf; + UserGroupInformation ugi; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @Parameterized.Parameters(name = "{index}: rpcClientImpl={0}") + public static Collection parameters() { + return Arrays.asList(new Object[] { BlockingRpcClient.class.getName() }); + } + + @Parameterized.Parameter + public String rpcClientImpl; + + @BeforeClass + public static void setUp() throws Exception { + KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE); + PRINCIPAL = "hbase/" + HOST; + KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL); + HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm()); + } + + @AfterClass + public static void tearDown() throws IOException { + if (KDC != null) { + KDC.stop(); + } + TEST_UTIL.cleanupTestDir(); + } + + @Before + public void setUpTest() throws Exception { + krbKeytab = getKeytabFileForTesting(); + krbPrincipal = getPrincipalForTesting(); + ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal); + clientConf = getSecuredConfiguration(); + clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl); + serverConf = HBaseConfiguration.create(); + } + + @Test + public void testRpcInsecureClientAgainstInsecureServer() throws Exception { + String clientUsername = "testuser"; + UserGroupInformation clientUgi = + UserGroupInformation.createUserForTesting(clientUsername, new String[] { clientUsername }); + + assertNotSame(ugi, clientUgi); + assertEquals(UserGroupInformation.AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod()); + assertEquals(clientUsername, clientUgi.getUserName()); + + clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple"); + callInsecureRpcService(User.create(clientUgi)); + } + + @Test + public void testRpcFallbackToSimpleFromKerberosClientAgainstInsecureServer() throws Exception { + UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser(); + + assertSame(ugi, ugi2); + assertEquals(UserGroupInformation.AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod()); + assertEquals(krbPrincipal, ugi.getUserName()); + + serverConf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, true); + callInsecureRpcService(User.create(ugi2)); + } + + private void callInsecureRpcService(User clientUser) throws Exception { + InetSocketAddress isa = new InetSocketAddress(HOST, 0); + + RpcServerInterface rpcServer = new RpcServer(null, "AbstractTestInsecureIPC", + Lists.newArrayList( + new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)), + isa, serverConf, new FifoRpcScheduler(serverConf, 1)); + rpcServer.start(); + try (RpcClient rpcClient = + RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())) { + TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub = + newBlockingStub(rpcClient, rpcServer.getListenerAddress(), clientUser); + TestInsecureIPC.TestThread th1 = new TestInsecureIPC.TestThread(stub); + final Throwable exception[] = new Throwable[1]; + Collections.synchronizedList(new ArrayList()); + Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() { + public void uncaughtException(Thread th, Throwable ex) { + exception[0] = ex; + } + }; + th1.setUncaughtExceptionHandler(exceptionHandler); + th1.start(); + th1.join(); + if (exception[0] != null) { + // throw root cause. + while (exception[0].getCause() != null) { + exception[0] = exception[0].getCause(); + } + throw (Exception) exception[0]; + } + } finally { + rpcServer.stop(); + } + } + + private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal) + throws Exception { + Configuration cnf = new Configuration(); + cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + UserGroupInformation.setConfiguration(cnf); + UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab); + return UserGroupInformation.getLoginUser(); + } + + public static class TestThread extends Thread { + private final TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub; + + public TestThread(TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub) { + this.stub = stub; + } + + @Override + public void run() { + try { + int[] messageSize = new int[] { 100, 1000, 10000 }; + for (int i = 0; i < messageSize.length; i++) { + String input = RandomStringUtils.random(messageSize[i]); + String result = + stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(input).build()) + .getMessage(); + assertEquals(input, result); + } + } catch (ServiceException e) { + throw new RuntimeException(e); + } + } + } +} -- 2.15.1 (Apple Git-101)