diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 4a8ca47..acdc438 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -1135,7 +1135,9 @@ private void publishStats() throws HiveException {
     String postfix=null;
     if (taskIndependent) {
       // key = "database.table/SP/DP/"LB/
-      prefix = conf.getTableInfo().getTableName();
+      // Hive ignores the case of table names, so use the lower-cased table name as the stats key here,
+      // since StatsTask will aggregate with the lower-cased name later.
+      prefix = conf.getTableInfo().getTableName().toLowerCase();
     } else {
       // key = "prefix/SP/DP/"LB/taskID/
       prefix = conf.getStatsAggPrefix();
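Note on the hunk above: it only changes the publishing side of the stats key. The toy sketch below (illustrative only; StatsKeyExample and its in-memory map are made-up names, not Hive's API) shows why the publisher and the aggregator must normalize case the same way: a key written as "default.Src_Table/..." would never be found by an aggregator looking up "default.src_table/...".

import java.util.HashMap;
import java.util.Map;

public class StatsKeyExample {
  // Assumed key layout from the comment in the hunk: "database.table/SP/DP/".
  static String statsKey(String dbAndTable, String partPath) {
    return dbAndTable.toLowerCase() + "/" + partPath + "/";
  }

  public static void main(String[] args) {
    Map<String, Long> publishedStats = new HashMap<String, Long>();
    // Publisher side (FileSinkOperator): writes the row count under the normalized key.
    publishedStats.put(statsKey("default.Src_Table", "ds=2014-01-01"), 10L);
    // Aggregator side (StatsTask): looks the key up with the lower-cased table name.
    Long numRows = publishedStats.get(statsKey("default.src_table", "ds=2014-01-01"));
    System.out.println("numRows = " + numRows); // prints 10 because both sides lower-case
  }
}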
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index b38ef7d..04323bb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -19,15 +19,21 @@ package org.apache.hadoop.hive.ql.exec.spark;
 import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.LinkedList;
 import java.util.List;
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.StatsTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
@@ -37,12 +43,19 @@ import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.StatsWork;
 import org.apache.hadoop.hive.ql.plan.UnionWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.StringUtils;
@@ -78,10 +91,14 @@ public int execute(DriverContext driverContext) {
       sparkSession = sparkSessionManager.getSession(sparkSession, conf, true);
       SessionState.get().setSparkSession(sparkSession);
       SparkWork sparkWork = getWork();
+
+      // Pre-register Spark counters for queries that collect table statistics.
       String statsImpl = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
-      if (statsImpl.equalsIgnoreCase("counter")) {
-        sparkWork.setRequiredCounterPrefix(SparkUtilities.getRequiredCounterPrefix(this, db));
+      StatsTask statsTask = getStatsTaskInChildTasks(this);
+      if (statsImpl.equalsIgnoreCase("counter") && statsTask != null) {
+        sparkWork.setRequiredCounterPrefix(getRequiredCounterPrefix(statsTask));
       }
+
       SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
       sparkCounters = jobRef.getSparkJobStatus().getCounter();
       SparkJobMonitor monitor = new SparkJobMonitor(jobRef.getSparkJobStatus());
@@ -183,4 +200,106 @@ private void printConfigInfo() throws IOException {
     console.printInfo("In order to set a constant number of reducers:");
     console.printInfo("  set " + HiveConf.ConfVars.HADOOPNUMREDUCERS + "=<number>");
   }
+
+  private List<String> getRequiredCounterPrefix(StatsTask statsTask) throws HiveException, MetaException {
+    List<String> prefixs = new LinkedList<String>();
+    StatsWork statsWork = statsTask.getWork();
+    String prefix = getPrefix(statsWork);
+    List<Partition> partitions = getPartitionsList(statsWork);
+    int maxPrefixLength = StatsFactory.getMaxPrefixLength(conf);
+
+    if (partitions == null) {
+      prefixs.add(Utilities.getHashedStatsPrefix(prefix, maxPrefixLength));
+    } else {
+      for (Partition partition : partitions) {
+        String prefixWithPartition = Utilities.join(prefix, Warehouse.makePartPath(partition.getSpec()));
+        prefixs.add(Utilities.getHashedStatsPrefix(prefixWithPartition, maxPrefixLength));
+      }
+    }
+
+    return prefixs;
+  }
+
+  private String getPrefix(StatsWork work) throws HiveException {
+    String tableName;
+    if (work.getLoadTableDesc() != null) {
+      tableName = work.getLoadTableDesc().getTable().getTableName();
+    } else if (work.getTableSpecs() != null) {
+      tableName = work.getTableSpecs().tableName;
+    } else {
+      tableName = work.getLoadFileDesc().getDestinationCreateTable();
+    }
+    Table table = null;
+    try {
+      table = db.getTable(tableName);
+    } catch (HiveException e) {
+      LOG.warn("Failed to get table: " + tableName);
+      // For a CTAS query the table does not exist yet at this point, so just use the table name as the prefix.
+      return tableName.toLowerCase();
+    }
+    return table.getDbName() + "." + table.getTableName();
+  }
+
+  private static StatsTask getStatsTaskInChildTasks(Task<? extends Serializable> rootTask) {
+    List<Task<? extends Serializable>> childTasks = rootTask.getChildTasks();
+    if (childTasks == null) {
+      return null;
+    }
+    for (Task<? extends Serializable> task : childTasks) {
+      if (task instanceof StatsTask) {
+        return (StatsTask) task;
+      }
+      StatsTask childStatsTask = getStatsTaskInChildTasks(task);
+      if (childStatsTask != null) {
+        return childStatsTask;
+      }
+    }
+    return null;
+  }
+
+  private List<Partition> getPartitionsList(StatsWork work) throws HiveException {
+    if (work.getLoadFileDesc() != null) {
+      return null; // we are in CTAS, so we know there are no partitions
+    }
+    Table table;
+    List<Partition> list = new ArrayList<Partition>();
+
+    if (work.getTableSpecs() != null) {
+      // ANALYZE command
+      BaseSemanticAnalyzer.tableSpec tblSpec = work.getTableSpecs();
+      table = tblSpec.tableHandle;
+      if (!table.isPartitioned()) {
+        return null;
+      }
+      // get all partitions that match the partition spec
+      List<Partition> partitions = tblSpec.partitions;
+      if (partitions != null) {
+        for (Partition partn : partitions) {
+          list.add(partn);
+        }
+      }
+    } else if (work.getLoadTableDesc() != null) {
+      // INSERT OVERWRITE command
+      LoadTableDesc tbd = work.getLoadTableDesc();
+      table = db.getTable(tbd.getTable().getTableName());
+      if (!table.isPartitioned()) {
+        return null;
+      }
+      DynamicPartitionCtx dpCtx = tbd.getDPCtx();
+      if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions
+        // Dynamic partition information is not available before the SparkTask runs,
+        // so no partition-level prefixes can be registered here.
+      } else { // static partition
+        Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
+        list.add(partn);
+      }
+    }
+    return list;
+  }
 }
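Rough sketch of what getRequiredCounterPrefix produces, for orientation only: one stats prefix per partition of the target table, or a single "db.table" prefix for an unpartitioned target. The helper name buildPrefixes and the inlined path building are made up; the real code goes through Utilities.getHashedStatsPrefix and Warehouse.makePartPath.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CounterPrefixSketch {
  // Builds "db.table/ds=.../hr=..." style prefixes, one per partition spec,
  // or just the lower-cased "db.table" when the table has no partitions.
  static List<String> buildPrefixes(String dbAndTable, List<Map<String, String>> partSpecs) {
    List<String> prefixes = new ArrayList<String>();
    if (partSpecs.isEmpty()) {
      prefixes.add(dbAndTable.toLowerCase());
      return prefixes;
    }
    for (Map<String, String> spec : partSpecs) {
      StringBuilder path = new StringBuilder(dbAndTable.toLowerCase());
      for (Map.Entry<String, String> col : spec.entrySet()) {
        path.append('/').append(col.getKey()).append('=').append(col.getValue());
      }
      prefixes.add(path.toString());
    }
    return prefixes;
  }

  public static void main(String[] args) {
    Map<String, String> p1 = new LinkedHashMap<String, String>();
    p1.put("ds", "2014-01-01");
    Map<String, String> p2 = new LinkedHashMap<String, String>();
    p2.put("ds", "2014-01-02");
    List<Map<String, String>> parts = new ArrayList<Map<String, String>>();
    parts.add(p1);
    parts.add(p2);
    // Prints [default.src_part/ds=2014-01-01, default.src_part/ds=2014-01-02]
    System.out.println(buildPrefixes("default.Src_Part", parts));
  }
}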
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index cc8afa7..e3e6d16 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -17,35 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.MoveTask;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.StatsTask;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HiveKey;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
-import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
-import org.apache.hadoop.hive.ql.plan.MoveWork;
-import org.apache.hadoop.hive.ql.plan.StatsWork;
-import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.io.BytesWritable;
@@ -69,109 +41,4 @@ public static BytesWritable copyBytesWritable(BytesWritable bw) {
     copy.set(bw);
     return copy;
   }
-
-  public static List<String> getRequiredCounterPrefix(SparkTask sparkTask, Hive db)
-      throws HiveException, MetaException {
-    List<String> prefixs = new LinkedList<String>();
-    List<BaseWork> works = sparkTask.getWork().getAllWork();
-    for (BaseWork baseWork : works) {
-      Set<Operator<?>> operators = baseWork.getAllOperators();
-      for (Operator<?> operator : operators) {
-        if (operator instanceof TableScanOperator) {
-          TableScanOperator tableScanOperator = (TableScanOperator) operator;
-          TableScanDesc tableScanDesc = tableScanOperator.getConf();
-          if (tableScanDesc.isGatherStats()) {
-            List<Task<? extends Serializable>> childTasks = getChildTasks(sparkTask);
-            for (Task<? extends Serializable> task : childTasks) {
-              if (task instanceof StatsTask) {
-                StatsTask statsTask = (StatsTask) task;
-                StatsWork statsWork = statsTask.getWork();
-                // ANALYZE command
-                BaseSemanticAnalyzer.tableSpec tblSpec = statsWork.getTableSpecs();
-                Table table = tblSpec.tableHandle;
-                if (!table.isPartitioned()) {
-                  prefixs.add(tableScanDesc.getStatsAggPrefix()); // non-partitioned
-                } else {
-                  for (Partition partition : tblSpec.partitions) {
-                    String aggrPrefix = getAggregationPrefix(
-                        table, partition.getSpec(), tableScanDesc.getMaxStatsKeyPrefixLength());
-                    prefixs.add(aggrPrefix);
-                  }
-                }
-              }
-            }
-          }
-        } else if (operator instanceof FileSinkOperator) {
-          FileSinkOperator fileSinkOperator = (FileSinkOperator) operator;
-          FileSinkDesc fileSinkDesc = fileSinkOperator.getConf();
-          if (fileSinkDesc.isGatherStats()) {
-            List<Task<? extends Serializable>> childTasks = getChildTasks(sparkTask);
-            for (Task<? extends Serializable> task : childTasks) {
-              if (task instanceof MoveTask) {
-                MoveTask moveTask = (MoveTask) task;
-                MoveWork moveWork = moveTask.getWork();
-                // INSERT OVERWRITE command
-                LoadTableDesc tbd = moveWork.getLoadTableWork();
-                Table table = db.getTable(tbd.getTable().getTableName());
-                if (!table.isPartitioned()) {
-                  prefixs.add(
-                      getAggregationPrefix(table, null, fileSinkDesc.getMaxStatsKeyPrefixLength()));
-                } else {
-                  DynamicPartitionCtx dpCtx = tbd.getDPCtx();
-                  if (dpCtx == null || dpCtx.getNumDPCols() == 0) {
-                    // static partition
-                    Map<String, String> partitionSpec = tbd.getPartitionSpec();
-                    if (partitionSpec != null && !partitionSpec.isEmpty()) {
-                      String aggrPrefix = getAggregationPrefix(
-                          table, partitionSpec, fileSinkDesc.getMaxStatsKeyPrefixLength());
-                      prefixs.add(aggrPrefix);
-                    }
-                  } else {
-                    // dynamic partition
-                  }
-                }
-              }
-            }
-          }
-        }
-      }
-    }
-    return prefixs;
-  }
-
-  private static String getAggregationPrefix(Table table, Map<String, String> partitionSpec, int maxKeyLength)
-      throws MetaException {
-    StringBuilder prefix = new StringBuilder();
-    // prefix is of the form dbName.tblName
-    prefix.append(table.getDbName()).append('.').append(table.getTableName());
-    if (partitionSpec != null) {
-      return Utilities.join(prefix.toString(), Warehouse.makePartPath(partitionSpec));
-    }
-    return Utilities.getHashedStatsPrefix(prefix.toString(), maxKeyLength);
-  }
-
-  private static List<Task<? extends Serializable>> getChildTasks(
-      Task<? extends Serializable> rootTask) {
-    List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
-    fillChildTasks(tasks, rootTask);
-    return tasks;
-  }
-
-  private static void fillChildTasks(
-      List<Task<? extends Serializable>> tasks,
-      Task<? extends Serializable> rootTask) {
-    List<Task<? extends Serializable>> childTasks = rootTask.getChildTasks();
-    tasks.add(rootTask);
-    if (childTasks != null) {
-      for (Task<? extends Serializable> task : childTasks) {
-        fillChildTasks(tasks, task);
-      }
-    }
-  }
 }
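The removed SparkUtilities helpers above first materialized the full list of descendant tasks and then scanned it; the replacement in SparkTask searches the task tree directly for the first StatsTask. A minimal sketch of that depth-first lookup, with a toy Node type standing in for Hive's Task hierarchy (all names here are illustrative):

import java.util.Arrays;
import java.util.List;

public class FindFirstOfKind {
  static class Node {
    final String kind;
    final List<Node> children;
    Node(String kind, Node... children) {
      this.kind = kind;
      this.children = Arrays.asList(children);
    }
  }

  // Depth-first search over the children: return the first descendant of the
  // requested kind, or null when none exists (same shape as getStatsTaskInChildTasks).
  static Node findDescendant(Node root, String kind) {
    for (Node child : root.children) {
      if (kind.equals(child.kind)) {
        return child;
      }
      Node found = findDescendant(child, kind);
      if (found != null) {
        return found;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    Node plan = new Node("SparkTask",
        new Node("MoveTask",
            new Node("StatsTask")));
    System.out.println(findDescendant(plan, "StatsTask").kind); // StatsTask
    System.out.println(findDescendant(plan, "TezTask"));        // null
  }
}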
diff --git ql/src/test/results/clientpositive/spark/ctas.q.out ql/src/test/results/clientpositive/spark/ctas.q.out
index 158ec95..d047613 100644
--- ql/src/test/results/clientpositive/spark/ctas.q.out
+++ ql/src/test/results/clientpositive/spark/ctas.q.out
@@ -151,8 +151,8 @@
 Table Type:         	MANAGED_TABLE       	 
 Table Parameters:	 	 
 	COLUMN_STATS_ACCURATE	true                
 	numFiles            	1                   
-	numRows             	0                   
-	rawDataSize         	0                   
+	numRows             	10                  
+	rawDataSize         	96                  
 	totalSize           	106                 
 #### A masked pattern was here ####
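Taken together, the patch registers the required counter prefixes on the driver before the Spark job is submitted, which is why numRows and rawDataSize are now populated in the CTAS test output above instead of staying at 0. The toy registry below illustrates the ordering constraint the patch addresses; the class, its methods, and the sample keys are made up for illustration and are not Hive's or Spark's counter API.

import java.util.HashMap;
import java.util.Map;

public class CounterRegistrySketch {
  private final Map<String, Long> counters = new HashMap<String, Long>();

  // Driver side: create the counter before the job runs.
  void register(String name) {
    counters.put(name, 0L);
  }

  // Executor side: only counters that already exist can be updated.
  boolean increment(String name, long delta) {
    Long current = counters.get(name);
    if (current == null) {
      return false; // the update is dropped, so the stat for this key never moves off 0
    }
    counters.put(name, current + delta);
    return true;
  }

  public static void main(String[] args) {
    CounterRegistrySketch registry = new CounterRegistrySketch();
    registry.register("default.ctas_target/numRows");
    System.out.println(registry.increment("default.ctas_target/numRows", 10)); // true
    System.out.println(registry.increment("default.other_table/numRows", 5));  // false
  }
}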