diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index c7e5b3305f2443a63b1f3210b3f2fdd563cab965..5cf16098a287c91a5652adb8820a75270f8c917f 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2304,6 +2304,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal HIVE_SERVER2_SESSION_CHECK_INTERVAL("hive.server2.session.check.interval", "6h", new TimeValidator(TimeUnit.MILLISECONDS, 3000l, true, null, false), "The check interval for session/operation timeout, which can be disabled by setting to zero or negative value."), + HIVE_SERVER2_CLOSE_SESSION_ON_DISCONNECT("hive.server2.close.session.on.disconnect", true, + "Session will be closed when connection is closed. Set this to false to have session outlive its parent connection."), HIVE_SERVER2_IDLE_SESSION_TIMEOUT("hive.server2.idle.session.timeout", "7d", new TimeValidator(TimeUnit.MILLISECONDS), "Session will be closed when it's not accessed for this duration, which can be disabled by setting to zero or negative value."), diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java index cf575a405f3eed2e1ec470670d5832ec3af9e1c1..d9c7b2e844802cbf568d88df3d12729235e761ea 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java @@ -24,16 +24,25 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hive.common.metrics.common.Metrics; +import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; +import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.CLIService; +import org.apache.hive.service.cli.HiveSQLException; +import org.apache.hive.service.cli.SessionHandle; import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup; import org.apache.thrift.TProcessorFactory; import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.server.ServerContext; +import org.apache.thrift.server.TServerEventHandler; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerSocket; +import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportFactory; @@ -94,7 +103,60 @@ public void run() { // TCP Server server = new TThreadPoolServer(sargs); - server.setServerEventHandler(serverEventHandler); + server.setServerEventHandler(new TServerEventHandler() { + @Override + public ServerContext createContext( + TProtocol input, TProtocol output) { + Metrics metrics = MetricsFactory.getInstance(); + if (metrics != null) { + try { + metrics.incrementCounter(MetricsConstant.OPEN_CONNECTIONS); + metrics.incrementCounter(MetricsConstant.CUMULATIVE_CONNECTION_COUNT); + } catch (Exception e) { + LOG.warn("Error Reporting JDO operation to Metrics system", e); + } + } + return new ThriftCLIServerContext(); + } + + @Override + public void deleteContext(ServerContext serverContext, + TProtocol input, TProtocol output) { + Metrics metrics = MetricsFactory.getInstance(); + if (metrics != null) { + try { + metrics.decrementCounter(MetricsConstant.OPEN_CONNECTIONS); + } catch (Exception e) { + LOG.warn("Error Reporting JDO operation to Metrics system", e); + } + } + ThriftCLIServerContext context = (ThriftCLIServerContext) serverContext; + SessionHandle sessionHandle = context.getSessionHandle(); + if (sessionHandle != null) { + LOG.info("Session disconnected without closing properly. "); + try { + boolean close = cliService.getSessionManager().getSession(sessionHandle).getHiveConf() + .getBoolVar(ConfVars.HIVE_SERVER2_CLOSE_SESSION_ON_DISCONNECT); + LOG.info((close ? "" : "Not ") + "Closing the session: " + sessionHandle); + if (close) { + cliService.closeSession(sessionHandle); + } + } catch (HiveSQLException e) { + LOG.warn("Failed to close session: " + e, e); + } + } + } + + @Override + public void preServe() { + } + + @Override + public void processContext(ServerContext serverContext, + TTransport input, TTransport output) { + currentServerContext.set(serverContext); + } + }); String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port " + portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads"; LOG.info(msg); diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index be9833d1ac28951cb464716ddab4a7ea9d6b0a93..e789a386c2993b6c038a53770c5c321f2be769bd 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -28,9 +28,6 @@ import javax.security.auth.login.LoginException; -import org.apache.hadoop.hive.common.metrics.common.Metrics; -import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; -import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.common.ServerUtils; @@ -38,7 +35,6 @@ import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hive.service.AbstractService; import org.apache.hive.service.ServiceException; -import org.apache.hive.service.ServiceUtils; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.auth.TSetIpAddressProcessor; import org.apache.hive.service.cli.CLIService; @@ -97,11 +93,8 @@ import org.apache.hive.service.rpc.thrift.TStatusCode; import org.apache.hive.service.server.HiveServer2; import org.apache.thrift.TException; -import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.server.ServerContext; import org.apache.thrift.server.TServer; -import org.apache.thrift.server.TServerEventHandler; -import org.apache.thrift.transport.TTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -134,7 +127,6 @@ protected int maxWorkerThreads; protected long workerKeepAliveTime; - protected TServerEventHandler serverEventHandler; protected ThreadLocal currentServerContext; static class ThriftCLIServerContext implements ServerContext { @@ -153,55 +145,6 @@ public ThriftCLIService(CLIService service, String serviceName) { super(serviceName); this.cliService = service; currentServerContext = new ThreadLocal(); - serverEventHandler = new TServerEventHandler() { - @Override - public ServerContext createContext( - TProtocol input, TProtocol output) { - Metrics metrics = MetricsFactory.getInstance(); - if (metrics != null) { - try { - metrics.incrementCounter(MetricsConstant.OPEN_CONNECTIONS); - metrics.incrementCounter(MetricsConstant.CUMULATIVE_CONNECTION_COUNT); - } catch (Exception e) { - LOG.warn("Error Reporting JDO operation to Metrics system", e); - } - } - return new ThriftCLIServerContext(); - } - - @Override - public void deleteContext(ServerContext serverContext, - TProtocol input, TProtocol output) { - Metrics metrics = MetricsFactory.getInstance(); - if (metrics != null) { - try { - metrics.decrementCounter(MetricsConstant.OPEN_CONNECTIONS); - } catch (Exception e) { - LOG.warn("Error Reporting JDO operation to Metrics system", e); - } - } - ThriftCLIServerContext context = (ThriftCLIServerContext) serverContext; - SessionHandle sessionHandle = context.getSessionHandle(); - if (sessionHandle != null) { - LOG.info("Session disconnected without closing properly, close it now"); - try { - cliService.closeSession(sessionHandle); - } catch (HiveSQLException e) { - LOG.warn("Failed to close session: " + e, e); - } - } - } - - @Override - public void preServe() { - } - - @Override - public void processContext(ServerContext serverContext, - TTransport input, TTransport output) { - currentServerContext.set(serverContext); - } - }; } @Override diff --git a/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java b/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java index 3bd82e614a50c0b5419a926ff00a95dafd4b0ebb..d36f6c0b97b9ff88d7a9d4411c35f217a6a8ac91 100644 --- a/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java +++ b/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java @@ -19,13 +19,17 @@ package org.apache.hive.service.cli; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hive.service.Service; import org.apache.hive.service.auth.HiveAuthFactory; +import org.apache.hive.service.cli.session.HiveSession; import org.apache.hive.service.cli.thrift.RetryingThriftCLIServiceClient; import org.apache.hive.service.cli.thrift.ThriftCLIService; import org.apache.hive.service.server.HiveServer2; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; + +import org.junit.Before; import org.junit.Test; import java.lang.reflect.Method; @@ -41,6 +45,38 @@ */ public class TestRetryingThriftCLIServiceClient { protected static ThriftCLIService service; + private HiveConf hiveConf; + private HiveServer2 server; + + @Before + public void init() { + hiveConf = new HiveConf(); + hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST, "localhost"); + hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT, 15000); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); + hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION, HiveAuthFactory.AuthTypes.NONE.toString()); + hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE, "binary"); + hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_RETRY_LIMIT, 3); + hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_CONNECTION_RETRY_LIMIT, 3); + hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS, 10); + hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT, "1s"); + } + + private void startHiveServer() throws InterruptedException { + // Start hive server2 + server = new HiveServer2(); + server.init(hiveConf); + server.start(); + Thread.sleep(5000); + System.out.println("## HiveServer started"); + } + + private void stopHiveServer() { + if (server != null) { + // kill server + server.stop(); + } + } static class RetryingThriftCLIServiceClientTest extends RetryingThriftCLIServiceClient { int callCount = 0; @@ -74,31 +110,14 @@ protected synchronized TTransport connect(HiveConf conf) throws HiveSQLException return super.connect(conf); } } + @Test public void testRetryBehaviour() throws Exception { - // Start hive server2 - HiveConf hiveConf = new HiveConf(); - hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST, "localhost"); - hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT, 15000); - hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); - hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION, HiveAuthFactory.AuthTypes.NONE.toString()); - hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE, "binary"); - hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_RETRY_LIMIT, 3); - hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_CONNECTION_RETRY_LIMIT, 3); - hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS, 10); - hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT, "1s"); - - final HiveServer2 server = new HiveServer2(); - server.init(hiveConf); - server.start(); - Thread.sleep(5000); - System.out.println("## HiveServer started"); - + startHiveServer(); // Check if giving invalid address causes retry in connection attempt hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT, 17000); try { - CLIServiceClient cliServiceClient = - RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf); + RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf); fail("Expected to throw exception for invalid port"); } catch (HiveSQLException sqlExc) { assertTrue(sqlExc.getCause() instanceof TTransportException); @@ -112,16 +131,14 @@ public void testRetryBehaviour() throws Exception { = RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf); System.out.println("## Created client"); - // kill server - server.stop(); + stopHiveServer(); Thread.sleep(5000); // submit few queries try { - Map confOverlay = new HashMap(); RetryingThriftCLIServiceClientTest.handlerInst.callCount = 0; RetryingThriftCLIServiceClientTest.handlerInst.connectCount = 0; - SessionHandle session = cliServiceClient.openSession("anonymous", "anonymous"); + cliServiceClient.openSession("anonymous", "anonymous"); } catch (HiveSQLException exc) { exc.printStackTrace(); assertTrue(exc.getCause() instanceof TException); @@ -131,4 +148,69 @@ public void testRetryBehaviour() throws Exception { cliServiceClient.closeTransport(); } } + + @Test + public void testTransportClose() throws InterruptedException, HiveSQLException { + hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_CONNECTION_RETRY_LIMIT, 0); + try { + startHiveServer(); + RetryingThriftCLIServiceClient.CLIServiceClientWrapper client + = RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf); + client.closeTransport(); + try { + client.openSession("anonymous", "anonymous"); + fail("Shouldn't be able to open session when transport is closed."); + } catch(HiveSQLException ignored) { + + } + } finally { + hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_CONNECTION_RETRY_LIMIT, 3); + stopHiveServer(); + } + } + + @Test + public void testSessionLifeAfterTransportClose() throws InterruptedException, HiveSQLException { + try { + startHiveServer(); + CLIService service = null; + for (Service s : server.getServices()) { + if (s instanceof CLIService) { + service = (CLIService) s; + } + } + if (service == null) { + service = new CLIService(server); + } + RetryingThriftCLIServiceClient.CLIServiceClientWrapper client + = RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf); + Map conf = new HashMap<>(); + conf.put(HiveConf.ConfVars.HIVE_SERVER2_CLOSE_SESSION_ON_DISCONNECT.varname, "false"); + SessionHandle sessionHandle = client.openSession("anonymous", "anonymous", conf); + assertNotNull(sessionHandle); + HiveSession session = service.getSessionManager().getSession(sessionHandle); + OperationHandle op1 = session.executeStatementAsync("show databases", null); + assertNotNull(op1); + client.closeTransport(); + // Verify that session wasn't closed on transport close. + assertEquals(session, service.getSessionManager().getSession(sessionHandle)); + // Should be able to execute without failure in the session whose transport has been closed. + OperationHandle op2 = session.executeStatementAsync("show databases", null); + assertNotNull(op2); + // Make new client, since transport was closed for the last one. + client = RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf); + client.closeSession(sessionHandle); + // operations will be lost once owning session is closed. + for (OperationHandle op: new OperationHandle[]{op1, op2}) { + try { + client.getOperationStatus(op); + fail("Should have failed."); + } catch (HiveSQLException ignored) { + + } + } + } finally { + stopHiveServer(); + } + } }