diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index f740b3b..03d4e3b 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1625,7 +1625,7 @@ HIVE_SERVER2_THRIFT_BIND_HOST("hive.server2.thrift.bind.host", "", "Bind host on which to run the HiveServer2 Thrift service."), - // http (over thrift) transport settings + // Http (over thrift) transport settings HIVE_SERVER2_THRIFT_HTTP_PORT("hive.server2.thrift.http.port", 10001, "Port number of HiveServer2 Thrift interface when hive.server2.transport.mode is 'http'."), HIVE_SERVER2_THRIFT_HTTP_PATH("hive.server2.thrift.http.path", "cliservice", @@ -1644,7 +1644,11 @@ "Keepalive time for an idle http worker thread. When the number of workers exceeds min workers, " + "excessive threads are killed after this time interval."), - // binary transport settings + // Binary (TCP) transport settings + HIVE_SERVER2_TCP_SOCKET_BLOCKING_TIMEOUT("hive.server2.tcp.socket.blocking.timeout", 0, + "Timeout (in seconds) on blocking socket operations (accept, read). 0 means infinite timeout."), + HIVE_SERVER2_TCP_SOCKET_KEEPALIVE("hive.server2.tcp.socket.keepalive", true, + "Whether to send tcp socket keepalive probe"), HIVE_SERVER2_THRIFT_PORT("hive.server2.thrift.port", 10000, "Port number of HiveServer2 Thrift interface when hive.server2.transport.mode is 'binary'."), HIVE_SERVER2_THRIFT_SASL_QOP("hive.server2.thrift.sasl.qop", "auth", diff --git a/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java b/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java index 8352951..dbdd735 100644 --- a/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java +++ b/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.SocketException; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; @@ -215,8 +217,8 @@ public static TTransport getSSLSocket(String host, int port, int loginTimeout, return TSSLTransportFactory.getClientSocket(host, port, loginTimeout, params); } - public static TServerSocket getServerSocket(String hiveHost, int portNum) - throws TTransportException { + public static TServerSocket getServerSocket(String hiveHost, int portNum, int socketTimeout, + boolean keepAlive) throws TTransportException { InetSocketAddress serverAddress; if (hiveHost == null || hiveHost.isEmpty()) { // Wildcard bind @@ -224,14 +226,17 @@ public static TServerSocket getServerSocket(String hiveHost, int portNum) } else { serverAddress = new InetSocketAddress(hiveHost, portNum); } - return new TServerSocket(serverAddress); + TServerSocket serverSocket = new TServerSocket(serverAddress, socketTimeout); + if (keepAlive) { + serverSocket = new TServerSocketKeepAlive(serverSocket.getServerSocket()); + } + return serverSocket; } public static TServerSocket getServerSSLSocket(String hiveHost, int portNum, String keyStorePath, - String keyStorePassWord, List sslVersionBlacklist) throws TTransportException, - UnknownHostException { - TSSLTransportFactory.TSSLTransportParameters params = - new TSSLTransportFactory.TSSLTransportParameters(); + String keyStorePassWord, List sslVersionBlacklist, int socketTimeout, + boolean keepAlive) throws TTransportException, UnknownHostException { + TSSLTransportFactory.TSSLTransportParameters params = new TSSLTransportFactory.TSSLTransportParameters(); params.setKeyStore(keyStorePath, keyStorePassWord); InetSocketAddress serverAddress; if (hiveHost == null || hiveHost.isEmpty()) { @@ -240,8 +245,8 @@ public static TServerSocket getServerSSLSocket(String hiveHost, int portNum, Str } else { serverAddress = new InetSocketAddress(hiveHost, portNum); } - TServerSocket thriftServerSocket = - TSSLTransportFactory.getServerSocket(portNum, 0, serverAddress.getAddress(), params); + TServerSocket thriftServerSocket = TSSLTransportFactory.getServerSocket(portNum, socketTimeout, + serverAddress.getAddress(), params); if (thriftServerSocket.getServerSocket() instanceof SSLServerSocket) { List sslVersionBlacklistLocal = new ArrayList(); for (String sslVersion : sslVersionBlacklist) { @@ -260,6 +265,9 @@ public static TServerSocket getServerSSLSocket(String hiveHost, int portNum, Str LOG.info("SSL Server Socket Enabled Protocols: " + Arrays.toString(sslServerSocket.getEnabledProtocols())); } + if (keepAlive) { + thriftServerSocket = new TServerSocketKeepAlive(thriftServerSocket.getServerSocket()); + } return thriftServerSocket; } @@ -347,4 +355,25 @@ public static void verifyProxyAccess(String realUser, String proxyUser, String i } } + /** + * TServerSocketKeepAlive - like TServerSocket, but will enable keepalive for + * accepted sockets. + * + */ + static class TServerSocketKeepAlive extends TServerSocket { + public TServerSocketKeepAlive(ServerSocket serverSocket) throws TTransportException { + super(serverSocket); + } + + @Override + protected TSocket acceptImpl() throws TTransportException { + TSocket ts = super.acceptImpl(); + try { + ts.getSocket().setKeepAlive(true); + } catch (SocketException e) { + throw new TTransportException(e); + } + return ts; + } + } } diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java index b6e851a..dcfbf36 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java @@ -58,12 +58,14 @@ public void run() { TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory(); TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this); TServerSocket serverSocket = null; + int socketTimeout = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_TCP_SOCKET_BLOCKING_TIMEOUT); + boolean keepAlive = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TCP_SOCKET_KEEPALIVE); List sslVersionBlacklist = new ArrayList(); for (String sslVersion : hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",")) { sslVersionBlacklist.add(sslVersion); } if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) { - serverSocket = HiveAuthFactory.getServerSocket(hiveHost, portNum); + serverSocket = HiveAuthFactory.getServerSocket(hiveHost, portNum, socketTimeout, keepAlive); } else { String keyStorePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim(); if (keyStorePath.isEmpty()) { @@ -73,7 +75,7 @@ public void run() { String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf, HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname); serverSocket = HiveAuthFactory.getServerSSLSocket(hiveHost, portNum, keyStorePath, - keyStorePassword, sslVersionBlacklist); + keyStorePassword, sslVersionBlacklist, socketTimeout, keepAlive); } // Server args