diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index abfde42..baa4bd3 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -761,6 +761,10 @@ // Number of seconds HiveServer2 shutdown will wait for async threads to terminate HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT("hive.server2.async.exec.shutdown.timeout", 10), + // Time in milliseconds that HiveServer2 will wait, + // before responding to calls that use long polling + // Example: SQLOperation#getState + HIVE_SERVER2_LONG_POLLING_TIMEOUT("hive.server2.long.polling.timeout", 500), // HiveServer2 auth configuration HIVE_SERVER2_AUTHENTICATION("hive.server2.authentication", "NONE"), diff --git a/conf/hive-default.xml.template b/conf/hive-default.xml.template index b7234b4..d62a817 100644 --- a/conf/hive-default.xml.template +++ b/conf/hive-default.xml.template @@ -1909,6 +1909,13 @@ + hive.server2.long.polling.timeout + 500 + Time (in milliseconds) that HiveServer2 will wait, + before responding to calls that use long polling + + + hive.server2.thrift.port 10000 Port number of HiveServer2 Thrift interface. diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index f6adf92..d1dc951 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -25,10 +25,15 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Schema; import org.apache.hadoop.hive.ql.CommandNeedRetryException; @@ -207,6 +212,36 @@ public TableSchema getResultSetSchema() throws HiveSQLException { return resultSchema; } + @Override + public OperationState getState() { + // For an async call, block till LONG_POLLING_TIMEOUT expires or + // background operation thread generates results, whichever happens first + if (shouldRunAsync()) { + int timeout = getParentSession().getHiveConf().getIntVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT); + try { + getBackgroundHandle().get(timeout, TimeUnit.MILLISECONDS); + } + catch (TimeoutException e) { + // No Op, return to the caller since long polling timeout has expired + LOG.info("Long polling timed out"); + } + catch (CancellationException e) { + // The background operation thread was cancelled + LOG.warn("The background operation was cancelled", e); + } + catch (ExecutionException e) { + // The background operation thread was aborted + LOG.error("The background operation was aborted", e); + } + catch (InterruptedException e) { + // No op, this thread was interrupted + // In this case, the call might return sooner than long polling timeout + LOG.info("The polling thread was interrupted", e); + } + } + return super.getState(); + } + @Override public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { @@ -318,4 +353,9 @@ private boolean shouldRunAsync() { return runAsync; } + private Future getBackgroundHandle() { + return backgroundHandle; + } + + } diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java index d6caed1..f066e55 100644 --- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java +++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java @@ -186,7 +186,6 @@ public void testExecuteStatementAsync() throws Exception { || state == OperationState.FINISHED || state == OperationState.ERROR) { break; } - Thread.sleep(1000); } assertEquals("Query should return an error state", OperationState.ERROR, client.getOperationStatus(ophandle)); @@ -203,7 +202,11 @@ public void testExecuteStatementAsync() throws Exception { System.out.println("Polling timed out"); break; } + + System.out.println("Long polling starts at: " + System.currentTimeMillis()); state = client.getOperationStatus(ophandle); + System.out.println("Long polling ends at: " + System.currentTimeMillis()); + System.out.println("Polling: " + ophandle + " count=" + (++count) + " state=" + state); @@ -211,7 +214,6 @@ public void testExecuteStatementAsync() throws Exception { || state == OperationState.FINISHED || state == OperationState.ERROR) { break; } - Thread.sleep(1000); } assertEquals("Query should be finished", OperationState.FINISHED, client.getOperationStatus(ophandle));