diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
index 7ce86bd..79a1d1b 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
@@ -25,11 +25,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import com.google.common.collect.Lists;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
-
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -48,10 +43,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
-import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
@@ -59,6 +57,8 @@ import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import com.google.common.collect.Lists;
+
@Category(IntegrationTests.class)
public class IntegrationTestRpcClient {
@@ -72,7 +72,7 @@ public class IntegrationTestRpcClient {
conf = HBaseConfiguration.create();
}
- static class TestRpcServer extends RpcServer {
+ static class TestRpcServer extends SimpleRpcServer {
TestRpcServer(Configuration conf) throws IOException {
this(new FifoRpcScheduler(conf, 1), conf);
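The test server now extends SimpleRpcServer, the concrete NIO transport split out of the newly abstract RpcServer, rather than RpcServer itself. A minimal sketch of the resulting declaration, assuming SimpleRpcServer keeps RpcServer's constructor signature (server, name, services, bindAddress, conf, scheduler) and that SERVICE stands for the test's existing protobuf BlockingService:

    // Sketch only: TestRpcServer after this patch, under the assumptions above.
    static class TestRpcServer extends SimpleRpcServer {
      TestRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException {
        super(null, "testRpcServer",
            Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
            new InetSocketAddress("localhost", 0), conf, scheduler);
      }
    }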
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index 5301a67..0aabc10 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -1,4 +1,3 @@
-package org.apache.hadoop.hbase.ipc;
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -16,6 +15,8 @@ package org.apache.hadoop.hbase.ipc;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.hadoop.hbase.ipc;
+
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 8b6379b..85c507a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -20,81 +20,35 @@ package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.nio.channels.CancelledKeyException;
-import java.nio.channels.Channels;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.GatheringByteChannel;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.WritableByteChannel;
-import java.security.GeneralSecurityException;
-import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
-import org.apache.commons.crypto.cipher.CryptoCipherFactory;
-import org.apache.commons.crypto.random.CryptoRandom;
-import org.apache.commons.crypto.random.CryptoRandomFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
-import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
@@ -103,91 +57,57 @@ import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.security.AuthMethod;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteInput;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
-import org.apache.hadoop.hbase.security.AccessDeniedException;
-import org.apache.hadoop.hbase.security.AuthMethod;
-import org.apache.hadoop.hbase.security.HBasePolicyProvider;
-import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
-import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
-import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
-import org.apache.hadoop.hbase.security.SaslStatus;
-import org.apache.hadoop.hbase.security.SaslUtil;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.PolicyProvider;
-import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.TraceInfo;
import org.codehaus.jackson.map.ObjectMapper;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteInput;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
+import com.google.common.annotations.VisibleForTesting;
/**
* An RPC server that hosts protobuf described Services.
*
- * An RpcServer instance has a Listener that hosts the socket. Listener has fixed number
- * of Readers in an ExecutorPool, 10 by default. The Listener does an accept and then
- * round robin a Reader is chosen to do the read. The reader is registered on Selector. Read does
- * total read off the channel and the parse from which it makes a Call. The call is wrapped in a
- * CallRunner and passed to the scheduler to be run. Reader goes back to see if more to be done
- * and loops till done.
- *
- * <p>Scheduler can be variously implemented but default simple scheduler has handlers to which it
- * has given the queues into which calls (i.e. CallRunner instances) are inserted. Handlers run
- * taking from the queue. They run the CallRunner#run method on each item gotten from queue
- * and keep taking while the server is up.
- *
- * CallRunner#run executes the call. When done, asks the included Call to put itself on new
- * queue for Responder to pull from and return result to client.
- *
- * @see BlockingRpcClient
*/
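The javadoc deleted above is the clearest summary of the threading model this patch moves into SimpleRpcServer: an accept loop hands each new connection to one of a fixed pool of readers chosen round robin, each reader parses requests into CallRunner tasks for the scheduler, and a responder thread writes results back. A toy sketch of the accept-to-reader hand-off, with illustrative names only (this is not the HBase API):

    // Toy sketch of the round-robin reader hand-off described above.
    final Reader[] readers = new Reader[10]; // fixed pool, 10 readers by default
    int currentReader = 0;

    Reader nextReader() { // simplistic round robin, as in Listener#getReader below
      currentReader = (currentReader + 1) % readers.length;
      return readers[currentReader];
    }

    void onAccept(SocketChannel channel) throws IOException {
      channel.configureBlocking(false);
      nextReader().addConnection(channel); // reader registers it on its own Selector
    }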
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
@InterfaceStability.Evolving
-public class RpcServer implements RpcServerInterface, ConfigurationObserver {
+public abstract class RpcServer implements RpcServerInterface,
+ ConfigurationObserver {
// LOG is being used in CallRunner and the log level is being changed in tests
public static final Log LOG = LogFactory.getLog(RpcServer.class);
- private static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
+ protected static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
= new CallQueueTooBigException();
private final boolean authorize;
- private boolean isSecurityEnabled;
+ protected boolean isSecurityEnabled;
public static final byte CURRENT_VERSION = 0;
@@ -200,31 +120,28 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
/**
* How many calls/handler are allowed in the queue.
*/
- static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
+ protected static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
- private final CellBlockBuilder cellBlockBuilder;
+ protected final CellBlockBuilder cellBlockBuilder;
- private static final String AUTH_FAILED_FOR = "Auth failed for ";
- private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
- private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." +
- Server.class.getName());
+ protected static final String AUTH_FAILED_FOR = "Auth failed for ";
+ protected static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
+ protected static final Log AUDITLOG = LogFactory.getLog("SecurityLogger."
+ + Server.class.getName());
protected SecretManager<TokenIdentifier> secretManager;
protected ServiceAuthorizationManager authManager;
/** This is set to Call object before Handler invokes an RPC and reset
* after the call returns.
*/
- protected static final ThreadLocal<RpcCall> CurCall =
- new ThreadLocal<RpcCall>();
+ protected static final ThreadLocal<RpcCall> CurCall = new ThreadLocal<RpcCall>();
/** Keeps MonitoredRPCHandler per handler thread. */
- static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
+ protected static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
= new ThreadLocal<MonitoredRPCHandler>();
protected final InetSocketAddress bindAddress;
- protected int port; // port we listen on
- protected InetSocketAddress address; // inet address we listen on
- private int readThreads; // number of read threads
+
protected MetricsHBaseServer metrics;
protected final Configuration conf;
@@ -240,8 +157,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
* @see {@link #DEFAULT_MAX_CALLQUEUE_SIZE}
* @see {@link #callQueueSizeInBytes}
*/
- private final long maxQueueSizeInBytes;
- private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
+ protected final long maxQueueSizeInBytes;
+ protected static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
/**
* This is a running count of the size in bytes of all outstanding calls whether currently
@@ -249,11 +166,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
*/
protected final LongAdder callQueueSizeInBytes = new LongAdder();
- protected int socketSendBufferSize;
- protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
- protected final boolean tcpKeepAlive; // if T then use keepalives
- protected final long purgeTimeout; // in milliseconds
-
/**
* This flag is used to indicate to sub threads when they should go down. When we call
* {@link #start()}, all threads started will consult this flag on whether they should
@@ -267,55 +179,51 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
*/
volatile boolean started = false;
- // maintains the set of client connections and handles idle timeouts
- private ConnectionManager connectionManager;
- private Listener listener = null;
- protected Responder responder = null;
protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
protected HBaseRPCErrorHandler errorHandler = null;
- static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
- private static final RequestTooBigException REQUEST_TOO_BIG_EXCEPTION =
+ protected static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
+ protected static final RequestTooBigException REQUEST_TOO_BIG_EXCEPTION =
new RequestTooBigException();
- private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
- private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
+ protected static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
+ protected static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
/**
* Minimum allowable timeout (in milliseconds) in rpc request's header. This
* configuration exists to prevent the rpc service regarding this request as timeout immediately.
*/
- private static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout";
- private static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;
+ protected static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout";
+ protected static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;
/** Default value for above params */
- private static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
- private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
- private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
+ protected static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
+ protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
+ protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
- private static final ObjectMapper MAPPER = new ObjectMapper();
+ protected static final ObjectMapper MAPPER = new ObjectMapper();
- private final int maxRequestSize;
- private final int warnResponseTime;
- private final int warnResponseSize;
+ protected final int maxRequestSize;
+ protected final int warnResponseTime;
+ protected final int warnResponseSize;
- private final int minClientRequestTimeout;
+ protected final int minClientRequestTimeout;
- private final Server server;
- private final List<BlockingServiceAndInterface> services;
+ protected final Server server;
+ protected final List<BlockingServiceAndInterface> services;
- private final RpcScheduler scheduler;
+ protected final RpcScheduler scheduler;
- private UserProvider userProvider;
+ protected UserProvider userProvider;
- private final ByteBufferPool reservoir;
+ protected final ByteBufferPool reservoir;
// The requests and response will use buffers from ByteBufferPool, when the size of the
// request/response is at least this size.
// We make this to be 1/6th of the pool buffer size.
- private final int minSizeForReservoirUse;
+ protected final int minSizeForReservoirUse;
- private volatile boolean allowFallbackToSimpleAuth;
+ protected volatile boolean allowFallbackToSimpleAuth;
/**
* Used to get details for scan with a scanner_id
@@ -327,8 +235,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
* Datastructure that holds all necessary to a method invocation and then afterward, carries
* the result.
*/
- @InterfaceAudience.Private
- public class Call implements RpcCall {
+ @InterfaceStability.Evolving
+ public abstract class Call implements RpcCall {
protected int id; // the client's call id
protected BlockingService service;
protected MethodDescriptor md;
@@ -347,17 +255,16 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
* Chain of buffers to send as response.
*/
protected BufferChain response;
- protected Responder responder;
protected long size; // size of current call
protected boolean isError;
protected TraceInfo tinfo;
- private ByteBufferListOutputStream cellBlockStream = null;
- private CallCleanup reqCleanup = null;
+ protected ByteBufferListOutputStream cellBlockStream = null;
+ protected CallCleanup reqCleanup = null;
- private User user;
- private InetAddress remoteAddress;
- private RpcCallback rpcCallback;
+ protected User user;
+ protected InetAddress remoteAddress;
+ protected RpcCallback rpcCallback;
private long responseCellSize = 0;
private long responseBlockSize = 0;
@@ -365,8 +272,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
justification="Can't figure why this complaint is happening... see below")
- Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
- Message param, CellScanner cellScanner, Connection connection, Responder responder,
+ Call(int id, final BlockingService service, final MethodDescriptor md,
+ RequestHeader header, Message param, CellScanner cellScanner,
+ Connection connection,
long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout,
CallCleanup reqCleanup) {
this.id = id;
@@ -378,7 +286,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.connection = connection;
this.timestamp = System.currentTimeMillis();
this.response = null;
- this.responder = responder;
this.isError = false;
this.size = size;
this.tinfo = tinfo;
@@ -391,23 +298,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.reqCleanup = reqCleanup;
}
- /**
- * Call is done. Execution happened and we returned results to client. It is now safe to
- * cleanup.
- */
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
- justification="Presume the lock on processing request held by caller is protection enough")
- void done() {
- if (this.cellBlockStream != null) {
- this.cellBlockStream.releaseResources();// This will return back the BBs which we
- // got from pool.
- this.cellBlockStream = null;
- }
- cleanup();// If the call was run successfully, we might have already returned the
- // BB back to pool. No worries..Then inputCellBlock will be null
- this.connection.decRpcCount(); // Say that we're done with this call.
- }
-
@Override
public void cleanup() {
if (this.reqCleanup != null) {
@@ -428,10 +318,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return this.header;
}
- public boolean hasPriority() {
- return this.header.hasPriority();
- }
-
@Override
public int getPriority() {
return this.header.getPriority();
@@ -506,8 +392,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
}
Message header = headerBuilder.build();
- ByteBuffer headerBuf =
- createHeaderAndMessageBytes(m, header, cellBlockSize, cellBlock);
+ ByteBuffer headerBuf = createHeaderAndMessageBytes(m, header,
+ cellBlockSize, cellBlock);
ByteBuffer[] responseBufs = null;
int cellBlockBufferSize = 0;
if (cellBlock != null) {
@@ -669,15 +555,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
@Override
- public long disconnectSince() {
- if (!connection.channel.isOpen()) {
- return System.currentTimeMillis() - timestamp;
- } else {
- return -1L;
- }
- }
-
- @Override
public long getResponseCellSize() {
return responseCellSize;
}
@@ -708,13 +585,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
@Override
- public synchronized void sendResponseIfReady() throws IOException {
- // set param null to reduce memory pressure
- this.param = null;
- this.responder.doRespond(this);
- }
-
- @Override
public User getRequestUser() {
return user;
}
@@ -773,7 +643,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
@Override
public void setReceiveTime(long t) {
this.timestamp = t;
-
}
@Override
@@ -784,7 +653,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
@Override
public void setStartTime(long t) {
this.startTime = t;
-
}
@Override
@@ -805,653 +673,73 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
@FunctionalInterface
- static interface CallCleanup {
+ protected static interface CallCleanup {
void run();
}
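CallCleanup remains a @FunctionalInterface, so subclasses can supply the buffer-release action as a lambda. An illustrative one-liner (the variable names are made up; putbackBuffer is the ByteBufferPool method this class uses for the reservoir):

    // Illustrative only: return a request buffer to the reservoir once the call is done.
    CallCleanup cleanup = () -> reservoir.putbackBuffer(requestBuffer);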
- /** Listens on the socket. Creates jobs for the handler threads*/
- private class Listener extends Thread {
-
- private ServerSocketChannel acceptChannel = null; //the accept channel
- private Selector selector = null; //the selector that we use for the server
- private Reader[] readers = null;
- private int currentReader = 0;
- private final int readerPendingConnectionQueueLength;
-
- private ExecutorService readPool;
-
- public Listener(final String name) throws IOException {
- super(name);
- // The backlog of requests that we will have the serversocket carry.
- int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
- readerPendingConnectionQueueLength =
- conf.getInt("hbase.ipc.server.read.connection-queue.size", 100);
- // Create a new server socket and set to non blocking mode
- acceptChannel = ServerSocketChannel.open();
- acceptChannel.configureBlocking(false);
-
- // Bind the server socket to the binding address (can be different from the default interface)
- bind(acceptChannel.socket(), bindAddress, backlogLength);
- port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
- address = (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
- // create a selector;
- selector = Selector.open();
-
- readers = new Reader[readThreads];
- // Why this executor thing? Why not like hadoop just start up all the threads? I suppose it
- // has an advantage in that it is easy to shutdown the pool.
- readPool = Executors.newFixedThreadPool(readThreads,
- new ThreadFactoryBuilder().setNameFormat(
- "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
- ",port=" + port).setDaemon(true)
- .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
- for (int i = 0; i < readThreads; ++i) {
- Reader reader = new Reader();
- readers[i] = reader;
- readPool.execute(reader);
- }
- LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port);
-
- // Register accepts on the server socket with the selector.
- acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
- this.setName("RpcServer.listener,port=" + port);
- this.setDaemon(true);
- }
-
-
- private class Reader implements Runnable {
- final private LinkedBlockingQueue<Connection> pendingConnections;
- private final Selector readSelector;
-
- Reader() throws IOException {
- this.pendingConnections =
- new LinkedBlockingQueue<>(readerPendingConnectionQueueLength);
- this.readSelector = Selector.open();
- }
-
- @Override
- public void run() {
- try {
- doRunLoop();
- } finally {
- try {
- readSelector.close();
- } catch (IOException ioe) {
- LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
- }
- }
- }
-
- private synchronized void doRunLoop() {
- while (running) {
- try {
- // Consume as many connections as currently queued to avoid
- // unbridled acceptance of connections that starves the select
- int size = pendingConnections.size();
- for (int i=size; i>0; i--) {
- Connection conn = pendingConnections.take();
- conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
- }
- readSelector.select();
- Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
- while (iter.hasNext()) {
- SelectionKey key = iter.next();
- iter.remove();
- if (key.isValid()) {
- if (key.isReadable()) {
- doRead(key);
- }
- }
- key = null;
- }
- } catch (InterruptedException e) {
- if (running) { // unexpected -- log it
- LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
- }
- return;
- } catch (IOException ex) {
- LOG.info(getName() + ": IOException in Reader", ex);
- }
- }
- }
-
- /**
- * Updating the readSelector while it's being used is not thread-safe,
- * so the connection must be queued. The reader will drain the queue
- * and update its readSelector before performing the next select
- */
- public void addConnection(Connection conn) throws IOException {
- pendingConnections.add(conn);
- readSelector.wakeup();
- }
- }
-
- @Override
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
- justification="selector access is not synchronized; seems fine but concerned changing " +
- "it will have per impact")
- public void run() {
- LOG.info(getName() + ": starting");
- connectionManager.startIdleScan();
- while (running) {
- SelectionKey key = null;
- try {
- selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
- Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
- while (iter.hasNext()) {
- key = iter.next();
- iter.remove();
- try {
- if (key.isValid()) {
- if (key.isAcceptable())
- doAccept(key);
- }
- } catch (IOException ignored) {
- if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
- }
- key = null;
- }
- } catch (OutOfMemoryError e) {
- if (errorHandler != null) {
- if (errorHandler.checkOOME(e)) {
- LOG.info(getName() + ": exiting on OutOfMemoryError");
- closeCurrentConnection(key, e);
- connectionManager.closeIdle(true);
- return;
- }
- } else {
- // we can run out of memory if we have too many threads
- // log the event and sleep for a minute and give
- // some thread(s) a chance to finish
- LOG.warn(getName() + ": OutOfMemoryError in server select", e);
- closeCurrentConnection(key, e);
- connectionManager.closeIdle(true);
- try {
- Thread.sleep(60000);
- } catch (InterruptedException ex) {
- LOG.debug("Interrupted while sleeping");
- }
- }
- } catch (Exception e) {
- closeCurrentConnection(key, e);
- }
- }
- LOG.info(getName() + ": stopping");
- synchronized (this) {
- try {
- acceptChannel.close();
- selector.close();
- } catch (IOException ignored) {
- if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
- }
-
- selector= null;
- acceptChannel= null;
-
- // close all connections
- connectionManager.stopIdleScan();
- connectionManager.closeAll();
- }
- }
-
- private void closeCurrentConnection(SelectionKey key, Throwable e) {
- if (key != null) {
- Connection c = (Connection)key.attachment();
- if (c != null) {
- closeConnection(c);
- key.attach(null);
- }
- }
- }
-
- InetSocketAddress getAddress() {
- return address;
- }
-
- void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
- ServerSocketChannel server = (ServerSocketChannel) key.channel();
- SocketChannel channel;
- while ((channel = server.accept()) != null) {
- channel.configureBlocking(false);
- channel.socket().setTcpNoDelay(tcpNoDelay);
- channel.socket().setKeepAlive(tcpKeepAlive);
- Reader reader = getReader();
- Connection c = connectionManager.register(channel);
- // If the connectionManager can't take it, close the connection.
- if (c == null) {
- if (channel.isOpen()) {
- IOUtils.cleanup(null, channel);
- }
- continue;
- }
- key.attach(c); // so closeCurrentConnection can get the object
- reader.addConnection(c);
- }
- }
-
- void doRead(SelectionKey key) throws InterruptedException {
- int count;
- Connection c = (Connection) key.attachment();
- if (c == null) {
- return;
- }
- c.setLastContact(System.currentTimeMillis());
- try {
- count = c.readAndProcess();
- } catch (InterruptedException ieo) {
- LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
- throw ieo;
- } catch (Exception e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": Caught exception while reading:", e);
- }
- count = -1; //so that the (count < 0) block is executed
- }
- if (count < 0) {
- closeConnection(c);
- c = null;
- } else {
- c.setLastContact(System.currentTimeMillis());
- }
- }
-
- synchronized void doStop() {
- if (selector != null) {
- selector.wakeup();
- Thread.yield();
- }
- if (acceptChannel != null) {
- try {
- acceptChannel.socket().close();
- } catch (IOException e) {
- LOG.info(getName() + ": exception in closing listener socket. " + e);
- }
- }
- readPool.shutdownNow();
- }
-
- // The method that will return the next reader to work with
- // Simplistic implementation of round robin for now
- Reader getReader() {
- currentReader = (currentReader + 1) % readers.length;
- return readers[currentReader];
- }
- }
-
- // Sends responses of RPC back to clients.
- protected class Responder extends Thread {
- private final Selector writeSelector;
- private final Set<Connection> writingCons =
- Collections.newSetFromMap(new ConcurrentHashMap<Connection, Boolean>());
-
- Responder() throws IOException {
- this.setName("RpcServer.responder");
- this.setDaemon(true);
- this.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
- writeSelector = Selector.open(); // create a selector
- }
-
- @Override
- public void run() {
- LOG.debug(getName() + ": starting");
- try {
- doRunLoop();
- } finally {
- LOG.info(getName() + ": stopping");
- try {
- writeSelector.close();
- } catch (IOException ioe) {
- LOG.error(getName() + ": couldn't close write selector", ioe);
- }
- }
- }
-
- /**
- * Take the list of the connections that want to write, and register them
- * in the selector.
- */
- private void registerWrites() {
- Iterator<Connection> it = writingCons.iterator();
- while (it.hasNext()) {
- Connection c = it.next();
- it.remove();
- SelectionKey sk = c.channel.keyFor(writeSelector);
- try {
- if (sk == null) {
- try {
- c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
- } catch (ClosedChannelException e) {
- // ignore: the client went away.
- if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
- }
- } else {
- sk.interestOps(SelectionKey.OP_WRITE);
- }
- } catch (CancelledKeyException e) {
- // ignore: the client went away.
- if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
- }
- }
- }
-
- /**
- * Add a connection to the list that want to write,
- */
- public void registerForWrite(Connection c) {
- if (writingCons.add(c)) {
- writeSelector.wakeup();
- }
- }
-
- private void doRunLoop() {
- long lastPurgeTime = 0; // last check for old calls.
- while (running) {
- try {
- registerWrites();
- int keyCt = writeSelector.select(purgeTimeout);
- if (keyCt == 0) {
- continue;
- }
-
- Set<SelectionKey> keys = writeSelector.selectedKeys();
- Iterator<SelectionKey> iter = keys.iterator();
- while (iter.hasNext()) {
- SelectionKey key = iter.next();
- iter.remove();
- try {
- if (key.isValid() && key.isWritable()) {
- doAsyncWrite(key);
- }
- } catch (IOException e) {
- LOG.debug(getName() + ": asyncWrite", e);
- }
- }
-
- lastPurgeTime = purge(lastPurgeTime);
-
- } catch (OutOfMemoryError e) {
- if (errorHandler != null) {
- if (errorHandler.checkOOME(e)) {
- LOG.info(getName() + ": exiting on OutOfMemoryError");
- return;
- }
- } else {
- //
- // we can run out of memory if we have too many threads
- // log the event and sleep for a minute and give
- // some thread(s) a chance to finish
- //
- LOG.warn(getName() + ": OutOfMemoryError in server select", e);
- try {
- Thread.sleep(60000);
- } catch (InterruptedException ex) {
- LOG.debug("Interrupted while sleeping");
- return;
- }
- }
- } catch (Exception e) {
- LOG.warn(getName() + ": exception in Responder " +
- StringUtils.stringifyException(e), e);
- }
- }
- LOG.info(getName() + ": stopped");
- }
-
- /**
- * If there were some calls that have not been sent out for a
- * long time, we close the connection.
- * @return the time of the purge.
- */
- private long purge(long lastPurgeTime) {
- long now = System.currentTimeMillis();
- if (now < lastPurgeTime + purgeTimeout) {
- return lastPurgeTime;
- }
-
- ArrayList<Connection> conWithOldCalls = new ArrayList<>();
- // get the list of channels from list of keys.
- synchronized (writeSelector.keys()) {
- for (SelectionKey key : writeSelector.keys()) {
- Connection connection = (Connection) key.attachment();
- if (connection == null) {
- throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
- }
- Call call = connection.responseQueue.peekFirst();
- if (call != null && now > call.timestamp + purgeTimeout) {
- conWithOldCalls.add(call.connection);
- }
- }
- }
-
- // Seems safer to close the connection outside of the synchronized loop...
- for (Connection connection : conWithOldCalls) {
- closeConnection(connection);
- }
-
- return now;
- }
-
- private void doAsyncWrite(SelectionKey key) throws IOException {
- Connection connection = (Connection) key.attachment();
- if (connection == null) {
- throw new IOException("doAsyncWrite: no connection");
- }
- if (key.channel() != connection.channel) {
- throw new IOException("doAsyncWrite: bad channel");
- }
-
- if (processAllResponses(connection)) {
- try {
- // We wrote everything, so we don't need to be told when the socket is ready for
- // write anymore.
- key.interestOps(0);
- } catch (CancelledKeyException e) {
- /* The Listener/reader might have closed the socket.
- * We don't explicitly cancel the key, so not sure if this will
- * ever fire.
- * This warning could be removed.
- */
- LOG.warn("Exception while changing ops : " + e);
- }
- }
- }
-
- /**
- * Process the response for this call. You need to have the lock on
- * {@link org.apache.hadoop.hbase.ipc.RpcServer.Connection#responseWriteLock}
- *
- * @param call the call
- * @return true if we proceed the call fully, false otherwise.
- * @throws IOException
- */
- private boolean processResponse(final Call call) throws IOException {
- boolean error = true;
- try {
- // Send as much data as we can in the non-blocking fashion
- long numBytes = channelWrite(call.connection.channel, call.response);
- if (numBytes < 0) {
- throw new HBaseIOException("Error writing on the socket " +
- "for the call:" + call.toShortString());
- }
- error = false;
- } finally {
- if (error) {
- LOG.debug(getName() + call.toShortString() + ": output error -- closing");
- // We will be closing this connection itself. Mark this call as done so that all the
- // buffer(s) it got from pool can get released
- call.done();
- closeConnection(call.connection);
- }
- }
-
- if (!call.response.hasRemaining()) {
- call.done();
- return true;
- } else {
- return false; // Socket can't take more, we will have to come back.
- }
- }
-
- /**
- * Process all the responses for this connection
- *
- * @return true if all the calls were processed or that someone else is doing it.
- * false if there is still some work to do. In this case, we expect the caller to
- * delay us.
- * @throws IOException
- */
- private boolean processAllResponses(final Connection connection) throws IOException {
- // We want only one writer on the channel for a connection at a time.
- connection.responseWriteLock.lock();
- try {
- for (int i = 0; i < 20; i++) {
- // protection if some handlers manage to need all the responder
- Call call = connection.responseQueue.pollFirst();
- if (call == null) {
- return true;
- }
- if (!processResponse(call)) {
- connection.responseQueue.addFirst(call);
- return false;
- }
- }
- } finally {
- connection.responseWriteLock.unlock();
- }
-
- return connection.responseQueue.isEmpty();
- }
-
- //
- // Enqueue a response from the application.
- //
- void doRespond(Call call) throws IOException {
- boolean added = false;
-
- // If there is already a write in progress, we don't wait. This allows to free the handlers
- // immediately for other tasks.
- if (call.connection.responseQueue.isEmpty() && call.connection.responseWriteLock.tryLock()) {
- try {
- if (call.connection.responseQueue.isEmpty()) {
- // If we're alone, we can try to do a direct call to the socket. It's
- // an optimisation to save on context switches and data transfer between cores..
- if (processResponse(call)) {
- return; // we're done.
- }
- // Too big to fit, putting ahead.
- call.connection.responseQueue.addFirst(call);
- added = true; // We will register to the selector later, outside of the lock.
- }
- } finally {
- call.connection.responseWriteLock.unlock();
- }
- }
-
- if (!added) {
- call.connection.responseQueue.addLast(call);
- }
- call.responder.registerForWrite(call.connection);
-
- // set the serve time when the response has to be sent later
- call.timestamp = System.currentTimeMillis();
- }
- }
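The doRespond method deleted above carries the responder's key optimization: if no write is already in flight for the connection, the handler thread tries to push the whole response straight to the socket, and only queues the call and registers for OP_WRITE when the socket cannot take it all. A condensed sketch of that pattern with illustrative names (the real code moves to SimpleRpcServer):

    // Condensed sketch of the "direct write first, queue otherwise" pattern above.
    void respond(Call call) throws IOException {
      boolean queued = false;
      if (queue.isEmpty() && writeLock.tryLock()) { // nobody else is writing
        try {
          if (queue.isEmpty()) {
            if (writeFully(call)) {
              return; // wrote everything inline; no selector round trip needed
            }
            queue.addFirst(call); // partial write: keep response ordering
            queued = true;
          }
        } finally {
          writeLock.unlock();
        }
      }
      if (!queued) {
        queue.addLast(call); // a write is already in flight; just enqueue
      }
      registerForWrite(call.connection); // let the write selector finish the job
    }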
-
/** Reads calls from a connection and queues them for handling. */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value="VO_VOLATILE_INCREMENT",
justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
- public class Connection {
+ public abstract class Connection {
// If initial preamble with version and magic has been read or not.
- private boolean connectionPreambleRead = false;
+ protected boolean connectionPreambleRead = false;
// If the connection header has been read or not.
- private boolean connectionHeaderRead = false;
- protected SocketChannel channel;
- private ByteBuff data;
- private CallCleanup callCleanup;
- private ByteBuffer dataLengthBuffer;
- protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<>();
- private final Lock responseWriteLock = new ReentrantLock();
- private LongAdder rpcCount = new LongAdder(); // number of outstanding rpcs
- private long lastContact;
- private InetAddress addr;
- protected Socket socket;
+ protected boolean connectionHeaderRead = false;
+
+ protected CallCleanup callCleanup;
+
// Cache the remote host & port info so that even if the socket is
// disconnected, we can say where it used to connect to.
protected String hostAddress;
protected int remotePort;
- ConnectionHeader connectionHeader;
+ protected InetAddress addr;
+ protected ConnectionHeader connectionHeader;
/**
* Codec the client asked use.
*/
- private Codec codec;
+ protected Codec codec;
/**
* Compression codec the client asked us use.
*/
- private CompressionCodec compressionCodec;
- BlockingService service;
-
- private AuthMethod authMethod;
- private boolean saslContextEstablished;
- private boolean skipInitialSaslHandshake;
- private ByteBuffer unwrappedData;
- // When is this set? FindBugs wants to know! Says NP
- private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
- boolean useSasl;
- SaslServer saslServer;
- private CryptoAES cryptoAES;
- private boolean useWrap = false;
- private boolean useCryptoAesWrap = false;
+ protected CompressionCodec compressionCodec;
+ protected BlockingService service;
+
+ protected AuthMethod authMethod;
+ protected boolean saslContextEstablished;
+ protected boolean skipInitialSaslHandshake;
+
+ protected boolean useSasl;
+ protected SaslServer saslServer;
+ protected CryptoAES cryptoAES;
+ protected boolean useWrap = false;
+ protected boolean useCryptoAesWrap = false;
// Fake 'call' for failed authorization response
- private static final int AUTHORIZATION_FAILED_CALLID = -1;
- private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null,
- null, null, this, null, 0, null, null, 0, null);
- private ByteArrayOutputStream authFailedResponse =
+ protected static final int AUTHORIZATION_FAILED_CALLID = -1;
+
+ protected ByteArrayOutputStream authFailedResponse =
new ByteArrayOutputStream();
// Fake 'call' for SASL context setup
- private static final int SASL_CALLID = -33;
- private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null,
- 0, null, null, 0, null);
+ protected static final int SASL_CALLID = -33;
+
// Fake 'call' for connection header response
- private static final int CONNECTION_HEADER_RESPONSE_CALLID = -34;
- private final Call setConnectionHeaderResponseCall = new Call(CONNECTION_HEADER_RESPONSE_CALLID,
- null, null, null, null, null, this, null, 0, null, null, 0, null);
+ protected static final int CONNECTION_HEADER_RESPONSE_CALLID = -34;
// was authentication allowed with a fallback to simple auth
- private boolean authenticatedWithFallback;
+ protected boolean authenticatedWithFallback;
- private boolean retryImmediatelySupported = false;
+ protected boolean retryImmediatelySupported = false;
public UserGroupInformation attemptingUser = null; // user name before auth
protected User user = null;
protected UserGroupInformation ugi = null;
- public Connection(SocketChannel channel, long lastContact) {
- this.channel = channel;
- this.lastContact = lastContact;
- this.data = null;
+ public Connection() {
this.callCleanup = null;
- this.dataLengthBuffer = ByteBuffer.allocate(4);
- this.socket = channel.socket();
- this.addr = socket.getInetAddress();
- if (addr == null) {
- this.hostAddress = "*Unknown*";
- } else {
- this.hostAddress = addr.getHostAddress();
- }
- this.remotePort = socket.getPort();
- if (socketSendBufferSize != 0) {
- try {
- socket.setSendBufferSize(socketSendBufferSize);
- } catch (IOException e) {
- LOG.warn("Connection: unable to set socket send buffer size to " +
- socketSendBufferSize);
- }
- }
}
- @Override
+ @Override
public String toString() {
return getHostAddress() + ":" + remotePort;
}
@@ -1468,10 +756,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return remotePort;
}
- public void setLastContact(long lastContact) {
- this.lastContact = lastContact;
- }
-
public VersionInfo getVersionInfo() {
if (connectionHeader.hasVersionInfo()) {
return connectionHeader.getVersionInfo();
@@ -1479,940 +763,38 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return null;
}
- public long getLastContact() {
- return lastContact;
+ protected String getFatalConnectionString(final int version,
+ final byte authByte) {
+ return "serverVersion=" + CURRENT_VERSION +
+ ", clientVersion=" + version + ", authMethod=" + authByte +
+ ", authSupported=" + (authMethod != null) + " from " + toString();
}
- /* Return true if the connection has no outstanding rpc */
- private boolean isIdle() {
- return rpcCount.sum() == 0;
- }
+ public abstract boolean isConnectionOpen();
- /* Decrement the outstanding RPC count */
- protected void decRpcCount() {
- rpcCount.decrement();
- }
+ }
- /* Increment the outstanding RPC count */
- protected void incRpcCount() {
- rpcCount.increment();
+ /**
+ * Datastructure for passing a {@link BlockingService} and its associated class of
+ * protobuf service interface. For example, a server that fielded what is defined
+ * in the client protobuf service would pass in an implementation of the client blocking service
+ * and then its ClientService.BlockingInterface.class. Used checking connection setup.
+ */
+ public static class BlockingServiceAndInterface {
+ private final BlockingService service;
private final Class<?> serviceInterface;
+ public BlockingServiceAndInterface(final BlockingService service,
+ final Class<?> serviceInterface) {
+ this.service = service;
+ this.serviceInterface = serviceInterface;
}
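As the javadoc above says, a server registers pairs of a BlockingService implementation and the matching blocking-interface class, and the interface class is what connection setup is checked against. A usage sketch, assuming the standard protobuf-generated newReflectiveBlockingService factory and a hypothetical rsRpcServices implementation of the blocking interface:

    // Usage sketch: pair a BlockingService with its interface class for registration.
    List<BlockingServiceAndInterface> services = Lists.newArrayList(
        new BlockingServiceAndInterface(
            ClientProtos.ClientService.newReflectiveBlockingService(rsRpcServices),
            ClientProtos.ClientService.BlockingInterface.class));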
-
- private UserGroupInformation getAuthorizedUgi(String authorizedId)
- throws IOException {
- UserGroupInformation authorizedUgi;
- if (authMethod == AuthMethod.DIGEST) {
- TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
- secretManager);
- authorizedUgi = tokenId.getUser();
- if (authorizedUgi == null) {
- throw new AccessDeniedException(
- "Can't retrieve username from tokenIdentifier.");
- }
- authorizedUgi.addTokenIdentifier(tokenId);
- } else {
- authorizedUgi = UserGroupInformation.createRemoteUser(authorizedId);
- }
- authorizedUgi.setAuthenticationMethod(authMethod.authenticationMethod.getAuthMethod());
- return authorizedUgi;
+ public Class<?> getServiceInterface() {
+ return this.serviceInterface;
}
-
- private void saslReadAndProcess(ByteBuff saslToken) throws IOException,
- InterruptedException {
- if (saslContextEstablished) {
- if (LOG.isTraceEnabled())
- LOG.trace("Have read input token of size " + saslToken.limit()
- + " for processing by saslServer.unwrap()");
-
- if (!useWrap) {
- processOneRpc(saslToken);
- } else {
- byte[] b = saslToken.hasArray() ? saslToken.array() : saslToken.toBytes();
- byte [] plaintextData;
- if (useCryptoAesWrap) {
- // unwrap with CryptoAES
- plaintextData = cryptoAES.unwrap(b, 0, b.length);
- } else {
- plaintextData = saslServer.unwrap(b, 0, b.length);
- }
- processUnwrappedData(plaintextData);
- }
- } else {
- byte[] replyToken;
- try {
- if (saslServer == null) {
- switch (authMethod) {
- case DIGEST:
- if (secretManager == null) {
- throw new AccessDeniedException(
- "Server is not configured to do DIGEST authentication.");
- }
- saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
- .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
- HBaseSaslRpcServer.getSaslProps(), new SaslDigestCallbackHandler(
- secretManager, this));
- break;
- default:
- UserGroupInformation current = UserGroupInformation.getCurrentUser();
- String fullName = current.getUserName();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Kerberos principal name is " + fullName);
- }
- final String names[] = SaslUtil.splitKerberosName(fullName);
- if (names.length != 3) {
- throw new AccessDeniedException(
- "Kerberos principal name does NOT have the expected "
- + "hostname part: " + fullName);
- }
- current.doAs(new PrivilegedExceptionAction