diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index 01e6ea7..aac3418 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -121,8 +121,10 @@ public void cancel() throws SQLException { TCancelOperationReq cancelReq = new TCancelOperationReq(); cancelReq.setOperationHandle(stmtHandle); try { - TCancelOperationResp cancelResp = client.CancelOperation(cancelReq); - Utils.verifySuccessWithInfo(cancelResp.getStatus()); + synchronized(this) { + TCancelOperationResp cancelResp = client.CancelOperation(cancelReq); + Utils.verifySuccessWithInfo(cancelResp.getStatus()); + } } catch (SQLException e) { throw e; } catch (Exception e) { @@ -157,8 +159,10 @@ void closeClientOperation() throws SQLException { if (stmtHandle != null) { TCloseOperationReq closeReq = new TCloseOperationReq(); closeReq.setOperationHandle(stmtHandle); - TCloseOperationResp closeResp = client.CloseOperation(closeReq); - Utils.verifySuccessWithInfo(closeResp.getStatus()); + synchronized(this) { + TCloseOperationResp closeResp = client.CloseOperation(closeReq); + Utils.verifySuccessWithInfo(closeResp.getStatus()); + } } } catch (SQLException e) { throw e; @@ -217,9 +221,11 @@ public boolean execute(String sql) throws SQLException { */ execReq.setRunAsync(true); execReq.setConfOverlay(sessConf); - TExecuteStatementResp execResp = client.ExecuteStatement(execReq); - Utils.verifySuccessWithInfo(execResp.getStatus()); - stmtHandle = execResp.getOperationHandle(); + synchronized(this) { + TExecuteStatementResp execResp = client.ExecuteStatement(execReq); + Utils.verifySuccessWithInfo(execResp.getStatus()); + stmtHandle = execResp.getOperationHandle(); + } } catch (SQLException eS) { throw eS; } catch (Exception ex) { @@ -237,8 +243,10 @@ public boolean execute(String sql) throws SQLException { * For an async SQLOperation, GetOperationStatus will use the long polling approach * It will essentially return after the HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires */ - statusResp = client.GetOperationStatus(statusReq); - Utils.verifySuccessWithInfo(statusResp.getStatus()); + synchronized(this) { + statusResp = client.GetOperationStatus(statusReq); + Utils.verifySuccessWithInfo(statusResp.getStatus()); + } if (statusResp.isSetOperationState()) { switch (statusResp.getOperationState()) { case CLOSED_STATE: diff --git a/service/src/java/org/apache/hive/service/cli/OperationState.java b/service/src/java/org/apache/hive/service/cli/OperationState.java index a023908..3e15f0c 100644 --- a/service/src/java/org/apache/hive/service/cli/OperationState.java +++ b/service/src/java/org/apache/hive/service/cli/OperationState.java @@ -40,7 +40,6 @@ this.tOperationState = tOperationState; } - public static OperationState getOperationState(TOperationState tOperationState) { // TODO: replace this with a Map? for (OperationState opState : values()) { @@ -51,13 +50,15 @@ public static OperationState getOperationState(TOperationState tOperationState) return OperationState.UNKNOWN; } - public static void validateTransition(OperationState oldState, OperationState newState) - throws HiveSQLException { + public static void validateTransition(OperationState oldState, + OperationState newState) + throws HiveSQLException { switch (oldState) { case INITIALIZED: switch (newState) { case PENDING: case RUNNING: + case CANCELED: case CLOSED: return; } diff --git a/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java b/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java index 89f2ae9..3f2de10 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java @@ -17,8 +17,6 @@ */ package org.apache.hive.service.cli.operation; - - import java.sql.SQLException; import java.util.HashMap; import java.util.Map; diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java index 3f36e2d..d6651ba 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -34,7 +34,6 @@ import org.apache.hive.service.cli.session.HiveSession; import org.apache.hive.service.cli.thrift.TProtocolVersion; - public abstract class Operation { protected final HiveSession parentSession; private OperationState state = OperationState.INITIALIZED; diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java index 345617c..9d00632 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java +++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -22,11 +22,14 @@ import java.util.List; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hive.service.AbstractService; import org.apache.hive.service.cli.FetchOrientation; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.OperationHandle; +import org.apache.hive.service.cli.OperationState; import org.apache.hive.service.cli.OperationStatus; import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.TableSchema; @@ -38,6 +41,8 @@ */ public class OperationManager extends AbstractService { + private final Log LOG = LogFactory.getLog(OperationManager.class.getName()); + private HiveConf hiveConf; private final Map handleToOperation = new HashMap(); @@ -124,7 +129,8 @@ public GetFunctionsOperation newGetFunctionsOperation(HiveSession parentSession, return operation; } - public synchronized Operation getOperation(OperationHandle operationHandle) throws HiveSQLException { + public synchronized Operation getOperation(OperationHandle operationHandle) + throws HiveSQLException { Operation operation = handleToOperation.get(operationHandle); if (operation == null) { throw new HiveSQLException("Invalid OperationHandle: " + operationHandle); @@ -140,12 +146,26 @@ private synchronized Operation removeOperation(OperationHandle opHandle) { return handleToOperation.remove(opHandle); } - public OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveSQLException { + public OperationStatus getOperationStatus(OperationHandle opHandle) + throws HiveSQLException { return getOperation(opHandle).getStatus(); } public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { - getOperation(opHandle).cancel(); + Operation operation = getOperation(opHandle); + OperationState opState = operation.getStatus().getState(); + if (opState == OperationState.CANCELED || + opState == OperationState.CLOSED || + opState == OperationState.FINISHED || + opState == OperationState.ERROR || + opState == OperationState.UNKNOWN) { + // Cancel should be a no-op in either cases + LOG.debug("Operation is already aborted in state: " + opState); + } + else { + LOG.debug("Attempting to cancel from: " + opState); + operation.cancel(); + } } public void closeOperation(OperationHandle opHandle) throws HiveSQLException { @@ -161,7 +181,8 @@ public TableSchema getOperationResultSetSchema(OperationHandle opHandle) return getOperation(opHandle).getResultSetSchema(); } - public RowSet getOperationNextRowSet(OperationHandle opHandle) throws HiveSQLException { + public RowSet getOperationNextRowSet(OperationHandle opHandle) + throws HiveSQLException { return getOperation(opHandle).getNextRowSet(); } diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index ace791a..5b2ecb8 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -135,16 +135,23 @@ private void runInternal(HiveConf sqlOperationConf) throws HiveSQLException { // case, when calling fetch queries since execute() has returned. // For now, we disable the test attempts. driver.setTryCount(Integer.MAX_VALUE); - response = driver.run(); if (0 != response.getResponseCode()) { throw new HiveSQLException("Error while processing statement: " + response.getErrorMessage(), response.getSQLState(), response.getResponseCode()); } - } catch (HiveSQLException e) { - setState(OperationState.ERROR); - throw e; + // If the operation was cancelled by another thread, + // Driver#run will return a non-zero response code. + // We will simply return if the operation state is CANCELED, + // otherwise throw an exception + if (getStatus().getState() == OperationState.CANCELED) { + return; + } + else { + setState(OperationState.ERROR); + throw e; + } } catch (Exception e) { setState(OperationState.ERROR); throw new HiveSQLException("Error running query: " + e.toString(), e);