diff --git ql/src/java/org/apache/hadoop/hive/ql/CompilationOpContext.java ql/src/java/org/apache/hadoop/hive/ql/CompilationOpContext.java
index 949f873..30201c4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/CompilationOpContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/CompilationOpContext.java
@@ -18,8 +18,12 @@
 package org.apache.hadoop.hive.ql;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.hive.ql.plan.ColStatistics;
+
 /**
  * A subset of compilation context that is passed to operators to get rid of some globals.
  * Perhaps this should be rolled into main Context; however, some code necessitates storing the
@@ -29,8 +33,15 @@
  */
 public class CompilationOpContext {
   private final AtomicInteger opSeqId = new AtomicInteger(0);
+  private final Map<String, Map<String, ColStatistics>> colStatsCache =
+      new HashMap<>();
 
   public int nextOperatorId() {
     return opSeqId.getAndIncrement();
   }
-}
\ No newline at end of file
+
+  public Map<String, Map<String, ColStatistics>> getColStatsCache() {
+    return colStatsCache;
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
index 5dff242..24ac0b8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
@@ -24,6 +24,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -64,6 +65,7 @@
 import org.apache.hadoop.hive.ql.parse.ParseUtils;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.plan.ColStatistics;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -285,7 +287,7 @@ private static RelNode createTableScan(Table viewTable) {
     RelOptHiveTable optTable = new RelOptHiveTable(null, fullyQualifiedTabName,
         rowType, viewTable, nonPartitionColumns, partitionColumns,
         new ArrayList<VirtualColumn>(), SessionState.get().getConf(), new HashMap<String, PrunedPartitionList>(),
-        new AtomicInteger());
+        new HashMap<String, Map<String, ColStatistics>>(), new AtomicInteger());
     RelNode tableRel;
 
     // 3. Build operator
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java
index 1d49568..d551f38 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java
@@ -77,9 +77,10 @@
   final HiveConf hiveConf;
 
   private double rowCount = -1;
-  Map<Integer, ColStatistics> hiveColStatsMap = new HashMap<Integer, ColStatistics>();
+  Map<Integer, ColStatistics> hiveColStatsMap = new HashMap<>();
   PrunedPartitionList partitionList;
   Map<String, PrunedPartitionList> partitionCache;
+  Map<String, Map<String, ColStatistics>> colStatsCache;
   AtomicInteger noColsMissingStats;
 
   protected static final Logger LOG = LoggerFactory
@@ -89,7 +90,8 @@
   public RelOptHiveTable(RelOptSchema calciteSchema, String qualifiedTblName,
       RelDataType rowType, Table hiveTblMetadata, List<ColumnInfo> hiveNonPartitionCols,
       List<ColumnInfo> hivePartitionCols, List<VirtualColumn> hiveVirtualCols, HiveConf hconf,
-      Map<String, PrunedPartitionList> partitionCache, AtomicInteger noColsMissingStats) {
+      Map<String, PrunedPartitionList> partitionCache, Map<String, Map<String, ColStatistics>> colStatsCache,
+      AtomicInteger noColsMissingStats) {
     super(calciteSchema, qualifiedTblName, rowType);
     this.hiveTblMetadata = hiveTblMetadata;
     this.hiveNonPartitionCols = ImmutableList.copyOf(hiveNonPartitionCols);
@@ -100,6 +102,7 @@ public RelOptHiveTable(RelOptSchema calciteSchema, String qualifiedTblName,
     this.hiveVirtualCols = ImmutableList.copyOf(hiveVirtualCols);
     this.hiveConf = hconf;
     this.partitionCache = partitionCache;
+    this.colStatsCache = colStatsCache;
     this.noColsMissingStats = noColsMissingStats;
   }
 
@@ -136,7 +139,7 @@ public RelOptHiveTable copy(RelDataType newRowType) {
     // 3. Build new Table
     return new RelOptHiveTable(this.schema, this.name, newRowType, this.hiveTblMetadata,
         newHiveNonPartitionCols, newHivePartitionCols, newHiveVirtualCols,
-        this.hiveConf, this.partitionCache, this.noColsMissingStats);
+        this.hiveConf, this.partitionCache, this.colStatsCache, this.noColsMissingStats);
   }
 
   @Override
@@ -295,6 +298,12 @@ private void updateColStats(Set<Integer> projIndxLst, boolean allowNullColumnFor
       computePartitionList(hiveConf, null, new HashSet<Integer>());
     }
 
+    Map<String, ColStatistics> colStatsCached = colStatsCache.get(partitionList.getKey());
+    if (colStatsCached == null) {
+      colStatsCached = new HashMap<>();
+      colStatsCache.put(partitionList.getKey(), colStatsCached);
+    }
+
     // 2. Obtain Col Stats for Non Partition Cols
     if (nonPartColNamesThatRqrStats.size() > 0) {
       List<ColStatistics> hiveColStats;
@@ -302,7 +311,7 @@ private void updateColStats(Set<Integer> projIndxLst, boolean allowNullColumnFor
       if (!hiveTblMetadata.isPartitioned()) {
         // 2.1 Handle the case for unpartitioned table.
         hiveColStats = StatsUtils.getTableColumnStats(hiveTblMetadata, hiveNonPartitionCols,
-            nonPartColNamesThatRqrStats);
+            nonPartColNamesThatRqrStats, colStatsCached);
 
         // 2.1.1 Record Column Names that we needed stats for but couldn't
         if (hiveColStats == null) {
@@ -345,7 +354,7 @@ private void updateColStats(Set<Integer> projIndxLst, boolean allowNullColumnFor
           colNamesFailedStats.clear();
         } else {
           Statistics stats = StatsUtils.collectStatistics(hiveConf, partitionList,
-              hiveTblMetadata, hiveNonPartitionCols, nonPartColNamesThatRqrStats,
+              hiveTblMetadata, hiveNonPartitionCols, nonPartColNamesThatRqrStats, colStatsCached,
               nonPartColNamesThatRqrStats, true, true);
           rowCount = stats.getNumRows();
           hiveColStats = new ArrayList<ColStatistics>();
@@ -370,6 +379,7 @@ private void updateColStats(Set<Integer> projIndxLst, boolean allowNullColumnFor
             // the columns in nonPartColIndxsThatRqrStats/nonPartColNamesThatRqrStats/hiveColStats
             // are in same order
             hiveColStatsMap.put(nonPartColIndxsThatRqrStats.get(i), hiveColStats.get(i));
+            colStatsCached.put(hiveColStats.get(i).getColumnName(), hiveColStats.get(i));
           }
         }
       }
@@ -383,6 +393,7 @@ private void updateColStats(Set<Integer> projIndxLst, boolean allowNullColumnFor
         cStats.setCountDistint(getDistinctCount(partitionList.getPartitions(),
             partColNamesThatRqrStats.get(i)));
         hiveColStatsMap.put(partColIndxsThatRqrStats.get(i), cStats);
+        colStatsCached.put(cStats.getColumnName(), cStats);
       }
     }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java
index 6624865..8704b0d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java
@@ -206,24 +206,26 @@ public static PrunedPartitionList prune(Table tab, ExprNodeDesc prunerExpr,
     String oldFilter = prunerExpr.getExprString();
     if (compactExpr == null || isBooleanExpr(compactExpr)) {
       if (isFalseExpr(compactExpr)) {
-        return new PrunedPartitionList(
-            tab, new LinkedHashSet<Partition>(0), new ArrayList<String>(0), false);
+        return new PrunedPartitionList(tab, key + compactExpr.getExprString(),
+            new LinkedHashSet<Partition>(0), new ArrayList<String>(0), false);
       }
       // For null and true values, return every partition
       return getAllPartsFromCacheOrServer(tab, key, true, prunedPartitionsMap);
     }
+
+    String compactExprString = compactExpr.getExprString();
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Filter w/ compacting: " + compactExpr.getExprString()
+      LOG.debug("Filter w/ compacting: " + compactExprString
           + "; filter w/o compacting: " + oldFilter);
     }
-
-    key = key + compactExpr.getExprString();
+    key = key + compactExprString;
     PrunedPartitionList ppList = prunedPartitionsMap.get(key);
     if (ppList != null) {
       return ppList;
     }
-    ppList = getPartitionsFromServer(tab, (ExprNodeGenericFuncDesc)compactExpr, conf, alias, partColsUsedInFilter, oldFilter.equals(compactExpr.getExprString()));
+    ppList = getPartitionsFromServer(tab, key, (ExprNodeGenericFuncDesc)compactExpr,
+        conf, alias, partColsUsedInFilter, oldFilter.equals(compactExpr.getExprString()));
     prunedPartitionsMap.put(key, ppList);
     return ppList;
   }
@@ -240,7 +242,7 @@ private static PrunedPartitionList getAllPartsFromCacheOrServer(Table tab, Strin
     } catch (HiveException e) {
       throw new SemanticException(e);
     }
-    ppList = new PrunedPartitionList(tab, parts, null, unknownPartitions);
+    ppList = new PrunedPartitionList(tab, key, parts, null, unknownPartitions);
     if (partsCache != null) {
       partsCache.put(key, ppList);
     }
@@ -430,8 +432,8 @@ static private boolean hasUserFunctions(ExprNodeDesc expr) {
     return false;
   }
 
-  private static PrunedPartitionList getPartitionsFromServer(Table tab,
-      final ExprNodeGenericFuncDesc compactExpr, HiveConf conf, String alias, Set<String> partColsUsedInFilter, boolean isPruningByExactFilter) throws SemanticException {
+  private static PrunedPartitionList getPartitionsFromServer(Table tab, final String key, final ExprNodeGenericFuncDesc compactExpr,
+      HiveConf conf, String alias, Set<String> partColsUsedInFilter, boolean isPruningByExactFilter) throws SemanticException {
     try {
       // Finally, check the filter for non-built-in UDFs. If these are present, we cannot
@@ -462,7 +464,8 @@ private static PrunedPartitionList getPartitionsFromServer(Table tab,
       // The partitions are "unknown" if the call says so due to the expression
      // evaluator returning null for a partition, or if we sent a partial expression to
      // metastore and so some partitions may have no data based on other filters.
-      return new PrunedPartitionList(tab, new LinkedHashSet<Partition>(partitions),
+      return new PrunedPartitionList(tab, key,
+          new LinkedHashSet<Partition>(partitions),
           new ArrayList<String>(partColsUsedInFilter),
           hasUnknownPartitions || !isPruningByExactFilter);
     } catch (SemanticException e) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
index 6cb0559..fff1bd9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
@@ -123,11 +123,12 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
       TableScanOperator tsop = (TableScanOperator) nd;
       AnnotateStatsProcCtx aspCtx = (AnnotateStatsProcCtx) procCtx;
       PrunedPartitionList partList = aspCtx.getParseContext().getPrunedPartitions(tsop);
+      Map<String, ColStatistics> colStatsCached = aspCtx.getParseContext().getColStatsCached(partList);
       Table table = tsop.getConf().getTableMetadata();
 
       try {
         // gather statistics for the first time and the attach it to table scan operator
-        Statistics stats = StatsUtils.collectStatistics(aspCtx.getConf(), partList, table, tsop);
+        Statistics stats = StatsUtils.collectStatistics(aspCtx.getConf(), partList, colStatsCached, table, tsop);
         tsop.setStatistics(stats.clone());
 
         if (LOG.isDebugEnabled()) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index d6695cc..b9a8ceb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -232,6 +232,7 @@
 import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFunctionSpec;
 import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowSpec;
 import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowType;
+import org.apache.hadoop.hive.ql.plan.ColStatistics;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -1092,7 +1093,10 @@ RelNode logicalPlan() throws SemanticException {
     if (this.columnAccessInfo == null) {
       this.columnAccessInfo = new ColumnAccessInfo();
     }
-    calcitePlannerAction = new CalcitePlannerAction(prunedPartitions, this.columnAccessInfo);
+    calcitePlannerAction = new CalcitePlannerAction(
+        prunedPartitions,
+        ctx.getOpContext().getColStatsCache(),
+        this.columnAccessInfo);
 
     try {
       optimizedOptiqPlan = Frameworks.withPlanner(calcitePlannerAction, Frameworks
@@ -1129,7 +1133,10 @@ Operator getOptimizedHiveOPDag() throws SemanticException {
     if (this.columnAccessInfo == null) {
       this.columnAccessInfo = new ColumnAccessInfo();
     }
-    calcitePlannerAction = new CalcitePlannerAction(prunedPartitions, this.columnAccessInfo);
+    calcitePlannerAction = new CalcitePlannerAction(
+        prunedPartitions,
+        ctx.getOpContext().getColStatsCache(),
+        this.columnAccessInfo);
 
     try {
       optimizedOptiqPlan = Frameworks.withPlanner(calcitePlannerAction, Frameworks
@@ -1291,6 +1298,7 @@ private RowResolver genRowResolver(Operator op, QB qb) {
     private RelOptCluster cluster;
     private RelOptSchema relOptSchema;
     private final Map<String, PrunedPartitionList> partitionCache;
+    private final Map<String, Map<String, ColStatistics>> colStatsCache;
     private final ColumnAccessInfo columnAccessInfo;
     private Map<HiveProject, Table> viewProjectToTableSchema;
 
@@ -1308,8 +1316,12 @@ private RowResolver genRowResolver(Operator op, QB qb) {
     LinkedHashMap<RelNode, RowResolver> relToHiveRR = new LinkedHashMap<RelNode, RowResolver>();
     LinkedHashMap<RelNode, ImmutableMap<String, Integer>> relToHiveColNameCalcitePosMap =
         new LinkedHashMap<RelNode, ImmutableMap<String, Integer>>();
 
-    CalcitePlannerAction(Map<String, PrunedPartitionList> partitionCache, ColumnAccessInfo columnAccessInfo) {
+    CalcitePlannerAction(
+        Map<String, PrunedPartitionList> partitionCache,
+        Map<String, Map<String, ColStatistics>> colStatsCache,
+        ColumnAccessInfo columnAccessInfo) {
       this.partitionCache = partitionCache;
+      this.colStatsCache = colStatsCache;
       this.columnAccessInfo = columnAccessInfo;
     }
 
@@ -2331,7 +2343,7 @@ private RelNode genTableLogicalPlan(String tableAlias, QB qb) throws SemanticExc
         }
         RelOptHiveTable optTable = new RelOptHiveTable(relOptSchema, fullyQualifiedTabName,
             rowType, tabMetaData, nonPartitionColumns, partitionColumns, virtualCols, conf,
-            partitionCache, noColsMissingStats);
+            partitionCache, colStatsCache, noColsMissingStats);
 
         // Build Druid query
         String address = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS);
@@ -2376,7 +2388,7 @@ private RelNode genTableLogicalPlan(String tableAlias, QB qb) throws SemanticExc
         }
         RelOptHiveTable optTable = new RelOptHiveTable(relOptSchema, fullyQualifiedTabName,
             rowType, tabMetaData, nonPartitionColumns, partitionColumns, virtualCols, conf,
-            partitionCache, noColsMissingStats);
+            partitionCache, colStatsCache, noColsMissingStats);
 
         // Build Hive Table Scan Rel
         tableRel = new HiveTableScan(cluster, cluster.traitSetOf(HiveRelNode.CONVENTION), optTable,
            null == tableAlias ? tabMetaData.getTableName() : tableAlias,
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index 565fbef..4193e1d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.AnalyzeRewriteContext;
 import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo;
+import org.apache.hadoop.hive.ql.plan.ColStatistics;
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
 import org.apache.hadoop.hive.ql.plan.CreateViewDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -426,6 +427,24 @@ public void setOpToSamplePruner(
   }
 
   /**
+   * Column statistics cache, keyed by pruned partition list key and,
+   * within each entry, by column name.
+   * @return col statistics cache
+   */
+  public Map<String, Map<String, ColStatistics>> getColStatsCache() {
+    return ctx.getOpContext().getColStatsCache();
+  }
+
+  /**
+   * Returns the cached column statistics for the given pruned partition list.
+   * @param partList pruned partition list
+   * @return col statistics map
+   */
+  public Map<String, ColStatistics> getColStatsCached(PrunedPartitionList partList) {
+    return ctx.getOpContext().getColStatsCache().get(partList.getKey());
+  }
+
+  /**
    * @return pruned partition map
    */
   public Map<String, PrunedPartitionList> getPrunedPartitions() {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java
index da2e1e2..656455f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java
@@ -30,6 +30,9 @@
  */
 public class PrunedPartitionList {
 
+  /** Key to identify this partition list. */
+  private final String ppListKey;
+
   /** Source table. */
   private final Table source;
 
@@ -42,9 +45,19 @@
   /** Whether there are partitions in the list that may or may not satisfy the criteria. */
   private boolean hasUnknowns;
 
-  public PrunedPartitionList(Table source, Set<Partition> partitions, List<String> referred,
-      boolean hasUnknowns) {
+  public PrunedPartitionList(Table source, Set<Partition> partitions,
+      List<String> referred, boolean hasUnknowns) {
     this.source = source;
+    this.ppListKey = null;
+    this.referred = referred;
+    this.partitions = partitions;
+    this.hasUnknowns = hasUnknowns;
+  }
+
+  public PrunedPartitionList(Table source, String key, Set<Partition> partitions,
+      List<String> referred, boolean hasUnknowns) {
+    this.source = source;
+    this.ppListKey = key;
     this.referred = referred;
     this.partitions = partitions;
     this.hasUnknowns = hasUnknowns;
@@ -54,6 +67,10 @@ public Table getSourceTable() {
     return source;
   }
 
+  public String getKey() {
+    return ppListKey;
+  }
+
   /**
    * @return partitions
    */
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 4faec05..bc6e0d5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -402,10 +402,13 @@ public SemanticAnalyzer(QueryState queryState) throws SemanticException {
   }
 
   @Override
-  protected void reset(boolean clearPartsCache) {
+  protected void reset(boolean clearCache) {
     super.reset(true);
-    if(clearPartsCache) {
+    if(clearCache) {
       prunedPartitions.clear();
+      if (ctx != null) {
+        ctx.getOpContext().getColStatsCache().clear();
+      }
 
       //When init(true) combine with genResolvedParseTree, it will generate Resolved Parse tree from syntax tree
       //ReadEntity created under these conditions should be all relevant to the syntax tree even the ones without parents
diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
index 3b9ab41..eb26c2d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
@@ -22,7 +22,6 @@
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -57,7 +56,6 @@
 import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ColStatistics;
 import org.apache.hadoop.hive.ql.plan.ColStatistics.Range;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -135,7 +133,7 @@
    * @return statistics object
    * @throws HiveException
    */
-  public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList partList,
+  public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList partList, Map<String, ColStatistics> colStatsCache,
       Table table, TableScanOperator tableScanOperator) throws HiveException {
 
     // column level statistics are required only for the columns that are needed
@@ -143,20 +141,22 @@ public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList pa
     List<String> neededColumns = tableScanOperator.getNeededColumns();
     List<String> referencedColumns = tableScanOperator.getReferencedColumns();
 
-    return collectStatistics(conf, partList, table, schema, neededColumns, referencedColumns);
+    return collectStatistics(conf, partList, table, schema, neededColumns, colStatsCache, referencedColumns);
   }
 
   private static Statistics collectStatistics(HiveConf conf, PrunedPartitionList partList,
-      Table table, List<ColumnInfo> schema, List<String> neededColumns,
+      Table table, List<ColumnInfo> schema, List<String> neededColumns, Map<String, ColStatistics> colStatsCache,
       List<String> referencedColumns) throws HiveException {
 
     boolean fetchColStats =
         HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_FETCH_COLUMN_STATS);
     boolean fetchPartStats =
         HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_FETCH_PARTITION_STATS);
+    boolean testMode =
+        HiveConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST);
 
-    return collectStatistics(conf, partList, table, schema, neededColumns, referencedColumns,
-        fetchColStats, fetchPartStats);
+    return collectStatistics(conf, partList, table, schema, neededColumns, colStatsCache, referencedColumns,
+        fetchColStats, fetchPartStats, testMode);
   }
 
   private static long getDataSize(HiveConf conf, Table table) {
@@ -193,9 +193,17 @@ private static long getNumRows(HiveConf conf, List<ColumnInfo> schema, List
 
   public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList partList,
-      Table table, List<ColumnInfo> schema, List<String> neededColumns,
+      Table table, List<ColumnInfo> schema, List<String> neededColumns, Map<String, ColStatistics> colStatsCache,
       List<String> referencedColumns, boolean fetchColStats, boolean fetchPartStats)
       throws HiveException {
+    return collectStatistics(conf, partList, table, schema, neededColumns, colStatsCache,
+        referencedColumns, fetchColStats, fetchPartStats, false);
+  }
+
+  private static Statistics collectStatistics(HiveConf conf, PrunedPartitionList partList,
+      Table table, List<ColumnInfo> schema, List<String> neededColumns, Map<String, ColStatistics> colStatsCache,
+      List<String> referencedColumns, boolean fetchColStats, boolean fetchPartStats, boolean failIfCacheMiss)
+      throws HiveException {
 
     Statistics stats = new Statistics();
@@ -209,11 +217,11 @@ public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList pa
     stats.setNumRows(nr);
     List<ColStatistics> colStats = Lists.newArrayList();
     if (fetchColStats) {
-      colStats = getTableColumnStats(table, schema, neededColumns);
+      colStats = getTableColumnStats(table, schema, neededColumns, colStatsCache);
       long betterDS = getDataSizeFromColumnStats(nr, colStats);
       ds = (betterDS < 1 || colStats.isEmpty()) ? ds : betterDS;
     }
-    stats.setDataSize(ds);
+    stats.setDataSize(ds);
 
     // infer if any column can be primary key based on column statistics
     inferAndSetPrimaryKey(stats.getNumRows(), colStats);
@@ -274,54 +282,104 @@ public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList pa
         stats.setBasicStatsState(State.PARTIAL);
       }
       if (fetchColStats) {
+        List<String> partitionCols = getPartitionColumns(
+            schema, neededColumns, referencedColumns);
+
+        // We will retrieve stats from the metastore only for columns that are not cached
+        List<String> neededColsToRetrieve;
+        List<String> partitionColsToRetrieve;
+        List<ColStatistics> columnStats = new ArrayList<>();
+        if (colStatsCache != null) {
+          neededColsToRetrieve = new ArrayList<String>(neededColumns.size());
+          for (String colName : neededColumns) {
+            ColStatistics colStats = colStatsCache.get(colName);
+            if (colStats == null) {
+              neededColsToRetrieve.add(colName);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Stats for column " + colName +
+                    " in table " + table.getCompleteName() + " could not be retrieved from cache");
+              }
+              if (failIfCacheMiss) {
+                throw new HiveException("Cache has been loaded in logical planning phase for all columns; " +
+                    "however, stats for column " + colName + " in table " + table.getCompleteName() + " could not be retrieved");
+              }
+            } else {
+              columnStats.add(colStats);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Stats for column " + colName +
+                    " in table " + table.getCompleteName() + " retrieved from cache");
+              }
+            }
+          }
+          partitionColsToRetrieve = new ArrayList<>(partitionCols.size());
+          for (String colName : partitionCols) {
+            ColStatistics colStats = colStatsCache.get(colName);
+            if (colStats == null) {
+              partitionColsToRetrieve.add(colName);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Stats for column " + colName +
+                    " in table " + table.getCompleteName() + " could not be retrieved from cache");
+              }
+              if (failIfCacheMiss) {
+                throw new HiveException("Cache has been loaded in logical planning phase for all columns; " +
+                    "however, stats for column " + colName + " in table " + table.getCompleteName() + " could not be retrieved");
+              }
+            } else {
+              columnStats.add(colStats);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Stats for column " + colName +
+                    " in table " + table.getCompleteName() + " retrieved from cache");
+              }
+            }
+          }
+        } else {
+          neededColsToRetrieve = neededColumns;
+          partitionColsToRetrieve = partitionCols;
+        }
+
         List<String> partNames = new ArrayList<String>(partList.getNotDeniedPartns().size());
         for (Partition part : partList.getNotDeniedPartns()) {
           partNames.add(part.getName());
         }
-        neededColumns = processNeededColumns(schema, neededColumns);
         AggrStats aggrStats = null;
         // We check the sizes of neededColumns and partNames here. If either
         // size is 0, aggrStats is null after several retries. Thus, we can
         // skip the step to connect to the metastore.
-        if (neededColumns.size() > 0 && partNames.size() > 0) {
+        if (neededColsToRetrieve.size() > 0 && partNames.size() > 0) {
           aggrStats = Hive.get().getAggrColStatsFor(table.getDbName(), table.getTableName(),
-              neededColumns, partNames);
+              neededColsToRetrieve, partNames);
         }
-        if (null == aggrStats || null == aggrStats.getColStats()
-            || aggrStats.getColStatsSize() == 0) {
+        boolean statsRetrieved = aggrStats != null &&
+            aggrStats.getColStats() != null && aggrStats.getColStatsSize() != 0;
+        if (columnStats.isEmpty() && !statsRetrieved) {
           // There are some partitions with no state (or we didn't fetch any state).
           // Update the stats with empty list to reflect that in the
           // state/initialize structures.
-          List<ColStatistics> emptyStats = Lists.newArrayList();
-
-          // add partition column stats
-          addParitionColumnStats(conf, neededColumns, referencedColumns, schema, table, partList,
-              emptyStats);
-          stats.addToColumnStats(emptyStats);
-          stats.addToDataSize(getDataSizeFromColumnStats(nr, emptyStats));
-          stats.updateColumnStatsState(deriveStatType(emptyStats, referencedColumns));
+          addPartitionColumnStats(conf, partitionColsToRetrieve, schema, table, partList, columnStats);
+          stats.addToColumnStats(columnStats);
+          stats.addToDataSize(getDataSizeFromColumnStats(nr, columnStats));
+          stats.updateColumnStatsState(deriveStatType(columnStats, referencedColumns));
         } else {
-          List<ColumnStatisticsObj> colStats = aggrStats.getColStats();
-          if (colStats.size() != neededColumns.size()) {
-            LOG.debug("Column stats requested for : " + neededColumns.size() + " columns. Able to" + " retrieve for " + colStats.size() + " columns");
+          if (statsRetrieved) {
+            columnStats.addAll(convertColStats(aggrStats.getColStats(), table.getTableName()));
+            if (aggrStats.getColStats().size() != neededColsToRetrieve.size()) {
+              LOG.debug("Column stats requested for : {} columns. Able to retrieve for {} columns",
+                  neededColsToRetrieve.size(), aggrStats.getColStats().size());
+            }
           }
-          List<ColStatistics> columnStats = convertColStats(colStats, table.getTableName());
-
-          addParitionColumnStats(conf, neededColumns, referencedColumns, schema, table, partList,
-              columnStats);
+          addPartitionColumnStats(conf, partitionColsToRetrieve, schema, table, partList, columnStats);
           long betterDS = getDataSizeFromColumnStats(nr, columnStats);
           stats.setDataSize((betterDS < 1 || columnStats.isEmpty()) ? ds : betterDS);
           // infer if any column can be primary key based on column statistics
           inferAndSetPrimaryKey(stats.getNumRows(), columnStats);
-          stats.addToColumnStats(columnStats);
           State colState = deriveStatType(columnStats, referencedColumns);
-          if (aggrStats.getPartsFound() != partNames.size() && colState != State.NONE) {
-            LOG.debug("Column stats requested for : " + partNames.size() + " partitions. "
-                + "Able to retrieve for " + aggrStats.getPartsFound() + " partitions");
+
+          if (aggrStats != null && aggrStats.getPartsFound() != partNames.size() && colState != State.NONE) {
             colState = State.PARTIAL;
+            LOG.debug("Column stats requested for : {} partitions. Able to retrieve for {} partitions",
+                partNames.size(), aggrStats.getPartsFound());
           }
           stats.setColumnStatsState(colState);
         }
@@ -415,13 +473,25 @@ private static boolean isWithin(ColStatistics.Range range1, ColStatistics.Range
     return false;
   }
 
-  private static void addParitionColumnStats(HiveConf conf, List<String> neededColumns,
-      List<String> referencedColumns, List<ColumnInfo> schema, Table table,
-      PrunedPartitionList partList, List<ColStatistics> colStats)
-      throws HiveException {
+  private static void addPartitionColumnStats(HiveConf conf, List<String> partitionCols,
+      List<ColumnInfo> schema, Table table, PrunedPartitionList partList, List<ColStatistics> colStats)
+      throws HiveException {
+    for (String col : partitionCols) {
+      for (ColumnInfo ci : schema) {
+        // conditions for being partition column
+        if (col.equals(ci.getInternalName())) {
+          colStats.add(getColStatsForPartCol(ci, new PartitionIterable(partList.getPartitions()), conf));
+        }
+      }
+    }
+  }
+
+  private static List<String> getPartitionColumns(List<ColumnInfo> schema,
+      List<String> neededColumns,
+      List<String> referencedColumns) {
     // extra columns is difference between referenced columns vs needed
     // columns. The difference could be partition columns.
+    List<String> partitionCols = new ArrayList<>(referencedColumns.size());
     List<String> extraCols = Lists.newArrayList(referencedColumns);
     if (referencedColumns.size() > neededColumns.size()) {
       extraCols.removeAll(neededColumns);
@@ -430,11 +500,12 @@ private static void addParitionColumnStats(HiveConf conf, List<String> neededCol
         // conditions for being partition column
         if (col.equals(ci.getInternalName()) && ci.getIsVirtualCol() &&
             !ci.isHiddenVirtualCol()) {
-          colStats.add(getColStatsForPartCol(ci, new PartitionIterable(partList.getPartitions()), conf));
+          partitionCols.add(col);
        }
      }
    }
+    return partitionCols;
   }
 
   public static ColStatistics getColStatsForPartCol(ColumnInfo ci,PartitionIterable partList, HiveConf conf) {
@@ -791,23 +862,49 @@ public static ColStatistics getColStatistics(ColumnStatisticsObj cso, String tab
    * @return column statistics
    */
   public static List<ColStatistics> getTableColumnStats(
-      Table table, List<ColumnInfo> schema, List<String> neededColumns) {
+      Table table, List<ColumnInfo> schema, List<String> neededColumns,
+      Map<String, ColStatistics> colStatsCache) {
     if (table.isMaterializedTable()) {
       LOG.debug("Materialized table does not contain table statistics");
       return null;
     }
 
+    // We will retrieve stats from the metastore only for columns that are not cached
+    List<String> colStatsToRetrieve;
+    if (colStatsCache != null) {
+      colStatsToRetrieve = new ArrayList<>(neededColumns.size());
+      for (String colName : neededColumns) {
+        if (!colStatsCache.containsKey(colName)) {
+          colStatsToRetrieve.add(colName);
+        }
+      }
+    } else {
+      colStatsToRetrieve = neededColumns;
+    }
+
+    // Retrieve stats from metastore
     String dbName = table.getDbName();
     String tabName = table.getTableName();
-    List<String> neededColsInTable = processNeededColumns(schema, neededColumns);
     List<ColStatistics> stats = null;
     try {
       List<ColumnStatisticsObj> colStat = Hive.get().getTableColumnStatistics(
-          dbName, tabName, neededColsInTable);
+          dbName, tabName, colStatsToRetrieve);
       stats = convertColStats(colStat, tabName);
     } catch (HiveException e) {
       LOG.error("Failed to retrieve table statistics: ", e);
       stats = null;
     }
+    // Merge stats from cache with metastore cache
+    if (colStatsCache != null) {
+      for (int i = 0; i < neededColumns.size(); i++) {
+        ColStatistics cs = colStatsCache.get(neededColumns.get(i));
+        if (cs != null) {
+          stats.add(i, cs);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Stats for column " + cs.getColumnName() +
+                " in table " + table.getCompleteName() + " retrieved from cache");
+          }
+        }
+      }
+    }
     return stats;
   }
@@ -824,22 +921,6 @@ public static ColStatistics getColStatistics(ColumnStatisticsObj cso, String tab
     }
     return stats;
   }
-  private static List<String> processNeededColumns(List<ColumnInfo> schema,
-      List<String> neededColumns) {
-    // Remove hidden virtual columns, as well as needed columns that are not
-    // part of the table. TODO: the latter case should not really happen...
-    List<String> neededColsInTable = null;
-    int limit = neededColumns.size();
-    for (int i = 0; i < limit; ++i) {
-      if (neededColsInTable == null) {
-        neededColsInTable = Lists.newArrayList(neededColumns);
-      }
-      neededColsInTable.remove(i--);
-      --limit;
-    }
-    return (neededColsInTable == null || neededColsInTable.size() == 0) ? neededColumns
-        : neededColsInTable;
-  }
 
   /**
    * Get the raw data size of variable length data types
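For orientation only (this is not part of the patch): the change above introduces a per-query, two-level column statistics cache, keyed first by the PrunedPartitionList key and then by column name, so that statistics computed during logical planning (CalcitePlanner / RelOptHiveTable) can be reused when operator statistics are annotated (StatsRulesProcFactory / StatsUtils) instead of triggering another metastore call. The following self-contained Java sketch illustrates that pattern under simplified assumptions; ColStatistics here is a stand-in for the Hive class, and fetchFromMetastore is a hypothetical placeholder for Hive.get().getTableColumnStatistics(...).

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ColStatsCacheSketch {

  /** Simplified stand-in for org.apache.hadoop.hive.ql.plan.ColStatistics. */
  static class ColStatistics {
    private final String columnName;

    ColStatistics(String columnName) {
      this.columnName = columnName;
    }

    String getColumnName() {
      return columnName;
    }
  }

  // Outer key: PrunedPartitionList key; inner key: column name.
  // This mirrors the map handed out by CompilationOpContext.getColStatsCache().
  private final Map<String, Map<String, ColStatistics>> colStatsCache = new HashMap<>();

  // Get-or-create step, similar to the one added to RelOptHiveTable.updateColStats().
  Map<String, ColStatistics> cacheFor(String prunedPartitionListKey) {
    Map<String, ColStatistics> cached = colStatsCache.get(prunedPartitionListKey);
    if (cached == null) {
      cached = new HashMap<>();
      colStatsCache.put(prunedPartitionListKey, cached);
    }
    return cached;
  }

  // Hit/miss split, as in StatsUtils.getTableColumnStats(): cached columns are reused,
  // only the misses go to the (here hypothetical) metastore fetch.
  List<ColStatistics> columnStats(String prunedPartitionListKey, List<String> neededColumns) {
    Map<String, ColStatistics> cached = cacheFor(prunedPartitionListKey);
    List<String> toRetrieve = new ArrayList<>();
    List<ColStatistics> result = new ArrayList<>();
    for (String col : neededColumns) {
      ColStatistics cs = cached.get(col);
      if (cs != null) {
        result.add(cs);        // cache hit: stats were computed earlier in planning
      } else {
        toRetrieve.add(col);   // cache miss: needs a metastore round trip
      }
    }
    for (ColStatistics cs : fetchFromMetastore(toRetrieve)) {
      cached.put(cs.getColumnName(), cs);   // populate the cache for later phases
      result.add(cs);
    }
    return result;
  }

  // Hypothetical placeholder for Hive.get().getTableColumnStatistics(...).
  private List<ColStatistics> fetchFromMetastore(List<String> columns) {
    List<ColStatistics> stats = new ArrayList<>(columns.size());
    for (String col : columns) {
      stats.add(new ColStatistics(col));
    }
    return stats;
  }
}

In the patch itself, the outer map lives in CompilationOpContext and is cleared by SemanticAnalyzer.reset, RelOptHiveTable.updateColStats populates the per-partition-list entry, and StatsUtils consults it before fetching, with HIVE_IN_TEST turning a cache miss during the stats annotation phase into an error so regressions in cache population surface in tests.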