diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 4f32390..eebd49a 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -763,6 +763,9 @@ // Number of seconds HiveServer2 shutdown will wait for async threads to terminate HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT("hive.server2.async.exec.shutdown.timeout", 10L), + // Time in milliseconds that HiveServer2 will wait, + // before responding to asynchronous calls that use long polling + HIVE_SERVER2_LONG_POLLING_TIMEOUT("hive.server2.long.polling.timeout", 5000L), // HiveServer2 auth configuration HIVE_SERVER2_AUTHENTICATION("hive.server2.authentication", "NONE", diff --git a/conf/hive-default.xml.template b/conf/hive-default.xml.template index fe7141e..28d2fe9 100644 --- a/conf/hive-default.xml.template +++ b/conf/hive-default.xml.template @@ -1904,11 +1904,18 @@ hive.server2.async.exec.shutdown.timeout 10 - Time (in seconds) for which HiveServer2 shutdown will wait for async + Time (in seconds) for which HiveServer2 shutdown will wait for async threads to terminate + hive.server2.long.polling.timeout + 5000 + Time (in milliseconds) that HiveServer2 will wait, + before responding to calls that use long polling + + + hive.server2.thrift.port 10000 Port number of HiveServer2 Thrift interface. diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 4ee1b74..300c0fb 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -25,10 +25,15 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Schema; import org.apache.hadoop.hive.ql.CommandNeedRetryException; @@ -207,6 +212,31 @@ public TableSchema getResultSetSchema() throws HiveSQLException { return resultSchema; } + @Override + public OperationState getState() { + // For an async call, block till LONG_POLLING_TIMEOUT expires or + // background operation thread generates results, whichever happens first + if (shouldRunAsync()) { + long timeout = getParentSession().getHiveConf().getLongVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT); + try { + getBackgroundHandle().get(timeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + // No Op, return to the caller since long polling timeout has expired + LOG.trace(getHandle() + ": Long polling timed out"); + } catch (CancellationException e) { + // The background operation thread was cancelled + LOG.trace(getHandle() + ": The background operation was cancelled", e); + } catch (ExecutionException e) { + // The background operation thread was aborted + LOG.trace(getHandle() + ": The background operation was aborted", e); + } catch (InterruptedException e) { + // No op, this thread was interrupted + // In this case, the call might return sooner than long polling timeout + } + } + return super.getState(); + } + @Override public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { @@ -318,6 +348,10 @@ private boolean shouldRunAsync() { return runAsync; } + private Future getBackgroundHandle() { + return backgroundHandle; + } + /** * If there are query specific settings to overlay, then create a copy of config * There are two cases we need to clone the session config that's being passed to hive driver @@ -345,5 +379,4 @@ private HiveConf getConfigForOperation() throws HiveSQLException { } return sqlOperationConf; } - } diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java index cd9d99a..0c7755a 100644 --- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java +++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java @@ -21,12 +21,14 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; +import static org.junit.Assert.assertTrue; import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -37,258 +39,306 @@ */ public abstract class CLIServiceTest { - protected static CLIServiceClient client; - - /** - * @throws java.lang.Exception - */ - @Before - public void setUp() throws Exception { - } - - /** - * @throws java.lang.Exception - */ - @After - public void tearDown() throws Exception { - } - - @Test - public void openSessionTest() throws Exception { - SessionHandle sessionHandle = client - .openSession("tom", "password", Collections.emptyMap()); - assertNotNull(sessionHandle); - client.closeSession(sessionHandle); - - sessionHandle = client.openSession("tom", "password"); - assertNotNull(sessionHandle); - client.closeSession(sessionHandle); - } - - @Test - public void getFunctionsTest() throws Exception { - SessionHandle sessionHandle = client.openSession("tom", "password", new HashMap()); - assertNotNull(sessionHandle); - OperationHandle opHandle = client.getFunctions(sessionHandle, null, null, "*"); - TableSchema schema = client.getResultSetMetadata(opHandle); - - ColumnDescriptor columnDesc = schema.getColumnDescriptorAt(0); - assertEquals("FUNCTION_CAT", columnDesc.getName()); - assertEquals(Type.STRING_TYPE, columnDesc.getType()); - - columnDesc = schema.getColumnDescriptorAt(1); - assertEquals("FUNCTION_SCHEM", columnDesc.getName()); - assertEquals(Type.STRING_TYPE, columnDesc.getType()); - - columnDesc = schema.getColumnDescriptorAt(2); - assertEquals("FUNCTION_NAME", columnDesc.getName()); - assertEquals(Type.STRING_TYPE, columnDesc.getType()); - - columnDesc = schema.getColumnDescriptorAt(3); - assertEquals("REMARKS", columnDesc.getName()); - assertEquals(Type.STRING_TYPE, columnDesc.getType()); - - columnDesc = schema.getColumnDescriptorAt(4); - assertEquals("FUNCTION_TYPE", columnDesc.getName()); - assertEquals(Type.INT_TYPE, columnDesc.getType()); - - columnDesc = schema.getColumnDescriptorAt(5); - assertEquals("SPECIFIC_NAME", columnDesc.getName()); - assertEquals(Type.STRING_TYPE, columnDesc.getType()); - - client.closeOperation(opHandle); - client.closeSession(sessionHandle); - } - - @Test - public void getInfoTest() throws Exception { - SessionHandle sessionHandle = client.openSession("tom", "password", new HashMap()); - assertNotNull(sessionHandle); - - GetInfoValue value = client.getInfo(sessionHandle, GetInfoType.CLI_DBMS_NAME); - System.out.println(value.getStringValue()); - - value = client.getInfo(sessionHandle, GetInfoType.CLI_SERVER_NAME); - System.out.println(value.getStringValue()); - - value = client.getInfo(sessionHandle, GetInfoType.CLI_DBMS_VER); - System.out.println(value.getStringValue()); - - client.closeSession(sessionHandle); - } - - @Test - public void testExecuteStatement() throws Exception { - HashMap confOverlay = new HashMap(); - SessionHandle sessionHandle = client.openSession("tom", "password", - new HashMap()); - assertNotNull(sessionHandle); - - // Change lock manager, otherwise unit-test doesn't go through - String queryString = "SET hive.lock.manager=" + - "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager"; - client.executeStatement(sessionHandle, queryString, confOverlay); - - // Drop the table if it exists - queryString = "DROP TABLE IF EXISTS TEST_EXEC"; - client.executeStatement(sessionHandle, queryString, confOverlay); - - // Create a test table - queryString = "CREATE TABLE TEST_EXEC(ID STRING)"; - client.executeStatement(sessionHandle, queryString, confOverlay); - - // Blocking execute - queryString = "SELECT ID FROM TEST_EXEC"; - OperationHandle ophandle = client.executeStatement(sessionHandle, queryString, confOverlay); - - // Expect query to be completed now - assertEquals("Query should be finished", - OperationState.FINISHED, client.getOperationStatus(ophandle)); - } - - @Test - public void testExecuteStatementAsync() throws Exception { - HashMap confOverlay = new HashMap(); - SessionHandle sessionHandle = client.openSession("tom", "password", - new HashMap()); - // Timeout for the poll in case of asynchronous execute - long pollTimeout = System.currentTimeMillis() + 100000; - assertNotNull(sessionHandle); - OperationState state = null; - OperationHandle ophandle; - - // Change lock manager, otherwise unit-test doesn't go through - String queryString = "SET hive.lock.manager=" + - "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager"; - client.executeStatement(sessionHandle, queryString, confOverlay); - - // Drop the table if it exists - queryString = "DROP TABLE IF EXISTS TEST_EXEC_ASYNC"; - client.executeStatement(sessionHandle, queryString, confOverlay); - - // Create a test table - queryString = "CREATE TABLE TEST_EXEC_ASYNC(ID STRING)"; - client.executeStatement(sessionHandle, queryString, confOverlay); - - // Test async execution response when query is malformed - String wrongQueryString = "SELECT NAME FROM TEST_EXEC"; - ophandle = client.executeStatementAsync(sessionHandle, wrongQueryString, confOverlay); - - int count = 0; - while (true) { - // Break if polling times out - if (System.currentTimeMillis() > pollTimeout) { - System.out.println("Polling timed out"); - break; - } - state = client.getOperationStatus(ophandle); - System.out.println("Polling: " + ophandle + " count=" + (++count) - + " state=" + state); - - if (OperationState.CANCELED == state || state == OperationState.CLOSED - || state == OperationState.FINISHED || state == OperationState.ERROR) { - break; - } - Thread.sleep(1000); - } - assertEquals("Query should return an error state", - OperationState.ERROR, client.getOperationStatus(ophandle)); - - // Test async execution when query is well formed - queryString = "SELECT ID FROM TEST_EXEC_ASYNC"; - ophandle = - client.executeStatementAsync(sessionHandle, queryString, confOverlay); - - count = 0; - while (true) { - // Break if polling times out - if (System.currentTimeMillis() > pollTimeout) { - System.out.println("Polling timed out"); - break; - } - state = client.getOperationStatus(ophandle); - System.out.println("Polling: " + ophandle + " count=" + (++count) - + " state=" + state); - - if (OperationState.CANCELED == state || state == OperationState.CLOSED - || state == OperationState.FINISHED || state == OperationState.ERROR) { - break; - } - Thread.sleep(1000); - } - assertEquals("Query should be finished", - OperationState.FINISHED, client.getOperationStatus(ophandle)); - - // Cancellation test - ophandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); - System.out.println("cancelling " + ophandle); - client.cancelOperation(ophandle); - state = client.getOperationStatus(ophandle); - System.out.println(ophandle + " after cancelling, state= " + state); - assertEquals("Query should be cancelled", OperationState.CANCELED, state); - } - - /** - * Test per statement configuration overlay. - * Create a table using hiveconf: var substitution, with the conf var passed - * via confOverlay.Verify the confOverlay works for the query and does set the - * value in the session configuration - * @throws Exception - */ - @Test - public void testConfOverlay() throws Exception { - SessionHandle sessionHandle = client.openSession("tom", "password", new HashMap()); - assertNotNull(sessionHandle); - String tabName = "TEST_CONF_EXEC"; - String tabNameVar = "tabNameVar"; - - String setLockMgr = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname - + " = false"; - OperationHandle opHandle = client.executeStatement(sessionHandle, setLockMgr, null); - client.closeOperation(opHandle); - - String dropTable = "DROP TABLE IF EXISTS " + tabName; - opHandle = client.executeStatement(sessionHandle, dropTable, null); - client.closeOperation(opHandle); - - // set a pass a property to operation and check if its set the query config - Map confOverlay = new HashMap(); - confOverlay.put(tabNameVar, tabName); - - // execute statement with the conf overlay - String createTab = "CREATE TABLE ${hiveconf:" + tabNameVar + "} (id int)"; - opHandle = client.executeStatement(sessionHandle, createTab, confOverlay); - assertNotNull(opHandle); - // query should pass and create the table - assertEquals("Query should be finished", - OperationState.FINISHED, client.getOperationStatus(opHandle)); - client.closeOperation(opHandle); - - // select from the new table should pass - String selectTab = "SELECT * FROM " + tabName; - opHandle = client.executeStatement(sessionHandle, selectTab, null); - assertNotNull(opHandle); - // query should pass and create the table - assertEquals("Query should be finished", - OperationState.FINISHED, client.getOperationStatus(opHandle)); - client.closeOperation(opHandle); - - // the settings in confoverly should not be part of session config - // another query referring that property with the conf overlay should fail - selectTab = "SELECT * FROM ${hiveconf:" + tabNameVar + "}"; - try { - opHandle = client.executeStatement(sessionHandle, selectTab, null); - fail("Query should fail"); - } catch (HiveSQLException e) { - // Expected exception - } - - // cleanup - dropTable = "DROP TABLE IF EXISTS " + tabName; - opHandle = client.executeStatement(sessionHandle, dropTable, null); - client.closeOperation(opHandle); - - - client.closeSession(sessionHandle); - } + protected static CLIServiceClient client; + + /** + * @throws java.lang.Exception + */ + @Before + public void setUp() throws Exception { + } + + /** + * @throws java.lang.Exception + */ + @After + public void tearDown() throws Exception { + } + + @Test + public void testOpenSession() throws Exception { + SessionHandle sessionHandle = client + .openSession("tom", "password", Collections.emptyMap()); + assertNotNull(sessionHandle); + client.closeSession(sessionHandle); + + sessionHandle = client.openSession("tom", "password"); + assertNotNull(sessionHandle); + client.closeSession(sessionHandle); + } + + @Test + public void testGetFunctions() throws Exception { + SessionHandle sessionHandle = client.openSession("tom", "password", new HashMap()); + assertNotNull(sessionHandle); + OperationHandle opHandle = client.getFunctions(sessionHandle, null, null, "*"); + TableSchema schema = client.getResultSetMetadata(opHandle); + + ColumnDescriptor columnDesc = schema.getColumnDescriptorAt(0); + assertEquals("FUNCTION_CAT", columnDesc.getName()); + assertEquals(Type.STRING_TYPE, columnDesc.getType()); + + columnDesc = schema.getColumnDescriptorAt(1); + assertEquals("FUNCTION_SCHEM", columnDesc.getName()); + assertEquals(Type.STRING_TYPE, columnDesc.getType()); + + columnDesc = schema.getColumnDescriptorAt(2); + assertEquals("FUNCTION_NAME", columnDesc.getName()); + assertEquals(Type.STRING_TYPE, columnDesc.getType()); + + columnDesc = schema.getColumnDescriptorAt(3); + assertEquals("REMARKS", columnDesc.getName()); + assertEquals(Type.STRING_TYPE, columnDesc.getType()); + + columnDesc = schema.getColumnDescriptorAt(4); + assertEquals("FUNCTION_TYPE", columnDesc.getName()); + assertEquals(Type.INT_TYPE, columnDesc.getType()); + + columnDesc = schema.getColumnDescriptorAt(5); + assertEquals("SPECIFIC_NAME", columnDesc.getName()); + assertEquals(Type.STRING_TYPE, columnDesc.getType()); + + client.closeSession(sessionHandle); + } + + @Test + public void testGetInfo() throws Exception { + SessionHandle sessionHandle = client.openSession("tom", "password", new HashMap()); + assertNotNull(sessionHandle); + + GetInfoValue value = client.getInfo(sessionHandle, GetInfoType.CLI_DBMS_NAME); + System.out.println(value.getStringValue()); + + value = client.getInfo(sessionHandle, GetInfoType.CLI_SERVER_NAME); + System.out.println(value.getStringValue()); + + value = client.getInfo(sessionHandle, GetInfoType.CLI_DBMS_VER); + System.out.println(value.getStringValue()); + + client.closeSession(sessionHandle); + } + + /** + * Test the blocking execution of a query + * @throws Exception + */ + @Test + public void testExecuteStatement() throws Exception { + // Set up the test table + Map confOverlay = new HashMap(); + String tableName = "TEST_EXEC"; + String columnDefinitions = "(ID STRING)"; + + // Open a session and set up the test data + SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, confOverlay); + assertNotNull(sessionHandle); + + // Blocking execute + String queryString = "SELECT ID FROM " + tableName; + OperationHandle ophandle = client.executeStatement(sessionHandle, queryString, confOverlay); + + // Expect query to be completed now + assertEquals("Query should be finished", + OperationState.FINISHED, client.getOperationStatus(ophandle)); + + // Cleanup + queryString = "DROP TABLE " + tableName; + client.executeStatement(sessionHandle, queryString, confOverlay); + client.closeSession(sessionHandle); + } + + + /** + * Test async execution of a well-formed and a malformed query with different long polling durations + * 1. Test malformed query with default long polling timeout + * 2. Test well-formed query with default long polling timeout + * 3. Test well-formed query with long polling timeout set to 0 + * 4. Test well-formed query with long polling timeout set to 500 millis + * 5. Test well-formed query cancellation + * @throws Exception + */ + @Test + public void testExecuteStatementAsync() throws Exception { + // Set up the test table + Map confOverlay = new HashMap(); + String tableName = "TEST_EXEC_ASYNC"; + String columnDefinitions = "(ID STRING)"; + String queryString; + OperationState state = null; + OperationHandle ophandle; + + // Open a session and set up the test data + SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, confOverlay); + assertNotNull(sessionHandle); + + // Set longPollingTimeout to a custom value for different test cases + long longPollingTimeout; + + // 1. Execute a malformed async query with default config + longPollingTimeout = new HiveConf().getLongVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT); + queryString = "SELECT NON_EXISTING_COLUMN FROM " + tableName; + runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout); + + // 2. Execute an async query with default config + longPollingTimeout = new HiveConf().getLongVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT); + queryString = "SELECT ID FROM " + tableName; + runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout); + + // 3. Execute an async query with long polling timeout set to 0 + longPollingTimeout = 0; + queryString = "SELECT ID FROM " + tableName; + runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout); + + // 4. Execute an async query with long polling timeout set to 500 millis + longPollingTimeout = 500; + queryString = "SELECT ID FROM " + tableName; + runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout); + + // 5. Cancellation test + queryString = "SELECT ID FROM " + tableName; + ophandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); + System.out.println("Cancelling " + ophandle); + client.cancelOperation(ophandle); + state = client.getOperationStatus(ophandle); + System.out.println(ophandle + " after cancelling, state= " + state); + assertEquals("Query should be cancelled", OperationState.CANCELED, state); + + // Cleanup + queryString = "DROP TABLE " + tableName; + client.executeStatement(sessionHandle, queryString, confOverlay); + client.closeSession(sessionHandle); + } + + /** + * Test per statement configuration overlay. + * Create a table using hiveconf: var substitution, with the conf var passed + * via confOverlay.Verify the confOverlay works for the query and does set the + * value in the session configuration + * @throws Exception + */ + @Test + public void testConfOverlay() throws Exception { + SessionHandle sessionHandle = client.openSession("tom", "password", new HashMap()); + assertNotNull(sessionHandle); + String tabName = "TEST_CONF_EXEC"; + String tabNameVar = "tabNameVar"; + + String setLockMgr = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname + + " = false"; + OperationHandle opHandle = client.executeStatement(sessionHandle, setLockMgr, null); + client.closeOperation(opHandle); + + String dropTable = "DROP TABLE IF EXISTS " + tabName; + opHandle = client.executeStatement(sessionHandle, dropTable, null); + client.closeOperation(opHandle); + + // set a pass a property to operation and check if its set the query config + Map confOverlay = new HashMap(); + confOverlay.put(tabNameVar, tabName); + + // execute statement with the conf overlay + String createTab = "CREATE TABLE ${hiveconf:" + tabNameVar + "} (id int)"; + opHandle = client.executeStatement(sessionHandle, createTab, confOverlay); + assertNotNull(opHandle); + // query should pass and create the table + assertEquals("Query should be finished", + OperationState.FINISHED, client.getOperationStatus(opHandle)); + client.closeOperation(opHandle); + + // select from the new table should pass + String selectTab = "SELECT * FROM " + tabName; + opHandle = client.executeStatement(sessionHandle, selectTab, null); + assertNotNull(opHandle); + // query should pass and create the table + assertEquals("Query should be finished", + OperationState.FINISHED, client.getOperationStatus(opHandle)); + client.closeOperation(opHandle); + + // the settings in confoverly should not be part of session config + // another query referring that property with the conf overlay should fail + selectTab = "SELECT * FROM ${hiveconf:" + tabNameVar + "}"; + try { + opHandle = client.executeStatement(sessionHandle, selectTab, null); + fail("Query should fail"); + } catch (HiveSQLException e) { + // Expected exception + } + + // cleanup + dropTable = "DROP TABLE IF EXISTS " + tabName; + opHandle = client.executeStatement(sessionHandle, dropTable, null); + client.closeOperation(opHandle); + + + client.closeSession(sessionHandle); + } + + /** + * Sets up a test specific table with the given column definitions and config + * @param tableName + * @param columnDefinitions + * @param confOverlay + * @throws Exception + */ + private SessionHandle setupTestData(String tableName, String columnDefinitions, + Map confOverlay) throws Exception { + SessionHandle sessionHandle = client.openSession("tom", "password", confOverlay); + assertNotNull(sessionHandle); + + String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname + + " = false"; + client.executeStatement(sessionHandle, queryString, confOverlay); + + // Drop the table if it exists + queryString = "DROP TABLE IF EXISTS " + tableName; + client.executeStatement(sessionHandle, queryString, confOverlay); + + // Create a test table + queryString = "CREATE TABLE " + tableName + columnDefinitions; + client.executeStatement(sessionHandle, queryString, confOverlay); + + return sessionHandle; + } + + private void runQueryAsync(SessionHandle sessionHandle, String queryString, + Map confOverlay, OperationState expectedState, + long longPollingTimeout) throws Exception { + // Timeout for the iteration in case of asynchronous execute + long testIterationTimeout = System.currentTimeMillis() + 100000; + long longPollingStart; + long longPollingEnd; + long longPollingTimeDelta; + OperationState state = null; + confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, String.valueOf(longPollingTimeout)); + OperationHandle ophandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); + int count = 0; + while (true) { + // Break if iteration times out + if (System.currentTimeMillis() > testIterationTimeout) { + System.out.println("Polling timed out"); + break; + } + longPollingStart = System.currentTimeMillis(); + System.out.println("Long polling starts at: " + longPollingStart); + state = client.getOperationStatus(ophandle); + longPollingEnd = System.currentTimeMillis(); + System.out.println("Long polling ends at: " + longPollingEnd); + + System.out.println("Polling: " + ophandle + " count=" + (++count) + + " state=" + state); + + if (state == OperationState.CANCELED || + state == OperationState.CLOSED || + state == OperationState.FINISHED || + state == OperationState.ERROR) { + break; + } else { + // Verify that getOperationStatus returned only after the long polling timeout + longPollingTimeDelta = longPollingEnd - longPollingStart; + // Scale down by a factor of 0.9 to account for approximate values + assertTrue(longPollingTimeDelta - 0.9*longPollingTimeout > 0); + } + } + assertEquals(expectedState, client.getOperationStatus(ophandle)); + } }