From 3d754f4415b2ac2f20abae10834d41d101bb8da3 Mon Sep 17 00:00:00 2001 From: jackbearden Date: Mon, 13 Aug 2018 13:37:25 -0700 Subject: [PATCH] HBASE-20993. Fix IPC client fallback to simple auth --- .../hadoop/hbase/ipc/BlockingRpcConnection.java | 38 ++++ .../org/apache/hadoop/hbase/ipc/RpcConnection.java | 4 +- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 51 +++++ .../hadoop/hbase/security/TestInsecureIPC.java | 207 +++++++++++++++++++++ 4 files changed, 298 insertions(+), 2 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestInsecureIPC.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java index d5cf6a2748..82619aee9a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java @@ -55,6 +55,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.io.ByteArrayOutputStream; @@ -65,8 +66,10 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader; +import org.apache.hadoop.hbase.security.AuthMethod; import org.apache.hadoop.hbase.security.HBaseSaslRpcClient; import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.io.IOUtils; @@ -411,6 +414,36 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { }); } + private void readPreambleReply(InputStream inStream, ByteBuffer preambleBuffer) throws IOException { + DataInputStream response = new DataInputStream(new BufferedInputStream(inStream)); + while(preambleBuffer.hasRemaining()) { + int b = response.read(); + if (b == -1) { + break; + } + preambleBuffer.put((byte)b); + } + preambleBuffer.flip(); + } + + private void verifyPreambleReply(ByteBuffer preambleBuffer) throws FatalConnectionException { + for (int i = 0; i < HConstants.RPC_HEADER.length; i++) { + if (HConstants.RPC_HEADER[i] != preambleBuffer.get(i)) { + throw new FatalConnectionException("Expected HEADER=" + + Bytes.toStringBinary(HConstants.RPC_HEADER) + " but received HEADER=" + + Bytes.toStringBinary(preambleBuffer.array(), 0, HConstants.RPC_HEADER.length) + + " from " + toString()); + } + } + byte authbyte = preambleBuffer.get(HConstants.RPC_HEADER.length + 1); + this.authMethod = AuthMethod.valueOf(authbyte); + + if (authbyte == AuthMethod.SIMPLE.code) { + useSasl = false; + authMethod = AuthMethod.SIMPLE; + } + } + private void setupIOstreams() throws IOException { if (socket != null) { // The connection is already available. Perfect. @@ -440,6 +473,11 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { OutputStream outStream = NetUtils.getOutputStream(socket, this.rpcClient.writeTO); // Write out the preamble -- MAGIC, version, and auth to use. writeConnectionHeaderPreamble(outStream); + // Read and verify preamble reply from server + ByteBuffer preambleBuffer = ByteBuffer.allocate(6); + readPreambleReply(inStream, preambleBuffer); + verifyPreambleReply(preambleBuffer); + if (useSasl) { final InputStream in2 = inStream; final OutputStream out2 = outStream; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java index 5e9e97e219..28cae467b8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java @@ -56,9 +56,9 @@ abstract class RpcConnection { protected final ConnectionId remoteId; - protected final AuthMethod authMethod; + protected AuthMethod authMethod; - protected final boolean useSasl; + protected boolean useSasl; protected final Token token; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 395093904f..36ec8052e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -425,6 +425,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return serviceName + "." + methodName; } + protected synchronized void setPreambleTokenResponse(ByteBuffer response) { + this.response = new BufferChain(response); + } + protected synchronized void setSaslTokenResponse(ByteBuffer response) { this.response = new BufferChain(response); } @@ -1309,6 +1313,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { null, null, this, null, 0, null, null, 0); private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream(); + // Fake 'call' for PreambleReply context setup + private static final int PREAMBLE_REPLY_CALLID = -11; + private final Call preambleCall = new Call(PREAMBLE_REPLY_CALLID, null, null, null, null, null, this, null, + 0, null, null, 0); // Fake 'call' for SASL context setup private static final int SASL_CALLID = -33; private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null, @@ -1518,6 +1526,33 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } } + private void doRawPreambleReply(int version, byte authbyte) throws IOException { + ByteBufferOutputStream preambleReply = null; + DataOutputStream out = null; + try { + preambleReply = new ByteBufferOutputStream(6); + out = new DataOutputStream(preambleReply); + out.write(HConstants.RPC_HEADER); + out.write(version); + if (authbyte == AuthMethod.KERBEROS.code && + !isSecurityEnabled && + allowFallbackToSimpleAuth) { + authbyte = AuthMethod.SIMPLE.code; + } + out.write(authbyte); + preambleCall.setPreambleTokenResponse(preambleReply.getByteBuffer()); + preambleCall.responder = responder; + preambleCall.sendResponseIfReady(); + } finally { + if (preambleReply != null) { + preambleReply.close(); + } + if (out != null) { + out.close(); + } + } + } + /** * No protobuf encoding of raw sasl messages */ @@ -1590,6 +1625,22 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { String msg = getFatalConnectionString(version, authbyte); return doBadPreambleHandling(msg, new BadAuthException(msg)); } + // Handle preamble reply + if (authMethod == AuthMethod.KERBEROS && !isSecurityEnabled) { + if (allowFallbackToSimpleAuth) { + doRawPreambleReply(version, authbyte); + preambleBuffer = null; // do not need it anymore + connectionPreambleRead = true; + + return count; + } + // Fallback not supported + String msg = getFatalConnectionString(version, authbyte); + return doBadPreambleHandling(msg, new BadAuthException(msg)); + } else { + // Reply with normal header + doRawPreambleReply(version, authbyte); + } if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) { if (allowFallbackToSimpleAuth) { metrics.authenticationFallback(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestInsecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestInsecureIPC.java new file mode 100644 index 0000000000..c7894af320 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestInsecureIPC.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.security; + +import com.google.common.collect.Lists; +import com.google.protobuf.BlockingService; +import com.google.protobuf.ServiceException; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ipc.*; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; +import org.apache.hadoop.hbase.testclassification.SecurityTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.minikdc.MiniKdc; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.*; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; +import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting; +import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting; +import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; + +@RunWith(Parameterized.class) +@Category({ SecurityTests.class, SmallTests.class }) +public class TestInsecureIPC { + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final File KEYTAB_FILE = + new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath()); + + private static MiniKdc KDC; + private static String HOST = "localhost"; + private static String PRINCIPAL; + + String krbKeytab; + String krbPrincipal; + + Configuration clientConf; + Configuration serverConf; + UserGroupInformation ugi; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @Parameterized.Parameters(name = "{index}: rpcClientImpl={0}") + public static Collection parameters() { + return Arrays.asList(new Object[] { BlockingRpcClient.class.getName() }); + } + + @Parameterized.Parameter + public String rpcClientImpl; + + @BeforeClass + public static void setUp() throws Exception { + KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE); + PRINCIPAL = "hbase/" + HOST; + KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL); + HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm()); + } + + @AfterClass + public static void tearDown() throws IOException { + if (KDC != null) { + KDC.stop(); + } + TEST_UTIL.cleanupTestDir(); + } + + @Before + public void setUpTest() throws Exception { + krbKeytab = getKeytabFileForTesting(); + krbPrincipal = getPrincipalForTesting(); + ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal); + clientConf = getSecuredConfiguration(); + clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl); + serverConf = HBaseConfiguration.create(); + } + + @Test + public void testRpcInsecureClientAgainstInsecureServer() throws Exception { + String clientUsername = "testuser"; + UserGroupInformation clientUgi = + UserGroupInformation.createUserForTesting(clientUsername, new String[] { clientUsername }); + + assertNotSame(ugi, clientUgi); + assertEquals(UserGroupInformation.AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod()); + assertEquals(clientUsername, clientUgi.getUserName()); + + clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple"); + callInsecureRpcService(User.create(clientUgi)); + } + + @Test + public void testRpcFallbackToSimpleFromKerberosClientAgainstInsecureServer() throws Exception { + UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser(); + + assertSame(ugi, ugi2); + assertEquals(UserGroupInformation.AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod()); + assertEquals(krbPrincipal, ugi.getUserName()); + + serverConf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, true); + callInsecureRpcService(User.create(ugi2)); + } + + private void callInsecureRpcService(User clientUser) throws Exception { + InetSocketAddress isa = new InetSocketAddress(HOST, 0); + + RpcServerInterface rpcServer = new RpcServer(null, "AbstractTestInsecureIPC", + Lists.newArrayList( + new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)), + isa, serverConf, new FifoRpcScheduler(serverConf, 1)); + rpcServer.start(); + try (RpcClient rpcClient = + RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())) { + TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub = + newBlockingStub(rpcClient, rpcServer.getListenerAddress(), clientUser); + TestInsecureIPC.TestThread th1 = new TestInsecureIPC.TestThread(stub); + final Throwable exception[] = new Throwable[1]; + Collections.synchronizedList(new ArrayList()); + Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() { + public void uncaughtException(Thread th, Throwable ex) { + exception[0] = ex; + } + }; + th1.setUncaughtExceptionHandler(exceptionHandler); + th1.start(); + th1.join(); + if (exception[0] != null) { + // throw root cause. + while (exception[0].getCause() != null) { + exception[0] = exception[0].getCause(); + } + throw (Exception) exception[0]; + } + } finally { + rpcServer.stop(); + } + } + + private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal) + throws Exception { + Configuration cnf = new Configuration(); + cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + UserGroupInformation.setConfiguration(cnf); + UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab); + return UserGroupInformation.getLoginUser(); + } + + public static class TestThread extends Thread { + private final TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub; + + public TestThread(TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub) { + this.stub = stub; + } + + @Override + public void run() { + try { + int[] messageSize = new int[] { 100, 1000, 10000 }; + for (int i = 0; i < messageSize.length; i++) { + String input = RandomStringUtils.random(messageSize[i]); + String result = + stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(input).build()) + .getMessage(); + assertEquals(input, result); + } + } catch (ServiceException e) { + throw new RuntimeException(e); + } + } + } +} -- 2.14.3 (Apple Git-98)