diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml
index 82e17a7..8f805d0 100644
--- a/hbase-server/pom.xml
+++ b/hbase-server/pom.xml
@@ -781,7 +781,6 @@
io.netty
netty
${netty.hadoop.version}
- test
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
index 42b500f..1307518 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
@@ -161,7 +161,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue {
*/
private boolean needToDrop(CallRunner callRunner) {
long now = EnvironmentEdgeManager.currentTime();
- long callDelay = now - callRunner.getCall().timestamp;
+ long callDelay = now - callRunner.getCall().getTimestamp();
long localMinDelay = this.minDelay;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
index 1a8fa5b..e9ec836 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
@@ -34,6 +34,7 @@ class BufferChain {
private final ByteBuffer[] buffers;
private int remaining = 0;
private int bufferOffset = 0;
+ private int size;
BufferChain(ByteBuffer ... buffers) {
// Some of the incoming buffers can be null
@@ -43,6 +44,7 @@ class BufferChain {
bbs.add(b);
this.remaining += b.remaining();
}
+ this.size = remaining;
this.buffers = bbs.toArray(new ByteBuffer[bbs.size()]);
}
@@ -114,4 +116,12 @@ class BufferChain {
}
}
}
+
+ int size() {
+ return size;
+ }
+
+ ByteBuffer[] getBuffers() {
+ return this.buffers;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index e91699a..24969e5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.ipc.RpcServer.Call;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.UserGroupInformation;
@@ -49,7 +48,7 @@ public class CallRunner {
private static final CallDroppedException CALL_DROPPED_EXCEPTION
= new CallDroppedException();
- private Call call;
+ private ServerCall call;
private RpcServerInterface rpcServer;
private MonitoredRPCHandler status;
private volatile boolean sucessful;
@@ -60,7 +59,7 @@ public class CallRunner {
* time we occupy heap.
*/
// The constructor is shutdown so only RpcServer in this class can make one of these.
- CallRunner(final RpcServerInterface rpcServer, final Call call) {
+ CallRunner(final RpcServerInterface rpcServer, final ServerCall call) {
this.call = call;
this.rpcServer = rpcServer;
// Add size of the call to queue size.
@@ -69,7 +68,7 @@ public class CallRunner {
}
}
- public Call getCall() {
+ public ServerCall getCall() {
return call;
}
@@ -87,16 +86,16 @@ public class CallRunner {
public void run() {
try {
- if (!call.connection.channel.isOpen()) {
+ if (!call.isConnectionOpen()) {
if (RpcServer.LOG.isDebugEnabled()) {
RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call);
}
return;
}
this.status.setStatus("Setting up call");
- this.status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort());
+ this.status.setConnection(call.getHostAddress(), call.getRemotePort());
if (RpcServer.LOG.isTraceEnabled()) {
- UserGroupInformation remoteUser = call.connection.ugi;
+ UserGroupInformation remoteUser = call.getUser();
RpcServer.LOG.trace(call.toShortString() + " executing as " +
((remoteUser == null) ? "NULL principal" : remoteUser.getUserName()));
}
@@ -111,12 +110,13 @@ public class CallRunner {
throw new ServerNotRunningYetException("Server " +
(address != null ? address : "(channel closed)") + " is not running yet");
}
- if (call.tinfo != null) {
- traceScope = Trace.startSpan(call.toTraceString(), call.tinfo);
+ if (call.getTinfo() != null) {
+ traceScope = Trace.startSpan(call.toTraceString(), call.getTinfo());
}
// make the call
- resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
- call.timestamp, this.status, call.timeout);
+ resultPair = this.rpcServer.call(call.getService(),
+ call.getMethodDescriptor(), call.getParam(), call.getCellScanner(),
+ call.getTimestamp(), this.status, call.getTimeout());
} catch (Throwable e) {
RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e);
errorThrowable = e;
@@ -173,7 +173,7 @@ public class CallRunner {
*/
public void drop() {
try {
- if (!call.connection.channel.isOpen()) {
+ if (!call.isConnectionOpen()) {
if (RpcServer.LOG.isDebugEnabled()) {
RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Netty4RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Netty4RpcServer.java
new file mode 100644
index 0000000..4d51935
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Netty4RpcServer.java
@@ -0,0 +1,1714 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.DecoderException;
+import io.netty.handler.codec.ReplayingDecoder;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import io.netty.util.internal.RecyclableArrayList;
+import io.netty.util.internal.StringUtil;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
+import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
+import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
+import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
+import org.apache.hadoop.hbase.ipc.RpcServer.Responder;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.security.AuthMethod;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
+import org.apache.hadoop.hbase.security.SaslStatus;
+import org.apache.hadoop.hbase.security.SaslUtil;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Counter;
+import org.apache.hadoop.hbase.util.JVM;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.htrace.TraceInfo;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.ServiceException;
+import com.google.protobuf.TextFormat;
+
+public class Netty4RpcServer implements RpcServerInterface,
+ ConfigurationObserver {
+ public static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION = new CallQueueTooBigException();
+ private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger."
+ + Server.class.getName());
+ public static final byte CURRENT_VERSION = 0;
+ public static final Log LOG = LogFactory.getLog(Netty4RpcServer.class);
+
+ private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
+ private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
+ /**
+ * Minimum allowable timeout (in milliseconds) in rpc request's header. This
+ * configuration exists to prevent the rpc service regarding this request as
+ * timeout immediately.
+ */
+ private static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout";
+ private static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;
+
+ /**
+ * The maximum size that we can hold in the RPC queue
+ */
+ private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
+ /** Default value for above params */
+ private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
+ private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private final Server server;
+ private final String name;
+ private final List services;
+ protected final InetSocketAddress bindAddress;
+ protected final Configuration conf;
+ private final RpcScheduler scheduler;
+ private final boolean authorize;
+ private boolean isSecurityEnabled;
+
+ private final CountDownLatch closed = new CountDownLatch(1);
+ private Channel serverChannel;
+ private final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);;
+
+ volatile boolean started = false;
+ protected HBaseRPCErrorHandler errorHandler = null;
+ protected MetricsHBaseServer metrics;
+ protected final Counter callQueueSize = new Counter();
+ private int maxQueueSize;
+
+ private final int warnResponseTime;
+ private final int warnResponseSize;
+
+ private int slowCallLimit;
+ private long incrementPeriod;
+ private AtomicLong totalSlowCalls;
+ private AtomicLong totalCalls;
+ private UserProvider userProvider;
+ private final IPCUtil ipcUtil;
+ private final BoundedByteBufferPool reservoir;
+ private final int minClientRequestTimeout;
+
+ public Netty4RpcServer(final Server server, final String name,
+ final List services,
+ final InetSocketAddress bindAddress, Configuration conf,
+ final RpcScheduler scheduler) throws IOException {
+ if (conf.getBoolean("hbase.ipc.server.reservoir.enabled", true)) {
+ this.reservoir = new BoundedByteBufferPool(conf.getInt(
+ "hbase.ipc.server.reservoir.max.buffer.size", 1024 * 1024),
+ conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size",
+ 16 * 1024),
+ // Make the max twice the number of handlers to be safe.
+ conf.getInt("hbase.ipc.server.reservoir.initial.max", conf.getInt(
+ HConstants.REGION_SERVER_HANDLER_COUNT,
+ HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2),
+ // By default make direct byte buffers from the buffer pool.
+ conf.getBoolean("hbase.ipc.server.reservoir.direct.buffer", true));
+ } else {
+ reservoir = null;
+ }
+ this.server = server;
+ this.name = name;
+ this.services = services;
+ this.bindAddress = bindAddress;
+ this.conf = conf;
+ this.metrics = new MetricsHBaseServer(name,
+ new Netty4RpcServer.MetricsHBaseServerWrapperImpl(this));
+
+ boolean useEpoll = useEpoll(conf);
+ LOG.info("useEpoll: " + useEpoll);
+ EventLoopGroup bossGroup = null;
+ EventLoopGroup workerGroup = null;
+ if (useEpoll) {
+ bossGroup = new EpollEventLoopGroup();
+ workerGroup = new EpollEventLoopGroup();
+ } else {
+ bossGroup = new NioEventLoopGroup();
+ workerGroup = new NioEventLoopGroup();
+ }
+ ServerBootstrap bootstrap = new ServerBootstrap();
+ bootstrap.group(bossGroup, workerGroup);
+ if (useEpoll) {
+ bootstrap.channel(EpollServerSocketChannel.class);
+ } else {
+ bootstrap.channel(NioServerSocketChannel.class);
+ }
+ //bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+ bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
+ bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
+ bootstrap.childOption(ChannelOption.SO_LINGER, 0);
+ //bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+ // bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
+ // bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
+ bootstrap.childHandler(new Initializer());
+
+ try {
+ serverChannel = bootstrap.bind(this.bindAddress).sync().channel();
+ LOG.info("Netty4RpcServer bind to: " + serverChannel.localAddress());
+ allChannels.add(serverChannel);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ this.maxQueueSize = this.conf.getInt("hbase.ipc.server.max.callqueue.size",
+ DEFAULT_MAX_CALLQUEUE_SIZE);
+ this.warnResponseTime = this.conf.getInt(WARN_RESPONSE_TIME,
+ DEFAULT_WARN_RESPONSE_TIME);
+ this.warnResponseSize = this.conf.getInt(WARN_RESPONSE_SIZE,
+ DEFAULT_WARN_RESPONSE_SIZE);
+ this.minClientRequestTimeout = conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT,
+ DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT);
+ // mark a call slow when its total time is longer than 10 secs
+ this.slowCallLimit = this.conf.getInt("hbase.ipc.server.slow.call.limit",
+ 10 * 1000);
+ this.incrementPeriod = this.conf.getLong(
+ "hbase.ipc.server.metrics.inc.period", 60 * 60 * 1000L);
+ this.totalSlowCalls = new AtomicLong(0);
+ this.totalCalls = new AtomicLong(0);
+ this.ipcUtil = new IPCUtil(this.conf);
+
+ this.authorize = this.conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
+ this.userProvider = UserProvider.instantiate(this.conf);
+ this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
+ if (isSecurityEnabled) {
+ HBaseSaslRpcServer.init(this.conf);
+ }
+ this.scheduler = scheduler;
+ this.scheduler.init(new RpcSchedulerContext(this));
+ }
+
+ private boolean useEpoll(Configuration conf) {
+ // Config to enable native transport.
+ boolean epollEnabled = conf.getBoolean("hbase.rpc.server.nativetransport",
+ false);
+ // Use the faster native epoll transport mechanism on linux if enabled
+ if (epollEnabled && JVM.isLinux() && JVM.isAmd64()) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public void start() {
+ if (started) {
+ return;
+ }
+ scheduler.start();
+ started = true;
+ }
+
+ @Override
+ public void stop() {
+ LOG.info("Stopping server on " + this.bindAddress.getPort());
+ allChannels.close().awaitUninterruptibly();
+ serverChannel.close();
+ scheduler.stop();
+ closed.countDown();
+ }
+
+ @Override
+ public void join() throws InterruptedException {
+ closed.await();
+ }
+
+ @Override
+ public InetSocketAddress getListenerAddress() {
+ return ((InetSocketAddress) serverChannel.localAddress());
+ }
+
+ private void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
+ throws IOException {
+ if (response != null) response.reset();
+ call.setResponse(null, null, t, error);
+ }
+
+ class Connection {
+ // If the connection header has been read or not.
+ private boolean connectionHeaderRead = false;
+ ConnectionHeader connectionHeader;
+ /**
+ * Codec the client asked use.
+ */
+ private Codec codec;
+ /**
+ * Compression codec the client asked us use.
+ */
+ private CompressionCodec compressionCodec;
+ BlockingService service;
+ protected UserGroupInformation user = null;
+ private AuthMethod authMethod;
+ private boolean skipInitialSaslHandshake;
+ private ByteBuffer dataLengthBuffer = null;
+ private ByteBuffer data;
+ // Fake 'call' for failed authorization response
+ private static final int AUTHORIZATION_FAILED_CALLID = -1;
+ private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID,
+ null, null, null, null, null, this, null, 0, null, null, 0);
+ private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
+ // Fake 'call' for SASL context setup
+ private static final int SASL_CALLID = -33;
+ boolean useSasl;
+ private final Call saslCall = new Call(SASL_CALLID, this.service, null,
+ null, null, null, this, null, 0, null, null, 0);
+ // Cache the remote host & port info so that even if the socket is
+ // disconnected, we can say where it used to connect to.
+ protected String hostAddress;
+ protected int remotePort;
+ private boolean useWrap = false;
+ protected Channel channel;
+
+ Connection(Channel channel) {
+ super();
+ this.channel = channel;
+ InetSocketAddress inetSocketAddress = ((InetSocketAddress) channel
+ .remoteAddress());
+ InetAddress addr = inetSocketAddress.getAddress();
+ if (addr == null) {
+ this.hostAddress = "*Unknown*";
+ } else {
+ this.hostAddress = inetSocketAddress.getAddress().getHostAddress();
+ }
+ this.remotePort = inetSocketAddress.getPort();
+ }
+
+ // Reads the connection header following version
+ void processConnectionHeader(ByteBuffer data) throws IOException {
+ this.connectionHeader = ConnectionHeader
+ .parseFrom(new ByteBufferInputStream(data));
+ String serviceName = connectionHeader.getServiceName();
+ if (serviceName == null) {
+ throw new EmptyServiceNameException();
+ }
+ this.service = getService(services, serviceName);
+ if (this.service == null) {
+ throw new UnknownServiceException(serviceName);
+ }
+ setupCellBlockCodecs(this.connectionHeader);
+ UserGroupInformation protocolUser = createUser(connectionHeader);
+ if (!useSasl) {
+ user = protocolUser;
+ if (user != null) {
+ user.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
+ }
+ } else {
+ // user is authenticated
+ user.setAuthenticationMethod(authMethod.authenticationMethod);
+ // Now we check if this is a proxy user case. If the protocol
+ // user is
+ // different from the 'user', it is a proxy user scenario.
+ // However,
+ // this is not allowed if user authenticated with DIGEST.
+ if ((protocolUser != null)
+ && (!protocolUser.getUserName().equals(user.getUserName()))) {
+ if (authMethod == AuthMethod.DIGEST) {
+ // Not allowed to doAs if token authentication is used
+ throw new AccessDeniedException("Authenticated user (" + user
+ + ") doesn't match what the client claims to be ("
+ + protocolUser + ")");
+ } else {
+ // Effective user can be different from authenticated
+ // user
+ // for simple auth or kerberos auth
+ // The user is the real user. Now we create a proxy user
+ UserGroupInformation realUser = user;
+ user = UserGroupInformation.createProxyUser(
+ protocolUser.getUserName(), realUser);
+ // Now the user is a proxy user, set Authentication
+ // method Proxy.
+ user.setAuthenticationMethod(AuthenticationMethod.PROXY);
+ }
+ }
+ }
+ if (connectionHeader.hasVersionInfo()) {
+ AUDITLOG.info("Connection from " + this.hostAddress + " port: "
+ + this.remotePort + " with version info: "
+ + TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
+ } else {
+ AUDITLOG.info("Connection from " + this.hostAddress + " port: "
+ + this.remotePort + " with unknown version info");
+ }
+ }
+
+ private UserGroupInformation createUser(ConnectionHeader head) {
+ UserGroupInformation ugi = null;
+
+ if (!head.hasUserInfo()) {
+ return null;
+ }
+ UserInformation userInfoProto = head.getUserInfo();
+ String effectiveUser = null;
+ if (userInfoProto.hasEffectiveUser()) {
+ effectiveUser = userInfoProto.getEffectiveUser();
+ }
+ String realUser = null;
+ if (userInfoProto.hasRealUser()) {
+ realUser = userInfoProto.getRealUser();
+ }
+ if (effectiveUser != null) {
+ if (realUser != null) {
+ UserGroupInformation realUserUgi = UserGroupInformation
+ .createRemoteUser(realUser);
+ ugi = UserGroupInformation
+ .createProxyUser(effectiveUser, realUserUgi);
+ } else {
+ ugi = UserGroupInformation.createRemoteUser(effectiveUser);
+ }
+ }
+ return ugi;
+ }
+
+ /**
+ * Set up cell block codecs
+ *
+ * @throws FatalConnectionException
+ */
+ private void setupCellBlockCodecs(final ConnectionHeader header)
+ throws FatalConnectionException {
+ // TODO: Plug in other supported decoders.
+ if (!header.hasCellBlockCodecClass())
+ return;
+ String className = header.getCellBlockCodecClass();
+ if (className == null || className.length() == 0)
+ return;
+ try {
+ this.codec = (Codec) Class.forName(className).newInstance();
+ } catch (Exception e) {
+ throw new UnsupportedCellCodecException(className, e);
+ }
+ if (!header.hasCellBlockCompressorClass())
+ return;
+ className = header.getCellBlockCompressorClass();
+ try {
+ this.compressionCodec = (CompressionCodec) Class.forName(className)
+ .newInstance();
+ } catch (Exception e) {
+ throw new UnsupportedCompressionCodecException(className, e);
+ }
+ }
+
+ void readPreamble(ByteBuf buffer) throws IOException {
+ byte[] rpcHead = { buffer.readByte(), buffer.readByte(),
+ buffer.readByte(), buffer.readByte() };
+ if (!Arrays.equals(HConstants.RPC_HEADER, rpcHead)) {
+ doBadPreambleHandling("Expected HEADER="
+ + Bytes.toStringBinary(HConstants.RPC_HEADER)
+ + " but received HEADER=" + Bytes.toStringBinary(rpcHead)
+ + " from " + toString());
+ return;
+ }
+ // Now read the next two bytes, the version and the auth to use.
+ int version = buffer.readByte();
+ byte authbyte = buffer.readByte();
+ this.authMethod = AuthMethod.valueOf(authbyte);
+ if (version != CURRENT_VERSION) {
+ String msg = getFatalConnectionString(version, authbyte);
+ doBadPreambleHandling(msg, new WrongVersionException(msg));
+ return;
+ }
+ if (authMethod == null) {
+ String msg = getFatalConnectionString(version, authbyte);
+ doBadPreambleHandling(msg, new BadAuthException(msg));
+ return;
+ }
+ if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
+ AccessDeniedException ae = new AccessDeniedException(
+ "Authentication is required");
+ setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
+ authFailedCall.sendResponseIfReady(ChannelFutureListener.CLOSE);
+ return;
+ }
+ if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
+ doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
+ SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
+ authMethod = AuthMethod.SIMPLE;
+ // client has already sent the initial Sasl message and we
+ // should ignore it. Both client and server should fall back
+ // to simple auth from now on.
+ skipInitialSaslHandshake = true;
+ }
+ if (authMethod != AuthMethod.SIMPLE) {
+ useSasl = true;
+ }
+ }
+
+ private String getFatalConnectionString(final int version,
+ final byte authByte) {
+ return "serverVersion=" + CURRENT_VERSION + ", clientVersion=" + version
+ + ", authMethod=" + authByte + ", authSupported="
+ + (authMethod != null) + " from " + toString();
+ }
+
+ private void doRawSaslReply(SaslStatus status, Writable rv, String errorClass, String error)
+ throws IOException {
+ ByteBufferOutputStream saslResponse = null;
+ DataOutputStream out = null;
+ try {
+ // In my testing, have noticed that sasl messages are usually
+ // in the ballpark of 100-200. That's why the initial capacity
+ // is 256.
+ saslResponse = new ByteBufferOutputStream(256);
+ out = new DataOutputStream(saslResponse);
+ out.writeInt(status.state); // write status
+ if (status == SaslStatus.SUCCESS) {
+ rv.write(out);
+ } else {
+ WritableUtils.writeString(out, errorClass);
+ WritableUtils.writeString(out, error);
+ }
+ saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
+ saslCall.sendResponseIfReady();
+ } finally {
+ if (saslResponse != null) {
+ saslResponse.close();
+ }
+ if (out != null) {
+ out.close();
+ }
+ }
+ }
+
+ private void doBadPreambleHandling(final String msg) throws IOException {
+ doBadPreambleHandling(msg, new FatalConnectionException(msg));
+ }
+
+ private void doBadPreambleHandling(final String msg, final Exception e)
+ throws IOException {
+ LOG.warn(msg);
+ Call fakeCall = new Call(-1, null, null, null, null, null, this, null,
+ -1, null, null, 0);
+ setupResponse(null, fakeCall, e, msg);
+ // closes out the connection.
+ fakeCall.sendResponseIfReady(ChannelFutureListener.CLOSE);
+ }
+
+ Object processRequest(ByteBuffer buf) throws IOException, InterruptedException {
+ long totalRequestSize = buf.limit();
+ int offset = 0;
+ // Here we read in the header. We avoid having pb
+ // do its default 4k allocation for CodedInputStream. We force it to
+ // use backing array.
+ CodedInputStream cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit());
+ int headerSize = cis.readRawVarint32();
+ offset = cis.getTotalBytesRead();
+ Message.Builder builder = RequestHeader.newBuilder();
+ ProtobufUtil.mergeFrom(builder, cis, headerSize);
+ RequestHeader header = (RequestHeader) builder.build();
+ offset += headerSize;
+ int id = header.getCallId();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) + " totalRequestSize: "
+ + totalRequestSize + " bytes");
+ }
+ // Enforcing the call queue size, this triggers a retry in the
+ // client
+ // This is a bit late to be doing this check - we have already read
+ // in the total request.
+ if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
+ final Call callTooBig = new Call(id, this.service, null, null, null,
+ null, this, null, totalRequestSize, null, null, 0);
+ ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
+ metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
+ setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
+ "Call queue is full on " + getListenerAddress()
+ + ", is hbase.ipc.server.max.callqueue.size too small?");
+ callTooBig.sendResponseIfReady();
+ return null;
+ }
+ MethodDescriptor md = null;
+ Message param = null;
+ CellScanner cellScanner = null;
+ try {
+ if (header.hasRequestParam() && header.getRequestParam()) {
+ md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
+ if (md == null) throw new UnsupportedOperationException(header.getMethodName());
+ builder = this.service.getRequestPrototype(md).newBuilderForType();
+ cis.resetSizeCounter();
+ int paramSize = cis.readRawVarint32();
+ offset += cis.getTotalBytesRead();
+ if (builder != null) {
+ ProtobufUtil.mergeFrom(builder, cis, paramSize);
+ param = builder.build();
+ }
+ offset += paramSize;
+ }
+ if (header.hasCellBlockMeta()) {
+ buf.position(offset);
+ cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec, buf);
+ }
+ } catch (Throwable t) {
+ String msg =
+ getListenerAddress() + " is unable to read call parameter from client "
+ + getHostAddress();
+ LOG.warn(msg, t);
+
+ metrics.exception(t);
+
+ // probably the hbase hadoop version does not match the running
+ // hadoop version
+ if (t instanceof LinkageError) {
+ t = new DoNotRetryIOException(t);
+ }
+ // If the method is not present on the server, do not retry.
+ if (t instanceof UnsupportedOperationException) {
+ t = new DoNotRetryIOException(t);
+ }
+
+ final Call readParamsFailedCall = new Call(id, this.service, null,
+ null, null, null, this, null, totalRequestSize, null, null, 0);
+ ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
+ setupResponse(responseBuffer, readParamsFailedCall, t, msg + "; " + t.getMessage());
+ readParamsFailedCall.sendResponseIfReady();
+ return null;
+ }
+
+ TraceInfo traceInfo =
+ header.hasTraceInfo() ? new TraceInfo(header.getTraceInfo().getTraceId(), header
+ .getTraceInfo().getParentId()) : null;
+ int timeout = 0;
+ if (header.hasTimeout()) {
+ timeout = Math.max(minClientRequestTimeout, header.getTimeout());
+ }
+ Call call = new Call(id, this.service, md, header, param, cellScanner,
+ this, null, totalRequestSize, traceInfo, RpcServer.getRemoteIp(),
+ timeout);
+// if (!scheduler.dispatch(new CallRunner(Netty4RpcServer.this, call))) {
+// callQueueSize.add(-1 * call.getSize());
+//
+// ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
+// metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
+// InetSocketAddress address = getListenerAddress();
+// setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION, "Call queue is full on "
+// + (address != null ? address : "(channel closed)") + ", too many items queued ?");
+// call.sendResponseIfReady();
+// }
+ return new CallRunner(Netty4RpcServer.this, call);
+ }
+
+ private Object process(ByteBuffer buf) throws IOException,
+ InterruptedException {
+ if (connectionHeaderRead) {
+ return processRequest(buf);
+ } else {
+ processConnectionHeader(buf);
+ this.connectionHeaderRead = true;
+ return null;
+ }
+ }
+
+ public void close() {
+ // disposeSasl();
+ }
+
+ public boolean isConnectionOpen() {
+ return channel.isOpen();
+ }
+
+ public String getHostAddress() {
+ return hostAddress;
+ }
+
+ public int getRemotePort() {
+ return remotePort;
+ }
+
+ VersionInfo getVersionInfo() {
+ if (connectionHeader.hasVersionInfo()) {
+ return connectionHeader.getVersionInfo();
+ }
+ return null;
+ }
+ }
+
+ /**
+ * Datastructure that holds all necessary to a method invocation and then afterward, carries the
+ * result.
+ */
+ public class Call implements ServerCall {
+
+ protected int id; // the client's call id
+ protected BlockingService service;
+ protected MethodDescriptor md;
+ protected RequestHeader header;
+ protected Message param; // the parameter passed
+ // Optional cell data passed outside of protobufs.
+ protected CellScanner cellScanner;
+ protected Connection connection; // connection to client
+ protected long timestamp; // the time received when response is null
+ // the time served when response is not null
+
+ protected int timeout;
+ /**
+ * Chain of buffers to send as response.
+ */
+ protected BufferChain response;
+ protected boolean delayResponse;
+ protected Responder responder;
+ protected boolean delayReturnValue; // if the return value should be
+ // set at call completion
+ protected long size; // size of current call
+ protected boolean isError;
+ protected TraceInfo tinfo;
+ private ByteBuffer cellBlock = null;
+
+ private User user;
+ private InetAddress remoteAddress;
+
+ // set at call completion
+ ByteBuf responseBB = null;
+
+ Call(int id, final BlockingService service, final MethodDescriptor md,
+ RequestHeader header, Message param, CellScanner cellScanner,
+ Connection connection, Responder responder, long size, TraceInfo tinfo,
+ final InetAddress remoteAddress, int timeout) {
+ this.id = id;
+ this.service = service;
+ this.md = md;
+ this.header = header;
+ this.param = param;
+ this.cellScanner = cellScanner;
+ this.connection = connection;
+ this.timestamp = System.currentTimeMillis();
+ this.response = null;
+ this.delayResponse = false;
+ this.responder = responder;
+ this.isError = false;
+ this.size = size;
+ this.tinfo = tinfo;
+ this.user = connection.user == null ? null : userProvider
+ .create(connection.user);
+ this.remoteAddress = remoteAddress;
+ this.timeout = timeout;
+ }
+
+ /**
+ * Call is done. Execution happened and we returned results to client. It is now safe to
+ * cleanup.
+ */
+ void done() {
+ }
+
+ protected synchronized void setSaslTokenResponse(ByteBuffer response) {
+ this.response = new BufferChain(response);
+ }
+
+ public synchronized void setResponse(Object m, final CellScanner cells, Throwable t,
+ String errorMsg) {
+ if (this.isError) return;
+ if (t != null) this.isError = true;
+ BufferChain bc = null;
+ try {
+ ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
+ // Presume it a pb Message. Could be null.
+ Message result = (Message) m;
+ // Call id.
+ headerBuilder.setCallId(this.id);
+ if (t != null) {
+ ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
+ exceptionBuilder.setExceptionClassName(t.getClass().getName());
+ exceptionBuilder.setStackTrace(errorMsg);
+ exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
+ if (t instanceof RegionMovedException) {
+ // Special casing for this exception. This is only one
+ // carrying a payload.
+ // Do this instead of build a generic system for
+ // allowing exceptions carry
+ // any kind of payload.
+ RegionMovedException rme = (RegionMovedException) t;
+ exceptionBuilder.setHostname(rme.getHostname());
+ exceptionBuilder.setPort(rme.getPort());
+ }
+ // Set the exception as the result of the method invocation.
+ headerBuilder.setException(exceptionBuilder.build());
+ }
+ // Pass reservoir to buildCellBlock. Keep reference to returne
+ // so can add it back to the
+ // reservoir when finished. This is hacky and the hack is not
+ // contained but benefits are
+ // high when we can avoid a big buffer allocation on each rpc.
+ this.cellBlock =
+ ipcUtil.buildCellBlock(this.connection.codec, this.connection.compressionCodec, cells,
+ reservoir);
+ if (this.cellBlock != null) {
+ CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
+ // Presumes the cellBlock bytebuffer has been flipped so
+ // limit has total size in it.
+ cellBlockBuilder.setLength(this.cellBlock.limit());
+ headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
+ }
+ Message header = headerBuilder.build();
+
+ // Organize the response as a set of bytebuffers rather than
+ // collect it all together inside
+ // one big byte array; save on allocations.
+ ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header);
+ ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
+ int totalSize =
+ bbHeader.capacity() + (bbResult == null ? 0 : bbResult.limit())
+ + (this.cellBlock == null ? 0 : this.cellBlock.limit());
+ ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
+ bc = new BufferChain(bbTotalSize, bbHeader, bbResult, this.cellBlock);
+ if (connection.useWrap) {
+ bc = wrapWithSasl(bc);
+ }
+ } catch (IOException e) {
+ LOG.warn("Exception while creating response " + e);
+ }
+ this.response = bc;
+ responseBB = UnpooledByteBufAllocator.DEFAULT.buffer(this.response.size());
+ ByteBuffer[] buffers = this.response.getBuffers();
+ for (ByteBuffer bb : buffers) {
+ responseBB.writeBytes(bb);
+ }
+ }
+
+ private BufferChain wrapWithSasl(BufferChain bc) throws IOException {
+ return null;
+ }
+
+ @Override
+ public long disconnectSince() {
+ return -1L;
+ }
+
+ Connection getConnection() {
+ return (Connection) this.connection;
+ }
+
+ /**
+ * If we have a response, and delay is not set, then respond immediately. Otherwise, do not
+ * respond to client. This is called by the RPC code in the context of the Handler thread.
+ */
+ public synchronized void sendResponseIfReady() throws IOException {
+ getConnection().channel.writeAndFlush(this);
+ }
+
+ public synchronized void sendResponseIfReady(ChannelFutureListener listener) throws IOException {
+ getConnection().channel.writeAndFlush(this).addListener(listener);
+ }
+
+ @Override
+ public InetAddress getInetAddress() {
+ return ((InetSocketAddress) getConnection().channel.remoteAddress()).getAddress();
+ }
+
+ @Override
+ public boolean isClientCellBlockSupported() {
+ return this.connection != null && this.connection.codec != null;
+ }
+
+ @Override
+ public User getRequestUser() {
+ return user;
+ }
+
+ @Override
+ public String getRequestUserName() {
+ User user = getRequestUser();
+ return user == null ? null : user.getShortName();
+ }
+
+ @Override
+ public InetAddress getRemoteAddress() {
+ return remoteAddress;
+ }
+
+ @Override
+ public VersionInfo getClientVersionInfo() {
+ return connection.getVersionInfo();
+ }
+
+ @Override
+ public boolean isRetryImmediatelySupported() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public long getResponseCellSize() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public void incrementResponseCellSize(long cellSize) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public long getResponseBlockSize() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public void incrementResponseBlockSize(long blockSize) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public boolean isConnectionOpen() {
+ return connection.channel.isOpen();
+ }
+
+ @Override
+ public String getHostAddress() {
+ return connection.getHostAddress();
+ }
+
+ @Override
+ public int getRemotePort() {
+ return connection.getRemotePort();
+ }
+
+ @Override
+ public UserGroupInformation getUser() {
+ return connection.user;
+ }
+
+ @Override
+ public long getSize() {
+ return this.size;
+ }
+
+ @Override
+ public RequestHeader getHeader() {
+ return this.header;
+ }
+
+ @Override
+ public String toShortString() {
+ String serviceName = this.connection.service != null ? this.connection.service
+ .getDescriptorForType().getName() : "null";
+ return "callId: " + this.id + " service: " + serviceName
+ + " methodName: " + ((this.md != null) ? this.md.getName() : "n/a")
+ + " size: "
+ + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1)
+ + " connection: " + connection.toString();
+ }
+
+ @Override
+ public String toTraceString() {
+ String serviceName = this.connection.service != null ? this.connection.service
+ .getDescriptorForType().getName() : "";
+ String methodName = (this.md != null) ? this.md.getName() : "";
+ return serviceName + "." + methodName;
+ }
+
+ @Override
+ public TraceInfo getTinfo() {
+ return tinfo;
+ }
+
+ @Override
+ public BlockingService getService() {
+ return service;
+ }
+
+ @Override
+ public MethodDescriptor getMethodDescriptor() {
+ return md;
+ }
+
+ @Override
+ public Message getParam() {
+ return param;
+ }
+
+ @Override
+ public CellScanner getCellScanner() {
+ return cellScanner;
+ }
+
+ @Override
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public int getPriority() {
+ return this.header.getPriority();
+ }
+
+ @Override
+ public int getTimeout() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+ }
+
+ static BlockingService getService(
+ final List services, final String serviceName) {
+ BlockingServiceAndInterface bsasi = getServiceAndInterface(services,
+ serviceName);
+ return bsasi == null ? null : bsasi.getBlockingService();
+ }
+
+ static BlockingServiceAndInterface getServiceAndInterface(
+ final List services, final String serviceName) {
+ for (BlockingServiceAndInterface bs : services) {
+ if (bs.getBlockingService().getDescriptorForType().getName()
+ .equals(serviceName)) {
+ return bs;
+ }
+ }
+ return null;
+ }
+
+ private class Initializer extends ChannelInitializer {
+
+ @Override
+ protected void initChannel(SocketChannel channel) throws Exception {
+ ChannelPipeline pipeline = channel.pipeline();
+ pipeline.addLast("header", new ConnectionHeaderHandler());
+ // pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4,
+ // 0, 4));
+ // pipeline.addLast("decoder", new MessageDecoder());
+ pipeline.addLast("decoder", new NettyProtocolDecoder());
+ pipeline.addLast("encoder", new MessageEncoder());
+ pipeline.addLast("schedulerHandler", new SchedulerHandler());
+ }
+
+ }
+
+ public class ConnectionHeaderHandler extends ReplayingDecoder {
+ // If initial preamble with version and magic has been read or not.
+ private boolean connectionPreambleRead = false;
+ private Connection connection;
+
+ public ConnectionHeaderHandler() {
+ super(State.CHECK_PROTOCOL_VERSION);
+ }
+
+ private void readPreamble(ChannelHandlerContext ctx, ByteBuf input) throws IOException {
+ if (input.readableBytes() < 6) {
+ return;
+ }
+ connection = new Connection(ctx.channel());
+ connection.readPreamble(input);
+ ((NettyProtocolDecoder) ctx.pipeline().get("decoder")).setConnection(connection);
+ connectionPreambleRead = true;
+ }
+
+ @Override
+ protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List