diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java index a2eade1..19287dc 100644 --- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java @@ -2127,4 +2127,22 @@ public void testNonAsciiReturnValues() throws Exception { } stmt.close(); } + + /** + * Verifies that a query exceeding Statement.setQueryTimeout() fails with error code 1265 + */ + @Test + public void testQueryTimeout() throws Exception { + Statement stmt = con.createStatement(); + stmt.setQueryTimeout(1); + try { + stmt.execute("select value, sum(under_col) as sum from " + tableName + + " group by value order by sum"); + assertTrue("Should throw timeout exception", false); + } catch (SQLException e) { + assertEquals("Invalid error code", 1265, e.getErrorCode()); + } finally { + stmt.close(); + } + } } diff --git jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index 952fcde..b335f48 100644 --- jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.concurrent.locks.ReentrantLock; +import org.apache.hive.service.cli.operation.SQLOperation; import org.apache.hive.service.cli.thrift.TCLIService; import org.apache.hive.service.cli.thrift.TCancelOperationReq; import org.apache.hive.service.cli.thrift.TCancelOperationResp; @@ -499,7 +500,12 @@ public boolean getMoreResults(int current) throws SQLException { @Override public int getQueryTimeout() throws SQLException { - throw new SQLException("Method not supported"); + String value = sessConf.get(SQLOperation.QUERY_TIMEOUT_SEC); + try { + return value == null ? 
0 : Integer.parseInt(value); + } catch (NumberFormatException e) { + return 0; + } } /* @@ -683,7 +689,11 @@ public void setPoolable(boolean poolable) throws SQLException { @Override public void setQueryTimeout(int seconds) throws SQLException { - throw new SQLException("Method not supported"); + // JDBC contract: a negative timeout is an error; the value is stored as seconds in the session conf. + if (seconds < 0) { + throw new SQLException("Query timeout must be >= 0 seconds, got " + seconds); + } + sessConf.put(SQLOperation.QUERY_TIMEOUT_SEC, String.valueOf(seconds)); } /* diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 401e639..f70e345 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Queue; import java.util.Set; +import java.util.concurrent.TimeoutException; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -139,6 +140,7 @@ // A limit on the number of threads that can be launched private int maxthreads; private int tryCount = Integer.MAX_VALUE; + private long queryTimeout; private boolean destroyed; @@ -1226,7 +1228,7 @@ public int execute() throws CommandNeedRetryException { // At any time, at most maxthreads tasks can be running // The main thread polls the TaskRunners to check if they have finished. 
- DriverContext driverCxt = new DriverContext(ctx); + DriverContext driverCxt = new DriverContext(ctx, queryTimeout); driverCxt.prepare(plan); ctx.setHDFSCleanup(true); @@ -1386,17 +1388,24 @@ public int execute() throws CommandNeedRetryException { throw e; } catch (Exception e) { ctx.restoreOriginalTracker(); + int code; + // TODO: do better with handling types of Exception here + if (e instanceof TimeoutException) { + errorMessage = "FAILED: " + e.getMessage(); + code = 1265; + } else { + errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e); + code = 12; + } if (SessionState.get() != null) { SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_RET_CODE, - String.valueOf(12)); + String.valueOf(code)); } - // TODO: do better with handling types of Exception here - errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e); SQLState = "08S01"; downstreamError = e; console.printError(errorMessage + "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); - return (12); + return code; } finally { if (SessionState.get() != null) { SessionState.get().getHiveHistory().endQuery(queryId); @@ -1635,4 +1644,11 @@ public String getErrorMsg() { return errorMessage; } + public void setQueryTimeout(long queryTimeout) { + this.queryTimeout = queryTimeout; + } + + public long getQueryTimeout() { + return queryTimeout; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java index c7d3b66..932caaa 100644 --- ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java @@ -39,6 +39,7 @@ import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -66,13 +67,16 @@ final Map statsTasks = new 
HashMap(1); + final Timer timer = new Timer(); + public DriverContext() { } - public DriverContext(Context ctx) { + public DriverContext(Context ctx, long queryTimeout) { this.runnable = new ConcurrentLinkedQueue>(); this.running = new LinkedBlockingQueue(); this.ctx = ctx; + timer.setTimeout(queryTimeout); } public synchronized boolean isShutdown() { @@ -105,8 +109,8 @@ public synchronized void launching(TaskRunner runner) throws HiveException { * * @return The result object for any completed/failed task */ - public synchronized TaskRunner pollFinished() throws InterruptedException { - while (!shutdown) { + public synchronized TaskRunner pollFinished() throws InterruptedException, TimeoutException { + while (!shutdown && timer.checkTimeout()) { Iterator it = running.iterator(); while (it.hasNext()) { TaskRunner runner = it.next(); @@ -222,4 +226,28 @@ public void apply(FileSinkOperator fsOp) { statsTasks.get(statKey).getWork().setSourceTask(mapredTask); } } + + // Accumulates the milliseconds spent since setTimeout(); a timeout <= 0 disables the check. + private static class Timer { + long prev; + long timeout = -1; + long elapsed; + private void setTimeout(long timeout) { + this.timeout = timeout; + this.prev = System.nanoTime() / 1000000L; + this.elapsed = 0; + } + private boolean checkTimeout() throws TimeoutException { + if (timeout > 0) { + // nanoTime is not tied to the wall clock, so system clock adjustments cannot skew the measurement + long current = System.nanoTime() / 1000000L; + elapsed += current - prev; + prev = current; + if (elapsed >= timeout) { + throw new TimeoutException("Execution Timeout, elapsed " + elapsed + "msec (limit " + timeout + "msec)"); + } + } + return true; + } + } } diff --git service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 5e7ee93..580f1c1 100644 --- service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -29,6 +29,7 @@ import java.util.Properties; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import 
java.util.concurrent.TimeUnit; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -67,6 +68,8 @@ */ public class SQLOperation extends ExecuteStatementOperation { + public static final String QUERY_TIMEOUT_SEC = "query.timeout.sec"; + private Driver driver = null; private CommandProcessorResponse response; private TableSchema resultSchema = null; @@ -90,6 +93,7 @@ public void prepare(HiveConf sqlOperationConf) throws HiveSQLException { try { driver = new Driver(sqlOperationConf, getParentSession().getUserName()); + driver.setQueryTimeout(getQueryTimeout()); // In Hive server mode, we are not able to retry in the FetchTask // case, when calling fetch queries since execute() has returned. // For now, we disable the test attempts. @@ -165,6 +169,15 @@ private void runInternal(HiveConf sqlOperationConf) throws HiveSQLException { setState(OperationState.FINISHED); } + private long getQueryTimeout() { + String value = confOverlay.get(QUERY_TIMEOUT_SEC); + try { + return value == null ? 0 : TimeUnit.SECONDS.toMillis(Long.parseLong(value)); + } catch (NumberFormatException e) { + return 0; + } + } + @Override public void run() throws HiveSQLException { setState(OperationState.PENDING); @@ -177,7 +190,7 @@ public void run() throws HiveSQLException { // current Hive object needs to be set in aysnc thread in case of remote metastore. 
// The metastore client in Hive is associated with right user final Hive sessionHive = getCurrentHive(); - // current UGI will get used by metastore when metsatore is in embedded mode + // current UGI will get used by metastore when metastore is in embedded mode // so this needs to get passed to the new async thread final UserGroupInformation currentUGI = getCurrentUGI(opConfig); @@ -410,7 +423,7 @@ private SerDe getSerDe() throws SQLException { private HiveConf getConfigForOperation() throws HiveSQLException { HiveConf sqlOperationConf = getParentSession().getHiveConf(); if (!getConfOverlay().isEmpty() || shouldRunAsync()) { - // clone the partent session config for this query + // clone the parent session config for this query sqlOperationConf = new HiveConf(sqlOperationConf); // apply overlay query specific settings, if any