From 9afa0ae9a9e2c38be3fbdadab230fdd399ab8e5b Mon Sep 17 00:00:00 2001 From: amrk7s Date: Mon, 25 Dec 2017 18:50:22 +0530 Subject: [PATCH] HIVE-18338 Exposing asynchronous execution through hive-jdbc client --- .../java/org/apache/hive/jdbc/HiveStatement.java | 161 ++++++++++++++------- 1 file changed, 112 insertions(+), 49 deletions(-) diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index 0f2193fb632..9a2954c28aa 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -37,7 +37,6 @@ import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp; import org.apache.hive.service.rpc.thrift.TGetQueryIdReq; import org.apache.hive.service.rpc.thrift.TOperationHandle; -import org.apache.hive.service.rpc.thrift.TOperationState; import org.apache.hive.service.rpc.thrift.TSessionHandle; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -269,10 +268,7 @@ public boolean execute(String sql) throws SQLException { if (!status.isHasResultSet() && !stmtHandle.isHasResultSet()) { return false; } - resultSet = new HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle) - .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize) - .setScrollable(isScrollableResultset) - .build(); + buildResultSet(); return true; } @@ -297,13 +293,114 @@ public boolean executeAsync(String sql) throws SQLException { if (!status.isHasResultSet()) { return false; } + buildResultSet(); + return true; + } + + /** + * Utility method to build result set. To be called only if the query can produce one. 
+ * @throws SQLException + */ + private void buildResultSet() throws SQLException { resultSet = new HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle) .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize) .setScrollable(isScrollableResultset).build(); + } + + /** + * Given an operation handle, this method tries to latch on to the execution. + * The operation is synchronous, hence it will wait for completion + * + * @param tOperationHandle + * @return true if the first result is a ResultSet object; false if it is an update count or there + * are no results + * @throws SQLException if the latch failed + */ + public boolean latchSync(TOperationHandle tOperationHandle) throws SQLException { + refreshStatus(new TGetOperationStatusReq(tOperationHandle)); + stmtHandle = tOperationHandle; + waitForOperationToComplete(); + if (!stmtHandle.isHasResultSet()) { + return false; + } + buildResultSet(); return true; } + /** + * Given an operation handle, this method tries to latch on to the execution + * + * @param tOperationHandle + * @return true if the first result is a ResultSet object; false if it is an update count or there + * are no results + * @throws SQLException if the latch failed + */ + public boolean latchAsync(TOperationHandle tOperationHandle) throws SQLException { + refreshStatus(new TGetOperationStatusReq(tOperationHandle)); + stmtHandle = tOperationHandle; + if (!stmtHandle.isHasResultSet()) { + return false; + } + buildResultSet(); + return true; + } + + /** + * + * @param statusReq + * @return the operation status response received from the server + * @throws SQLException + */ + private TGetOperationStatusResp refreshStatus(TGetOperationStatusReq statusReq) throws SQLException { + TGetOperationStatusResp statusResp; + try { + /** + * For an async SQLOperation, GetOperationStatus will use the long polling approach. It will + * essentially return after the HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires + */ + statusResp = 
client.GetOperationStatus(statusReq); + inPlaceUpdateStream.update(statusResp.getProgressUpdateResponse()); + Utils.verifySuccessWithInfo(statusResp.getStatus()); + if (statusResp.isSetOperationState()) { + switch (statusResp.getOperationState()) { + case CLOSED_STATE: + case FINISHED_STATE: + isOperationComplete = true; + isLogBeingGenerated = false; + break; + case CANCELED_STATE: + // 01000 -> warning + String errMsg = statusResp.getErrorMessage(); + if (errMsg != null && !errMsg.isEmpty()) { + throw new SQLException("Query was cancelled. " + errMsg, "01000"); + } else { + throw new SQLException("Query was cancelled", "01000"); + } + case TIMEDOUT_STATE: + throw new SQLTimeoutException("Query timed out after " + queryTimeout + " seconds"); + case ERROR_STATE: + // Get the error details from the underlying exception + throw new SQLException(statusResp.getErrorMessage(), statusResp.getSqlState(), + statusResp.getErrorCode()); + case UKNOWN_STATE: + throw new SQLException("Unknown query", "HY000"); + case INITIALIZED_STATE: + case PENDING_STATE: + case RUNNING_STATE: + break; + } + } + } catch (SQLException e) { + isLogBeingGenerated = false; + throw e; + } catch (Exception e) { + isLogBeingGenerated = false; + throw new SQLException(e.toString(), "08S01", e); + } + return statusResp; + } + private void runAsyncOnServer(String sql) throws SQLException { checkConnection("execute"); @@ -370,50 +467,7 @@ TGetOperationStatusResp waitForOperationToComplete() throws SQLException { // Poll on the operation status, till the operation is complete while (!isOperationComplete) { - try { - /** - * For an async SQLOperation, GetOperationStatus will use the long polling approach It will - * essentially return after the HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires - */ - statusResp = client.GetOperationStatus(statusReq); - inPlaceUpdateStream.update(statusResp.getProgressUpdateResponse()); - Utils.verifySuccessWithInfo(statusResp.getStatus()); - if 
(statusResp.isSetOperationState()) { - switch (statusResp.getOperationState()) { - case CLOSED_STATE: - case FINISHED_STATE: - isOperationComplete = true; - isLogBeingGenerated = false; - break; - case CANCELED_STATE: - // 01000 -> warning - String errMsg = statusResp.getErrorMessage(); - if (errMsg != null && !errMsg.isEmpty()) { - throw new SQLException("Query was cancelled. " + errMsg, "01000"); - } else { - throw new SQLException("Query was cancelled", "01000"); - } - case TIMEDOUT_STATE: - throw new SQLTimeoutException("Query timed out after " + queryTimeout + " seconds"); - case ERROR_STATE: - // Get the error details from the underlying exception - throw new SQLException(statusResp.getErrorMessage(), statusResp.getSqlState(), - statusResp.getErrorCode()); - case UKNOWN_STATE: - throw new SQLException("Unknown query", "HY000"); - case INITIALIZED_STATE: - case PENDING_STATE: - case RUNNING_STATE: - break; - } - } - } catch (SQLException e) { - isLogBeingGenerated = false; - throw e; - } catch (Exception e) { - isLogBeingGenerated = false; - throw new SQLException(e.toString(), "08S01", e); - } + statusResp = refreshStatus(statusReq); } /* @@ -649,6 +703,15 @@ public int getQueryTimeout() throws SQLException { return 0; } + /* + * Returns the operation handle involved in the statement. + * The operation handle can be persisted by the client to resume executions. + */ + public TOperationHandle getOperationHandle() { + return stmtHandle; + } + + /* * (non-Javadoc) *