diff --git beeline/pom.xml beeline/pom.xml
index 45fa02b..9a1c5dc 100644
--- beeline/pom.xml
+++ beeline/pom.xml
@@ -49,6 +49,11 @@
       <artifactId>hive-shims</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-jdbc</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>commons-cli</groupId>
       <artifactId>commons-cli</artifactId>
@@ -88,12 +93,6 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>
-      <artifactId>hive-jdbc</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hive</groupId>
       <artifactId>hive-exec</artifactId>
       <version>${project.version}</version>
       <classifier>tests</classifier>
diff --git beeline/src/java/org/apache/hive/beeline/Commands.java beeline/src/java/org/apache/hive/beeline/Commands.java
index a92d69f..85aaedf 100644
--- beeline/src/java/org/apache/hive/beeline/Commands.java
+++ beeline/src/java/org/apache/hive/beeline/Commands.java
@@ -47,10 +47,12 @@
 import java.util.TreeSet;
 
 import org.apache.hadoop.hive.common.cli.ShellCmdExecutor;
+import org.apache.hive.jdbc.HiveStatement;
 
 public class Commands {
   private final BeeLine beeLine;
+  private static final int DEFAULT_QUERY_PROGRESS_INTERVAL = 500;
 
   /**
    * @param beeLine
@@ -758,6 +760,7 @@ private boolean execute(String line, boolean call) {
     try {
       Statement stmnt = null;
       boolean hasResults;
+      Thread logThread = null;
 
       try {
         long start = System.currentTimeMillis();
@@ -767,7 +770,14 @@
           hasResults = ((CallableStatement) stmnt).execute();
         } else {
           stmnt = beeLine.createStatement();
-          hasResults = stmnt.execute(sql);
+          if (beeLine.getOpts().isSilent()) {
+            hasResults = stmnt.execute(sql);
+          } else {
+            logThread = new Thread(createLogRunnable(stmnt));
+            logThread.start();
+            hasResults = stmnt.execute(sql);
+            logThread.interrupt();
+          }
         }
 
         beeLine.showWarnings();
@@ -795,6 +805,9 @@
         if (stmnt != null) {
           stmnt.close();
         }
+        if (logThread != null && !logThread.isInterrupted()) {
+          logThread.interrupt();
+        }
       }
     } catch (Exception e) {
       return beeLine.error(e);
@@ -803,6 +816,56 @@
     return true;
   }
 
+  private Runnable createLogRunnable(Statement statement) {
+    if (statement instanceof HiveStatement) {
+      final HiveStatement hiveStatement = (HiveStatement) statement;
+
+      Runnable runnable = new Runnable() {
+        @Override
+        public void run() {
+          long timeout = System.currentTimeMillis() + 100000;
+
+          try {
+            // wait until the query leaves the IDLE state
+            while (hiveStatement.getQueryStatus() == HiveStatement.QueryStatus.IDLE) {
+              if (System.currentTimeMillis() > timeout) {
+                break;
+              }
+              Thread.sleep(DEFAULT_QUERY_PROGRESS_INTERVAL);
+            }
+
+            // fetch the log periodically and print it to the beeline console
+            while (hiveStatement.getQueryStatus() == HiveStatement.QueryStatus.STARTED) {
+              showProgress(hiveStatement.getQueryLog());
+              Thread.sleep(DEFAULT_QUERY_PROGRESS_INTERVAL);
+            }
+          } catch (SQLException e) {
+            beeLine.error(e);
+            return;
+          } catch (InterruptedException e) {
+            beeLine.debug("The log thread was interrupted because the query is done.");
+            return;
+          }
+        }
+      };
+      return runnable;
+    } else {
+      beeLine.error("The statement instance is not a HiveStatement: " + statement.getClass());
+      return new Runnable() {
+        @Override
+        public void run() {
+          // do nothing
+        }
+      };
+    }
+  }
+
+  private void showProgress(List<String> logs) {
+    for (String log : logs) {
+      // TODO: add a possible filter to reduce the output logs for beeline users.
+      beeLine.info(log);
+    }
+  }
 
   public boolean quit(String line) {
     beeLine.setExit(true);
diff --git itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java
index e1d44ec..8aaa694 100644
--- itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java
+++ itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java
@@ -477,4 +477,31 @@ public void testEmbeddedBeelineConnection() throws Throwable{
     final String EXPECTED_PATTERN = "embedded_table";
     testScriptFile(TEST_NAME, SCRIPT_TEXT, EXPECTED_PATTERN, true, argList);
   }
+
+  /**
+   * Test that Beeline can show the query progress for a time-consuming query.
+   * @throws Throwable
+   */
+  @Test
+  public void testQueryProgress() throws Throwable {
+    final String TEST_NAME = "testQueryProgress";
+    final String SCRIPT_TEXT = "set hive.support.concurrency = false;\n" +
+        "select count(*) from " + tableName + ";\n";
+    final String EXPECTED_PATTERN = "Parsing command";
+    testScriptFile(TEST_NAME, SCRIPT_TEXT, EXPECTED_PATTERN, true, getBaseArgs(JDBC_URL));
+  }
+
+  /**
+   * Test that Beeline hides the query progress when the silent option is set.
+   * @throws Throwable
+   */
+  @Test
+  public void testQueryProgressHidden() throws Throwable {
+    final String TEST_NAME = "testQueryProgressHidden";
+    final String SCRIPT_TEXT = "set hive.support.concurrency = false;\n" +
+        "!set silent true\n" +
+        "select count(*) from " + tableName + ";\n";
+    final String EXPECTED_PATTERN = "Parsing command";
+    testScriptFile(TEST_NAME, SCRIPT_TEXT, EXPECTED_PATTERN, false, getBaseArgs(JDBC_URL));
+  }
 }
diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
index ae128a9..06250df 100644
--- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
+++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
@@ -2126,4 +2126,84 @@ public void testNonAsciiReturnValues() throws Exception {
     }
     stmt.close();
   }
+
+  /**
+   * Test the JDBC API for fetching query logs.
+   * @throws Exception
+   */
+  @Test
+  public void testGetQueryLog() throws Exception {
+    // Prepare
+    String[] expectedLogs = {
+        "Parsing command",
+        "Parse Completed",
+        "Starting Semantic Analysis",
+        "Semantic Analysis Completed",
+        "Starting command"
+    };
+    String sql = "select count(*) from " + tableName;
+
+    // Verify the fetched log (from the beginning of the log file)
+    HiveStatement stmt = (HiveStatement) con.createStatement();
+    assertNotNull("Statement is null", stmt);
+    stmt.setFetchSize(10000);
+    stmt.executeQuery(sql);
+    List<String> logs = stmt.getQueryLog(false);
+    stmt.close();
+    verifyFetchedLog(logs, expectedLogs);
+
+    // Verify the fetched log (incrementally)
+    final HiveStatement statement = (HiveStatement) con.createStatement();
+    assertNotNull("Statement is null", statement);
+    statement.setFetchSize(10000);
+    final List<String> incrementalLogs = new ArrayList<String>();
+
+    Runnable logThread = new Runnable() {
+      @Override
+      public void run() {
+        long timeout = System.currentTimeMillis() + 100000;
+        try {
+          while (statement.getQueryStatus() == HiveStatement.QueryStatus.IDLE) {
+            if (System.currentTimeMillis() > timeout) {
+              break;
+            }
+            Thread.sleep(500);
+          }
+
+          while (statement.getQueryStatus() == HiveStatement.QueryStatus.STARTED) {
+            incrementalLogs.addAll(statement.getQueryLog());
+            Thread.sleep(500);
+          }
+
+          incrementalLogs.addAll(statement.getQueryLog());
+        } catch (SQLException e) {
+          LOG.error("Failed getQueryLog. Error message: " + e.getMessage());
+          fail("error in getting log thread");
+        } catch (InterruptedException e) {
+          LOG.error("Getting log thread is interrupted. Error message: " + e.getMessage());
+          fail("error in getting log thread");
+        }
+      }
+    };
+
+    Thread thread = new Thread(logThread);
+    thread.start();
+    statement.executeQuery(sql);
+    thread.join(100000);
+    statement.close();
+    verifyFetchedLog(incrementalLogs, expectedLogs);
+  }
+
+  private void verifyFetchedLog(List<String> logs, String[] expectedLogs) {
+    StringBuilder stringBuilder = new StringBuilder();
+
+    for (String log : logs) {
+      stringBuilder.append(log);
+    }
+
+    String accumulatedLogs = stringBuilder.toString();
+    for (String expectedLog : expectedLogs) {
+      assertTrue(accumulatedLogs.contains(expectedLog));
+    }
+  }
 }
diff --git jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
index 2cbf58c..1aaccbe 100644
--- jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
+++ jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
@@ -23,21 +23,15 @@
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.sql.SQLWarning;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.hive.service.cli.thrift.TCLIService;
-import org.apache.hive.service.cli.thrift.TCancelOperationReq;
-import org.apache.hive.service.cli.thrift.TCancelOperationResp;
-import org.apache.hive.service.cli.thrift.TCloseOperationReq;
-import org.apache.hive.service.cli.thrift.TCloseOperationResp;
-import org.apache.hive.service.cli.thrift.TExecuteStatementReq;
-import org.apache.hive.service.cli.thrift.TExecuteStatementResp;
-import org.apache.hive.service.cli.thrift.TGetOperationStatusReq;
-import org.apache.hive.service.cli.thrift.TGetOperationStatusResp;
-import org.apache.hive.service.cli.thrift.TOperationHandle;
-import org.apache.hive.service.cli.thrift.TSessionHandle;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.RowSetFactory;
+import org.apache.hive.service.cli.thrift.*;
 
 /**
  * HiveStatement.
@@ -80,6 +74,23 @@
   // A fair reentrant lock
   private ReentrantLock transportLock = new ReentrantLock(true);
 
+  /**
+   * The status of the SQL statement being executed. Since SQL execution in Hive is blocking
+   * and can take a long time, a JDBC client may poll this status from another thread.
+   * It is also used to coordinate the execute() and getQueryLog() methods.
+   *
+   */
+  private volatile QueryStatus queryStatus = QueryStatus.IDLE;
+
+  public enum QueryStatus {
+    IDLE,
+    STARTED,
+    START_ERROR,
+    FINISHED,
+    FINISH_ERROR,
+    CANCELED
+  }
+
   public HiveStatement(HiveConnection connection, TCLIService.Iface client,
       TSessionHandle sessHandle) {
     this(connection, client, sessHandle, false);
@@ -121,6 +132,7 @@ public void cancel() throws SQLException {
         TCancelOperationResp cancelResp = client.CancelOperation(cancelReq);
         Utils.verifySuccessWithInfo(cancelResp.getStatus());
       }
+      setQueryStatus(QueryStatus.CANCELED);
     } catch (SQLException e) {
       throw e;
     } catch (Exception e) {
@@ -160,6 +172,7 @@ void closeClientOperation() throws SQLException {
         TCloseOperationResp closeResp = client.CloseOperation(closeReq);
         Utils.verifySuccessWithInfo(closeResp.getStatus());
       }
+      setQueryStatus(QueryStatus.IDLE);
     } catch (SQLException e) {
       throw e;
     } catch (Exception e) {
@@ -218,9 +231,12 @@ public boolean execute(String sql) throws SQLException {
       TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
       Utils.verifySuccessWithInfo(execResp.getStatus());
       stmtHandle = execResp.getOperationHandle();
+      setQueryStatus(QueryStatus.STARTED);
     } catch (SQLException eS) {
+      setQueryStatus(QueryStatus.START_ERROR);
       throw eS;
     } catch (Exception ex) {
+      setQueryStatus(QueryStatus.START_ERROR);
       throw new SQLException(ex.toString(), "08S01", ex);
     } finally {
       transportLock.unlock();
@@ -266,20 +282,28 @@ public boolean execute(String sql) throws SQLException {
          }
        }
      } catch (SQLException e) {
+      setQueryStatus(QueryStatus.FINISH_ERROR);
       throw e;
     } catch (Exception e) {
+      setQueryStatus(QueryStatus.FINISH_ERROR);
       throw new SQLException(e.toString(), "08S01", e);
     }
   }
 
   // The query should be completed by now
+  setQueryStatus(QueryStatus.FINISHED);
   if (!stmtHandle.isHasResultSet()) {
     return false;
   }
-  resultSet = new HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle)
-      .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize)
-      .setScrollable(isScrollableResultset)
-      .build();
+
+  transportLock.lock();
+  try {
+    resultSet = new HiveQueryResultSet.Builder(this).setClient(client)
+        .setSessionHandle(sessHandle).setStmtHandle(stmtHandle).setMaxRows(maxRows)
+        .setFetchSize(fetchSize).setScrollable(isScrollableResultset).build();
+  } finally {
+    transportLock.unlock();
+  }
   return true;
 }
 
@@ -713,4 +737,76 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
     throw new SQLException("Cannot unwrap to " + iface);
   }
 
+  public synchronized QueryStatus getQueryStatus() {
+    return queryStatus;
+  }
+
+  private synchronized void setQueryStatus(QueryStatus queryStatus) {
+    this.queryStatus = queryStatus;
+  }
+
+  /**
+   * Get the execution logs of the SQL statement run by this statement object.
+   * This overload fetches the logs incrementally during SQL execution.
+   * @return a list of log lines
+   * @throws SQLException
+   */
+  public List<String> getQueryLog() throws SQLException {
+    return getQueryLog(true);
+  }
+
+  /**
+   * Get the execution logs of the SQL statement run by this statement object.
+   * @param incremental if true, fetch only the log lines produced since the previous call;
+   *                    if false, fetch the log from the beginning.
+   * @return a list of log lines
+   * @throws SQLException
+   */
+  public List<String> getQueryLog(boolean incremental) throws SQLException {
+    checkConnection("getQueryLog");
+    checkQueryStatus("getQueryLog");
+
+    List<String> logs = new ArrayList<String>();
+    TFetchResultsResp tFetchResultsResp = null;
+    transportLock.lock();
+    try {
+      if (stmtHandle != null) {
+        TFetchResultsReq tFetchResultsReq = new TFetchResultsReq(stmtHandle,
+            getFetchOrientation(incremental), fetchSize);
+        tFetchResultsReq.setFetchType((short) 1);
+        tFetchResultsResp = client.FetchResults(tFetchResultsReq);
+        Utils.verifySuccessWithInfo(tFetchResultsResp.getStatus());
+      } else {
+        throw new SQLException("The stmtHandle is null. The statement might not have been " +
+            "executed or may already be closed.");
+      }
+    } catch (SQLException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new SQLException("Error when getting query log", e);
+    } finally {
+      transportLock.unlock();
+    }
+
+    RowSet rowSet = RowSetFactory.create(tFetchResultsResp.getResults(),
+        connection.getProtocol());
+    for (Object[] row : rowSet) {
+      logs.add((String) row[0]);
+    }
+    return logs;
+  }
+
+  private TFetchOrientation getFetchOrientation(boolean incremental) {
+    if (incremental) {
+      return TFetchOrientation.FETCH_NEXT;
+    } else {
+      return TFetchOrientation.FETCH_FIRST;
+    }
+  }
+
+  private void checkQueryStatus(String action) throws SQLException {
+    if (queryStatus == QueryStatus.IDLE) {
+      throw new SQLException("Cannot " + action + " when the query is in IDLE status");
+    }
+  }
 }
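
For reference, the log-polling pattern that BeeLine adopts above is available to any JDBC client once this patch is applied. The sketch below is a minimal, illustrative client only: the connection URL, credentials, and the "src" table are placeholders, and the polling loop simply mirrors Commands.createLogRunnable() and TestJdbcDriver2.testGetQueryLog() from the diff rather than introducing any new API.

// QueryLogExample.java -- illustrative only; URL, credentials, and table name are placeholders.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hive.jdbc.HiveStatement;

public class QueryLogExample {
  public static void main(String[] args) throws Exception {
    Class.forName("org.apache.hive.jdbc.HiveDriver");
    Connection con = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "", "");
    final HiveStatement stmt = (HiveStatement) con.createStatement();

    // Background thread: wait until the query leaves IDLE, then drain the operation
    // log while it is STARTED (the same loop used by Commands.createLogRunnable()).
    Thread logThread = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          while (stmt.getQueryStatus() == HiveStatement.QueryStatus.IDLE) {
            Thread.sleep(500);
          }
          while (stmt.getQueryStatus() == HiveStatement.QueryStatus.STARTED) {
            for (String line : stmt.getQueryLog()) {   // incremental fetch
              System.err.println(line);
            }
            Thread.sleep(500);
          }
        } catch (SQLException e) {
          System.err.println("Could not fetch the query log: " + e.getMessage());
        } catch (InterruptedException e) {
          // the query finished; stop polling
        }
      }
    });

    logThread.start();
    // executeQuery() blocks until the query finishes; the log thread prints progress meanwhile.
    ResultSet rs = stmt.executeQuery("select count(*) from src");
    logThread.interrupt();

    while (rs.next()) {
      System.out.println(rs.getLong(1));
    }
    rs.close();
    stmt.close();
    con.close();
  }
}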