Review notes (commentary only; git apply and patch(1) skip text before the first "diff --git" header).
Purpose: adds idle-timeout handling to HiveServer2. Three new settings (hive.server2.session.check.interval, hive.server2.idle.session.timeout, hive.server2.idle.operation.timeout) are parsed by HiveConf.toTimeInMsec. SessionManager.start() launches a daemon "Session Checker" thread that closes sessions idle for at least the session timeout and otherwise asks each session to close its expired operations.
Time values: an integer followed by an optional unit. No unit, or a unit starting with "ms" or "mil", means milliseconds; any other unit is matched on its first letter only (s, m, h, d); anything else raises IllegalArgumentException.
Operation timeout: 0 disables the check; a positive value is applied only to operations in a terminal state; a negative value applies its magnitude to operations in any state.
The checker thread is started only when the check interval is greater than 0 and sleeps at least 10 seconds between passes.
Changes made in review: (1) toTimeInMsec no longer reads "ms"/"millis" as minutes and uses Long.parseLong; (2) OperationState.getOperationState keeps the existing lookup loop instead of indexing values() by the Thrift value; (3) the access-time/timeout fields read by the checker thread are volatile; (4) the unused SESSION_CHECK_INTERVAL constant in SessionManager is dropped; (5) "minus value" is reworded to "negative value" in hive-default.xml.template. Hunk headers were adjusted for (2) and (4).
NOTE(review): HiveSessionImpl.closeExpiredOperations() reads and modifies opHandleSet from the checker thread with no locking visible in this patch - confirm how request threads guard that set.
NOTE(review): the post-image blob ids on the "index" lines predate the review edits.
diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index dcfe29a..82487ee 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -33,6 +33,7 @@ import java.util.Map.Entry; import java.util.Properties; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -933,6 +934,10 @@ HIVE_SERVER2_SSL_KEYSTORE_PATH("hive.server2.keystore.path", ""), HIVE_SERVER2_SSL_KEYSTORE_PASSWORD("hive.server2.keystore.password", ""), + HIVE_SERVER2_SESSION_CHECK_INTERVAL("hive.server2.session.check.interval", "0", new TimeValidator()), + HIVE_SERVER2_IDLE_SESSION_TIMEOUT("hive.server2.idle.session.timeout", "0", new TimeValidator()), + HIVE_SERVER2_IDLE_OPERATION_TIMEOUT("hive.server2.idle.operation.timeout", "0", new TimeValidator()), + HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,delete,compile"), HIVE_CONF_RESTRICTED_LIST("hive.conf.restricted.list", "hive.security.authenticator.manager,hive.security.authorization.manager"), @@ -1313,6 +1318,36 @@ public static void setVar(Configuration conf, ConfVars var, String val) { conf.set(var.varname, val); } + private static final Pattern TIME_PATTERN = Pattern.compile("(-?\\d+)\\s*([a-zA-Z]*)"); + + public static long getTimeInMsec(Configuration conf, ConfVars var) { + return toTimeInMsec(getVar(conf, var)); + } + + public static long toTimeInMsec(String value) { + Matcher matcher = TIME_PATTERN.matcher(value); + if (!matcher.matches()) { + throw new IllegalArgumentException("Invalid time value " + value); + } + long timePart = Long.parseLong(matcher.group(1)); + String unitPart = matcher.group(2); + if (unitPart == null || unitPart.isEmpty() || unitPart.regionMatches(true, 0, "ms", 0, 2) || unitPart.regionMatches(true, 0, "mil", 0, 3)) { + return timePart; + } + unitPart = unitPart.toLowerCase(); + if (unitPart.startsWith("s")) { + 
return TimeUnit.MILLISECONDS.convert(timePart, TimeUnit.SECONDS); + } else if (unitPart.startsWith("m")) { + return TimeUnit.MILLISECONDS.convert(timePart, TimeUnit.MINUTES); + } else if (unitPart.startsWith("h")) { + return TimeUnit.MILLISECONDS.convert(timePart, TimeUnit.HOURS); + } else if (unitPart.startsWith("d")) { + return TimeUnit.MILLISECONDS.convert(timePart, TimeUnit.DAYS); + } + throw new IllegalArgumentException("Invalid time value " + value); + } + + + public static ConfVars getConfVars(String name) { return vars.get(name); } @@ -1605,6 +1640,18 @@ public String validate(String value) { } } + public static class TimeValidator implements Validator { + @Override + public String validate(String value) { + try { + toTimeInMsec(value); + } catch (Exception e) { + return e.toString(); + } + return null; + } + } + public static class RatioValidator implements Validator { @Override public String validate(String value) { diff --git conf/hive-default.xml.template conf/hive-default.xml.template index a75f569..744c79e 100644 --- conf/hive-default.xml.template +++ conf/hive-default.xml.template @@ -920,6 +920,32 @@ + hive.server2.session.check.interval + 0 + The check interval for sessions, which would be disabled with zero or negative value. + Accepts a numeric value which is msec by default but also can be used with other time units appended (sec, min, hour, day). + + + + + hive.server2.idle.session.timeout + 0 + Session will be closed when it's not accessed for this duration, which can be disabled with zero or negative value. + Accepts a numeric value which is msec by default but also can be used with other time units appended (sec, min, hour, day). + + + + + hive.server2.idle.operation.timeout + 0 + Operation will be closed when it's not accessed for this duration of time (msec), which can be disabled with zero value. + Accepts a numeric value which is msec by default but also can be used with other time units appended (sec, min, hour, day). 
+ With positive value, it's checked for operations in terminal state only (FINISHED, CANCELED, CLOSED, ERROR). + With negative value, it's checked for all of the operations regardless of state. + + + + hive.server2.thrift.http.port 10001 Port number when in HTTP mode. diff --git service/src/java/org/apache/hive/service/cli/OperationState.java service/src/java/org/apache/hive/service/cli/OperationState.java index 3e15f0c..b539ae1 100644 --- service/src/java/org/apache/hive/service/cli/OperationState.java +++ service/src/java/org/apache/hive/service/cli/OperationState.java @@ -25,29 +25,31 @@ * */ public enum OperationState { - INITIALIZED(TOperationState.INITIALIZED_STATE), - RUNNING(TOperationState.RUNNING_STATE), - FINISHED(TOperationState.FINISHED_STATE), - CANCELED(TOperationState.CANCELED_STATE), - CLOSED(TOperationState.CLOSED_STATE), - ERROR(TOperationState.ERROR_STATE), - UNKNOWN(TOperationState.UKNOWN_STATE), - PENDING(TOperationState.PENDING_STATE); + INITIALIZED(TOperationState.INITIALIZED_STATE, false), + RUNNING(TOperationState.RUNNING_STATE, false), + FINISHED(TOperationState.FINISHED_STATE, true), + CANCELED(TOperationState.CANCELED_STATE, true), + CLOSED(TOperationState.CLOSED_STATE, true), + ERROR(TOperationState.ERROR_STATE, true), + UNKNOWN(TOperationState.UKNOWN_STATE, false), + PENDING(TOperationState.PENDING_STATE, false); private final TOperationState tOperationState; + private final boolean terminal; - OperationState(TOperationState tOperationState) { + private OperationState(TOperationState tOperationState, boolean terminal) { this.tOperationState = tOperationState; + this.terminal = terminal; } public static OperationState getOperationState(TOperationState tOperationState) { // TODO: replace this with a Map? 
for (OperationState opState : values()) { if (tOperationState.equals(opState.tOperationState)) { return opState; } } return OperationState.UNKNOWN; } public static void validateTransition(OperationState oldState, @@ -91,7 +93,8 @@ public static void validateTransition(OperationState oldState, default: // fall-through } - throw new HiveSQLException("Illegal Operation state transition"); + throw new HiveSQLException("Illegal Operation state transition " + + "from " + oldState + " to " + newState); } public void validateTransition(OperationState newState) @@ -102,4 +105,8 @@ public void validateTransition(OperationState newState) public TOperationState toTOperationState() { return tOperationState; } + + public boolean isTerminal() { + return terminal; + } } diff --git service/src/java/org/apache/hive/service/cli/operation/Operation.java service/src/java/org/apache/hive/service/cli/operation/Operation.java index d6651ba..569090b 100644 --- service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -46,14 +46,19 @@ protected final boolean runAsync; protected volatile Future backgroundHandle; + private volatile long operationTimeout; + private volatile long lastAccessTime; + protected static final EnumSet DEFAULT_FETCH_ORIENTATION_SET = EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST); protected Operation(HiveSession parentSession, OperationType opType, boolean runInBackground) { - super(); this.parentSession = parentSession; this.runAsync = runInBackground; this.opHandle = new OperationHandle(opType, parentSession.getProtocolVersion()); + lastAccessTime = System.currentTimeMillis(); + operationTimeout = HiveConf.getTimeInMsec(parentSession.getHiveConf(), + HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT); } public Future getBackgroundHandle() { @@ -105,10 +110,33 @@ protected void 
setHasResultSet(boolean hasResultSet) { opHandle.setHasResultSet(hasResultSet); } - protected final OperationState setState(OperationState newState) throws HiveSQLException { + public boolean isTimedOut(long current) { + if (operationTimeout == 0) { + return false; + } + if (operationTimeout > 0) { + // check only when it's in terminal state + return state.isTerminal() && getLastAccessTime() + operationTimeout <= current; + } + return getLastAccessTime() + -operationTimeout <= current; + } + + public long getLastAccessTime() { + return lastAccessTime; + } + + public long getOperationTimeout() { + return operationTimeout; + } + + public void setOperationTimeout(long operationTimeout) { + this.operationTimeout = operationTimeout; + } + + protected final void setState(OperationState newState) throws HiveSQLException { state.validateTransition(newState); this.state = newState; - return this.state; + this.lastAccessTime = System.currentTimeMillis(); } protected void setOperationException(HiveSQLException operationException) { @@ -119,6 +147,7 @@ protected final void assertState(OperationState state) throws HiveSQLException { if (this.state != state) { throw new HiveSQLException("Expected state " + state + ", but found " + this.state); } + this.lastAccessTime = System.currentTimeMillis(); } public boolean isRunning() { diff --git service/src/java/org/apache/hive/service/cli/operation/OperationManager.java service/src/java/org/apache/hive/service/cli/operation/OperationManager.java index 21c33bc..b7f664a 100644 --- service/src/java/org/apache/hive/service/cli/operation/OperationManager.java +++ service/src/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -18,6 +18,7 @@ package org.apache.hive.service.cli.operation; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -129,15 +130,18 @@ public GetFunctionsOperation newGetFunctionsOperation(HiveSession parentSession, return operation; } - public 
synchronized Operation getOperation(OperationHandle operationHandle) - throws HiveSQLException { - Operation operation = handleToOperation.get(operationHandle); + public Operation getOperation(OperationHandle operationHandle) throws HiveSQLException { + Operation operation = _getOperation(operationHandle); if (operation == null) { throw new HiveSQLException("Invalid OperationHandle: " + operationHandle); } return operation; } + private synchronized Operation _getOperation(OperationHandle operationHandle) { + return handleToOperation.get(operationHandle); + } + private synchronized void addOperation(Operation operation) { handleToOperation.put(operation.getHandle(), operation); } @@ -191,4 +195,27 @@ public RowSet getOperationNextRowSet(OperationHandle opHandle, throws HiveSQLException { return getOperation(opHandle).getNextRowSet(orientation, maxRows); } + + public List closeExpiredOperations(OperationHandle[] handles) { + List removed = new ArrayList(); + long current = System.currentTimeMillis(); + for (OperationHandle handle : handles) { + Operation operation = _getOperation(handle); + if (operation == null) { + LOG.warn("Invalid Operation " + handle); + removed.add(handle); + continue; + } + if (operation.isTimedOut(current)) { + LOG.warn("Operation " + handle + " is Timed-out and will be closed"); + try { + closeOperation(operation.getHandle()); + } catch (Exception e) { + LOG.warn("Exception is thrown closing operation " + operation.getHandle(), e); + } + removed.add(handle); + } + } + return removed; + } } diff --git service/src/java/org/apache/hive/service/cli/session/HiveSession.java service/src/java/org/apache/hive/service/cli/session/HiveSession.java index 9785e95..cc93e49 100644 --- service/src/java/org/apache/hive/service/cli/session/HiveSession.java +++ service/src/java/org/apache/hive/service/cli/session/HiveSession.java @@ -157,4 +157,6 @@ public void cancelDelegationToken(HiveAuthFactory authFactory, String tokenStr) public void 
renewDelegationToken(HiveAuthFactory authFactory, String tokenStr) throws HiveSQLException; + + public void closeExpiredOperations(); } diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java index a5c8e9b..bcd7f98 100644 --- service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java +++ service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java @@ -66,4 +66,6 @@ public String getIpAddress(); public void setIpAddress(String ipAddress); + + public long getLastAccessTime(); } diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index a9d5902..cebba61 100644 --- service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -80,6 +80,8 @@ private IMetaStoreClient metastoreClient = null; private final Set opHandleSet = new HashSet(); + private volatile long lastAccessTime; + public HiveSessionImpl(TProtocolVersion protocol, String username, String password, HiveConf serverhiveConf, Map sessionConfMap, String ipAddress) { this.username = username; @@ -103,6 +105,8 @@ public HiveSessionImpl(TProtocolVersion protocol, String username, String passwo hiveConf.setInt(ListSinkOperator.OUTPUT_PROTOCOL, protocol.getValue()); sessionState = new SessionState(hiveConf, username); sessionState.setIsHiveServerQuery(true); + + lastAccessTime = System.currentTimeMillis(); SessionState.start(sessionState); } @@ -139,11 +143,13 @@ protected synchronized void acquire() throws HiveSQLException { // need to make sure that the this connections session state is // stored in the thread local for sessions. 
SessionState.setCurrentSessionState(sessionState); + lastAccessTime = System.currentTimeMillis(); } protected synchronized void release() { assert sessionState != null; SessionState.detachSession(); + lastAccessTime = System.currentTimeMillis(); } @Override @@ -432,6 +438,19 @@ public void setUserName(String userName) { this.username = userName; } + public long getLastAccessTime() { + return lastAccessTime; + } + + @Override + public void closeExpiredOperations() { + OperationHandle[] handles = opHandleSet.toArray(new OperationHandle[0]); + if (handles.length > 0) { + OperationManager manager = sessionManager.getOperationManager(); + opHandleSet.removeAll(manager.closeExpiredOperations(handles)); + } + } + @Override public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { acquire(); diff --git service/src/java/org/apache/hive/service/cli/session/SessionManager.java service/src/java/org/apache/hive/service/cli/session/SessionManager.java index 6650c05..78aae63 100644 --- service/src/java/org/apache/hive/service/cli/session/SessionManager.java +++ service/src/java/org/apache/hive/service/cli/session/SessionManager.java @@ -18,6 +18,8 @@ package org.apache.hive.service.cli.session; +import java.util.ArrayList; +import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -53,6 +55,12 @@ private final OperationManager operationManager = new OperationManager(); private ThreadPoolExecutor backgroundOperationPool; + private Thread timeoutChecker; + private long checkInterval; + private long sessionTimeout; + + private volatile boolean shutdown; + public SessionManager() { super("SessionManager"); } @@ -78,6 +86,8 @@ 
public synchronized void init(HiveConf hiveConf) { backgroundOperationPool = new ThreadPoolExecutor(backgroundPoolSize, backgroundPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue(backgroundPoolQueueSize)); backgroundOperationPool.allowCoreThreadTimeOut(true); + checkInterval = HiveConf.getTimeInMsec(hiveConf, ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL); + sessionTimeout = HiveConf.getTimeInMsec(hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT); addService(operationManager); super.init(hiveConf); } @@ -94,6 +104,42 @@ private void applyAuthorizationConfigPolicy(HiveConf newHiveConf) throws HiveExc @Override public synchronized void start() { super.start(); + if (checkInterval <= 0) { + return; + } + final long interval = Math.max(checkInterval, TimeUnit.MILLISECONDS.convert(10, TimeUnit.SECONDS)); + timeoutChecker = new Thread(new Runnable() { + public void run() { + for (sleepInterval(interval); !shutdown; sleepInterval(interval)) { + long current = System.currentTimeMillis(); + for (HiveSession session : new ArrayList(handleToSession.values())) { + if (sessionTimeout > 0 && session.getLastAccessTime() + sessionTimeout <= current) { + SessionHandle handle = session.getSessionHandle(); + LOG.warn("Session " + handle + " is Timed-out (last access : " + + new Date(session.getLastAccessTime()) + ") and will be closed"); + try { + closeSession(handle); + } catch (HiveSQLException e) { + LOG.warn("Exception is thrown closing session " + handle, e); + } + } else { + session.closeExpiredOperations(); + } + } + } + } + + private void sleepInterval(long interval) { + try { + Thread.sleep(interval); + } catch (InterruptedException e) { + // ignore + } + } + }); + timeoutChecker.setName("Session Checker [" + interval + " msec]"); + timeoutChecker.setDaemon(true); + timeoutChecker.start(); } @Override @@ -109,6 +155,10 @@ public synchronized void stop() { " seconds has been exceeded. 
RUNNING background operations will be shut down", e); } } + shutdown = true; + if (timeoutChecker != null) { + timeoutChecker.interrupt(); + } } public SessionHandle openSession(TProtocolVersion protocol, String username, String password,