diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java index c91df83..125a982 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java @@ -49,6 +49,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.ql.exec.UDF; import org.apache.hadoop.hive.ql.processors.DfsProcessor; import org.apache.hadoop.hive.ql.processors.SetProcessor; import org.apache.hive.common.util.HiveVersionInfo; @@ -2005,4 +2006,68 @@ public void testShowRoleGrant() throws SQLException { assertEquals("role1", res.getString(1)); res.close(); } + + /** + * Test the cancellation of a query that is running. + * We spawn 2 threads - one running the query and + * the other attempting to cancel. + * We're using a dummy udf to simulate a query, + * that runs for a sufficiently long time. 
+ * @throws Exception + */ + @Test + public void testQueryCancel() throws Exception { + String udfName = SleepUDF.class.getName(); + Statement stmt1 = con.createStatement(); + stmt1.execute("create temporary function sleepUDF as '" + udfName + "'"); + stmt1.close(); + final Statement stmt = con.createStatement(); + // Thread executing the query + Thread tExecute = new Thread(new Runnable() { + @Override + public void run() { + try { + System.out.println("Executing query: "); + stmt.executeQuery("select sleepUDF(t1.under_col) as u0, t1.under_col as u1, " + + "t2.under_col as u2 from " + tableName + " t1 join " + tableName + + " t2 on t1.under_col = t2.under_col"); + fail("Expecting SQLException"); + } catch (SQLException e) { + // This thread should throw an exception + assertNotNull(e); + System.out.println(e.toString()); + } + } + }); + // Thread cancelling the query + Thread tCancel = new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(1000); + System.out.println("Cancelling query: "); + stmt.cancel(); + } catch (Exception e) { + // No-op + } + } + }); + tExecute.start(); + tCancel.start(); + tExecute.join(); + tCancel.join(); + stmt.close(); + } + + // A udf which sleeps for 100ms to simulate a long running query + public static class SleepUDF extends UDF { + public Integer evaluate(final Integer value) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // No-op + } + return value; + } + } } diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index 01e6ea7..95a1843 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -24,6 +24,7 @@ import java.sql.SQLWarning; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; import org.apache.hive.service.cli.thrift.TCLIService; import 
org.apache.hive.service.cli.thrift.TCancelOperationReq; @@ -75,9 +76,9 @@ */ private boolean isClosed = false; - /** - * - */ + // A fair lock used to serialize Thrift RPCs on the shared transport between the query and cancel threads + private ReentrantLock transportLock = new ReentrantLock(true); + public HiveStatement(HiveConnection connection, TCLIService.Iface client, TSessionHandle sessHandle) { this(connection, client, sessHandle, false); @@ -121,7 +122,14 @@ public void cancel() throws SQLException { TCancelOperationReq cancelReq = new TCancelOperationReq(); cancelReq.setOperationHandle(stmtHandle); try { - TCancelOperationResp cancelResp = client.CancelOperation(cancelReq); + // Release the lock in a finally block so a failed RPC cannot leave it held forever. + TCancelOperationResp cancelResp; + transportLock.lock(); + try { + cancelResp = client.CancelOperation(cancelReq); + } finally { + transportLock.unlock(); + } Utils.verifySuccessWithInfo(cancelResp.getStatus()); } catch (SQLException e) { throw e; } @@ -157,7 +160,13 @@ void closeClientOperation() throws SQLException { if (stmtHandle != null) { TCloseOperationReq closeReq = new TCloseOperationReq(); closeReq.setOperationHandle(stmtHandle); - TCloseOperationResp closeResp = client.CloseOperation(closeReq); + TCloseOperationResp closeResp; + transportLock.lock(); + try { + closeResp = client.CloseOperation(closeReq); + } finally { + transportLock.unlock(); + } Utils.verifySuccessWithInfo(closeResp.getStatus()); } } catch (SQLException e) { @@ -217,9 +222,15 @@ public boolean execute(String sql) throws SQLException { */ execReq.setRunAsync(true); execReq.setConfOverlay(sessConf); - TExecuteStatementResp execResp = client.ExecuteStatement(execReq); + TExecuteStatementResp execResp; + transportLock.lock(); + try { + execResp = client.ExecuteStatement(execReq); + } finally { + transportLock.unlock(); + } Utils.verifySuccessWithInfo(execResp.getStatus()); stmtHandle = execResp.getOperationHandle(); } catch (SQLException eS) { throw eS; } catch (Exception ex) { @@ -237,7 +244,12 @@ public boolean execute(String sql) throws SQLException { * For an async SQLOperation, GetOperationStatus will use the long polling approach * It will essentially return after the HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires */ - statusResp = client.GetOperationStatus(statusReq); + transportLock.lock(); + try { + statusResp = client.GetOperationStatus(statusReq); + } finally { + transportLock.unlock(); + } Utils.verifySuccessWithInfo(statusResp.getStatus()); if 
(statusResp.isSetOperationState()) { switch (statusResp.getOperationState()) { diff --git a/service/src/java/org/apache/hive/service/cli/OperationState.java b/service/src/java/org/apache/hive/service/cli/OperationState.java index a023908..3e15f0c 100644 --- a/service/src/java/org/apache/hive/service/cli/OperationState.java +++ b/service/src/java/org/apache/hive/service/cli/OperationState.java @@ -40,7 +40,6 @@ this.tOperationState = tOperationState; } - public static OperationState getOperationState(TOperationState tOperationState) { // TODO: replace this with a Map? for (OperationState opState : values()) { @@ -51,13 +50,15 @@ public static OperationState getOperationState(TOperationState tOperationState) return OperationState.UNKNOWN; } - public static void validateTransition(OperationState oldState, OperationState newState) - throws HiveSQLException { + public static void validateTransition(OperationState oldState, + OperationState newState) + throws HiveSQLException { switch (oldState) { case INITIALIZED: switch (newState) { case PENDING: case RUNNING: + case CANCELED: case CLOSED: return; } diff --git a/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java b/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java index 89f2ae9..3f2de10 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java @@ -17,8 +17,6 @@ */ package org.apache.hive.service.cli.operation; - - import java.sql.SQLException; import java.util.HashMap; import java.util.Map; diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java index 3f36e2d..d6651ba 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ 
b/service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -34,7 +34,6 @@ import org.apache.hive.service.cli.session.HiveSession; import org.apache.hive.service.cli.thrift.TProtocolVersion; - public abstract class Operation { protected final HiveSession parentSession; private OperationState state = OperationState.INITIALIZED; diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java index 345617c..21c33bc 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java +++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -22,11 +22,14 @@ import java.util.List; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hive.service.AbstractService; import org.apache.hive.service.cli.FetchOrientation; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.OperationHandle; +import org.apache.hive.service.cli.OperationState; import org.apache.hive.service.cli.OperationStatus; import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.TableSchema; @@ -38,6 +41,8 @@ */ public class OperationManager extends AbstractService { + private static final Log LOG = LogFactory.getLog(OperationManager.class.getName()); + private HiveConf hiveConf; private final Map handleToOperation = new HashMap(); @@ -124,7 +129,8 @@ public GetFunctionsOperation newGetFunctionsOperation(HiveSession parentSession, return operation; } - public synchronized Operation getOperation(OperationHandle operationHandle) throws HiveSQLException { + public synchronized Operation getOperation(OperationHandle operationHandle) + throws HiveSQLException { Operation operation = handleToOperation.get(operationHandle); if (operation == null) { throw new HiveSQLException("Invalid 
OperationHandle: " + operationHandle); @@ -140,12 +146,26 @@ private synchronized Operation removeOperation(OperationHandle opHandle) { return handleToOperation.remove(opHandle); } - public OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveSQLException { + public OperationStatus getOperationStatus(OperationHandle opHandle) + throws HiveSQLException { return getOperation(opHandle).getStatus(); } public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { - getOperation(opHandle).cancel(); + Operation operation = getOperation(opHandle); + OperationState opState = operation.getStatus().getState(); + if (opState == OperationState.CANCELED || + opState == OperationState.CLOSED || + opState == OperationState.FINISHED || + opState == OperationState.ERROR || + opState == OperationState.UNKNOWN) { + // Cancel is a no-op when the operation is already in a terminal state + LOG.debug(opHandle + ": Operation is already aborted in state - " + opState); + } + else { + LOG.debug(opHandle + ": Attempting to cancel from state - " + opState); + operation.cancel(); + } } public void closeOperation(OperationHandle opHandle) throws HiveSQLException { @@ -161,7 +181,8 @@ public TableSchema getOperationResultSetSchema(OperationHandle opHandle) return getOperation(opHandle).getResultSetSchema(); } - public RowSet getOperationNextRowSet(OperationHandle opHandle) throws HiveSQLException { + public RowSet getOperationNextRowSet(OperationHandle opHandle) + throws HiveSQLException { return getOperation(opHandle).getNextRowSet(); } diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index ace791a..5b2ecb8 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -135,16 +135,23 @@ private void runInternal(HiveConf sqlOperationConf) throws HiveSQLException { // case, 
when calling fetch queries since execute() has returned. // For now, we disable the test attempts. driver.setTryCount(Integer.MAX_VALUE); - response = driver.run(); if (0 != response.getResponseCode()) { throw new HiveSQLException("Error while processing statement: " + response.getErrorMessage(), response.getSQLState(), response.getResponseCode()); } - } catch (HiveSQLException e) { - setState(OperationState.ERROR); - throw e; + // If the operation was cancelled by another thread, + // Driver#run will return a non-zero response code. + // We will simply return if the operation state is CANCELED, + // otherwise throw an exception + if (getStatus().getState() == OperationState.CANCELED) { + return; + } + else { + setState(OperationState.ERROR); + throw e; + } } catch (Exception e) { setState(OperationState.ERROR); throw new HiveSQLException("Error running query: " + e.toString(), e);