diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 32ab3d8..f9348cf 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -789,6 +789,9 @@ HIVE_SERVER2_SSL_KEYSTORE_PATH("hive.server2.keystore.path", ""), HIVE_SERVER2_SSL_KEYSTORE_PASSWORD("hive.server2.keystore.password", ""), + HIVE_SERVER2_IDLE_SESSION_TIMEOUT("hive.server2.idle.session.timeout", 0l), + HIVE_SERVER2_IDLE_OPERATION_TIMEOUT("hive.server2.idle.operation.timeout", 0l), + HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,delete,compile"), HIVE_CONF_RESTRICTED_LIST("hive.conf.restricted.list", ""), diff --git conf/hive-default.xml.template conf/hive-default.xml.template index c574ab5..a30ee6f 100644 --- conf/hive-default.xml.template +++ conf/hive-default.xml.template @@ -885,6 +885,22 @@ + hive.server2.idle.session.timeout + 0 + Session will be closed when it's not accessed for this duration (msec). + Should be a positive value + + + + hive.server2.idle.operation.timeout + 0 + Operation will be closed when it's not accessed for this duration (msec). + With a positive value, it's checked for operations in terminal state (FINISHED, CANCELED, CLOSED, ERROR). + With a negative value, it's checked for all of the operations regardless of state. + + + + hive.server2.thrift.http.port 10001 Port number when in HTTP mode. 
diff --git service/src/java/org/apache/hive/service/cli/OperationState.java service/src/java/org/apache/hive/service/cli/OperationState.java index 1ec6bd1..0f0170f 100644 --- service/src/java/org/apache/hive/service/cli/OperationState.java +++ service/src/java/org/apache/hive/service/cli/OperationState.java @@ -25,30 +25,25 @@ * */ public enum OperationState { - INITIALIZED(TOperationState.INITIALIZED_STATE), - RUNNING(TOperationState.RUNNING_STATE), - FINISHED(TOperationState.FINISHED_STATE), - CANCELED(TOperationState.CANCELED_STATE), - CLOSED(TOperationState.CLOSED_STATE), - ERROR(TOperationState.ERROR_STATE), - UNKNOWN(TOperationState.UKNOWN_STATE), - PENDING(TOperationState.PENDING_STATE); + INITIALIZED(TOperationState.INITIALIZED_STATE, false), + RUNNING(TOperationState.RUNNING_STATE, false), + FINISHED(TOperationState.FINISHED_STATE, true), + CANCELED(TOperationState.CANCELED_STATE, true), + CLOSED(TOperationState.CLOSED_STATE, true), + ERROR(TOperationState.ERROR_STATE, true), + UNKNOWN(TOperationState.UKNOWN_STATE, false), + PENDING(TOperationState.PENDING_STATE, false); private final TOperationState tOperationState; + private final boolean terminal; - OperationState(TOperationState tOperationState) { + private OperationState(TOperationState tOperationState, boolean terminal) { this.tOperationState = tOperationState; + this.terminal = terminal; } - public static OperationState getOperationState(TOperationState tOperationState) { - // TODO: replace this with a Map? 
- for (OperationState opState : values()) { - if (tOperationState.equals(opState.tOperationState)) { - return opState; - } - } - return OperationState.UNKNOWN; + return OperationState.values()[tOperationState.getValue()]; } public static void validateTransition(OperationState oldState, OperationState newState) @@ -101,4 +96,8 @@ public void validateTransition(OperationState newState) public TOperationState toTOperationState() { return tOperationState; } + + public boolean isTerminal() { + return terminal; + } } diff --git service/src/java/org/apache/hive/service/cli/operation/Operation.java service/src/java/org/apache/hive/service/cli/operation/Operation.java index 6f4b8dc..fa26f3e 100644 --- service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -40,10 +40,13 @@ public static final long DEFAULT_FETCH_MAX_ROWS = 100; protected boolean hasResultSet; + private long lastAccessTime; + protected Operation(HiveSession parentSession, OperationType opType) { super(); this.parentSession = parentSession; opHandle = new OperationHandle(opType); + lastAccessTime = System.currentTimeMillis(); } public void setConfiguration(HiveConf configuration) { @@ -79,16 +82,35 @@ protected void setHasResultSet(boolean hasResultSet) { opHandle.setHasResultSet(hasResultSet); } - protected final OperationState setState(OperationState newState) throws HiveSQLException { + public boolean isTimedOut(long current) { + long timeout = HiveConf.getLongVar(configuration, + HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT); + if (timeout == 0) { + return false; + } + if (timeout > 0) { + // check only when in terminal state + return getState().isTerminal() && getLastAccessTime() + timeout <= current; + } + // negative timeout: its magnitude is the idle limit for every operation, whatever its state + return getLastAccessTime() + -timeout <= current; + } + + public long getLastAccessTime() { + return lastAccessTime; + } + + protected final void setState(OperationState newState) throws HiveSQLException 
{ state.validateTransition(newState); this.state = newState; - return this.state; + this.lastAccessTime = System.currentTimeMillis(); } protected final void assertState(OperationState state) throws HiveSQLException { if (this.state != state) { throw new HiveSQLException("Expected state " + state + ", but found " + this.state); } + this.lastAccessTime = System.currentTimeMillis(); } public boolean isRunning() { diff --git service/src/java/org/apache/hive/service/cli/session/HiveSession.java service/src/java/org/apache/hive/service/cli/session/HiveSession.java index 00058cc..4415018 100644 --- service/src/java/org/apache/hive/service/cli/session/HiveSession.java +++ service/src/java/org/apache/hive/service/cli/session/HiveSession.java @@ -179,4 +179,8 @@ public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientatio public String getUserName(); public void setUserName(String userName); + + public long getLastAccessTime(); + + public void ping(); } diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index cfda752..5691734 100644 --- service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -19,6 +19,7 @@ package org.apache.hive.service.cli.session; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -51,6 +52,7 @@ import org.apache.hive.service.cli.operation.GetTableTypesOperation; import org.apache.hive.service.cli.operation.GetTypeInfoOperation; import org.apache.hive.service.cli.operation.MetadataOperation; +import org.apache.hive.service.cli.operation.Operation; import org.apache.hive.service.cli.operation.OperationManager; /** @@ -76,6 +78,8 @@ private IMetaStoreClient metastoreClient = null; private final Set opHandleSet = new HashSet(); + private long 
lastAccessTime; + public HiveSessionImpl(String username, String password, Map sessionConf) { this.username = username; this.password = password; @@ -88,6 +92,8 @@ public HiveSessionImpl(String username, String password, Map ses // set an explicit session name to control the download directory name hiveConf.set(ConfVars.HIVESESSIONID.varname, sessionHandle.getHandleIdentifier().toString()); + + lastAccessTime = System.currentTimeMillis(); sessionState = new SessionState(hiveConf); SessionState.start(sessionState); } @@ -112,11 +118,13 @@ protected synchronized void acquire() throws HiveSQLException { // need to make sure that the this connections session state is // stored in the thread local for sessions. SessionState.setCurrentSessionState(sessionState); + lastAccessTime = System.currentTimeMillis(); } protected synchronized void release() { assert sessionState != null; // no need to release sessionState... + lastAccessTime = System.currentTimeMillis(); } public SessionHandle getSessionHandle() { @@ -381,6 +389,35 @@ public void setUserName(String userName) { this.username = userName; } + public long getLastAccessTime() { + return lastAccessTime; + } + + @Override + public void ping() { + long current = System.currentTimeMillis(); + OperationManager manager = sessionManager.getOperationManager(); + for (OperationHandle handle : new ArrayList(opHandleSet)) { + Operation operation = null; + try { + operation = manager.getOperation(handle); + } catch (HiveSQLException e) { + // the lookup failed, so 'operation' is still null here; report the handle instead + LOG.warn("Invalid Operation " + handle); + opHandleSet.remove(handle); + continue; + } + if (operation.isTimedOut(current)) { + LOG.warn("Operation " + operation.getHandle() + " is Timed-out and will be closed"); + try { + closeOperation(operation.getHandle()); + } catch (Exception e) { + LOG.warn("Exception is thrown closing operation " + operation.getHandle(), e); + } + } + } + } + @Override public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { 
acquire(); diff --git service/src/java/org/apache/hive/service/cli/session/SessionManager.java service/src/java/org/apache/hive/service/cli/session/SessionManager.java index 25c6f38..7bf2a65 100644 --- service/src/java/org/apache/hive/service/cli/session/SessionManager.java +++ service/src/java/org/apache/hive/service/cli/session/SessionManager.java @@ -18,6 +18,8 @@ package org.apache.hive.service.cli.session; +import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -41,6 +43,9 @@ * */ public class SessionManager extends CompositeService { + + private static final int SESSION_CHECK_INTERVAL = 60000; // 1 min + private static final Log LOG = LogFactory.getLog(CompositeService.class); private HiveConf hiveConf; private final Map handleToSession = new HashMap(); @@ -48,6 +53,10 @@ private static final Object sessionMapLock = new Object(); private ThreadPoolExecutor backgroundOperationPool; + private transient volatile boolean shutdown; + private transient Thread checker; + private transient long sessionTimeout; + public SessionManager() { super("SessionManager"); } @@ -68,6 +77,7 @@ public synchronized void init(HiveConf hiveConf) { backgroundOperationPool = new ThreadPoolExecutor(backgroundPoolSize, backgroundPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue(backgroundPoolQueueSize)); backgroundOperationPool.allowCoreThreadTimeOut(true); + sessionTimeout = HiveConf.getLongVar(hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT); addService(operationManager); super.init(hiveConf); } @@ -75,6 +85,34 @@ public synchronized void init(HiveConf hiveConf) { @Override public synchronized void start() { super.start(); + checker = new Thread(new Runnable() { + public void run() { + while (!shutdown) { + try { + Thread.sleep(SESSION_CHECK_INTERVAL); + } catch (InterruptedException e) { + continue; // woken early (stop() interrupts the sleep): re-check the shutdown flag + } + long current = System.currentTimeMillis(); + for (HiveSession session : new 
ArrayList(handleToSession.values())) { + if (sessionTimeout > 0 && session.getLastAccessTime() + sessionTimeout <= current) { + SessionHandle handle = session.getSessionHandle(); + LOG.warn("Session " + handle + " is Timed-out (last access : " + + new Date(session.getLastAccessTime()) + ") and will be closed"); + try { + closeSession(handle); + } catch (HiveSQLException e) { + LOG.warn("Exception is thrown closing session " + handle, e); + } + } else { + session.ping(); + } + } + } + } + }); + checker.setDaemon(true); + checker.start(); } @Override @@ -90,6 +128,10 @@ public synchronized void stop() { " seconds has been exceeded. RUNNING background operations will be shut down", e); } } + shutdown = true; + if (checker != null) { + checker.interrupt(); + } } public SessionHandle openSession(String username, String password, Map sessionConf)