From ccae21c614f03e9cbb7b2bb19c10228d769aa121 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Wed, 16 Dec 2020 18:04:15 +0000 Subject: [PATCH] One way TLS on Netty RPC Implementation: Here, client is using default JDK truststore, while server is using a self signed certificate. For this test, we needed to import the server certificate into jdk default truststore, so that client can validate the server certificate. Since this is one-way, client certificate is not required. Server is loading private keys from local keystore. Currently hardcoded into netty rpc client/server related classes. That is, using this, would always do tls in the rpc. --- .../ipc/BufferCallBeforeInitHandler.java | 16 ++ .../hbase/ipc/NettyRpcClientSslHandler.java | 104 ++++++++++ .../hadoop/hbase/ipc/NettyRpcConnection.java | 185 ++++++++++++++---- .../security/NettyHBaseSaslRpcClient.java | 9 +- .../hbase/security/SaslChallengeDecoder.java | 10 +- .../hbase/ipc/NettyRpcFrameDecoder.java | 2 + .../hadoop/hbase/ipc/NettyRpcServer.java | 40 +++- .../ipc/NettyRpcServerPreambleHandler.java | 6 + .../ipc/NettyRpcServerRequestDecoder.java | 7 + .../ipc/NettyRpcServerResponseEncoder.java | 12 ++ .../hbase/ipc/NettyRpcSslHandlerFactory.java | 36 ++++ .../hbase/ipc/NettyServerRpcConnection.java | 10 + .../hadoop/hbase/ipc/ServerRpcConnection.java | 1 + 13 files changed, 393 insertions(+), 45 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClientSslHandler.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcSslHandlerFactory.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java index 137e60b7460..f4f17598eba 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java @@ -26,6 +26,8 @@ import java.util.HashMap; import java.util.Map; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * We will expose the connection to upper layer before initialized, so we need to buffer the calls @@ -34,6 +36,8 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private class BufferCallBeforeInitHandler extends ChannelDuplexHandler { + private static final Logger LOG = LoggerFactory.getLogger(BufferCallBeforeInitHandler.class); + private enum BufferCallAction { FLUSH, FAIL } @@ -67,12 +71,24 @@ class BufferCallBeforeInitHandler extends ChannelDuplexHandler { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { if (msg instanceof Call) { + StringBuilder builder = new StringBuilder(); + for(StackTraceElement e : Thread.currentThread().getStackTrace()){ + builder.append(e.getClassName()).append(".").append(e.getMethodName()).append("(") + .append(e.getLineNumber()).append(")").append("\n"); + } + LOG.info(">>>>> writing Call message at:" + builder.toString()); Call call = (Call) msg; id2Call.put(call.id, call); // The call is already in track so here we set the write operation as success. // We will fail the call directly if we can not write it out. 
promise.trySuccess(); } else { + StringBuilder builder = new StringBuilder(); + for(StackTraceElement e : Thread.currentThread().getStackTrace()){ + builder.append(e.getClassName()).append(".").append(e.getMethodName()).append("(") + .append(e.getLineNumber()).append(")").append("\n"); + } + LOG.info(">>>>> writing non Call message {} at: {}", msg.toString(), builder.toString()); ctx.write(msg, promise); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClientSslHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClientSslHandler.java new file mode 100644 index 00000000000..bfcbedd42aa --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClientSslHandler.java @@ -0,0 +1,104 @@ +package org.apache.hadoop.hbase.ipc; + +import org.apache.hbase.thirdparty.io.netty.channel.Channel; +import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer; +import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; +import org.apache.hbase.thirdparty.io.netty.channel.socket.SocketChannel; +import org.apache.hbase.thirdparty.io.netty.handler.codec.DelimiterBasedFrameDecoder; +import org.apache.hbase.thirdparty.io.netty.handler.codec.Delimiters; +import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContext; +import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContextBuilder; +import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslHandler; +import org.apache.hbase.thirdparty.io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future; +import org.apache.hbase.thirdparty.io.netty.util.concurrent.GenericFutureListener; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.TrustManagerFactory; +import java.io.File; +import java.io.FileInputStream; +import java.security.KeyStore; +import java.util.function.Consumer; +import java.util.function.Function; + +@InterfaceAudience.Private +public class NettyRpcClientSslHandler extends ChannelInitializer { + + private static final Logger LOG = LoggerFactory.getLogger(NettyRpcClientSslHandler.class); + + private SslContext context; + private String host; + private int port; + private GenericFutureListener> listener; + private Call call; + + public void setPostEstablishmentAction(Consumer postEstablishmentAction, Call call) { + this.postEstablishmentAction = postEstablishmentAction; + this.call = call; + } + + private Consumer postEstablishmentAction; + + public synchronized boolean isConnectionStablished() { + return connectionStablished; + } + + public synchronized void setConnectionStablished(boolean connectionStablished) { + this.connectionStablished = connectionStablished; + } + + private boolean connectionStablished; + + public NettyRpcClientSslHandler(SslContext context, String host, int port, GenericFutureListener> listener) { + this.context = context; + this.host=host; + this.port=port; + this.listener=listener; + } + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + SslHandler handler = context.newHandler(ch.alloc(),host,port); + handler.handshakeFuture().addListener(this.listener); + LOG.info("registered handshakelistener to handler on thread {}", Thread.currentThread().getName()); + pipeline.addLast("ssl_handler", handler); + LOG.info("added new SslHandler for host: {}, port: {}", host, 
port); + pipeline.addLast(new BufferCallBeforeInitHandler()); + } + + public void executeActionAfterConnectionEstablished(){ + this.postEstablishmentAction.accept(call); + } + + public static class Builder { + + private ConnectionId connectionId; + private GenericFutureListener> listener; + + public Builder setHandshakeListener(GenericFutureListener> listener){ + this.listener=listener; + return this; + } + + public Builder setConnectionId(ConnectionId id){ + this.connectionId=id; + return this; + } + + public NettyRpcClientSslHandler build() { + try { + //setting null to TMF then resorts to jdk default truststore + final SslContext sslCtx = SslContextBuilder.forClient() + .trustManager((TrustManagerFactory) null).build(); + return new NettyRpcClientSslHandler(sslCtx, connectionId.address.getHostName(), connectionId.address.getPort(), listener); + } catch (Exception e){ + LOG.error("Error building Netty SSL handler: ", e); + } + return null; + } + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java index ba85428dbde..eb4bc531678 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java @@ -35,6 +35,9 @@ import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler; import org.apache.hadoop.hbase.security.SaslChallengeDecoder; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer; +import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslHandler; +import org.apache.hbase.thirdparty.io.netty.util.concurrent.GenericFutureListener; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,6 +96,8 @@ class NettyRpcConnection extends RpcConnection { // thread to access this field. 
private volatile Channel channel; + private volatile NettyRpcClientSslHandler handler; + NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException { super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId, rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor); @@ -152,12 +157,28 @@ class NettyRpcConnection extends RpcConnection { assert eventLoop.inEventLoop(); ChannelPipeline p = ch.pipeline(); String addBeforeHandler = p.context(BufferCallBeforeInitHandler.class).name(); - p.addBefore(addBeforeHandler, null, - new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS)); - p.addBefore(addBeforeHandler, null, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4)); - p.addBefore(addBeforeHandler, null, - new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor)); +// p.addAfter("ssl_handler", null, +// new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS)); +// p.addAfter("ssl_handler", null, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4)); +// p.addAfter("ssl_handler", null, +// new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor)); + p.addBefore(addBeforeHandler, null, + new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS)); + p.addBefore(addBeforeHandler, null, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4)); + p.addBefore(addBeforeHandler, null, + new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor)); + StringBuilder builder = new StringBuilder(); + builder.append("Listing all pipeline handlers, in order, after connection established:\n"); + ch.pipeline().forEach( e -> { + builder.append(e.getKey()).append(" -> ").append(e.getValue()).append("\n"); + }); + LOG.info(builder.toString()); + p.fireUserEventTriggered(BufferCallEvent.success()); + +// NettyRpcClientSslHandler handler = channel.pipeline().get(NettyRpcClientSslHandler.class); + handler.setConnectionStablished(true); + handler.executeActionAfterConnectionEstablished(); } private boolean reloginInProgress; @@ -210,16 +231,33 @@ class NettyRpcConnection extends RpcConnection { failInit(ch, e); return; } - ch.pipeline().addFirst(new SaslChallengeDecoder(), saslHandler); +// ch.pipeline().addFirst(new SaslChallengeDecoder(), saslHandler); + ch.pipeline().addAfter("ssl_handler", "sasl_decoder", new SaslChallengeDecoder()); + LOG.info("added SaslChallengeDecoder in the pipeline..."); + ch.pipeline().addAfter("sasl_decoder", "sasl_handler", saslHandler); + LOG.info("added NettyHBaseSaslRpcClientHandler in the pipeline..."); + StringBuilder builder = new StringBuilder(); + builder.append("Listing all pipeline handlers, in order, prior to operationComplete:\n"); + ch.pipeline().forEach( e -> { + builder.append(e.getKey()).append(" -> ").append(e.getValue()).append("\n"); + }); + LOG.info(builder.toString()); saslPromise.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { + StringBuilder builder = new StringBuilder(); + builder.append("Listing all pipeline handlers, in order, inside operationComplete:\n"); + ch.pipeline().forEach( e -> { + builder.append(e.getKey()).append(" -> ").append(e.getValue()).append("\n"); + }); + LOG.info(builder.toString()); if (future.isSuccess()) { ChannelPipeline p = ch.pipeline(); p.remove(SaslChallengeDecoder.class); + LOG.info("removed SaslChallengeDecoder..."); 
p.remove(NettyHBaseSaslRpcClientHandler.class); - + LOG.info("removed NettyHBaseSaslRpcClientHandler..."); // check if negotiate with server for connection header is necessary if (saslHandler.isNeedProcessConnectionHeader()) { Promise connectionHeaderPromise = ch.eventLoop().newPromise(); @@ -229,14 +267,17 @@ class NettyRpcConnection extends RpcConnection { // add ReadTimeoutHandler to deal with server doesn't response connection header // because of the different configuration in client side and server side - p.addFirst( - new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS)); +// p.addAfter("ssl_handler", "read_timeout_handler", +// new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS)); +// p.addAfter("read_timeout_handler", "header_handler", chHandler); + p.addFirst(new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS)); p.addLast(chHandler); connectionHeaderPromise.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (future.isSuccess()) { ChannelPipeline p = ch.pipeline(); + LOG.info("Header processed, removing timeout and header handlers..."); p.remove(ReadTimeoutHandler.class); p.remove(NettyHBaseRpcConnectionHeaderHandler.class); // don't send connection header, NettyHbaseRpcConnectionHeaderHandler @@ -263,25 +304,25 @@ class NettyRpcConnection extends RpcConnection { }); } - private void connect() { + private void connect() throws IOException { assert eventLoop.inEventLoop(); - LOG.trace("Connecting to {}", remoteId.address); + LOG.info("Connecting to {}, with group: {} and channel type: {}", remoteId.address, eventLoop, rpcClient.channelClass); - this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass) - .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay()) - .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO) - .handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr) - .remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() { +// try { + this.handler = (new NettyRpcClientSslHandler.Builder()) + .setConnectionId(remoteId) + .setHandshakeListener(new GenericFutureListener>() { @Override - public void operationComplete(ChannelFuture future) throws Exception { - Channel ch = future.channel(); + public void operationComplete(Future future) throws Exception { + LOG.info("I believe handshake is now complete. I'm on thread {}", Thread.currentThread().getName()); + Channel ch = future.get(); if (!future.isSuccess()) { failInit(ch, toIOE(future.cause())); rpcClient.failedServers.addToFailedServers(remoteId.address, future.cause()); return; } + LOG.info("Connected with SSL. 
Adding SASL layer..."); ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate()); if (useSasl) { saslNegotiate(ch); @@ -291,7 +332,48 @@ class NettyRpcConnection extends RpcConnection { established(ch); } } - }).channel(); + }) + .build(); + + this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass) + .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay()) + .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO) + .handler(handler).localAddress(rpcClient.localAddr) + // .handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr) + .remoteAddress(remoteId.address).connect().channel(); +// this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass) +// .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay()) +// .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive) +// .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO) +// .handler((new NettyRpcClientSslHandler.Builder()).setConnectionId(remoteId).build()).localAddress(rpcClient.localAddr) +// .remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() { +// @Override +// public void operationComplete(ChannelFuture future) throws Exception { +// LOG.info("I believe handshake is now complete. I'm on thread {}", Thread.currentThread().getName()); +// Channel ch = future.channel(); +// if (!future.isSuccess()) { +// failInit(ch, toIOE(future.cause())); +// rpcClient.failedServers.addToFailedServers(remoteId.address, future.cause()); +// return; +// } +// LOG.info("Connected with SSL. Adding SASL layer..."); +// ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate()); +// if (useSasl) { +// saslNegotiate(ch); +// } else { +// // send the connection header to server +// ch.write(connectionHeaderWithLength.retainedDuplicate()); +// established(ch); +// } +// } +// }).channel(); + + LOG.info("Added SslHandler and called connect on thread {}", Thread.currentThread().getName()); +// } catch (InterruptedException e) { +// LOG.warn("Logging to see if connection can still go on...", e); +// } + } private void sendRequest0(Call call, HBaseRpcController hrc) throws IOException { @@ -316,21 +398,58 @@ class NettyRpcConnection extends RpcConnection { setCancelled(call); } else { if (channel == null) { + LOG.info("calling connect..."); connect(); + LOG.info("waiting for isConnectionStablished()"); + handler.setPostEstablishmentAction( c -> { + LOG.info("x{}", c); + scheduleTimeoutTask(c); + channel.writeAndFlush(c).addListener(new ChannelFutureListener() { + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + // Fail the call if we failed to write it out. This usually because the channel is + // closed. This is needed because we may shutdown the channel inside event loop and + // there may still be some pending calls in the event loop queue after us. + if (!future.isSuccess()) { + c.setException(toIOE(future.cause())); + } + } + }); + }, call); + } else { + LOG.info("channel not null. Connetion established? {}", handler.isConnectionStablished()); + scheduleTimeoutTask(call); + channel.writeAndFlush(call).addListener(new ChannelFutureListener() { + // + @Override + public void operationComplete(ChannelFuture future) throws Exception { + // Fail the call if we failed to write it out. This usually because the channel is + // closed. 
This is needed because we may shutdown the channel inside event loop and + // there may still be some pending calls in the event loop queue after us. + if (!future.isSuccess()) { + call.setException(toIOE(future.cause())); + } + } + }); + } - scheduleTimeoutTask(call); - channel.writeAndFlush(call).addListener(new ChannelFutureListener() { - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - // Fail the call if we failed to write it out. This usually because the channel is - // closed. This is needed because we may shutdown the channel inside event loop and - // there may still be some pending calls in the event loop queue after us. - if (!future.isSuccess()) { - call.setException(toIOE(future.cause())); - } - } - }); + +// } + +// scheduleTimeoutTask(call); +// channel.writeAndFlush(call).addListener(new ChannelFutureListener() { +// +// @Override +// public void operationComplete(ChannelFuture future) throws Exception { +// // Fail the call if we failed to write it out. This usually because the channel is +// // closed. This is needed because we may shutdown the channel inside event loop and +// // there may still be some pending calls in the event loop queue after us. +// if (!future.isSuccess()) { +// call.setException(toIOE(future.cause())); +// } +// } +// }); } } }); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java index a5b980350d1..237e2212cec 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java @@ -54,9 +54,12 @@ public class NettyHBaseSaslRpcClient extends AbstractHBaseSaslRpcClient { return; } // add wrap and unwrap handlers to pipeline. - p.addFirst(new SaslWrapHandler(saslClient), - new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4), - new SaslUnwrapHandler(saslClient)); +// p.addFirst(new SaslWrapHandler(saslClient), +// new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4), +// new SaslUnwrapHandler(saslClient)); + p.addAfter("ssl_handler", "sasl_wrap", new SaslWrapHandler(saslClient)); + p.addAfter("sasl_wrap", "lengh_frame_decoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); + p.addAfter("lengh_frame_decoder","sasl_unwrap", new SaslUnwrapHandler(saslClient)); } public String getSaslQOP() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslChallengeDecoder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslChallengeDecoder.java index cbbcb0e7761..cf93dbdedae 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslChallengeDecoder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslChallengeDecoder.java @@ -27,6 +27,8 @@ import java.util.List; import org.apache.hadoop.hbase.HConstants; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.ipc.RemoteException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Decode the sasl challenge sent by RpcServer. 
@@ -36,6 +38,8 @@ public class SaslChallengeDecoder extends ByteToMessageDecoder { private static final int MAX_CHALLENGE_SIZE = 1024 * 1024; // 1M + private static final Logger LOG = LoggerFactory.getLogger(SaslChallengeDecoder.class); + private ByteBuf tryDecodeChallenge(ByteBuf in, int offset, int readableBytes) throws IOException { if (readableBytes < 4) { return null; @@ -48,7 +52,7 @@ public class SaslChallengeDecoder extends ByteToMessageDecoder { } if (len > MAX_CHALLENGE_SIZE) { throw new IOException( - "Sasl challenge too large(" + len + "), max allowed is " + MAX_CHALLENGE_SIZE); + "Sasl challenge too large(" + len + "), max allowed is " + MAX_CHALLENGE_SIZE); } int totalLen = 4 + len; if (readableBytes < totalLen) { @@ -101,12 +105,16 @@ public class SaslChallengeDecoder extends ByteToMessageDecoder { int offset = in.readerIndex(); int status = in.getInt(offset); if (status == SaslStatus.SUCCESS.state) { + LOG.info("decoding challenge after SaslStatus.SUCCESS."); ByteBuf challenge = tryDecodeChallenge(in, offset + 4, readableBytes - 4); + LOG.info("finished decoding challenge"); if (challenge != null) { out.add(challenge); } } else { + LOG.info("SASL error, trying to decode message"); tryDecodeError(in, offset + 4, readableBytes - 4); + LOG.info("finished decoding error..."); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java index 80b1288b970..c12ba3a651f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java @@ -61,6 +61,7 @@ public class NettyRpcFrameDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + NettyRpcServer.LOG.info("Decoding message..."); if (requestTooBig) { handleTooBigRequest(in); return; @@ -103,6 +104,7 @@ public class NettyRpcFrameDecoder extends ByteToMessageDecoder { // extract frame out.add(in.readRetainedSlice(frameLengthInt)); + NettyRpcServer.LOG.info("finished decoding..."); } private void handleTooBigRequest(ByteBuf in) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java index 798e5ee3f08..dad1209c4f2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java @@ -32,6 +32,12 @@ import org.apache.hadoop.hbase.security.HBasePolicyProvider; import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; +import org.apache.hbase.thirdparty.io.netty.channel.ChannelInboundHandler; +import org.apache.hbase.thirdparty.io.netty.handler.codec.DelimiterBasedFrameDecoder; +import org.apache.hbase.thirdparty.io.netty.handler.codec.Delimiters; +import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslHandler; +import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future; +import org.apache.hbase.thirdparty.io.netty.util.concurrent.GenericFutureListener; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,6 +94,7 @@ public class NettyRpcServer extends RpcServer { NettyEventLoopGroupConfig config = 
((HRegionServer) server).getEventLoopGroupConfig(); eventLoopGroup = config.group(); channelClass = config.serverChannelClass(); + LOG.info(">>> server instanceof HRegionServer -> true"); } else { int threadCount = server == null? EVENTLOOP_THREADCOUNT_DEFAULT: server.getConfiguration().getInt(HBASE_NETTY_EVENTLOOP_RPCSERVER_THREADCOUNT_KEY, @@ -95,11 +102,13 @@ public class NettyRpcServer extends RpcServer { eventLoopGroup = new NioEventLoopGroup(threadCount, new DefaultThreadFactory("NettyRpcServer", true, Thread.MAX_PRIORITY)); channelClass = NioServerSocketChannel.class; + } + LOG.info(">>> setting eventLoopGroup to {} and channelClass to {}", eventLoopGroup, channelClass); ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass) - .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay) - .childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive) - .childOption(ChannelOption.SO_REUSEADDR, true) +// .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay) +// .childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive) +// .childOption(ChannelOption.SO_REUSEADDR, true) .childHandler(new ChannelInitializer() { @Override @@ -107,11 +116,26 @@ public class NettyRpcServer extends RpcServer { ChannelPipeline pipeline = ch.pipeline(); FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6); preambleDecoder.setSingleDecode(true); - pipeline.addLast("preambleDecoder", preambleDecoder); - pipeline.addLast("preambleHandler", createNettyRpcServerPreambleHandler()); - pipeline.addLast("frameDecoder", new NettyRpcFrameDecoder(maxRequestSize)); - pipeline.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels, metrics)); - pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics)); +// pipeline.addFirst("SSL", NettyRpcSslHandler.Builder.build()); + SslHandler handler = NettyRpcSslHandlerFactory.getSslHandlerInstance(ch); + pipeline.addLast("ssl_handler", handler); + handler.handshakeFuture().addListener(new GenericFutureListener>() { + @Override public void operationComplete(Future future) + throws Exception { + LOG.info("waited for handshake complete before adding remaining handlers..."); + pipeline.addLast("preambleDecoder", preambleDecoder); + pipeline.addLast("preambleHandler", createNettyRpcServerPreambleHandler()); + pipeline.addLast("frameDecoder", new NettyRpcFrameDecoder(maxRequestSize)); + pipeline.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels, metrics)); + pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics)); + } + }); +// pipeline.addLast("preambleDecoder", preambleDecoder); +// pipeline.addLast("preambleHandler", createNettyRpcServerPreambleHandler()); +// pipeline.addLast("frameDecoder", new NettyRpcFrameDecoder(maxRequestSize)); +// pipeline.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels, metrics)); +// pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics)); + } }); try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java index bac19f1bb11..b7b62236dfc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java @@ -27,6 +27,8 @@ import java.nio.ByteBuffer; import org.apache.yetus.audience.InterfaceAudience; import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Handle connection preamble. @@ -35,6 +37,8 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti @InterfaceAudience.Private class NettyRpcServerPreambleHandler extends SimpleChannelInboundHandler { + public static final Logger LOG = LoggerFactory.getLogger(NettyRpcServerPreambleHandler.class); + private final NettyRpcServer rpcServer; public NettyRpcServerPreambleHandler(NettyRpcServer rpcServer) { @@ -43,6 +47,7 @@ class NettyRpcServerPreambleHandler extends SimpleChannelInboundHandler @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + LOG.info("reading from channel..."); NettyServerRpcConnection conn = createNettyServerRpcConnection(ctx.channel()); ByteBuffer buf = ByteBuffer.allocate(msg.readableBytes()); msg.readBytes(buf); @@ -56,6 +61,7 @@ class NettyRpcServerPreambleHandler extends SimpleChannelInboundHandler ((NettyRpcServerRequestDecoder) p.get("decoder")).setConnection(conn); p.remove(this); p.remove("preambleDecoder"); + LOG.info("finished reading channel and removing decoders..."); } @VisibleForTesting diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerRequestDecoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerRequestDecoder.java index 1e844bb02cb..2bc8081e2f4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerRequestDecoder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerRequestDecoder.java @@ -52,10 +52,12 @@ class NettyRpcServerRequestDecoder extends ChannelInboundHandlerAdapter { NettyRpcServer.LOG.trace("Connection {}; # active connections={}", ctx.channel().remoteAddress(), (allChannels.size() - 1)); super.channelActive(ctx); + NettyRpcServer.LOG.trace("finished channel active for {}", ctx.channel().remoteAddress()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + NettyRpcServer.LOG.trace("Reading channel for request from {}", ctx.channel().remoteAddress()); ByteBuf input = (ByteBuf) msg; // 4 bytes length field metrics.receivedBytes(input.readableBytes() + 4); @@ -67,6 +69,11 @@ class NettyRpcServerRequestDecoder extends ChannelInboundHandlerAdapter { allChannels.remove(ctx.channel()); NettyRpcServer.LOG.trace("Disconnection {}; # active connections={}", ctx.channel().remoteAddress(), (allChannels.size() - 1)); + StringBuilder builder = new StringBuilder(); + for(StackTraceElement e : Thread.currentThread().getStackTrace()){ + builder.append(e.getClassName()).append(".").append(e.getMethodName()).append("(").append(e.getLineNumber()).append(")").append("\n"); + } + NettyRpcServer.LOG.info(builder.toString()); super.channelInactive(ctx); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerResponseEncoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerResponseEncoder.java index 09589da1635..1ae453356ce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerResponseEncoder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerResponseEncoder.java @@ -23,6 +23,8 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundHandlerAdapte import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise; import org.apache.yetus.audience.InterfaceAudience; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Encoder for {@link RpcResponse}. @@ -31,6 +33,8 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private class NettyRpcServerResponseEncoder extends ChannelOutboundHandlerAdapter { + private static final Logger LOG = LoggerFactory.getLogger(NettyRpcServerResponseEncoder.class); + private final MetricsHBaseServer metrics; NettyRpcServerResponseEncoder(MetricsHBaseServer metrics) { @@ -40,9 +44,16 @@ class NettyRpcServerResponseEncoder extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + StringBuilder builder = new StringBuilder(); + builder.append("Listing all pipeline handlers:\n"); + ctx.channel().pipeline().forEach( e -> { + builder.append(e.getKey()).append(" -> ").append(e.getValue()).append("\n"); + }); + LOG.info(builder.toString()); if (msg instanceof RpcResponse) { RpcResponse resp = (RpcResponse) msg; BufferChain buf = resp.getResponse(); + LOG.info("msg instanceof RpcResponse is true"); ctx.write(Unpooled.wrappedBuffer(buf.getBuffers()), promise).addListener(f -> { resp.done(); if (f.isSuccess()) { @@ -50,6 +61,7 @@ class NettyRpcServerResponseEncoder extends ChannelOutboundHandlerAdapter { } }); } else { + LOG.info("msg instanceof RpcResponse is false"); ctx.write(msg, promise); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcSslHandlerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcSslHandlerFactory.java new file mode 100644 index 00000000000..8c2297d1e62 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcSslHandlerFactory.java @@ -0,0 +1,36 @@ +package org.apache.hadoop.hbase.ipc; + + +import org.apache.hbase.thirdparty.io.netty.channel.Channel; +import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; +import org.apache.hbase.thirdparty.io.netty.channel.ChannelInboundHandler; +import org.apache.hbase.thirdparty.io.netty.channel.socket.SocketChannel; +import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContext; +import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContextBuilder; +import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslHandler; +import org.apache.hbase.thirdparty.io.netty.handler.ssl.util.SelfSignedCertificate; +import org.apache.yetus.audience.InterfaceAudience; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.TrustManagerFactory; +import java.io.File; +import java.io.FileInputStream; +import java.security.KeyStore; + +@InterfaceAudience.Private +public class NettyRpcSslHandlerFactory { + + public static SslHandler getSslHandlerInstance(Channel ch) throws Exception { + + KeyStore ks = KeyStore.getInstance("JKS"); + ks.load(new FileInputStream(new File("/etc/hbase/conf/hbase.keystore")), "changeme".toCharArray()); + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + kmf.init(ks, "changeme".toCharArray()); + + SslContext sslCtx = SslContextBuilder.forServer(kmf).build(); + + return sslCtx.newHandler(ch.alloc()); + } +} + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java index ffa16bfb468..d93ab04e61d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java @@ -34,6 +34,8 @@ import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * RpcConnection implementation for netty rpc server. @@ -44,6 +46,8 @@ class NettyServerRpcConnection extends ServerRpcConnection { final Channel channel; + public static final Logger LOG = LoggerFactory.getLogger(NettyServerRpcConnection.class); + NettyServerRpcConnection(NettyRpcServer rpcServer, Channel channel) { super(rpcServer); this.channel = channel; @@ -127,6 +131,12 @@ class NettyServerRpcConnection extends ServerRpcConnection { @Override protected void doRespond(RpcResponse resp) { + StringBuilder builder = new StringBuilder(); + builder.append("Listing all pipeline handlers:\n"); + channel.pipeline().forEach( e -> { + builder.append(e.getKey()).append(" -> ").append(e.getValue()).append("\n"); + }); + LOG.info(builder.toString()); channel.writeAndFlush(resp); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java index bb638af036d..63b5f794604 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java @@ -723,6 +723,7 @@ abstract class ServerRpcConnection implements Closeable { } protected final boolean processPreamble(ByteBuffer preambleBuffer) throws IOException { + SimpleRpcServer.LOG.info("Processing preamble..."); assert preambleBuffer.remaining() == 6; for (int i = 0; i < RPC_HEADER.length; i++) { if (RPC_HEADER[i] != preambleBuffer.get()) { -- 2.30.1
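For reference, below is a minimal sketch of the one-way TLS wiring this patch hard-codes on each side of the Netty RPC. The class name is illustrative; the keystore path, password, null-truststore trick and the shaded Netty package names are lifted from the diff above and should be read as placeholders rather than a final API.

// Sketch only: mirrors what the patch hard-codes into NettyRpcClientSslHandler
// (client) and NettyRpcSslHandlerFactory (server). Not a proposed final API.
import java.io.FileInputStream;
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManagerFactory;

import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContext;
import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContextBuilder;
import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslHandler;

public final class OneWayTlsSketch {

  /**
   * Client side: passing a null TrustManagerFactory makes Netty fall back to the
   * default JDK truststore, so the server's self-signed certificate must have been
   * imported there beforehand. No client certificate is presented (one-way TLS).
   */
  public static SslHandler clientSslHandler(ByteBufAllocator alloc, String host, int port)
      throws Exception {
    SslContext ctx = SslContextBuilder.forClient()
        .trustManager((TrustManagerFactory) null)
        .build();
    return ctx.newHandler(alloc, host, port);
  }

  /**
   * Server side: the private key and certificate are loaded from a local JKS
   * keystore; path and password are the ones hard-coded in the patch.
   */
  public static SslHandler serverSslHandler(ByteBufAllocator alloc) throws Exception {
    char[] password = "changeme".toCharArray();
    KeyStore ks = KeyStore.getInstance("JKS");
    try (FileInputStream in = new FileInputStream("/etc/hbase/conf/hbase.keystore")) {
      ks.load(in, password);
    }
    KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
    kmf.init(ks, password);
    SslContext ctx = SslContextBuilder.forServer(kmf).build();
    return ctx.newHandler(alloc);
  }
}

On both sides the resulting SslHandler is installed as the first handler in the pipeline ("ssl_handler"), and the remaining RPC handlers (preamble/frame decoders, SASL, request/response codecs) are only added, or only become useful, once the TLS handshake future completes, which is why the patch registers handshake listeners instead of building the full pipeline up front.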