diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 32ab3d8..f9348cf 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -789,6 +789,9 @@
HIVE_SERVER2_SSL_KEYSTORE_PATH("hive.server2.keystore.path", ""),
HIVE_SERVER2_SSL_KEYSTORE_PASSWORD("hive.server2.keystore.password", ""),
+ // Idle timeouts in milliseconds; 0 (the default) disables the corresponding check.
+ HIVE_SERVER2_IDLE_SESSION_TIMEOUT("hive.server2.idle.session.timeout", 0L),
+ HIVE_SERVER2_IDLE_OPERATION_TIMEOUT("hive.server2.idle.operation.timeout", 0L),
+
HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,delete,compile"),
HIVE_CONF_RESTRICTED_LIST("hive.conf.restricted.list", ""),
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index c574ab5..a30ee6f 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -885,6 +885,22 @@
+ hive.server2.idle.session.timeout
+ 0
+ Session will be closed when it has not been accessed for this duration (msec).
+ The check is enabled only for a positive value; zero or a negative value disables it.
+
+
+
+ hive.server2.idle.operation.timeout
+ 0
+ Operation will be closed when it's not accessed for this duration (msec).
+ With a positive value, it's checked only for operations in a terminal state (FINISHED, CANCELED, CLOSED, ERROR).
+ With a negative value, its absolute value is used and it's checked for all operations regardless of state. Zero disables the check.
+
+
+
+
hive.server2.thrift.http.port
10001
Port number when in HTTP mode.
diff --git service/src/java/org/apache/hive/service/cli/OperationState.java service/src/java/org/apache/hive/service/cli/OperationState.java
index 1ec6bd1..0f0170f 100644
--- service/src/java/org/apache/hive/service/cli/OperationState.java
+++ service/src/java/org/apache/hive/service/cli/OperationState.java
@@ -25,30 +25,25 @@
*
*/
public enum OperationState {
- INITIALIZED(TOperationState.INITIALIZED_STATE),
- RUNNING(TOperationState.RUNNING_STATE),
- FINISHED(TOperationState.FINISHED_STATE),
- CANCELED(TOperationState.CANCELED_STATE),
- CLOSED(TOperationState.CLOSED_STATE),
- ERROR(TOperationState.ERROR_STATE),
- UNKNOWN(TOperationState.UKNOWN_STATE),
- PENDING(TOperationState.PENDING_STATE);
+ INITIALIZED(TOperationState.INITIALIZED_STATE, false),
+ RUNNING(TOperationState.RUNNING_STATE, false),
+ FINISHED(TOperationState.FINISHED_STATE, true),
+ CANCELED(TOperationState.CANCELED_STATE, true),
+ CLOSED(TOperationState.CLOSED_STATE, true),
+ ERROR(TOperationState.ERROR_STATE, true),
+ UNKNOWN(TOperationState.UKNOWN_STATE, false),
+ PENDING(TOperationState.PENDING_STATE, false);
private final TOperationState tOperationState;
+ private final boolean terminal;
- OperationState(TOperationState tOperationState) {
+ private OperationState(TOperationState tOperationState, boolean terminal) {
this.tOperationState = tOperationState;
+ this.terminal = terminal;
}
-
+ /**
+ * Maps a Thrift operation state to the matching {@code OperationState}.
+ * The lookup compares the wrapped Thrift constant rather than indexing values() with the
+ * Thrift numeric value, so it does not depend on the declaration order of this enum, and
+ * an unmapped value yields UNKNOWN instead of an ArrayIndexOutOfBoundsException.
+ *
+ * @param tOperationState the Thrift state to translate
+ * @return the constant wrapping the given Thrift state, or UNKNOWN when there is none
+ */
public static OperationState getOperationState(TOperationState tOperationState) {
// TODO: replace this with a Map?
for (OperationState opState : values()) {
if (tOperationState.equals(opState.tOperationState)) {
return opState;
}
}
return OperationState.UNKNOWN;
}
public static void validateTransition(OperationState oldState, OperationState newState)
@@ -101,4 +96,8 @@ public void validateTransition(OperationState newState)
public TOperationState toTOperationState() {
return tOperationState;
}
+
+ /**
+ * Tells whether this state is flagged as terminal by the enum declaration
+ * (FINISHED, CANCELED, CLOSED and ERROR are; every other state is not).
+ *
+ * @return the terminal flag this constant was constructed with
+ */
+ public boolean isTerminal() {
+ return terminal;
+ }
}
diff --git service/src/java/org/apache/hive/service/cli/operation/Operation.java service/src/java/org/apache/hive/service/cli/operation/Operation.java
index 6f4b8dc..fa26f3e 100644
--- service/src/java/org/apache/hive/service/cli/operation/Operation.java
+++ service/src/java/org/apache/hive/service/cli/operation/Operation.java
@@ -40,10 +40,13 @@
public static final long DEFAULT_FETCH_MAX_ROWS = 100;
protected boolean hasResultSet;
+ // Updated in the constructor, setState() and assertState(); read through isTimedOut(),
+ // which the SessionManager checker thread reaches via HiveSession.ping(). volatile makes
+ // the 64-bit value visible to, and atomically readable by, that other thread.
+ private volatile long lastAccessTime;
+
protected Operation(HiveSession parentSession, OperationType opType) {
super();
this.parentSession = parentSession;
opHandle = new OperationHandle(opType);
+ // construction counts as the first access, so the idle clock used by isTimedOut() starts here
+ lastAccessTime = System.currentTimeMillis();
}
public void setConfiguration(HiveConf configuration) {
@@ -79,16 +82,34 @@ protected void setHasResultSet(boolean hasResultSet) {
opHandle.setHasResultSet(hasResultSet);
}
- protected final OperationState setState(OperationState newState) throws HiveSQLException {
+ /**
+ * Decides whether this operation has been idle long enough to be closed.
+ * The limit is read from hive.server2.idle.operation.timeout (msec): zero disables the
+ * check, a positive value applies only while the operation is in a terminal state, and a
+ * negative value applies its absolute value in every state.
+ *
+ * @param current the time to compare against, on the System.currentTimeMillis() scale
+ * @return true when the time since the last access has reached the configured limit
+ */
+ public boolean isTimedOut(long current) {
+ long timeout = HiveConf.getLongVar(configuration,
+ HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT);
+ if (timeout == 0) {
+ return false;
+ }
+ if (timeout > 0) {
+ // check only when in terminal state
+ return getState().isTerminal() && getLastAccessTime() + timeout <= current;
+ }
+ // Negative setting: its absolute value is the idle limit for every state. The limit has
+ // to be added to the last access time; adding it to 'current' instead moves the deadline
+ // into the future and makes the comparison true for every operation.
+ long idleLimit = -timeout;
+ return getLastAccessTime() + idleLimit <= current;
+ }
+
+ /**
+ * Returns the timestamp (System.currentTimeMillis()) recorded when this operation was
+ * created, or when setState()/assertState() last completed without throwing.
+ *
+ * @return the last access time in milliseconds
+ */
+ public long getLastAccessTime() {
+ return lastAccessTime;
+ }
+
+ /**
+ * Moves this operation to newState after validating the transition from the current
+ * state, then records the current time as the last access time.
+ * NOTE(review): this patch changes the return type from OperationState to void; any
+ * caller that used the returned state has to be adapted - confirm there is none.
+ *
+ * @param newState the state to switch to
+ * @throws HiveSQLException declared by the signature; presumably raised by
+ * validateTransition() for an illegal transition - confirm against OperationState
+ */
+ protected final void setState(OperationState newState) throws HiveSQLException {
state.validateTransition(newState);
this.state = newState;
- return this.state;
+ this.lastAccessTime = System.currentTimeMillis();
}
+ /**
+ * Verifies that the operation is currently in the expected state and, when it is,
+ * refreshes the last access time.
+ *
+ * @param state the state the operation is required to be in
+ * @throws HiveSQLException when the current state differs from the expected one
+ */
protected final void assertState(OperationState state) throws HiveSQLException {
if (this.state != state) {
throw new HiveSQLException("Expected state " + state + ", but found " + this.state);
}
+ // reached only when the state matched; a passed check counts as an access for isTimedOut()
+ this.lastAccessTime = System.currentTimeMillis();
}
public boolean isRunning() {
diff --git service/src/java/org/apache/hive/service/cli/session/HiveSession.java service/src/java/org/apache/hive/service/cli/session/HiveSession.java
index 00058cc..4415018 100644
--- service/src/java/org/apache/hive/service/cli/session/HiveSession.java
+++ service/src/java/org/apache/hive/service/cli/session/HiveSession.java
@@ -179,4 +179,8 @@ public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientatio
public String getUserName();
public void setUserName(String userName);
+
+ /**
+ * @return the time of the last access to this session, in milliseconds on the
+ * System.currentTimeMillis() scale (as maintained by HiveSessionImpl)
+ */
+ public long getLastAccessTime();
+
+ /**
+ * Housekeeping hook that the SessionManager checker thread invokes for every session
+ * that has not timed out; HiveSessionImpl uses it to close timed-out operations.
+ */
+ public void ping();
}
diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index cfda752..5691734 100644
--- service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -19,6 +19,7 @@
package org.apache.hive.service.cli.session;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -51,6 +52,7 @@
import org.apache.hive.service.cli.operation.GetTableTypesOperation;
import org.apache.hive.service.cli.operation.GetTypeInfoOperation;
import org.apache.hive.service.cli.operation.MetadataOperation;
+import org.apache.hive.service.cli.operation.Operation;
import org.apache.hive.service.cli.operation.OperationManager;
/**
@@ -76,6 +78,8 @@
private IMetaStoreClient metastoreClient = null;
private final Set opHandleSet = new HashSet();
+ // Written in the constructor and inside the synchronized acquire()/release(), but read
+ // without the monitor by getLastAccessTime() on the SessionManager checker thread;
+ // volatile makes the 64-bit value visible and atomically readable there.
+ private volatile long lastAccessTime;
+
public HiveSessionImpl(String username, String password, Map sessionConf) {
this.username = username;
this.password = password;
@@ -88,6 +92,8 @@ public HiveSessionImpl(String username, String password, Map ses
// set an explicit session name to control the download directory name
hiveConf.set(ConfVars.HIVESESSIONID.varname,
sessionHandle.getHandleIdentifier().toString());
+
+ lastAccessTime = System.currentTimeMillis();
sessionState = new SessionState(hiveConf);
SessionState.start(sessionState);
}
@@ -112,11 +118,13 @@ protected synchronized void acquire() throws HiveSQLException {
// need to make sure that the this connections session state is
// stored in the thread local for sessions.
SessionState.setCurrentSessionState(sessionState);
+ lastAccessTime = System.currentTimeMillis();
}
protected synchronized void release() {
assert sessionState != null;
// no need to release sessionState...
+ // releasing also refreshes the timestamp reported by getLastAccessTime()
+ lastAccessTime = System.currentTimeMillis();
}
public SessionHandle getSessionHandle() {
@@ -381,6 +389,34 @@ public void setUserName(String userName) {
this.username = userName;
}
+ /**
+ * Returns the time (System.currentTimeMillis()) at which this session was created or
+ * last passed through acquire()/release().
+ *
+ * @return the last access time in milliseconds
+ */
+ @Override
+ public long getLastAccessTime() {
+ return lastAccessTime;
+ }
+
+ /**
+ * Closes every operation of this session that reports itself as timed out.
+ * Handles that the operation manager can no longer resolve are dropped from opHandleSet.
+ * Failures while closing an operation are logged and do not stop the scan.
+ */
+ @Override
+ public void ping() {
+ long current = System.currentTimeMillis();
+ OperationManager manager = sessionManager.getOperationManager();
+ // iterate over a snapshot because stale handles are removed from opHandleSet inside the loop
+ for (OperationHandle handle : new ArrayList<OperationHandle>(opHandleSet)) {
+ Operation operation;
+ try {
+ operation = manager.getOperation(handle);
+ } catch (HiveSQLException e) {
+ // 'operation' was never assigned on this path, so report the handle that failed to
+ // resolve; dereferencing the operation here would throw a NullPointerException
+ LOG.warn("Invalid Operation " + handle, e);
+ opHandleSet.remove(handle);
+ continue;
+ }
+ if (operation.isTimedOut(current)) {
+ LOG.warn("Operation " + operation.getHandle() + " is Timed-out and will be closed");
+ try {
+ closeOperation(operation.getHandle());
+ } catch (Exception e) {
+ LOG.warn("Exception is thrown closing operation " + operation.getHandle(), e);
+ }
+ }
+ }
+ }
+
@Override
public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
acquire();
diff --git service/src/java/org/apache/hive/service/cli/session/SessionManager.java service/src/java/org/apache/hive/service/cli/session/SessionManager.java
index 25c6f38..7bf2a65 100644
--- service/src/java/org/apache/hive/service/cli/session/SessionManager.java
+++ service/src/java/org/apache/hive/service/cli/session/SessionManager.java
@@ -18,6 +18,8 @@
package org.apache.hive.service.cli.session;
+import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -41,6 +43,9 @@
*
*/
public class SessionManager extends CompositeService {
+
+ private static final int SESSION_CHECK_INTERVAL = 60000; // 1 min
+
private static final Log LOG = LogFactory.getLog(CompositeService.class);
private HiveConf hiveConf;
private final Map handleToSession = new HashMap();
@@ -48,6 +53,10 @@
private static final Object sessionMapLock = new Object();
private ThreadPoolExecutor backgroundOperationPool;
+ // Set by stop() and polled by the checker thread's loop, hence volatile.
+ private volatile boolean shutdown;
+ private transient Thread checker;
+ private transient long sessionTimeout;
+
public SessionManager() {
super("SessionManager");
}
@@ -68,6 +77,7 @@ public synchronized void init(HiveConf hiveConf) {
backgroundOperationPool = new ThreadPoolExecutor(backgroundPoolSize, backgroundPoolSize,
keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue(backgroundPoolQueueSize));
backgroundOperationPool.allowCoreThreadTimeOut(true);
+ sessionTimeout = HiveConf.getLongVar(hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT);
addService(operationManager);
super.init(hiveConf);
}
@@ -75,6 +85,34 @@ public synchronized void init(HiveConf hiveConf) {
@Override
public synchronized void start() {
super.start();
+ // Daemon housekeeping thread: every SESSION_CHECK_INTERVAL msec it closes sessions idle
+ // for at least sessionTimeout (when positive) and pings the remaining ones.
+ checker = new Thread(new Runnable() {
+ public void run() {
+ while (!shutdown) {
+ try {
+ Thread.sleep(SESSION_CHECK_INTERVAL);
+ } catch (InterruptedException e) {
+ // stop() sets shutdown before interrupting, so re-test the loop condition
+ // instead of running one more scan while the service is going down
+ continue;
+ }
+ long current = System.currentTimeMillis();
+ List<HiveSession> sessions;
+ // NOTE(review): assumes handleToSession is guarded by sessionMapLock wherever it is
+ // modified - confirm. Only the snapshot is taken under the lock.
+ synchronized (sessionMapLock) {
+ sessions = new ArrayList<HiveSession>(handleToSession.values());
+ }
+ for (HiveSession session : sessions) {
+ if (sessionTimeout > 0 && session.getLastAccessTime() + sessionTimeout <= current) {
+ SessionHandle handle = session.getSessionHandle();
+ LOG.warn("Session " + handle + " is Timed-out (last access : " +
+ new Date(session.getLastAccessTime()) + ") and will be closed");
+ try {
+ closeSession(handle);
+ } catch (Exception e) {
+ // caught broadly: an unchecked exception would otherwise end the checker thread
+ LOG.warn("Exception is thrown closing session " + handle, e);
+ }
+ } else {
+ try {
+ session.ping();
+ } catch (RuntimeException e) {
+ // keep the checker alive for the other sessions
+ LOG.warn("Exception is thrown checking operations of session " +
+ session.getSessionHandle(), e);
+ }
+ }
+ }
+ }
+ }
+ });
+ checker.setDaemon(true);
+ checker.start();
}
@Override
@@ -90,6 +128,10 @@ public synchronized void stop() {
" seconds has been exceeded. RUNNING background operations will be shut down", e);
}
}
+ shutdown = true;
+ if (checker != null) {
+ checker.interrupt();
+ }
}
public SessionHandle openSession(String username, String password, Map sessionConf)