commit a12977c91875b47352e6cd66820f14809a3c17d7 Author: Bharath Krishna Date: Thu Jul 19 15:51:12 2018 -0700 HIVE-19766 : Show the number of rows inserted when execution engine is Spark diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index ad5049a3e95211e54df0650361bea92bc6304a85..92775107bc0f3d48230fe98b4aad2b29f92520a3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -38,6 +38,8 @@ import org.apache.hadoop.hive.ql.exec.spark.status.impl.SparkMetricsUtils; import org.apache.hadoop.hive.ql.exec.spark.status.SparkStage; +import org.apache.hive.spark.counter.SparkCounter; +import org.apache.hive.spark.counter.SparkCounters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -163,6 +165,17 @@ public int execute(DriverContext driverContext) { if (rc == 0) { sparkStatistics = sparkJobStatus.getSparkStatistics(); + if (SessionState.get() != null) { + //Set the number of rows written in case of insert queries, to print in the client(beeline). + SparkCounters counters = sparkJobStatus.getCounter(); + if (counters != null) { + SparkCounter counter = counters.getCounter(HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP), + FileSinkOperator.TOTAL_TABLE_ROWS_WRITTEN); + if (counter != null) { + queryState.setNumModifiedRows(counter.getValue()); + } + } + } printConsoleMetrics(); printExcessiveGCWarning(); if (LOG.isInfoEnabled() && sparkStatistics != null) { @@ -500,6 +513,7 @@ private void printConfigInfo() throws IOException { List hiveCounters = new LinkedList(); counters.put(groupName, hiveCounters); hiveCounters.add(Operator.HIVE_COUNTER_CREATED_FILES); + hiveCounters.add(FileSinkOperator.TOTAL_TABLE_ROWS_WRITTEN); // MapOperator is out of SparkWork, SparkMapRecordHandler use it to bridge // Spark transformation and Hive operators in SparkWork. for (MapOperator.Counter counter : MapOperator.Counter.values()) {