diff --git a/service/src/java/org/apache/hive/service/cli/CLIService.java b/service/src/java/org/apache/hive/service/cli/CLIService.java index b46c5b4..f5751f1 100644 --- a/service/src/java/org/apache/hive/service/cli/CLIService.java +++ b/service/src/java/org/apache/hive/service/cli/CLIService.java @@ -44,6 +44,7 @@ import org.apache.hive.service.cli.operation.Operation; import org.apache.hive.service.cli.session.SessionManager; import org.apache.hive.service.cli.thrift.TProtocolVersion; +import org.apache.hive.service.server.HiveServer2; /** * CLIService. @@ -64,15 +65,18 @@ private SessionManager sessionManager; private UserGroupInformation serviceUGI; private UserGroupInformation httpUGI; + // The HiveServer2 instance running this service + private final HiveServer2 hiveServer2; - public CLIService() { + public CLIService(HiveServer2 hiveServer2) { super(CLIService.class.getSimpleName()); + this.hiveServer2 = hiveServer2; } @Override public synchronized void init(HiveConf hiveConf) { this.hiveConf = hiveConf; - sessionManager = new SessionManager(); + sessionManager = new SessionManager(hiveServer2); addService(sessionManager); // If the hadoop cluster is secure, do a kerberos login for the service from the keytab if (ShimLoader.getHadoopShims().isSecurityEnabled()) { @@ -201,7 +205,8 @@ public SessionHandle openSessionWithImpersonation(String username, String passwo * @see org.apache.hive.service.cli.ICLIService#closeSession(org.apache.hive.service.cli.SessionHandle) */ @Override - public void closeSession(SessionHandle sessionHandle) throws HiveSQLException { + public void closeSession(SessionHandle sessionHandle) + throws HiveSQLException { sessionManager.closeSession(sessionHandle); LOG.debug(sessionHandle + ": closeSession()"); } diff --git a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java index ecc9b96..137359b 100644 --- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java +++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java @@ -43,6 +43,7 @@ import org.apache.hive.service.cli.SessionHandle; import org.apache.hive.service.cli.operation.OperationManager; import org.apache.hive.service.cli.thrift.TProtocolVersion; +import org.apache.hive.service.server.HiveServer2; import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup; /** @@ -65,9 +66,12 @@ private long sessionTimeout; private volatile boolean shutdown; + // The HiveServer2 instance running this service + private final HiveServer2 hiveServer2; - public SessionManager() { + public SessionManager(HiveServer2 hiveServer2) { super(SessionManager.class.getSimpleName()); + this.hiveServer2 = hiveServer2; } @Override @@ -232,10 +236,10 @@ public SessionHandle openSession(TProtocolVersion protocol, String username, Str /** * Opens a new session and creates a session handle. * The username passed to this method is the effective username. - * If withImpersonation is true (==doAs true) we wrap all the calls in HiveSession + * If withImpersonation is true (==doAs true) we wrap all the calls in HiveSession * within a UGI.doAs, where UGI corresponds to the effective user. - * @see org.apache.hive.service.cli.thrift.ThriftCLIService#getUserName() - * + * @see org.apache.hive.service.cli.thrift.ThriftCLIService#getUserName() + * * @param protocol * @param username * @param password @@ -288,6 +292,24 @@ public void closeSession(SessionHandle sessionHandle) throws HiveSQLException { throw new HiveSQLException("Session does not exist!"); } session.close(); + // Shutdown HiveServer2 if it has been deregistered from ZooKeeper and has no active sessions + if (!(hiveServer2 == null) && (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) + && (!hiveServer2.isRegisteredWithZooKeeper())) { + // Asynchronously shutdown this instance of HiveServer2, + // if there are no active client sessions + if (getOpenSessionCount() == 0) { + LOG.info("This instance of HiveServer2 has been removed from the list of server " + + "instances available for dynamic service discovery. " + + "The last client session has ended - will shutdown now."); + Thread shutdownThread = new Thread() { + @Override + public void run() { + hiveServer2.stop(); + } + }; + shutdownThread.start(); + } + } } public HiveSession getSession(SessionHandle sessionHandle) throws HiveSQLException { @@ -376,6 +398,5 @@ private void executeSessionHooks(HiveSession session) throws Exception { public int getOpenSessionCount() { return handleToSession.size(); } - } diff --git a/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java index 9ee9785..61700c1 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java @@ -30,7 +30,7 @@ public class EmbeddedThriftBinaryCLIService extends ThriftBinaryCLIService { public EmbeddedThriftBinaryCLIService() { - super(new CLIService()); + super(new CLIService(null)); isEmbedded = true; HiveConf.setLoadHiveServer2Config(true); cliService.init(new HiveConf()); diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 4a1e004..a0a6e18 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -66,7 +66,6 @@ protected int minWorkerThreads; protected int maxWorkerThreads; protected long workerKeepAliveTime; - private HiveServer2 hiveServer2; public ThriftCLIService(CLIService cliService, String serviceName) { super(serviceName); @@ -264,9 +263,9 @@ private String getIpAddress() { /** * Returns the effective username. - * 1. If hive.server2.allow.user.substitution = false: the username of the connecting user + * 1. If hive.server2.allow.user.substitution = false: the username of the connecting user * 2. If hive.server2.allow.user.substitution = true: the username of the end user, - * that the connecting user is trying to proxy for. + * that the connecting user is trying to proxy for. * This includes a check whether the connecting user is allowed to proxy for the end user. * @param req * @return @@ -366,24 +365,6 @@ public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException { } catch (Exception e) { LOG.warn("Error closing session: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); - } finally { - if (!(isEmbedded) && (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) - && (!hiveServer2.isRegisteredWithZooKeeper())) { - // Asynchronously shutdown this instance of HiveServer2, - // if there are no active client sessions - if (cliService.getSessionManager().getOpenSessionCount() == 0) { - LOG.info("This instance of HiveServer2 has been removed from the list of server " - + "instances available for dynamic service discovery. " - + "The last client session has ended - will shutdown now."); - Thread shutdownThread = new Thread() { - @Override - public void run() { - hiveServer2.stop(); - } - }; - shutdownThread.start(); - } - } } return resp; } @@ -666,10 +647,4 @@ private boolean isKerberosAuthMode() { return cliService.getHiveConf().getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION) .equalsIgnoreCase(HiveAuthFactory.AuthTypes.KERBEROS.toString()); } - - public void setHiveServer2(HiveServer2 hiveServer2) { - this.hiveServer2 = hiveServer2; - } - } - diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index c667533..89876d5 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -64,7 +64,7 @@ public HiveServer2() { @Override public synchronized void init(HiveConf hiveConf) { - cliService = new CLIService(); + cliService = new CLIService(this); addService(cliService); if (isHTTPTransportMode(hiveConf)) { thriftCLIService = new ThriftHttpCLIService(cliService); @@ -72,7 +72,6 @@ public synchronized void init(HiveConf hiveConf) { thriftCLIService = new ThriftBinaryCLIService(cliService); } addService(thriftCLIService); - thriftCLIService.setHiveServer2(this); super.init(hiveConf); // Add a shutdown hook for catching SIGTERM & SIGINT diff --git a/service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java b/service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java index fb784aa..03f3964 100644 --- a/service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java +++ b/service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java @@ -39,7 +39,7 @@ public void testDoAsSetting(){ hconf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS)); - CLIService cliService = new CLIService(); + CLIService cliService = new CLIService(null); cliService.init(hconf); ThriftCLIService tcliService = new ThriftBinaryCLIService(cliService); tcliService.init(hconf); diff --git a/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java b/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java index 47d3a56..37b698b 100644 --- a/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java +++ b/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java @@ -52,7 +52,7 @@ */ private class FakeEmbeddedThriftBinaryCLIService extends ThriftBinaryCLIService { public FakeEmbeddedThriftBinaryCLIService(HiveConf hiveConf) { - super(new CLIService()); + super(new CLIService(null)); isEmbedded = true; cliService.init(hiveConf); cliService.start();