Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -743,6 +743,11 @@ // Number of seconds HiveServer2 shutdown will wait for async threads to terminate HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT("hive.server2.async.exec.shutdown.timeout", 10), + // Time in milliseconds that HiveServer2 will wait, + // before responding to calls that use long polling + // Example: SQLOperation#getState + HIVE_SERVER2_LONG_POLLING_TIMEOUT("hive.server2.long.polling.timeout", (long) 500), + HIVE_SERVER2_THRIFT_PORT("hive.server2.thrift.port", 10000), HIVE_SERVER2_THRIFT_BIND_HOST("hive.server2.thrift.bind.host", ""), HIVE_SERVER2_THRIFT_SASL_QOP("hive.server2.thrift.sasl.qop", "auth"), Index: conf/hive-default.xml.template =================================================================== --- conf/hive-default.xml.template +++ conf/hive-default.xml.template @@ -1872,6 +1872,13 @@ + hive.server2.long.polling.timeout + 500 + Time (in milliseconds) that HiveServer2 will wait, + before responding to calls that use long polling + + + hive.server2.thrift.port 10000 Port number of HiveServer2 Thrift interface. Index: service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java =================================================================== --- service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -25,10 +25,15 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Schema; import org.apache.hadoop.hive.ql.CommandNeedRetryException; @@ -206,6 +211,36 @@ return resultSchema; } + @Override + public OperationState getState() { + // For an async call, block till LONG_POLLING_TIMEOUT expires or + // background thread generates results, whichever happens first + if (shouldRunAsync()) { + long timeout = getParentSession().getHiveConf().getLongVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT); + try { + getBackgroundHandle().get(timeout, TimeUnit.MILLISECONDS); + } + catch (TimeoutException e) { + // No Op, return to the caller since long polling timeout has expired + LOG.info("Long polling timed out"); + } + catch (CancellationException e) { + // The background thread has been cancelled + LOG.warn("The background operation was cancelled", e); + } + catch (ExecutionException e) { + // The background thread was aborted + LOG.error("The background operation was aborted", e); + } + catch (InterruptedException e) { + // No op, this thread was interrupted + // In this case, the call might return sooner than long polling timeout + LOG.info("The polling thread was interrupted", e); + } + } + return super.getState(); + } + @Override public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { @@ -317,4 +352,9 @@ return runAsync; } + private Future getBackgroundHandle() { + return backgroundHandle; + } + + } Index: service/src/test/org/apache/hive/service/cli/CLIServiceTest.java =================================================================== --- service/src/test/org/apache/hive/service/cli/CLIServiceTest.java +++ service/src/test/org/apache/hive/service/cli/CLIServiceTest.java @@ -176,7 +176,6 @@ || state == OperationState.FINISHED || state == OperationState.ERROR) { break; } - Thread.sleep(1000); } assertEquals("Query should return an error state", OperationState.ERROR, client.getOperationStatus(ophandle)); @@ -193,15 +192,18 @@ System.out.println("Polling timed out"); break; } + + System.out.println("Long polling starts at: " + System.currentTimeMillis()); state = client.getOperationStatus(ophandle); + System.out.println("Long polling ends at: " + System.currentTimeMillis()); + System.out.println("Polling: " + ophandle + " count=" + (++count) + " state=" + state); if (OperationState.CANCELED == state || state == OperationState.CLOSED || state == OperationState.FINISHED || state == OperationState.ERROR) { break; } - Thread.sleep(1000); } assertEquals("Query should be finished", OperationState.FINISHED, client.getOperationStatus(ophandle));