diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index f0d0c77..01e6ea7 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -84,7 +84,7 @@ public HiveStatement(HiveConnection connection, TCLIService.Iface client, } public HiveStatement(HiveConnection connection, TCLIService.Iface client, - TSessionHandle sessHandle, boolean isScrollableResultset) { + TSessionHandle sessHandle, boolean isScrollableResultset) { this.connection = connection; this.client = client; this.sessHandle = sessHandle; @@ -97,6 +97,7 @@ public HiveStatement(HiveConnection connection, TCLIService.Iface client, * @see java.sql.Statement#addBatch(java.lang.String) */ + @Override public void addBatch(String sql) throws SQLException { throw new SQLException("Method not supported"); } @@ -107,6 +108,7 @@ public void addBatch(String sql) throws SQLException { * @see java.sql.Statement#cancel() */ + @Override public void cancel() throws SQLException { if (isClosed) { throw new SQLException("Can't cancel after statement has been closed"); @@ -134,6 +136,7 @@ public void cancel() throws SQLException { * @see java.sql.Statement#clearBatch() */ + @Override public void clearBatch() throws SQLException { throw new SQLException("Method not supported"); } @@ -144,6 +147,7 @@ public void clearBatch() throws SQLException { * @see java.sql.Statement#clearWarnings() */ + @Override public void clearWarnings() throws SQLException { warningChain = null; } @@ -169,6 +173,7 @@ void closeClientOperation() throws SQLException { * * @see java.sql.Statement#close() */ + @Override public void close() throws SQLException { if (isClosed) { return; @@ -192,6 +197,7 @@ public void closeOnCompletion() throws SQLException { * @see java.sql.Statement#execute(java.lang.String) */ + @Override public boolean execute(String sql) throws SQLException { if (isClosed) { throw new SQLException("Can't execute after statement has been closed"); @@ -203,6 +209,13 @@ public boolean execute(String sql) throws SQLException { } TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, sql); + /** + * Run asynchronously whenever possible + * Currently only a SQLOperation can be run asynchronously, + * in a background operation thread + * Compilation is synchronous and execution is asynchronous + */ + execReq.setRunAsync(true); execReq.setConfOverlay(sessConf); TExecuteStatementResp execResp = client.ExecuteStatement(execReq); Utils.verifySuccessWithInfo(execResp.getStatus()); @@ -213,46 +226,49 @@ public boolean execute(String sql) throws SQLException { throw new SQLException(ex.toString(), "08S01", ex); } - if (!stmtHandle.isHasResultSet()) { - // Poll until the query has completed one way or another. DML queries will not return a result - // set, but we should not return from this method until the query has completed to avoid - // racing with possible subsequent session shutdown, or queries that depend on the results - // materialised here. - TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle); - boolean requestComplete = false; - while (!requestComplete) { - try { - TGetOperationStatusResp statusResp = client.GetOperationStatus(statusReq); - Utils.verifySuccessWithInfo(statusResp.getStatus()); - if (statusResp.isSetOperationState()) { - switch (statusResp.getOperationState()) { - case CLOSED_STATE: - case FINISHED_STATE: - return false; - case CANCELED_STATE: - // 01000 -> warning - throw new SQLException("Query was cancelled", "01000"); - case ERROR_STATE: - // HY000 -> general error - throw new SQLException("Query failed", "HY000"); - case UKNOWN_STATE: - throw new SQLException("Unknown query", "HY000"); - case INITIALIZED_STATE: - case PENDING_STATE: - case RUNNING_STATE: - break; - } + TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle); + boolean operationComplete = false; + TGetOperationStatusResp statusResp; + + // Poll on the operation status, till the operation is complete + while (!operationComplete) { + try { + /** + * For an async SQLOperation, GetOperationStatus will use the long polling approach + * It will essentially return after the HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires + */ + statusResp = client.GetOperationStatus(statusReq); + Utils.verifySuccessWithInfo(statusResp.getStatus()); + if (statusResp.isSetOperationState()) { + switch (statusResp.getOperationState()) { + case CLOSED_STATE: + case FINISHED_STATE: + operationComplete = true; + break; + case CANCELED_STATE: + // 01000 -> warning + throw new SQLException("Query was cancelled", "01000"); + case ERROR_STATE: + // Get the error details from the underlying exception + throw new SQLException(statusResp.getErrorMessage(), + statusResp.getSqlState(), statusResp.getErrorCode()); + case UKNOWN_STATE: + throw new SQLException("Unknown query", "HY000"); + case INITIALIZED_STATE: + case PENDING_STATE: + case RUNNING_STATE: + break; } - } catch (Exception ex) { - throw new SQLException(ex.toString(), "08S01", ex); - } - - try { - Thread.sleep(100); - } catch (InterruptedException ex) { - // Ignore } + } catch (SQLException e) { + throw e; + } catch (Exception e) { + throw new SQLException(e.toString(), "08S01", e); } + } + + // The query should be completed by now + if (!stmtHandle.isHasResultSet()) { return false; } resultSet = new HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle) @@ -268,6 +284,7 @@ public boolean execute(String sql) throws SQLException { * @see java.sql.Statement#execute(java.lang.String, int) */ + @Override public boolean execute(String sql, int autoGeneratedKeys) throws SQLException { throw new SQLException("Method not supported"); } @@ -278,6 +295,7 @@ public boolean execute(String sql, int autoGeneratedKeys) throws SQLException { * @see java.sql.Statement#execute(java.lang.String, int[]) */ + @Override public boolean execute(String sql, int[] columnIndexes) throws SQLException { throw new SQLException("Method not supported"); } @@ -288,6 +306,7 @@ public boolean execute(String sql, int[] columnIndexes) throws SQLException { * @see java.sql.Statement#execute(java.lang.String, java.lang.String[]) */ + @Override public boolean execute(String sql, String[] columnNames) throws SQLException { throw new SQLException("Method not supported"); } @@ -298,6 +317,7 @@ public boolean execute(String sql, String[] columnNames) throws SQLException { * @see java.sql.Statement#executeBatch() */ + @Override public int[] executeBatch() throws SQLException { throw new SQLException("Method not supported"); } @@ -308,6 +328,7 @@ public boolean execute(String sql, String[] columnNames) throws SQLException { * @see java.sql.Statement#executeQuery(java.lang.String) */ + @Override public ResultSet executeQuery(String sql) throws SQLException { if (!execute(sql)) { throw new SQLException("The query did not generate a result set!"); @@ -321,6 +342,7 @@ public ResultSet executeQuery(String sql) throws SQLException { * @see java.sql.Statement#executeUpdate(java.lang.String) */ + @Override public int executeUpdate(String sql) throws SQLException { execute(sql); return 0; @@ -332,6 +354,7 @@ public int executeUpdate(String sql) throws SQLException { * @see java.sql.Statement#executeUpdate(java.lang.String, int) */ + @Override public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException { throw new SQLException("Method not supported"); } @@ -342,6 +365,7 @@ public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException * @see java.sql.Statement#executeUpdate(java.lang.String, int[]) */ + @Override public int executeUpdate(String sql, int[] columnIndexes) throws SQLException { throw new SQLException("Method not supported"); } @@ -352,6 +376,7 @@ public int executeUpdate(String sql, int[] columnIndexes) throws SQLException { * @see java.sql.Statement#executeUpdate(java.lang.String, java.lang.String[]) */ + @Override public int executeUpdate(String sql, String[] columnNames) throws SQLException { throw new SQLException("Method not supported"); } @@ -362,6 +387,7 @@ public int executeUpdate(String sql, String[] columnNames) throws SQLException { * @see java.sql.Statement#getConnection() */ + @Override public Connection getConnection() throws SQLException { return this.connection; } @@ -372,6 +398,7 @@ public Connection getConnection() throws SQLException { * @see java.sql.Statement#getFetchDirection() */ + @Override public int getFetchDirection() throws SQLException { throw new SQLException("Method not supported"); } @@ -382,6 +409,7 @@ public int getFetchDirection() throws SQLException { * @see java.sql.Statement#getFetchSize() */ + @Override public int getFetchSize() throws SQLException { return fetchSize; } @@ -392,6 +420,7 @@ public int getFetchSize() throws SQLException { * @see java.sql.Statement#getGeneratedKeys() */ + @Override public ResultSet getGeneratedKeys() throws SQLException { throw new SQLException("Method not supported"); } @@ -402,6 +431,7 @@ public ResultSet getGeneratedKeys() throws SQLException { * @see java.sql.Statement#getMaxFieldSize() */ + @Override public int getMaxFieldSize() throws SQLException { throw new SQLException("Method not supported"); } @@ -412,6 +442,7 @@ public int getMaxFieldSize() throws SQLException { * @see java.sql.Statement#getMaxRows() */ + @Override public int getMaxRows() throws SQLException { return maxRows; } @@ -422,6 +453,7 @@ public int getMaxRows() throws SQLException { * @see java.sql.Statement#getMoreResults() */ + @Override public boolean getMoreResults() throws SQLException { throw new SQLException("Method not supported"); } @@ -432,6 +464,7 @@ public boolean getMoreResults() throws SQLException { * @see java.sql.Statement#getMoreResults(int) */ + @Override public boolean getMoreResults(int current) throws SQLException { throw new SQLException("Method not supported"); } @@ -442,6 +475,7 @@ public boolean getMoreResults(int current) throws SQLException { * @see java.sql.Statement#getQueryTimeout() */ + @Override public int getQueryTimeout() throws SQLException { throw new SQLException("Method not supported"); } @@ -452,6 +486,7 @@ public int getQueryTimeout() throws SQLException { * @see java.sql.Statement#getResultSet() */ + @Override public ResultSet getResultSet() throws SQLException { return resultSet; } @@ -462,6 +497,7 @@ public ResultSet getResultSet() throws SQLException { * @see java.sql.Statement#getResultSetConcurrency() */ + @Override public int getResultSetConcurrency() throws SQLException { throw new SQLException("Method not supported"); } @@ -472,6 +508,7 @@ public int getResultSetConcurrency() throws SQLException { * @see java.sql.Statement#getResultSetHoldability() */ + @Override public int getResultSetHoldability() throws SQLException { throw new SQLException("Method not supported"); } @@ -482,6 +519,7 @@ public int getResultSetHoldability() throws SQLException { * @see java.sql.Statement#getResultSetType() */ + @Override public int getResultSetType() throws SQLException { throw new SQLException("Method not supported"); } @@ -492,6 +530,7 @@ public int getResultSetType() throws SQLException { * @see java.sql.Statement#getUpdateCount() */ + @Override public int getUpdateCount() throws SQLException { return 0; } @@ -502,6 +541,7 @@ public int getUpdateCount() throws SQLException { * @see java.sql.Statement#getWarnings() */ + @Override public SQLWarning getWarnings() throws SQLException { return warningChain; } @@ -512,6 +552,7 @@ public SQLWarning getWarnings() throws SQLException { * @see java.sql.Statement#isClosed() */ + @Override public boolean isClosed() throws SQLException { return isClosed; } @@ -527,6 +568,7 @@ public boolean isCloseOnCompletion() throws SQLException { * @see java.sql.Statement#isPoolable() */ + @Override public boolean isPoolable() throws SQLException { throw new SQLException("Method not supported"); } @@ -537,6 +579,7 @@ public boolean isPoolable() throws SQLException { * @see java.sql.Statement#setCursorName(java.lang.String) */ + @Override public void setCursorName(String name) throws SQLException { throw new SQLException("Method not supported"); } @@ -547,6 +590,7 @@ public void setCursorName(String name) throws SQLException { * @see java.sql.Statement#setEscapeProcessing(boolean) */ + @Override public void setEscapeProcessing(boolean enable) throws SQLException { throw new SQLException("Method not supported"); } @@ -557,6 +601,7 @@ public void setEscapeProcessing(boolean enable) throws SQLException { * @see java.sql.Statement#setFetchDirection(int) */ + @Override public void setFetchDirection(int direction) throws SQLException { throw new SQLException("Method not supported"); } @@ -567,6 +612,7 @@ public void setFetchDirection(int direction) throws SQLException { * @see java.sql.Statement#setFetchSize(int) */ + @Override public void setFetchSize(int rows) throws SQLException { fetchSize = rows; } @@ -577,6 +623,7 @@ public void setFetchSize(int rows) throws SQLException { * @see java.sql.Statement#setMaxFieldSize(int) */ + @Override public void setMaxFieldSize(int max) throws SQLException { throw new SQLException("Method not supported"); } @@ -587,6 +634,7 @@ public void setMaxFieldSize(int max) throws SQLException { * @see java.sql.Statement#setMaxRows(int) */ + @Override public void setMaxRows(int max) throws SQLException { if (max < 0) { throw new SQLException("max must be >= 0"); @@ -600,6 +648,7 @@ public void setMaxRows(int max) throws SQLException { * @see java.sql.Statement#setPoolable(boolean) */ + @Override public void setPoolable(boolean poolable) throws SQLException { throw new SQLException("Method not supported"); } @@ -610,6 +659,7 @@ public void setPoolable(boolean poolable) throws SQLException { * @see java.sql.Statement#setQueryTimeout(int) */ + @Override public void setQueryTimeout(int seconds) throws SQLException { throw new SQLException("Method not supported"); } @@ -620,6 +670,7 @@ public void setQueryTimeout(int seconds) throws SQLException { * @see java.sql.Wrapper#isWrapperFor(java.lang.Class) */ + @Override public boolean isWrapperFor(Class iface) throws SQLException { throw new SQLException("Method not supported"); } @@ -630,6 +681,7 @@ public boolean isWrapperFor(Class iface) throws SQLException { * @see java.sql.Wrapper#unwrap(java.lang.Class) */ + @Override public T unwrap(Class iface) throws SQLException { throw new SQLException("Method not supported"); }