diff --git service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index bb0f711..7ce9312 100644 --- service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; @@ -68,6 +69,7 @@ private SerDe serde = null; private final boolean runAsync; private Future backgroundHandle; + private HiveSQLException backgroundRunException = null; public SQLOperation(HiveSession parentSession, String statement, Map confOverlay, boolean runInBackground) { @@ -144,18 +146,20 @@ public void run() throws HiveSQLException { if (!shouldRunAsync()) { runInternal(); } else { - Runnable backgroundOperation = new Runnable() { + Callable backgroundOperation = new Callable() { SessionState ss = SessionState.get(); @Override - public void run() { + public Void call() throws HiveSQLException { SessionState.start(ss); try { runInternal(); } catch (HiveSQLException e) { - LOG.error("Error: ", e); - // TODO: Return a more detailed error to the client, - // currently the async thread only writes to the log and sets the OperationState + // save the exception for subsequent client requests + setBackgroundRunException(e); + setState(OperationState.ERROR); + throw e; } + return null; } }; try { @@ -199,7 +203,7 @@ public void close() throws HiveSQLException { @Override public TableSchema getResultSetSchema() throws HiveSQLException { - assertState(OperationState.FINISHED); + checkExecutionStatus(OperationState.FINISHED); if (resultSchema == null) { resultSchema = new TableSchema(driver.getSchema()); } @@ -209,7 +213,7 @@ public TableSchema getResultSetSchema() throws HiveSQLException { @Override public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { - assertState(OperationState.FINISHED); + checkExecutionStatus(OperationState.FINISHED); ArrayList rows = new ArrayList(); driver.setMaxRows((int)maxRows); @@ -317,4 +321,33 @@ private boolean shouldRunAsync() { return runAsync; } + /** check if the query is still running or failed + * For async query, check if its still running or finished with error + * @param expectedState + * @throws HiveSQLException + */ + private void checkExecutionStatus(OperationState expectedState) throws HiveSQLException { + if (backgroundHandle != null) { + if (getState().equals(OperationState.RUNNING)) { + throw new HiveSQLException("Query still runing", "HY010"); + } else if (getState().equals(OperationState.ERROR)) { + throw new HiveSQLException(getBackgroundRunException().getMessage(), + getBackgroundRunException().getSQLState(), getBackgroundRunException().getErrorCode(), + getBackgroundRunException()); + } else if (getState().equals(OperationState.CANCELED) || backgroundHandle.isCancelled()) { + // query is already canceled. 'Invalid cursor state' exception + throw new HiveSQLException("Query execution was canceled", "24000"); + } + } + assertState(expectedState); + } + + private void setBackgroundRunException(HiveSQLException backgroundRunException) + throws HiveSQLException { + this.backgroundRunException = backgroundRunException; + } + + private HiveSQLException getBackgroundRunException() { + return backgroundRunException; + } } diff --git service/src/java/org/apache/hive/service/cli/session/SessionManager.java service/src/java/org/apache/hive/service/cli/session/SessionManager.java index f392d62..7432c06 100644 --- service/src/java/org/apache/hive/service/cli/session/SessionManager.java +++ service/src/java/org/apache/hive/service/cli/session/SessionManager.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -29,8 +30,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.hooks.HookUtils; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.hooks.HookUtils; import org.apache.hive.service.CompositeService; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.SessionHandle; @@ -184,8 +185,8 @@ private void executeSessionHooks(HiveSession session) throws Exception { } } - public Future submitBackgroundOperation(Runnable r) { - return backgroundOperationPool.submit(r); + public Future submitBackgroundOperation(Callable backgroundOperation) { + return backgroundOperationPool.submit(backgroundOperation); } } diff --git service/src/test/org/apache/hive/service/cli/CLIServiceTest.java service/src/test/org/apache/hive/service/cli/CLIServiceTest.java index a682c63..4a32a4f 100644 --- service/src/test/org/apache/hive/service/cli/CLIServiceTest.java +++ service/src/test/org/apache/hive/service/cli/CLIServiceTest.java @@ -25,6 +25,7 @@ import java.util.HashMap; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -158,7 +159,7 @@ public void testExecuteStatementAsync() throws Exception { client.executeStatementAsync(sessionHandle, createTable, confOverlay); // Test async execution response when query is malformed - String wrongQuery = "SELECT NAME FROM TEST_EXEC"; + String wrongQuery = "SELECT NAME FROM NON_EXISTING_TAB"; ophandle = client.executeStatementAsync(sessionHandle, wrongQuery, confOverlay); int count = 0; @@ -181,6 +182,12 @@ public void testExecuteStatementAsync() throws Exception { assertEquals("Query should return an error state", OperationState.ERROR, client.getOperationStatus(ophandle)); + try { + client.fetchResults(ophandle); + Assert.fail("Fetch should fail for malformed query"); + } catch (HiveSQLException e) { + assertEquals("42S02", e.getSQLState()); + } // Test async execution when query is well formed String select = "SELECT ID FROM TEST_EXEC_ASYNC"; ophandle =