diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index a182cd7..e2dcd3d 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -815,6 +815,9 @@ // will wait for a new task to arrive before terminating HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME("hive.server2.async.exec.keepalive.time", 10), + // Time in milliseconds that HiveServer2 will wait, + // before responding to asynchronous calls that use long polling + HIVE_SERVER2_LONG_POLLING_TIMEOUT("hive.server2.long.polling.timeout", 5000L), // HiveServer2 auth configuration HIVE_SERVER2_AUTHENTICATION("hive.server2.authentication", "NONE", diff --git a/conf/hive-default.xml.template b/conf/hive-default.xml.template index 0d08aa2..5e638c5 100644 --- a/conf/hive-default.xml.template +++ b/conf/hive-default.xml.template @@ -2111,6 +2111,12 @@ + hive.server2.long.polling.timeout + 5000L + Time in milliseconds that HiveServer2 will wait, before responding to asynchronous calls that use long polling + + + hive.server2.async.exec.wait.queue.size 100 Size of the wait queue for async thread pool in HiveServer2. diff --git a/service/src/java/org/apache/hive/service/cli/CLIService.java b/service/src/java/org/apache/hive/service/cli/CLIService.java index 56b357a..2b1e712 100644 --- a/service/src/java/org/apache/hive/service/cli/CLIService.java +++ b/service/src/java/org/apache/hive/service/cli/CLIService.java @@ -21,6 +21,10 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.security.auth.login.LoginException; @@ -38,6 +42,7 @@ import org.apache.hive.service.CompositeService; import org.apache.hive.service.ServiceException; import org.apache.hive.service.auth.HiveAuthFactory; +import org.apache.hive.service.cli.operation.Operation; import org.apache.hive.service.cli.session.SessionManager; import org.apache.hive.service.cli.thrift.TProtocolVersion; @@ -123,7 +128,7 @@ public SessionHandle openSession(TProtocolVersion protocol, String username, Str public SessionHandle openSessionWithImpersonation(TProtocolVersion protocol, String username, String password, Map configuration, String delegationToken) - throws HiveSQLException { + throws HiveSQLException { SessionHandle sessionHandle = sessionManager.openSession(protocol, username, password, configuration, true, delegationToken); LOG.debug(sessionHandle + ": openSession()"); @@ -146,9 +151,9 @@ public SessionHandle openSession(String username, String password, Map configuration, - String delegationToken) throws HiveSQLException { + String delegationToken) throws HiveSQLException { SessionHandle sessionHandle = sessionManager.openSession(SERVER_VERSION, username, password, configuration, - true, delegationToken); + true, delegationToken); LOG.debug(sessionHandle + ": openSession()"); return sessionHandle; } @@ -297,8 +302,33 @@ public OperationHandle getFunctions(SessionHandle sessionHandle, @Override public OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveSQLException { - OperationStatus opStatus = sessionManager.getOperationManager() - .getOperationStatus(opHandle); + Operation operation = sessionManager.getOperationManager().getOperation(opHandle); + /** + * If this is a background operation run asynchronously, + * we block for a configured duration, before we return + * (duration: HIVE_SERVER2_LONG_POLLING_TIMEOUT). + * However, if the background operation is complete, we return immediately. + */ + if (operation.shouldRunAsync()) { + long timeout = operation.getParentSession().getHiveConf().getLongVar( + HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT); + try { + operation.getBackgroundHandle().get(timeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + // No Op, return to the caller since long polling timeout has expired + LOG.trace(opHandle + ": Long polling timed out"); + } catch (CancellationException e) { + // The background operation thread was cancelled + LOG.trace(opHandle + ": The background operation was cancelled", e); + } catch (ExecutionException e) { + // The background operation thread was aborted + LOG.trace(opHandle + ": The background operation was aborted", e); + } catch (InterruptedException e) { + // No op, this thread was interrupted + // In this case, the call might return sooner than long polling timeout + } + } + OperationStatus opStatus = operation.getStatus(); LOG.debug(opHandle + ": getOperationStatus()"); return opStatus; } @@ -310,7 +340,7 @@ public OperationStatus getOperationStatus(OperationHandle opHandle) public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { sessionManager.getOperationManager().getOperation(opHandle) - .getParentSession().cancelOperation(opHandle); + .getParentSession().cancelOperation(opHandle); LOG.debug(opHandle + ": cancelOperation()"); } @@ -321,7 +351,7 @@ public void cancelOperation(OperationHandle opHandle) public void closeOperation(OperationHandle opHandle) throws HiveSQLException { sessionManager.getOperationManager().getOperation(opHandle) - .getParentSession().closeOperation(opHandle); + .getParentSession().closeOperation(opHandle); LOG.debug(opHandle + ": closeOperation"); } diff --git a/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java b/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java index e973f83..89f2ae9 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java @@ -19,16 +19,12 @@ +import java.sql.SQLException; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; -import java.sql.SQLException; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory; import org.apache.hadoop.hive.ql.processors.CommandProcessor; -import org.apache.hadoop.hive.ql.processors.HiveCommand; +import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.OperationType; import org.apache.hive.service.cli.session.HiveSession; @@ -37,8 +33,9 @@ protected String statement = null; protected Map confOverlay = new HashMap(); - public ExecuteStatementOperation(HiveSession parentSession, String statement, Map confOverlay) { - super(parentSession, OperationType.EXECUTE_STATEMENT); + public ExecuteStatementOperation(HiveSession parentSession, String statement, + Map confOverlay, boolean runInBackground) { + super(parentSession, OperationType.EXECUTE_STATEMENT, runInBackground); this.statement = statement; setConfOverlay(confOverlay); } @@ -49,7 +46,7 @@ public String getStatement() { public static ExecuteStatementOperation newExecuteStatementOperation( HiveSession parentSession, String statement, Map confOverlay, boolean runAsync) - throws HiveSQLException { + throws HiveSQLException { String[] tokens = statement.trim().split("\\s+"); CommandProcessor processor = null; try { diff --git a/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java b/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java index c6e1692..f708650 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java @@ -60,7 +60,7 @@ protected HiveCommandOperation(HiveSession parentSession, String statement, CommandProcessor commandProcessor, Map confOverlay) { - super(parentSession, statement, confOverlay); + super(parentSession, statement, confOverlay, false); this.commandProcessor = commandProcessor; setupSessionIO(parentSession.getSessionState()); } diff --git a/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java b/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java index 8dc82ab..e0d17a1 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java @@ -35,7 +35,7 @@ private static final char SEARCH_STRING_ESCAPE = '\\'; protected MetadataOperation(HiveSession parentSession, OperationType opType) { - super(parentSession, opType); + super(parentSession, opType, false); setHasResultSet(true); } @@ -85,15 +85,15 @@ protected String convertSchemaPattern(final String pattern) { * format '.*' This is driven by the datanucleusFormat flag. */ private String convertPattern(final String pattern, boolean datanucleusFormat) { - String wStr; - if (datanucleusFormat) { - wStr = "*"; - } else { - wStr = ".*"; - } - return pattern - .replaceAll("([^\\\\])%", "$1" + wStr).replaceAll("\\\\%", "%").replaceAll("^%", wStr) - .replaceAll("([^\\\\])_", "$1.").replaceAll("\\\\_", "_").replaceAll("^_", "."); + String wStr; + if (datanucleusFormat) { + wStr = "*"; + } else { + wStr = ".*"; + } + return pattern + .replaceAll("([^\\\\])%", "$1" + wStr).replaceAll("\\\\%", "%").replaceAll("^%", wStr) + .replaceAll("([^\\\\])_", "$1.").replaceAll("\\\\_", "_").replaceAll("^_", "."); } } diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java index 58a28b6..3f36e2d 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -18,6 +18,7 @@ package org.apache.hive.service.cli.operation; import java.util.EnumSet; +import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -43,16 +44,31 @@ public static final long DEFAULT_FETCH_MAX_ROWS = 100; protected boolean hasResultSet; protected volatile HiveSQLException operationException; + protected final boolean runAsync; + protected volatile Future backgroundHandle; protected static final EnumSet DEFAULT_FETCH_ORIENTATION_SET = EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST); - - protected Operation(HiveSession parentSession, OperationType opType) { + + protected Operation(HiveSession parentSession, OperationType opType, boolean runInBackground) { super(); this.parentSession = parentSession; + this.runAsync = runInBackground; this.opHandle = new OperationHandle(opType, parentSession.getProtocolVersion()); } + public Future getBackgroundHandle() { + return backgroundHandle; + } + + protected void setBackgroundHandle(Future backgroundHandle) { + this.backgroundHandle = backgroundHandle; + } + + public boolean shouldRunAsync() { + return runAsync; + } + public void setConfiguration(HiveConf configuration) { this.configuration = new HiveConf(configuration); } @@ -160,7 +176,7 @@ protected void validateFetchOrientation(FetchOrientation orientation, EnumSet supportedOrientations) throws HiveSQLException { if (!supportedOrientations.contains(orientation)) { throw new HiveSQLException("The fetch type " + orientation.toString() + - " is not supported for this resultset", "HY106"); + " is not supported for this resultset", "HY106"); } } } diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 03a37c8..ace791a 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -66,15 +66,12 @@ private TableSchema resultSchema = null; private Schema mResultSchema = null; private SerDe serde = null; - private final boolean runAsync; - private volatile Future backgroundHandle; private boolean fetchStarted = false; public SQLOperation(HiveSession parentSession, String statement, Map confOverlay, boolean runInBackground) { // TODO: call setRemoteUser in ExecuteStatementOperation or higher. - super(parentSession, statement, confOverlay); - this.runAsync = runInBackground; + super(parentSession, statement, confOverlay, runInBackground); } /*** @@ -159,7 +156,7 @@ private void runInternal(HiveConf sqlOperationConf) throws HiveSQLException { public void run() throws HiveSQLException { setState(OperationState.PENDING); prepare(getConfigForOperation()); - if (!runAsync) { + if (!shouldRunAsync()) { runInternal(getConfigForOperation()); } else { Runnable backgroundOperation = new Runnable() { @@ -177,8 +174,9 @@ public void run() { }; try { // This submit blocks if no background threads are available to run this operation - backgroundHandle = + Future backgroundHandle = getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation); + setBackgroundHandle(backgroundHandle); } catch (RejectedExecutionException rejected) { setState(OperationState.ERROR); throw new HiveSQLException("All the asynchronous threads are currently busy, " + @@ -189,7 +187,8 @@ public void run() { private void cleanup(OperationState state) throws HiveSQLException { setState(state); - if (runAsync) { + if (shouldRunAsync()) { + Future backgroundHandle = getBackgroundHandle(); if (backgroundHandle != null) { backgroundHandle.cancel(true); } @@ -349,7 +348,7 @@ private SerDe getSerDe() throws SQLException { */ private HiveConf getConfigForOperation() throws HiveSQLException { HiveConf sqlOperationConf = getParentSession().getHiveConf(); - if (!getConfOverlay().isEmpty() || runAsync) { + if (!getConfOverlay().isEmpty() || shouldRunAsync()) { // clone the partent session config for this query sqlOperationConf = new HiveConf(sqlOperationConf); @@ -364,5 +363,4 @@ private HiveConf getConfigForOperation() throws HiveSQLException { } return sqlOperationConf; } - } diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java index 8ec8d43..9d06aa7 100644 --- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java +++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java @@ -22,14 +22,13 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.junit.Assert.assertTrue; import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -57,7 +56,7 @@ public void tearDown() throws Exception { } @Test - public void openSessionTest() throws Exception { + public void testOpenSession() throws Exception { SessionHandle sessionHandle = client.openSession( "tom", "password", Collections.emptyMap()); assertNotNull(sessionHandle); @@ -69,7 +68,7 @@ public void openSessionTest() throws Exception { } @Test - public void getFunctionsTest() throws Exception { + public void testGetFunctions() throws Exception { SessionHandle sessionHandle = client.openSession("tom", "password"); assertNotNull(sessionHandle); @@ -106,7 +105,7 @@ public void getFunctionsTest() throws Exception { } @Test - public void getInfoTest() throws Exception { + public void testGetInfo() throws Exception { SessionHandle sessionHandle = client.openSession( "tom", "password", Collections.emptyMap()); assertNotNull(sessionHandle); @@ -123,6 +122,10 @@ public void getInfoTest() throws Exception { client.closeSession(sessionHandle); } + /** + * Test the blocking execution of a query + * @throws Exception + */ @Test public void testExecuteStatement() throws Exception { HashMap confOverlay = new HashMap(); @@ -161,113 +164,171 @@ public void testExecuteStatement() throws Exception { client.closeSession(sessionHandle); } + /** + * Test async execution of a well-formed and a malformed query with different long polling durations + * - Test malformed query with default long polling timeout + * - Test well-formed query with default long polling timeout + * - Test well-formed query with long polling timeout set to 0 + * - Test well-formed query with long polling timeout set to 500 millis + * - Test well-formed query cancellation + * @throws Exception + */ @Test public void testExecuteStatementAsync() throws Exception { - HashMap confOverlay = new HashMap(); - SessionHandle sessionHandle = client.openSession("tom", "password", - new HashMap()); - // Timeout for the poll in case of asynchronous execute - long pollTimeout = System.currentTimeMillis() + 100000; + Map confOverlay = new HashMap(); + String tableName = "TEST_EXEC_ASYNC"; + String columnDefinitions = "(ID STRING)"; + String queryString; + + // Open a session and set up the test data + SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, confOverlay); assertNotNull(sessionHandle); + OperationState state = null; OperationHandle opHandle; OperationStatus opStatus = null; // Change lock manager, otherwise unit-test doesn't go through - String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname + queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname + " = false"; opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); client.closeOperation(opHandle); - // Drop the table if it exists - queryString = "DROP TABLE IF EXISTS TEST_EXEC_ASYNC"; - opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); - client.closeOperation(opHandle); + // Set longPollingTimeout to a custom value for different test cases + long longPollingTimeout; - // Create a test table - queryString = "CREATE TABLE TEST_EXEC_ASYNC(ID STRING)"; - opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); - client.closeOperation(opHandle); - - // Test async execution response when query is malformed - // Compile time error - // This query will error out during compilation (which is done synchronous as of now) - String wrongQueryString = "SELECT NON_EXISTANT_COLUMN FROM TEST_EXEC_ASYNC"; + /** + * Execute a malformed async query with default config, + * to give a compile time error. + * (compilation is done synchronous as of now) + */ + longPollingTimeout = new HiveConf().getLongVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT); + queryString = "SELECT NON_EXISTING_COLUMN FROM " + tableName; try { - opHandle = client.executeStatementAsync(sessionHandle, wrongQueryString, confOverlay); - fail("Async syntax excution should fail"); - } catch (HiveSQLException e) { + runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout); + } + catch (HiveSQLException e) { // expected error } - - - // Runtime error - wrongQueryString = "CREATE TABLE NON_EXISTING_TAB (ID STRING) location 'hdfs://fooNN:10000/a/b/c'"; - opHandle = client.executeStatementAsync(sessionHandle, wrongQueryString, confOverlay); - - int count = 0; - while (true) { - // Break if polling times out - if (System.currentTimeMillis() > pollTimeout) { - System.out.println("Polling timed out"); - break; - } - opStatus = client.getOperationStatus(opHandle); - state = opStatus.getState(); - System.out.println("Polling: " + opHandle + " count=" + (++count) - + " state=" + state); - if (state == OperationState.CANCELED || state == OperationState.CLOSED - || state == OperationState.FINISHED || state == OperationState.ERROR) { - break; - } - Thread.sleep(1000); - } - assertEquals("Operation should be in error state", OperationState.ERROR, state); + /** + * Execute a malformed async query with default config, + * to give a runtime time error. + * Also check that the sqlState and errorCode should be set + */ + queryString = "CREATE TABLE NON_EXISTING_TAB (ID STRING) location 'hdfs://localhost:10000/a/b/c'"; + opStatus = runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout); // sqlState, errorCode should be set assertEquals(opStatus.getOperationException().getSQLState(), "08S01"); assertEquals(opStatus.getOperationException().getErrorCode(), 1); - client.closeOperation(opHandle); - - // Test async execution when query is well formed - queryString = "SELECT ID FROM TEST_EXEC_ASYNC"; + /** + * Execute an async query with default config + */ + queryString = "SELECT ID FROM " + tableName; + runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout); + + /** + * Execute an async query with long polling timeout set to 0 + */ + longPollingTimeout = 0; + queryString = "SELECT ID FROM " + tableName; + runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout); + + /** + * Execute an async query with long polling timeout set to 500 millis + */ + longPollingTimeout = 500; + queryString = "SELECT ID FROM " + tableName; + runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout); + + /** + * Cancellation test + */ + queryString = "SELECT ID FROM " + tableName; opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); - assertTrue(opHandle.hasResultSet()); - - count = 0; + System.out.println("Cancelling " + opHandle); + client.cancelOperation(opHandle); + state = client.getOperationStatus(opHandle).getState(); + System.out.println(opHandle + " after cancelling, state= " + state); + assertEquals("Query should be cancelled", OperationState.CANCELED, state); + + // Cleanup + queryString = "DROP TABLE " + tableName; + client.executeStatement(sessionHandle, queryString, confOverlay); + client.closeSession(sessionHandle); + } + + /** + * Sets up a test specific table with the given column definitions and config + * @param tableName + * @param columnDefinitions + * @param confOverlay + * @throws Exception + */ + private SessionHandle setupTestData(String tableName, String columnDefinitions, + Map confOverlay) throws Exception { + SessionHandle sessionHandle = client.openSession("tom", "password", confOverlay); + assertNotNull(sessionHandle); + + String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname + + " = false"; + client.executeStatement(sessionHandle, queryString, confOverlay); + + // Drop the table if it exists + queryString = "DROP TABLE IF EXISTS " + tableName; + client.executeStatement(sessionHandle, queryString, confOverlay); + + // Create a test table + queryString = "CREATE TABLE " + tableName + columnDefinitions; + client.executeStatement(sessionHandle, queryString, confOverlay); + + return sessionHandle; + } + + private OperationStatus runQueryAsync(SessionHandle sessionHandle, String queryString, + Map confOverlay, OperationState expectedState, + long longPollingTimeout) throws HiveSQLException { + // Timeout for the iteration in case of asynchronous execute + long testIterationTimeout = System.currentTimeMillis() + 100000; + long longPollingStart; + long longPollingEnd; + long longPollingTimeDelta; + OperationStatus opStatus = null; + OperationState state = null; + confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, String.valueOf(longPollingTimeout)); + OperationHandle opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); + int count = 0; while (true) { - // Break if polling times out - if (System.currentTimeMillis() > pollTimeout) { + // Break if iteration times out + if (System.currentTimeMillis() > testIterationTimeout) { System.out.println("Polling timed out"); break; } + longPollingStart = System.currentTimeMillis(); + System.out.println("Long polling starts at: " + longPollingStart); opStatus = client.getOperationStatus(opHandle); state = opStatus.getState(); + longPollingEnd = System.currentTimeMillis(); + System.out.println("Long polling ends at: " + longPollingEnd); + System.out.println("Polling: " + opHandle + " count=" + (++count) + " state=" + state); - if (state == OperationState.CANCELED || state == OperationState.CLOSED - || state == OperationState.FINISHED || state == OperationState.ERROR) { + if (state == OperationState.CANCELED || + state == OperationState.CLOSED || + state == OperationState.FINISHED || + state == OperationState.ERROR) { break; + } else { + // Verify that getOperationStatus returned only after the long polling timeout + longPollingTimeDelta = longPollingEnd - longPollingStart; + // Scale down by a factor of 0.9 to account for approximate values + assertTrue(longPollingTimeDelta - 0.9*longPollingTimeout > 0); } - Thread.sleep(1000); } - assertEquals("Query should be finished", OperationState.FINISHED, state); - client.closeOperation(opHandle); - - // Cancellation test - opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); - System.out.println("cancelling " + opHandle); - client.cancelOperation(opHandle); - state = client.getOperationStatus(opHandle).getState(); - System.out.println(opHandle + " after cancelling, state= " + state); - assertEquals("Query should be cancelled", OperationState.CANCELED, state); - - // Cleanup - queryString = "DROP TABLE IF EXISTS TEST_EXEC_ASYNC"; - opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); + assertEquals(expectedState, client.getOperationStatus(opHandle).getState()); client.closeOperation(opHandle); - client.closeSession(sessionHandle); + return opStatus; } /**