diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 4f32390..f7cd6af 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -763,6 +763,9 @@ // Number of seconds HiveServer2 shutdown will wait for async threads to terminate HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT("hive.server2.async.exec.shutdown.timeout", 10L), + // Time in milliseconds that HiveServer2 will wait, + // before responding to asynchronous calls that use long polling + HIVE_SERVER2_LONG_POLLING_TIMEOUT("hive.server2.long.polling.timeout", 5000), // HiveServer2 auth configuration HIVE_SERVER2_AUTHENTICATION("hive.server2.authentication", "NONE", diff --git a/conf/hive-default.xml.template b/conf/hive-default.xml.template index fe7141e..28d2fe9 100644 --- a/conf/hive-default.xml.template +++ b/conf/hive-default.xml.template @@ -1904,11 +1904,18 @@ hive.server2.async.exec.shutdown.timeout 10 - Time (in seconds) for which HiveServer2 shutdown will wait for async + Time (in seconds) for which HiveServer2 shutdown will wait for async threads to terminate + hive.server2.long.polling.timeout + 5000 + Time (in milliseconds) that HiveServer2 will wait, + before responding to calls that use long polling + + + hive.server2.thrift.port 10000 Port number of HiveServer2 Thrift interface. 
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 4ee1b74..fc41903 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -25,10 +25,15 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Schema; import org.apache.hadoop.hive.ql.CommandNeedRetryException; @@ -207,6 +212,32 @@ public TableSchema getResultSetSchema() throws HiveSQLException { return resultSchema; } + @Override + public OperationState getState() { + // For an async call, block till LONG_POLLING_TIMEOUT expires or + // background operation thread generates results, whichever happens first + if (shouldRunAsync()) { + int timeout = getParentSession().getHiveConf().getIntVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT); + try { + getBackgroundHandle().get(timeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + // No Op, return to the caller since long polling timeout has expired + LOG.trace(getHandle() + ": Long polling timed out"); + } catch (CancellationException e) { + // The background operation thread was cancelled + LOG.trace(getHandle() + ": The background operation was cancelled", e); + } catch (ExecutionException e) { + // The background operation thread was aborted + LOG.trace(getHandle() + ": The background operation was aborted", e); + } catch 
(InterruptedException e) { + // This thread was interrupted, so the call might return sooner than the long polling + // timeout; restore the interrupt status instead of swallowing it + Thread.currentThread().interrupt(); + } + } + return super.getState(); + } + @Override public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { @@ -318,6 +349,10 @@ private boolean shouldRunAsync() { return runAsync; } + private Future getBackgroundHandle() { + return backgroundHandle; + } + /** * If there are query specific settings to overlay, then create a copy of config * There are two cases we need to clone the session config that's being passed to hive driver @@ -345,5 +380,4 @@ private HiveConf getConfigForOperation() throws HiveSQLException { } return sqlOperationConf; } - } diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java index cd9d99a..7dd0275 100644 --- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java +++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java @@ -21,12 +21,14 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; +import static org.junit.Assert.assertTrue; import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -54,7 +56,7 @@ public void tearDown() throws Exception { } @Test - public void openSessionTest() throws Exception { + public void testOpenSession() throws Exception { SessionHandle sessionHandle = client .openSession("tom", "password", Collections.emptyMap()); assertNotNull(sessionHandle); @@ -66,7 +68,7 @@ public void openSessionTest() throws Exception { } @Test - public void getFunctionsTest() throws Exception { + public void testGetFunctions() throws Exception { SessionHandle sessionHandle = 
client.openSession("tom", "password", new HashMap()); assertNotNull(sessionHandle); OperationHandle opHandle = client.getFunctions(sessionHandle, null, null, "*"); @@ -96,12 +98,11 @@ public void getFunctionsTest() throws Exception { assertEquals("SPECIFIC_NAME", columnDesc.getName()); assertEquals(Type.STRING_TYPE, columnDesc.getType()); - client.closeOperation(opHandle); client.closeSession(sessionHandle); } @Test - public void getInfoTest() throws Exception { + public void testGetInfo() throws Exception { SessionHandle sessionHandle = client.openSession("tom", "password", new HashMap()); assertNotNull(sessionHandle); @@ -117,67 +118,63 @@ public void getInfoTest() throws Exception { client.closeSession(sessionHandle); } + /** + * Test the blocking execution of a query + * @throws Exception + */ @Test public void testExecuteStatement() throws Exception { + // Set up the test table HashMap confOverlay = new HashMap(); - SessionHandle sessionHandle = client.openSession("tom", "password", - new HashMap()); - assertNotNull(sessionHandle); - - // Change lock manager, otherwise unit-test doesn't go through - String queryString = "SET hive.lock.manager=" + - "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager"; - client.executeStatement(sessionHandle, queryString, confOverlay); - - // Drop the table if it exists - queryString = "DROP TABLE IF EXISTS TEST_EXEC"; - client.executeStatement(sessionHandle, queryString, confOverlay); + String tableName = "TEST_EXEC"; + String columnDefinitions = "(ID STRING)"; - // Create a test table - queryString = "CREATE TABLE TEST_EXEC(ID STRING)"; - client.executeStatement(sessionHandle, queryString, confOverlay); + // Open a session and set up the test data + SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, confOverlay); + assertNotNull(sessionHandle); // Blocking execute - queryString = "SELECT ID FROM TEST_EXEC"; + String queryString = "SELECT ID FROM " + tableName; OperationHandle ophandle = 
client.executeStatement(sessionHandle, queryString, confOverlay); // Expect query to be completed now assertEquals("Query should be finished", OperationState.FINISHED, client.getOperationStatus(ophandle)); + + // Cleanup + queryString = "DROP TABLE " + tableName; + client.executeStatement(sessionHandle, queryString, confOverlay); + client.closeSession(sessionHandle); } + /** + * Test async execution of a well-formed and a malformed query + * @throws Exception + */ @Test public void testExecuteStatementAsync() throws Exception { + // Set up the test table HashMap confOverlay = new HashMap(); - SessionHandle sessionHandle = client.openSession("tom", "password", - new HashMap()); - // Timeout for the poll in case of asynchronous execute - long pollTimeout = System.currentTimeMillis() + 100000; - assertNotNull(sessionHandle); + String tableName = "TEST_EXEC_ASYNC"; + String columnDefinitions = "(ID STRING)"; + + // Iteration timeout for the polling in case of asynchronous execute + long testIterationTimeout = System.currentTimeMillis() + 100000; OperationState state = null; OperationHandle ophandle; - // Change lock manager, otherwise unit-test doesn't go through - String queryString = "SET hive.lock.manager=" + - "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager"; - client.executeStatement(sessionHandle, queryString, confOverlay); - - // Drop the table if it exists - queryString = "DROP TABLE IF EXISTS TEST_EXEC_ASYNC"; - client.executeStatement(sessionHandle, queryString, confOverlay); - - // Create a test table - queryString = "CREATE TABLE TEST_EXEC_ASYNC(ID STRING)"; - client.executeStatement(sessionHandle, queryString, confOverlay); + // Open a session and set up the test data + SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, confOverlay); + assertNotNull(sessionHandle); // Test async execution response when query is malformed - String wrongQueryString = "SELECT NAME FROM TEST_EXEC"; + String wrongQueryString = "SELECT 
NON_EXISTING_COLUMN FROM " + tableName; ophandle = client.executeStatementAsync(sessionHandle, wrongQueryString, confOverlay); int count = 0; while (true) { // Break if polling times out - if (System.currentTimeMillis() > pollTimeout) { + if (System.currentTimeMillis() > testIterationTimeout) { System.out.println("Polling timed out"); break; } @@ -189,20 +186,18 @@ public void testExecuteStatementAsync() throws Exception { || state == OperationState.FINISHED || state == OperationState.ERROR) { break; } - Thread.sleep(1000); } assertEquals("Query should return an error state", OperationState.ERROR, client.getOperationStatus(ophandle)); // Test async execution when query is well formed - queryString = "SELECT ID FROM TEST_EXEC_ASYNC"; - ophandle = - client.executeStatementAsync(sessionHandle, queryString, confOverlay); + String queryString = "SELECT ID FROM " + tableName; + ophandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); count = 0; while (true) { // Break if polling times out - if (System.currentTimeMillis() > pollTimeout) { + if (System.currentTimeMillis() > testIterationTimeout) { System.out.println("Polling timed out"); break; } @@ -214,18 +209,160 @@ public void testExecuteStatementAsync() throws Exception { || state == OperationState.FINISHED || state == OperationState.ERROR) { break; } - Thread.sleep(1000); } assertEquals("Query should be finished", OperationState.FINISHED, client.getOperationStatus(ophandle)); // Cancellation test ophandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); - System.out.println("cancelling " + ophandle); + System.out.println("Cancelling " + ophandle); client.cancelOperation(ophandle); state = client.getOperationStatus(ophandle); System.out.println(ophandle + " after cancelling, state= " + state); assertEquals("Query should be cancelled", OperationState.CANCELED, state); + + // Cleanup + queryString = "DROP TABLE " + tableName; + client.executeStatement(sessionHandle, 
queryString, confOverlay); + client.closeSession(sessionHandle); + } + + /** + * Test async execution with long polling timeout set to different durations + * 1. Test with default long polling timeout + * 2. Test with long polling timeout set to 0 + * 3. Test with long polling timeout set to 500 millis + * @throws Exception + */ + @Test + public void testLongPolling() throws Exception { + // Set up the test table + HashMap confOverlay = new HashMap(); + String tableName = "TEST_EXEC_LONG_POLLING"; + String columnDefinitions = "(ID STRING)"; + + // Open a session and set up the test data + SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, confOverlay); + assertNotNull(sessionHandle); + + // Timeout for the poll in case of asynchronous execute + long testIterationTimeout = System.currentTimeMillis() + 100000; + long longPollingStart; + long longPollingEnd; + // Set longPollingTimeout to a custom value for different test cases + long longPollingTimeout; + long longPollingTimeDelta; + OperationState state = null; + OperationHandle ophandle; + + // 1. 
Execute an async query with default config + longPollingTimeout = new HiveConf().getIntVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT); + String queryString = "SELECT ID FROM " + tableName; + ophandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); + + int count = 0; + while (true) { + // Break if polling times out + if (System.currentTimeMillis() > testIterationTimeout) { + System.out.println("Polling timed out"); + break; + } + longPollingStart = System.currentTimeMillis(); + System.out.println("Long polling starts at: " + longPollingStart); + state = client.getOperationStatus(ophandle); + longPollingEnd = System.currentTimeMillis(); + System.out.println("Long polling ends at: " + longPollingEnd); + System.out.println("Polling: " + ophandle + " count=" + (++count) + + " state=" + state); + + if (OperationState.CANCELED == state || state == OperationState.CLOSED + || state == OperationState.FINISHED || state == OperationState.ERROR) { + break; + } + else { + // Verify that getOperationStatus returned only after the long polling timeout + longPollingTimeDelta = longPollingEnd - longPollingStart; + // Scale down by a factor of 0.9 to account for approximate values + assertTrue(longPollingTimeDelta - 0.9*longPollingTimeout > 0); + } + } + assertEquals("Query should be finished", + OperationState.FINISHED, client.getOperationStatus(ophandle)); + + // 2. 
Execute an async query with long polling timeout set to 0 + longPollingTimeout = 0; + confOverlay.put("hive.server2.long.polling.timeout", String.valueOf(longPollingTimeout)); + queryString = "SELECT ID FROM " + tableName; + ophandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); + + count = 0; + while (true) { + // Break if polling times out + if (System.currentTimeMillis() > testIterationTimeout) { + System.out.println("Polling timed out"); + break; + } + longPollingStart = System.currentTimeMillis(); + System.out.println("Long polling starts at: " + longPollingStart); + state = client.getOperationStatus(ophandle); + longPollingEnd = System.currentTimeMillis(); + System.out.println("Long polling ends at: " + longPollingEnd); + System.out.println("Polling: " + ophandle + " count=" + (++count) + + " state=" + state); + + if (OperationState.CANCELED == state || state == OperationState.CLOSED + || state == OperationState.FINISHED || state == OperationState.ERROR) { + break; + } + else { + // Verify that getOperationStatus did not return before the long polling timeout + longPollingTimeDelta = longPollingEnd - longPollingStart; + // Scale down by 0.9 to account for approximate values; >= because with a zero timeout the measured delta can legitimately be 0 ms + assertTrue(longPollingTimeDelta - 0.9*longPollingTimeout >= 0); + } + } + assertEquals("Query should be finished", + OperationState.FINISHED, client.getOperationStatus(ophandle)); + + // 3. 
Execute an async query with long polling timeout set to 500 millis + longPollingTimeout = 500; + confOverlay.put("hive.server2.long.polling.timeout", String.valueOf(longPollingTimeout)); + queryString = "SELECT ID FROM " + tableName; + ophandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); + + count = 0; + while (true) { + // Break if polling times out + if (System.currentTimeMillis() > testIterationTimeout) { + System.out.println("Polling timed out"); + break; + } + longPollingStart = System.currentTimeMillis(); + System.out.println("Long polling starts at: " + longPollingStart); + state = client.getOperationStatus(ophandle); + longPollingEnd = System.currentTimeMillis(); + System.out.println("Long polling ends at: " + longPollingEnd); + System.out.println("Polling: " + ophandle + " count=" + (++count) + + " state=" + state); + + if (OperationState.CANCELED == state || state == OperationState.CLOSED + || state == OperationState.FINISHED || state == OperationState.ERROR) { + break; + } + else { + // Verify that getOperationStatus returned only after the long polling timeout + longPollingTimeDelta = longPollingEnd - longPollingStart; + // Scale down by a factor of 0.9 to account for approximate values + assertTrue(longPollingTimeDelta - 0.9*longPollingTimeout > 0); + } + } + assertEquals("Query should be finished", + OperationState.FINISHED, client.getOperationStatus(ophandle)); + + // Cleanup + queryString = "DROP TABLE " + tableName; + client.executeStatement(sessionHandle, queryString, confOverlay); + client.closeSession(sessionHandle); } /** @@ -291,4 +428,31 @@ public void testConfOverlay() throws Exception { client.closeSession(sessionHandle); } + + /** + * Sets up a test specific table with the given column definitions and config + * @param tableName + * @param columnDefinitions + * @param confOverlay + * @throws Exception + */ + private SessionHandle setupTestData(String tableName, String columnDefinitions, + HashMap confOverlay) 
throws Exception { + SessionHandle sessionHandle = client.openSession("tom", "password", confOverlay); + assertNotNull(sessionHandle); + + String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname + + " = false"; + client.executeStatement(sessionHandle, queryString, confOverlay); + + // Drop the table if it exists + queryString = "DROP TABLE IF EXISTS " + tableName; + client.executeStatement(sessionHandle, queryString, confOverlay); + + // Create a test table + queryString = "CREATE TABLE " + tableName + columnDefinitions; + client.executeStatement(sessionHandle, queryString, confOverlay); + + return sessionHandle; + } }