From b2a7aa6f617ab947ae7de264fa66c52336152efa Mon Sep 17 00:00:00 2001 From: Reid Chan Date: Mon, 29 Apr 2019 18:50:50 +0800 Subject: [PATCH] HBASE-20993 [Auth] IPC client fallback to simple auth allowed doesn't work --- .../hadoop/hbase/ipc/BlockingRpcConnection.java | 84 +++++--- .../hbase/ipc/FallbackDisallowedException.java | 4 +- .../hadoop/hbase/ipc/NettyRpcConnection.java | 42 +++- .../hadoop/hbase/ipc/PreambleResponseHandler.java | 74 +++++++ .../org/apache/hadoop/hbase/ipc/RpcServer.java | 56 ++++-- .../hadoop/hbase/security/TestInsecureIPC.java | 222 +++++++++++++++++++++ 6 files changed, 433 insertions(+), 49 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PreambleResponseHandler.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestInsecureIPC.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java index d5cf6a2748..e221886097 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java @@ -66,6 +66,8 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader; import org.apache.hadoop.hbase.security.HBaseSaslRpcClient; +import org.apache.hadoop.hbase.security.SaslStatus; +import org.apache.hadoop.hbase.security.SaslUtil; import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; @@ -111,6 +113,8 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { private byte[] connectionHeaderWithLength; + private int serverAskFallback; + /** * If the client wants to interrupt its calls easily (i.e. call Thread#interrupt), it gets into a * java issue: an interruption during a write closes the socket/channel. A way to avoid this is to @@ -440,38 +444,49 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { OutputStream outStream = NetUtils.getOutputStream(socket, this.rpcClient.writeTO); // Write out the preamble -- MAGIC, version, and auth to use. writeConnectionHeaderPreamble(outStream); + readPreambleResponse(inStream); if (useSasl) { - final InputStream in2 = inStream; - final OutputStream out2 = outStream; - UserGroupInformation ticket = getUGI(); - boolean continueSasl; - if (ticket == null) { - throw new FatalConnectionException("ticket/user is null"); - } - try { - continueSasl = ticket.doAs(new PrivilegedExceptionAction() { - @Override - public Boolean run() throws IOException { - return setupSaslConnection(in2, out2); - } - }); - } catch (Exception ex) { - ExceptionUtil.rethrowIfInterrupt(ex); - handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, ticket); - continue; - } - if (continueSasl) { - // Sasl connect is successful. Let's set up Sasl i/o streams. - inStream = saslRpcClient.getInputStream(inStream); - outStream = saslRpcClient.getOutputStream(outStream); + if (serverAskFallback != SaslUtil.SWITCH_TO_SIMPLE_AUTH) { + final InputStream in2 = inStream; + final OutputStream out2 = outStream; + UserGroupInformation ticket = getUGI(); + boolean continueSasl; + if (ticket == null) { + throw new FatalConnectionException("ticket/user is null"); + } + try { + continueSasl = ticket.doAs(new PrivilegedExceptionAction() { + @Override + public Boolean run() throws IOException { + return setupSaslConnection(in2, out2); + } + }); + } catch (Exception ex) { + ExceptionUtil.rethrowIfInterrupt(ex); + handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, ticket); + continue; + } + if (continueSasl) { + // Sasl connect is successful. Let's set up Sasl i/o streams. + inStream = saslRpcClient.getInputStream(inStream); + outStream = saslRpcClient.getOutputStream(outStream); + } } else { // fall back to simple auth because server told us so. // do not change authMethod and useSasl here, we should start from secure when // reconnecting because regionserver may change its sasl config after restart. + this.in = new DataInputStream(new BufferedInputStream(inStream)); + this.out = new DataOutputStream(new BufferedOutputStream(outStream)); + // Write out a header for server to skip initial sasl handshake. + writeConnectionHeader(); } } - this.in = new DataInputStream(new BufferedInputStream(inStream)); - this.out = new DataOutputStream(new BufferedOutputStream(outStream)); + if (this.in == null) { + this.in = new DataInputStream(new BufferedInputStream(inStream)); + } + if (this.out == null) { + this.out = new DataOutputStream(new BufferedOutputStream(outStream)); + } // Now write out the connection header writeConnectionHeader(); break; @@ -499,6 +514,25 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { thread.start(); } + private void readPreambleResponse(InputStream inStream) throws IOException { + DataInputStream resultCode = new DataInputStream(new BufferedInputStream(inStream)); + int state = resultCode.readInt(); + if (state == SaslStatus.SUCCESS.state) { + int fallback = resultCode.readInt(); + if (fallback == SaslStatus.SUCCESS.state) { + return; + } + if (fallback == SaslUtil.SWITCH_TO_SIMPLE_AUTH) { + if (this.rpcClient.fallbackAllowed) { + LOG.warn("Server asked us to fall back to SIMPLE auth. Falling back..."); + serverAskFallback = SaslUtil.SWITCH_TO_SIMPLE_AUTH; + return; + } + throw new FallbackDisallowedException(); + } + } + } + /** * Write the RPC header: {@code } */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java index 721148b08b..0f9812983c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java @@ -32,7 +32,7 @@ public class FallbackDisallowedException extends HBaseIOException { private static final long serialVersionUID = -6942845066279358253L; public FallbackDisallowedException() { - super("Server asks us to fall back to SIMPLE auth, " + super("Server asked us to fall back to SIMPLE auth, " + "but this client is configured to only allow secure connections."); } -} \ No newline at end of file +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java index b5fb7e4147..7406823554 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java @@ -187,7 +187,7 @@ class NettyRpcConnection extends RpcConnection { failInit(ch, new FatalConnectionException("ticket/user is null")); return; } - Promise saslPromise = ch.eventLoop().newPromise(); + final Promise saslPromise = ch.eventLoop().newPromise(); ChannelHandler saslHandler; try { saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, authMethod, token, @@ -236,14 +236,50 @@ class NettyRpcConnection extends RpcConnection { rpcClient.failedServers.addToFailedServers(remoteId.address, future.cause()); return; } - ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate()); + handshake(ch); + } + }).channel(); + } + + private void handshake(final Channel ch) { + ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate()); + + Promise preamblePromise = ch.eventLoop().newPromise(); + ChannelHandler pr = new PreambleResponseHandler(preamblePromise, rpcClient.fallbackAllowed); + ch.pipeline().addFirst(pr); + preamblePromise.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + switch (future.get()) { + case SUCCESS: { + ch.pipeline().remove(PreambleResponseHandler.class); if (useSasl) { saslNegotiate(ch); } else { established(ch); } + return; } - }).channel(); + case FALLBACK: { + ch.pipeline().remove(PreambleResponseHandler.class); + // Flush a useless request for server side to skip initial sasl handshake + ch.writeAndFlush(connectionHeaderWithLength.retainedDuplicate()); + established(ch); + return; + } + case FAILURE: { + Throwable cause = future.cause(); + scheduleRelogin(cause); + failInit(ch, toIOE(cause)); + return; + } + } + } + }); + } + + enum Result { + SUCCESS, FALLBACK, FAILURE } private void write(Channel ch, final Call call) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PreambleResponseHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PreambleResponseHandler.java new file mode 100644 index 0000000000..15ba2ffa57 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PreambleResponseHandler.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.concurrent.Promise; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.security.SaslStatus; +import org.apache.hadoop.hbase.security.SaslUtil; + +import java.io.IOException; + +/** + * Handler for dealing with preamble response. + */ +public class PreambleResponseHandler extends ChannelDuplexHandler { + private static final Log LOG = LogFactory.getLog(PreambleResponseHandler.class); + + private final boolean clientAllowFallback; + private final Promise promise; + + public PreambleResponseHandler(Promise promise, + boolean clientAllowFallback) { + this.promise = promise; + this.clientAllowFallback = clientAllowFallback; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ByteBuf buf = (ByteBuf) msg; + int status = buf.readInt(); + if (status == SaslStatus.SUCCESS.state) { + if (buf.readInt() == SaslUtil.SWITCH_TO_SIMPLE_AUTH) { + if (clientAllowFallback) { + LOG.warn("Server asked us to fall back to SIMPLE auth. Falling back..."); + promise.trySuccess(NettyRpcConnection.Result.FALLBACK); + } else { + promise.trySuccess(NettyRpcConnection.Result.FAILURE); + promise.tryFailure(new FallbackDisallowedException()); + } + } else { + promise.trySuccess(NettyRpcConnection.Result.SUCCESS); + } + } else { + promise.trySuccess(NettyRpcConnection.Result.FAILURE); + promise.tryFailure(new IOException("Error while establishing connection to server")); + } + buf.release(); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.flush(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index a32040c295..c03884bef6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -1589,6 +1589,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { preambleBuffer.flip(); for (int i = 0; i < HConstants.RPC_HEADER.length; i++) { if (HConstants.RPC_HEADER[i] != preambleBuffer.get(i)) { + doRawSaslReply(SaslStatus.ERROR, null, null, null); return doBadPreambleHandling("Expected HEADER=" + Bytes.toStringBinary(HConstants.RPC_HEADER) + " but received HEADER=" + Bytes.toStringBinary(preambleBuffer.array(), 0, HConstants.RPC_HEADER.length) + @@ -1599,35 +1600,52 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { byte authbyte = preambleBuffer.get(HConstants.RPC_HEADER.length + 1); this.authMethod = AuthMethod.valueOf(authbyte); if (version != CURRENT_VERSION) { + doRawSaslReply(SaslStatus.ERROR, null, null, null); String msg = getFatalConnectionString(version, authbyte); return doBadPreambleHandling(msg, new WrongVersionException(msg)); } if (authMethod == null) { + doRawSaslReply(SaslStatus.ERROR, null, null, null); String msg = getFatalConnectionString(version, authbyte); return doBadPreambleHandling(msg, new BadAuthException(msg)); } - if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) { - if (allowFallbackToSimpleAuth) { - metrics.authenticationFallback(); - authenticatedWithFallback = true; + LOG.debug("Server auth: " + (isSecurityEnabled ? AuthMethod.KERBEROS : AuthMethod.SIMPLE) + + ", client auth: " + authMethod); + if (isSecurityEnabled) { + // Server side uses non-simple auth + if (authMethod == AuthMethod.SIMPLE) { + // client side uses simple auth + if (allowFallbackToSimpleAuth) { + doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(0), null, null); + metrics.authenticationFallback(); + authenticatedWithFallback = true; + } else { + doRawSaslReply(SaslStatus.ERROR, null, null, null); + AccessDeniedException ae = new AccessDeniedException("Authentication is required"); + setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage()); + responder.doRespond(authFailedCall); + throw ae; + } } else { - AccessDeniedException ae = new AccessDeniedException("Authentication is required"); - setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage()); - responder.doRespond(authFailedCall); - throw ae; + // client side use non-simple auth. + useSasl = true; + doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(0), null, null); } - } - if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) { - doRawSaslReply(SaslStatus.SUCCESS, new IntWritable( + } else { + // Server side uses simple auth + if (authMethod == AuthMethod.SIMPLE) { + // client side use simple auth. + doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(0), null, null); + } else { + // client side use non-simple auth. + doRawSaslReply(SaslStatus.SUCCESS, new IntWritable( SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null); - authMethod = AuthMethod.SIMPLE; - // client has already sent the initial Sasl message and we - // should ignore it. Both client and server should fall back - // to simple auth from now on. - skipInitialSaslHandshake = true; - } - if (authMethod != AuthMethod.SIMPLE) { - useSasl = true; + authMethod = AuthMethod.SIMPLE; + // client has already sent the initial Sasl message and we + // should ignore it. Both client and server should fall back + // to simple auth from now on. + skipInitialSaslHandshake = true; + } } preambleBuffer = null; // do not need it anymore diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestInsecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestInsecureIPC.java new file mode 100644 index 0000000000..355cadb14b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestInsecureIPC.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.security; + +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; +import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting; +import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting; +import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; + +import com.google.common.collect.Lists; +import com.google.protobuf.BlockingService; +import com.google.protobuf.ServiceException; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +import org.apache.commons.lang.RandomStringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ipc.BlockingRpcClient; +import org.apache.hadoop.hbase.ipc.FifoRpcScheduler; +import org.apache.hadoop.hbase.ipc.NettyRpcClient; +import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.ipc.RpcClientFactory; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.ipc.RpcServerInterface; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; +import org.apache.hadoop.hbase.testclassification.SecurityTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.minikdc.MiniKdc; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +@Category({SecurityTests.class, SmallTests.class}) +public class TestInsecureIPC { + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final File KEYTAB_FILE = + new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath()); + + private static MiniKdc KDC; + private static String HOST = "localhost"; + private static String PRINCIPAL; + + String krbKeytab; + String krbPrincipal; + + Configuration clientConf; + Configuration serverConf; + UserGroupInformation ugi; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @Parameterized.Parameters(name = "{index}: rpcClientImpl={0}") + public static Collection parameters() { + return Arrays.asList(new Object[]{BlockingRpcClient.class.getName()}, + new Object[]{NettyRpcClient.class.getName()}); + } + + @Parameterized.Parameter + public String rpcClientImpl; + + @BeforeClass + public static void setUp() throws Exception { + KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE); + PRINCIPAL = "foo"; + KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL); + HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath()); + HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm()); + } + + @AfterClass + public static void tearDown() throws IOException { + if (KDC != null) { + KDC.stop(); + } + TEST_UTIL.cleanupTestDir(); + } + + @Before + public void setupTest() throws Exception { + krbKeytab = getKeytabFileForTesting(); + krbPrincipal = getPrincipalForTesting(); + ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal); + clientConf = getSecuredConfiguration(); + clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl); + serverConf = HBaseConfiguration.create(); + } + + @Test + public void testRpcSimpleClientAgainstSimpleServer() throws Exception { + String clientUsername = "testuser"; + UserGroupInformation clientUgi = + UserGroupInformation.createUserForTesting(clientUsername, new String[]{clientUsername}); + + assertNotSame(ugi, clientUgi); + assertEquals(UserGroupInformation.AuthenticationMethod.SIMPLE, + clientUgi.getAuthenticationMethod()); + assertEquals(clientUsername, clientUgi.getUserName()); + + clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple"); + callSimpleRpcService(User.create(clientUgi)); + } + + @Test + public void testKerberizedClientAgainstSimpleServer() throws Exception { + UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser(); + + assertSame(ugi, ugi2); + assertEquals(UserGroupInformation.AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod()); + assertEquals(krbPrincipal, ugi.getUserName()); + + clientConf.setBoolean(RpcClient.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, true); + callSimpleRpcService(User.create(ugi2)); + } + + private void callSimpleRpcService(User clientUser) throws Exception { + InetSocketAddress isa = new InetSocketAddress(HOST, 0); + + RpcServerInterface rpcServer = new RpcServer(null, "AbstractTestInsecureIPC", + Lists.newArrayList( + new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, + null)), + isa, serverConf, new FifoRpcScheduler(serverConf, 1)); + rpcServer.start(); + try (RpcClient rpcClient = + RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())) { + TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub = + newBlockingStub(rpcClient, rpcServer.getListenerAddress(), clientUser); + TestInsecureIPC.TestThread th1 = new TestInsecureIPC.TestThread(stub); + final Throwable[] exception = new Throwable[1]; + Collections.synchronizedList(new ArrayList()); + Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() { + public void uncaughtException(Thread th, Throwable ex) { + exception[0] = ex; + } + }; + th1.setUncaughtExceptionHandler(exceptionHandler); + th1.start(); + th1.join(); + if (exception[0] != null) { + // throw root cause. + while (exception[0].getCause() != null) { + exception[0] = exception[0].getCause(); + } + throw (Exception) exception[0]; + } + } finally { + rpcServer.stop(); + } + } + + private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal) + throws Exception { + Configuration cnf = new Configuration(); + cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + UserGroupInformation.setConfiguration(cnf); + UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab); + return UserGroupInformation.getLoginUser(); + } + + public static class TestThread extends Thread { + private final TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub; + + public TestThread(TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub) { + this.stub = stub; + } + + @Override + public void run() { + try { + int[] messageSize = new int[]{100, 1000, 10000}; + for (int i = 0; i < messageSize.length; i++) { + String input = RandomStringUtils.random(messageSize[i]); + String result = + stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(input).build()) + .getMessage(); + assertEquals(input, result); + } + } catch (ServiceException e) { + throw new RuntimeException(e); + } + } + } +} -- 2.15.0