diff --git jdbc/src/java/org/apache/hive/jdbc/HivePreparedStatement.java jdbc/src/java/org/apache/hive/jdbc/HivePreparedStatement.java
index c28b7d6..8d46668 100644
--- jdbc/src/java/org/apache/hive/jdbc/HivePreparedStatement.java
+++ jdbc/src/java/org/apache/hive/jdbc/HivePreparedStatement.java
@@ -116,8 +116,7 @@ public ResultSet executeQuery() throws SQLException {
    */
   public int executeUpdate() throws SQLException {
-    super.executeUpdate(updateSql(sql, parameters));
-    return 0;
+    return super.executeUpdate(updateSql(sql, parameters));
   }
 
   /**
diff --git jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
index 180f99e8..1308887 100644
--- jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
+++ jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
@@ -390,7 +390,7 @@ public ResultSet executeQuery(String sql) throws SQLException {
   @Override
   public int executeUpdate(String sql) throws SQLException {
     execute(sql);
-    return 0;
+    return (int) getAffectedRowCount();
   }
 
   /*
@@ -838,6 +838,51 @@ public boolean hasMoreLogs() {
     return logs;
   }
 
+  /**
+   * Get the affected row count of the given SQL statement.
+   */
+  private long getAffectedRowCount() throws SQLException {
+    checkConnection("getAffectedRowCount");
+    if (isCancelled) {
+      throw new ClosedOrCancelledStatementException("Method getAffectedRowCount() failed. The "
+          + "statement has been closed or cancelled.");
+    }
+
+    long affectedRows = 0L;
+    TFetchResultsResp tFetchResultsResp = null;
+    try {
+      if (stmtHandle != null) {
+        TFetchResultsReq tFetchResultsReq =
+            new TFetchResultsReq(stmtHandle, getFetchOrientation(false), 1);
+        // FetchType.ROW_COUNT on the server side.
+        tFetchResultsReq.setFetchType((short) 2);
+        tFetchResultsResp = client.FetchResults(tFetchResultsReq);
+        Utils.verifySuccessWithInfo(tFetchResultsResp.getStatus());
+      } else {
+        if (isQueryClosed) {
+          throw new ClosedOrCancelledStatementException("Method getAffectedRowCount() failed. "
+              + "The statement has been closed or cancelled.");
+        }
+        if (isExecuteStatementFailed) {
+          throw new SQLException("Method getAffectedRowCount() failed: the stmtHandle in "
+              + "HiveStatement is null, so the statement execution may have failed.");
+        } else {
+          return affectedRows;
+        }
+      }
+    } catch (SQLException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new SQLException("Error when getting affected row count: " + e, e);
+    }
+
+    RowSet rowSet = RowSetFactory.create(tFetchResultsResp.getResults(), connection.getProtocol());
+    for (Object[] row : rowSet) {
+      affectedRows = Long.parseLong(String.valueOf(row[0]));
+    }
+    return affectedRows;
+  }
+
   private TFetchOrientation getFetchOrientation(boolean incremental) {
     if (incremental) {
       return TFetchOrientation.FETCH_NEXT;
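Taken together, the two JDBC changes mean `executeUpdate()` now returns the server-reported row count instead of a hard-coded 0. A minimal client-side sketch of the new behavior; the connection URL and table names here are hypothetical placeholders:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class AffectedRowsExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical HiveServer2 endpoint and tables; adjust for your deployment.
    try (Connection conn =
             DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
         PreparedStatement ps =
             conn.prepareStatement("INSERT INTO TABLE t_dst SELECT * FROM t_src")) {
      int affected = ps.executeUpdate();
      // Before this patch the Hive driver always returned 0 here.
      System.out.println("Rows affected: " + affected);
    }
  }
}
```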
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 7459bba..9bc9bb0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -437,6 +437,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
       }
 
       statsMap.put(Counter.RECORDS_OUT + "_" + suffix, row_count);
+      statsMap.put(Counter.RECORDS_OUT.toString(), row_count);
     } catch (HiveException e) {
       throw e;
     } catch (Exception e) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
index 5f35630..a3e9038 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
@@ -30,17 +30,20 @@
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.MapRedStats;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Heartbeater;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskHandle;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob;
+import org.apache.hadoop.hive.ql.session.OperationMetrics;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.ql.stats.ClientStatsPublisher;
@@ -393,6 +396,7 @@ private MapRedStats progress(ExecDriverTaskHandle th) throws IOException {
           ss.getHiveHistory().setTaskCounters(SessionState.get().getQueryId(), getId(), ctrs);
         }
         success = rj.isSuccessful();
+        updateOperationMetrics(ctrs);
       }
     }
 
@@ -421,6 +425,20 @@ private MapRedStats progress(ExecDriverTaskHandle th) throws IOException {
     return mapRedStats;
   }
 
+  private void updateOperationMetrics(Counters ctrs) {
+    // Look for the RECORDS_OUT counter in the Hive counter group and publish
+    // its value as the affected row count of the current operation.
+    Counters.Group group = ctrs.getGroup(HiveConf.getVar(job, ConfVars.HIVECOUNTERGROUP));
+    for (Counter counter : group) {
+      if (FileSinkOperator.Counter.RECORDS_OUT.toString().equalsIgnoreCase(counter.getDisplayName())) {
+        if (OperationMetrics.getCurrentOperationMetrics() != null) {
+          OperationMetrics.getCurrentOperationMetrics().setAffectRowCount(counter.getCounter());
+        }
+        break;
+      }
+    }
+  }
+
   private String getId() {
     return this.task.getId();
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 336d490..e1d283e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -68,6 +68,7 @@
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.plan.StatsWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.session.OperationMetrics;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
@@ -116,7 +117,9 @@ public int execute(DriverContext driverContext) {
       sparkCounters = sparkJobStatus.getCounter();
       // for RSC, we should get the counters after job has finished
       SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
-      if (LOG.isInfoEnabled() && sparkStatistics != null) {
+      // Always walk the statistics: logSparkStatistic() now also records the
+      // affected row count, which must happen even if INFO logging is off.
+      if (sparkStatistics != null) {
         LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId()));
         logSparkStatistic(sparkStatistics);
       }
@@ -164,6 +167,11 @@ private void logSparkStatistic(SparkStatistics sparkStatistic) {
       while (statisticIterator.hasNext()) {
         SparkStatistic statistic = statisticIterator.next();
         LOG.info("\t" + statistic.getName() + ": " + statistic.getValue());
+        if (FileSinkOperator.Counter.RECORDS_OUT.toString().equalsIgnoreCase(statistic.getName())
+            && OperationMetrics.getCurrentOperationMetrics() != null) {
+          OperationMetrics.getCurrentOperationMetrics()
+              .setAffectRowCount(Long.parseLong(statistic.getValue()));
+        }
       }
     }
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/session/OperationMetrics.java ql/src/java/org/apache/hadoop/hive/ql/session/OperationMetrics.java
index e69de29..1a26c21 100644
--- ql/src/java/org/apache/hadoop/hive/ql/session/OperationMetrics.java
+++ ql/src/java/org/apache/hadoop/hive/ql/session/OperationMetrics.java
@@ -0,0 +1,33 @@
+package org.apache.hadoop.hive.ql.session;
+
+/**
+ * Per-operation metrics, e.g. the affected row count of a DML statement.
+ * An instance is bound to its executing thread via a ThreadLocal so that
+ * engine code can publish metrics without a reference to the operation.
+ */
+public class OperationMetrics {
+  private static final ThreadLocal<OperationMetrics> THREAD_LOCAL_OPERATION_METRICS =
+      new ThreadLocal<OperationMetrics>();
+
+  private long affectRowCount;
+
+  public long getAffectRowCount() {
+    return affectRowCount;
+  }
+
+  public void setAffectRowCount(long affectRowCount) {
+    this.affectRowCount = affectRowCount;
+  }
+
+  public static void setCurrentOperationMetrics(OperationMetrics operationMetrics) {
+    THREAD_LOCAL_OPERATION_METRICS.set(operationMetrics);
+  }
+
+  public static void removeCurrentOperationMetrics() {
+    THREAD_LOCAL_OPERATION_METRICS.remove();
+  }
+
+  public static OperationMetrics getCurrentOperationMetrics() {
+    return THREAD_LOCAL_OPERATION_METRICS.get();
+  }
+}
diff --git service/src/java/org/apache/hive/service/cli/FetchType.java service/src/java/org/apache/hive/service/cli/FetchType.java
index a8e7fe1..ff77bff 100644
--- service/src/java/org/apache/hive/service/cli/FetchType.java
+++ service/src/java/org/apache/hive/service/cli/FetchType.java
@@ -24,7 +24,8 @@
  */
 public enum FetchType {
   QUERY_OUTPUT((short)0),
-  LOG((short)1);
+  LOG((short)1),
+  ROW_COUNT((short)2);
 
   private final short tFetchType;
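The ThreadLocal in `OperationMetrics` is the handoff point between the execution engines and the operation layer: the operation binds its metrics object to the background thread, the MR/Spark task code publishes the `RECORDS_OUT` value into it, and the fetch path later reads it back through the operation handle. A simplified, single-thread sketch of that lifecycle; the real code spreads these steps across `SQLOperation` and the task handlers, and the literal 42 stands in for the counter value of a finished job:

```java
OperationMetrics metrics = new OperationMetrics();
OperationMetrics.setCurrentOperationMetrics(metrics);  // SQLOperation, before running the query
try {
  OperationMetrics current = OperationMetrics.getCurrentOperationMetrics();
  if (current != null) {                               // task layer, after the job finishes
    current.setAffectRowCount(42L);
  }
} finally {
  OperationMetrics.removeCurrentOperationMetrics();    // avoid leaking across pooled threads
}
long affected = metrics.getAffectRowCount();           // fetch path, via getOperationMetrics()
```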
diff --git service/src/java/org/apache/hive/service/cli/operation/Operation.java service/src/java/org/apache/hive/service/cli/operation/Operation.java
index 25cefc2..fef6c39 100644
--- service/src/java/org/apache/hive/service/cli/operation/Operation.java
+++ service/src/java/org/apache/hive/service/cli/operation/Operation.java
@@ -38,6 +38,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.OperationLog;
+import org.apache.hadoop.hive.ql.session.OperationMetrics;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.service.cli.FetchOrientation;
 import org.apache.hive.service.cli.HiveSQLException;
@@ -71,6 +72,7 @@
   protected OperationLog operationLog;
   protected boolean isOperationLogEnabled;
   protected Map<String, String> confOverlay = new HashMap<String, String>();
+  protected final OperationMetrics operationMetrics = new OperationMetrics();
 
   private long operationTimeout;
   private long lastAccessTime;
@@ -256,6 +258,10 @@ protected void unregisterOperationLog() {
     }
   }
 
+  protected void unregisterOperationMetrics() {
+    OperationMetrics.removeCurrentOperationMetrics();
+  }
+
   /**
    * Invoked before runInternal().
    * Set up some preconditions, or configurations.
@@ -407,4 +413,8 @@ protected void setMetrics(OperationState state) {
       }
     }
   }
+
+  public OperationMetrics getOperationMetrics() {
+    return operationMetrics;
+  }
 }
diff --git service/src/java/org/apache/hive/service/cli/operation/OperationManager.java service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
index b0bd351..b373f50 100644
--- service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
+++ service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Schema;
 import org.apache.hadoop.hive.ql.session.OperationLog;
+import org.apache.hadoop.hive.ql.session.OperationMetrics;
 import org.apache.hive.service.AbstractService;
 import org.apache.hive.service.cli.FetchOrientation;
 import org.apache.hive.service.cli.HiveSQLException;
@@ -274,6 +275,23 @@ public RowSet getOperationLogRowSet(OperationHandle opHandle,
     return rowSet;
   }
 
+  public RowSet getOperationMetricsRowSet(OperationHandle opHandle, HiveConf hConf)
+      throws HiveSQLException {
+    TableSchema tableSchema = new TableSchema(getRowCountSchema());
+    RowSet rowSet = RowSetFactory.create(tableSchema, getOperation(opHandle).getProtocolVersion());
+
+    // Get the OperationMetrics object from the operation.
+    OperationMetrics operationMetrics = getOperation(opHandle).getOperationMetrics();
+    if (operationMetrics == null) {
+      throw new HiveSQLException("Couldn't find OperationMetrics associated with operation handle: "
+          + opHandle);
+    }
+    // Convert the affected row count into a single-row RowSet.
+    rowSet.addRow(new String[] {String.valueOf(operationMetrics.getAffectRowCount())});
+
+    return rowSet;
+  }
+
   private boolean isFetchFirst(FetchOrientation fetchOrientation) {
     //TODO: Since OperationLog is moved to package o.a.h.h.ql.session,
     // we may add a Enum there and map FetchOrientation to it.
@@ -292,6 +310,15 @@ private Schema getLogSchema() {
     return schema;
   }
 
+  private Schema getRowCountSchema() {
+    Schema schema = new Schema();
+    FieldSchema fieldSchema = new FieldSchema();
+    fieldSchema.setName("affect_row_count");
+    fieldSchema.setType("string");
+    schema.addToFieldSchemas(fieldSchema);
+    return schema;
+  }
+
   public OperationLog getOperationLogByThread() {
     return OperationLog.getCurrentOperationLog();
   }
diff --git service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 1331a99..4c6ba16 100644
--- service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -45,6 +45,7 @@
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.OperationLog;
+import org.apache.hadoop.hive.ql.session.OperationMetrics;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.SerDe;
@@ -213,6 +214,8 @@ public void runInternal() throws HiveSQLException {
       // Current UGI will get used by metastore when metsatore is in embedded mode
       // So this needs to get passed to the new background thread
       final UserGroupInformation currentUGI = getCurrentUGI(opConfig);
+
+      final OperationMetrics operationMetrics = getOperationMetrics();
       // Runnable impl to call runInternal asynchronously,
       // from a different thread
       Runnable backgroundOperation = new Runnable() {
@@ -226,7 +229,8 @@ public Object run() throws HiveSQLException {
             // Set current OperationLog in this async thread for keeping on saving query log.
             registerCurrentOperationLog();
             registerLoggingContext();
+            registerCurrentOperationMetrics();
             try {
               runQuery(opConfig);
             } catch (HiveSQLException e) {
               setOperationException(e);
@@ -234,6 +238,7 @@ public Object run() throws HiveSQLException {
             } finally {
               unregisterLoggingContext();
               unregisterOperationLog();
+              unregisterOperationMetrics();
             }
             return null;
           }
@@ -298,6 +303,10 @@ private void registerCurrentOperationLog() {
     }
   }
 
+  private void registerCurrentOperationMetrics() {
+    OperationMetrics.setCurrentOperationMetrics(operationMetrics);
+  }
+
   private void cleanup(OperationState state) throws HiveSQLException {
     setState(state);
     if (shouldRunAsync()) {
diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index a14908b..9b41b77 100644
--- service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -771,10 +771,15 @@ public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientatio
       long maxRows, FetchType fetchType) throws HiveSQLException {
     acquire(true);
     try {
-      if (fetchType == FetchType.QUERY_OUTPUT) {
-        return operationManager.getOperationNextRowSet(opHandle, orientation, maxRows);
+      switch (fetchType) {
+        case QUERY_OUTPUT:
+          return operationManager.getOperationNextRowSet(opHandle, orientation, maxRows);
+        case ROW_COUNT:
+          return operationManager.getOperationMetricsRowSet(opHandle, hiveConf);
+        case LOG:
+        default:
+          return operationManager.getOperationLogRowSet(opHandle, orientation, maxRows, hiveConf);
       }
-      return operationManager.getOperationLogRowSet(opHandle, orientation, maxRows, hiveConf);
     } finally {
       release(true);
     }
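At the wire level the patch reuses the existing `FetchResults` RPC: fetch type 0 returns query output, 1 returns operation logs, and 2 now returns the affected row count as a single `affect_row_count` string cell. A sketch of what a raw Thrift client would send, mirroring `getAffectedRowCount()` above; it assumes `client` and `opHandle` were obtained from the usual `OpenSession`/`ExecuteStatement` calls:

```java
// Assumes an open TCLIService.Client and the TOperationHandle of a finished
// DML statement; both come from the normal OpenSession/ExecuteStatement flow.
TFetchResultsReq req = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_NEXT, 1);
req.setFetchType((short) 2);  // FetchType.ROW_COUNT
TFetchResultsResp resp = client.FetchResults(req);
// resp carries one row with one string column holding the affected row count.
```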