diff --git a/common/src/java/org/apache/hadoop/hive/common/thrift/HiveThriftChainedEventHandler.java b/common/src/java/org/apache/hadoop/hive/common/thrift/HiveThriftChainedEventHandler.java new file mode 100644 index 0000000..1e44d5c --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/thrift/HiveThriftChainedEventHandler.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common.thrift; + +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.server.ServerContext; +import org.apache.thrift.server.TServerEventHandler; +import org.apache.thrift.transport.TTransport; + +import java.util.ArrayList; +import java.util.List; + +public class HiveThriftChainedEventHandler implements TServerEventHandler { + + protected List eventHandlers = new ArrayList(); + + public HiveThriftChainedEventHandler() { + } + + public void addEventHandler(TServerEventHandler eventHandler) { + eventHandlers.add(eventHandler); + } + + @Override + public void preServe() { + for (TServerEventHandler eventHandler : eventHandlers) { + eventHandler.preServe(); + } + } + + @Override + public ServerContext createContext(TProtocol input, TProtocol output) { + ServerContextList serverContexts = new ServerContextList(); + for (TServerEventHandler eventHandler : eventHandlers) { + serverContexts.add(eventHandler.createContext(input, output)); + } + return serverContexts; + } + + @Override + public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) { + ServerContextList serverContexts = (ServerContextList) serverContext; + for (int i = 0; i < eventHandlers.size(); i++) { + eventHandlers.get(i).deleteContext(serverContexts.get(i), input, output); + } + } + + @Override + public void processContext(ServerContext serverContext, TTransport input, TTransport output) { + ServerContextList serverContexts = (ServerContextList) serverContext; + for (int i = 0; i < eventHandlers.size(); i++) { + eventHandlers.get(i).processContext(serverContexts.get(i), input, output); + } + } + + public static class ServerContextList extends ArrayList implements ServerContext { + } +} diff --git a/jdbc/src/test/org/apache/hive/service/cli/thrift/TestDisconnectCleanupEventHandler.java b/jdbc/src/test/org/apache/hive/service/cli/thrift/TestDisconnectCleanupEventHandler.java new file mode 100644 index 0000000..2fa3f03 --- /dev/null +++ b/jdbc/src/test/org/apache/hive/service/cli/thrift/TestDisconnectCleanupEventHandler.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.cli.thrift; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hive.service.cli.CLIService; +import org.apache.hive.service.cli.SessionHandle; + +import junit.framework.TestCase; +import junit.framework.Assert; + +/* + * This class only tests the basic APIs. The integrated tests are done by other + * test cases like TestJdbcDriver2 etc. + */ +public class TestDisconnectCleanupEventHandler extends TestCase { + + private ThriftCLIService thriftCLIService; + private CLIService cliService; + + @Override + public void setUp() { + cliService = new CLIService(); + cliService.init(new HiveConf()); + thriftCLIService = new ThriftBinaryCLIService(cliService); + } + + @Override + public void tearDown() { + } + + /* + * This sequence is very similar to how HiveServe2 calls the EventHandler. + */ + public void testEventHandler() { + DisconnectCleanupEventHandler eventHandler = new DisconnectCleanupEventHandler(thriftCLIService, new HiveConf()); + + Assert.assertNull(DisconnectCleanupEventHandler.getTSessionHandle()); + Assert.assertNull(DisconnectCleanupEventHandler.getUser()); + Assert.assertFalse(DisconnectCleanupEventHandler.isSessionValid()); + + SessionHandle handle = new SessionHandle(); + DisconnectCleanupEventHandler.addSessionID(handle.toTSessionHandle(), "user1"); + + Assert.assertEquals(DisconnectCleanupEventHandler.getTSessionHandle(), handle.toTSessionHandle()); + Assert.assertEquals(DisconnectCleanupEventHandler.getUser(), "user1"); + + eventHandler.deleteContext(null, null, null); + Assert.assertNull(DisconnectCleanupEventHandler.getTSessionHandle()); + Assert.assertNull(DisconnectCleanupEventHandler.getUser()); + Assert.assertFalse(DisconnectCleanupEventHandler.isSessionValid()); + } +} diff --git a/service/src/java/org/apache/hive/service/cli/CLIService.java b/service/src/java/org/apache/hive/service/cli/CLIService.java index 1a7f338..1ee83e9 100644 --- a/service/src/java/org/apache/hive/service/cli/CLIService.java +++ b/service/src/java/org/apache/hive/service/cli/CLIService.java @@ -55,6 +55,10 @@ public CLIService() { super("CLIService"); } + public SessionManager getSessionManager() { + return sessionManager; + } + @Override public synchronized void init(HiveConf hiveConf) { this.hiveConf = hiveConf; diff --git a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java index f392d62..e915342 100644 --- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java +++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java @@ -141,6 +141,10 @@ public HiveSession getSession(SessionHandle sessionHandle) throws HiveSQLExcepti return session; } + public boolean isSessionValid(SessionHandle sessionHandle) { + return handleToSession.get(sessionHandle) != null; + } + public OperationManager getOperationManager() { return operationManager; } diff --git a/service/src/java/org/apache/hive/service/cli/thrift/DisconnectCleanupEventHandler.java b/service/src/java/org/apache/hive/service/cli/thrift/DisconnectCleanupEventHandler.java new file mode 100644 index 0000000..234c742 --- /dev/null +++ b/service/src/java/org/apache/hive/service/cli/thrift/DisconnectCleanupEventHandler.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.cli.thrift; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hive.service.cli.HiveSQLException; +import org.apache.hive.service.cli.SessionHandle; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.server.ServerContext; +import org.apache.thrift.server.TServerEventHandler; +import org.apache.thrift.transport.TTransport; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +/** + * This Event Handler takes care of cleaning up resources due to disconnected sessions. + */ +public class DisconnectCleanupEventHandler implements TServerEventHandler { + + private static final Log LOG = LogFactory.getLog(DisconnectCleanupEventHandler.class); + + private static ThriftCLIService thriftCliService = null; + private static HiveConf hiveConf = null; + + /* + * ThreadLocal state required for cleaning up resources for each Thrift Worker thread. + */ + private static ThreadLocal threadLocalUser = + new ThreadLocal() { + @Override + protected synchronized String initialValue() { + return null; + } + }; + private static ThreadLocal threadLocalSessionHandle = + new ThreadLocal() { + @Override + protected synchronized TSessionHandle initialValue() { + return null; + } + }; + + public DisconnectCleanupEventHandler(ThriftCLIService sqlService, HiveConf conf) { + thriftCliService = sqlService; + hiveConf = conf; + } + + /* + * A JDBC Client is associated with a User and SessionHandle and one Worker thread is + * required per connection. This information is required to cleanup the session + * if the client disconnects abruptly. Save these information as soon as a session + * is opened. + */ + public static void addSessionID(TSessionHandle tSessionHandle, String user) { + + LOG.info("Adding session handle for user " + user + " to cache " + tSessionHandle); + threadLocalSessionHandle.set(tSessionHandle); + threadLocalUser.set(user); + } + + @Override + public void preServe() { + } + + @Override + public ServerContext createContext(TProtocol input, TProtocol output) { + return null; + } + + protected static String getUser() { + return threadLocalUser.get(); + } + + protected static TSessionHandle getTSessionHandle() { + return threadLocalSessionHandle.get(); + } + + protected static SessionHandle getSessionHandle() { + return new SessionHandle(threadLocalSessionHandle.get()); + } + + protected static boolean isSessionValid() { + return getTSessionHandle() != null && + thriftCliService.getCLIService().getSessionManager().isSessionValid(getSessionHandle()); + } + + /* + * Close the session similar to how a JDBC Client would close it. + */ + @Override + public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) { + + assert thriftCliService != null; + + try { + LOG.info("Cleaning up resources for SessionHandle " + getTSessionHandle() + " on behalf of " + getUser()); + /* + * Client closed connection or did not establish a session, nothing to do. + * TSessionHandle or User could be null if there was a disconnection before a session was created. + */ + if (!isSessionValid() || getTSessionHandle() == null || getUser() == null) { + return; + } + if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { + closeSessionDoAs(); + } else { + cleanupSession(); + } + + } catch (Throwable e) { + // Just log the exceptions and don't throw anything that disrupts Thrift Server + LOG.error("Error closing a disconnected session...", e); + + } finally { + + // Reset the thread local variables, let them not be misused in anyway. + threadLocalSessionHandle.remove(); + threadLocalUser.remove(); + } + } + + private void cleanupSession() throws HiveSQLException { + thriftCliService.CloseSessionHandle(getTSessionHandle()); + } + + private void closeSessionDoAs() throws IOException, InterruptedException { + UserGroupInformation.createProxyUser(getUser(), UserGroupInformation.getLoginUser()). + doAs(new PrivilegedExceptionAction() { + public Boolean run() throws HiveSQLException { + cleanupSession(); + return true; // We don't care about any return values for now + } + }); + } + + @Override + public void processContext(ServerContext serverContext, TTransport tTransport, TTransport tTransport2) { + } +} diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java index 9c8f5c1..38b6a45 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java @@ -20,6 +20,7 @@ import java.net.InetSocketAddress; +import org.apache.hadoop.hive.common.thrift.HiveThriftChainedEventHandler; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.CLIService; @@ -72,6 +73,9 @@ public void run() { .maxWorkerThreads(maxWorkerThreads); server = new TThreadPoolServer(sargs); + HiveThriftChainedEventHandler eventHandlers = new HiveThriftChainedEventHandler(); + eventHandlers.addEventHandler(new DisconnectCleanupEventHandler(this, hiveConf)); + server.setServerEventHandler(eventHandlers); LOG.info("ThriftBinaryCLIService listening on " + serverAddress); @@ -82,4 +86,4 @@ public void run() { } } -} \ No newline at end of file +} diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 857e627..0cb240a 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -76,6 +76,10 @@ public ThriftCLIService(CLIService cliService, String serviceName) { this.cliService = cliService; } + public CLIService getCLIService() { + return cliService; + } + @Override public synchronized void init(HiveConf hiveConf) { this.hiveConf = hiveConf; @@ -170,8 +174,7 @@ SessionHandle getSessionHandle(TOpenSessionReq req) public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException { TCloseSessionResp resp = new TCloseSessionResp(); try { - SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle()); - cliService.closeSession(sessionHandle); + CloseSessionHandle(req.getSessionHandle()); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error closing session: ", e); @@ -180,6 +183,11 @@ public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException { return resp; } + public void CloseSessionHandle(TSessionHandle tSessionHandle) throws HiveSQLException { + SessionHandle sessionHandle = new SessionHandle(tSessionHandle); + cliService.closeSession(sessionHandle); + } + @Override public TGetInfoResp GetInfo(TGetInfoReq req) throws TException { TGetInfoResp resp = new TGetInfoResp();