diff --git ql/src/java/org/apache/hadoop/hive/ql/CompilationOpContext.java ql/src/java/org/apache/hadoop/hive/ql/CompilationOpContext.java
index 949f873..41fa1f9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/CompilationOpContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/CompilationOpContext.java
@@ -18,8 +18,12 @@
 
 package org.apache.hadoop.hive.ql;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.hive.ql.parse.ColumnStatsList;
+
 /**
  * A subset of compilation context that is passed to operators to get rid of some globals.
  * Perhaps this should be rolled into main Context; however, some code necessitates storing the
@@ -29,8 +33,15 @@
  */
 public class CompilationOpContext {
   private final AtomicInteger opSeqId = new AtomicInteger(0);
+  private final Map<String, ColumnStatsList> colStatsCache =
+      new HashMap<>();
 
   public int nextOperatorId() {
     return opSeqId.getAndIncrement();
   }
-}
\ No newline at end of file
+
+  public Map<String, ColumnStatsList> getColStatsCache() {
+    return colStatsCache;
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
index 5dff242..526d1dd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
@@ -61,6 +61,7 @@
 import org.apache.hadoop.hive.ql.optimizer.calcite.translator.TypeConverter;
 import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.CalcitePlanner;
+import org.apache.hadoop.hive.ql.parse.ColumnStatsList;
 import org.apache.hadoop.hive.ql.parse.ParseUtils;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
@@ -285,7 +286,7 @@ private static RelNode createTableScan(Table viewTable) {
     RelOptHiveTable optTable = new RelOptHiveTable(null, fullyQualifiedTabName, rowType, viewTable,
         nonPartitionColumns, partitionColumns, new ArrayList<VirtualColumn>(),
         SessionState.get().getConf(), new HashMap<String, PrunedPartitionList>(),
-        new AtomicInteger());
+        new HashMap<String, ColumnStatsList>(), new AtomicInteger());
     RelNode tableRel;
 
     // 3. Build operator
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java
index 1d49568..22790de 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java
@@ -46,16 +46,18 @@
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ExprNodeConverter;
 import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.ColumnStatsList;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.plan.ColStatistics;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.Statistics;
+import org.apache.hadoop.hive.ql.plan.Statistics.State;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
@@ -77,9 +79,10 @@
   final HiveConf hiveConf;
 
   private double rowCount = -1;
-  Map<Integer, ColStatistics> hiveColStatsMap = new HashMap<Integer, ColStatistics>();
+  Map<Integer, ColStatistics> hiveColStatsMap = new HashMap<>();
   PrunedPartitionList partitionList;
   Map<String, PrunedPartitionList> partitionCache;
+  Map<String, ColumnStatsList> colStatsCache;
   AtomicInteger noColsMissingStats;
 
   protected static final Logger LOG = LoggerFactory
@@ -89,7 +92,8 @@
   public RelOptHiveTable(RelOptSchema calciteSchema, String qualifiedTblName,
       RelDataType rowType, Table hiveTblMetadata, List<ColumnInfo> hiveNonPartitionCols,
       List<ColumnInfo> hivePartitionCols, List<VirtualColumn> hiveVirtualCols, HiveConf hconf,
-      Map<String, PrunedPartitionList> partitionCache, AtomicInteger noColsMissingStats) {
+      Map<String, PrunedPartitionList> partitionCache, Map<String, ColumnStatsList> colStatsCache,
+      AtomicInteger noColsMissingStats) {
     super(calciteSchema, qualifiedTblName, rowType);
     this.hiveTblMetadata = hiveTblMetadata;
     this.hiveNonPartitionCols = ImmutableList.copyOf(hiveNonPartitionCols);
@@ -100,6 +104,7 @@ public RelOptHiveTable(RelOptSchema calciteSchema, String qualifiedTblName,
     this.hiveVirtualCols = ImmutableList.copyOf(hiveVirtualCols);
     this.hiveConf = hconf;
     this.partitionCache = partitionCache;
+    this.colStatsCache = colStatsCache;
     this.noColsMissingStats = noColsMissingStats;
   }
 
@@ -136,7 +141,7 @@ public RelOptHiveTable copy(RelDataType newRowType) {
     // 3. Build new Table
     return new RelOptHiveTable(this.schema, this.name, newRowType, this.hiveTblMetadata,
         newHiveNonPartitionCols, newHivePartitionCols, newHiveVirtualCols,
-        this.hiveConf, this.partitionCache, this.noColsMissingStats);
+        this.hiveConf, this.partitionCache, this.colStatsCache, this.noColsMissingStats);
   }
 
   @Override
@@ -295,6 +300,12 @@ private void updateColStats(Set<Integer> projIndxLst, boolean allowNullColumnFor
       computePartitionList(hiveConf, null, new HashSet<Integer>());
     }
 
+    ColumnStatsList colStatsCached = colStatsCache.get(partitionList.getKey());
+    if (colStatsCached == null) {
+      colStatsCached = new ColumnStatsList();
+      colStatsCache.put(partitionList.getKey(), colStatsCached);
+    }
+
     // 2.
Obtain Col Stats for Non Partition Cols if (nonPartColNamesThatRqrStats.size() > 0) { List hiveColStats; @@ -302,11 +313,12 @@ private void updateColStats(Set projIndxLst, boolean allowNullColumnFor if (!hiveTblMetadata.isPartitioned()) { // 2.1 Handle the case for unpartitioned table. hiveColStats = StatsUtils.getTableColumnStats(hiveTblMetadata, hiveNonPartitionCols, - nonPartColNamesThatRqrStats); + nonPartColNamesThatRqrStats, colStatsCached); // 2.1.1 Record Column Names that we needed stats for but couldn't if (hiveColStats == null) { colNamesFailedStats.addAll(nonPartColNamesThatRqrStats); + colStatsCached.updateState(State.NONE); } else if (hiveColStats.size() != nonPartColNamesThatRqrStats.size()) { Set setOfFiledCols = new HashSet(nonPartColNamesThatRqrStats); @@ -317,6 +329,8 @@ private void updateColStats(Set projIndxLst, boolean allowNullColumnFor setOfFiledCols.removeAll(setOfObtainedColStats); colNamesFailedStats.addAll(setOfFiledCols); + + colStatsCached.updateState(State.PARTIAL); } else { // Column stats in hiveColStats might not be in the same order as the columns in // nonPartColNamesThatRqrStats. reorder hiveColStats so we can build hiveColStatsMap @@ -330,6 +344,8 @@ private void updateColStats(Set projIndxLst, boolean allowNullColumnFor for (String colName : nonPartColNamesThatRqrStats) { hiveColStats.add(columnStatsMap.get(colName)); } + + colStatsCached.updateState(State.COMPLETE); } } else { // 2.2 Obtain col stats for partitioned table. @@ -338,14 +354,18 @@ private void updateColStats(Set projIndxLst, boolean allowNullColumnFor // no need to make a metastore call rowCount = 0; hiveColStats = new ArrayList(); - for (String c : nonPartColNamesThatRqrStats) { + for (int i = 0; i < nonPartColNamesThatRqrStats.size(); i++) { // add empty stats object for each column - hiveColStats.add(new ColStatistics(c, null)); + hiveColStats.add( + new ColStatistics( + nonPartColNamesThatRqrStats.get(i), + hiveNonPartitionColsMap.get(nonPartColIndxsThatRqrStats.get(i)).getTypeName())); } colNamesFailedStats.clear(); + colStatsCached.updateState(State.COMPLETE); } else { Statistics stats = StatsUtils.collectStatistics(hiveConf, partitionList, - hiveTblMetadata, hiveNonPartitionCols, nonPartColNamesThatRqrStats, + hiveTblMetadata, hiveNonPartitionCols, nonPartColNamesThatRqrStats, colStatsCached, nonPartColNamesThatRqrStats, true, true); rowCount = stats.getNumRows(); hiveColStats = new ArrayList(); @@ -357,6 +377,7 @@ private void updateColStats(Set projIndxLst, boolean allowNullColumnFor colNamesFailedStats.add(c); } } + colStatsCached.updateState(stats.getColumnStatsState()); } } catch (HiveException e) { String logMsg = "Collecting stats failed."; @@ -370,6 +391,12 @@ private void updateColStats(Set projIndxLst, boolean allowNullColumnFor // the columns in nonPartColIndxsThatRqrStats/nonPartColNamesThatRqrStats/hiveColStats // are in same order hiveColStatsMap.put(nonPartColIndxsThatRqrStats.get(i), hiveColStats.get(i)); + colStatsCached.put(hiveColStats.get(i).getColumnName(), hiveColStats.get(i)); + if (LOG.isDebugEnabled()) { + LOG.debug("Stats for column " + hiveColStats.get(i).getColumnName() + + " in table " + hiveTblMetadata.getTableName() + " stored in cache"); + LOG.debug(hiveColStats.get(i).toString()); + } } } } @@ -378,11 +405,15 @@ private void updateColStats(Set projIndxLst, boolean allowNullColumnFor if (colNamesFailedStats.isEmpty() && !partColNamesThatRqrStats.isEmpty()) { ColStatistics cStats = null; for (int i = 0; i < partColNamesThatRqrStats.size(); i++) { - 
cStats = new ColStatistics(partColNamesThatRqrStats.get(i), - hivePartitionColsMap.get(partColIndxsThatRqrStats.get(i)).getTypeName()); - cStats.setCountDistint(getDistinctCount(partitionList.getPartitions(), - partColNamesThatRqrStats.get(i))); + cStats = StatsUtils.getColStatsForPartCol(hivePartitionColsMap.get(partColIndxsThatRqrStats.get(i)), + new PartitionIterable(partitionList.getNotDeniedPartns()), hiveConf); hiveColStatsMap.put(partColIndxsThatRqrStats.get(i), cStats); + colStatsCached.put(cStats.getColumnName(), cStats); + if (LOG.isDebugEnabled()) { + LOG.debug("Stats for column " + cStats.getColumnName() + + " in table " + hiveTblMetadata.getTableName() + " stored in cache"); + LOG.debug(cStats.toString()); + } } } @@ -405,14 +436,6 @@ private void updateColStats(Set projIndxLst, boolean allowNullColumnFor } } - private int getDistinctCount(Set partitions, String partColName) { - Set distinctVals = new HashSet(partitions.size()); - for (Partition partition : partitions) { - distinctVals.add(partition.getSpec().get(partColName)); - } - return distinctVals.size(); - } - public List getColStat(List projIndxLst) { return getColStat(projIndxLst, false); } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java index fed1664..bf07a17 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java @@ -36,7 +36,7 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.tools.RelBuilder; import org.apache.calcite.util.ImmutableBitSet; -import org.apache.calcite.util.Pair; +import org.apache.commons.lang3.tuple.Triple; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable; import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil; @@ -62,7 +62,8 @@ private final String tblAlias; private final String concatQbIDAlias; private final boolean useQBIdInDigest; - private final ImmutableSet viurtualOrPartColIndxsInTS; + private final ImmutableSet virtualOrPartColIndxsInTS; + private final ImmutableSet virtualColIndxsInTS; // insiderView will tell this TableScan is inside a view or not. 
private final boolean insideView; @@ -98,9 +99,11 @@ private HiveTableScan(RelOptCluster cluster, RelTraitSet traitSet, RelOptHiveTab this.tblAlias = alias; this.concatQbIDAlias = concatQbIDAlias; this.hiveTableScanRowType = newRowtype; - Pair, ImmutableSet> colIndxPair = buildColIndxsFrmReloptHT(table, newRowtype); - this.neededColIndxsFrmReloptHT = colIndxPair.getKey(); - this.viurtualOrPartColIndxsInTS = colIndxPair.getValue(); + Triple, ImmutableSet, ImmutableSet> colIndxPair = + buildColIndxsFrmReloptHT(table, newRowtype); + this.neededColIndxsFrmReloptHT = colIndxPair.getLeft(); + this.virtualOrPartColIndxsInTS = colIndxPair.getMiddle(); + this.virtualColIndxsInTS = colIndxPair.getRight(); this.useQBIdInDigest = useQBIdInDigest; this.insideView = insideView; } @@ -203,36 +206,47 @@ public RelDataType getPrunedRowType() { } public Set getPartOrVirtualCols() { - return viurtualOrPartColIndxsInTS; + return virtualOrPartColIndxsInTS; } - private static Pair, ImmutableSet> buildColIndxsFrmReloptHT( + public Set getVirtualCols() { + return virtualColIndxsInTS; + } + + private static Triple, ImmutableSet, ImmutableSet> buildColIndxsFrmReloptHT( RelOptHiveTable relOptHTable, RelDataType scanRowType) { RelDataType relOptHtRowtype = relOptHTable.getRowType(); - ImmutableList neededColIndxsFrmReloptHT; Builder neededColIndxsFrmReloptHTBldr = new ImmutableList.Builder(); - ImmutableSet viurtualOrPartColIndxsInTS; - ImmutableSet.Builder viurtualOrPartColIndxsInTSBldr = new ImmutableSet.Builder(); + ImmutableSet.Builder virtualOrPartColIndxsInTSBldr = + new ImmutableSet.Builder(); + ImmutableSet.Builder virtualColIndxsInTSBldr = + new ImmutableSet.Builder(); Map colNameToPosInReloptHT = HiveCalciteUtil .getRowColNameIndxMap(relOptHtRowtype.getFieldList()); List colNamesInScanRowType = scanRowType.getFieldNames(); - int partOrVirtualColStartPosInrelOptHtRowtype = relOptHTable.getNonPartColumns().size(); + int partColStartPosInrelOptHtRowtype = relOptHTable.getNonPartColumns().size(); + int virtualColStartPosInrelOptHtRowtype = + relOptHTable.getNonPartColumns().size() + relOptHTable.getPartColumns().size(); int tmp; for (int i = 0; i < colNamesInScanRowType.size(); i++) { tmp = colNameToPosInReloptHT.get(colNamesInScanRowType.get(i)); neededColIndxsFrmReloptHTBldr.add(tmp); - if (tmp >= partOrVirtualColStartPosInrelOptHtRowtype) { - viurtualOrPartColIndxsInTSBldr.add(i); + if (tmp >= partColStartPosInrelOptHtRowtype) { + // Part or virtual + virtualOrPartColIndxsInTSBldr.add(i); + if (tmp >= virtualColStartPosInrelOptHtRowtype) { + // Virtual + virtualColIndxsInTSBldr.add(i); + } } } - neededColIndxsFrmReloptHT = neededColIndxsFrmReloptHTBldr.build(); - viurtualOrPartColIndxsInTS = viurtualOrPartColIndxsInTSBldr.build(); - - return new Pair, ImmutableSet>(neededColIndxsFrmReloptHT, - viurtualOrPartColIndxsInTS); + return Triple.of( + neededColIndxsFrmReloptHTBldr.build(), + virtualOrPartColIndxsInTSBldr.build(), + virtualColIndxsInTSBldr.build()); } public boolean isInsideView() { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelFieldTrimmer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelFieldTrimmer.java index 1801b83..68d9057 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelFieldTrimmer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelFieldTrimmer.java @@ -301,7 +301,7 @@ private void fetchColStats(RelNode key, TableScan tableAccessRel, ImmutableBitSe //Remove any virtual cols if 
(tableAccessRel instanceof HiveTableScan) { - iRefSet.removeAll(((HiveTableScan)tableAccessRel).getPartOrVirtualCols()); + iRefSet.removeAll(((HiveTableScan)tableAccessRel).getVirtualCols()); } if (!iRefSet.isEmpty()) { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java index 6624865..8704b0d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java @@ -206,24 +206,26 @@ public static PrunedPartitionList prune(Table tab, ExprNodeDesc prunerExpr, String oldFilter = prunerExpr.getExprString(); if (compactExpr == null || isBooleanExpr(compactExpr)) { if (isFalseExpr(compactExpr)) { - return new PrunedPartitionList( - tab, new LinkedHashSet(0), new ArrayList(0), false); + return new PrunedPartitionList(tab, key + compactExpr.getExprString(), + new LinkedHashSet(0), new ArrayList(0), false); } // For null and true values, return every partition return getAllPartsFromCacheOrServer(tab, key, true, prunedPartitionsMap); } + + String compactExprString = compactExpr.getExprString(); if (LOG.isDebugEnabled()) { - LOG.debug("Filter w/ compacting: " + compactExpr.getExprString() + LOG.debug("Filter w/ compacting: " + compactExprString + "; filter w/o compacting: " + oldFilter); } - - key = key + compactExpr.getExprString(); + key = key + compactExprString; PrunedPartitionList ppList = prunedPartitionsMap.get(key); if (ppList != null) { return ppList; } - ppList = getPartitionsFromServer(tab, (ExprNodeGenericFuncDesc)compactExpr, conf, alias, partColsUsedInFilter, oldFilter.equals(compactExpr.getExprString())); + ppList = getPartitionsFromServer(tab, key, (ExprNodeGenericFuncDesc)compactExpr, + conf, alias, partColsUsedInFilter, oldFilter.equals(compactExpr.getExprString())); prunedPartitionsMap.put(key, ppList); return ppList; } @@ -240,7 +242,7 @@ private static PrunedPartitionList getAllPartsFromCacheOrServer(Table tab, Strin } catch (HiveException e) { throw new SemanticException(e); } - ppList = new PrunedPartitionList(tab, parts, null, unknownPartitions); + ppList = new PrunedPartitionList(tab, key, parts, null, unknownPartitions); if (partsCache != null) { partsCache.put(key, ppList); } @@ -430,8 +432,8 @@ static private boolean hasUserFunctions(ExprNodeDesc expr) { return false; } - private static PrunedPartitionList getPartitionsFromServer(Table tab, - final ExprNodeGenericFuncDesc compactExpr, HiveConf conf, String alias, Set partColsUsedInFilter, boolean isPruningByExactFilter) throws SemanticException { + private static PrunedPartitionList getPartitionsFromServer(Table tab, final String key, final ExprNodeGenericFuncDesc compactExpr, + HiveConf conf, String alias, Set partColsUsedInFilter, boolean isPruningByExactFilter) throws SemanticException { try { // Finally, check the filter for non-built-in UDFs. If these are present, we cannot @@ -462,7 +464,8 @@ private static PrunedPartitionList getPartitionsFromServer(Table tab, // The partitions are "unknown" if the call says so due to the expression // evaluator returning null for a partition, or if we sent a partial expression to // metastore and so some partitions may have no data based on other filters. 
- return new PrunedPartitionList(tab, new LinkedHashSet(partitions), + return new PrunedPartitionList(tab, key, + new LinkedHashSet(partitions), new ArrayList(partColsUsedInFilter), hasUnknownPartitions || !isPruningByExactFilter); } catch (SemanticException e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java index 6cb0559..ad29d65 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.ColumnStatsList; import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.AggregationDesc; @@ -123,11 +124,12 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, TableScanOperator tsop = (TableScanOperator) nd; AnnotateStatsProcCtx aspCtx = (AnnotateStatsProcCtx) procCtx; PrunedPartitionList partList = aspCtx.getParseContext().getPrunedPartitions(tsop); + ColumnStatsList colStatsCached = aspCtx.getParseContext().getColStatsCached(partList); Table table = tsop.getConf().getTableMetadata(); try { // gather statistics for the first time and the attach it to table scan operator - Statistics stats = StatsUtils.collectStatistics(aspCtx.getConf(), partList, table, tsop); + Statistics stats = StatsUtils.collectStatistics(aspCtx.getConf(), partList, colStatsCached, table, tsop); tsop.setStatistics(stats.clone()); if (LOG.isDebugEnabled()) { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index c9cb298..2645fab 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hive.ql.parse; -import org.antlr.runtime.tree.Tree; - import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; @@ -47,6 +45,7 @@ import org.antlr.runtime.ClassicToken; import org.antlr.runtime.CommonToken; +import org.antlr.runtime.tree.Tree; import org.antlr.runtime.tree.TreeVisitor; import org.antlr.runtime.tree.TreeVisitorAction; import org.apache.calcite.adapter.druid.DruidQuery; @@ -54,6 +53,7 @@ import org.apache.calcite.adapter.druid.DruidSchema; import org.apache.calcite.adapter.druid.DruidTable; import org.apache.calcite.adapter.druid.LocalInterval; +import org.apache.calcite.config.CalciteConnectionConfig; import org.apache.calcite.config.CalciteConnectionConfigImpl; import org.apache.calcite.config.CalciteConnectionProperty; import org.apache.calcite.plan.RelOptCluster; @@ -142,8 +142,8 @@ import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException; -import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSubquerySemanticException; import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException.UnsupportedFeature; +import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSubquerySemanticException; import 
org.apache.hadoop.hive.ql.optimizer.calcite.CalciteViewSemanticException; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveDefaultRelMetadataProvider; @@ -263,8 +263,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Multimap; -import org.apache.calcite.config.CalciteConnectionConfig; - public class CalcitePlanner extends SemanticAnalyzer { private final AtomicInteger noColsMissingStats = new AtomicInteger(0); @@ -1093,7 +1091,10 @@ RelNode logicalPlan() throws SemanticException { if (this.columnAccessInfo == null) { this.columnAccessInfo = new ColumnAccessInfo(); } - calcitePlannerAction = new CalcitePlannerAction(prunedPartitions, this.columnAccessInfo); + calcitePlannerAction = new CalcitePlannerAction( + prunedPartitions, + ctx.getOpContext().getColStatsCache(), + this.columnAccessInfo); try { optimizedOptiqPlan = Frameworks.withPlanner(calcitePlannerAction, Frameworks @@ -1130,7 +1131,10 @@ Operator getOptimizedHiveOPDag() throws SemanticException { if (this.columnAccessInfo == null) { this.columnAccessInfo = new ColumnAccessInfo(); } - calcitePlannerAction = new CalcitePlannerAction(prunedPartitions, this.columnAccessInfo); + calcitePlannerAction = new CalcitePlannerAction( + prunedPartitions, + ctx.getOpContext().getColStatsCache(), + this.columnAccessInfo); try { optimizedOptiqPlan = Frameworks.withPlanner(calcitePlannerAction, Frameworks @@ -1292,6 +1296,7 @@ private RowResolver genRowResolver(Operator op, QB qb) { private RelOptCluster cluster; private RelOptSchema relOptSchema; private final Map partitionCache; + private final Map colStatsCache; private final ColumnAccessInfo columnAccessInfo; private Map viewProjectToTableSchema; @@ -1309,8 +1314,12 @@ private RowResolver genRowResolver(Operator op, QB qb) { LinkedHashMap relToHiveRR = new LinkedHashMap(); LinkedHashMap> relToHiveColNameCalcitePosMap = new LinkedHashMap>(); - CalcitePlannerAction(Map partitionCache, ColumnAccessInfo columnAccessInfo) { + CalcitePlannerAction( + Map partitionCache, + Map colStatsCache, + ColumnAccessInfo columnAccessInfo) { this.partitionCache = partitionCache; + this.colStatsCache = colStatsCache; this.columnAccessInfo = columnAccessInfo; } @@ -2333,7 +2342,7 @@ private RelNode genTableLogicalPlan(String tableAlias, QB qb) throws SemanticExc } RelOptHiveTable optTable = new RelOptHiveTable(relOptSchema, fullyQualifiedTabName, rowType, tabMetaData, nonPartitionColumns, partitionColumns, virtualCols, conf, - partitionCache, noColsMissingStats); + partitionCache, colStatsCache, noColsMissingStats); // Build Druid query String address = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS); @@ -2378,7 +2387,7 @@ private RelNode genTableLogicalPlan(String tableAlias, QB qb) throws SemanticExc } RelOptHiveTable optTable = new RelOptHiveTable(relOptSchema, fullyQualifiedTabName, rowType, tabMetaData, nonPartitionColumns, partitionColumns, virtualCols, conf, - partitionCache, noColsMissingStats); + partitionCache, colStatsCache, noColsMissingStats); // Build Hive Table Scan Rel tableRel = new HiveTableScan(cluster, cluster.traitSetOf(HiveRelNode.CONVENTION), optTable, null == tableAlias ? 
            tabMetaData.getTableName() : tableAlias,
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsList.java ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsList.java
new file mode 100644
index 0000000..6930967
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsList.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.plan.ColStatistics;
+import org.apache.hadoop.hive.ql.plan.Statistics;
+import org.apache.hadoop.hive.ql.plan.Statistics.State;
+
+/**
+ * Column statistics for a table or pruned partition list, keyed by column name, plus their overall state.
+ */
+public class ColumnStatsList {
+
+  /** State. */
+  private State state;
+
+  /** Column stats. */
+  private final Map<String, ColStatistics> colStats;
+
+  public ColumnStatsList() {
+    this.state = State.NONE;
+    this.colStats = new HashMap<>();
+  }
+
+  public State getState() {
+    return state;
+  }
+
+  public void updateState(State newState) {
+    this.state = Statistics.inferColumnStatsState(this.state, newState);
+  }
+
+  public Map<String, ColStatistics> getColStats() {
+    return colStats;
+  }
+
+  public void put(String columnName, ColStatistics cStats) {
+    this.colStats.put(columnName, cStats);
+  }
+
+  @Override
+  public String toString() {
+    return "{ " + state + ", " + colStats + " }";
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index 565fbef..adf7f84 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -29,18 +29,27 @@
 import java.util.Set;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.QueryProperties;
 import org.apache.hadoop.hive.ql.QueryState;
-import org.apache.hadoop.hive.ql.exec.*;
+import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.ListSinkOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import
org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.AnalyzeRewriteContext; -import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo; import org.apache.hadoop.hive.ql.plan.CreateTableDesc; import org.apache.hadoop.hive.ql.plan.CreateViewDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -426,6 +435,21 @@ public void setOpToSamplePruner( } /** + * @return col stats + */ + public Map getColStatsCache() { + return ctx.getOpContext().getColStatsCache(); + } + + /** + * @param partList + * @return col stats + */ + public ColumnStatsList getColStatsCached(PrunedPartitionList partList) { + return ctx.getOpContext().getColStatsCache().get(partList.getKey()); + } + + /** * @return pruned partition map */ public Map getPrunedPartitions() { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java index da2e1e2..656455f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java @@ -30,6 +30,9 @@ */ public class PrunedPartitionList { + /** Key to identify this partition list. */ + private final String ppListKey; + /** Source table. */ private final Table source; @@ -42,9 +45,19 @@ /** Whether there are partitions in the list that may or may not satisfy the criteria. */ private boolean hasUnknowns; - public PrunedPartitionList(Table source, Set partitions, List referred, - boolean hasUnknowns) { + public PrunedPartitionList(Table source, Set partitions, + List referred, boolean hasUnknowns) { this.source = source; + this.ppListKey = null; + this.referred = referred; + this.partitions = partitions; + this.hasUnknowns = hasUnknowns; + } + + public PrunedPartitionList(Table source, String key, Set partitions, + List referred, boolean hasUnknowns) { + this.source = source; + this.ppListKey = key; this.referred = referred; this.partitions = partitions; this.hasUnknowns = hasUnknowns; @@ -54,6 +67,10 @@ public Table getSourceTable() { return source; } + public String getKey() { + return ppListKey; + } + /** * @return partitions */ diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 4faec05..bc6e0d5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -402,10 +402,13 @@ public SemanticAnalyzer(QueryState queryState) throws SemanticException { } @Override - protected void reset(boolean clearPartsCache) { + protected void reset(boolean clearCache) { super.reset(true); - if(clearPartsCache) { + if(clearCache) { prunedPartitions.clear(); + if (ctx != null) { + ctx.getOpContext().getColStatsCache().clear(); + } //When init(true) combine with genResolvedParseTree, it will generate Resolved Parse tree from syntax tree //ReadEntity created under these conditions should be all relevant to the syntax tree even the ones without parents diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java index c46ea70..8ffb4ce 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java @@ -23,7 +23,6 @@ import java.util.Map; import 
org.apache.hadoop.hive.ql.plan.Explain.Level; -import org.apache.hadoop.hive.ql.stats.StatsUtils; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -215,6 +214,10 @@ public void addToColumnStats(List colStats) { } } + public void updateColumnStatsState(State newState) { + this.columnStatsState = inferColumnStatsState(columnStatsState, newState); + } + // newState // ----------------------------------------- // columnStatsState | COMPLETE PARTIAL NONE | @@ -223,26 +226,28 @@ public void addToColumnStats(List colStats) { // PARTIAL | PARTIAL PARTIAL PARTIAL | // NONE | COMPLETE PARTIAL NONE | // ----------------------------------------- - public void updateColumnStatsState(State newState) { + public static State inferColumnStatsState(State prevState, State newState) { if (newState.equals(State.PARTIAL)) { - columnStatsState = State.PARTIAL; + return State.PARTIAL; } if (newState.equals(State.NONE)) { - if (columnStatsState.equals(State.NONE)) { - columnStatsState = State.NONE; + if (prevState.equals(State.NONE)) { + return State.NONE; } else { - columnStatsState = State.PARTIAL; + return State.PARTIAL; } } if (newState.equals(State.COMPLETE)) { - if (columnStatsState.equals(State.PARTIAL)) { - columnStatsState = State.PARTIAL; + if (prevState.equals(State.PARTIAL)) { + return State.PARTIAL; } else { - columnStatsState = State.COMPLETE; + return State.COMPLETE; } } + + return prevState; } public long getAvgRowSize() { diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java index 3b9ab41..eb02a91 100644 --- ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java @@ -22,7 +22,6 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -56,8 +55,8 @@ import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.PartitionIterable; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.ColumnStatsList; import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; -import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ColStatistics; import org.apache.hadoop.hive.ql.plan.ColStatistics.Range; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; @@ -135,7 +134,7 @@ * @return statistics object * @throws HiveException */ - public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList partList, + public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList partList, ColumnStatsList colStatsCache, Table table, TableScanOperator tableScanOperator) throws HiveException { // column level statistics are required only for the columns that are needed @@ -143,20 +142,22 @@ public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList pa List neededColumns = tableScanOperator.getNeededColumns(); List referencedColumns = tableScanOperator.getReferencedColumns(); - return collectStatistics(conf, partList, table, schema, neededColumns, referencedColumns); + return collectStatistics(conf, partList, table, schema, neededColumns, colStatsCache, referencedColumns); } private static Statistics collectStatistics(HiveConf conf, PrunedPartitionList partList, - Table table, List schema, List neededColumns, + Table table, List schema, List neededColumns, 
ColumnStatsList colStatsCache, List referencedColumns) throws HiveException { boolean fetchColStats = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_FETCH_COLUMN_STATS); boolean fetchPartStats = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_FETCH_PARTITION_STATS); + boolean testMode = + HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST); - return collectStatistics(conf, partList, table, schema, neededColumns, referencedColumns, - fetchColStats, fetchPartStats); + return collectStatistics(conf, partList, table, schema, neededColumns, colStatsCache, referencedColumns, + fetchColStats, fetchPartStats, testMode); } private static long getDataSize(HiveConf conf, Table table) { @@ -178,7 +179,7 @@ private static long getDataSize(HiveConf conf, Table table) { private static long getNumRows(HiveConf conf, List schema, List neededColumns, Table table, long ds) { long nr = getNumRows(table); - // number of rows -1 means that statistics from metastore is not reliable + // number of rows -1 means that statistics from metastore is not reliable // and 0 means statistics gathering is disabled if (nr <= 0) { int avgRowSize = estimateRowSizeFromSchema(conf, schema, neededColumns); @@ -193,9 +194,17 @@ private static long getNumRows(HiveConf conf, List schema, List schema, List neededColumns, + Table table, List schema, List neededColumns, ColumnStatsList colStatsCache, List referencedColumns, boolean fetchColStats, boolean fetchPartStats) throws HiveException { + return collectStatistics(conf, partList, table, schema, neededColumns, colStatsCache, + referencedColumns, fetchColStats, fetchPartStats, false); + } + + private static Statistics collectStatistics(HiveConf conf, PrunedPartitionList partList, + Table table, List schema, List neededColumns, ColumnStatsList colStatsCache, + List referencedColumns, boolean fetchColStats, boolean fetchPartStats, boolean failIfCacheMiss) + throws HiveException { Statistics stats = new Statistics(); @@ -209,11 +218,11 @@ public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList pa stats.setNumRows(nr); List colStats = Lists.newArrayList(); if (fetchColStats) { - colStats = getTableColumnStats(table, schema, neededColumns); + colStats = getTableColumnStats(table, schema, neededColumns, colStatsCache); long betterDS = getDataSizeFromColumnStats(nr, colStats); ds = (betterDS < 1 || colStats.isEmpty()) ? 
ds : betterDS; } - stats.setDataSize(ds); + stats.setDataSize(ds); // infer if any column can be primary key based on column statistics inferAndSetPrimaryKey(stats.getNumRows(), colStats); @@ -262,6 +271,8 @@ public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList pa nr = ds / avgRowSize; } } + + // Minimum values if (nr == 0) { nr = 1; } @@ -274,56 +285,121 @@ public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList pa stats.setBasicStatsState(State.PARTIAL); } if (fetchColStats) { + List partitionCols = getPartitionColumns( + schema, neededColumns, referencedColumns); + + // We will retrieve stats from the metastore only for columns that are not cached + List neededColsToRetrieve; + List partitionColsToRetrieve; + List columnStats = new ArrayList<>(); + if (colStatsCache != null) { + neededColsToRetrieve = new ArrayList(neededColumns.size()); + for (String colName : neededColumns) { + ColStatistics colStats = colStatsCache.getColStats().get(colName); + if (colStats == null) { + neededColsToRetrieve.add(colName); + if (LOG.isDebugEnabled()) { + LOG.debug("Stats for column " + colName + + " in table " + table.getCompleteName() + " could not be retrieved from cache"); + } + } else { + columnStats.add(colStats); + if (LOG.isDebugEnabled()) { + LOG.debug("Stats for column " + colName + + " in table " + table.getCompleteName() + " retrieved from cache"); + } + } + } + partitionColsToRetrieve = new ArrayList<>(partitionCols.size()); + for (String colName : partitionCols) { + ColStatistics colStats = colStatsCache.getColStats().get(colName); + if (colStats == null) { + partitionColsToRetrieve.add(colName); + if (LOG.isDebugEnabled()) { + LOG.debug("Stats for column " + colName + + " in table " + table.getCompleteName() + " could not be retrieved from cache"); + } + } else { + columnStats.add(colStats); + if (LOG.isDebugEnabled()) { + LOG.debug("Stats for column " + colName + + " in table " + table.getCompleteName() + " retrieved from cache"); + } + } + } + } else { + neededColsToRetrieve = neededColumns; + partitionColsToRetrieve = partitionCols; + } + + // List of partitions List partNames = new ArrayList(partList.getNotDeniedPartns().size()); for (Partition part : partList.getNotDeniedPartns()) { partNames.add(part.getName()); } - neededColumns = processNeededColumns(schema, neededColumns); + AggrStats aggrStats = null; // We check the sizes of neededColumns and partNames here. If either // size is 0, aggrStats is null after several retries. Thus, we can // skip the step to connect to the metastore. - if (neededColumns.size() > 0 && partNames.size() > 0) { + if (neededColsToRetrieve.size() > 0 && partNames.size() > 0) { aggrStats = Hive.get().getAggrColStatsFor(table.getDbName(), table.getTableName(), - neededColumns, partNames); + neededColsToRetrieve, partNames); } - if (null == aggrStats || null == aggrStats.getColStats() - || aggrStats.getColStatsSize() == 0) { + + boolean statsRetrieved = aggrStats != null && + aggrStats.getColStats() != null && aggrStats.getColStatsSize() != 0; + if (neededColumns.size() == 0 || + (neededColsToRetrieve.size() > 0 && !statsRetrieved)) { // There are some partitions with no state (or we didn't fetch any state). // Update the stats with empty list to reflect that in the // state/initialize structures. 
- List emptyStats = Lists.newArrayList(); - - // add partition column stats - addParitionColumnStats(conf, neededColumns, referencedColumns, schema, table, partList, - emptyStats); - stats.addToColumnStats(emptyStats); - stats.addToDataSize(getDataSizeFromColumnStats(nr, emptyStats)); - stats.updateColumnStatsState(deriveStatType(emptyStats, referencedColumns)); + addPartitionColumnStats(conf, partitionColsToRetrieve, schema, table, partList, columnStats); + stats.addToColumnStats(columnStats); + stats.addToDataSize(getDataSizeFromColumnStats(nr, columnStats)); + stats.updateColumnStatsState(deriveStatType(columnStats, referencedColumns)); } else { - List colStats = aggrStats.getColStats(); - if (colStats.size() != neededColumns.size()) { - LOG.debug("Column stats requested for : " + neededColumns.size() + " columns. Able to" + - " retrieve for " + colStats.size() + " columns"); + if (statsRetrieved) { + columnStats.addAll(convertColStats(aggrStats.getColStats(), table.getTableName())); + } + int colStatsAvailable = neededColumns.size() + partitionCols.size() - partitionColsToRetrieve.size(); + if (columnStats.size() != colStatsAvailable) { + LOG.debug("Column stats requested for : {} columns. Able to retrieve for {} columns", + columnStats.size(), colStatsAvailable); } - List columnStats = convertColStats(colStats, table.getTableName()); - - addParitionColumnStats(conf, neededColumns, referencedColumns, schema, table, partList, - columnStats); + addPartitionColumnStats(conf, partitionColsToRetrieve, schema, table, partList, columnStats); long betterDS = getDataSizeFromColumnStats(nr, columnStats); stats.setDataSize((betterDS < 1 || columnStats.isEmpty()) ? ds : betterDS); // infer if any column can be primary key based on column statistics inferAndSetPrimaryKey(stats.getNumRows(), columnStats); stats.addToColumnStats(columnStats); - State colState = deriveStatType(columnStats, referencedColumns); - if (aggrStats.getPartsFound() != partNames.size() && colState != State.NONE) { - LOG.debug("Column stats requested for : " + partNames.size() + " partitions. " - + "Able to retrieve for " + aggrStats.getPartsFound() + " partitions"); - colState = State.PARTIAL; + + // Infer column stats state + stats.setColumnStatsState(deriveStatType(columnStats, referencedColumns)); + if (neededColumns.size() != neededColsToRetrieve.size() || + partitionCols.size() != partitionColsToRetrieve.size()) { + // Include state for cached columns + stats.updateColumnStatsState(colStatsCache.getState()); + } + // Change if we could not retrieve for all partitions + if (aggrStats != null && aggrStats.getPartsFound() != partNames.size() && stats.getColumnStatsState() != State.NONE) { + stats.updateColumnStatsState(State.PARTIAL); + LOG.debug("Column stats requested for : {} partitions. Able to retrieve for {} partitions", + partNames.size(), aggrStats.getPartsFound()); } - stats.setColumnStatsState(colState); + } + + // This block exists for debugging purposes: we want to check whether + // the col stats cache is working properly and we are retrieving the + // stats from metastore only once. 
+ if (colStatsCache != null && failIfCacheMiss && + stats.getColumnStatsState().equals(State.COMPLETE) && + (!neededColsToRetrieve.isEmpty() || !partitionColsToRetrieve.isEmpty())) { + throw new HiveException("Cache has been loaded in logical planning phase for all columns; " + + "however, stats for column some columns could not be retrieved from it " + + "(see messages above)"); } } } @@ -415,13 +491,25 @@ private static boolean isWithin(ColStatistics.Range range1, ColStatistics.Range return false; } - private static void addParitionColumnStats(HiveConf conf, List neededColumns, - List referencedColumns, List schema, Table table, - PrunedPartitionList partList, List colStats) - throws HiveException { + private static void addPartitionColumnStats(HiveConf conf, List partitionCols, + List schema, Table table, PrunedPartitionList partList, List colStats) + throws HiveException { + for (String col : partitionCols) { + for (ColumnInfo ci : schema) { + // conditions for being partition column + if (col.equals(ci.getInternalName())) { + colStats.add(getColStatsForPartCol(ci, new PartitionIterable(partList.getPartitions()), conf)); + } + } + } + } + private static List getPartitionColumns(List schema, + List neededColumns, + List referencedColumns) { // extra columns is difference between referenced columns vs needed // columns. The difference could be partition columns. + List partitionCols = new ArrayList<>(referencedColumns.size()); List extraCols = Lists.newArrayList(referencedColumns); if (referencedColumns.size() > neededColumns.size()) { extraCols.removeAll(neededColumns); @@ -430,11 +518,12 @@ private static void addParitionColumnStats(HiveConf conf, List neededCol // conditions for being partition column if (col.equals(ci.getInternalName()) && ci.getIsVirtualCol() && !ci.isHiddenVirtualCol()) { - colStats.add(getColStatsForPartCol(ci, new PartitionIterable(partList.getPartitions()), conf)); + partitionCols.add(col); } } } } + return partitionCols; } public static ColStatistics getColStatsForPartCol(ColumnInfo ci,PartitionIterable partList, HiveConf conf) { @@ -791,23 +880,49 @@ public static ColStatistics getColStatistics(ColumnStatisticsObj cso, String tab * @return column statistics */ public static List getTableColumnStats( - Table table, List schema, List neededColumns) { + Table table, List schema, List neededColumns, + ColumnStatsList colStatsCache) { if (table.isMaterializedTable()) { LOG.debug("Materialized table does not contain table statistics"); return null; } + // We will retrieve stats from the metastore only for columns that are not cached + List colStatsToRetrieve; + if (colStatsCache != null) { + colStatsToRetrieve = new ArrayList<>(neededColumns.size()); + for (String colName : neededColumns) { + if (!colStatsCache.getColStats().containsKey(colName)) { + colStatsToRetrieve.add(colName); + } + } + } else { + colStatsToRetrieve = neededColumns; + } + // Retrieve stats from metastore String dbName = table.getDbName(); String tabName = table.getTableName(); - List neededColsInTable = processNeededColumns(schema, neededColumns); List stats = null; try { List colStat = Hive.get().getTableColumnStatistics( - dbName, tabName, neededColsInTable); + dbName, tabName, colStatsToRetrieve); stats = convertColStats(colStat, tabName); } catch (HiveException e) { LOG.error("Failed to retrieve table statistics: ", e); stats = null; } + // Merge stats from cache with metastore cache + if (colStatsCache != null) { + for (int i = 0; i < neededColumns.size(); i++) { + ColStatistics cs = 
colStatsCache.getColStats().get(neededColumns.get(i)); + if (cs != null) { + stats.add(i, cs); + if (LOG.isDebugEnabled()) { + LOG.debug("Stats for column " + cs.getColumnName() + + " in table " + table.getCompleteName() + " retrieved from cache"); + } + } + } + } return stats; } @@ -824,22 +939,6 @@ public static ColStatistics getColStatistics(ColumnStatisticsObj cso, String tab } return stats; } - private static List processNeededColumns(List schema, - List neededColumns) { - // Remove hidden virtual columns, as well as needed columns that are not - // part of the table. TODO: the latter case should not really happen... - List neededColsInTable = null; - int limit = neededColumns.size(); - for (int i = 0; i < limit; ++i) { - if (neededColsInTable == null) { - neededColsInTable = Lists.newArrayList(neededColumns); - } - neededColsInTable.remove(i--); - --limit; - } - return (neededColsInTable == null || neededColsInTable.size() == 0) ? neededColumns - : neededColsInTable; - } /** * Get the raw data size of variable length data types diff --git ql/src/test/results/clientpositive/llap/tez_smb_empty.q.out ql/src/test/results/clientpositive/llap/tez_smb_empty.q.out index e4c246a..17860f4 100644 --- ql/src/test/results/clientpositive/llap/tez_smb_empty.q.out +++ ql/src/test/results/clientpositive/llap/tez_smb_empty.q.out @@ -551,14 +551,14 @@ STAGE PLANS: Map Operator Tree: TableScan alias: s3 - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE Filter Operator predicate: key is not null (type: boolean) - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE Select Operator expressions: key (type: int) outputColumnNames: _col0 - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE Map Operator Tree: TableScan alias: s1 @@ -646,14 +646,14 @@ STAGE PLANS: Map Operator Tree: TableScan alias: s2 - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE Filter Operator predicate: key is not null (type: boolean) - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE Select Operator expressions: key (type: int) outputColumnNames: _col0 - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE Map Operator Tree: TableScan alias: s3 diff --git ql/src/test/results/clientpositive/llap/vector_mr_diff_schema_alias.q.out ql/src/test/results/clientpositive/llap/vector_mr_diff_schema_alias.q.out index 03c6d3f..93c8715 100644 --- ql/src/test/results/clientpositive/llap/vector_mr_diff_schema_alias.q.out +++ ql/src/test/results/clientpositive/llap/vector_mr_diff_schema_alias.q.out @@ -255,19 +255,19 @@ STAGE PLANS: Map Operator Tree: TableScan alias: store_sales - Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: PARTIAL + Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE Filter Operator predicate: (ss_store_sk is not null and ss_sold_date_sk is not null) (type: boolean) - Statistics: Num rows: 1 Data size: 4 Basic stats: 
COMPLETE Column stats: PARTIAL + Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE Select Operator expressions: ss_sold_date_sk (type: int), ss_store_sk (type: int) outputColumnNames: _col0, _col1 - Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: PARTIAL + Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE Reduce Output Operator key expressions: _col1 (type: int) sort order: + Map-reduce partition columns: _col1 (type: int) - Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: PARTIAL + Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col0 (type: int) Execution mode: vectorized, llap LLAP IO: unknown
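
Reviewer note (not part of the patch): the standalone sketch below, plain Java with no Hive dependencies and purely illustrative names, mirrors two ideas the patch relies on: the COMPLETE/PARTIAL/NONE merge table that is now factored out as Statistics.inferColumnStatsState, and the lookup-or-create pattern RelOptHiveTable.updateColStats uses to obtain the per-PrunedPartitionList cache entry.

// ColStatsCacheSketch.java -- standalone illustration only; names are hypothetical.
import java.util.HashMap;
import java.util.Map;

public class ColStatsCacheSketch {

  enum State { COMPLETE, PARTIAL, NONE }

  // Same transition table as the comment block above Statistics.inferColumnStatsState:
  // PARTIAL always wins; NONE stays NONE only if both sides are NONE;
  // COMPLETE is kept only if the previous state was not PARTIAL.
  static State infer(State prev, State next) {
    if (next == State.PARTIAL) {
      return State.PARTIAL;
    }
    if (next == State.NONE) {
      return prev == State.NONE ? State.NONE : State.PARTIAL;
    }
    // next == COMPLETE
    return prev == State.PARTIAL ? State.PARTIAL : State.COMPLETE;
  }

  public static void main(String[] args) {
    // One cache entry per pruned-partition-list key, created on first use
    // (the same lookup-or-create done in RelOptHiveTable.updateColStats).
    Map<String, Map<String, Long>> colStatsCache = new HashMap<>();
    String ppListKey = "default.store_sales;ss_sold_date_sk is not null"; // illustrative key
    Map<String, Long> entry = colStatsCache.computeIfAbsent(ppListKey, k -> new HashMap<>());
    entry.put("ss_store_sk", 42L); // pretend an NDV value fetched once from the metastore

    // A second lookup for the same key hits the cache instead of the metastore.
    System.out.println(colStatsCache.get(ppListKey).containsKey("ss_store_sk")); // true

    // Merging fetch outcomes: COMPLETE after NONE is COMPLETE, but a later
    // failed column (NONE) degrades the overall state to PARTIAL.
    State s = State.NONE;
    s = infer(s, State.COMPLETE);
    s = infer(s, State.NONE);
    System.out.println(s); // PARTIAL
  }
}

Under these rules the overall state can only degrade toward PARTIAL once any column is missing, which is why collectStatistics folds colStatsCache.getState() into the final column-stats state whenever some columns were served from the cache.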