diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java index 67c73e2241..df2d3a7b71 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java @@ -40,15 +40,18 @@ import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.server.ServerContext; +import org.apache.thrift.server.TServer; import org.apache.thrift.server.TServerEventHandler; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; import org.apache.thrift.transport.TTransportFactory; public class ThriftBinaryCLIService extends ThriftCLIService { private final Runnable oomHook; + protected TServer server; public ThriftBinaryCLIService(CLIService cliService, Runnable oomHook) { super(cliService, ThriftBinaryCLIService.class.getSimpleName()); @@ -56,14 +59,13 @@ public ThriftBinaryCLIService(CLIService cliService, Runnable oomHook) { } @Override - public void run() { + protected void initServer() { try { // Server thread pool String threadPoolName = "HiveServer2-Handler-Pool"; - ExecutorService executorService = new ThreadPoolExecutorWithOomHook(minWorkerThreads, - maxWorkerThreads, workerKeepAliveTime, TimeUnit.SECONDS, - new SynchronousQueue(), new ThreadFactoryWithGarbageCleanup(threadPoolName), - oomHook); + ExecutorService executorService = new ThreadPoolExecutorWithOomHook(minWorkerThreads, maxWorkerThreads, + workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue(), + new ThreadFactoryWithGarbageCleanup(threadPoolName), oomHook); // Thrift configs hiveAuthFactory = new HiveAuthFactory(hiveConf); @@ -79,35 +81,32 @@ public void run() { } else { String keyStorePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim(); if (keyStorePath.isEmpty()) { - throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname - + " Not configured for SSL connection"); + throw new IllegalArgumentException( + ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname + " Not configured for SSL connection"); } String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf, HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname); - serverSocket = HiveAuthUtils.getServerSSLSocket(hiveHost, portNum, keyStorePath, - keyStorePassword, sslVersionBlacklist); + serverSocket = HiveAuthUtils.getServerSSLSocket(hiveHost, portNum, keyStorePath, keyStorePassword, + sslVersionBlacklist); } // Server args int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE); - int requestTimeout = (int) hiveConf.getTimeVar( - HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT, TimeUnit.SECONDS); - int beBackoffSlotLength = (int) hiveConf.getTimeVar( - HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH, TimeUnit.MILLISECONDS); - TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket) - .processorFactory(processorFactory).transportFactory(transportFactory) - .protocolFactory(new TBinaryProtocol.Factory()) + int requestTimeout = (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT, + TimeUnit.SECONDS); + int beBackoffSlotLength = (int) hiveConf + .getTimeVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH, TimeUnit.MILLISECONDS); + TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket).processorFactory(processorFactory) + .transportFactory(transportFactory).protocolFactory(new TBinaryProtocol.Factory()) .inputProtocolFactory(new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize)) - .requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.SECONDS) - .beBackoffSlotLength(beBackoffSlotLength).beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS) - .executorService(executorService); + .requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.SECONDS).beBackoffSlotLength(beBackoffSlotLength) + .beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS).executorService(executorService); // TCP Server server = new TThreadPoolServer(sargs); server.setServerEventHandler(new TServerEventHandler() { @Override - public ServerContext createContext( - TProtocol input, TProtocol output) { + public ServerContext createContext(TProtocol input, TProtocol output) { Metrics metrics = MetricsFactory.getInstance(); if (metrics != null) { try { @@ -121,8 +120,7 @@ public ServerContext createContext( } @Override - public void deleteContext(ServerContext serverContext, - TProtocol input, TProtocol output) { + public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) { Metrics metrics = MetricsFactory.getInstance(); if (metrics != null) { try { @@ -137,7 +135,7 @@ public void deleteContext(ServerContext serverContext, LOG.info("Session disconnected without closing properly. "); try { boolean close = cliService.getSessionManager().getSession(sessionHandle).getHiveConf() - .getBoolVar(ConfVars.HIVE_SERVER2_CLOSE_SESSION_ON_DISCONNECT); + .getBoolVar(ConfVars.HIVE_SERVER2_CLOSE_SESSION_ON_DISCONNECT); LOG.info((close ? "" : "Not ") + "Closing the session: " + sessionHandle); if (close) { cliService.closeSession(sessionHandle); @@ -153,21 +151,39 @@ public void preServe() { } @Override - public void processContext(ServerContext serverContext, - TTransport input, TTransport output) { + public void processContext(ServerContext serverContext, TTransport input, TTransport output) { currentServerContext.set(serverContext); } }); - String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port " - + portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads"; + String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port " + portNum + " with " + + minWorkerThreads + "..." + maxWorkerThreads + " worker threads"; LOG.info(msg); + } catch (Exception e) { + throw new RuntimeException("Failed to init thrift server", e); + } + } + + @Override + public void run() { + try { server.serve(); } catch (Throwable t) { - LOG.error( - "Error starting HiveServer2: could not start " - + ThriftBinaryCLIService.class.getSimpleName(), t); + if (t instanceof InterruptedException) { + // This is likely a shutdown + LOG.info("Caught " + t.getClass().getSimpleName() + ". Shutting down."); + } else { + LOG.error("Exception caught by " + this.getClass().getSimpleName() + + ". Exiting.", t); + } System.exit(-1); } } + @Override + protected void stopServer() { + server.stop(); + server = null; + LOG.info("Thrift server has stopped"); + } + } diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index d425423b4e..68fe8d8aa1 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -134,9 +134,6 @@ protected int portNum; protected InetAddress serverIPAddress; protected String hiveHost; - protected TServer server; - protected org.eclipse.jetty.server.Server httpServer; - private boolean isStarted = false; protected boolean isEmbedded = false; @@ -145,6 +142,7 @@ protected int minWorkerThreads; protected int maxWorkerThreads; protected long workerKeepAliveTime; + private Thread serverThread; protected ThreadLocal currentServerContext; @@ -210,30 +208,30 @@ public synchronized void init(HiveConf hiveConf) { super.init(hiveConf); } + protected abstract void initServer(); + @Override public synchronized void start() { super.start(); if (!isStarted && !isEmbedded) { - new Thread(this).start(); + initServer(); + serverThread = new Thread(this); + serverThread.setName("Thrift Server"); + serverThread.start(); isStarted = true; } } + protected abstract void stopServer(); + @Override public synchronized void stop() { if (isStarted && !isEmbedded) { - if(server != null) { - server.stop(); - LOG.info("Thrift server has stopped"); - } - if((httpServer != null) && httpServer.isStarted()) { - try { - httpServer.stop(); - LOG.info("Http server has stopped"); - } catch (Exception e) { - LOG.error("Error stopping Http server: ", e); - } + if (serverThread != null) { + serverThread.interrupt(); + serverThread = null; } + stopServer(); isStarted = false; } super.stop(); diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java index 0b3f2c3168..d4ea7aba8c 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java @@ -56,9 +56,9 @@ public class ThriftHttpCLIService extends ThriftCLIService { private static final String APPLICATION_THRIFT = "application/x-thrift"; + protected org.eclipse.jetty.server.Server server; private final Runnable oomHook; - public ThriftHttpCLIService(CLIService cliService, Runnable oomHook) { super(cliService, ThriftHttpCLIService.class.getSimpleName()); this.oomHook = oomHook; @@ -66,23 +66,23 @@ public ThriftHttpCLIService(CLIService cliService, Runnable oomHook) { /** * Configure Jetty to serve http requests. Example of a client connection URL: - * http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ, - * e.g. http://gateway:port/hive2/servlets/thrifths2/ + * http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target + * URL to differ, e.g. http://gateway:port/hive2/servlets/thrifths2/ */ @Override - public void run() { + protected void initServer() { try { // Server thread pool - // Start with minWorkerThreads, expand till maxWorkerThreads and reject subsequent requests + // Start with minWorkerThreads, expand till maxWorkerThreads and reject + // subsequent requests String threadPoolName = "HiveServer2-HttpHandler-Pool"; ExecutorService executorService = new ThreadPoolExecutorWithOomHook(minWorkerThreads, - maxWorkerThreads, workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue(), - new ThreadFactoryWithGarbageCleanup(threadPoolName), oomHook); + maxWorkerThreads,workerKeepAliveTime, TimeUnit.SECONDS, + new SynchronousQueue(), new ThreadFactoryWithGarbageCleanup(threadPoolName), oomHook); ExecutorThreadPool threadPool = new ExecutorThreadPool(executorService); // HTTP Server - httpServer = new Server(threadPool); - + server = new Server(threadPool); ServerConnector connector; @@ -105,20 +105,21 @@ public void run() { String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf, HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname); if (keyStorePath.isEmpty()) { - throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname + throw new IllegalArgumentException( + ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname + " Not configured for SSL connection"); } SslContextFactory sslContextFactory = new SslContextFactory(); String[] excludedProtocols = hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(","); LOG.info("HTTP Server SSL: adding excluded protocols: " + Arrays.toString(excludedProtocols)); sslContextFactory.addExcludeProtocols(excludedProtocols); - LOG.info("HTTP Server SSL: SslContextFactory.getExcludeProtocols = " + - Arrays.toString(sslContextFactory.getExcludeProtocols())); + LOG.info("HTTP Server SSL: SslContextFactory.getExcludeProtocols = " + + Arrays.toString(sslContextFactory.getExcludeProtocols())); sslContextFactory.setKeyStorePath(keyStorePath); sslContextFactory.setKeyStorePassword(keyStorePassword); - connector = new ServerConnector(httpServer, sslContextFactory, http); + connector = new ServerConnector(server, sslContextFactory, http); } else { - connector = new ServerConnector(httpServer, http); + connector = new ServerConnector(server, http); } connector.setPort(portNum); @@ -128,7 +129,7 @@ public void run() { TimeUnit.MILLISECONDS); connector.setIdleTimeout(maxIdleTime); - httpServer.addConnector(connector); + server.addConnector(connector); // Thrift configs hiveAuthFactory = new HiveAuthFactory(hiveConf); @@ -140,16 +141,15 @@ public void run() { // UGI for the http/_HOST (SPNego) principal UserGroupInformation httpUGI = cliService.getHttpUGI(); String authType = hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION); - TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, authType, - serviceUGI, httpUGI, hiveAuthFactory); + TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, authType, serviceUGI, httpUGI, + hiveAuthFactory); // Context handler - final ServletContextHandler context = new ServletContextHandler( - ServletContextHandler.SESSIONS); + final ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); context.setContextPath("/"); - if (hiveConf.getBoolean(ConfVars.HIVE_SERVER2_XSRF_FILTER_ENABLED.varname, false)){ + if (hiveConf.getBoolean(ConfVars.HIVE_SERVER2_XSRF_FILTER_ENABLED.varname, false)) { // context.addFilter(Utils.getXSRFFilterHolder(null, null), "/" , - // FilterMapping.REQUEST); + // FilterMapping.REQUEST); // Filtering does not work here currently, doing filter in ThriftHttpServlet LOG.debug("XSRF filter enabled"); } else { @@ -183,33 +183,45 @@ public void contextDestroyed(ServletContextEvent servletContextEvent) { } }); - final String httpPath = getHttpPath(hiveConf - .getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH)); + final String httpPath = getHttpPath(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH)); if (HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_SERVER2_THRIFT_HTTP_COMPRESSION_ENABLED)) { final GzipHandler gzipHandler = new GzipHandler(); gzipHandler.setHandler(context); gzipHandler.addIncludedMethods(HttpMethod.POST); gzipHandler.addIncludedMimeTypes(APPLICATION_THRIFT); - httpServer.setHandler(gzipHandler); + server.setHandler(gzipHandler); } else { - httpServer.setHandler(context); + server.setHandler(context); } context.addServlet(new ServletHolder(thriftHttpServlet), httpPath); - // TODO: check defaults: maxTimeout, keepalive, maxBodySize, bodyRecieveDuration, etc. + // TODO: check defaults: maxTimeout, keepalive, maxBodySize, + // bodyRecieveDuration, etc. // Finally, start the server - httpServer.start(); + server.start(); String msg = "Started " + ThriftHttpCLIService.class.getSimpleName() + " in " + schemeName + " mode on port " + portNum + " path=" + httpPath + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads"; LOG.info(msg); - httpServer.join(); + } catch (Exception e) { + throw new RuntimeException("Failed to init HttpServer", e); + } + } + + @Override + public void run() { + try { + server.join(); } catch (Throwable t) { - LOG.error( - "Error starting HiveServer2: could not start " - + ThriftHttpCLIService.class.getSimpleName(), t); - System.exit(-1); + if (t instanceof InterruptedException) { + // This is likely a shutdown + LOG.info("Caught " + t.getClass().getSimpleName() + ". Shutting down thrift server."); + } else { + LOG.error("Exception caught by " + ThriftHttpCLIService.class.getSimpleName() + + ". Exiting.", t); + System.exit(-1); + } } } @@ -236,4 +248,18 @@ private String getHttpPath(String httpPath) { } return httpPath; } + + @Override + protected void stopServer() { + if((server != null) && server.isStarted()) { + try { + server.stop(); + server = null; + LOG.info("Thrift HTTP server has been stopped"); + } catch (Exception e) { + LOG.error("Error stopping HTTP server: ", e); + } + } + } + }