diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a182cd7..e2dcd3d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -815,6 +815,9 @@
// will wait for a new task to arrive before terminating
HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME("hive.server2.async.exec.keepalive.time", 10),
+ // Time in milliseconds that HiveServer2 will wait,
+ // before responding to asynchronous calls that use long polling
+ HIVE_SERVER2_LONG_POLLING_TIMEOUT("hive.server2.long.polling.timeout", 5000L),
// HiveServer2 auth configuration
HIVE_SERVER2_AUTHENTICATION("hive.server2.authentication", "NONE",
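The new variable is read through HiveConf's long-valued accessor, as the CLIService change further down shows. A minimal standalone sketch of checking its default value (class name hypothetical, not part of the patch):

    import org.apache.hadoop.hive.conf.HiveConf;

    // Hypothetical check of the default value of the new setting (5000 ms).
    public class LongPollingTimeoutCheck {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        long timeoutMs = conf.getLongVar(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT);
        System.out.println("hive.server2.long.polling.timeout = " + timeoutMs + " ms");
      }
    }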
diff --git a/conf/hive-default.xml.template b/conf/hive-default.xml.template
index 0d08aa2..5e638c5 100644
--- a/conf/hive-default.xml.template
+++ b/conf/hive-default.xml.template
@@ -2111,6 +2111,12 @@
+ <property>
+ <name>hive.server2.long.polling.timeout</name>
+ <value>5000L</value>
+ <description>Time in milliseconds that HiveServer2 will wait, before responding to asynchronous calls that use long polling</description>
+ </property>
+
<property>
<name>hive.server2.async.exec.wait.queue.size</name>
<value>100</value>
<description>Size of the wait queue for async thread pool in HiveServer2.
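Clients can also override the timeout per statement through the confOverlay map passed to executeStatementAsync, which is what the updated CLIServiceTest below does. A small sketch, with the helper class name being hypothetical:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.conf.HiveConf;

    // Hypothetical helper building a per-statement overlay that lowers the
    // long polling timeout to 500 ms, mirroring the updated test.
    public class OverlayBuilder {
      public static Map<String, String> lowLatencyOverlay() {
        Map<String, String> confOverlay = new HashMap<String, String>();
        confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, "500");
        return confOverlay;
      }
    }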
diff --git a/service/src/java/org/apache/hive/service/cli/CLIService.java b/service/src/java/org/apache/hive/service/cli/CLIService.java
index 56b357a..2b1e712 100644
--- a/service/src/java/org/apache/hive/service/cli/CLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/CLIService.java
@@ -21,6 +21,10 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.security.auth.login.LoginException;
@@ -38,6 +42,7 @@
import org.apache.hive.service.CompositeService;
import org.apache.hive.service.ServiceException;
import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.cli.operation.Operation;
import org.apache.hive.service.cli.session.SessionManager;
import org.apache.hive.service.cli.thrift.TProtocolVersion;
@@ -123,7 +128,7 @@ public SessionHandle openSession(TProtocolVersion protocol, String username, Str
public SessionHandle openSessionWithImpersonation(TProtocolVersion protocol, String username,
String password, Map<String, String> configuration, String delegationToken)
- throws HiveSQLException {
+ throws HiveSQLException {
SessionHandle sessionHandle = sessionManager.openSession(protocol, username, password, configuration,
true, delegationToken);
LOG.debug(sessionHandle + ": openSession()");
@@ -146,9 +151,9 @@ public SessionHandle openSession(String username, String password, Map<String, String> configuration,
- String delegationToken) throws HiveSQLException {
+ String delegationToken) throws HiveSQLException {
SessionHandle sessionHandle = sessionManager.openSession(SERVER_VERSION, username, password, configuration,
- true, delegationToken);
+ true, delegationToken);
LOG.debug(sessionHandle + ": openSession()");
return sessionHandle;
}
@@ -297,8 +302,33 @@ public OperationHandle getFunctions(SessionHandle sessionHandle,
@Override
public OperationStatus getOperationStatus(OperationHandle opHandle)
throws HiveSQLException {
- OperationStatus opStatus = sessionManager.getOperationManager()
- .getOperationStatus(opHandle);
+ Operation operation = sessionManager.getOperationManager().getOperation(opHandle);
+ /**
+ * If this is a background operation run asynchronously,
+ * we block for a configured duration, before we return
+ * (duration: HIVE_SERVER2_LONG_POLLING_TIMEOUT).
+ * However, if the background operation is complete, we return immediately.
+ */
+ if (operation.shouldRunAsync()) {
+ long timeout = operation.getParentSession().getHiveConf().getLongVar(
+ HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT);
+ try {
+ operation.getBackgroundHandle().get(timeout, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ // No Op, return to the caller since long polling timeout has expired
+ LOG.trace(opHandle + ": Long polling timed out");
+ } catch (CancellationException e) {
+ // The background operation thread was cancelled
+ LOG.trace(opHandle + ": The background operation was cancelled", e);
+ } catch (ExecutionException e) {
+ // The background operation thread was aborted
+ LOG.trace(opHandle + ": The background operation was aborted", e);
+ } catch (InterruptedException e) {
+ // No op, this thread was interrupted
+ // In this case, the call might return sooner than long polling timeout
+ }
+ }
+ OperationStatus opStatus = operation.getStatus();
LOG.debug(opHandle + ": getOperationStatus()");
return opStatus;
}
@@ -310,7 +340,7 @@ public OperationStatus getOperationStatus(OperationHandle opHandle)
public void cancelOperation(OperationHandle opHandle)
throws HiveSQLException {
sessionManager.getOperationManager().getOperation(opHandle)
- .getParentSession().cancelOperation(opHandle);
+ .getParentSession().cancelOperation(opHandle);
LOG.debug(opHandle + ": cancelOperation()");
}
@@ -321,7 +351,7 @@ public void cancelOperation(OperationHandle opHandle)
public void closeOperation(OperationHandle opHandle)
throws HiveSQLException {
sessionManager.getOperationManager().getOperation(opHandle)
- .getParentSession().closeOperation(opHandle);
+ .getParentSession().closeOperation(opHandle);
LOG.debug(opHandle + ": closeOperation");
}
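Since getOperationStatus() now blocks on the server for up to the configured timeout while a background operation is running, a client can poll in a simple loop without sleeping between calls. A hedged sketch of such a loop against CLIServiceClient (helper class and method names are illustrative only):

    import org.apache.hive.service.cli.CLIServiceClient;
    import org.apache.hive.service.cli.HiveSQLException;
    import org.apache.hive.service.cli.OperationHandle;
    import org.apache.hive.service.cli.OperationState;
    import org.apache.hive.service.cli.OperationStatus;

    public class LongPollingClientLoop {
      // Each getOperationStatus() call long-polls on the server, so no
      // client-side Thread.sleep() is needed between iterations.
      public static OperationState waitForCompletion(CLIServiceClient client,
          OperationHandle opHandle) throws HiveSQLException {
        while (true) {
          OperationStatus status = client.getOperationStatus(opHandle);
          OperationState state = status.getState();
          if (state == OperationState.CANCELED || state == OperationState.CLOSED
              || state == OperationState.FINISHED || state == OperationState.ERROR) {
            return state;
          }
        }
      }
    }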
diff --git a/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java b/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
index e973f83..89f2ae9 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
@@ -19,16 +19,12 @@
+import java.sql.SQLException;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
-import java.sql.SQLException;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory;
import org.apache.hadoop.hive.ql.processors.CommandProcessor;
-import org.apache.hadoop.hive.ql.processors.HiveCommand;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory;
import org.apache.hive.service.cli.HiveSQLException;
import org.apache.hive.service.cli.OperationType;
import org.apache.hive.service.cli.session.HiveSession;
@@ -37,8 +33,9 @@
protected String statement = null;
protected Map<String, String> confOverlay = new HashMap<String, String>();
- public ExecuteStatementOperation(HiveSession parentSession, String statement, Map<String, String> confOverlay) {
- super(parentSession, OperationType.EXECUTE_STATEMENT);
+ public ExecuteStatementOperation(HiveSession parentSession, String statement,
+ Map<String, String> confOverlay, boolean runInBackground) {
+ super(parentSession, OperationType.EXECUTE_STATEMENT, runInBackground);
this.statement = statement;
setConfOverlay(confOverlay);
}
@@ -49,7 +46,7 @@ public String getStatement() {
public static ExecuteStatementOperation newExecuteStatementOperation(
HiveSession parentSession, String statement, Map<String, String> confOverlay, boolean runAsync)
- throws HiveSQLException {
+ throws HiveSQLException {
String[] tokens = statement.trim().split("\\s+");
CommandProcessor processor = null;
try {
diff --git a/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java b/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
index c6e1692..f708650 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
@@ -60,7 +60,7 @@
protected HiveCommandOperation(HiveSession parentSession, String statement,
CommandProcessor commandProcessor, Map<String, String> confOverlay) {
- super(parentSession, statement, confOverlay);
+ super(parentSession, statement, confOverlay, false);
this.commandProcessor = commandProcessor;
setupSessionIO(parentSession.getSessionState());
}
diff --git a/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java b/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java
index 8dc82ab..e0d17a1 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java
@@ -35,7 +35,7 @@
private static final char SEARCH_STRING_ESCAPE = '\\';
protected MetadataOperation(HiveSession parentSession, OperationType opType) {
- super(parentSession, opType);
+ super(parentSession, opType, false);
setHasResultSet(true);
}
@@ -85,15 +85,15 @@ protected String convertSchemaPattern(final String pattern) {
* format '.*' This is driven by the datanucleusFormat flag.
*/
private String convertPattern(final String pattern, boolean datanucleusFormat) {
- String wStr;
- if (datanucleusFormat) {
- wStr = "*";
- } else {
- wStr = ".*";
- }
- return pattern
- .replaceAll("([^\\\\])%", "$1" + wStr).replaceAll("\\\\%", "%").replaceAll("^%", wStr)
- .replaceAll("([^\\\\])_", "$1.").replaceAll("\\\\_", "_").replaceAll("^_", ".");
+ String wStr;
+ if (datanucleusFormat) {
+ wStr = "*";
+ } else {
+ wStr = ".*";
+ }
+ return pattern
+ .replaceAll("([^\\\\])%", "$1" + wStr).replaceAll("\\\\%", "%").replaceAll("^%", wStr)
+ .replaceAll("([^\\\\])_", "$1.").replaceAll("\\\\_", "_").replaceAll("^_", ".");
}
}
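The reindented convertPattern() chain is easiest to verify with a few concrete inputs. A standalone sketch that copies the same replaceAll sequence (with datanucleusFormat set to false, so '%' becomes '.*' and '_' becomes '.' unless escaped):

    public class PatternConversionDemo {
      // Same replaceAll chain as convertPattern() above, non-datanucleus form.
      static String convert(String pattern) {
        String wStr = ".*";
        return pattern
            .replaceAll("([^\\\\])%", "$1" + wStr).replaceAll("\\\\%", "%").replaceAll("^%", wStr)
            .replaceAll("([^\\\\])_", "$1.").replaceAll("\\\\_", "_").replaceAll("^_", ".");
      }

      public static void main(String[] args) {
        System.out.println(convert("emp%"));     // emp.*
        System.out.println(convert("t_bl"));     // t.bl
        System.out.println(convert("%\\_tmp"));  // .*_tmp  (escaped '_' kept literal)
      }
    }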
diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
index 58a28b6..3f36e2d 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
@@ -18,6 +18,7 @@
package org.apache.hive.service.cli.operation;
import java.util.EnumSet;
+import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -43,16 +44,31 @@
public static final long DEFAULT_FETCH_MAX_ROWS = 100;
protected boolean hasResultSet;
protected volatile HiveSQLException operationException;
+ protected final boolean runAsync;
+ protected volatile Future<?> backgroundHandle;
protected static final EnumSet<FetchOrientation> DEFAULT_FETCH_ORIENTATION_SET =
EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST);
-
- protected Operation(HiveSession parentSession, OperationType opType) {
+
+ protected Operation(HiveSession parentSession, OperationType opType, boolean runInBackground) {
super();
this.parentSession = parentSession;
+ this.runAsync = runInBackground;
this.opHandle = new OperationHandle(opType, parentSession.getProtocolVersion());
}
+ public Future<?> getBackgroundHandle() {
+ return backgroundHandle;
+ }
+
+ protected void setBackgroundHandle(Future<?> backgroundHandle) {
+ this.backgroundHandle = backgroundHandle;
+ }
+
+ public boolean shouldRunAsync() {
+ return runAsync;
+ }
+
public void setConfiguration(HiveConf configuration) {
this.configuration = new HiveConf(configuration);
}
@@ -160,7 +176,7 @@ protected void validateFetchOrientation(FetchOrientation orientation,
EnumSet<FetchOrientation> supportedOrientations) throws HiveSQLException {
if (!supportedOrientations.contains(orientation)) {
throw new HiveSQLException("The fetch type " + orientation.toString() +
- " is not supported for this resultset", "HY106");
+ " is not supported for this resultset", "HY106");
}
}
}
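Operation now keeps the Future<?> handed back when the statement is submitted to the background pool, which is what CLIService.getOperationStatus() blocks on above. The underlying java.util.concurrent pattern in isolation (class name and timings are illustrative only):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class LongPollSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // Stand-in for a long-running background query.
        Future<?> backgroundHandle = pool.submit(new Runnable() {
          public void run() {
            try {
              Thread.sleep(8000);
            } catch (InterruptedException ignored) {
            }
          }
        });
        try {
          // Wait up to the long polling timeout for the task to finish.
          backgroundHandle.get(5000, TimeUnit.MILLISECONDS);
          System.out.println("Operation finished within the polling window");
        } catch (TimeoutException e) {
          System.out.println("Still running; return the current status to the caller");
        }
        pool.shutdownNow();
      }
    }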
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 03a37c8..ace791a 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -66,15 +66,12 @@
private TableSchema resultSchema = null;
private Schema mResultSchema = null;
private SerDe serde = null;
- private final boolean runAsync;
- private volatile Future<?> backgroundHandle;
private boolean fetchStarted = false;
public SQLOperation(HiveSession parentSession, String statement, Map<String, String> confOverlay, boolean runInBackground) {
// TODO: call setRemoteUser in ExecuteStatementOperation or higher.
- super(parentSession, statement, confOverlay);
- this.runAsync = runInBackground;
+ super(parentSession, statement, confOverlay, runInBackground);
}
/***
@@ -159,7 +156,7 @@ private void runInternal(HiveConf sqlOperationConf) throws HiveSQLException {
public void run() throws HiveSQLException {
setState(OperationState.PENDING);
prepare(getConfigForOperation());
- if (!runAsync) {
+ if (!shouldRunAsync()) {
runInternal(getConfigForOperation());
} else {
Runnable backgroundOperation = new Runnable() {
@@ -177,8 +174,9 @@ public void run() {
};
try {
// This submit blocks if no background threads are available to run this operation
- backgroundHandle =
+ Future<?> backgroundHandle =
getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation);
+ setBackgroundHandle(backgroundHandle);
} catch (RejectedExecutionException rejected) {
setState(OperationState.ERROR);
throw new HiveSQLException("All the asynchronous threads are currently busy, " +
@@ -189,7 +187,8 @@ public void run() {
private void cleanup(OperationState state) throws HiveSQLException {
setState(state);
- if (runAsync) {
+ if (shouldRunAsync()) {
+ Future<?> backgroundHandle = getBackgroundHandle();
if (backgroundHandle != null) {
backgroundHandle.cancel(true);
}
@@ -349,7 +348,7 @@ private SerDe getSerDe() throws SQLException {
*/
private HiveConf getConfigForOperation() throws HiveSQLException {
HiveConf sqlOperationConf = getParentSession().getHiveConf();
- if (!getConfOverlay().isEmpty() || runAsync) {
+ if (!getConfOverlay().isEmpty() || shouldRunAsync()) {
// clone the partent session config for this query
sqlOperationConf = new HiveConf(sqlOperationConf);
@@ -364,5 +363,4 @@ private HiveConf getConfigForOperation() throws HiveSQLException {
}
return sqlOperationConf;
}
-
}
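The cleanup() path relies on Future.cancel(true) to interrupt the background thread when an async operation is cancelled or closed. A standalone illustration of that behaviour (class name and timings are made up for the example):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class CancelSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<?> backgroundHandle = pool.submit(new Runnable() {
          public void run() {
            try {
              Thread.sleep(60000); // stand-in for a long-running query
            } catch (InterruptedException e) {
              System.out.println("Background operation interrupted");
            }
          }
        });
        Thread.sleep(200);               // give the task a chance to start
        backgroundHandle.cancel(true);   // mirrors cleanup() for async operations
        System.out.println("Cancelled: " + backgroundHandle.isCancelled());
        pool.shutdown();
      }
    }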
diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
index 8ec8d43..9d06aa7 100644
--- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
+++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
@@ -22,14 +22,13 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.junit.Assert.assertTrue;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -57,7 +56,7 @@ public void tearDown() throws Exception {
}
@Test
- public void openSessionTest() throws Exception {
+ public void testOpenSession() throws Exception {
SessionHandle sessionHandle = client.openSession(
"tom", "password", Collections.emptyMap());
assertNotNull(sessionHandle);
@@ -69,7 +68,7 @@ public void openSessionTest() throws Exception {
}
@Test
- public void getFunctionsTest() throws Exception {
+ public void testGetFunctions() throws Exception {
SessionHandle sessionHandle = client.openSession("tom", "password");
assertNotNull(sessionHandle);
@@ -106,7 +105,7 @@ public void getFunctionsTest() throws Exception {
}
@Test
- public void getInfoTest() throws Exception {
+ public void testGetInfo() throws Exception {
SessionHandle sessionHandle = client.openSession(
"tom", "password", Collections.emptyMap());
assertNotNull(sessionHandle);
@@ -123,6 +122,10 @@ public void getInfoTest() throws Exception {
client.closeSession(sessionHandle);
}
+ /**
+ * Test the blocking execution of a query
+ * @throws Exception
+ */
@Test
public void testExecuteStatement() throws Exception {
HashMap<String, String> confOverlay = new HashMap<String, String>();
@@ -161,113 +164,171 @@ public void testExecuteStatement() throws Exception {
client.closeSession(sessionHandle);
}
+ /**
+ * Test async execution of a well-formed and a malformed query with different long polling durations
+ * - Test malformed query with default long polling timeout
+ * - Test well-formed query with default long polling timeout
+ * - Test well-formed query with long polling timeout set to 0
+ * - Test well-formed query with long polling timeout set to 500 millis
+ * - Test well-formed query cancellation
+ * @throws Exception
+ */
@Test
public void testExecuteStatementAsync() throws Exception {
- HashMap<String, String> confOverlay = new HashMap<String, String>();
- SessionHandle sessionHandle = client.openSession("tom", "password",
- new HashMap<String, String>());
- // Timeout for the poll in case of asynchronous execute
- long pollTimeout = System.currentTimeMillis() + 100000;
+ Map<String, String> confOverlay = new HashMap<String, String>();
+ String tableName = "TEST_EXEC_ASYNC";
+ String columnDefinitions = "(ID STRING)";
+ String queryString;
+
+ // Open a session and set up the test data
+ SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, confOverlay);
assertNotNull(sessionHandle);
+
OperationState state = null;
OperationHandle opHandle;
OperationStatus opStatus = null;
// Change lock manager, otherwise unit-test doesn't go through
- String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname
+ queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname
+ " = false";
opHandle = client.executeStatement(sessionHandle, queryString, confOverlay);
client.closeOperation(opHandle);
- // Drop the table if it exists
- queryString = "DROP TABLE IF EXISTS TEST_EXEC_ASYNC";
- opHandle = client.executeStatement(sessionHandle, queryString, confOverlay);
- client.closeOperation(opHandle);
+ // Set longPollingTimeout to a custom value for different test cases
+ long longPollingTimeout;
- // Create a test table
- queryString = "CREATE TABLE TEST_EXEC_ASYNC(ID STRING)";
- opHandle = client.executeStatement(sessionHandle, queryString, confOverlay);
- client.closeOperation(opHandle);
-
- // Test async execution response when query is malformed
- // Compile time error
- // This query will error out during compilation (which is done synchronous as of now)
- String wrongQueryString = "SELECT NON_EXISTANT_COLUMN FROM TEST_EXEC_ASYNC";
+ /**
+ * Execute a malformed async query with the default config,
+ * to produce a compile-time error
+ * (compilation is currently synchronous).
+ */
+ longPollingTimeout = new HiveConf().getLongVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT);
+ queryString = "SELECT NON_EXISTING_COLUMN FROM " + tableName;
try {
- opHandle = client.executeStatementAsync(sessionHandle, wrongQueryString, confOverlay);
- fail("Async syntax excution should fail");
- } catch (HiveSQLException e) {
+ runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout);
+ } catch (HiveSQLException e) {
// expected error
}
-
-
- // Runtime error
- wrongQueryString = "CREATE TABLE NON_EXISTING_TAB (ID STRING) location 'hdfs://fooNN:10000/a/b/c'";
- opHandle = client.executeStatementAsync(sessionHandle, wrongQueryString, confOverlay);
-
- int count = 0;
- while (true) {
- // Break if polling times out
- if (System.currentTimeMillis() > pollTimeout) {
- System.out.println("Polling timed out");
- break;
- }
- opStatus = client.getOperationStatus(opHandle);
- state = opStatus.getState();
- System.out.println("Polling: " + opHandle + " count=" + (++count)
- + " state=" + state);
- if (state == OperationState.CANCELED || state == OperationState.CLOSED
- || state == OperationState.FINISHED || state == OperationState.ERROR) {
- break;
- }
- Thread.sleep(1000);
- }
- assertEquals("Operation should be in error state", OperationState.ERROR, state);
+ /**
+ * Execute a malformed async query with the default config,
+ * to produce a runtime error.
+ * Also check that sqlState and errorCode are set.
+ */
+ queryString = "CREATE TABLE NON_EXISTING_TAB (ID STRING) location 'hdfs://localhost:10000/a/b/c'";
+ opStatus = runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout);
// sqlState, errorCode should be set
assertEquals(opStatus.getOperationException().getSQLState(), "08S01");
assertEquals(opStatus.getOperationException().getErrorCode(), 1);
- client.closeOperation(opHandle);
-
- // Test async execution when query is well formed
- queryString = "SELECT ID FROM TEST_EXEC_ASYNC";
+ /**
+ * Execute an async query with default config
+ */
+ queryString = "SELECT ID FROM " + tableName;
+ runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
+
+ /**
+ * Execute an async query with long polling timeout set to 0
+ */
+ longPollingTimeout = 0;
+ queryString = "SELECT ID FROM " + tableName;
+ runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
+
+ /**
+ * Execute an async query with long polling timeout set to 500 millis
+ */
+ longPollingTimeout = 500;
+ queryString = "SELECT ID FROM " + tableName;
+ runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
+
+ /**
+ * Cancellation test
+ */
+ queryString = "SELECT ID FROM " + tableName;
opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
- assertTrue(opHandle.hasResultSet());
-
- count = 0;
+ System.out.println("Cancelling " + opHandle);
+ client.cancelOperation(opHandle);
+ state = client.getOperationStatus(opHandle).getState();
+ System.out.println(opHandle + " after cancelling, state= " + state);
+ assertEquals("Query should be cancelled", OperationState.CANCELED, state);
+
+ // Cleanup
+ queryString = "DROP TABLE " + tableName;
+ client.executeStatement(sessionHandle, queryString, confOverlay);
+ client.closeSession(sessionHandle);
+ }
+
+ /**
+ * Sets up a test-specific table with the given column definitions and config
+ * @param tableName
+ * @param columnDefinitions
+ * @param confOverlay
+ * @throws Exception
+ */
+ private SessionHandle setupTestData(String tableName, String columnDefinitions,
+ Map<String, String> confOverlay) throws Exception {
+ SessionHandle sessionHandle = client.openSession("tom", "password", confOverlay);
+ assertNotNull(sessionHandle);
+
+ String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname
+ + " = false";
+ client.executeStatement(sessionHandle, queryString, confOverlay);
+
+ // Drop the table if it exists
+ queryString = "DROP TABLE IF EXISTS " + tableName;
+ client.executeStatement(sessionHandle, queryString, confOverlay);
+
+ // Create a test table
+ queryString = "CREATE TABLE " + tableName + columnDefinitions;
+ client.executeStatement(sessionHandle, queryString, confOverlay);
+
+ return sessionHandle;
+ }
+
+ private OperationStatus runQueryAsync(SessionHandle sessionHandle, String queryString,
+ Map<String, String> confOverlay, OperationState expectedState,
+ long longPollingTimeout) throws HiveSQLException {
+ // Timeout for the iteration in case of asynchronous execute
+ long testIterationTimeout = System.currentTimeMillis() + 100000;
+ long longPollingStart;
+ long longPollingEnd;
+ long longPollingTimeDelta;
+ OperationStatus opStatus = null;
+ OperationState state = null;
+ confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, String.valueOf(longPollingTimeout));
+ OperationHandle opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
+ int count = 0;
while (true) {
- // Break if polling times out
- if (System.currentTimeMillis() > pollTimeout) {
+ // Break if iteration times out
+ if (System.currentTimeMillis() > testIterationTimeout) {
System.out.println("Polling timed out");
break;
}
+ longPollingStart = System.currentTimeMillis();
+ System.out.println("Long polling starts at: " + longPollingStart);
opStatus = client.getOperationStatus(opHandle);
state = opStatus.getState();
+ longPollingEnd = System.currentTimeMillis();
+ System.out.println("Long polling ends at: " + longPollingEnd);
+
System.out.println("Polling: " + opHandle + " count=" + (++count)
+ " state=" + state);
- if (state == OperationState.CANCELED || state == OperationState.CLOSED
- || state == OperationState.FINISHED || state == OperationState.ERROR) {
+ if (state == OperationState.CANCELED ||
+ state == OperationState.CLOSED ||
+ state == OperationState.FINISHED ||
+ state == OperationState.ERROR) {
break;
+ } else {
+ // Verify that getOperationStatus returned only after the long polling timeout
+ longPollingTimeDelta = longPollingEnd - longPollingStart;
+ // Scale down by a factor of 0.9 to tolerate minor timing inaccuracies
+ assertTrue(longPollingTimeDelta - 0.9*longPollingTimeout > 0);
}
- Thread.sleep(1000);
}
- assertEquals("Query should be finished", OperationState.FINISHED, state);
- client.closeOperation(opHandle);
-
- // Cancellation test
- opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
- System.out.println("cancelling " + opHandle);
- client.cancelOperation(opHandle);
- state = client.getOperationStatus(opHandle).getState();
- System.out.println(opHandle + " after cancelling, state= " + state);
- assertEquals("Query should be cancelled", OperationState.CANCELED, state);
-
- // Cleanup
- queryString = "DROP TABLE IF EXISTS TEST_EXEC_ASYNC";
- opHandle = client.executeStatement(sessionHandle, queryString, confOverlay);
+ assertEquals(expectedState, client.getOperationStatus(opHandle).getState());
client.closeOperation(opHandle);
- client.closeSession(sessionHandle);
+ return opStatus;
}
/**