diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 4f32390..f7cd6af 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -763,6 +763,9 @@ // Number of seconds HiveServer2 shutdown will wait for async threads to terminate HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT("hive.server2.async.exec.shutdown.timeout", 10L), + // Time in milliseconds that HiveServer2 will wait, + // before responding to asynchronous calls that use long polling + HIVE_SERVER2_LONG_POLLING_TIMEOUT("hive.server2.long.polling.timeout", 5000), // HiveServer2 auth configuration HIVE_SERVER2_AUTHENTICATION("hive.server2.authentication", "NONE", diff --git a/conf/hive-default.xml.template b/conf/hive-default.xml.template index fe7141e..28d2fe9 100644 --- a/conf/hive-default.xml.template +++ b/conf/hive-default.xml.template @@ -1904,11 +1904,18 @@ hive.server2.async.exec.shutdown.timeout 10 - Time (in seconds) for which HiveServer2 shutdown will wait for async + Time (in seconds) for which HiveServer2 shutdown will wait for async threads to terminate + hive.server2.long.polling.timeout + 5000 + Time (in milliseconds) that HiveServer2 will wait, + before responding to calls that use long polling + + + hive.server2.thrift.port 10000 Port number of HiveServer2 Thrift interface. diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 4ee1b74..fc41903 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -25,10 +25,15 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Schema; import org.apache.hadoop.hive.ql.CommandNeedRetryException; @@ -207,6 +212,31 @@ public TableSchema getResultSetSchema() throws HiveSQLException { return resultSchema; } + @Override + public OperationState getState() { + // For an async call, block till LONG_POLLING_TIMEOUT expires or + // background operation thread generates results, whichever happens first + if (shouldRunAsync()) { + int timeout = getParentSession().getHiveConf().getIntVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT); + try { + getBackgroundHandle().get(timeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + // No Op, return to the caller since long polling timeout has expired + LOG.trace(getHandle() + ": Long polling timed out"); + } catch (CancellationException e) { + // The background operation thread was cancelled + LOG.trace(getHandle() + ": The background operation was cancelled", e); + } catch (ExecutionException e) { + // The background operation thread was aborted + LOG.trace(getHandle() + ": The background operation was aborted", e); + } catch (InterruptedException e) { + // No op, this thread was interrupted + // In this case, the call might return sooner than long polling timeout + } + } + return super.getState(); + } + @Override public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { @@ -318,6 +348,10 @@ private boolean shouldRunAsync() { return runAsync; } + private Future getBackgroundHandle() { + return backgroundHandle; + } + /** * If there are query specific settings to overlay, then create a copy of config * There are two cases we need to clone the session config that's being passed to hive driver @@ -345,5 +379,4 @@ private HiveConf getConfigForOperation() throws HiveSQLException { } return sqlOperationConf; } - } diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java index cd9d99a..bd2d825 100644 --- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java +++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java @@ -21,12 +21,14 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; +import static org.junit.Assert.assertTrue; import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -54,7 +56,7 @@ public void tearDown() throws Exception { } @Test - public void openSessionTest() throws Exception { + public void testOpenSession() throws Exception { SessionHandle sessionHandle = client .openSession("tom", "password", Collections.emptyMap()); assertNotNull(sessionHandle); @@ -66,7 +68,7 @@ public void openSessionTest() throws Exception { } @Test - public void getFunctionsTest() throws Exception { + public void testGetFunctions() throws Exception { SessionHandle sessionHandle = client.openSession("tom", "password", new HashMap()); assertNotNull(sessionHandle); OperationHandle opHandle = client.getFunctions(sessionHandle, null, null, "*"); @@ -96,12 +98,11 @@ public void getFunctionsTest() throws Exception { assertEquals("SPECIFIC_NAME", columnDesc.getName()); assertEquals(Type.STRING_TYPE, columnDesc.getType()); - client.closeOperation(opHandle); client.closeSession(sessionHandle); } @Test - public void getInfoTest() throws Exception { + public void testGetInfo() throws Exception { SessionHandle sessionHandle = client.openSession("tom", "password", new HashMap()); assertNotNull(sessionHandle); @@ -117,115 +118,95 @@ public void getInfoTest() throws Exception { client.closeSession(sessionHandle); } + /** + * Test the blocking execution of a query + * @throws Exception + */ @Test public void testExecuteStatement() throws Exception { + // Set up the test table HashMap confOverlay = new HashMap(); - SessionHandle sessionHandle = client.openSession("tom", "password", - new HashMap()); - assertNotNull(sessionHandle); - - // Change lock manager, otherwise unit-test doesn't go through - String queryString = "SET hive.lock.manager=" + - "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager"; - client.executeStatement(sessionHandle, queryString, confOverlay); - - // Drop the table if it exists - queryString = "DROP TABLE IF EXISTS TEST_EXEC"; - client.executeStatement(sessionHandle, queryString, confOverlay); + String tableName = "TEST_EXEC"; + String columnDefinitions = "(ID STRING)"; - // Create a test table - queryString = "CREATE TABLE TEST_EXEC(ID STRING)"; - client.executeStatement(sessionHandle, queryString, confOverlay); + // Open a session and set up the test data + SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, confOverlay); + assertNotNull(sessionHandle); // Blocking execute - queryString = "SELECT ID FROM TEST_EXEC"; + String queryString = "SELECT ID FROM " + tableName; OperationHandle ophandle = client.executeStatement(sessionHandle, queryString, confOverlay); // Expect query to be completed now assertEquals("Query should be finished", OperationState.FINISHED, client.getOperationStatus(ophandle)); + + // Cleanup + queryString = "DROP TABLE " + tableName; + client.executeStatement(sessionHandle, queryString, confOverlay); + client.closeSession(sessionHandle); } + + /** + * Test async execution of a well-formed and a malformed query with different long polling durations + * 1. Test malformed query with default long polling timeout + * 2. Test well-formed query with default long polling timeout + * 3. Test well-formed query with long polling timeout set to 0 + * 4. Test well-formed query with long polling timeout set to 500 millis + * 5. Test well-formed query cancellation + * @throws Exception + */ @Test public void testExecuteStatementAsync() throws Exception { + // Set up the test table HashMap confOverlay = new HashMap(); - SessionHandle sessionHandle = client.openSession("tom", "password", - new HashMap()); - // Timeout for the poll in case of asynchronous execute - long pollTimeout = System.currentTimeMillis() + 100000; - assertNotNull(sessionHandle); + String tableName = "TEST_EXEC_ASYNC"; + String columnDefinitions = "(ID STRING)"; + String queryString; OperationState state = null; OperationHandle ophandle; + + // Open a session and set up the test data + SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, confOverlay); + assertNotNull(sessionHandle); - // Change lock manager, otherwise unit-test doesn't go through - String queryString = "SET hive.lock.manager=" + - "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager"; - client.executeStatement(sessionHandle, queryString, confOverlay); - - // Drop the table if it exists - queryString = "DROP TABLE IF EXISTS TEST_EXEC_ASYNC"; - client.executeStatement(sessionHandle, queryString, confOverlay); - - // Create a test table - queryString = "CREATE TABLE TEST_EXEC_ASYNC(ID STRING)"; - client.executeStatement(sessionHandle, queryString, confOverlay); - - // Test async execution response when query is malformed - String wrongQueryString = "SELECT NAME FROM TEST_EXEC"; - ophandle = client.executeStatementAsync(sessionHandle, wrongQueryString, confOverlay); - - int count = 0; - while (true) { - // Break if polling times out - if (System.currentTimeMillis() > pollTimeout) { - System.out.println("Polling timed out"); - break; - } - state = client.getOperationStatus(ophandle); - System.out.println("Polling: " + ophandle + " count=" + (++count) - + " state=" + state); - - if (OperationState.CANCELED == state || state == OperationState.CLOSED - || state == OperationState.FINISHED || state == OperationState.ERROR) { - break; - } - Thread.sleep(1000); - } - assertEquals("Query should return an error state", - OperationState.ERROR, client.getOperationStatus(ophandle)); - - // Test async execution when query is well formed - queryString = "SELECT ID FROM TEST_EXEC_ASYNC"; - ophandle = - client.executeStatementAsync(sessionHandle, queryString, confOverlay); - - count = 0; - while (true) { - // Break if polling times out - if (System.currentTimeMillis() > pollTimeout) { - System.out.println("Polling timed out"); - break; - } - state = client.getOperationStatus(ophandle); - System.out.println("Polling: " + ophandle + " count=" + (++count) - + " state=" + state); - - if (OperationState.CANCELED == state || state == OperationState.CLOSED - || state == OperationState.FINISHED || state == OperationState.ERROR) { - break; - } - Thread.sleep(1000); - } - assertEquals("Query should be finished", - OperationState.FINISHED, client.getOperationStatus(ophandle)); - - // Cancellation test + // Set longPollingTimeout to a custom value for different test cases + int longPollingTimeout; + + // 1. Execute a malformed async query with default config + longPollingTimeout = new HiveConf().getIntVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT); + queryString = "SELECT NON_EXISTING_COLUMN FROM " + tableName; + runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout); + + // 2. Execute an async query with default config + longPollingTimeout = new HiveConf().getIntVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT); + queryString = "SELECT ID FROM " + tableName; + runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout); + + // 3. Execute an async query with long polling timeout set to 0 + longPollingTimeout = 0; + queryString = "SELECT ID FROM " + tableName; + runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout); + + // 4. Execute an async query with long polling timeout set to 500 millis + longPollingTimeout = 500; + queryString = "SELECT ID FROM " + tableName; + runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout); + + // 5. Cancellation test + queryString = "SELECT ID FROM " + tableName; ophandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); - System.out.println("cancelling " + ophandle); + System.out.println("Cancelling " + ophandle); client.cancelOperation(ophandle); state = client.getOperationStatus(ophandle); System.out.println(ophandle + " after cancelling, state= " + state); assertEquals("Query should be cancelled", OperationState.CANCELED, state); + + // Cleanup + queryString = "DROP TABLE " + tableName; + client.executeStatement(sessionHandle, queryString, confOverlay); + client.closeSession(sessionHandle); } /** @@ -291,4 +272,72 @@ public void testConfOverlay() throws Exception { client.closeSession(sessionHandle); } + + /** + * Sets up a test specific table with the given column definitions and config + * @param tableName + * @param columnDefinitions + * @param confOverlay + * @throws Exception + */ + private SessionHandle setupTestData(String tableName, String columnDefinitions, + HashMap confOverlay) throws Exception { + SessionHandle sessionHandle = client.openSession("tom", "password", confOverlay); + assertNotNull(sessionHandle); + + String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname + + " = false"; + client.executeStatement(sessionHandle, queryString, confOverlay); + + // Drop the table if it exists + queryString = "DROP TABLE IF EXISTS " + tableName; + client.executeStatement(sessionHandle, queryString, confOverlay); + + // Create a test table + queryString = "CREATE TABLE " + tableName + columnDefinitions; + client.executeStatement(sessionHandle, queryString, confOverlay); + + return sessionHandle; + } + + private void runQueryAsync(SessionHandle sessionHandle, String queryString, + HashMap confOverlay, OperationState expectedState, + int longPollingTimeout) throws Exception { + // Timeout for the iteration in case of asynchronous execute + long testIterationTimeout = System.currentTimeMillis() + 100000; + long longPollingStart; + long longPollingEnd; + long longPollingTimeDelta; + OperationState state = null; + confOverlay.put("hive.server2.long.polling.timeout", String.valueOf(longPollingTimeout)); + OperationHandle ophandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); + int count = 0; + while (true) { + // Break if iteration times out + if (System.currentTimeMillis() > testIterationTimeout) { + System.out.println("Polling timed out"); + break; + } + longPollingStart = System.currentTimeMillis(); + System.out.println("Long polling starts at: " + longPollingStart); + state = client.getOperationStatus(ophandle); + longPollingEnd = System.currentTimeMillis(); + System.out.println("Long polling ends at: " + longPollingEnd); + + System.out.println("Polling: " + ophandle + " count=" + (++count) + + " state=" + state); + + if (OperationState.CANCELED == state || state == OperationState.CLOSED + || state == OperationState.FINISHED || state == OperationState.ERROR) { + break; + } + else { + // Verify that getOperationStatus returned only after the long polling timeout + longPollingTimeDelta = longPollingEnd - longPollingStart; + // Scale down by a factor of 0.9 to account for approximate values + assertTrue(longPollingTimeDelta - 0.9*longPollingTimeout > 0); + } + } + assertEquals(expectedState, client.getOperationStatus(ophandle)); + } }