diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index b1fd468..bf66409 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -763,6 +763,10 @@ // Number of seconds HiveServer2 shutdown will wait for async threads to terminate HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT("hive.server2.async.exec.shutdown.timeout", 10L), + // Time in milliseconds that HiveServer2 will wait, + // before responding to calls that use long polling + // Example: SQLOperation#getState + HIVE_SERVER2_LONG_POLLING_TIMEOUT("hive.server2.long.polling.timeout", 5000), // HiveServer2 auth configuration HIVE_SERVER2_AUTHENTICATION("hive.server2.authentication", "NONE", diff --git a/conf/hive-default.xml.template b/conf/hive-default.xml.template index fe7141e..7f1e81d 100644 --- a/conf/hive-default.xml.template +++ b/conf/hive-default.xml.template @@ -1909,6 +1909,13 @@ + hive.server2.long.polling.timeout + 5000 + Time (in milliseconds) that HiveServer2 will wait, + before responding to calls that use long polling + + + hive.server2.thrift.port 10000 Port number of HiveServer2 Thrift interface. diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 4ee1b74..32588d2 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -25,10 +25,15 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Schema; import org.apache.hadoop.hive.ql.CommandNeedRetryException; @@ -207,6 +212,36 @@ public TableSchema getResultSetSchema() throws HiveSQLException { return resultSchema; } + @Override + public OperationState getState() { + // For an async call, block till LONG_POLLING_TIMEOUT expires or + // background operation thread generates results, whichever happens first + if (shouldRunAsync()) { + int timeout = getParentSession().getHiveConf().getIntVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT); + try { + getBackgroundHandle().get(timeout, TimeUnit.MILLISECONDS); + } + catch (TimeoutException e) { + // No Op, return to the caller since long polling timeout has expired + LOG.trace("Long polling timed out"); + } + catch (CancellationException e) { + // The background operation thread was cancelled + LOG.info("The background operation was cancelled", e); + } + catch (ExecutionException e) { + // The background operation thread was aborted + LOG.error("The background operation was aborted", e); + } + catch (InterruptedException e) { + // No op, this thread was interrupted + // In this case, the call might return sooner than long polling timeout + LOG.debug("The polling thread was interrupted", e); + } + } + return super.getState(); + } + @Override public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { @@ -318,6 +353,10 @@ private boolean shouldRunAsync() { return runAsync; } + private Future getBackgroundHandle() { + return backgroundHandle; + } + /** * If there are query specific settings to overlay, then create a copy of config * There are two cases we need to clone the session config that's being passed to hive driver @@ -345,5 +384,4 @@ private HiveConf getConfigForOperation() throws HiveSQLException { } return sqlOperationConf; } - } diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java index cd9d99a..32dd5ee 100644 --- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java +++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; +import static org.junit.Assert.assertTrue; import java.util.Collections; import java.util.HashMap; @@ -148,11 +149,16 @@ public void testExecuteStatement() throws Exception { @Test public void testExecuteStatementAsync() throws Exception { - HashMap confOverlay = new HashMap(); - SessionHandle sessionHandle = client.openSession("tom", "password", - new HashMap()); // Timeout for the poll in case of asynchronous execute - long pollTimeout = System.currentTimeMillis() + 100000; + long testIterationTimeout = System.currentTimeMillis() + 100000; + long longPollingStart; + long longPollingEnd; + // Set longPollingTimeout to a lower value for test since we execute short queries here + long longPollingTimeout = 500; + long longPollingTimeDelta; + HashMap confOverlay = new HashMap(); + confOverlay.put("hive.server2.long.polling.timeout", String.valueOf(longPollingTimeout)); + SessionHandle sessionHandle = client.openSession("tom", "password", confOverlay); assertNotNull(sessionHandle); OperationState state = null; OperationHandle ophandle; @@ -177,7 +183,7 @@ public void testExecuteStatementAsync() throws Exception { int count = 0; while (true) { // Break if polling times out - if (System.currentTimeMillis() > pollTimeout) { + if (System.currentTimeMillis() > testIterationTimeout) { System.out.println("Polling timed out"); break; } @@ -189,24 +195,27 @@ public void testExecuteStatementAsync() throws Exception { || state == OperationState.FINISHED || state == OperationState.ERROR) { break; } - Thread.sleep(1000); } assertEquals("Query should return an error state", OperationState.ERROR, client.getOperationStatus(ophandle)); // Test async execution when query is well formed queryString = "SELECT ID FROM TEST_EXEC_ASYNC"; - ophandle = - client.executeStatementAsync(sessionHandle, queryString, confOverlay); + ophandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); count = 0; while (true) { // Break if polling times out - if (System.currentTimeMillis() > pollTimeout) { + if (System.currentTimeMillis() > testIterationTimeout) { System.out.println("Polling timed out"); break; } + + longPollingStart = System.currentTimeMillis(); + System.out.println("Long polling starts at: " + longPollingStart); state = client.getOperationStatus(ophandle); + longPollingEnd = System.currentTimeMillis(); + System.out.println("Long polling ends at: " + longPollingEnd); System.out.println("Polling: " + ophandle + " count=" + (++count) + " state=" + state); @@ -214,7 +223,12 @@ public void testExecuteStatementAsync() throws Exception { || state == OperationState.FINISHED || state == OperationState.ERROR) { break; } - Thread.sleep(1000); + else { + // Verify that getOperationStatus returned only after the long polling timeout + longPollingTimeDelta = longPollingEnd - longPollingStart; + // Scale down by a factor of 0.9 to account for approximate values + assertTrue(longPollingTimeDelta - 0.9*longPollingTimeout > 0); + } } assertEquals("Query should be finished", OperationState.FINISHED, client.getOperationStatus(ophandle));