diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 33d7e5f..7383374 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -33,6 +33,7 @@
import java.net.URL;
import java.util.*;
import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -904,6 +905,10 @@
HIVE_SERVER2_SSL_KEYSTORE_PATH("hive.server2.keystore.path", ""),
HIVE_SERVER2_SSL_KEYSTORE_PASSWORD("hive.server2.keystore.password", ""),
+ HIVE_SERVER2_SESSION_CHECK_INTERVAL("hive.server2.session.check.interval", "0", new TimeValidator()),
+ HIVE_SERVER2_IDLE_SESSION_TIMEOUT("hive.server2.idle.session.timeout", "0", new TimeValidator()),
+ HIVE_SERVER2_IDLE_OPERATION_TIMEOUT("hive.server2.idle.operation.timeout", "0", new TimeValidator()),
+
HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,delete,compile"),
HIVE_CONF_RESTRICTED_LIST("hive.conf.restricted.list", "hive.security.authenticator.manager,hive.security.authorization.manager"),
@@ -1275,6 +1280,36 @@ public static void setVar(Configuration conf, ConfVars var, String val) {
conf.set(var.varname, val);
}
+ private static final Pattern TIME_PATTERN = Pattern.compile("(-?\\d+)\\s*([a-zA-Z]*)");
+
+ public static long getTimeInMsec(Configuration conf, ConfVars var) {
+ return toTimeInMsec(getVar(conf, var));
+ }
+
+ public static long toTimeInMsec(String value) {
+ Matcher matcher = TIME_PATTERN.matcher(value);
+ if (!matcher.matches()) {
+ throw new IllegalArgumentException("Invalid time value " + value);
+ }
+ long timePart = Long.valueOf(matcher.group(1));
+ String unitPart = matcher.group(2);
+ if (unitPart == null || unitPart.isEmpty() || unitPart.equalsIgnoreCase("msec")) {
+ return timePart;
+ }
+ unitPart = unitPart.toLowerCase();
+ if (unitPart.startsWith("s")) {
+ return TimeUnit.MILLISECONDS.convert(timePart, TimeUnit.SECONDS);
+ } else if (unitPart.startsWith("m")) {
+ return TimeUnit.MILLISECONDS.convert(timePart, TimeUnit.MINUTES);
+ } else if (unitPart.startsWith("h")) {
+ return TimeUnit.MILLISECONDS.convert(timePart, TimeUnit.HOURS);
+ } else if (unitPart.startsWith("d")) {
+ return TimeUnit.MILLISECONDS.convert(timePart, TimeUnit.DAYS);
+ }
+ throw new IllegalArgumentException("Invalid time value " + value);
+ }
+
+
public static ConfVars getConfVars(String name) {
return vars.get(name);
}
@@ -1567,6 +1602,18 @@ public String validate(String value) {
}
}
+ public static class TimeValidator implements Validator {
+ @Override
+ public String validate(String value) {
+ try {
+ toTimeInMsec(value);
+ } catch (Exception e) {
+ return e.toString();
+ }
+ return null;
+ }
+ }
+
public static class RatioValidator implements Validator {
@Override
public String validate(String value) {
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index 05c154e..5cd3fd8 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -920,6 +920,32 @@
+ hive.server2.session.check.interval
+ 0
+ The check interval for sessions, which would be disabled with zero or minus value.
+ Accepts a numeric value which is msec by default but also can be used with other time units appended (sec, min, hour, day).
+
+
+
+
+ hive.server2.idle.session.timeout
+ 0
+ Session will be closed when it's not accessed for this duration, which can be disabled with zero or minus value.
+ Accepts a numeric value which is msec by default but also can be used with other time units appended (sec, min, hour, day).
+
+
+
+
+ hive.server2.idle.operation.timeout
+ 0
+ Operation will be closed when it's not accessed for this duration of time (msec), which can be disabled with zero value.
+ Accepts a numeric value which is msec by default but also can be used with other time units appended (sec, min, hour, day).
+ With positive value, it's checked for operations in terminal state only (FINISHED, CANCELED, CLOSED, ERROR).
+ With negative value, it's checked for all of the operations regardless of state.
+
+
+
+
hive.server2.thrift.http.port
10001
Port number when in HTTP mode.
diff --git service/src/java/org/apache/hive/service/AbstractService.java service/src/java/org/apache/hive/service/AbstractService.java
index c2a2b2d..b9ad768 100644
--- service/src/java/org/apache/hive/service/AbstractService.java
+++ service/src/java/org/apache/hive/service/AbstractService.java
@@ -31,7 +31,7 @@
*/
public abstract class AbstractService implements Service {
- private static final Log LOG = LogFactory.getLog(AbstractService.class);
+ protected static final Log LOG = LogFactory.getLog(AbstractService.class);
/**
* Service state: initially {@link STATE#NOTINITED}.
diff --git service/src/java/org/apache/hive/service/cli/OperationState.java service/src/java/org/apache/hive/service/cli/OperationState.java
index 3e15f0c..b539ae1 100644
--- service/src/java/org/apache/hive/service/cli/OperationState.java
+++ service/src/java/org/apache/hive/service/cli/OperationState.java
@@ -25,29 +25,26 @@
*
*/
public enum OperationState {
- INITIALIZED(TOperationState.INITIALIZED_STATE),
- RUNNING(TOperationState.RUNNING_STATE),
- FINISHED(TOperationState.FINISHED_STATE),
- CANCELED(TOperationState.CANCELED_STATE),
- CLOSED(TOperationState.CLOSED_STATE),
- ERROR(TOperationState.ERROR_STATE),
- UNKNOWN(TOperationState.UKNOWN_STATE),
- PENDING(TOperationState.PENDING_STATE);
+ INITIALIZED(TOperationState.INITIALIZED_STATE, false),
+ RUNNING(TOperationState.RUNNING_STATE, false),
+ FINISHED(TOperationState.FINISHED_STATE, true),
+ CANCELED(TOperationState.CANCELED_STATE, true),
+ CLOSED(TOperationState.CLOSED_STATE, true),
+ ERROR(TOperationState.ERROR_STATE, true),
+ UNKNOWN(TOperationState.UKNOWN_STATE, false),
+ PENDING(TOperationState.PENDING_STATE, false);
private final TOperationState tOperationState;
+ private final boolean terminal;
- OperationState(TOperationState tOperationState) {
+ private OperationState(TOperationState tOperationState, boolean terminal) {
this.tOperationState = tOperationState;
+ this.terminal = terminal;
}
+ // must be sync with TOperationState in order
public static OperationState getOperationState(TOperationState tOperationState) {
- // TODO: replace this with a Map?
- for (OperationState opState : values()) {
- if (tOperationState.equals(opState.tOperationState)) {
- return opState;
- }
- }
- return OperationState.UNKNOWN;
+ return OperationState.values()[tOperationState.getValue()];
}
public static void validateTransition(OperationState oldState,
@@ -91,7 +88,8 @@ public static void validateTransition(OperationState oldState,
default:
// fall-through
}
- throw new HiveSQLException("Illegal Operation state transition");
+ throw new HiveSQLException("Illegal Operation state transition " +
+ "from " + oldState + " to " + newState);
}
public void validateTransition(OperationState newState)
@@ -102,4 +100,8 @@ public void validateTransition(OperationState newState)
public TOperationState toTOperationState() {
return tOperationState;
}
+
+ public boolean isTerminal() {
+ return terminal;
+ }
}
diff --git service/src/java/org/apache/hive/service/cli/operation/Operation.java service/src/java/org/apache/hive/service/cli/operation/Operation.java
index d6651ba..3f6a74b 100644
--- service/src/java/org/apache/hive/service/cli/operation/Operation.java
+++ service/src/java/org/apache/hive/service/cli/operation/Operation.java
@@ -46,6 +46,9 @@
protected final boolean runAsync;
protected volatile Future> backgroundHandle;
+ private long operationTimeout;
+ private long lastAccessTime;
+
protected static final EnumSet DEFAULT_FETCH_ORIENTATION_SET =
EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST);
@@ -54,6 +57,9 @@ protected Operation(HiveSession parentSession, OperationType opType, boolean run
this.parentSession = parentSession;
this.runAsync = runInBackground;
this.opHandle = new OperationHandle(opType, parentSession.getProtocolVersion());
+ lastAccessTime = System.currentTimeMillis();
+ operationTimeout = HiveConf.getTimeInMsec(configuration,
+ HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT);
}
public Future> getBackgroundHandle() {
@@ -105,10 +111,33 @@ protected void setHasResultSet(boolean hasResultSet) {
opHandle.setHasResultSet(hasResultSet);
}
- protected final OperationState setState(OperationState newState) throws HiveSQLException {
+ public boolean isTimedOut(long current) {
+ if (operationTimeout == 0) {
+ return false;
+ }
+ if (operationTimeout > 0) {
+ // check only when it's in terminal state
+ return state.isTerminal() && getLastAccessTime() + operationTimeout <= current;
+ }
+ return getLastAccessTime() + -operationTimeout <= current;
+ }
+
+ public long getLastAccessTime() {
+ return lastAccessTime;
+ }
+
+ public long getOperationTimeout() {
+ return operationTimeout;
+ }
+
+ public void setOperationTimeout(long operationTimeout) {
+ this.operationTimeout = operationTimeout;
+ }
+
+ protected final void setState(OperationState newState) throws HiveSQLException {
state.validateTransition(newState);
this.state = newState;
- return this.state;
+ this.lastAccessTime = System.currentTimeMillis();
}
protected void setOperationException(HiveSQLException operationException) {
@@ -119,6 +148,7 @@ protected final void assertState(OperationState state) throws HiveSQLException {
if (this.state != state) {
throw new HiveSQLException("Expected state " + state + ", but found " + this.state);
}
+ this.lastAccessTime = System.currentTimeMillis();
}
public boolean isRunning() {
diff --git service/src/java/org/apache/hive/service/cli/operation/OperationManager.java service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
index 21c33bc..b7f664a 100644
--- service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
+++ service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
@@ -18,6 +18,7 @@
package org.apache.hive.service.cli.operation;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -129,15 +130,18 @@ public GetFunctionsOperation newGetFunctionsOperation(HiveSession parentSession,
return operation;
}
- public synchronized Operation getOperation(OperationHandle operationHandle)
- throws HiveSQLException {
- Operation operation = handleToOperation.get(operationHandle);
+ public Operation getOperation(OperationHandle operationHandle) throws HiveSQLException {
+ Operation operation = _getOperation(operationHandle);
if (operation == null) {
throw new HiveSQLException("Invalid OperationHandle: " + operationHandle);
}
return operation;
}
+ private synchronized Operation _getOperation(OperationHandle operationHandle) {
+ return handleToOperation.get(operationHandle);
+ }
+
private synchronized void addOperation(Operation operation) {
handleToOperation.put(operation.getHandle(), operation);
}
@@ -191,4 +195,27 @@ public RowSet getOperationNextRowSet(OperationHandle opHandle,
throws HiveSQLException {
return getOperation(opHandle).getNextRowSet(orientation, maxRows);
}
+
+ public List closeExpiredOperations(OperationHandle[] handles) {
+ List removed = new ArrayList();
+ long current = System.currentTimeMillis();
+ for (OperationHandle handle : handles) {
+ Operation operation = _getOperation(handle);
+ if (operation == null) {
+ LOG.warn("Invalid Operation " + handle);
+ removed.add(handle);
+ continue;
+ }
+ if (operation.isTimedOut(current)) {
+ LOG.warn("Operation " + handle + " is Timed-out and will be closed");
+ try {
+ closeOperation(operation.getHandle());
+ } catch (Exception e) {
+ LOG.warn("Exception is thrown closing operation " + operation.getHandle(), e);
+ }
+ removed.add(handle);
+ }
+ }
+ return removed;
+ }
}
diff --git service/src/java/org/apache/hive/service/cli/session/HiveSession.java service/src/java/org/apache/hive/service/cli/session/HiveSession.java
index 9785e95..cc93e49 100644
--- service/src/java/org/apache/hive/service/cli/session/HiveSession.java
+++ service/src/java/org/apache/hive/service/cli/session/HiveSession.java
@@ -157,4 +157,6 @@ public void cancelDelegationToken(HiveAuthFactory authFactory, String tokenStr)
public void renewDelegationToken(HiveAuthFactory authFactory, String tokenStr)
throws HiveSQLException;
+
+ public void closeExpiredOperations();
}
diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java
index a5c8e9b..bcd7f98 100644
--- service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java
+++ service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java
@@ -66,4 +66,6 @@
public String getIpAddress();
public void setIpAddress(String ipAddress);
+
+ public long getLastAccessTime();
}
diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index f730119..afd3ff0 100644
--- service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -80,6 +80,8 @@
private IMetaStoreClient metastoreClient = null;
private final Set opHandleSet = new HashSet();
+ private long lastAccessTime;
+
public HiveSessionImpl(TProtocolVersion protocol, String username, String password,
HiveConf serverhiveConf, Map sessionConfMap, String ipAddress) {
this.username = username;
@@ -103,6 +105,8 @@ public HiveSessionImpl(TProtocolVersion protocol, String username, String passwo
hiveConf.setInt(ListSinkOperator.OUTPUT_PROTOCOL, protocol.getValue());
sessionState = new SessionState(hiveConf, username);
sessionState.setIsHiveServerQuery(true);
+
+ lastAccessTime = System.currentTimeMillis();
SessionState.start(sessionState);
}
@@ -139,11 +143,13 @@ protected synchronized void acquire() throws HiveSQLException {
// need to make sure that the this connections session state is
// stored in the thread local for sessions.
SessionState.setCurrentSessionState(sessionState);
+ lastAccessTime = System.currentTimeMillis();
}
protected synchronized void release() {
assert sessionState != null;
SessionState.detachSession();
+ lastAccessTime = System.currentTimeMillis();
}
@Override
@@ -432,6 +438,19 @@ public void setUserName(String userName) {
this.username = userName;
}
+ public long getLastAccessTime() {
+ return lastAccessTime;
+ }
+
+ @Override
+ public void closeExpiredOperations() {
+ OperationHandle[] handles = opHandleSet.toArray(new OperationHandle[0]);
+ if (handles.length > 0) {
+ OperationManager manager = sessionManager.getOperationManager();
+ opHandleSet.removeAll(manager.closeExpiredOperations(handles));
+ }
+ }
+
@Override
public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
acquire();
diff --git service/src/java/org/apache/hive/service/cli/session/SessionManager.java service/src/java/org/apache/hive/service/cli/session/SessionManager.java
index 7f6687e..e03641c 100644
--- service/src/java/org/apache/hive/service/cli/session/SessionManager.java
+++ service/src/java/org/apache/hive/service/cli/session/SessionManager.java
@@ -18,6 +18,8 @@
package org.apache.hive.service.cli.session;
+import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -44,6 +46,8 @@
*/
public class SessionManager extends CompositeService {
+ private static final int SESSION_CHECK_INTERVAL = 60000; // 1 min
+
private static final Log LOG = LogFactory.getLog(CompositeService.class);
private HiveConf hiveConf;
private final Map handleToSession =
@@ -51,6 +55,12 @@
private final OperationManager operationManager = new OperationManager();
private ThreadPoolExecutor backgroundOperationPool;
+ private Thread timeoutChecker;
+ private long checkInterval;
+ private long sessionTimeout;
+
+ private volatile boolean shutdown;
+
public SessionManager() {
super("SessionManager");
}
@@ -70,6 +80,8 @@ public synchronized void init(HiveConf hiveConf) {
backgroundOperationPool = new ThreadPoolExecutor(backgroundPoolSize, backgroundPoolSize,
keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue(backgroundPoolQueueSize));
backgroundOperationPool.allowCoreThreadTimeOut(true);
+ checkInterval = HiveConf.getTimeInMsec(hiveConf, ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL);
+ sessionTimeout = HiveConf.getTimeInMsec(hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT);
addService(operationManager);
super.init(hiveConf);
}
@@ -77,6 +89,42 @@ public synchronized void init(HiveConf hiveConf) {
@Override
public synchronized void start() {
super.start();
+ if (checkInterval <= 0) {
+ return;
+ }
+ final long interval = Math.max(checkInterval, TimeUnit.MILLISECONDS.convert(10, TimeUnit.SECONDS));
+ timeoutChecker = new Thread(new Runnable() {
+ public void run() {
+ for (sleepInterval(interval); !shutdown; sleepInterval(interval)) {
+ long current = System.currentTimeMillis();
+ for (HiveSession session : new ArrayList(handleToSession.values())) {
+ if (sessionTimeout > 0 && session.getLastAccessTime() + sessionTimeout <= current) {
+ SessionHandle handle = session.getSessionHandle();
+ LOG.warn("Session " + handle + " is Timed-out (last access : " +
+ new Date(session.getLastAccessTime()) + ") and will be closed");
+ try {
+ closeSession(handle);
+ } catch (HiveSQLException e) {
+ LOG.warn("Exception is thrown closing session " + handle, e);
+ }
+ } else {
+ session.closeExpiredOperations();
+ }
+ }
+ }
+ }
+
+ private void sleepInterval(long interval) {
+ try {
+ Thread.sleep(interval);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ });
+ timeoutChecker.setName("Session Checker [" + interval + " msec]");
+ timeoutChecker.setDaemon(true);
+ timeoutChecker.start();
}
@Override
@@ -92,6 +140,10 @@ public synchronized void stop() {
" seconds has been exceeded. RUNNING background operations will be shut down", e);
}
}
+ shutdown = true;
+ if (timeoutChecker != null) {
+ timeoutChecker.interrupt();
+ }
}
public SessionHandle openSession(TProtocolVersion protocol, String username, String password,