Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -743,6 +743,11 @@
// Number of seconds HiveServer2 shutdown will wait for async threads to terminate
HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT("hive.server2.async.exec.shutdown.timeout", 10),
+ // Time in milliseconds that HiveServer2 will wait,
+ // before responding to calls that use long polling
+ // Example: SQLOperation#getState
+ HIVE_SERVER2_LONG_POLLING_TIMEOUT("hive.server2.long.polling.timeout", (long) 500),
+
HIVE_SERVER2_THRIFT_PORT("hive.server2.thrift.port", 10000),
HIVE_SERVER2_THRIFT_BIND_HOST("hive.server2.thrift.bind.host", ""),
HIVE_SERVER2_THRIFT_SASL_QOP("hive.server2.thrift.sasl.qop", "auth"),
Index: conf/hive-default.xml.template
===================================================================
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -1872,6 +1872,13 @@
+ hive.server2.long.polling.timeout
+ 500
+ Time (in milliseconds) that HiveServer2 will wait,
+ before responding to calls that use long polling
+
+
+
hive.server2.thrift.port
10000
Port number of HiveServer2 Thrift interface.
Index: service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
===================================================================
--- service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -25,10 +25,15 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
@@ -206,6 +211,36 @@
return resultSchema;
}
+ @Override
+ public OperationState getState() {
+ // For an async call, block till LONG_POLLING_TIMEOUT expires or
+ // background thread generates results, whichever happens first
+ if (shouldRunAsync()) {
+ long timeout = getParentSession().getHiveConf().getLongVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT);
+ try {
+ getBackgroundHandle().get(timeout, TimeUnit.MILLISECONDS);
+ }
+ catch (TimeoutException e) {
+ // No Op, return to the caller since long polling timeout has expired
+ LOG.info("Long polling timed out");
+ }
+ catch (CancellationException e) {
+ // The background thread has been cancelled
+ LOG.warn("The background operation was cancelled", e);
+ }
+ catch (ExecutionException e) {
+ // The background thread was aborted
+ LOG.error("The background operation was aborted", e);
+ }
+ catch (InterruptedException e) {
+ // No op, this thread was interrupted
+ // In this case, the call might return sooner than long polling timeout
+ LOG.info("The polling thread was interrupted", e);
+ }
+ }
+ return super.getState();
+ }
+
@Override
public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
@@ -317,4 +352,9 @@
return runAsync;
}
+ private Future> getBackgroundHandle() {
+ return backgroundHandle;
+ }
+
+
}
Index: service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
===================================================================
--- service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
+++ service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
@@ -176,7 +176,6 @@
|| state == OperationState.FINISHED || state == OperationState.ERROR) {
break;
}
- Thread.sleep(1000);
}
assertEquals("Query should return an error state",
OperationState.ERROR, client.getOperationStatus(ophandle));
@@ -193,15 +192,18 @@
System.out.println("Polling timed out");
break;
}
+
+ System.out.println("Long polling starts at: " + System.currentTimeMillis());
state = client.getOperationStatus(ophandle);
+ System.out.println("Long polling ends at: " + System.currentTimeMillis());
+
System.out.println("Polling: " + ophandle + " count=" + (++count)
+ " state=" + state);
if (OperationState.CANCELED == state || state == OperationState.CLOSED
|| state == OperationState.FINISHED || state == OperationState.ERROR) {
break;
}
- Thread.sleep(1000);
}
assertEquals("Query should be finished",
OperationState.FINISHED, client.getOperationStatus(ophandle));