diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 4f32390..eebd49a 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -763,6 +763,9 @@
// Number of seconds HiveServer2 shutdown will wait for async threads to terminate
HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT("hive.server2.async.exec.shutdown.timeout", 10L),
+ // Time in milliseconds that HiveServer2 will wait,
+ // before responding to asynchronous calls that use long polling
+ HIVE_SERVER2_LONG_POLLING_TIMEOUT("hive.server2.long.polling.timeout", 5000L),
// HiveServer2 auth configuration
HIVE_SERVER2_AUTHENTICATION("hive.server2.authentication", "NONE",
diff --git a/conf/hive-default.xml.template b/conf/hive-default.xml.template
index fe7141e..28d2fe9 100644
--- a/conf/hive-default.xml.template
+++ b/conf/hive-default.xml.template
@@ -1904,11 +1904,18 @@
hive.server2.async.exec.shutdown.timeout
10
- Time (in seconds) for which HiveServer2 shutdown will wait for async
+ Time (in seconds) for which HiveServer2 shutdown will wait for async
threads to terminate
+ hive.server2.long.polling.timeout
+ 5000
+ Time (in milliseconds) that HiveServer2 will wait,
+ before responding to calls that use long polling
+
+
+
hive.server2.thrift.port
10000
Port number of HiveServer2 Thrift interface.
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 4ee1b74..300c0fb 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -25,10 +25,15 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
@@ -207,6 +212,31 @@ public TableSchema getResultSetSchema() throws HiveSQLException {
return resultSchema;
}
+ @Override
+ public OperationState getState() {
+ // For an async call, block till LONG_POLLING_TIMEOUT expires or
+ // background operation thread generates results, whichever happens first
+ if (shouldRunAsync()) {
+ long timeout = getParentSession().getHiveConf().getLongVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT);
+ try {
+ getBackgroundHandle().get(timeout, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ // No Op, return to the caller since long polling timeout has expired
+ LOG.trace(getHandle() + ": Long polling timed out");
+ } catch (CancellationException e) {
+ // The background operation thread was cancelled
+ LOG.trace(getHandle() + ": The background operation was cancelled", e);
+ } catch (ExecutionException e) {
+ // The background operation thread was aborted
+ LOG.trace(getHandle() + ": The background operation was aborted", e);
+ } catch (InterruptedException e) {
+ // No op, this thread was interrupted
+ // In this case, the call might return sooner than long polling timeout
+ }
+ }
+ return super.getState();
+ }
+
@Override
public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
@@ -318,6 +348,10 @@ private boolean shouldRunAsync() {
return runAsync;
}
+ private Future> getBackgroundHandle() {
+ return backgroundHandle;
+ }
+
/**
* If there are query specific settings to overlay, then create a copy of config
* There are two cases we need to clone the session config that's being passed to hive driver
@@ -345,5 +379,4 @@ private HiveConf getConfigForOperation() throws HiveSQLException {
}
return sqlOperationConf;
}
-
}
diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
index cd9d99a..0c7755a 100644
--- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
+++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
@@ -21,12 +21,14 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -37,258 +39,306 @@
*/
public abstract class CLIServiceTest {
- protected static CLIServiceClient client;
-
- /**
- * @throws java.lang.Exception
- */
- @Before
- public void setUp() throws Exception {
- }
-
- /**
- * @throws java.lang.Exception
- */
- @After
- public void tearDown() throws Exception {
- }
-
- @Test
- public void openSessionTest() throws Exception {
- SessionHandle sessionHandle = client
- .openSession("tom", "password", Collections.emptyMap());
- assertNotNull(sessionHandle);
- client.closeSession(sessionHandle);
-
- sessionHandle = client.openSession("tom", "password");
- assertNotNull(sessionHandle);
- client.closeSession(sessionHandle);
- }
-
- @Test
- public void getFunctionsTest() throws Exception {
- SessionHandle sessionHandle = client.openSession("tom", "password", new HashMap());
- assertNotNull(sessionHandle);
- OperationHandle opHandle = client.getFunctions(sessionHandle, null, null, "*");
- TableSchema schema = client.getResultSetMetadata(opHandle);
-
- ColumnDescriptor columnDesc = schema.getColumnDescriptorAt(0);
- assertEquals("FUNCTION_CAT", columnDesc.getName());
- assertEquals(Type.STRING_TYPE, columnDesc.getType());
-
- columnDesc = schema.getColumnDescriptorAt(1);
- assertEquals("FUNCTION_SCHEM", columnDesc.getName());
- assertEquals(Type.STRING_TYPE, columnDesc.getType());
-
- columnDesc = schema.getColumnDescriptorAt(2);
- assertEquals("FUNCTION_NAME", columnDesc.getName());
- assertEquals(Type.STRING_TYPE, columnDesc.getType());
-
- columnDesc = schema.getColumnDescriptorAt(3);
- assertEquals("REMARKS", columnDesc.getName());
- assertEquals(Type.STRING_TYPE, columnDesc.getType());
-
- columnDesc = schema.getColumnDescriptorAt(4);
- assertEquals("FUNCTION_TYPE", columnDesc.getName());
- assertEquals(Type.INT_TYPE, columnDesc.getType());
-
- columnDesc = schema.getColumnDescriptorAt(5);
- assertEquals("SPECIFIC_NAME", columnDesc.getName());
- assertEquals(Type.STRING_TYPE, columnDesc.getType());
-
- client.closeOperation(opHandle);
- client.closeSession(sessionHandle);
- }
-
- @Test
- public void getInfoTest() throws Exception {
- SessionHandle sessionHandle = client.openSession("tom", "password", new HashMap());
- assertNotNull(sessionHandle);
-
- GetInfoValue value = client.getInfo(sessionHandle, GetInfoType.CLI_DBMS_NAME);
- System.out.println(value.getStringValue());
-
- value = client.getInfo(sessionHandle, GetInfoType.CLI_SERVER_NAME);
- System.out.println(value.getStringValue());
-
- value = client.getInfo(sessionHandle, GetInfoType.CLI_DBMS_VER);
- System.out.println(value.getStringValue());
-
- client.closeSession(sessionHandle);
- }
-
- @Test
- public void testExecuteStatement() throws Exception {
- HashMap confOverlay = new HashMap();
- SessionHandle sessionHandle = client.openSession("tom", "password",
- new HashMap());
- assertNotNull(sessionHandle);
-
- // Change lock manager, otherwise unit-test doesn't go through
- String queryString = "SET hive.lock.manager=" +
- "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager";
- client.executeStatement(sessionHandle, queryString, confOverlay);
-
- // Drop the table if it exists
- queryString = "DROP TABLE IF EXISTS TEST_EXEC";
- client.executeStatement(sessionHandle, queryString, confOverlay);
-
- // Create a test table
- queryString = "CREATE TABLE TEST_EXEC(ID STRING)";
- client.executeStatement(sessionHandle, queryString, confOverlay);
-
- // Blocking execute
- queryString = "SELECT ID FROM TEST_EXEC";
- OperationHandle ophandle = client.executeStatement(sessionHandle, queryString, confOverlay);
-
- // Expect query to be completed now
- assertEquals("Query should be finished",
- OperationState.FINISHED, client.getOperationStatus(ophandle));
- }
-
- @Test
- public void testExecuteStatementAsync() throws Exception {
- HashMap confOverlay = new HashMap();
- SessionHandle sessionHandle = client.openSession("tom", "password",
- new HashMap());
- // Timeout for the poll in case of asynchronous execute
- long pollTimeout = System.currentTimeMillis() + 100000;
- assertNotNull(sessionHandle);
- OperationState state = null;
- OperationHandle ophandle;
-
- // Change lock manager, otherwise unit-test doesn't go through
- String queryString = "SET hive.lock.manager=" +
- "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager";
- client.executeStatement(sessionHandle, queryString, confOverlay);
-
- // Drop the table if it exists
- queryString = "DROP TABLE IF EXISTS TEST_EXEC_ASYNC";
- client.executeStatement(sessionHandle, queryString, confOverlay);
-
- // Create a test table
- queryString = "CREATE TABLE TEST_EXEC_ASYNC(ID STRING)";
- client.executeStatement(sessionHandle, queryString, confOverlay);
-
- // Test async execution response when query is malformed
- String wrongQueryString = "SELECT NAME FROM TEST_EXEC";
- ophandle = client.executeStatementAsync(sessionHandle, wrongQueryString, confOverlay);
-
- int count = 0;
- while (true) {
- // Break if polling times out
- if (System.currentTimeMillis() > pollTimeout) {
- System.out.println("Polling timed out");
- break;
- }
- state = client.getOperationStatus(ophandle);
- System.out.println("Polling: " + ophandle + " count=" + (++count)
- + " state=" + state);
-
- if (OperationState.CANCELED == state || state == OperationState.CLOSED
- || state == OperationState.FINISHED || state == OperationState.ERROR) {
- break;
- }
- Thread.sleep(1000);
- }
- assertEquals("Query should return an error state",
- OperationState.ERROR, client.getOperationStatus(ophandle));
-
- // Test async execution when query is well formed
- queryString = "SELECT ID FROM TEST_EXEC_ASYNC";
- ophandle =
- client.executeStatementAsync(sessionHandle, queryString, confOverlay);
-
- count = 0;
- while (true) {
- // Break if polling times out
- if (System.currentTimeMillis() > pollTimeout) {
- System.out.println("Polling timed out");
- break;
- }
- state = client.getOperationStatus(ophandle);
- System.out.println("Polling: " + ophandle + " count=" + (++count)
- + " state=" + state);
-
- if (OperationState.CANCELED == state || state == OperationState.CLOSED
- || state == OperationState.FINISHED || state == OperationState.ERROR) {
- break;
- }
- Thread.sleep(1000);
- }
- assertEquals("Query should be finished",
- OperationState.FINISHED, client.getOperationStatus(ophandle));
-
- // Cancellation test
- ophandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
- System.out.println("cancelling " + ophandle);
- client.cancelOperation(ophandle);
- state = client.getOperationStatus(ophandle);
- System.out.println(ophandle + " after cancelling, state= " + state);
- assertEquals("Query should be cancelled", OperationState.CANCELED, state);
- }
-
- /**
- * Test per statement configuration overlay.
- * Create a table using hiveconf: var substitution, with the conf var passed
- * via confOverlay.Verify the confOverlay works for the query and does set the
- * value in the session configuration
- * @throws Exception
- */
- @Test
- public void testConfOverlay() throws Exception {
- SessionHandle sessionHandle = client.openSession("tom", "password", new HashMap());
- assertNotNull(sessionHandle);
- String tabName = "TEST_CONF_EXEC";
- String tabNameVar = "tabNameVar";
-
- String setLockMgr = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname
- + " = false";
- OperationHandle opHandle = client.executeStatement(sessionHandle, setLockMgr, null);
- client.closeOperation(opHandle);
-
- String dropTable = "DROP TABLE IF EXISTS " + tabName;
- opHandle = client.executeStatement(sessionHandle, dropTable, null);
- client.closeOperation(opHandle);
-
- // set a pass a property to operation and check if its set the query config
- Map confOverlay = new HashMap();
- confOverlay.put(tabNameVar, tabName);
-
- // execute statement with the conf overlay
- String createTab = "CREATE TABLE ${hiveconf:" + tabNameVar + "} (id int)";
- opHandle = client.executeStatement(sessionHandle, createTab, confOverlay);
- assertNotNull(opHandle);
- // query should pass and create the table
- assertEquals("Query should be finished",
- OperationState.FINISHED, client.getOperationStatus(opHandle));
- client.closeOperation(opHandle);
-
- // select from the new table should pass
- String selectTab = "SELECT * FROM " + tabName;
- opHandle = client.executeStatement(sessionHandle, selectTab, null);
- assertNotNull(opHandle);
- // query should pass and create the table
- assertEquals("Query should be finished",
- OperationState.FINISHED, client.getOperationStatus(opHandle));
- client.closeOperation(opHandle);
-
- // the settings in confoverly should not be part of session config
- // another query referring that property with the conf overlay should fail
- selectTab = "SELECT * FROM ${hiveconf:" + tabNameVar + "}";
- try {
- opHandle = client.executeStatement(sessionHandle, selectTab, null);
- fail("Query should fail");
- } catch (HiveSQLException e) {
- // Expected exception
- }
-
- // cleanup
- dropTable = "DROP TABLE IF EXISTS " + tabName;
- opHandle = client.executeStatement(sessionHandle, dropTable, null);
- client.closeOperation(opHandle);
-
-
- client.closeSession(sessionHandle);
- }
+ protected static CLIServiceClient client;
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @Before
+ public void setUp() throws Exception {
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testOpenSession() throws Exception {
+ SessionHandle sessionHandle = client
+ .openSession("tom", "password", Collections.emptyMap());
+ assertNotNull(sessionHandle);
+ client.closeSession(sessionHandle);
+
+ sessionHandle = client.openSession("tom", "password");
+ assertNotNull(sessionHandle);
+ client.closeSession(sessionHandle);
+ }
+
+ @Test
+ public void testGetFunctions() throws Exception {
+ SessionHandle sessionHandle = client.openSession("tom", "password", new HashMap());
+ assertNotNull(sessionHandle);
+ OperationHandle opHandle = client.getFunctions(sessionHandle, null, null, "*");
+ TableSchema schema = client.getResultSetMetadata(opHandle);
+
+ ColumnDescriptor columnDesc = schema.getColumnDescriptorAt(0);
+ assertEquals("FUNCTION_CAT", columnDesc.getName());
+ assertEquals(Type.STRING_TYPE, columnDesc.getType());
+
+ columnDesc = schema.getColumnDescriptorAt(1);
+ assertEquals("FUNCTION_SCHEM", columnDesc.getName());
+ assertEquals(Type.STRING_TYPE, columnDesc.getType());
+
+ columnDesc = schema.getColumnDescriptorAt(2);
+ assertEquals("FUNCTION_NAME", columnDesc.getName());
+ assertEquals(Type.STRING_TYPE, columnDesc.getType());
+
+ columnDesc = schema.getColumnDescriptorAt(3);
+ assertEquals("REMARKS", columnDesc.getName());
+ assertEquals(Type.STRING_TYPE, columnDesc.getType());
+
+ columnDesc = schema.getColumnDescriptorAt(4);
+ assertEquals("FUNCTION_TYPE", columnDesc.getName());
+ assertEquals(Type.INT_TYPE, columnDesc.getType());
+
+ columnDesc = schema.getColumnDescriptorAt(5);
+ assertEquals("SPECIFIC_NAME", columnDesc.getName());
+ assertEquals(Type.STRING_TYPE, columnDesc.getType());
+
+ client.closeSession(sessionHandle);
+ }
+
+ @Test
+ public void testGetInfo() throws Exception {
+ SessionHandle sessionHandle = client.openSession("tom", "password", new HashMap());
+ assertNotNull(sessionHandle);
+
+ GetInfoValue value = client.getInfo(sessionHandle, GetInfoType.CLI_DBMS_NAME);
+ System.out.println(value.getStringValue());
+
+ value = client.getInfo(sessionHandle, GetInfoType.CLI_SERVER_NAME);
+ System.out.println(value.getStringValue());
+
+ value = client.getInfo(sessionHandle, GetInfoType.CLI_DBMS_VER);
+ System.out.println(value.getStringValue());
+
+ client.closeSession(sessionHandle);
+ }
+
+ /**
+ * Test the blocking execution of a query
+ * @throws Exception
+ */
+ @Test
+ public void testExecuteStatement() throws Exception {
+ // Set up the test table
+ Map confOverlay = new HashMap();
+ String tableName = "TEST_EXEC";
+ String columnDefinitions = "(ID STRING)";
+
+ // Open a session and set up the test data
+ SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, confOverlay);
+ assertNotNull(sessionHandle);
+
+ // Blocking execute
+ String queryString = "SELECT ID FROM " + tableName;
+ OperationHandle ophandle = client.executeStatement(sessionHandle, queryString, confOverlay);
+
+ // Expect query to be completed now
+ assertEquals("Query should be finished",
+ OperationState.FINISHED, client.getOperationStatus(ophandle));
+
+ // Cleanup
+ queryString = "DROP TABLE " + tableName;
+ client.executeStatement(sessionHandle, queryString, confOverlay);
+ client.closeSession(sessionHandle);
+ }
+
+
+ /**
+ * Test async execution of a well-formed and a malformed query with different long polling durations
+ * 1. Test malformed query with default long polling timeout
+ * 2. Test well-formed query with default long polling timeout
+ * 3. Test well-formed query with long polling timeout set to 0
+ * 4. Test well-formed query with long polling timeout set to 500 millis
+ * 5. Test well-formed query cancellation
+ * @throws Exception
+ */
+ @Test
+ public void testExecuteStatementAsync() throws Exception {
+ // Set up the test table
+ Map confOverlay = new HashMap();
+ String tableName = "TEST_EXEC_ASYNC";
+ String columnDefinitions = "(ID STRING)";
+ String queryString;
+ OperationState state = null;
+ OperationHandle ophandle;
+
+ // Open a session and set up the test data
+ SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, confOverlay);
+ assertNotNull(sessionHandle);
+
+ // Set longPollingTimeout to a custom value for different test cases
+ long longPollingTimeout;
+
+ // 1. Execute a malformed async query with default config
+ longPollingTimeout = new HiveConf().getLongVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT);
+ queryString = "SELECT NON_EXISTING_COLUMN FROM " + tableName;
+ runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout);
+
+ // 2. Execute an async query with default config
+ longPollingTimeout = new HiveConf().getLongVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT);
+ queryString = "SELECT ID FROM " + tableName;
+ runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
+
+ // 3. Execute an async query with long polling timeout set to 0
+ longPollingTimeout = 0;
+ queryString = "SELECT ID FROM " + tableName;
+ runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
+
+ // 4. Execute an async query with long polling timeout set to 500 millis
+ longPollingTimeout = 500;
+ queryString = "SELECT ID FROM " + tableName;
+ runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
+
+ // 5. Cancellation test
+ queryString = "SELECT ID FROM " + tableName;
+ ophandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
+ System.out.println("Cancelling " + ophandle);
+ client.cancelOperation(ophandle);
+ state = client.getOperationStatus(ophandle);
+ System.out.println(ophandle + " after cancelling, state= " + state);
+ assertEquals("Query should be cancelled", OperationState.CANCELED, state);
+
+ // Cleanup
+ queryString = "DROP TABLE " + tableName;
+ client.executeStatement(sessionHandle, queryString, confOverlay);
+ client.closeSession(sessionHandle);
+ }
+
+ /**
+ * Test per statement configuration overlay.
+ * Create a table using hiveconf: var substitution, with the conf var passed
+ * via confOverlay.Verify the confOverlay works for the query and does set the
+ * value in the session configuration
+ * @throws Exception
+ */
+ @Test
+ public void testConfOverlay() throws Exception {
+ SessionHandle sessionHandle = client.openSession("tom", "password", new HashMap());
+ assertNotNull(sessionHandle);
+ String tabName = "TEST_CONF_EXEC";
+ String tabNameVar = "tabNameVar";
+
+ String setLockMgr = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname
+ + " = false";
+ OperationHandle opHandle = client.executeStatement(sessionHandle, setLockMgr, null);
+ client.closeOperation(opHandle);
+
+ String dropTable = "DROP TABLE IF EXISTS " + tabName;
+ opHandle = client.executeStatement(sessionHandle, dropTable, null);
+ client.closeOperation(opHandle);
+
+ // set a pass a property to operation and check if its set the query config
+ Map confOverlay = new HashMap();
+ confOverlay.put(tabNameVar, tabName);
+
+ // execute statement with the conf overlay
+ String createTab = "CREATE TABLE ${hiveconf:" + tabNameVar + "} (id int)";
+ opHandle = client.executeStatement(sessionHandle, createTab, confOverlay);
+ assertNotNull(opHandle);
+ // query should pass and create the table
+ assertEquals("Query should be finished",
+ OperationState.FINISHED, client.getOperationStatus(opHandle));
+ client.closeOperation(opHandle);
+
+ // select from the new table should pass
+ String selectTab = "SELECT * FROM " + tabName;
+ opHandle = client.executeStatement(sessionHandle, selectTab, null);
+ assertNotNull(opHandle);
+ // query should pass and create the table
+ assertEquals("Query should be finished",
+ OperationState.FINISHED, client.getOperationStatus(opHandle));
+ client.closeOperation(opHandle);
+
+ // the settings in confoverly should not be part of session config
+ // another query referring that property with the conf overlay should fail
+ selectTab = "SELECT * FROM ${hiveconf:" + tabNameVar + "}";
+ try {
+ opHandle = client.executeStatement(sessionHandle, selectTab, null);
+ fail("Query should fail");
+ } catch (HiveSQLException e) {
+ // Expected exception
+ }
+
+ // cleanup
+ dropTable = "DROP TABLE IF EXISTS " + tabName;
+ opHandle = client.executeStatement(sessionHandle, dropTable, null);
+ client.closeOperation(opHandle);
+
+
+ client.closeSession(sessionHandle);
+ }
+
+ /**
+ * Sets up a test specific table with the given column definitions and config
+ * @param tableName
+ * @param columnDefinitions
+ * @param confOverlay
+ * @throws Exception
+ */
+ private SessionHandle setupTestData(String tableName, String columnDefinitions,
+ Map confOverlay) throws Exception {
+ SessionHandle sessionHandle = client.openSession("tom", "password", confOverlay);
+ assertNotNull(sessionHandle);
+
+ String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname
+ + " = false";
+ client.executeStatement(sessionHandle, queryString, confOverlay);
+
+ // Drop the table if it exists
+ queryString = "DROP TABLE IF EXISTS " + tableName;
+ client.executeStatement(sessionHandle, queryString, confOverlay);
+
+ // Create a test table
+ queryString = "CREATE TABLE " + tableName + columnDefinitions;
+ client.executeStatement(sessionHandle, queryString, confOverlay);
+
+ return sessionHandle;
+ }
+
+ private void runQueryAsync(SessionHandle sessionHandle, String queryString,
+ Map confOverlay, OperationState expectedState,
+ long longPollingTimeout) throws Exception {
+ // Timeout for the iteration in case of asynchronous execute
+ long testIterationTimeout = System.currentTimeMillis() + 100000;
+ long longPollingStart;
+ long longPollingEnd;
+ long longPollingTimeDelta;
+ OperationState state = null;
+ confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, String.valueOf(longPollingTimeout));
+ OperationHandle ophandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
+ int count = 0;
+ while (true) {
+ // Break if iteration times out
+ if (System.currentTimeMillis() > testIterationTimeout) {
+ System.out.println("Polling timed out");
+ break;
+ }
+ longPollingStart = System.currentTimeMillis();
+ System.out.println("Long polling starts at: " + longPollingStart);
+ state = client.getOperationStatus(ophandle);
+ longPollingEnd = System.currentTimeMillis();
+ System.out.println("Long polling ends at: " + longPollingEnd);
+
+ System.out.println("Polling: " + ophandle + " count=" + (++count)
+ + " state=" + state);
+
+ if (state == OperationState.CANCELED ||
+ state == OperationState.CLOSED ||
+ state == OperationState.FINISHED ||
+ state == OperationState.ERROR) {
+ break;
+ } else {
+ // Verify that getOperationStatus returned only after the long polling timeout
+ longPollingTimeDelta = longPollingEnd - longPollingStart;
+ // Scale down by a factor of 0.9 to account for approximate values
+ assertTrue(longPollingTimeDelta - 0.9*longPollingTimeout > 0);
+ }
+ }
+ assertEquals(expectedState, client.getOperationStatus(ophandle));
+ }
}