diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 4f32390..f7cd6af 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -763,6 +763,9 @@
// Number of seconds HiveServer2 shutdown will wait for async threads to terminate
HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT("hive.server2.async.exec.shutdown.timeout", 10L),
+ // Time in milliseconds that HiveServer2 will wait,
+ // before responding to asynchronous calls that use long polling
+ HIVE_SERVER2_LONG_POLLING_TIMEOUT("hive.server2.long.polling.timeout", 5000),
// HiveServer2 auth configuration
HIVE_SERVER2_AUTHENTICATION("hive.server2.authentication", "NONE",
diff --git a/conf/hive-default.xml.template b/conf/hive-default.xml.template
index fe7141e..28d2fe9 100644
--- a/conf/hive-default.xml.template
+++ b/conf/hive-default.xml.template
@@ -1904,11 +1904,18 @@
hive.server2.async.exec.shutdown.timeout
10
- Time (in seconds) for which HiveServer2 shutdown will wait for async
+ Time (in seconds) for which HiveServer2 shutdown will wait for async
threads to terminate
+ hive.server2.long.polling.timeout
+ 5000
+ Time (in milliseconds) that HiveServer2 will wait,
+ before responding to calls that use long polling
+
+
+
hive.server2.thrift.port
10000
Port number of HiveServer2 Thrift interface.
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 4ee1b74..fc41903 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -25,10 +25,15 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
@@ -207,6 +212,31 @@ public TableSchema getResultSetSchema() throws HiveSQLException {
return resultSchema;
}
+ @Override
+ public OperationState getState() {
+ // For an async call, block till LONG_POLLING_TIMEOUT expires or
+ // background operation thread generates results, whichever happens first
+ if (shouldRunAsync()) {
+ int timeout = getParentSession().getHiveConf().getIntVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT);
+ try {
+ getBackgroundHandle().get(timeout, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ // No Op, return to the caller since long polling timeout has expired
+ LOG.trace(getHandle() + ": Long polling timed out");
+ } catch (CancellationException e) {
+ // The background operation thread was cancelled
+ LOG.trace(getHandle() + ": The background operation was cancelled", e);
+ } catch (ExecutionException e) {
+ // The background operation thread was aborted
+ LOG.trace(getHandle() + ": The background operation was aborted", e);
+ } catch (InterruptedException e) {
+ // No op, this thread was interrupted
+ // In this case, the call might return sooner than long polling timeout
+ }
+ }
+ return super.getState();
+ }
+
@Override
public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
@@ -318,6 +348,10 @@ private boolean shouldRunAsync() {
return runAsync;
}
+ private Future> getBackgroundHandle() {
+ return backgroundHandle;
+ }
+
/**
* If there are query specific settings to overlay, then create a copy of config
* There are two cases we need to clone the session config that's being passed to hive driver
@@ -345,5 +379,4 @@ private HiveConf getConfigForOperation() throws HiveSQLException {
}
return sqlOperationConf;
}
-
}
diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
index cd9d99a..7dd0275 100644
--- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
+++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
@@ -21,12 +21,14 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -54,7 +56,7 @@ public void tearDown() throws Exception {
}
@Test
- public void openSessionTest() throws Exception {
+ public void testOpenSession() throws Exception {
SessionHandle sessionHandle = client
.openSession("tom", "password", Collections.emptyMap());
assertNotNull(sessionHandle);
@@ -66,7 +68,7 @@ public void openSessionTest() throws Exception {
}
@Test
- public void getFunctionsTest() throws Exception {
+ public void testGetFunctions() throws Exception {
SessionHandle sessionHandle = client.openSession("tom", "password", new HashMap());
assertNotNull(sessionHandle);
OperationHandle opHandle = client.getFunctions(sessionHandle, null, null, "*");
@@ -96,12 +98,11 @@ public void getFunctionsTest() throws Exception {
assertEquals("SPECIFIC_NAME", columnDesc.getName());
assertEquals(Type.STRING_TYPE, columnDesc.getType());
- client.closeOperation(opHandle);
client.closeSession(sessionHandle);
}
@Test
- public void getInfoTest() throws Exception {
+ public void testGetInfo() throws Exception {
SessionHandle sessionHandle = client.openSession("tom", "password", new HashMap());
assertNotNull(sessionHandle);
@@ -117,67 +118,63 @@ public void getInfoTest() throws Exception {
client.closeSession(sessionHandle);
}
+ /**
+ * Test the blocking execution of a query
+ * @throws Exception
+ */
@Test
public void testExecuteStatement() throws Exception {
+ // Set up the test table
HashMap confOverlay = new HashMap();
- SessionHandle sessionHandle = client.openSession("tom", "password",
- new HashMap());
- assertNotNull(sessionHandle);
-
- // Change lock manager, otherwise unit-test doesn't go through
- String queryString = "SET hive.lock.manager=" +
- "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager";
- client.executeStatement(sessionHandle, queryString, confOverlay);
-
- // Drop the table if it exists
- queryString = "DROP TABLE IF EXISTS TEST_EXEC";
- client.executeStatement(sessionHandle, queryString, confOverlay);
+ String tableName = "TEST_EXEC";
+ String columnDefinitions = "(ID STRING)";
- // Create a test table
- queryString = "CREATE TABLE TEST_EXEC(ID STRING)";
- client.executeStatement(sessionHandle, queryString, confOverlay);
+ // Open a session and set up the test data
+ SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, confOverlay);
+ assertNotNull(sessionHandle);
// Blocking execute
- queryString = "SELECT ID FROM TEST_EXEC";
+ String queryString = "SELECT ID FROM " + tableName;
OperationHandle ophandle = client.executeStatement(sessionHandle, queryString, confOverlay);
// Expect query to be completed now
assertEquals("Query should be finished",
OperationState.FINISHED, client.getOperationStatus(ophandle));
+
+ // Cleanup
+ queryString = "DROP TABLE " + tableName;
+ client.executeStatement(sessionHandle, queryString, confOverlay);
+ client.closeSession(sessionHandle);
}
+ /**
+ * Test async execution of a well-formed and a malformed query
+ * @throws Exception
+ */
@Test
public void testExecuteStatementAsync() throws Exception {
+ // Set up the test table
HashMap confOverlay = new HashMap();
- SessionHandle sessionHandle = client.openSession("tom", "password",
- new HashMap());
- // Timeout for the poll in case of asynchronous execute
- long pollTimeout = System.currentTimeMillis() + 100000;
- assertNotNull(sessionHandle);
+ String tableName = "TEST_EXEC_ASYNC";
+ String columnDefinitions = "(ID STRING)";
+
+ // Iteration timeout for the polling in case of asynchronous execute
+ long testIterationTimeout = System.currentTimeMillis() + 100000;
OperationState state = null;
OperationHandle ophandle;
- // Change lock manager, otherwise unit-test doesn't go through
- String queryString = "SET hive.lock.manager=" +
- "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager";
- client.executeStatement(sessionHandle, queryString, confOverlay);
-
- // Drop the table if it exists
- queryString = "DROP TABLE IF EXISTS TEST_EXEC_ASYNC";
- client.executeStatement(sessionHandle, queryString, confOverlay);
-
- // Create a test table
- queryString = "CREATE TABLE TEST_EXEC_ASYNC(ID STRING)";
- client.executeStatement(sessionHandle, queryString, confOverlay);
+ // Open a session and set up the test data
+ SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, confOverlay);
+ assertNotNull(sessionHandle);
// Test async execution response when query is malformed
- String wrongQueryString = "SELECT NAME FROM TEST_EXEC";
+ String wrongQueryString = "SELECT NON_EXISTING_COLUMN FROM " + tableName;
ophandle = client.executeStatementAsync(sessionHandle, wrongQueryString, confOverlay);
int count = 0;
while (true) {
// Break if polling times out
- if (System.currentTimeMillis() > pollTimeout) {
+ if (System.currentTimeMillis() > testIterationTimeout) {
System.out.println("Polling timed out");
break;
}
@@ -189,20 +186,18 @@ public void testExecuteStatementAsync() throws Exception {
|| state == OperationState.FINISHED || state == OperationState.ERROR) {
break;
}
- Thread.sleep(1000);
}
assertEquals("Query should return an error state",
OperationState.ERROR, client.getOperationStatus(ophandle));
// Test async execution when query is well formed
- queryString = "SELECT ID FROM TEST_EXEC_ASYNC";
- ophandle =
- client.executeStatementAsync(sessionHandle, queryString, confOverlay);
+ String queryString = "SELECT ID FROM " + tableName;
+ ophandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
count = 0;
while (true) {
// Break if polling times out
- if (System.currentTimeMillis() > pollTimeout) {
+ if (System.currentTimeMillis() > testIterationTimeout) {
System.out.println("Polling timed out");
break;
}
@@ -214,18 +209,160 @@ public void testExecuteStatementAsync() throws Exception {
|| state == OperationState.FINISHED || state == OperationState.ERROR) {
break;
}
- Thread.sleep(1000);
}
assertEquals("Query should be finished",
OperationState.FINISHED, client.getOperationStatus(ophandle));
// Cancellation test
ophandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
- System.out.println("cancelling " + ophandle);
+ System.out.println("Cancelling " + ophandle);
client.cancelOperation(ophandle);
state = client.getOperationStatus(ophandle);
System.out.println(ophandle + " after cancelling, state= " + state);
assertEquals("Query should be cancelled", OperationState.CANCELED, state);
+
+ // Cleanup
+ queryString = "DROP TABLE " + tableName;
+ client.executeStatement(sessionHandle, queryString, confOverlay);
+ client.closeSession(sessionHandle);
+ }
+
+ /**
+ * Test async execution with long polling timeout set to different durations
+ * 1. Test with default long polling timeout
+ * 2. Test with long polling timeout set to 0
+ * 3. Test with long polling timeout set to 500 millis
+ * @throws Exception
+ */
+ @Test
+ public void testLongPolling() throws Exception {
+ // Set up the test table
+ HashMap confOverlay = new HashMap();
+ String tableName = "TEST_EXEC_LONG_POLLING";
+ String columnDefinitions = "(ID STRING)";
+
+ // Open a session and set up the test data
+ SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, confOverlay);
+ assertNotNull(sessionHandle);
+
+ // Timeout for the poll in case of asynchronous execute
+ long testIterationTimeout = System.currentTimeMillis() + 100000;
+ long longPollingStart;
+ long longPollingEnd;
+ // Set longPollingTimeout to a custom value for different test cases
+ long longPollingTimeout;
+ long longPollingTimeDelta;
+ OperationState state = null;
+ OperationHandle ophandle;
+
+ // 1. Execute an async query with default config
+ longPollingTimeout = new HiveConf().getIntVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT);
+ String queryString = "SELECT ID FROM " + tableName;
+ ophandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
+
+ int count = 0;
+ while (true) {
+ // Break if polling times out
+ if (System.currentTimeMillis() > testIterationTimeout) {
+ System.out.println("Polling timed out");
+ break;
+ }
+ longPollingStart = System.currentTimeMillis();
+ System.out.println("Long polling starts at: " + longPollingStart);
+ state = client.getOperationStatus(ophandle);
+ longPollingEnd = System.currentTimeMillis();
+ System.out.println("Long polling ends at: " + longPollingEnd);
+ System.out.println("Polling: " + ophandle + " count=" + (++count)
+ + " state=" + state);
+
+ if (OperationState.CANCELED == state || state == OperationState.CLOSED
+ || state == OperationState.FINISHED || state == OperationState.ERROR) {
+ break;
+ }
+ else {
+ // Verify that getOperationStatus returned only after the long polling timeout
+ longPollingTimeDelta = longPollingEnd - longPollingStart;
+ // Scale down by a factor of 0.9 to account for approximate values
+ assertTrue(longPollingTimeDelta - 0.9*longPollingTimeout > 0);
+ }
+ }
+ assertEquals("Query should be finished",
+ OperationState.FINISHED, client.getOperationStatus(ophandle));
+
+ // 2. Execute an async query with long polling timeout set to 0
+ longPollingTimeout = 0;
+ confOverlay.put("hive.server2.long.polling.timeout", String.valueOf(longPollingTimeout));
+ queryString = "SELECT ID FROM " + tableName;
+ ophandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
+
+ count = 0;
+ while (true) {
+ // Break if polling times out
+ if (System.currentTimeMillis() > testIterationTimeout) {
+ System.out.println("Polling timed out");
+ break;
+ }
+ longPollingStart = System.currentTimeMillis();
+ System.out.println("Long polling starts at: " + longPollingStart);
+ state = client.getOperationStatus(ophandle);
+ longPollingEnd = System.currentTimeMillis();
+ System.out.println("Long polling ends at: " + longPollingEnd);
+ System.out.println("Polling: " + ophandle + " count=" + (++count)
+ + " state=" + state);
+
+ if (OperationState.CANCELED == state || state == OperationState.CLOSED
+ || state == OperationState.FINISHED || state == OperationState.ERROR) {
+ break;
+ }
+ else {
+ // Verify that getOperationStatus returned only after the long polling timeout
+ longPollingTimeDelta = longPollingEnd - longPollingStart;
+ // Scale down by a factor of 0.9 to account for approximate values
+ assertTrue(longPollingTimeDelta - 0.9*longPollingTimeout > 0);
+ }
+ }
+ assertEquals("Query should be finished",
+ OperationState.FINISHED, client.getOperationStatus(ophandle));
+
+ // 3. Execute an async query with long polling timeout set to 500 millis
+ longPollingTimeout = 500;
+ confOverlay.put("hive.server2.long.polling.timeout", String.valueOf(longPollingTimeout));
+ queryString = "SELECT ID FROM " + tableName;
+ ophandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
+
+ count = 0;
+ while (true) {
+ // Break if polling times out
+ if (System.currentTimeMillis() > testIterationTimeout) {
+ System.out.println("Polling timed out");
+ break;
+ }
+ longPollingStart = System.currentTimeMillis();
+ System.out.println("Long polling starts at: " + longPollingStart);
+ state = client.getOperationStatus(ophandle);
+ longPollingEnd = System.currentTimeMillis();
+ System.out.println("Long polling ends at: " + longPollingEnd);
+ System.out.println("Polling: " + ophandle + " count=" + (++count)
+ + " state=" + state);
+
+ if (OperationState.CANCELED == state || state == OperationState.CLOSED
+ || state == OperationState.FINISHED || state == OperationState.ERROR) {
+ break;
+ }
+ else {
+ // Verify that getOperationStatus returned only after the long polling timeout
+ longPollingTimeDelta = longPollingEnd - longPollingStart;
+ // Scale down by a factor of 0.9 to account for approximate values
+ assertTrue(longPollingTimeDelta - 0.9*longPollingTimeout > 0);
+ }
+ }
+ assertEquals("Query should be finished",
+ OperationState.FINISHED, client.getOperationStatus(ophandle));
+
+ // Cleanup
+ queryString = "DROP TABLE " + tableName;
+ client.executeStatement(sessionHandle, queryString, confOverlay);
+ client.closeSession(sessionHandle);
}
/**
@@ -291,4 +428,31 @@ public void testConfOverlay() throws Exception {
client.closeSession(sessionHandle);
}
+
+ /**
+ * Sets up a test specific table with the given column definitions and config
+ * @param tableName
+ * @param columnDefinitions
+ * @param confOverlay
+ * @throws Exception
+ */
+ private SessionHandle setupTestData(String tableName, String columnDefinitions,
+ HashMap confOverlay) throws Exception {
+ SessionHandle sessionHandle = client.openSession("tom", "password", confOverlay);
+ assertNotNull(sessionHandle);
+
+ String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname
+ + " = false";
+ client.executeStatement(sessionHandle, queryString, confOverlay);
+
+ // Drop the table if it exists
+ queryString = "DROP TABLE IF EXISTS " + tableName;
+ client.executeStatement(sessionHandle, queryString, confOverlay);
+
+ // Create a test table
+ queryString = "CREATE TABLE " + tableName + columnDefinitions;
+ client.executeStatement(sessionHandle, queryString, confOverlay);
+
+ return sessionHandle;
+ }
}