diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index abfde42..a1e805e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -761,6 +761,10 @@
// Number of seconds HiveServer2 shutdown will wait for async threads to terminate
HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT("hive.server2.async.exec.shutdown.timeout", 10),
+ // Time in milliseconds that HiveServer2 will wait,
+ // before responding to calls that use long polling
+ // Example: SQLOperation#getState
+ HIVE_SERVER2_LONG_POLLING_TIMEOUT("hive.server2.long.polling.timeout", 5000),
// HiveServer2 auth configuration
HIVE_SERVER2_AUTHENTICATION("hive.server2.authentication", "NONE"),
diff --git a/conf/hive-default.xml.template b/conf/hive-default.xml.template
index 5da90ec..9f39eb0 100644
--- a/conf/hive-default.xml.template
+++ b/conf/hive-default.xml.template
@@ -1909,6 +1909,13 @@
+ hive.server2.long.polling.timeout
+ 5000
+ Time (in milliseconds) that HiveServer2 will wait,
+ before responding to calls that use long polling
+
+
+
hive.server2.thrift.port
10000
Port number of HiveServer2 Thrift interface.
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index f6adf92..f2c0ad7 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -25,10 +25,15 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
@@ -207,6 +212,36 @@ public TableSchema getResultSetSchema() throws HiveSQLException {
return resultSchema;
}
+ @Override
+ public OperationState getState() {
+ // For an async call, block till LONG_POLLING_TIMEOUT expires or
+ // background operation thread generates results, whichever happens first
+ if (shouldRunAsync()) {
+ int timeout = getParentSession().getHiveConf().getIntVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT);
+ try {
+ getBackgroundHandle().get(timeout, TimeUnit.MILLISECONDS);
+ }
+ catch (TimeoutException e) {
+ // No Op, return to the caller since long polling timeout has expired
+ LOG.trace("Long polling timed out");
+ }
+ catch (CancellationException e) {
+ // The background operation thread was cancelled
+ LOG.info("The background operation was cancelled", e);
+ }
+ catch (ExecutionException e) {
+ // The background operation thread was aborted
+ LOG.error("The background operation was aborted", e);
+ }
+ catch (InterruptedException e) {
+ // No op, this thread was interrupted
+ // In this case, the call might return sooner than long polling timeout
+ LOG.debug("The polling thread was interrupted", e);
+ }
+ }
+ return super.getState();
+ }
+
@Override
public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
@@ -318,4 +353,9 @@ private boolean shouldRunAsync() {
return runAsync;
}
+ private Future> getBackgroundHandle() {
+ return backgroundHandle;
+ }
+
+
}
diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
index d6caed1..e11b5a6 100644
--- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
+++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
@@ -20,6 +20,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import java.util.Collections;
import java.util.HashMap;
@@ -145,11 +146,16 @@ public void testExecuteStatement() throws Exception {
@Test
public void testExecuteStatementAsync() throws Exception {
- HashMap confOverlay = new HashMap();
- SessionHandle sessionHandle = client.openSession("tom", "password",
- new HashMap());
// Timeout for the poll in case of asynchronous execute
- long pollTimeout = System.currentTimeMillis() + 100000;
+ long testIterationTimeout = System.currentTimeMillis() + 100000;
+ long longPollingStart;
+ long longPollingEnd;
+ // Set longPollingTimeout to a lower value for test since we execute short queries here
+ long longPollingTimeout = 500;
+ long longPollingTimeDelta;
+ HashMap confOverlay = new HashMap();
+ confOverlay.put("hive.server2.long.polling.timeout", String.valueOf(longPollingTimeout));
+ SessionHandle sessionHandle = client.openSession("tom", "password", confOverlay);
assertNotNull(sessionHandle);
OperationState state = null;
OperationHandle ophandle;
@@ -174,7 +180,7 @@ public void testExecuteStatementAsync() throws Exception {
int count = 0;
while (true) {
// Break if polling times out
- if (System.currentTimeMillis() > pollTimeout) {
+ if (System.currentTimeMillis() > testIterationTimeout) {
System.out.println("Polling timed out");
break;
}
@@ -186,24 +192,27 @@ public void testExecuteStatementAsync() throws Exception {
|| state == OperationState.FINISHED || state == OperationState.ERROR) {
break;
}
- Thread.sleep(1000);
}
assertEquals("Query should return an error state",
OperationState.ERROR, client.getOperationStatus(ophandle));
// Test async execution when query is well formed
queryString = "SELECT ID FROM TEST_EXEC_ASYNC";
- ophandle =
- client.executeStatementAsync(sessionHandle, queryString, confOverlay);
+ ophandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
count = 0;
while (true) {
// Break if polling times out
- if (System.currentTimeMillis() > pollTimeout) {
+ if (System.currentTimeMillis() > testIterationTimeout) {
System.out.println("Polling timed out");
break;
}
+
+ longPollingStart = System.currentTimeMillis();
+ System.out.println("Long polling starts at: " + longPollingStart);
state = client.getOperationStatus(ophandle);
+ longPollingEnd = System.currentTimeMillis();
+ System.out.println("Long polling ends at: " + longPollingEnd);
System.out.println("Polling: " + ophandle + " count=" + (++count)
+ " state=" + state);
@@ -211,7 +220,11 @@ public void testExecuteStatementAsync() throws Exception {
|| state == OperationState.FINISHED || state == OperationState.ERROR) {
break;
}
- Thread.sleep(1000);
+ else {
+ // Verify that getOperationStatus returned only after the long polling timeout
+ longPollingTimeDelta = longPollingEnd - longPollingStart;
+ assertTrue(longPollingTimeDelta - longPollingTimeout > 0);
+ }
}
assertEquals("Query should be finished",
OperationState.FINISHED, client.getOperationStatus(ophandle));