diff --git beeline/pom.xml beeline/pom.xml
index 45fa02b..9a1c5dc 100644
--- beeline/pom.xml
+++ beeline/pom.xml
@@ -49,6 +49,11 @@
hive-shims
${project.version}
+
+ org.apache.hive
+ hive-jdbc
+ ${project.version}
+
commons-cli
@@ -88,12 +93,6 @@
org.apache.hive
- hive-jdbc
- ${project.version}
- test
-
-
- org.apache.hive
hive-exec
${project.version}
tests
diff --git beeline/src/java/org/apache/hive/beeline/Commands.java beeline/src/java/org/apache/hive/beeline/Commands.java
index a92d69f..ca62c2c 100644
--- beeline/src/java/org/apache/hive/beeline/Commands.java
+++ beeline/src/java/org/apache/hive/beeline/Commands.java
@@ -47,10 +47,13 @@
import java.util.TreeSet;
import org.apache.hadoop.hive.common.cli.ShellCmdExecutor;
+import org.apache.hive.jdbc.HiveStatement;
public class Commands {
private final BeeLine beeLine;
+ private static final int DEFAULT_QUERY_PROGRESS_INTERVAL = 1000;
+ private static final int DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT = 10 * 1000;
/**
* @param beeLine
@@ -758,6 +761,7 @@ private boolean execute(String line, boolean call) {
try {
Statement stmnt = null;
boolean hasResults;
+ Thread logThread = null;
try {
long start = System.currentTimeMillis();
@@ -767,7 +771,15 @@ private boolean execute(String line, boolean call) {
hasResults = ((CallableStatement) stmnt).execute();
} else {
stmnt = beeLine.createStatement();
- hasResults = stmnt.execute(sql);
+ if (beeLine.getOpts().isSilent()) {
+ hasResults = stmnt.execute(sql);
+ } else {
+ logThread = new Thread(createLogRunnable(stmnt));
+ logThread.setDaemon(true);
+ logThread.start();
+ hasResults = stmnt.execute(sql);
+ logThread.interrupt();
+ }
}
beeLine.showWarnings();
@@ -782,6 +794,11 @@ private boolean execute(String line, boolean call) {
beeLine.info(beeLine.loc("rows-selected", count) + " "
+ beeLine.locElapsedTime(end - start));
} finally {
+ if (logThread != null) {
+ logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT);
+ showRemainingLogsIfAny(stmnt);
+ logThread = null;
+ }
rs.close();
}
} while (BeeLine.getMoreResults(stmnt));
@@ -792,6 +809,13 @@ private boolean execute(String line, boolean call) {
+ " " + beeLine.locElapsedTime(end - start));
}
} finally {
+ if (logThread != null) {
+ if (!logThread.isInterrupted()) {
+ logThread.interrupt();
+ }
+ logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT);
+ showRemainingLogsIfAny(stmnt);
+ }
if (stmnt != null) {
stmnt.close();
}
@@ -803,6 +827,61 @@ private boolean execute(String line, boolean call) {
return true;
}
+ private Runnable createLogRunnable(Statement statement) {
+ if (statement instanceof HiveStatement) {
+ final HiveStatement hiveStatement = (HiveStatement) statement;
+
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ while (hiveStatement.hasMoreLogs()) {
+ try {
+ // fetch the log periodically and output to beeline console
+ for (String log : hiveStatement.getQueryLog()) {
+ beeLine.info(log);
+ }
+ Thread.sleep(DEFAULT_QUERY_PROGRESS_INTERVAL);
+ } catch (SQLException e) {
+ beeLine.error(e);
+ return;
+ } catch (InterruptedException e) {
+ beeLine.debug("Getting log thread is interrupted, since query is done!");
+ return;
+ }
+ }
+ }
+ };
+ return runnable;
+ } else {
+ beeLine.debug("The statement instance is not HiveStatement type: " + statement.getClass());
+ return new Runnable() {
+ @Override
+ public void run() {
+ // do nothing.
+ }
+ };
+ }
+ }
+
+ private void showRemainingLogsIfAny(Statement statement) {
+ if (statement instanceof HiveStatement) {
+ HiveStatement hiveStatement = (HiveStatement) statement;
+ List logs;
+ do {
+ try {
+ logs = hiveStatement.getQueryLog();
+ } catch (SQLException e) {
+ beeLine.error(e);
+ return;
+ }
+ for (String log : logs) {
+ beeLine.info(log);
+ }
+ } while (logs.size() > 0);
+ } else {
+ beeLine.debug("The statement instance is not HiveStatement type: " + statement.getClass());
+ }
+ }
public boolean quit(String line) {
beeLine.setExit(true);
diff --git itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java
index 1e66542..6561743 100644
--- itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java
+++ itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java
@@ -477,4 +477,31 @@ public void testEmbeddedBeelineConnection() throws Throwable{
final String EXPECTED_PATTERN = "embedded_table";
testScriptFile(TEST_NAME, SCRIPT_TEXT, EXPECTED_PATTERN, true, argList);
}
+
+ /**
+ * Test Beeline could show the query progress for time-consuming query.
+ * @throws Throwable
+ */
+ @Test
+ public void testQueryProgress() throws Throwable {
+ final String TEST_NAME = "testQueryProgress";
+ final String SCRIPT_TEXT = "set hive.support.concurrency = false;\n" +
+ "select count(*) from " + tableName + ";\n";
+ final String EXPECTED_PATTERN = "Parsing command";
+ testScriptFile(TEST_NAME, SCRIPT_TEXT, EXPECTED_PATTERN, true, getBaseArgs(JDBC_URL));
+ }
+
+ /**
+ * Test Beeline will hide the query progress when silent option is set.
+ * @throws Throwable
+ */
+ @Test
+ public void testQueryProgressHidden() throws Throwable {
+ final String TEST_NAME = "testQueryProgress";
+ final String SCRIPT_TEXT = "set hive.support.concurrency = false;\n" +
+ "!set silent true\n" +
+ "select count(*) from " + tableName + ";\n";
+ final String EXPECTED_PATTERN = "Parsing command";
+ testScriptFile(TEST_NAME, SCRIPT_TEXT, EXPECTED_PATTERN, false, getBaseArgs(JDBC_URL));
+ }
}
diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
index daf8e9e..bfa8c46 100644
--- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
+++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
@@ -2130,4 +2130,82 @@ public void testNonAsciiReturnValues() throws Exception {
}
stmt.close();
}
+
+ /**
+ * Test getting query log method in Jdbc
+ * @throws Exception
+ */
+ @Test
+ public void testGetQueryLog() throws Exception {
+ // Prepare
+ String[] expectedLogs = {
+ "Parsing command",
+ "Parse Completed",
+ "Starting Semantic Analysis",
+ "Semantic Analysis Completed",
+ "Starting command"
+ };
+ String sql = "select count(*) from " + tableName;
+
+ // Verify the fetched log (from the beginning of log file)
+ HiveStatement stmt = (HiveStatement)con.createStatement();
+ assertNotNull("Statement is null", stmt);
+ stmt.executeQuery(sql);
+ List logs = stmt.getQueryLog(false, 10000);
+ stmt.close();
+ verifyFetchedLog(logs, expectedLogs);
+
+ // Verify the fetched log (incrementally)
+ final HiveStatement statement = (HiveStatement)con.createStatement();
+ assertNotNull("Statement is null", statement);
+ statement.setFetchSize(10000);
+ final List incrementalLogs = new ArrayList();
+
+ Runnable logThread = new Runnable() {
+ @Override
+ public void run() {
+ while (statement.hasMoreLogs()) {
+ try {
+ incrementalLogs.addAll(statement.getQueryLog());
+ Thread.sleep(500);
+ } catch (SQLException e) {
+ LOG.error("Failed getQueryLog. Error message: " + e.getMessage());
+ fail("error in getting log thread");
+ } catch (InterruptedException e) {
+ LOG.error("Getting log thread is interrupted. Error message: " + e.getMessage());
+ fail("error in getting log thread");
+ }
+ }
+ }
+ };
+
+ Thread thread = new Thread(logThread);
+ thread.setDaemon(true);
+ thread.start();
+ statement.executeQuery(sql);
+ thread.interrupt();
+ thread.join(10000);
+ // fetch remaining logs
+ List remainingLogs;
+ do {
+ remainingLogs = statement.getQueryLog();
+ incrementalLogs.addAll(remainingLogs);
+ } while (remainingLogs.size() > 0);
+ statement.close();
+
+ verifyFetchedLog(incrementalLogs, expectedLogs);
+ }
+
+ private void verifyFetchedLog(List logs, String[] expectedLogs) {
+ StringBuilder stringBuilder = new StringBuilder();
+
+ for (String log : logs) {
+ stringBuilder.append(log);
+ }
+
+ String accumulatedLogs = stringBuilder.toString();
+ for (String expectedLog : expectedLogs) {
+ assertTrue(accumulatedLogs.contains(expectedLog));
+ }
+ }
}
diff --git jdbc/src/java/org/apache/hive/jdbc/ClosedOrCancelledStatementException.java jdbc/src/java/org/apache/hive/jdbc/ClosedOrCancelledStatementException.java
new file mode 100644
index 0000000..9880208
--- /dev/null
+++ jdbc/src/java/org/apache/hive/jdbc/ClosedOrCancelledStatementException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hive.jdbc;
+
+import java.sql.SQLException;
+
+public class ClosedOrCancelledStatementException extends SQLException{
+
+ private static final long serialVersionUID = 0;
+
+ /**
+ * @param msg (exception message)
+ */
+ public ClosedOrCancelledStatementException(String msg) {
+ super(msg);
+ }
+}
diff --git jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java
index 86bc580..a7dbc82 100644
--- jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java
+++ jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java
@@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -75,6 +76,7 @@
private boolean fetchFirst = false;
private final TProtocolVersion protocol;
+ private ReentrantLock transportLock;
public static class Builder {
@@ -98,6 +100,7 @@
private int fetchSize = 50;
private boolean emptyResultSet = false;
private boolean isScrollable = false;
+ private ReentrantLock transportLock = null;
public Builder(Statement statement) throws SQLException {
this.statement = statement;
@@ -166,6 +169,11 @@ public Builder setScrollable(boolean setScrollable) {
return this;
}
+ public Builder setTransportLock(ReentrantLock transportLock) {
+ this.transportLock = transportLock;
+ return this;
+ }
+
public HiveQueryResultSet build() throws SQLException {
return new HiveQueryResultSet(this);
}
@@ -181,6 +189,7 @@ protected HiveQueryResultSet(Builder builder) throws SQLException {
this.stmtHandle = builder.stmtHandle;
this.sessHandle = builder.sessHandle;
this.fetchSize = builder.fetchSize;
+ this.transportLock = builder.transportLock;
columnNames = new ArrayList();
columnTypes = new ArrayList();
columnAttributes = new ArrayList();
@@ -239,7 +248,17 @@ private void retrieveSchema() throws SQLException {
try {
TGetResultSetMetadataReq metadataReq = new TGetResultSetMetadataReq(stmtHandle);
// TODO need session handle
- TGetResultSetMetadataResp metadataResp = client.GetResultSetMetadata(metadataReq);
+ TGetResultSetMetadataResp metadataResp;
+ if (transportLock == null) {
+ metadataResp = client.GetResultSetMetadata(metadataReq);
+ } else {
+ transportLock.lock();
+ try {
+ metadataResp = client.GetResultSetMetadata(metadataReq);
+ } finally {
+ transportLock.unlock();
+ }
+ }
Utils.verifySuccess(metadataResp.getStatus());
StringBuilder namesSb = new StringBuilder();
@@ -326,7 +345,17 @@ public boolean next() throws SQLException {
if (fetchedRows == null || !fetchedRowsItr.hasNext()) {
TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle,
orientation, fetchSize);
- TFetchResultsResp fetchResp = client.FetchResults(fetchReq);
+ TFetchResultsResp fetchResp;
+ if (transportLock == null) {
+ fetchResp = client.FetchResults(fetchReq);
+ } else {
+ transportLock.lock();
+ try {
+ fetchResp = client.FetchResults(fetchReq);
+ } finally {
+ transportLock.unlock();
+ }
+ }
Utils.verifySuccessWithInfo(fetchResp.getStatus());
TRowSet results = fetchResp.getResults();
diff --git jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
index 2cbf58c..ef205d9 100644
--- jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
+++ jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
@@ -23,10 +23,14 @@
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLWarning;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.RowSetFactory;
import org.apache.hive.service.cli.thrift.TCLIService;
import org.apache.hive.service.cli.thrift.TCancelOperationReq;
import org.apache.hive.service.cli.thrift.TCancelOperationResp;
@@ -38,6 +42,9 @@
import org.apache.hive.service.cli.thrift.TGetOperationStatusResp;
import org.apache.hive.service.cli.thrift.TOperationHandle;
import org.apache.hive.service.cli.thrift.TSessionHandle;
+import org.apache.hive.service.cli.thrift.TFetchResultsReq;
+import org.apache.hive.service.cli.thrift.TFetchResultsResp;
+import org.apache.hive.service.cli.thrift.TFetchOrientation;
/**
* HiveStatement.
@@ -77,6 +84,27 @@
*/
private boolean isClosed = false;
+ /**
+ * Keep state so we can fail certain calls made after cancel().
+ */
+ private boolean isCancelled = false;
+
+ /**
+ * Keep this state so we can know whether the query in this statement is closed.
+ */
+ private boolean isQueryClosed = false;
+
+ /**
+ * Keep this state so we can know whether the query logs are being generated in HS2.
+ */
+ private boolean isLogBeingGenerated = true;
+
+ /**
+ * Keep this state so we can know whether the statement is submitted to HS2 and start execution
+ * successfully.
+ */
+ private boolean isExecuteStatementFailed = false;
+
// A fair reentrant lock
private ReentrantLock transportLock = new ReentrantLock(true);
@@ -113,6 +141,9 @@ public void addBatch(String sql) throws SQLException {
@Override
public void cancel() throws SQLException {
checkConnection("cancel");
+ if (isCancelled) {
+ return;
+ }
transportLock.lock();
try {
@@ -128,6 +159,7 @@ public void cancel() throws SQLException {
} finally {
transportLock.unlock();
}
+ isCancelled = true;
}
/*
@@ -167,6 +199,8 @@ void closeClientOperation() throws SQLException {
} finally {
transportLock.unlock();
}
+ isQueryClosed = true;
+ isExecuteStatementFailed = false;
stmtHandle = null;
}
@@ -202,6 +236,7 @@ public boolean execute(String sql) throws SQLException {
checkConnection("execute");
closeClientOperation();
+ initFlags();
TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, sql);
/**
@@ -218,9 +253,12 @@ public boolean execute(String sql) throws SQLException {
TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
Utils.verifySuccessWithInfo(execResp.getStatus());
stmtHandle = execResp.getOperationHandle();
+ isExecuteStatementFailed = false;
} catch (SQLException eS) {
+ isExecuteStatementFailed = true;
throw eS;
} catch (Exception ex) {
+ isExecuteStatementFailed = true;
throw new SQLException(ex.toString(), "08S01", ex);
} finally {
transportLock.unlock();
@@ -266,11 +304,14 @@ public boolean execute(String sql) throws SQLException {
}
}
} catch (SQLException e) {
+ isLogBeingGenerated = false;
throw e;
} catch (Exception e) {
+ isLogBeingGenerated = false;
throw new SQLException(e.toString(), "08S01", e);
}
}
+ isLogBeingGenerated = false;
// The query should be completed by now
if (!stmtHandle.isHasResultSet()) {
@@ -278,7 +319,7 @@ public boolean execute(String sql) throws SQLException {
}
resultSet = new HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle)
.setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize)
- .setScrollable(isScrollableResultset)
+ .setScrollable(isScrollableResultset).setTransportLock(transportLock)
.build();
return true;
}
@@ -289,6 +330,13 @@ private void checkConnection(String action) throws SQLException {
}
}
+ private void initFlags() {
+ isCancelled = false;
+ isQueryClosed = false;
+ isLogBeingGenerated = true;
+ isExecuteStatementFailed = false;
+ }
+
/*
* (non-Javadoc)
*
@@ -713,4 +761,93 @@ public boolean isWrapperFor(Class> iface) throws SQLException {
throw new SQLException("Cannot unwrap to " + iface);
}
+ /**
+ * Check whether query execution might be producing more logs to be fetched.
+ * This method is a public API for usage outside of Hive, although it is not part of the
+ * interface java.sql.Statement.
+ * @return true if query execution might be producing more logs. It does not indicate if last
+ * log lines have been fetched by getQueryLog.
+ */
+ public boolean hasMoreLogs() {
+ return isLogBeingGenerated;
+ }
+
+ /**
+ * Get the execution logs of the given SQL statement.
+ * This method is a public API for usage outside of Hive, although it is not part of the
+ * interface java.sql.Statement.
+ * This method gets the incremental logs during SQL execution, and uses fetchSize holden by
+ * HiveStatement object.
+ * @return a list of logs. It can be empty if there are no new logs to be retrieved at that time.
+ * @throws SQLException
+ * @throws ClosedOrCancelledStatementException if statement has been cancelled or closed
+ */
+ public List getQueryLog() throws SQLException, ClosedOrCancelledStatementException {
+ return getQueryLog(true, fetchSize);
+ }
+
+ /**
+ * Get the execution logs of the given SQL statement.
+ * This method is a public API for usage outside of Hive, although it is not part of the
+ * interface java.sql.Statement.
+ * @param incremental indicate getting logs either incrementally or from the beginning,
+ * when it is true or false.
+ * @param fetchSize the number of lines to fetch
+ * @return a list of logs. It can be empty if there are no new logs to be retrieved at that time.
+ * @throws SQLException
+ * @throws ClosedOrCancelledStatementException if statement has been cancelled or closed
+ */
+ public List getQueryLog(boolean incremental, int fetchSize)
+ throws SQLException, ClosedOrCancelledStatementException {
+ checkConnection("getQueryLog");
+ if (isCancelled) {
+ throw new ClosedOrCancelledStatementException("Method getQueryLog() failed. The " +
+ "statement has been closed or cancelled.");
+ }
+
+ List logs = new ArrayList();
+ TFetchResultsResp tFetchResultsResp = null;
+ transportLock.lock();
+ try {
+ if (stmtHandle != null) {
+ TFetchResultsReq tFetchResultsReq = new TFetchResultsReq(stmtHandle,
+ getFetchOrientation(incremental), fetchSize);
+ tFetchResultsReq.setFetchType((short)1);
+ tFetchResultsResp = client.FetchResults(tFetchResultsReq);
+ Utils.verifySuccessWithInfo(tFetchResultsResp.getStatus());
+ } else {
+ if (isQueryClosed) {
+ throw new ClosedOrCancelledStatementException("Method getQueryLog() failed. The " +
+ "statement has been closed or cancelled.");
+ }
+ if (isExecuteStatementFailed) {
+ throw new SQLException("The stmtHandle is null and the client.ExecuteStatement() method" +
+ " invoked in HiveStatement.execute() method has failed");
+ } else {
+ return logs;
+ }
+ }
+ } catch (SQLException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new SQLException("Error when getting query log: " + e, e);
+ } finally {
+ transportLock.unlock();
+ }
+
+ RowSet rowSet = RowSetFactory.create(tFetchResultsResp.getResults(),
+ connection.getProtocol());
+ for (Object[] row : rowSet) {
+ logs.add((String)row[0]);
+ }
+ return logs;
+ }
+
+ private TFetchOrientation getFetchOrientation(boolean incremental) {
+ if (incremental) {
+ return TFetchOrientation.FETCH_NEXT;
+ } else {
+ return TFetchOrientation.FETCH_FIRST;
+ }
+ }
}