diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 7f4afd9..9786c2e 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Properties; +import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -44,6 +45,7 @@ import org.apache.hadoop.hive.conf.Validator.PatternSet; import org.apache.hadoop.hive.conf.Validator.RangeValidator; import org.apache.hadoop.hive.conf.Validator.StringSet; +import org.apache.hadoop.hive.conf.Validator.TimeValidator; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.security.UserGroupInformation; @@ -1579,6 +1581,15 @@ HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,compile", "Comma separated list of non-SQL Hive commands users are authorized to execute"), + HIVE_SERVER2_SESSION_CHECK_INTERVAL("hive.server2.session.check.interval", "0s", new TimeValidator(), + "The check interval for session/operation timeout, which can be disabled by setting zero or minus value."), + HIVE_SERVER2_IDLE_SESSION_TIMEOUT("hive.server2.idle.session.timeout", "0s", new TimeValidator(), + "Session will be closed when it's not accessed for this duration, which can be disabled by setting zero or minus value."), + HIVE_SERVER2_IDLE_OPERATION_TIMEOUT("hive.server2.idle.operation.timeout", "0s", new TimeValidator(), + "Operation will be closed when it's not accessed for this duration of time, which can be disabled by setting zero value.\n" + + " With positive value, it's checked for operations in terminal state only (FINISHED, CANCELED, CLOSED, ERROR).\n" + + " With negative value, it's checked for all of the operations regardless of state."), + HIVE_CONF_RESTRICTED_LIST("hive.conf.restricted.list", "hive.security.authenticator.manager,hive.security.authorization.manager,hive.users.in.admin.role", "Comma separated list of configuration options which are immutable at runtime"), @@ -1968,6 +1979,53 @@ public void setIntVar(ConfVars var, int val) { setIntVar(this, var, val); } + public static long getTimeVar(Configuration conf, ConfVars var, TimeUnit outUnit) { + assert (var.valClass == String.class) : var.varname; + return toTime(getVar(conf, var), outUnit); + } + + public long getTimeVar(ConfVars var, TimeUnit outUnit) { + String value = getVar(this, var); + return toTime(value, outUnit); + } + + public static long toTime(String value, TimeUnit outUnit) { + String[] parsed = parseTime(value.trim()); + return toTime(parsed[0].trim(), parsed[1].trim(), outUnit); + } + + private static String[] parseTime(String value) { + char[] chars = value.toCharArray(); + int i = 0; + for (; i < chars.length && (chars[i] == '-' || Character.isDigit(chars[i])); i++) { + } + return new String[] {value.substring(0, i), value.substring(i)}; + } + + public static long toTime(String timePart, String unitPart, TimeUnit outUnit) { + return outUnit.convert(Long.valueOf(timePart), unitFor(unitPart)); + } + + public static TimeUnit unitFor(String unit) { + unit = unit.toLowerCase(); + if (unit.equals("d") || unit.startsWith("day")) { + return TimeUnit.DAYS; + } else if (unit.equals("h") || unit.startsWith("hour")) { + return TimeUnit.HOURS; + } else if (unit.equals("m") || unit.startsWith("min")) { + return TimeUnit.MINUTES; + } else if (unit.equals("s") || unit.startsWith("sec")) { + return TimeUnit.SECONDS; + } else if (unit.equals("ms") || unit.startsWith("msec")) { + return TimeUnit.MILLISECONDS; + } else if (unit.equals("us") || unit.startsWith("usec")) { + return TimeUnit.MICROSECONDS; + } else if (unit.equals("ns") || unit.startsWith("nsec")) { + return TimeUnit.NANOSECONDS; + } + throw new IllegalArgumentException("Invalid time unit " + unit); + } + public static long getLongVar(Configuration conf, ConfVars var) { assert (var.valClass == Long.class) : var.varname; return conf.getLong(var.varname, var.defaultLongVal); diff --git common/src/java/org/apache/hadoop/hive/conf/Validator.java common/src/java/org/apache/hadoop/hive/conf/Validator.java index cea9c41..06dc6f9 100644 --- common/src/java/org/apache/hadoop/hive/conf/Validator.java +++ common/src/java/org/apache/hadoop/hive/conf/Validator.java @@ -22,6 +22,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; /** @@ -156,4 +157,36 @@ public String validate(String value) { return null; } } + + public static class TimeValidator implements Validator { + + private TimeUnit timeUnit = TimeUnit.SECONDS; + private Long min; + private Long max; + + public TimeValidator() { + } + + public TimeValidator(Long min, Long max, TimeUnit timeUnit) { + this.timeUnit = timeUnit; + this.min = min; + this.max = max; + } + + @Override + public String validate(String value) { + try { + long time = HiveConf.toTime(value, timeUnit); + if (min != null && time < min) { + return value + " is smaller than " + min + " " + timeUnit.name().toLowerCase(); + } + if (max != null && time > max) { + return value + " is bigger than " + max + " " + timeUnit.name().toLowerCase(); + } + } catch (Exception e) { + return e.toString(); + } + return null; + } + } } diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHiveServer2SessionTimeout.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHiveServer2SessionTimeout.java new file mode 100644 index 0000000..a7c9176 --- /dev/null +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHiveServer2SessionTimeout.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.jdbc.miniHS2; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.util.StringUtils; +import org.apache.hive.service.cli.CLIServiceClient; +import org.apache.hive.service.cli.HiveSQLException; +import org.apache.hive.service.cli.OperationHandle; +import org.apache.hive.service.cli.SessionHandle; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertTrue; + +public class TestHiveServer2SessionTimeout { + + private static MiniHS2 miniHS2 = null; + private Map confOverlay; + + @BeforeClass + public static void beforeTest() throws Exception { + miniHS2 = new MiniHS2(new HiveConf()); + } + + @Before + public void setUp() throws Exception { + confOverlay = new HashMap(); + confOverlay.put(ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + confOverlay.put(ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL.varname, "3s"); + confOverlay.put(ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT.varname, "3s"); + miniHS2.start(confOverlay); + } + + @After + public void tearDown() throws Exception { + miniHS2.stop(); + } + + @Test + public void testConnection() throws Exception { + CLIServiceClient serviceClient = miniHS2.getServiceClient(); + SessionHandle sessHandle = serviceClient.openSession("foo", "bar"); + OperationHandle handle = serviceClient.executeStatement(sessHandle, "SELECT 1", confOverlay); + Thread.sleep(7000); + try { + serviceClient.closeOperation(handle); + assertTrue("Operation should have been closed by timeout!", false); + } catch (HiveSQLException e) { + assertTrue(StringUtils.stringifyException(e), e.getMessage().contains("Invalid OperationHandle")); + } + } +} diff --git service/src/java/org/apache/hive/service/cli/OperationState.java service/src/java/org/apache/hive/service/cli/OperationState.java index 3e15f0c..b539ae1 100644 --- service/src/java/org/apache/hive/service/cli/OperationState.java +++ service/src/java/org/apache/hive/service/cli/OperationState.java @@ -25,29 +25,26 @@ * */ public enum OperationState { - INITIALIZED(TOperationState.INITIALIZED_STATE), - RUNNING(TOperationState.RUNNING_STATE), - FINISHED(TOperationState.FINISHED_STATE), - CANCELED(TOperationState.CANCELED_STATE), - CLOSED(TOperationState.CLOSED_STATE), - ERROR(TOperationState.ERROR_STATE), - UNKNOWN(TOperationState.UKNOWN_STATE), - PENDING(TOperationState.PENDING_STATE); + INITIALIZED(TOperationState.INITIALIZED_STATE, false), + RUNNING(TOperationState.RUNNING_STATE, false), + FINISHED(TOperationState.FINISHED_STATE, true), + CANCELED(TOperationState.CANCELED_STATE, true), + CLOSED(TOperationState.CLOSED_STATE, true), + ERROR(TOperationState.ERROR_STATE, true), + UNKNOWN(TOperationState.UKNOWN_STATE, false), + PENDING(TOperationState.PENDING_STATE, false); private final TOperationState tOperationState; + private final boolean terminal; - OperationState(TOperationState tOperationState) { + private OperationState(TOperationState tOperationState, boolean terminal) { this.tOperationState = tOperationState; + this.terminal = terminal; } + // must be sync with TOperationState in order public static OperationState getOperationState(TOperationState tOperationState) { - // TODO: replace this with a Map? - for (OperationState opState : values()) { - if (tOperationState.equals(opState.tOperationState)) { - return opState; - } - } - return OperationState.UNKNOWN; + return OperationState.values()[tOperationState.getValue()]; } public static void validateTransition(OperationState oldState, @@ -91,7 +88,8 @@ public static void validateTransition(OperationState oldState, default: // fall-through } - throw new HiveSQLException("Illegal Operation state transition"); + throw new HiveSQLException("Illegal Operation state transition " + + "from " + oldState + " to " + newState); } public void validateTransition(OperationState newState) @@ -102,4 +100,8 @@ public void validateTransition(OperationState newState) public TOperationState toTOperationState() { return tOperationState; } + + public boolean isTerminal() { + return terminal; + } } diff --git service/src/java/org/apache/hive/service/cli/operation/Operation.java service/src/java/org/apache/hive/service/cli/operation/Operation.java index 45fbd61..2fb0335 100644 --- service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -19,6 +19,7 @@ import java.util.EnumSet; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -47,14 +48,19 @@ protected final boolean runAsync; protected volatile Future backgroundHandle; + private long operationTimeout; + private long lastAccessTime; + protected static final EnumSet DEFAULT_FETCH_ORIENTATION_SET = EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST); protected Operation(HiveSession parentSession, OperationType opType, boolean runInBackground) { - super(); this.parentSession = parentSession; this.runAsync = runInBackground; this.opHandle = new OperationHandle(opType, parentSession.getProtocolVersion()); + lastAccessTime = System.currentTimeMillis(); + operationTimeout = HiveConf.getTimeVar(parentSession.getHiveConf(), + HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT, TimeUnit.MILLISECONDS); } public Future getBackgroundHandle() { @@ -106,10 +112,33 @@ protected void setHasResultSet(boolean hasResultSet) { opHandle.setHasResultSet(hasResultSet); } - protected final OperationState setState(OperationState newState) throws HiveSQLException { + public boolean isTimedOut(long current) { + if (operationTimeout == 0) { + return false; + } + if (operationTimeout > 0) { + // check only when it's in terminal state + return state.isTerminal() && getLastAccessTime() + operationTimeout <= current; + } + return getLastAccessTime() + -operationTimeout <= current; + } + + public long getLastAccessTime() { + return lastAccessTime; + } + + public long getOperationTimeout() { + return operationTimeout; + } + + public void setOperationTimeout(long operationTimeout) { + this.operationTimeout = operationTimeout; + } + + protected final void setState(OperationState newState) throws HiveSQLException { state.validateTransition(newState); this.state = newState; - return this.state; + this.lastAccessTime = System.currentTimeMillis(); } protected void setOperationException(HiveSQLException operationException) { @@ -120,6 +149,7 @@ protected final void assertState(OperationState state) throws HiveSQLException { if (this.state != state) { throw new HiveSQLException("Expected state " + state + ", but found " + this.state); } + this.lastAccessTime = System.currentTimeMillis(); } public boolean isRunning() { diff --git service/src/java/org/apache/hive/service/cli/operation/OperationManager.java service/src/java/org/apache/hive/service/cli/operation/OperationManager.java index 21c33bc..880d18d 100644 --- service/src/java/org/apache/hive/service/cli/operation/OperationManager.java +++ service/src/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -18,6 +18,8 @@ package org.apache.hive.service.cli.operation; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -129,15 +131,27 @@ public GetFunctionsOperation newGetFunctionsOperation(HiveSession parentSession, return operation; } - public synchronized Operation getOperation(OperationHandle operationHandle) - throws HiveSQLException { - Operation operation = handleToOperation.get(operationHandle); + public Operation getOperation(OperationHandle operationHandle) throws HiveSQLException { + Operation operation = getOperationInternal(operationHandle); if (operation == null) { throw new HiveSQLException("Invalid OperationHandle: " + operationHandle); } return operation; } + private synchronized Operation getOperationInternal(OperationHandle operationHandle) { + return handleToOperation.get(operationHandle); + } + + private synchronized Operation removeTimedOutOperation(OperationHandle operationHandle) { + Operation operation = handleToOperation.get(operationHandle); + if (operation != null && operation.isTimedOut(System.currentTimeMillis())) { + handleToOperation.remove(operationHandle); + return operation; + } + return null; + } + private synchronized void addOperation(Operation operation) { handleToOperation.put(operation.getHandle(), operation); } @@ -191,4 +205,16 @@ public RowSet getOperationNextRowSet(OperationHandle opHandle, throws HiveSQLException { return getOperation(opHandle).getNextRowSet(orientation, maxRows); } + + public List removeExpiredOperations(OperationHandle[] handles) { + List removed = new ArrayList(); + for (OperationHandle handle : handles) { + Operation operation = removeTimedOutOperation(handle); + if (operation != null) { + LOG.warn("Operation " + handle + " is timed-out and will be closed"); + removed.add(operation); + } + } + return removed; + } } diff --git service/src/java/org/apache/hive/service/cli/session/HiveSession.java service/src/java/org/apache/hive/service/cli/session/HiveSession.java index 9785e95..cc93e49 100644 --- service/src/java/org/apache/hive/service/cli/session/HiveSession.java +++ service/src/java/org/apache/hive/service/cli/session/HiveSession.java @@ -157,4 +157,6 @@ public void cancelDelegationToken(HiveAuthFactory authFactory, String tokenStr) public void renewDelegationToken(HiveAuthFactory authFactory, String tokenStr) throws HiveSQLException; + + public void closeExpiredOperations(); } diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java index eee1cc6..409a4ca 100644 --- service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java +++ service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java @@ -74,4 +74,6 @@ public String getIpAddress(); public void setIpAddress(String ipAddress); + + public long getLastAccessTime(); } diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index bc0a02c..55719b2 100644 --- service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -60,6 +60,7 @@ import org.apache.hive.service.cli.operation.GetTableTypesOperation; import org.apache.hive.service.cli.operation.GetTypeInfoOperation; import org.apache.hive.service.cli.operation.MetadataOperation; +import org.apache.hive.service.cli.operation.Operation; import org.apache.hive.service.cli.operation.OperationManager; import org.apache.hive.service.cli.thrift.TProtocolVersion; @@ -87,6 +88,8 @@ private IMetaStoreClient metastoreClient = null; private final Set opHandleSet = new HashSet(); + private long lastAccessTime; + public HiveSessionImpl(TProtocolVersion protocol, String username, String password, HiveConf serverhiveConf, String ipAddress) { this.username = username; @@ -106,6 +109,8 @@ public HiveSessionImpl(TProtocolVersion protocol, String username, String passwo sessionState = new SessionState(hiveConf, username); sessionState.setUserIpAddress(ipAddress); sessionState.setIsHiveServerQuery(true); + + lastAccessTime = System.currentTimeMillis(); SessionState.start(sessionState); } @@ -210,15 +215,20 @@ public void open() { SessionState.start(sessionState); } - protected synchronized void acquire() throws HiveSQLException { + protected synchronized void acquire(boolean userAccess) { // need to make sure that the this connections session state is // stored in the thread local for sessions. SessionState.setCurrentSessionState(sessionState); + if (userAccess) { + lastAccessTime = System.currentTimeMillis(); + } } - protected synchronized void release() { - assert sessionState != null; + protected synchronized void release(boolean userAccess) { SessionState.detachSession(); + if (userAccess) { + lastAccessTime = System.currentTimeMillis(); + } } @Override @@ -257,7 +267,7 @@ public IMetaStoreClient getMetaStoreClient() throws HiveSQLException { @Override public GetInfoValue getInfo(GetInfoType getInfoType) throws HiveSQLException { - acquire(); + acquire(true); try { switch (getInfoType) { case CLI_SERVER_NAME: @@ -277,7 +287,7 @@ public GetInfoValue getInfo(GetInfoType getInfoType) throw new HiveSQLException("Unrecognized GetInfoType value: " + getInfoType.toString()); } } finally { - release(); + release(true); } } @@ -296,7 +306,7 @@ public OperationHandle executeStatementAsync(String statement, Map confOverlay, boolean runAsync) throws HiveSQLException { - acquire(); + acquire(true); OperationManager operationManager = getOperationManager(); ExecuteStatementOperation operation = operationManager @@ -314,14 +324,14 @@ private OperationHandle executeStatementInternal(String statement, Map tableTypes) throws HiveSQLException { - acquire(); + acquire(true); OperationManager operationManager = getOperationManager(); MetadataOperation operation = @@ -397,14 +407,14 @@ public OperationHandle getTables(String catalogName, String schemaName, String t operationManager.closeOperation(opHandle); throw e; } finally { - release(); + release(true); } } @Override public OperationHandle getTableTypes() throws HiveSQLException { - acquire(); + acquire(true); OperationManager operationManager = getOperationManager(); GetTableTypesOperation operation = operationManager.newGetTableTypesOperation(getSession()); @@ -417,14 +427,14 @@ public OperationHandle getTableTypes() operationManager.closeOperation(opHandle); throw e; } finally { - release(); + release(true); } } @Override public OperationHandle getColumns(String catalogName, String schemaName, String tableName, String columnName) throws HiveSQLException { - acquire(); + acquire(true); OperationManager operationManager = getOperationManager(); GetColumnsOperation operation = operationManager.newGetColumnsOperation(getSession(), @@ -438,14 +448,14 @@ public OperationHandle getColumns(String catalogName, String schemaName, operationManager.closeOperation(opHandle); throw e; } finally { - release(); + release(true); } } @Override public OperationHandle getFunctions(String catalogName, String schemaName, String functionName) throws HiveSQLException { - acquire(); + acquire(true); OperationManager operationManager = getOperationManager(); GetFunctionsOperation operation = operationManager @@ -459,14 +469,14 @@ public OperationHandle getFunctions(String catalogName, String schemaName, Strin operationManager.closeOperation(opHandle); throw e; } finally { - release(); + release(true); } } @Override public void close() throws HiveSQLException { try { - acquire(); + acquire(true); /** * For metadata operations like getTables(), getColumns() etc, * the session allocates a private metastore handler which should be @@ -488,7 +498,7 @@ public void close() throws HiveSQLException { } catch (IOException ioe) { throw new HiveSQLException("Failure to close", ioe); } finally { - release(); + release(true); } } @@ -507,56 +517,87 @@ public void setUserName(String userName) { this.username = userName; } + public long getLastAccessTime() { + return lastAccessTime; + } + + @Override + public void closeExpiredOperations() { + OperationHandle[] handles = opHandleSet.toArray(new OperationHandle[0]); + if (handles.length > 0) { + List operations = operationManager.removeExpiredOperations(handles); + if (!operations.isEmpty()) { + closeTimedOutOperations(operations); + } + } + } + + private void closeTimedOutOperations(List operations) { + acquire(false); + try { + for (Operation operation : operations) { + opHandleSet.remove(operation.getHandle()); + try { + operation.close(); + } catch (Exception e) { + LOG.warn("Exception is thrown closing timed-out operation " + operation.getHandle(), e); + } + } + } finally { + release(false); + } + } + @Override public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { - acquire(); + acquire(true); try { sessionManager.getOperationManager().cancelOperation(opHandle); } finally { - release(); + release(true); } } @Override public void closeOperation(OperationHandle opHandle) throws HiveSQLException { - acquire(); + acquire(true); try { operationManager.closeOperation(opHandle); opHandleSet.remove(opHandle); } finally { - release(); + release(true); } } @Override public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException { - acquire(); + acquire(true); try { return sessionManager.getOperationManager().getOperationResultSetSchema(opHandle); } finally { - release(); + release(true); } } @Override public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows) throws HiveSQLException { - acquire(); + acquire(true); try { return sessionManager.getOperationManager() .getOperationNextRowSet(opHandle, orientation, maxRows); } finally { - release(); + release(true); } } @Override public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException { - acquire(); + acquire(true); try { return sessionManager.getOperationManager().getOperationNextRowSet(opHandle); } finally { - release(); + release(true); } } diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java index 39d2184..2ac8415 100644 --- service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java +++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java @@ -19,7 +19,6 @@ package org.apache.hive.service.cli.session; import java.io.IOException; -import java.util.Map; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -75,8 +74,8 @@ public String getDelegationToken () { } @Override - protected synchronized void acquire() throws HiveSQLException { - super.acquire(); + protected synchronized void acquire(boolean userAccess) { + super.acquire(userAccess); // if we have a metastore connection with impersonation, then set it first if (sessionHive != null) { Hive.set(sessionHive); @@ -90,11 +89,11 @@ protected synchronized void acquire() throws HiveSQLException { @Override public void close() throws HiveSQLException { try { - acquire(); + acquire(true); ShimLoader.getHadoopShims().closeAllForUGI(sessionUgi); cancelDelegationToken(); } finally { - release(); + release(true); super.close(); } } diff --git service/src/java/org/apache/hive/service/cli/session/SessionManager.java service/src/java/org/apache/hive/service/cli/session/SessionManager.java index d573592..b45b02d 100644 --- service/src/java/org/apache/hive/service/cli/session/SessionManager.java +++ service/src/java/org/apache/hive/service/cli/session/SessionManager.java @@ -18,6 +18,8 @@ package org.apache.hive.service.cli.session; +import java.util.ArrayList; +import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -53,6 +55,12 @@ private final OperationManager operationManager = new OperationManager(); private ThreadPoolExecutor backgroundOperationPool; + private Thread timeoutChecker; + private long checkInterval; + private long sessionTimeout; + + private volatile boolean shutdown; + public SessionManager() { super("SessionManager"); } @@ -78,6 +86,10 @@ public synchronized void init(HiveConf hiveConf) { backgroundOperationPool = new ThreadPoolExecutor(backgroundPoolSize, backgroundPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue(backgroundPoolQueueSize)); backgroundOperationPool.allowCoreThreadTimeOut(true); + checkInterval = HiveConf.getTimeVar(hiveConf, + ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL, TimeUnit.MILLISECONDS); + sessionTimeout = HiveConf.getTimeVar(hiveConf, + ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT, TimeUnit.MILLISECONDS); addService(operationManager); super.init(hiveConf); } @@ -94,6 +106,42 @@ private void applyAuthorizationConfigPolicy(HiveConf newHiveConf) throws HiveExc @Override public synchronized void start() { super.start(); + if (checkInterval <= 0) { + return; + } + final long interval = Math.max(checkInterval, 3000l); // minimum 3 seconds + timeoutChecker = new Thread(new Runnable() { + public void run() { + for (sleepInterval(interval); !shutdown; sleepInterval(interval)) { + long current = System.currentTimeMillis(); + for (HiveSession session : new ArrayList(handleToSession.values())) { + if (sessionTimeout > 0 && session.getLastAccessTime() + sessionTimeout <= current) { + SessionHandle handle = session.getSessionHandle(); + LOG.warn("Session " + handle + " is Timed-out (last access : " + + new Date(session.getLastAccessTime()) + ") and will be closed"); + try { + closeSession(handle); + } catch (HiveSQLException e) { + LOG.warn("Exception is thrown closing session " + handle, e); + } + } else { + session.closeExpiredOperations(); + } + } + } + } + + private void sleepInterval(long interval) { + try { + Thread.sleep(interval); + } catch (InterruptedException e) { + // ignore + } + } + }); + timeoutChecker.setName("Session Checker [" + interval + " msec]"); + timeoutChecker.setDaemon(true); + timeoutChecker.start(); } @Override @@ -109,6 +157,10 @@ public synchronized void stop() { " seconds has been exceeded. RUNNING background operations will be shut down", e); } } + shutdown = true; + if (timeoutChecker != null) { + timeoutChecker.interrupt(); + } } public SessionHandle openSession(TProtocolVersion protocol, String username, String password, String ipAddress,