diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
index cf575a405f3eed2e1ec470670d5832ec3af9e1c1..18f4f777266122b85e63d416230f6f35cf6489d3 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
@@ -24,16 +24,25 @@
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.hive.common.metrics.common.Metrics;
+import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hive.service.auth.HiveAuthFactory;
 import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.SessionHandle;
 import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
 import org.apache.thrift.TProcessorFactory;
 import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServerEventHandler;
 import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportFactory;
@@ -94,7 +103,45 @@ public void run() {
 
       // TCP Server
       server = new TThreadPoolServer(sargs);
-      server.setServerEventHandler(serverEventHandler);
+      server.setServerEventHandler(new TServerEventHandler() {
+        @Override
+        public ServerContext createContext(
+            TProtocol input, TProtocol output) {
+          Metrics metrics = MetricsFactory.getInstance();
+          if (metrics != null) {
+            try {
+              metrics.incrementCounter(MetricsConstant.OPEN_CONNECTIONS);
+              metrics.incrementCounter(MetricsConstant.CUMULATIVE_CONNECTION_COUNT);
+            } catch (Exception e) {
+              LOG.warn("Error Reporting JDO operation to Metrics system", e);
+            }
+          }
+          return new ThriftCLIServerContext();
+        }
+
+        @Override
+        public void deleteContext(ServerContext serverContext,
+            TProtocol input, TProtocol output) {
+          Metrics metrics = MetricsFactory.getInstance();
+          if (metrics != null) {
+            try {
+              metrics.decrementCounter(MetricsConstant.OPEN_CONNECTIONS);
+            } catch (Exception e) {
+              LOG.warn("Error Reporting JDO operation to Metrics system", e);
+            }
+          }
+        }
+
+        @Override
+        public void preServe() {
+        }
+
+        @Override
+        public void processContext(ServerContext serverContext,
+            TTransport input, TTransport output) {
+          currentServerContext.set(serverContext);
+        }
+      });
       String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
           + portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
       LOG.info(msg);
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index 0a2a761867ab596e942dbc78eaabc0ef920665a3..cb7fa87e8384ab33a6b5c56275f10954fe4e7b17 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -28,9 +28,6 @@
 import javax.security.auth.login.LoginException;
 
-import org.apache.hadoop.hive.common.metrics.common.Metrics;
-import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
-import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.common.ServerUtils;
@@ -38,7 +35,6 @@
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hive.service.AbstractService;
 import org.apache.hive.service.ServiceException;
-import org.apache.hive.service.ServiceUtils;
 import org.apache.hive.service.auth.HiveAuthFactory;
 import org.apache.hive.service.auth.TSetIpAddressProcessor;
 import org.apache.hive.service.cli.CLIService;
@@ -97,11 +93,8 @@
 import org.apache.hive.service.rpc.thrift.TStatusCode;
 import org.apache.hive.service.server.HiveServer2;
 import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.server.ServerContext;
 import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TServerEventHandler;
-import org.apache.thrift.transport.TTransport;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -134,7 +127,6 @@
   protected int maxWorkerThreads;
   protected long workerKeepAliveTime;
 
-  protected TServerEventHandler serverEventHandler;
   protected ThreadLocal<ServerContext> currentServerContext;
 
   static class ThriftCLIServerContext implements ServerContext {
@@ -153,55 +145,6 @@ public ThriftCLIService(CLIService service, String serviceName) {
     super(serviceName);
     this.cliService = service;
     currentServerContext = new ThreadLocal<ServerContext>();
-    serverEventHandler = new TServerEventHandler() {
-      @Override
-      public ServerContext createContext(
-          TProtocol input, TProtocol output) {
-        Metrics metrics = MetricsFactory.getInstance();
-        if (metrics != null) {
-          try {
-            metrics.incrementCounter(MetricsConstant.OPEN_CONNECTIONS);
-            metrics.incrementCounter(MetricsConstant.CUMULATIVE_CONNECTION_COUNT);
-          } catch (Exception e) {
-            LOG.warn("Error Reporting JDO operation to Metrics system", e);
-          }
-        }
-        return new ThriftCLIServerContext();
-      }
-
-      @Override
-      public void deleteContext(ServerContext serverContext,
-          TProtocol input, TProtocol output) {
-        Metrics metrics = MetricsFactory.getInstance();
-        if (metrics != null) {
-          try {
-            metrics.decrementCounter(MetricsConstant.OPEN_CONNECTIONS);
-          } catch (Exception e) {
-            LOG.warn("Error Reporting JDO operation to Metrics system", e);
-          }
-        }
-        ThriftCLIServerContext context = (ThriftCLIServerContext) serverContext;
-        SessionHandle sessionHandle = context.getSessionHandle();
-        if (sessionHandle != null) {
-          LOG.info("Session disconnected without closing properly, close it now");
-          try {
-            cliService.closeSession(sessionHandle);
-          } catch (HiveSQLException e) {
-            LOG.warn("Failed to close session: " + e, e);
-          }
-        }
-      }
-
-      @Override
-      public void preServe() {
-      }
-
-      @Override
-      public void processContext(ServerContext serverContext,
-          TTransport input, TTransport output) {
-        currentServerContext.set(serverContext);
-      }
-    };
   }
 
   @Override
diff --git a/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java b/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
index 3bd82e614a50c0b5419a926ff00a95dafd4b0ebb..cba2c154a545f5bdb8b10d252f7d1b984fa3bdbd 100644
--- a/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
+++ b/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
@@ -19,13 +19,17 @@
 package org.apache.hive.service.cli;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.Service;
 import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.cli.session.HiveSession;
 import org.apache.hive.service.cli.thrift.RetryingThriftCLIServiceClient;
 import org.apache.hive.service.cli.thrift.ThriftCLIService;
 import org.apache.hive.service.server.HiveServer2;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
+
+import org.junit.Before;
 import org.junit.Test;
 
 import java.lang.reflect.Method;
@@ -41,6 +45,38 @@
  */
 public class TestRetryingThriftCLIServiceClient {
   protected static ThriftCLIService service;
+  private HiveConf hiveConf;
+  private HiveServer2 server;
+
+  @Before
+  public void init() {
+    hiveConf = new HiveConf();
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST, "localhost");
+    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT, 15000);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION, HiveAuthFactory.AuthTypes.NONE.toString());
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE, "binary");
+    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_RETRY_LIMIT, 3);
+    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_CONNECTION_RETRY_LIMIT, 3);
+    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS, 10);
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT, "1s");
+  }
+
+  private void startHiveServer() throws InterruptedException {
+    // Start hive server2
+    server = new HiveServer2();
+    server.init(hiveConf);
+    server.start();
+    Thread.sleep(5000);
+    System.out.println("## HiveServer started");
+  }
+
+  private void stopHiveServer() {
+    if (server != null) {
+      // kill server
+      server.stop();
+    }
+  }
 
   static class RetryingThriftCLIServiceClientTest extends RetryingThriftCLIServiceClient {
     int callCount = 0;
@@ -74,31 +110,14 @@ protected synchronized TTransport connect(HiveConf conf) throws HiveSQLException
       return super.connect(conf);
     }
   }
+
   @Test
   public void testRetryBehaviour() throws Exception {
-    // Start hive server2
-    HiveConf hiveConf = new HiveConf();
-    hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST, "localhost");
-    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT, 15000);
-    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
-    hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION, HiveAuthFactory.AuthTypes.NONE.toString());
-    hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE, "binary");
-    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_RETRY_LIMIT, 3);
-    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_CONNECTION_RETRY_LIMIT, 3);
-    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS, 10);
-    hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT, "1s");
-
-    final HiveServer2 server = new HiveServer2();
-    server.init(hiveConf);
-    server.start();
-    Thread.sleep(5000);
-    System.out.println("## HiveServer started");
-
+    startHiveServer();
     // Check if giving invalid address causes retry in connection attempt
    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT, 17000);
     try {
-      CLIServiceClient cliServiceClient =
-          RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf);
+      RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf);
       fail("Expected to throw exception for invalid port");
     } catch (HiveSQLException sqlExc) {
       assertTrue(sqlExc.getCause() instanceof TTransportException);
@@ -112,16 +131,14 @@ public void testRetryBehaviour() throws Exception {
         = RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf);
     System.out.println("## Created client");
 
-    // kill server
-    server.stop();
+    stopHiveServer();
     Thread.sleep(5000);
 
     // submit few queries
     try {
-      Map<String, String> confOverlay = new HashMap<String, String>();
       RetryingThriftCLIServiceClientTest.handlerInst.callCount = 0;
       RetryingThriftCLIServiceClientTest.handlerInst.connectCount = 0;
-      SessionHandle session = cliServiceClient.openSession("anonymous", "anonymous");
+      cliServiceClient.openSession("anonymous", "anonymous");
     } catch (HiveSQLException exc) {
       exc.printStackTrace();
       assertTrue(exc.getCause() instanceof TException);
@@ -131,4 +148,39 @@ public void testRetryBehaviour() throws Exception {
       cliServiceClient.closeTransport();
     }
   }
+
+  @Test
+  public void testSessionLifeAfterTransportClose() throws InterruptedException, HiveSQLException {
+    try {
+      startHiveServer();
+      CLIService service = null;
+      for (Service s : server.getServices()) {
+        if (s instanceof CLIService) {
+          service = (CLIService) s;
+        }
+      }
+      if (service == null) {
+        service = new CLIService(server);
+      }
+      RetryingThriftCLIServiceClient.CLIServiceClientWrapper client
+        = RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf);
+      SessionHandle sessionHandle = client.openSession("anonymous", "anonymous");
+      assertNotNull(sessionHandle);
+      HiveSession session = service.getSessionManager().getSession(sessionHandle);
+      OperationHandle op1 = session.executeStatementAsync("show databases", null);
+      client.closeTransport();
+      // Should be able to execute without failure in the session whose transport has been closed.
+      OperationHandle op2 = session.executeStatementAsync("show databases", null);
+      client.closeSession(sessionHandle);
+      // operations will be lost once owning session is closed.
+      try {
+        client.getOperationStatus(op1);
+        fail("Should have failed.");
+      } catch (HiveSQLException ignored) {
+
+      }
+    } finally {
+      stopHiveServer();
+    }
+  }
 }
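
Not part of the patch: a minimal, self-contained sketch of the per-connection accounting pattern that run() now wires directly into TThreadPoolServer. It swaps Hive's Metrics system for a plain AtomicInteger so it compiles on its own, uses made-up names (ConnectionCountingEventHandler, ConnectionContext), and assumes a libthrift version in which ServerContext is a simple marker interface, as the ThriftCLIServerContext above implies.

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.transport.TTransport;

public class ConnectionCountingEventHandler implements TServerEventHandler {

  // Stand-ins for MetricsConstant.OPEN_CONNECTIONS / CUMULATIVE_CONNECTION_COUNT.
  private final AtomicInteger openConnections = new AtomicInteger();
  private final AtomicInteger cumulativeConnections = new AtomicInteger();

  // Per-connection state, analogous to ThriftCLIServerContext and its SessionHandle.
  private static class ConnectionContext implements ServerContext {
  }

  @Override
  public void preServe() {
    // Called once before the server starts serving; nothing to count yet.
  }

  @Override
  public ServerContext createContext(TProtocol input, TProtocol output) {
    // Called once per accepted connection: bump the gauge and the running total.
    openConnections.incrementAndGet();
    cumulativeConnections.incrementAndGet();
    return new ConnectionContext();
  }

  @Override
  public void processContext(ServerContext serverContext, TTransport input, TTransport output) {
    // Called before each request; the patch uses this hook to publish the
    // connection's context into the currentServerContext ThreadLocal.
  }

  @Override
  public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) {
    // Connection dropped: only the open-connection gauge goes back down.
    openConnections.decrementAndGet();
  }

  public int getOpenConnections() {
    return openConnections.get();
  }

  public int getCumulativeConnections() {
    return cumulativeConnections.get();
  }
}

A handler like this would be registered the same way the patch does it, via TThreadPoolServer.setServerEventHandler(...). Unlike the deleteContext removed from ThriftCLIService, the relocated handler no longer closes the session when the transport drops, which is exactly the behavior the new testSessionLifeAfterTransportClose exercises.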