diff --git jdbc/src/java/org/apache/hive/jdbc/HivePreparedStatement.java jdbc/src/java/org/apache/hive/jdbc/HivePreparedStatement.java index 705a32a..a04689a 100644 --- jdbc/src/java/org/apache/hive/jdbc/HivePreparedStatement.java +++ jdbc/src/java/org/apache/hive/jdbc/HivePreparedStatement.java @@ -116,8 +116,7 @@ public ResultSet executeQuery() throws SQLException { */ public int executeUpdate() throws SQLException { - super.executeUpdate(updateSql(sql, parameters)); - return 0; + return super.executeUpdate(updateSql(sql, parameters)); } /** diff --git jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index 38ccc78..11b2a9d 100644 --- jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -446,7 +446,7 @@ public ResultSet executeQuery(String sql) throws SQLException { @Override public int executeUpdate(String sql) throws SQLException { execute(sql); - return 0; + return (int) getAffectedRowCount(); } /* @@ -903,6 +903,51 @@ public boolean hasMoreLogs() { return logs; } + /** + * Get the affected row count of the given SQL statement. + */ + private long getAffectedRowCount() + throws SQLException { + checkConnection("getAffectedRowCount"); + if (isCancelled) { + throw new ClosedOrCancelledStatementException("Method getAffectedRowCount() failed. The " + + "statement has been closed or cancelled."); + } + + long affectedRows = 0L; + TFetchResultsResp tFetchResultsResp = null; + try { + if (stmtHandle != null) { + TFetchResultsReq tFetchResultsReq = new TFetchResultsReq(stmtHandle, + getFetchOrientation(false), 1); + tFetchResultsReq.setFetchType((short)2); // 2 == FetchType.ROW_COUNT + tFetchResultsResp = client.FetchResults(tFetchResultsReq); + Utils.verifySuccessWithInfo(tFetchResultsResp.getStatus()); + } else { + if (isQueryClosed) { + throw new ClosedOrCancelledStatementException("Method getAffectedRowCount() failed. The " + + "statement has been closed or cancelled."); + } + if (isExecuteStatementFailed) { + throw new SQLException("Method getAffectedRowCount() failed. Because the stmtHandle in " + + "HiveStatement is null and the statement execution might fail."); + } else { + return affectedRows; + } + } + RowSet rowSet = RowSetFactory.create(tFetchResultsResp.getResults(), + connection.getProtocol()); + for (Object[] row : rowSet) { + affectedRows = Long.valueOf(String.valueOf(row[0])); + } + } catch (SQLException e) { + throw e; + } catch (Exception e) { + throw new SQLException("Error when getting affected row count: " + e, e); + } + return affectedRows; + } + private TFetchOrientation getFetchOrientation(boolean incremental) { if (incremental) { return TFetchOrientation.FETCH_NEXT; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 3ec63ee..0840269 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -442,6 +442,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { logEveryNRows = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVE_LOG_N_RECORDS); statsMap.put(getCounterName(Counter.RECORDS_OUT), row_count); + statsMap.put(Counter.RECORDS_OUT.toString(), row_count); } catch (HiveException e) { throw e; } catch (Exception e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java index 5656f9a..1205e08 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.MapRedStats; +import 
org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Task; @@ -42,6 +43,7 @@ import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob; +import org.apache.hadoop.hive.ql.session.OperationMetrics; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.ql.stats.ClientStatsPublisher; @@ -400,6 +402,7 @@ private MapRedStats progress(ExecDriverTaskHandle th) throws IOException, LockEx ss.getHiveHistory().setTaskCounters(queryState.getQueryId(), getId(), ctrs); } success = rj.isSuccessful(); + updateOperationMetrics(ctrs); } } @@ -428,6 +431,19 @@ private MapRedStats progress(ExecDriverTaskHandle th) throws IOException, LockEx return mapRedStats; } + private void updateOperationMetrics(Counters ctrs) { + // check for number of created files + Counters.Group group = ctrs.getGroup(HiveConf.getVar(job, ConfVars.HIVECOUNTERGROUP)); + for (Counter counter : group) { + if(FileSinkOperator.Counter.RECORDS_OUT.toString().equalsIgnoreCase(counter.getDisplayName())){ + if (OperationMetrics.getCurrentOperationMetrics() != null) { + OperationMetrics.getCurrentOperationMetrics().setAffectRowCount(counter.getCounter()); + } + break; + } + } + } + private String getId() { return this.task.getId(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index 0b494aa..22604dd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.SparkWork; import 
org.apache.hadoop.hive.ql.plan.api.StageType; +import org.apache.hadoop.hive.ql.session.OperationMetrics; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.util.StringUtils; @@ -100,7 +101,7 @@ public int execute(DriverContext driverContext) { SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus(); if (rc == 0) { SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics(); - if (LOG.isInfoEnabled() && sparkStatistics != null) { + if (sparkStatistics != null) { LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId())); logSparkStatistic(sparkStatistics); } @@ -151,6 +152,11 @@ private void logSparkStatistic(SparkStatistics sparkStatistic) { while (statisticIterator.hasNext()) { SparkStatistic statistic = statisticIterator.next(); LOG.info("\t" + statistic.getName() + ": " + statistic.getValue()); + if (FileSinkOperator.Counter.RECORDS_OUT.toString().equalsIgnoreCase(statistic.getName())) { + if (OperationMetrics.getCurrentOperationMetrics() != null) { + OperationMetrics.getCurrentOperationMetrics().setAffectRowCount(Long.valueOf(statistic.getValue())); + } + } } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 9e114c0..f75b67a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -53,6 +54,7 @@ import org.apache.hadoop.hive.ql.plan.TezWork; import org.apache.hadoop.hive.ql.plan.UnionWork; import 
org.apache.hadoop.hive.ql.plan.api.StageType; +import org.apache.hadoop.hive.ql.session.OperationMetrics; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.StringUtils; @@ -203,13 +205,23 @@ public int execute(DriverContext driverContext) { } TezSessionPoolManager.getInstance().returnSession(session, getWork().getLlapMode()); - if (LOG.isInfoEnabled() && counters != null - && (conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) || - Utilities.isPerfOrAboveLogging(conf))) { + if (counters != null) { + boolean isLog = (LOG.isInfoEnabled() && (HiveConf.getBoolVar(conf, + HiveConf.ConfVars.TEZ_EXEC_SUMMARY) || Utilities.isPerfOrAboveLogging(conf))); for (CounterGroup group: counters) { - LOG.info(group.getDisplayName() +":"); + if (isLog) { + LOG.info(group.getDisplayName() +":"); + } for (TezCounter counter: group) { - LOG.info(" "+counter.getDisplayName()+": "+counter.getValue()); + if (isLog) { + LOG.info(" "+counter.getDisplayName()+": "+counter.getValue()); + } + if (FileSinkOperator.Counter.RECORDS_OUT.toString().equalsIgnoreCase( + counter.getDisplayName())) { + if (OperationMetrics.getCurrentOperationMetrics() != null) { + OperationMetrics.getCurrentOperationMetrics().setAffectRowCount(counter.getValue()); + } + } } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/session/OperationMetrics.java ql/src/java/org/apache/hadoop/hive/ql/session/OperationMetrics.java new file mode 100644 index 0000000..df036cf --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/session/OperationMetrics.java @@ -0,0 +1,36 @@ +package org.apache.hadoop.hive.ql.session; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Metrics for an operation — the affected row count, for example. + */ +public class OperationMetrics { + private static final ThreadLocal<OperationMetrics> THREAD_LOCAL_OPERATION_METRICS = new ThreadLocal<OperationMetrics>(){ + @Override + protected synchronized OperationMetrics initialValue() { + return null; + } + }; + 
private long affectRowCount; + + public long getAffectRowCount() { + return affectRowCount; + } + + public void setAffectRowCount(long affectRowCount) { + this.affectRowCount = affectRowCount; + } + + public static void setCurrentOperationMetrics(OperationMetrics operationMetrics) { + THREAD_LOCAL_OPERATION_METRICS.set(operationMetrics); + } + + public static void removeCurrentOperationMetrics() { + THREAD_LOCAL_OPERATION_METRICS.remove(); + } + + public static OperationMetrics getCurrentOperationMetrics() { + return THREAD_LOCAL_OPERATION_METRICS.get(); + } +} diff --git service/src/java/org/apache/hive/service/cli/FetchType.java service/src/java/org/apache/hive/service/cli/FetchType.java index a8e7fe1..84facbe 100644 --- service/src/java/org/apache/hive/service/cli/FetchType.java +++ service/src/java/org/apache/hive/service/cli/FetchType.java @@ -24,7 +24,8 @@ */ public enum FetchType { QUERY_OUTPUT((short)0), - LOG((short)1); + LOG((short)1), + ROW_COUNT((short)2); private final short tFetchType; diff --git service/src/java/org/apache/hive/service/cli/operation/Operation.java service/src/java/org/apache/hive/service/cli/operation/Operation.java index 0932884..56b9602 100644 --- service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.OperationLog; +import org.apache.hadoop.hive.ql.session.OperationMetrics; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.service.cli.FetchOrientation; import org.apache.hive.service.cli.HiveSQLException; @@ -72,6 +73,7 @@ protected OperationLog operationLog; protected boolean isOperationLogEnabled; protected Map confOverlay = new HashMap(); + protected OperationMetrics operationMetrics = new OperationMetrics(); private long operationTimeout; 
private volatile long lastAccessTime; @@ -271,6 +273,10 @@ protected void unregisterOperationLog() { } } + protected void unregisterOperationMetrics() { + OperationMetrics.removeCurrentOperationMetrics(); + } + /** * Invoked before runInternal(). * Set up some preconditions, or configurations. @@ -383,6 +389,10 @@ protected void validateFetchOrientation(FetchOrientation orientation, } } + public OperationMetrics getOperationMetrics() { + return operationMetrics; + } + protected HiveSQLException toSQLException(String prefix, CommandProcessorResponse response) { HiveSQLException ex = new HiveSQLException(prefix + ": " + response.getErrorMessage(), response.getSQLState(), response.getResponseCode()); diff --git service/src/java/org/apache/hive/service/cli/operation/OperationManager.java service/src/java/org/apache/hive/service/cli/operation/OperationManager.java index 2f18231..be21331 100644 --- service/src/java/org/apache/hive/service/cli/operation/OperationManager.java +++ service/src/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Schema; import org.apache.hadoop.hive.ql.session.OperationLog; +import org.apache.hadoop.hive.ql.session.OperationMetrics; import org.apache.hive.service.AbstractService; import org.apache.hive.service.cli.FetchOrientation; import org.apache.hive.service.cli.HiveSQLException; @@ -330,6 +331,23 @@ public RowSet getOperationLogRowSet(OperationHandle opHandle, FetchOrientation o return rowSet; } + public RowSet getOperationMetricsRowSet(OperationHandle opHandle, HiveConf hConf) + throws HiveSQLException { + TableSchema tableSchema = new TableSchema(getRowCountSchema()); + RowSet rowSet = RowSetFactory.create(tableSchema, getOperation(opHandle).getProtocolVersion(), + false); + + // get the OperationMetrics object from the operation + OperationMetrics operationMetrics = 
getOperation(opHandle).getOperationMetrics(); + if (operationMetrics == null) { + throw new HiveSQLException("Couldn't find OperationMetrics associated with operation handle: " + opHandle); + } + // convert row count to RowSet + rowSet.addRow(new String[] {String.valueOf(operationMetrics.getAffectRowCount())}); + + return rowSet; + } + private boolean isFetchFirst(FetchOrientation fetchOrientation) { //TODO: Since OperationLog is moved to package o.a.h.h.ql.session, // we may add a Enum there and map FetchOrientation to it. @@ -348,6 +366,15 @@ private Schema getLogSchema() { return schema; } + private Schema getRowCountSchema() { + Schema schema = new Schema(); + FieldSchema fieldSchema = new FieldSchema(); + fieldSchema.setName("affect_row_count"); + fieldSchema.setType("string"); + schema.addToFieldSchemas(fieldSchema); + return schema; + } + public Collection getOperations() { return Collections.unmodifiableCollection(handleToOperation.values()); } diff --git service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 67e0e52..7c2242d 100644 --- service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.OperationLog; +import org.apache.hadoop.hive.ql.session.OperationMetrics; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.SerDe; @@ -273,6 +274,8 @@ public void runInternal() throws HiveSQLException { // Current UGI will get used by metastore when metsatore is in embedded mode // So this needs to get passed to the new background thread final UserGroupInformation currentUGI = getCurrentUGI(); + + final 
OperationMetrics operationMetrics = getOperationMetrics(); // Runnable impl to call runInternal asynchronously, // from a different thread Runnable backgroundOperation = new Runnable() { @@ -285,6 +288,7 @@ public Object run() throws HiveSQLException { SessionState.setCurrentSessionState(parentSessionState); // Set current OperationLog in this async thread for keeping on saving query log. registerCurrentOperationLog(); + registerCurrentOperationMetrics(); registerLoggingContext(); try { runQuery(); @@ -294,6 +298,7 @@ public Object run() throws HiveSQLException { } finally { unregisterLoggingContext(); unregisterOperationLog(); + unregisterOperationMetrics(); } return null; } @@ -358,6 +363,10 @@ private void registerCurrentOperationLog() { } } + private void registerCurrentOperationMetrics() { + OperationMetrics.setCurrentOperationMetrics(operationMetrics); + } + private synchronized void cleanup(OperationState state) throws HiveSQLException { setState(state); if (driver != null) { diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index a0015eb..6ac163c 100644 --- service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.io.InputStreamReader; import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -38,8 +37,6 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.ql.QueryPlan; -import org.apache.hadoop.hive.ql.exec.ListSinkOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.history.HiveHistory; import org.apache.hadoop.hive.ql.metadata.Hive; @@ 
-785,10 +782,16 @@ public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientatio long maxRows, FetchType fetchType) throws HiveSQLException { acquire(true); try { - if (fetchType == FetchType.QUERY_OUTPUT) { - return operationManager.getOperationNextRowSet(opHandle, orientation, maxRows); + switch (fetchType) + { + case QUERY_OUTPUT: + return operationManager.getOperationNextRowSet(opHandle, orientation, maxRows); + case ROW_COUNT: + return operationManager.getOperationMetricsRowSet(opHandle, sessionConf); + case LOG: + default: + return operationManager.getOperationLogRowSet(opHandle, orientation, maxRows, sessionConf); } - return operationManager.getOperationLogRowSet(opHandle, orientation, maxRows, sessionConf); } finally { release(true); }