diff --git beeline/pom.xml beeline/pom.xml index 45fa02b..9a1c5dc 100644 --- beeline/pom.xml +++ beeline/pom.xml @@ -49,6 +49,11 @@ hive-shims ${project.version} + + org.apache.hive + hive-jdbc + ${project.version} + commons-cli @@ -88,12 +93,6 @@ org.apache.hive - hive-jdbc - ${project.version} - test - - - org.apache.hive hive-exec ${project.version} tests diff --git beeline/src/java/org/apache/hive/beeline/Commands.java beeline/src/java/org/apache/hive/beeline/Commands.java index a92d69f..1854fc7 100644 --- beeline/src/java/org/apache/hive/beeline/Commands.java +++ beeline/src/java/org/apache/hive/beeline/Commands.java @@ -38,6 +38,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.sql.SQLWarning; import java.util.Arrays; import java.util.Iterator; import java.util.LinkedList; @@ -47,10 +48,13 @@ import java.util.TreeSet; import org.apache.hadoop.hive.common.cli.ShellCmdExecutor; +import org.apache.hive.jdbc.HiveStatement; public class Commands { private final BeeLine beeLine; + private static final int DEFAULT_QUERY_PROGRESS_INTERVAL = 1000; + private static final int DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT = 10 * 1000; /** * @param beeLine @@ -758,6 +762,7 @@ private boolean execute(String line, boolean call) { try { Statement stmnt = null; boolean hasResults; + Thread logThread = null; try { long start = System.currentTimeMillis(); @@ -767,7 +772,15 @@ private boolean execute(String line, boolean call) { hasResults = ((CallableStatement) stmnt).execute(); } else { stmnt = beeLine.createStatement(); - hasResults = stmnt.execute(sql); + if (beeLine.getOpts().isSilent()) { + hasResults = stmnt.execute(sql); + } else { + logThread = new Thread(createLogRunnable(stmnt)); + logThread.setDaemon(true); + logThread.start(); + hasResults = stmnt.execute(sql); + logThread.interrupt(); + } } beeLine.showWarnings(); @@ -782,6 +795,11 @@ private boolean execute(String line, boolean call) { beeLine.info(beeLine.loc("rows-selected", count) + " " + beeLine.locElapsedTime(end - start)); } finally { + if (logThread != null) { + logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT); + showRemainingLogsIfAny(stmnt); + logThread = null; + } rs.close(); } } while (BeeLine.getMoreResults(stmnt)); @@ -792,6 +810,13 @@ private boolean execute(String line, boolean call) { + " " + beeLine.locElapsedTime(end - start)); } } finally { + if (logThread != null) { + if (!logThread.isInterrupted()) { + logThread.interrupt(); + } + logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT); + showRemainingLogsIfAny(stmnt); + } if (stmnt != null) { stmnt.close(); } @@ -803,6 +828,61 @@ private boolean execute(String line, boolean call) { return true; } + private Runnable createLogRunnable(Statement statement) { + if (statement instanceof HiveStatement) { + final HiveStatement hiveStatement = (HiveStatement) statement; + + Runnable runnable = new Runnable() { + @Override + public void run() { + while (hiveStatement.hasMoreLogs()) { + try { + // fetch the log periodically and output to beeline console + for (String log : hiveStatement.getQueryLog()) { + beeLine.info(log); + } + Thread.sleep(DEFAULT_QUERY_PROGRESS_INTERVAL); + } catch (SQLException e) { + beeLine.error(new SQLWarning(e)); + return; + } catch (InterruptedException e) { + beeLine.debug("Getting log thread is interrupted, since query is done!"); + return; + } + } + } + }; + return runnable; + } else { + beeLine.debug("The statement instance is not HiveStatement type: " + statement.getClass()); + return new Runnable() { + @Override + public void run() { + // do nothing. + } + }; + } + } + + private void showRemainingLogsIfAny(Statement statement) { + if (statement instanceof HiveStatement) { + HiveStatement hiveStatement = (HiveStatement) statement; + List logs; + do { + try { + logs = hiveStatement.getQueryLog(); + } catch (SQLException e) { + beeLine.error(new SQLWarning(e)); + return; + } + for (String log : logs) { + beeLine.info(log); + } + } while (logs.size() > 0); + } else { + beeLine.debug("The statement instance is not HiveStatement type: " + statement.getClass()); + } + } public boolean quit(String line) { beeLine.setExit(true); diff --git itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java index 1e66542..6561743 100644 --- itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java +++ itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java @@ -477,4 +477,31 @@ public void testEmbeddedBeelineConnection() throws Throwable{ final String EXPECTED_PATTERN = "embedded_table"; testScriptFile(TEST_NAME, SCRIPT_TEXT, EXPECTED_PATTERN, true, argList); } + + /** + * Test Beeline could show the query progress for time-consuming query. + * @throws Throwable + */ + @Test + public void testQueryProgress() throws Throwable { + final String TEST_NAME = "testQueryProgress"; + final String SCRIPT_TEXT = "set hive.support.concurrency = false;\n" + + "select count(*) from " + tableName + ";\n"; + final String EXPECTED_PATTERN = "Parsing command"; + testScriptFile(TEST_NAME, SCRIPT_TEXT, EXPECTED_PATTERN, true, getBaseArgs(JDBC_URL)); + } + + /** + * Test Beeline will hide the query progress when silent option is set. + * @throws Throwable + */ + @Test + public void testQueryProgressHidden() throws Throwable { + final String TEST_NAME = "testQueryProgress"; + final String SCRIPT_TEXT = "set hive.support.concurrency = false;\n" + + "!set silent true\n" + + "select count(*) from " + tableName + ";\n"; + final String EXPECTED_PATTERN = "Parsing command"; + testScriptFile(TEST_NAME, SCRIPT_TEXT, EXPECTED_PATTERN, false, getBaseArgs(JDBC_URL)); + } } diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java index daf8e9e..bfa8c46 100644 --- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java @@ -2130,4 +2130,82 @@ public void testNonAsciiReturnValues() throws Exception { } stmt.close(); } + + /** + * Test getting query log method in Jdbc + * @throws Exception + */ + @Test + public void testGetQueryLog() throws Exception { + // Prepare + String[] expectedLogs = { + "Parsing command", + "Parse Completed", + "Starting Semantic Analysis", + "Semantic Analysis Completed", + "Starting command" + }; + String sql = "select count(*) from " + tableName; + + // Verify the fetched log (from the beginning of log file) + HiveStatement stmt = (HiveStatement)con.createStatement(); + assertNotNull("Statement is null", stmt); + stmt.executeQuery(sql); + List logs = stmt.getQueryLog(false, 10000); + stmt.close(); + verifyFetchedLog(logs, expectedLogs); + + // Verify the fetched log (incrementally) + final HiveStatement statement = (HiveStatement)con.createStatement(); + assertNotNull("Statement is null", statement); + statement.setFetchSize(10000); + final List incrementalLogs = new ArrayList(); + + Runnable logThread = new Runnable() { + @Override + public void run() { + while (statement.hasMoreLogs()) { + try { + incrementalLogs.addAll(statement.getQueryLog()); + Thread.sleep(500); + } catch (SQLException e) { + LOG.error("Failed getQueryLog. Error message: " + e.getMessage()); + fail("error in getting log thread"); + } catch (InterruptedException e) { + LOG.error("Getting log thread is interrupted. Error message: " + e.getMessage()); + fail("error in getting log thread"); + } + } + } + }; + + Thread thread = new Thread(logThread); + thread.setDaemon(true); + thread.start(); + statement.executeQuery(sql); + thread.interrupt(); + thread.join(10000); + // fetch remaining logs + List remainingLogs; + do { + remainingLogs = statement.getQueryLog(); + incrementalLogs.addAll(remainingLogs); + } while (remainingLogs.size() > 0); + statement.close(); + + verifyFetchedLog(incrementalLogs, expectedLogs); + } + + private void verifyFetchedLog(List logs, String[] expectedLogs) { + StringBuilder stringBuilder = new StringBuilder(); + + for (String log : logs) { + stringBuilder.append(log); + } + + String accumulatedLogs = stringBuilder.toString(); + for (String expectedLog : expectedLogs) { + assertTrue(accumulatedLogs.contains(expectedLog)); + } + } } diff --git jdbc/src/java/org/apache/hive/jdbc/ClosedOrCancelledStatementException.java jdbc/src/java/org/apache/hive/jdbc/ClosedOrCancelledStatementException.java new file mode 100644 index 0000000..9880208 --- /dev/null +++ jdbc/src/java/org/apache/hive/jdbc/ClosedOrCancelledStatementException.java @@ -0,0 +1,29 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ + +package org.apache.hive.jdbc; + +import java.sql.SQLException; + +public class ClosedOrCancelledStatementException extends SQLException{ + + private static final long serialVersionUID = 0; + + /** + * @param msg (exception message) + */ + public ClosedOrCancelledStatementException(String msg) { + super(msg); + } +} diff --git jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java index 86bc580..a7dbc82 100644 --- jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java +++ jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -75,6 +76,7 @@ private boolean fetchFirst = false; private final TProtocolVersion protocol; + private ReentrantLock transportLock; public static class Builder { @@ -98,6 +100,7 @@ private int fetchSize = 50; private boolean emptyResultSet = false; private boolean isScrollable = false; + private ReentrantLock transportLock = null; public Builder(Statement statement) throws SQLException { this.statement = statement; @@ -166,6 +169,11 @@ public Builder setScrollable(boolean setScrollable) { return this; } + public Builder setTransportLock(ReentrantLock transportLock) { + this.transportLock = transportLock; + return this; + } + public HiveQueryResultSet build() throws SQLException { return new HiveQueryResultSet(this); } @@ -181,6 +189,7 @@ protected HiveQueryResultSet(Builder builder) throws SQLException { this.stmtHandle = builder.stmtHandle; this.sessHandle = builder.sessHandle; this.fetchSize = builder.fetchSize; + this.transportLock = builder.transportLock; columnNames = new ArrayList(); columnTypes = new ArrayList(); columnAttributes = new ArrayList(); @@ -239,7 +248,17 @@ private void retrieveSchema() throws SQLException { try { TGetResultSetMetadataReq metadataReq = new TGetResultSetMetadataReq(stmtHandle); // TODO need session handle - TGetResultSetMetadataResp metadataResp = client.GetResultSetMetadata(metadataReq); + TGetResultSetMetadataResp metadataResp; + if (transportLock == null) { + metadataResp = client.GetResultSetMetadata(metadataReq); + } else { + transportLock.lock(); + try { + metadataResp = client.GetResultSetMetadata(metadataReq); + } finally { + transportLock.unlock(); + } + } Utils.verifySuccess(metadataResp.getStatus()); StringBuilder namesSb = new StringBuilder(); @@ -326,7 +345,17 @@ public boolean next() throws SQLException { if (fetchedRows == null || !fetchedRowsItr.hasNext()) { TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle, orientation, fetchSize); - TFetchResultsResp fetchResp = client.FetchResults(fetchReq); + TFetchResultsResp fetchResp; + if (transportLock == null) { + fetchResp = client.FetchResults(fetchReq); + } else { + transportLock.lock(); + try { + fetchResp = client.FetchResults(fetchReq); + } finally { + transportLock.unlock(); + } + } Utils.verifySuccessWithInfo(fetchResp.getStatus()); TRowSet results = fetchResp.getResults(); diff --git jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index 2cbf58c..d8e33d3 100644 --- jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -23,10 +23,14 @@ import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.sql.SQLWarning; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.locks.ReentrantLock; +import org.apache.hive.service.cli.RowSet; +import org.apache.hive.service.cli.RowSetFactory; import org.apache.hive.service.cli.thrift.TCLIService; import org.apache.hive.service.cli.thrift.TCancelOperationReq; import org.apache.hive.service.cli.thrift.TCancelOperationResp; @@ -38,6 +42,9 @@ import org.apache.hive.service.cli.thrift.TGetOperationStatusResp; import org.apache.hive.service.cli.thrift.TOperationHandle; import org.apache.hive.service.cli.thrift.TSessionHandle; +import org.apache.hive.service.cli.thrift.TFetchResultsReq; +import org.apache.hive.service.cli.thrift.TFetchResultsResp; +import org.apache.hive.service.cli.thrift.TFetchOrientation; /** * HiveStatement. @@ -77,6 +84,27 @@ */ private boolean isClosed = false; + /** + * Keep state so we can fail certain calls made after cancel(). + */ + private boolean isCancelled = false; + + /** + * Keep this state so we can know whether the query in this statement is closed. + */ + private boolean isQueryClosed = false; + + /** + * Keep this state so we can know whether the query logs are being generated in HS2. + */ + private boolean isLogBeingGenerated = true; + + /** + * Keep this state so we can know whether the statement is submitted to HS2 and start execution + * successfully. + */ + private boolean isExecuteStatementFailed = false; + // A fair reentrant lock private ReentrantLock transportLock = new ReentrantLock(true); @@ -113,6 +141,9 @@ public void addBatch(String sql) throws SQLException { @Override public void cancel() throws SQLException { checkConnection("cancel"); + if (isCancelled) { + return; + } transportLock.lock(); try { @@ -128,6 +159,7 @@ public void cancel() throws SQLException { } finally { transportLock.unlock(); } + isCancelled = true; } /* @@ -167,6 +199,8 @@ void closeClientOperation() throws SQLException { } finally { transportLock.unlock(); } + isQueryClosed = true; + isExecuteStatementFailed = false; stmtHandle = null; } @@ -202,6 +236,7 @@ public boolean execute(String sql) throws SQLException { checkConnection("execute"); closeClientOperation(); + initFlags(); TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, sql); /** @@ -218,9 +253,12 @@ public boolean execute(String sql) throws SQLException { TExecuteStatementResp execResp = client.ExecuteStatement(execReq); Utils.verifySuccessWithInfo(execResp.getStatus()); stmtHandle = execResp.getOperationHandle(); + isExecuteStatementFailed = false; } catch (SQLException eS) { + isExecuteStatementFailed = true; throw eS; } catch (Exception ex) { + isExecuteStatementFailed = true; throw new SQLException(ex.toString(), "08S01", ex); } finally { transportLock.unlock(); @@ -266,11 +304,14 @@ public boolean execute(String sql) throws SQLException { } } } catch (SQLException e) { + isLogBeingGenerated = false; throw e; } catch (Exception e) { + isLogBeingGenerated = false; throw new SQLException(e.toString(), "08S01", e); } } + isLogBeingGenerated = false; // The query should be completed by now if (!stmtHandle.isHasResultSet()) { @@ -278,7 +319,7 @@ public boolean execute(String sql) throws SQLException { } resultSet = new HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle) .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize) - .setScrollable(isScrollableResultset) + .setScrollable(isScrollableResultset).setTransportLock(transportLock) .build(); return true; } @@ -289,6 +330,13 @@ private void checkConnection(String action) throws SQLException { } } + private void initFlags() { + isCancelled = false; + isQueryClosed = false; + isLogBeingGenerated = true; + isExecuteStatementFailed = false; + } + /* * (non-Javadoc) * @@ -713,4 +761,93 @@ public boolean isWrapperFor(Class iface) throws SQLException { throw new SQLException("Cannot unwrap to " + iface); } + /** + * Check whether query execution might be producing more logs to be fetched. + * This method is a public API for usage outside of Hive, although it is not part of the + * interface java.sql.Statement. + * @return true if query execution might be producing more logs. It does not indicate if last + * log lines have been fetched by getQueryLog. + */ + public boolean hasMoreLogs() { + return isLogBeingGenerated; + } + + /** + * Get the execution logs of the given SQL statement. + * This method is a public API for usage outside of Hive, although it is not part of the + * interface java.sql.Statement. + * This method gets the incremental logs during SQL execution, and uses fetchSize holden by + * HiveStatement object. + * @return a list of logs. It can be empty if there are no new logs to be retrieved at that time. + * @throws SQLException + * @throws ClosedOrCancelledStatementException if statement has been cancelled or closed + */ + public List getQueryLog() throws SQLException, ClosedOrCancelledStatementException { + return getQueryLog(true, fetchSize); + } + + /** + * Get the execution logs of the given SQL statement. + * This method is a public API for usage outside of Hive, although it is not part of the + * interface java.sql.Statement. + * @param incremental indicate getting logs either incrementally or from the beginning, + * when it is true or false. + * @param fetchSize the number of lines to fetch + * @return a list of logs. It can be empty if there are no new logs to be retrieved at that time. + * @throws SQLException + * @throws ClosedOrCancelledStatementException if statement has been cancelled or closed + */ + public List getQueryLog(boolean incremental, int fetchSize) + throws SQLException, ClosedOrCancelledStatementException { + checkConnection("getQueryLog"); + if (isCancelled) { + throw new ClosedOrCancelledStatementException("Method getQueryLog() failed. The " + + "statement has been closed or cancelled."); + } + + List logs = new ArrayList(); + TFetchResultsResp tFetchResultsResp = null; + transportLock.lock(); + try { + if (stmtHandle != null) { + TFetchResultsReq tFetchResultsReq = new TFetchResultsReq(stmtHandle, + getFetchOrientation(incremental), fetchSize); + tFetchResultsReq.setFetchType((short)1); + tFetchResultsResp = client.FetchResults(tFetchResultsReq); + Utils.verifySuccessWithInfo(tFetchResultsResp.getStatus()); + } else { + if (isQueryClosed) { + throw new ClosedOrCancelledStatementException("Method getQueryLog() failed. The " + + "statement has been closed or cancelled."); + } + if (isExecuteStatementFailed) { + throw new SQLException("Method getQueryLog() failed. Because the stmtHandle in " + + "HiveStatement is null and the statement execution might fail."); + } else { + return logs; + } + } + } catch (SQLException e) { + throw e; + } catch (Exception e) { + throw new SQLException("Error when getting query log: " + e, e); + } finally { + transportLock.unlock(); + } + + RowSet rowSet = RowSetFactory.create(tFetchResultsResp.getResults(), + connection.getProtocol()); + for (Object[] row : rowSet) { + logs.add((String)row[0]); + } + return logs; + } + + private TFetchOrientation getFetchOrientation(boolean incremental) { + if (incremental) { + return TFetchOrientation.FETCH_NEXT; + } else { + return TFetchOrientation.FETCH_FIRST; + } + } }