diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java b/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java
index 8f67209..354f891 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java
@@ -334,6 +334,7 @@ private void closeOperationHandle(TOperationHandle stmtHandle) throws SQLExcepti
 
   /**
    * Moves the cursor down one row from its current position.
+   * Blocks till results are available.
    *
    * @see java.sql.ResultSet#next()
    * @throws SQLException
@@ -347,6 +348,14 @@ public boolean next() throws SQLException {
       return false;
     }
 
+    // Poll on the operation status, till the operation is complete.
+    // We need to wait only for HiveStatement to complete.
+    // HiveDatabaseMetaData which also uses this ResultSet returns only after the RPC is complete.
+    if ((statement != null) && (statement instanceof HiveStatement)) {
+      ((HiveStatement) statement).waitForOperationToComplete();
+      ((HiveStatement) statement).setIsLogBeingGenerated(false);
+    }
+
     try {
       TFetchOrientation orientation = TFetchOrientation.FETCH_NEXT;
       if (fetchFirst) {
@@ -362,7 +371,6 @@ public boolean next() throws SQLException {
       TFetchResultsResp fetchResp;
       fetchResp = client.FetchResults(fetchReq);
       Utils.verifySuccessWithInfo(fetchResp.getStatus());
-
       TRowSet results = fetchResp.getResults();
       fetchedRows = RowSetFactory.create(results, protocol);
       fetchedRowsItr = fetchedRows.iterator();
@@ -383,7 +391,6 @@ public boolean next() throws SQLException {
     } catch (SQLException eS) {
       throw eS;
     } catch (Exception ex) {
-      ex.printStackTrace();
       throw new SQLException("Error retrieving next row", ex);
     }
     // NOTE: fetchOne dosn't throw new SQLException("Method not supported").
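Illustration (not part of the patch): with next() now delegating to waitForOperationToComplete() on the owning HiveStatement, a plain JDBC caller sees execute() return once the statement is submitted, while the first rs.next() blocks until the operation has finished on HiveServer2. A minimal sketch, assuming a HiveServer2 at localhost:10000 and a placeholder table named src:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class BlockingNextSketch {
      public static void main(String[] args) throws Exception {
        // May be unnecessary with JDBC 4 driver auto-loading; harmless otherwise.
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        // Hypothetical connection details; adjust host, port, database and credentials.
        Connection conn = DriverManager.getConnection(
            "jdbc:hive2://localhost:10000/default", "hive", "");
        Statement stmt = conn.createStatement();
        try {
          // Returns as soon as the query is submitted (asynchronous execution).
          stmt.execute("SELECT key, value FROM src LIMIT 10");
          ResultSet rs = stmt.getResultSet();
          // The first next() call blocks, via waitForOperationToComplete(), until the
          // operation has finished, then fetches the first batch of rows.
          while (rs.next()) {
            System.out.println(rs.getString(1) + "\t" + rs.getString(2));
          }
          rs.close();
        } finally {
          stmt.close();
          conn.close();
        }
      }
    }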
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
index b4dba44..023df0e 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
@@ -100,7 +100,7 @@
   /**
    * Keep this state so we can know whether the query logs are being generated in HS2.
    */
-  private boolean isLogBeingGenerated = true;
+  private boolean isLogBeingGenerated = false;
 
   /**
    * Keep this state so we can know whether the statement is submitted to HS2 and start execution
@@ -221,7 +221,10 @@ public void close() throws SQLException {
     }
     closeClientOperation();
     client = null;
-    resultSet = null;
+    if (resultSet != null) {
+      resultSet.close();
+      resultSet = null;
+    }
     isClosed = true;
   }
 
@@ -265,18 +268,34 @@ public boolean execute(String sql) throws SQLException {
       isExecuteStatementFailed = true;
       throw new SQLException(ex.toString(), "08S01", ex);
     }
 
+    if (!stmtHandle.isHasResultSet()) {
+      waitForOperationToComplete();
+      // Log generation would have ended now
+      this.isLogBeingGenerated = false;
+      // The query should be completed by now
+      return false;
+    }
+    resultSet =
+        new HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle)
+            .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize)
+            .setScrollable(isScrollableResultset).build();
+    // Logs will be generated as the query executes
+    this.isLogBeingGenerated = true;
+    return true;
+  }
+
+  /**
+   * Poll on the operation status, till the operation is complete
+   *
+   * @throws SQLException
+   */
+  void waitForOperationToComplete() throws SQLException {
     TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle);
     boolean operationComplete = false;
     TGetOperationStatusResp statusResp;
-
-    // Poll on the operation status, till the operation is complete
     while (!operationComplete) {
       try {
-        /**
-         * For an async SQLOperation, GetOperationStatus will use the long polling approach
-         * It will essentially return after the HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires
-         */
         statusResp = client.GetOperationStatus(statusReq);
         Utils.verifySuccessWithInfo(statusResp.getStatus());
         if (statusResp.isSetOperationState()) {
@@ -301,24 +320,13 @@ public boolean execute(String sql) throws SQLException {
           }
         }
       } catch (SQLException e) {
-        isLogBeingGenerated = false;
+        this.isLogBeingGenerated = false;
         throw e;
       } catch (Exception e) {
-        isLogBeingGenerated = false;
+        this.isLogBeingGenerated = false;
         throw new SQLException(e.toString(), "08S01", e);
       }
     }
-    isLogBeingGenerated = false;
-
-    // The query should be completed by now
-    if (!stmtHandle.isHasResultSet()) {
-      return false;
-    }
-    resultSet = new HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle)
-        .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize)
-        .setScrollable(isScrollableResultset)
-        .build();
-    return true;
   }
 
   private void checkConnection(String action) throws SQLException {
@@ -330,7 +338,7 @@ private void checkConnection(String action) throws SQLException {
   private void initFlags() {
     isCancelled = false;
     isQueryClosed = false;
-    isLogBeingGenerated = true;
+    isLogBeingGenerated = false;
     isExecuteStatementFailed = false;
   }
 
@@ -813,7 +821,7 @@ public boolean hasMoreLogs() {
       throw new ClosedOrCancelledStatementException("Method getQueryLog() failed. The " +
           "statement has been closed or cancelled.");
     }
-
+    waitForOperationToRun();
     List<String> logs = new ArrayList<String>();
     TFetchResultsResp tFetchResultsResp = null;
     try {
@@ -849,6 +857,48 @@ public boolean hasMoreLogs() {
     return logs;
   }
 
+  /**
+   * Poll on the operation status, till the operation is running or complete
+   *
+   * @throws SQLException
+   */
+  void waitForOperationToRun() throws SQLException {
+    TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle);
+    boolean logReady = false;
+    TGetOperationStatusResp statusResp;
+    while (!logReady) {
+      try {
+        statusResp = client.GetOperationStatus(statusReq);
+        Utils.verifySuccessWithInfo(statusResp.getStatus());
+        if (statusResp.isSetOperationState()) {
+          switch (statusResp.getOperationState()) {
+          case CLOSED_STATE:
+          case FINISHED_STATE:
+          case RUNNING_STATE:
+            logReady = true;
+            break;
+          case CANCELED_STATE:
+            // 01000 -> warning
+            throw new SQLException("Query was cancelled", "01000");
+          case ERROR_STATE:
+            // Get the error details from the underlying exception
+            throw new SQLException(statusResp.getErrorMessage(), statusResp.getSqlState(),
+                statusResp.getErrorCode());
+          case UKNOWN_STATE:
+            throw new SQLException("Unknown query", "HY000");
+          case INITIALIZED_STATE:
+          case PENDING_STATE:
+            break;
+          }
+        }
+      } catch (SQLException e) {
+        throw e;
+      } catch (Exception e) {
+        throw new SQLException(e.toString(), "08S01", e);
+      }
+    }
+  }
+
   private TFetchOrientation getFetchOrientation(boolean incremental) {
     if (incremental) {
       return TFetchOrientation.FETCH_NEXT;
@@ -856,4 +906,8 @@ private TFetchOrientation getFetchOrientation(boolean incremental) {
       return TFetchOrientation.FETCH_FIRST;
     }
   }
+
+  void setIsLogBeingGenerated(boolean isLogBeingGenerated) {
+    this.isLogBeingGenerated = isLogBeingGenerated;
+  }
 }
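Illustration (not part of the patch): the new waitForOperationToRun() lets getQueryLog() be called as soon as the operation reaches the RUNNING state, and isLogBeingGenerated is cleared once the ResultSet has waited for completion, which ends a hasMoreLogs() loop. A minimal sketch of a log-tailing thread, broadly the pattern Beeline uses to surface operation logs; the connection details and query are placeholders:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    import org.apache.hive.jdbc.HiveStatement;

    public class QueryLogSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical connection details; adjust for your deployment.
        Connection conn = DriverManager.getConnection(
            "jdbc:hive2://localhost:10000/default", "hive", "");
        final HiveStatement stmt = (HiveStatement) conn.createStatement();

        // Submit the query first; execute() returns once the operation is running.
        stmt.execute("SELECT COUNT(*) FROM src");  // placeholder query

        // Tail the operation log in a second thread. hasMoreLogs() stays true while
        // isLogBeingGenerated is set, and turns false once rs.next() below has waited
        // for the operation to complete.
        Thread logTailer = new Thread(new Runnable() {
          public void run() {
            try {
              while (stmt.hasMoreLogs()) {
                for (String line : stmt.getQueryLog()) {
                  System.err.println(line);
                }
                Thread.sleep(500L);
              }
            } catch (Exception e) {
              // Log tailing is best-effort in this sketch.
            }
          }
        });
        logTailer.start();

        ResultSet rs = stmt.getResultSet();
        while (rs.next()) {                 // blocks until the operation finishes
          System.out.println(rs.getLong(1));
        }
        logTailer.join();
        rs.close();
        stmt.close();
        conn.close();
      }
    }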
diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java
index 8868ec1..242d170 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java
@@ -18,6 +18,9 @@
 
 package org.apache.hive.service.cli.operation;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType;
 import org.apache.hive.service.cli.FetchOrientation;
 import org.apache.hive.service.cli.HiveSQLException;
@@ -63,6 +66,10 @@ public void runInternal() throws HiveSQLException {
    */
   @Override
   public TableSchema getResultSetSchema() throws HiveSQLException {
+    // Since compilation is always a blocking RPC call, and schema is ready after compilation,
+    // we can return when we are in the RUNNING state.
+    assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.RUNNING,
+        OperationState.FINISHED)));
     return RESULT_SET_SCHEMA;
   }
 
@@ -71,7 +78,7 @@ public TableSchema getResultSetSchema() throws HiveSQLException {
    */
   @Override
   public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
-    assertState(OperationState.FINISHED);
+    assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.FINISHED)));
     validateDefaultFetchOrientation(orientation);
     if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
       rowSet.setStartOffset(0);
diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java
index 35b6c52..d7e52a8 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java
@@ -20,6 +20,7 @@
 
 import java.sql.DatabaseMetaData;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -221,7 +222,10 @@ public void runInternal() throws HiveSQLException {
    */
   @Override
   public TableSchema getResultSetSchema() throws HiveSQLException {
-    assertState(OperationState.FINISHED);
+    // Since compilation is always a blocking RPC call, and schema is ready after compilation,
+    // we can return when we are in the RUNNING state.
+    assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.RUNNING,
+        OperationState.FINISHED)));
     return RESULT_SET_SCHEMA;
   }
 
@@ -230,7 +234,7 @@ public TableSchema getResultSetSchema() throws HiveSQLException {
    */
   @Override
   public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
-    assertState(OperationState.FINISHED);
+    assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.FINISHED)));
     validateDefaultFetchOrientation(orientation);
     if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
       rowSet.setStartOffset(0);
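Illustration (not part of the patch): the metadata operations above back java.sql.DatabaseMetaData calls, which run synchronously inside the RPC; relaxing getResultSetSchema() to accept the RUNNING state keeps schema access legal without changing that behavior. A minimal sketch of the client-side calls these operations serve; the schema and table names are placeholders:

    import java.sql.Connection;
    import java.sql.DatabaseMetaData;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    public class MetaDataSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical connection details; adjust for your deployment.
        Connection conn = DriverManager.getConnection(
            "jdbc:hive2://localhost:10000/default", "hive", "");
        DatabaseMetaData meta = conn.getMetaData();

        // Served by GetCatalogsOperation on the server side.
        ResultSet catalogs = meta.getCatalogs();
        while (catalogs.next()) {
          System.out.println("catalog: " + catalogs.getString("TABLE_CAT"));
        }
        catalogs.close();

        // Served by GetColumnsOperation; "default" and "src" are placeholders.
        ResultSet columns = meta.getColumns(null, "default", "src", null);
        while (columns.next()) {
          System.out.println(columns.getString("COLUMN_NAME") + " : "
              + columns.getString("TYPE_NAME"));
        }
        columns.close();
        conn.close();
      }
    }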
diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java
index 8db2e62..457373c 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java
@@ -19,6 +19,8 @@
 package org.apache.hive.service.cli.operation;
 
 import java.sql.DatabaseMetaData;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 
@@ -128,7 +130,10 @@ public void runInternal() throws HiveSQLException {
    */
   @Override
   public TableSchema getResultSetSchema() throws HiveSQLException {
-    assertState(OperationState.FINISHED);
+    // Since compilation is always a blocking RPC call, and schema is ready after compilation,
+    // we can return when we are in the RUNNING state.
+    assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.RUNNING,
+        OperationState.FINISHED)));
     return RESULT_SET_SCHEMA;
   }
 
@@ -137,7 +142,7 @@ public TableSchema getResultSetSchema() throws HiveSQLException {
    */
   @Override
   public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
-    assertState(OperationState.FINISHED);
+    assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.FINISHED)));
     validateDefaultFetchOrientation(orientation);
     if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
       rowSet.setStartOffset(0);
diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java
index d6f6280..806c429 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java
@@ -18,6 +18,9 @@
 
 package org.apache.hive.service.cli.operation;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType;
 import org.apache.hive.service.cli.FetchOrientation;
@@ -77,7 +80,10 @@ public void runInternal() throws HiveSQLException {
    */
   @Override
   public TableSchema getResultSetSchema() throws HiveSQLException {
-    assertState(OperationState.FINISHED);
+    // Since compilation is always a blocking RPC call, and schema is ready after compilation,
+    // we can return when we are in the RUNNING state.
+    assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.RUNNING,
+        OperationState.FINISHED)));
     return RESULT_SET_SCHEMA;
   }
 
@@ -86,7 +92,7 @@ public TableSchema getResultSetSchema() throws HiveSQLException {
    */
   @Override
   public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
-    assertState(OperationState.FINISHED);
+    assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.FINISHED)));
     validateDefaultFetchOrientation(orientation);
     if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
       rowSet.setStartOffset(0);
diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java
index a09b39a..1b4c5ef 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java
@@ -18,6 +18,9 @@
 
 package org.apache.hive.service.cli.operation;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType;
@@ -73,7 +76,10 @@ public void runInternal() throws HiveSQLException {
    */
   @Override
   public TableSchema getResultSetSchema() throws HiveSQLException {
-    assertState(OperationState.FINISHED);
+    // Since compilation is always a blocking RPC call, and schema is ready after compilation,
+    // we can return when we are in the RUNNING state.
+    assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.RUNNING,
+        OperationState.FINISHED)));
     return RESULT_SET_SCHEMA;
   }
 
@@ -82,7 +88,7 @@ public TableSchema getResultSetSchema() throws HiveSQLException {
    */
   @Override
   public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
-    assertState(OperationState.FINISHED);
+    assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.FINISHED)));
     validateDefaultFetchOrientation(orientation);
     if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
       rowSet.setStartOffset(0);
diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java
index 740b851..5e78491 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java
@@ -60,9 +60,9 @@
       .addStringColumn("TYPE_CAT", "The types catalog.")
       .addStringColumn("TYPE_SCHEM", "The types schema.")
       .addStringColumn("TYPE_NAME", "Type name.")
-      .addStringColumn("SELF_REFERENCING_COL_NAME", 
+      .addStringColumn("SELF_REFERENCING_COL_NAME",
          "Name of the designated \"identifier\" column of a typed table.")
-      .addStringColumn("REF_GENERATION", 
+      .addStringColumn("REF_GENERATION",
          "Specifies how values in SELF_REFERENCING_COL_NAME are created.");
 
   protected GetTablesOperation(HiveSession parentSession,
@@ -102,7 +102,7 @@ public void runInternal() throws HiveSQLException {
 
       String tablePattern = convertIdentifierPattern(tableName, true);
 
-      for (TableMeta tableMeta : 
+      for (TableMeta tableMeta :
           metastoreClient.getTableMeta(schemaPattern, tablePattern, tableTypeList)) {
         rowSet.addRow(new Object[] {
               DEFAULT_HIVE_CATALOG,
@@ -125,7 +125,10 @@ public void runInternal() throws HiveSQLException {
    */
   @Override
   public TableSchema getResultSetSchema() throws HiveSQLException {
-    assertState(OperationState.FINISHED);
+    // Since compilation is always a blocking RPC call, and schema is ready after compilation,
+    // we can return when we are in the RUNNING state.
+    assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.RUNNING,
+        OperationState.FINISHED)));
     return RESULT_SET_SCHEMA;
   }
 
@@ -134,7 +137,7 @@ public TableSchema getResultSetSchema() throws HiveSQLException {
    */
   @Override
   public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
-    assertState(OperationState.FINISHED);
+    assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.FINISHED)));
     validateDefaultFetchOrientation(orientation);
     if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
       rowSet.setStartOffset(0);
diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java
index 2a0fec2..1a0585c 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java
@@ -18,6 +18,9 @@
 
 package org.apache.hive.service.cli.operation;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType;
 import org.apache.hive.service.cli.FetchOrientation;
 import org.apache.hive.service.cli.HiveSQLException;
@@ -123,7 +126,10 @@ public void runInternal() throws HiveSQLException {
    */
   @Override
   public TableSchema getResultSetSchema() throws HiveSQLException {
-    assertState(OperationState.FINISHED);
+    // Since compilation is always a blocking RPC call, and schema is ready after compilation,
+    // we can return when we are in the RUNNING state.
+    assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.RUNNING,
+        OperationState.FINISHED)));
     return RESULT_SET_SCHEMA;
   }
 
@@ -132,7 +138,7 @@ public TableSchema getResultSetSchema() throws HiveSQLException {
    */
   @Override
   public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
-    assertState(OperationState.FINISHED);
+    assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.FINISHED)));
     validateDefaultFetchOrientation(orientation);
     if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
       rowSet.setStartOffset(0);
diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
index d9a273b..15c72a2 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Future;
@@ -199,9 +200,16 @@ protected void setOperationException(HiveSQLException operationException) {
     this.operationException = operationException;
   }
 
-  protected final void assertState(OperationState state) throws HiveSQLException {
-    if (this.state != state) {
-      throw new HiveSQLException("Expected state " + state + ", but found " + this.state);
+  protected final void assertState(List<OperationState> states) throws HiveSQLException {
+    boolean validStateTransition = false;
+    for (OperationState state : states) {
+      if (this.state == state) {
+        validStateTransition = true;
+      }
+    }
+    if (!validStateTransition) {
+      throw new HiveSQLException("Expected states: " + states.toString() + ", but found "
+          + this.state);
     }
     this.lastAccessTime = System.currentTimeMillis();
   }
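Illustration (not part of the patch): the generalized assertState now passes when the current state is any member of the supplied list. A standalone sketch of the same membership check and error-message shape, using plain Java types rather than Hive's OperationState and HiveSQLException; the patch uses an explicit loop, and List.contains() here is an equivalent shorthand:

    import java.util.Arrays;
    import java.util.List;

    public class AssertStateSketch {
      enum State { INITIALIZED, PENDING, RUNNING, FINISHED, CANCELED, CLOSED, ERROR, UNKNOWN }

      private State state = State.RUNNING;

      // Pass if the current state is any of the expected states, otherwise fail loudly.
      void assertState(List<State> states) {
        if (!states.contains(state)) {
          throw new IllegalStateException("Expected states: " + states + ", but found " + state);
        }
      }

      public static void main(String[] args) {
        AssertStateSketch op = new AssertStateSketch();
        // Schema access is allowed while RUNNING or FINISHED.
        op.assertState(Arrays.asList(State.RUNNING, State.FINISHED));
        // Row fetches still require FINISHED, so this check fails while RUNNING.
        try {
          op.assertState(Arrays.asList(State.FINISHED));
        } catch (IllegalStateException e) {
          System.out.println(e.getMessage());
        }
      }
    }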
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 04d816a..0d417d9 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -343,7 +343,10 @@ public void close() throws HiveSQLException {
 
   @Override
   public TableSchema getResultSetSchema() throws HiveSQLException {
-    assertState(OperationState.FINISHED);
+    // Since compilation is always a blocking RPC call, and schema is ready after compilation,
+    // we can return when we are in the RUNNING state.
+    assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.RUNNING,
+        OperationState.FINISHED)));
     if (resultSchema == null) {
       resultSchema = new TableSchema(driver.getSchema());
     }
@@ -355,7 +358,7 @@ public TableSchema getResultSetSchema() throws HiveSQLException {
   @Override
   public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
     validateDefaultFetchOrientation(orientation);
-    assertState(OperationState.FINISHED);
+    assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.FINISHED)));
     RowSet rowSet = RowSetFactory.create(resultSchema, getProtocolVersion());
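Illustration (not part of the patch): because SQLOperation.getResultSetSchema() now also accepts the RUNNING state, result-set metadata is available as soon as execute() returns, while only row fetches wait for completion. A minimal sketch, with the same assumed connection details and placeholder table as in the earlier sketches:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.Statement;

    public class EarlySchemaSketch {
      public static void main(String[] args) throws Exception {
        Connection conn = DriverManager.getConnection(
            "jdbc:hive2://localhost:10000/default", "hive", "");
        Statement stmt = conn.createStatement();
        stmt.execute("SELECT key, value FROM src");  // placeholder query

        ResultSet rs = stmt.getResultSet();
        // The schema is fixed once compilation finishes, so metadata can be read
        // while the operation is still RUNNING.
        ResultSetMetaData md = rs.getMetaData();
        for (int i = 1; i <= md.getColumnCount(); i++) {
          System.out.println(md.getColumnName(i) + " : " + md.getColumnTypeName(i));
        }

        while (rs.next()) {   // only this blocks until the query completes
          // consume rows
        }
        rs.close();
        stmt.close();
        conn.close();
      }
    }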