diff --git beeline/pom.xml beeline/pom.xml
index 45fa02b..9a1c5dc 100644
--- beeline/pom.xml
+++ beeline/pom.xml
@@ -49,6 +49,11 @@
<artifactId>hive-shims</artifactId>
<version>${project.version}</version>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-jdbc</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<groupId>commons-cli</groupId>
@@ -88,12 +93,6 @@
<groupId>org.apache.hive</groupId>
- <artifactId>hive-jdbc</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
diff --git beeline/src/java/org/apache/hive/beeline/Commands.java beeline/src/java/org/apache/hive/beeline/Commands.java
index a92d69f..85aaedf 100644
--- beeline/src/java/org/apache/hive/beeline/Commands.java
+++ beeline/src/java/org/apache/hive/beeline/Commands.java
@@ -47,10 +47,12 @@
import java.util.TreeSet;
import org.apache.hadoop.hive.common.cli.ShellCmdExecutor;
+import org.apache.hive.jdbc.HiveStatement;
public class Commands {
private final BeeLine beeLine;
+ private static final int DEFAULT_QUERY_PROGRESS_INTERVAL = 500; // polling interval, in milliseconds
/**
* @param beeLine
@@ -758,6 +760,7 @@ private boolean execute(String line, boolean call) {
try {
Statement stmnt = null;
boolean hasResults;
+ Thread logThread = null;
try {
long start = System.currentTimeMillis();
@@ -767,7 +770,14 @@ private boolean execute(String line, boolean call) {
hasResults = ((CallableStatement) stmnt).execute();
} else {
stmnt = beeLine.createStatement();
- hasResults = stmnt.execute(sql);
+ if (beeLine.getOpts().isSilent()) {
+ hasResults = stmnt.execute(sql);
+ } else {
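+ // While execute() blocks, a background thread polls the server and prints the query log.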
+ logThread = new Thread(createLogRunnable(stmnt));
+ logThread.start();
+ hasResults = stmnt.execute(sql);
+ logThread.interrupt();
+ }
}
beeLine.showWarnings();
@@ -795,6 +805,9 @@ private boolean execute(String line, boolean call) {
if (stmnt != null) {
stmnt.close();
}
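+ // Stop the log thread if it is still running, e.g. when execute() threw before interrupting it.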
+ if (logThread != null && !logThread.isInterrupted()) {
+ logThread.interrupt();
+ }
}
} catch (Exception e) {
return beeLine.error(e);
@@ -803,6 +816,56 @@ private boolean execute(String line, boolean call) {
return true;
}
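+ /**
+ * Builds a Runnable that polls the server for the query log while the statement is executing
+ * and prints it to the Beeline console. For non-Hive statements a no-op Runnable is returned.
+ */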
+ private Runnable createLogRunnable(Statement statement) {
+ if (statement instanceof HiveStatement) {
+ final HiveStatement hiveStatement = (HiveStatement) statement;
+
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
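+ // Give up waiting for the query to leave the IDLE state after 100 seconds.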
+ long timeout = System.currentTimeMillis() + 100000;
+
+ try {
+ // wait until the query leaves the IDLE state
+ while (hiveStatement.getQueryStatus() == HiveStatement.QueryStatus.IDLE) {
+ if (System.currentTimeMillis() > timeout) {
+ break;
+ }
+ Thread.sleep(DEFAULT_QUERY_PROGRESS_INTERVAL);
+ }
+
+ // fetch the log periodically and output to beeline console
+ while (hiveStatement.getQueryStatus() == HiveStatement.QueryStatus.STARTED) {
+ showProgress(hiveStatement.getQueryLog());
+ Thread.sleep(DEFAULT_QUERY_PROGRESS_INTERVAL);
+ }
+ } catch (SQLException e) {
+ beeLine.error(e);
+ return;
+ } catch (InterruptedException e) {
+ beeLine.debug("Log-fetching thread interrupted since the query is done.");
+ return;
+ }
+ }
+ };
+ return runnable;
+ } else {
+ beeLine.error("Cannot show query progress: the statement is not a HiveStatement instance: " + statement.getClass());
+ return new Runnable() {
+ @Override
+ public void run() {
+ // do nothing.
+ }
+ };
+ }
+ }
+
+ private void showProgress(List<String> logs) {
+ for (String log : logs) {
+ // TODO: add a possible filter to reduce the output logs for beeline users.
+ beeLine.info(log);
+ }
+ }
public boolean quit(String line) {
beeLine.setExit(true);
diff --git itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java
index e1d44ec..8aaa694 100644
--- itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java
+++ itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java
@@ -477,4 +477,31 @@ public void testEmbeddedBeelineConnection() throws Throwable{
final String EXPECTED_PATTERN = "embedded_table";
testScriptFile(TEST_NAME, SCRIPT_TEXT, EXPECTED_PATTERN, true, argList);
}
+
+ /**
+ * Test that Beeline shows the query progress for a time-consuming query.
+ * @throws Throwable
+ */
+ @Test
+ public void testQueryProgress() throws Throwable {
+ final String TEST_NAME = "testQueryProgress";
+ final String SCRIPT_TEXT = "set hive.support.concurrency = false;\n" +
+ "select count(*) from " + tableName + ";\n";
+ final String EXPECTED_PATTERN = "Parsing command";
+ testScriptFile(TEST_NAME, SCRIPT_TEXT, EXPECTED_PATTERN, true, getBaseArgs(JDBC_URL));
+ }
+
+ /**
+ * Test that Beeline hides the query progress when the silent option is set.
+ * @throws Throwable
+ */
+ @Test
+ public void testQueryProgressHidden() throws Throwable {
+ final String TEST_NAME = "testQueryProgress";
+ final String SCRIPT_TEXT = "set hive.support.concurrency = false;\n" +
+ "!set silent true\n" +
+ "select count(*) from " + tableName + ";\n";
+ final String EXPECTED_PATTERN = "Parsing command";
+ testScriptFile(TEST_NAME, SCRIPT_TEXT, EXPECTED_PATTERN, false, getBaseArgs(JDBC_URL));
+ }
}
diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
index ae128a9..06250df 100644
--- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
+++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
@@ -2126,4 +2126,84 @@ public void testNonAsciiReturnValues() throws Exception {
}
stmt.close();
}
+
+ /**
+ * Test the getQueryLog() method of the JDBC HiveStatement.
+ * @throws Exception
+ */
+ @Test
+ public void testGetQueryLog() throws Exception {
+ // Prepare
+ String[] expectedLogs = {
+ "Parsing command",
+ "Parse Completed",
+ "Starting Semantic Analysis",
+ "Semantic Analysis Completed",
+ "Starting command"
+ };
+ String sql = "select count(*) from " + tableName;
+
+ // Verify the fetched log (from the beginning of the log)
+ HiveStatement stmt = (HiveStatement)con.createStatement();
+ assertNotNull("Statement is null", stmt);
+ stmt.setFetchSize(10000);
+ stmt.executeQuery(sql);
+ List<String> logs = stmt.getQueryLog(false);
+ stmt.close();
+ verifyFetchedLog(logs, expectedLogs);
+
+ // Verify the fetched log (incrementally)
+ final HiveStatement statement = (HiveStatement)con.createStatement();
+ assertNotNull("Statement is null", statement);
+ statement.setFetchSize(10000);
+ final List<String> incrementalLogs = new ArrayList<String>();
+
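+ // Poll the query log from a separate thread while the main thread blocks in executeQuery().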
+ Runnable logThread = new Runnable() {
+ @Override
+ public void run() {
+ long timeout = System.currentTimeMillis() + 100000;
+ try {
+ while (statement.getQueryStatus() == HiveStatement.QueryStatus.IDLE) {
+ if (System.currentTimeMillis() > timeout) {
+ break;
+ }
+ Thread.sleep(500);
+ }
+
+ while (statement.getQueryStatus() == HiveStatement.QueryStatus.STARTED) {
+ incrementalLogs.addAll(statement.getQueryLog());
+ Thread.sleep(500);
+ }
+
+ incrementalLogs.addAll(statement.getQueryLog());
+ } catch (SQLException e) {
+ LOG.error("Failed getQueryLog. Error message: " + e.getMessage());
+ fail("error in getting log thread");
+ } catch (InterruptedException e) {
+ LOG.error("Getting log thread is interrupted. Error message: " + e.getMessage());
+ fail("error in getting log thread");
+ }
+ }
+ };
+
+ Thread thread = new Thread(logThread);
+ thread.start();
+ statement.executeQuery(sql);
+ thread.join(100000);
+ statement.close();
+ verifyFetchedLog(incrementalLogs, expectedLogs);
+ }
+
+ private void verifyFetchedLog(List<String> logs, String[] expectedLogs) {
+ StringBuilder stringBuilder = new StringBuilder();
+
+ for (String log : logs) {
+ stringBuilder.append(log);
+ }
+
+ String accumulatedLogs = stringBuilder.toString();
+ for (String expectedLog : expectedLogs) {
+ assertTrue(accumulatedLogs.contains(expectedLog));
+ }
+ }
}
diff --git jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
index 2cbf58c..1aaccbe 100644
--- jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
+++ jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
@@ -23,21 +23,16 @@
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLWarning;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.hive.service.cli.thrift.TCLIService;
-import org.apache.hive.service.cli.thrift.TCancelOperationReq;
-import org.apache.hive.service.cli.thrift.TCancelOperationResp;
-import org.apache.hive.service.cli.thrift.TCloseOperationReq;
-import org.apache.hive.service.cli.thrift.TCloseOperationResp;
-import org.apache.hive.service.cli.thrift.TExecuteStatementReq;
-import org.apache.hive.service.cli.thrift.TExecuteStatementResp;
-import org.apache.hive.service.cli.thrift.TGetOperationStatusReq;
-import org.apache.hive.service.cli.thrift.TGetOperationStatusResp;
-import org.apache.hive.service.cli.thrift.TOperationHandle;
-import org.apache.hive.service.cli.thrift.TSessionHandle;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.RowSetFactory;
+import org.apache.hive.service.cli.thrift.*;
/**
* HiveStatement.
@@ -80,6 +75,23 @@
// A fair reentrant lock
private ReentrantLock transportLock = new ReentrantLock(true);
+ /**
+ * The execution status of the SQL statement. Since SQL execution in Hive can be long-running
+ * and blocks the calling thread, a JDBC user can poll this status from another thread.
+ * It is also used to synchronize the execute() and getQueryLog() methods.
+ */
+ private volatile QueryStatus queryStatus = QueryStatus.IDLE;
+
+ public enum QueryStatus {
+ IDLE,
+ STARTED,
+ START_ERROR,
+ FINISHED,
+ FINISH_ERROR,
+ CANCELED
+ }
+
public HiveStatement(HiveConnection connection, TCLIService.Iface client,
TSessionHandle sessHandle) {
this(connection, client, sessHandle, false);
@@ -121,6 +133,7 @@ public void cancel() throws SQLException {
TCancelOperationResp cancelResp = client.CancelOperation(cancelReq);
Utils.verifySuccessWithInfo(cancelResp.getStatus());
}
+ setQueryStatus(QueryStatus.CANCELED);
} catch (SQLException e) {
throw e;
} catch (Exception e) {
@@ -160,6 +173,7 @@ void closeClientOperation() throws SQLException {
TCloseOperationResp closeResp = client.CloseOperation(closeReq);
Utils.verifySuccessWithInfo(closeResp.getStatus());
}
+ setQueryStatus(QueryStatus.IDLE);
} catch (SQLException e) {
throw e;
} catch (Exception e) {
@@ -218,9 +232,12 @@ public boolean execute(String sql) throws SQLException {
TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
Utils.verifySuccessWithInfo(execResp.getStatus());
stmtHandle = execResp.getOperationHandle();
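+ // The operation handle is now available, so a log-polling thread may call getQueryLog().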
+ setQueryStatus(QueryStatus.STARTED);
} catch (SQLException eS) {
+ setQueryStatus(QueryStatus.START_ERROR);
throw eS;
} catch (Exception ex) {
+ setQueryStatus(QueryStatus.START_ERROR);
throw new SQLException(ex.toString(), "08S01", ex);
} finally {
transportLock.unlock();
@@ -266,20 +283,28 @@ public boolean execute(String sql) throws SQLException {
}
}
} catch (SQLException e) {
+ setQueryStatus(QueryStatus.FINISH_ERROR);
throw e;
} catch (Exception e) {
+ setQueryStatus(QueryStatus.FINISH_ERROR);
throw new SQLException(e.toString(), "08S01", e);
}
}
// The query should be completed by now
+ setQueryStatus(QueryStatus.FINISHED);
if (!stmtHandle.isHasResultSet()) {
return false;
}
- resultSet = new HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle)
- .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize)
- .setScrollable(isScrollableResultset)
- .build();
+
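+ // Hold the transport lock while building the result set so that a concurrent
+ // getQueryLog() call cannot interleave requests on the shared Thrift transport.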
+ transportLock.lock();
+ try {
+ resultSet = new HiveQueryResultSet.Builder(this).setClient(client)
+ .setSessionHandle(sessHandle).setStmtHandle(stmtHandle).setMaxRows(maxRows)
+ .setFetchSize(fetchSize).setScrollable(isScrollableResultset).build();
+ } finally {
+ transportLock.unlock();
+ }
return true;
}
@@ -713,4 +738,76 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
throw new SQLException("Cannot unwrap to " + iface);
}
+ public synchronized QueryStatus getQueryStatus() {
+ return queryStatus;
+ }
+
+ private synchronized void setQueryStatus(QueryStatus queryStatus) {
+ this.queryStatus = queryStatus;
+ }
+
+ /**
+ * Get the execution logs of the given SQL statement.
+ * This method fetches the logs incrementally during SQL execution.
+ * @return a list of log lines
+ * @throws SQLException
+ */
+ public List<String> getQueryLog() throws SQLException {
+ return getQueryLog(true);
+ }
+
+ /**
+ * Get the execution logs of the given SQL statement.
+ * @param incremental if true, fetch the logs incrementally; if false, fetch them from the
+ * beginning of the log.
+ * @return a list of log lines
+ * @throws SQLException
+ */
+ public List<String> getQueryLog(boolean incremental) throws SQLException {
+ checkConnection("getQueryLog");
+ checkQueryStatus("getQueryLog");
+
+ List<String> logs = new ArrayList<String>();
+ TFetchResultsResp tFetchResultsResp = null;
+ transportLock.lock();
+ try {
+ if (stmtHandle != null) {
+ TFetchResultsReq tFetchResultsReq = new TFetchResultsReq(stmtHandle,
+ getFetchOrientation(incremental), fetchSize);
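+ // Fetch type 1 requests the operation log; type 0 (the default) would return query results.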
+ tFetchResultsReq.setFetchType((short)1);
+ tFetchResultsResp = client.FetchResults(tFetchResultsReq);
+ Utils.verifySuccessWithInfo(tFetchResultsResp.getStatus());
+ } else {
+ throw new SQLException("The stmtHandle is null. The statement might be not executed or " +
+ "already be closed.");
+ }
+ } catch (SQLException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new SQLException("Error when getting query log", e);
+ } finally {
+ transportLock.unlock();
+ }
+
+ RowSet rowSet = RowSetFactory.create(tFetchResultsResp.getResults(),
+ connection.getProtocol());
+ for (Object[] row : rowSet) {
+ logs.add((String)row[0]);
+ }
+ return logs;
+ }
+
+ private TFetchOrientation getFetchOrientation(boolean incremental) {
+ if (incremental) {
+ return TFetchOrientation.FETCH_NEXT;
+ } else {
+ return TFetchOrientation.FETCH_FIRST;
+ }
+ }
+
+ private void checkQueryStatus(String action) throws SQLException {
+ if (queryStatus == QueryStatus.IDLE) {
+ throw new SQLException("Cannot " + action + " while the query is in IDLE state");
+ }
+ }
}