diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
index 56c0163..a721d07 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
@@ -24,6 +24,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -63,6 +64,7 @@
 import org.apache.hadoop.hive.ql.parse.ParseUtils;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.plan.ColStatistics;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -284,7 +286,7 @@ private static RelNode createTableScan(Table viewTable) {
     RelOptHiveTable optTable = new RelOptHiveTable(null, fullyQualifiedTabName,
         rowType, viewTable, nonPartitionColumns, partitionColumns,
         new ArrayList<VirtualColumn>(), SessionState.get().getConf(), new HashMap<String, PrunedPartitionList>(),
-        new AtomicInteger());
+        new HashMap<PrunedPartitionList, Map<String, ColStatistics>>(), new AtomicInteger());
     RelNode tableRel;
 
     // 3. Build operator
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java
index 1d49568..f8b5530 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java
@@ -77,9 +77,11 @@
   final HiveConf hiveConf;
 
   private double rowCount = -1;
-  Map<Integer, ColStatistics> hiveColStatsMap = new HashMap<Integer, ColStatistics>();
+  Map<Integer, ColStatistics> hiveColStatsMap = new HashMap<>();
   PrunedPartitionList partitionList;
   Map<String, PrunedPartitionList> partitionCache;
+  Map<PrunedPartitionList, Map<String, ColStatistics>> colStatsCache;
   AtomicInteger noColsMissingStats;
 
   protected static final Logger LOG = LoggerFactory
@@ -89,7 +91,8 @@
   public RelOptHiveTable(RelOptSchema calciteSchema, String qualifiedTblName, RelDataType rowType,
       Table hiveTblMetadata, List<ColumnInfo> hiveNonPartitionCols, List<ColumnInfo> hivePartitionCols,
       List<VirtualColumn> hiveVirtualCols, HiveConf hconf,
-      Map<String, PrunedPartitionList> partitionCache, AtomicInteger noColsMissingStats) {
+      Map<String, PrunedPartitionList> partitionCache, Map<PrunedPartitionList, Map<String, ColStatistics>> colStatsCache,
+      AtomicInteger noColsMissingStats) {
     super(calciteSchema, qualifiedTblName, rowType);
     this.hiveTblMetadata = hiveTblMetadata;
     this.hiveNonPartitionCols = ImmutableList.copyOf(hiveNonPartitionCols);
@@ -100,6 +103,7 @@ public RelOptHiveTable(RelOptSchema calciteSchema, String qualifiedTblName,
     this.hiveVirtualCols = ImmutableList.copyOf(hiveVirtualCols);
     this.hiveConf = hconf;
     this.partitionCache = partitionCache;
+    this.colStatsCache = colStatsCache;
     this.noColsMissingStats = noColsMissingStats;
   }
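For orientation before the remaining hunks: the new constructor argument is a two-level cache, pruned partition list -> (column name -> ColStatistics). Below is a minimal toy sketch of that shape and of the lazy get-or-create step that updateColStats performs further down; it uses plain JDK maps with String stand-ins, not the actual Hive PrunedPartitionList/ColStatistics classes.

```java
import java.util.HashMap;
import java.util.Map;

// Toy sketch of the two-level column-statistics cache; the String keys and values
// stand in for PrunedPartitionList and ColStatistics, which are not used here.
public class ColStatsCacheShape {
  public static void main(String[] args) {
    Map<String, Map<String, String>> colStatsCache = new HashMap<>();

    String partitionList = "partition-list-A";   // stand-in for a PrunedPartitionList key

    // Lazily create the per-partition-list entry, mirroring the pattern in updateColStats.
    Map<String, String> colStatsCached = colStatsCache.get(partitionList);
    if (colStatsCached == null) {
      colStatsCached = new HashMap<>();
      colStatsCache.put(partitionList, colStatsCached);
    }

    // Column stats fetched for this scan are recorded under their column names.
    colStatsCached.put("col_a", "ColStatistics(col_a)");
    System.out.println(colStatsCache);
  }
}
```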
@@ -136,7 +140,7 @@ public RelOptHiveTable copy(RelDataType newRowType) {
     // 3. Build new Table
     return new RelOptHiveTable(this.schema, this.name, newRowType, this.hiveTblMetadata,
         newHiveNonPartitionCols, newHivePartitionCols, newHiveVirtualCols,
-        this.hiveConf, this.partitionCache, this.noColsMissingStats);
+        this.hiveConf, this.partitionCache, this.colStatsCache, this.noColsMissingStats);
   }
 
   @Override
@@ -295,6 +299,12 @@ private void updateColStats(Set<Integer> projIndxLst, boolean allowNullColumnFor
       computePartitionList(hiveConf, null, new HashSet<Integer>());
     }
 
+    Map<String, ColStatistics> colStatsCached = colStatsCache.get(partitionList);
+    if (colStatsCached == null) {
+      colStatsCached = new HashMap<>();
+      colStatsCache.put(partitionList, colStatsCached);
+    }
+
     // 2. Obtain Col Stats for Non Partition Cols
     if (nonPartColNamesThatRqrStats.size() > 0) {
       List<ColStatistics> hiveColStats;
@@ -302,7 +312,7 @@ private void updateColStats(Set<Integer> projIndxLst, boolean allowNullColumnFor
       if (!hiveTblMetadata.isPartitioned()) {
         // 2.1 Handle the case for unpartitioned table.
         hiveColStats = StatsUtils.getTableColumnStats(hiveTblMetadata, hiveNonPartitionCols,
-            nonPartColNamesThatRqrStats);
+            nonPartColNamesThatRqrStats, colStatsCached);
 
         // 2.1.1 Record Column Names that we needed stats for but couldn't
         if (hiveColStats == null) {
@@ -345,7 +355,7 @@ private void updateColStats(Set<Integer> projIndxLst, boolean allowNullColumnFor
           colNamesFailedStats.clear();
         } else {
           Statistics stats = StatsUtils.collectStatistics(hiveConf, partitionList,
-              hiveTblMetadata, hiveNonPartitionCols, nonPartColNamesThatRqrStats,
+              hiveTblMetadata, hiveNonPartitionCols, nonPartColNamesThatRqrStats, colStatsCached,
               nonPartColNamesThatRqrStats, true, true);
           rowCount = stats.getNumRows();
           hiveColStats = new ArrayList<ColStatistics>();
@@ -370,6 +380,7 @@ private void updateColStats(Set<Integer> projIndxLst, boolean allowNullColumnFor
           // the columns in nonPartColIndxsThatRqrStats/nonPartColNamesThatRqrStats/hiveColStats
           // are in same order
           hiveColStatsMap.put(nonPartColIndxsThatRqrStats.get(i), hiveColStats.get(i));
+          colStatsCached.put(hiveColStats.get(i).getColumnName(), hiveColStats.get(i));
         }
       }
     }
@@ -383,6 +394,7 @@ private void updateColStats(Set<Integer> projIndxLst, boolean allowNullColumnFor
         cStats.setCountDistint(getDistinctCount(partitionList.getPartitions(),
             partColNamesThatRqrStats.get(i)));
         hiveColStatsMap.put(partColIndxsThatRqrStats.get(i), cStats);
+        colStatsCached.put(cStats.getColumnName(), cStats);
       }
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
index 6cb0559..fff1bd9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
@@ -123,11 +123,12 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
       TableScanOperator tsop = (TableScanOperator) nd;
       AnnotateStatsProcCtx aspCtx = (AnnotateStatsProcCtx) procCtx;
       PrunedPartitionList partList = aspCtx.getParseContext().getPrunedPartitions(tsop);
+      Map<String, ColStatistics> colStatsCached = aspCtx.getParseContext().getColStatsCached(partList);
       Table table = tsop.getConf().getTableMetadata();
 
       try {
         // gather statistics for the first time and the attach it to table scan operator
-        Statistics stats = StatsUtils.collectStatistics(aspCtx.getConf(), partList, table, tsop);
+        Statistics stats = StatsUtils.collectStatistics(aspCtx.getConf(), partList, colStatsCached, table, tsop);
         tsop.setStatistics(stats.clone());
 
         if (LOG.isDebugEnabled()) {
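The stats-annotation rule above fetches the cached map for the scan's pruned partition list before calling StatsUtils.collectStatistics. Keying the outer map by the pruned partition list rather than by table name keeps scans of the same table with different pruning separate; a rough illustration with invented stand-in keys and values (not the actual Hive classes):

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: two scans of the same table with different partition pruning
// must not share cached column statistics, so the outer key is the pruned partition list.
public class CacheKeyedByPruning {
  public static void main(String[] args) {
    Map<String, Map<String, String>> colStatsCache = new HashMap<>();

    colStatsCache.computeIfAbsent("sales[ds=2016-01-01]", k -> new HashMap<>())
        .put("amount", "stats over one partition");
    colStatsCache.computeIfAbsent("sales[ds=2016-01-*]", k -> new HashMap<>())
        .put("amount", "stats over many partitions");

    // Each pruned scan sees only the statistics computed for its own partition set.
    System.out.println(colStatsCache.get("sales[ds=2016-01-01]").get("amount"));
    System.out.println(colStatsCache.get("sales[ds=2016-01-*]").get("amount"));
  }
}
```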
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index d6695cc..187f244 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -232,6 +232,7 @@
 import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFunctionSpec;
 import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowSpec;
 import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowType;
+import org.apache.hadoop.hive.ql.plan.ColStatistics;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -1092,7 +1093,7 @@ RelNode logicalPlan() throws SemanticException {
     if (this.columnAccessInfo == null) {
       this.columnAccessInfo = new ColumnAccessInfo();
     }
-    calcitePlannerAction = new CalcitePlannerAction(prunedPartitions, this.columnAccessInfo);
+    calcitePlannerAction = new CalcitePlannerAction(prunedPartitions, colStatistics, this.columnAccessInfo);
 
     try {
       optimizedOptiqPlan = Frameworks.withPlanner(calcitePlannerAction, Frameworks
@@ -1129,7 +1130,7 @@ Operator getOptimizedHiveOPDag() throws SemanticException {
     if (this.columnAccessInfo == null) {
       this.columnAccessInfo = new ColumnAccessInfo();
     }
-    calcitePlannerAction = new CalcitePlannerAction(prunedPartitions, this.columnAccessInfo);
+    calcitePlannerAction = new CalcitePlannerAction(prunedPartitions, colStatistics, this.columnAccessInfo);
 
     try {
       optimizedOptiqPlan = Frameworks.withPlanner(calcitePlannerAction, Frameworks
@@ -1291,6 +1292,7 @@ private RowResolver genRowResolver(Operator op, QB qb) {
     private RelOptCluster cluster;
     private RelOptSchema relOptSchema;
     private final Map<String, PrunedPartitionList> partitionCache;
+    private final Map<PrunedPartitionList, Map<String, ColStatistics>> colStatsCache;
     private final ColumnAccessInfo columnAccessInfo;
     private Map<HiveProject, Table> viewProjectToTableSchema;
 
@@ -1308,8 +1310,12 @@ private RowResolver genRowResolver(Operator op, QB qb) {
     LinkedHashMap<RelNode, RowResolver> relToHiveRR = new LinkedHashMap<RelNode, RowResolver>();
     LinkedHashMap<RelNode, ImmutableMap<String, Integer>> relToHiveColNameCalcitePosMap =
         new LinkedHashMap<RelNode, ImmutableMap<String, Integer>>();
 
-    CalcitePlannerAction(Map<String, PrunedPartitionList> partitionCache, ColumnAccessInfo columnAccessInfo) {
+    CalcitePlannerAction(
+        Map<String, PrunedPartitionList> partitionCache,
+        Map<PrunedPartitionList, Map<String, ColStatistics>> colStatsCache,
+        ColumnAccessInfo columnAccessInfo) {
       this.partitionCache = partitionCache;
+      this.colStatsCache = colStatsCache;
       this.columnAccessInfo = columnAccessInfo;
     }
@@ -2331,7 +2337,7 @@ private RelNode genTableLogicalPlan(String tableAlias, QB qb) throws SemanticExc
         }
         RelOptHiveTable optTable = new RelOptHiveTable(relOptSchema, fullyQualifiedTabName,
             rowType, tabMetaData, nonPartitionColumns, partitionColumns, virtualCols, conf,
-            partitionCache, noColsMissingStats);
+            partitionCache, colStatsCache, noColsMissingStats);
 
         // Build Druid query
         String address = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS);
@@ -2376,7 +2382,7 @@ private RelNode genTableLogicalPlan(String tableAlias, QB qb) throws SemanticExc
         }
         RelOptHiveTable optTable = new RelOptHiveTable(relOptSchema, fullyQualifiedTabName,
             rowType, tabMetaData, nonPartitionColumns, partitionColumns, virtualCols, conf,
-            partitionCache, noColsMissingStats);
+            partitionCache, colStatsCache, noColsMissingStats);
 
         // Build Hive Table Scan Rel
         tableRel = new HiveTableScan(cluster, cluster.traitSetOf(HiveRelNode.CONVENTION), optTable,
             null == tableAlias ? tabMetaData.getTableName() : tableAlias,
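CalcitePlannerAction receives the analyzer's colStatsCache alongside partitionCache and hands it to every RelOptHiveTable it builds, so statistics fetched while Calcite plans the query stay visible to later compilation steps that share the same map. A rough sketch of that constructor-injection sharing pattern, with invented class names standing in for the Hive components:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the sharing pattern: one mutable map owned by the analyzer is handed to the
// CBO phase and to the stats-annotation phase, so a fetch in the first phase is a hit in the second.
public class SharedCacheAcrossPhases {
  static class CboPhase {
    private final Map<String, String> cache;
    CboPhase(Map<String, String> cache) { this.cache = cache; }
    void run() { cache.put("col_a", "stats fetched during CBO"); }
  }

  static class StatsAnnotationPhase {
    private final Map<String, String> cache;
    StatsAnnotationPhase(Map<String, String> cache) { this.cache = cache; }
    void run() {
      // No second metastore round trip: the value cached by the CBO phase is reused.
      System.out.println("cache hit: " + cache.get("col_a"));
    }
  }

  public static void main(String[] args) {
    Map<String, String> cache = new HashMap<>();   // owned by the analyzer in the patch
    new CboPhase(cache).run();
    new StatsAnnotationPhase(cache).run();
  }
}
```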
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index 565fbef..ae14275 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.AnalyzeRewriteContext;
 import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo;
+import org.apache.hadoop.hive.ql.plan.ColStatistics;
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
 import org.apache.hadoop.hive.ql.plan.CreateViewDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -86,6 +87,7 @@
   // operators with no
   // reducer
   private Map<String, PrunedPartitionList> prunedPartitions;
+  private Map<PrunedPartitionList, Map<String, ColStatistics>> colStatsCache;
   private Map<String, ReadEntity> viewAliasToInput;
   private Map<String, Table> tabNameToTabObject;
 
@@ -174,6 +176,7 @@ public ParseContext(
       Context ctx, HashMap<String, String> idToTableNameMap, int destTableId,
       UnionProcContext uCtx, List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinOpsNoReducer,
       Map<String, PrunedPartitionList> prunedPartitions,
+      Map<PrunedPartitionList, Map<String, ColStatistics>> colStats,
       Map<String, Table> tabNameToTabObject,
       HashMap opToSamplePruner,
       GlobalLimitCtx globalLimitCtx,
@@ -201,6 +204,7 @@ public ParseContext(
     this.uCtx = uCtx;
     this.listMapJoinOpsNoReducer = listMapJoinOpsNoReducer;
     this.prunedPartitions = prunedPartitions;
+    this.colStatsCache = colStats;
     this.tabNameToTabObject = tabNameToTabObject;
     this.opToSamplePruner = opToSamplePruner;
     this.nameToSplitSample = nameToSplitSample;
@@ -426,6 +430,31 @@ public void setOpToSamplePruner(
   }
 
   /**
+   * Cache of column statistics, keyed by pruned partition list and then by column name.
+   *
+   * @return col statistics cache
+   */
+  public Map<PrunedPartitionList, Map<String, ColStatistics>> getColStatsCache() {
+    return colStatsCache;
+  }
+
+  /**
+   * @param partList pruned partition list
+   *
+   * @return col statistics map for the given pruned partition list
+   */
+  public Map<String, ColStatistics> getColStatsCached(PrunedPartitionList partList) {
+    return colStatsCache.get(partList);
+  }
+
+  /**
+   * @param colStatsCache
+   */
+  public void setColStatsCache(Map<PrunedPartitionList, Map<String, ColStatistics>> colStatsCache) {
+    this.colStatsCache = colStatsCache;
+  }
+
+  /**
    * @return pruned partition map
    */
   public Map<String, PrunedPartitionList> getPrunedPartitions() {
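getColStatsCached is a plain Map.get, so it returns null when nothing has been cached yet for a pruned partition list; the callers added in this patch (the TableScan stats rule and StatsUtils.getTableColumnStats) are written to tolerate that null. A small stand-alone illustration of the assumed contract, with JDK maps standing in for the Hive types:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the accessor behaviour assumed by the callers: a miss on the outer map
// yields null, and the stats-collection code must treat a null cache as "nothing cached".
public class NullTolerantLookup {
  public static void main(String[] args) {
    Map<String, Map<String, String>> colStatsCache = new HashMap<>();

    Map<String, String> cached = colStatsCache.get("unseen-partition-list"); // null, not empty
    if (cached == null) {
      System.out.println("no cached stats for this pruned partition list; fetch everything");
    } else {
      System.out.println("reuse " + cached.size() + " cached column stats");
    }
  }
}
```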
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 4faec05..5a08af6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -160,6 +160,7 @@
 import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
 import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes;
+import org.apache.hadoop.hive.ql.plan.ColStatistics;
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
 import org.apache.hadoop.hive.ql.plan.CreateTableLikeDesc;
 import org.apache.hadoop.hive.ql.plan.CreateViewDesc;
@@ -292,6 +293,7 @@
   private final HashMap<String, SplitSample> nameToSplitSample;
   Map<GroupByOperator, Set<String>> groupOpToInputTables;
   Map<String, PrunedPartitionList> prunedPartitions;
+  Map<PrunedPartitionList, Map<String, ColStatistics>> colStatistics;
   protected List<FieldSchema> resultSchema;
   protected CreateViewDesc createVwDesc;
   protected ArrayList<String> viewsExpanded;
@@ -383,6 +385,7 @@ public SemanticAnalyzer(QueryState queryState) throws SemanticException {
     listMapJoinOpsNoReducer = new ArrayList<AbstractMapJoinOperator<? extends MapJoinDesc>>();
     groupOpToInputTables = new HashMap<GroupByOperator, Set<String>>();
     prunedPartitions = new HashMap<String, PrunedPartitionList>();
+    colStatistics = new HashMap<PrunedPartitionList, Map<String, ColStatistics>>();
     tabNameToTabObject = new HashMap<String, Table>();
     unparseTranslator = new UnparseTranslator(conf);
     autogenColAliasPrfxLbl = HiveConf.getVar(conf,
@@ -402,10 +405,11 @@ public SemanticAnalyzer(QueryState queryState) throws SemanticException {
   }
 
   @Override
-  protected void reset(boolean clearPartsCache) {
+  protected void reset(boolean clearCache) {
     super.reset(true);
-    if(clearPartsCache) {
+    if(clearCache) {
       prunedPartitions.clear();
+      colStatistics.clear();
 
       //When init(true) combine with genResolvedParseTree, it will generate Resolved Parse tree from syntax tree
       //ReadEntity created under these conditions should be all relevant to the syntax tree even the ones without parents
@@ -464,6 +468,7 @@ public void initParseCtx(ParseContext pctx) {
     idToTableNameMap = pctx.getIdToTableNameMap();
     uCtx = pctx.getUCtx();
     listMapJoinOpsNoReducer = pctx.getListMapJoinOpsNoReducer();
+    colStatistics = pctx.getColStatsCache();
     prunedPartitions = pctx.getPrunedPartitions();
     tabNameToTabObject = pctx.getTabNameToTabObject();
     fetchTask = pctx.getFetchTask();
@@ -477,7 +482,7 @@ public ParseContext getParseContext() {
         new HashSet<JoinOperator>(joinContext.keySet()), new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
         loadTableWork, loadFileWork, columnStatsAutoGatherContexts, ctx, idToTableNameMap, destTableId, uCtx,
-        listMapJoinOpsNoReducer, prunedPartitions, tabNameToTabObject,
+        listMapJoinOpsNoReducer, prunedPartitions, colStatistics, tabNameToTabObject,
         opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
         opToPartToSkewedPruner, viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting,
         analyzeRewrite, tableDesc, createVwDesc, queryProperties, viewProjectToTableSchema, acidFileSinks);
@@ -11366,7 +11371,7 @@ void analyzeInternal(ASTNode ast, PlannerContext plannerCtx) throws SemanticExce
         new HashSet<JoinOperator>(joinContext.keySet()), new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
         loadTableWork, loadFileWork, columnStatsAutoGatherContexts, ctx, idToTableNameMap, destTableId, uCtx,
-        listMapJoinOpsNoReducer, prunedPartitions, tabNameToTabObject, opToSamplePruner,
+        listMapJoinOpsNoReducer, prunedPartitions, colStatistics, tabNameToTabObject, opToSamplePruner,
         globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner, viewAliasToInput,
         reduceSinkOperatorsAddedByEnforceBucketingSorting, analyzeRewrite, tableDesc, createVwDesc,
         queryProperties, viewProjectToTableSchema, acidFileSinks);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 08a8f00..6861c76 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -520,7 +520,7 @@ public ParseContext getParseContext(ParseContext pCtx, List
diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
--- ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
   public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList partList,
-      Table table, TableScanOperator tableScanOperator) throws HiveException {
+      Map<String, ColStatistics> colStatsCache, Table table, TableScanOperator tableScanOperator) throws HiveException {
 
     // column level statistics are required only for the columns that are needed
@@ -143,11 +141,11 @@
     List<String> neededColumns = tableScanOperator.getNeededColumns();
     List<String> referencedColumns = tableScanOperator.getReferencedColumns();
 
-    return collectStatistics(conf, partList, table, schema, neededColumns, referencedColumns);
+    return collectStatistics(conf, partList, table, schema, neededColumns, colStatsCache, referencedColumns);
   }
 
   private static Statistics collectStatistics(HiveConf conf, PrunedPartitionList partList,
-      Table table, List<ColumnInfo> schema, List<String> neededColumns,
+      Table table, List<ColumnInfo> schema, List<String> neededColumns, Map<String, ColStatistics> colStatsCache,
       List<String> referencedColumns) throws HiveException {
 
     boolean fetchColStats =
@@ -155,7 +153,7 @@ private static Statistics collectStatistics(HiveConf conf, PrunedPartitionList p
     boolean fetchPartStats =
         HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_FETCH_PARTITION_STATS);
 
-    return collectStatistics(conf, partList, table, schema, neededColumns, referencedColumns,
+    return collectStatistics(conf, partList, table, schema, neededColumns, colStatsCache, referencedColumns,
         fetchColStats, fetchPartStats);
   }
 
@@ -193,7 +191,7 @@ private static long getNumRows(HiveConf conf, List schema, List
 
   public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList partList,
-      Table table, List<ColumnInfo> schema, List<String> neededColumns,
+      Table table, List<ColumnInfo> schema, List<String> neededColumns, Map<String, ColStatistics> colStatsCache,
       List<String> referencedColumns, boolean fetchColStats, boolean fetchPartStats)
       throws HiveException {
 
@@ -209,11 +207,11 @@ public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList pa
       stats.setNumRows(nr);
       List<ColStatistics> colStats = Lists.newArrayList();
       if (fetchColStats) {
-        colStats = getTableColumnStats(table, schema, neededColumns);
+        colStats = getTableColumnStats(table, schema, neededColumns, colStatsCache);
         long betterDS = getDataSizeFromColumnStats(nr, colStats);
         ds = (betterDS < 1 || colStats.isEmpty()) ? ds : betterDS;
       }
-      stats.setDataSize(ds);
+      stats.setDataSize(ds);
 
       // infer if any column can be primary key based on column statistics
       inferAndSetPrimaryKey(stats.getNumRows(), colStats);
@@ -278,7 +276,6 @@ public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList pa
       for (Partition part : partList.getNotDeniedPartns()) {
         partNames.add(part.getName());
       }
-      neededColumns = processNeededColumns(schema, neededColumns);
       AggrStats aggrStats = null;
       // We check the sizes of neededColumns and partNames here. If either
       // size is 0, aggrStats is null after several retries. Thus, we can
@@ -791,23 +788,49 @@ public static ColStatistics getColStatistics(ColumnStatisticsObj cso, String tab
    * @return column statistics
    */
   public static List<ColStatistics> getTableColumnStats(
-      Table table, List<ColumnInfo> schema, List<String> neededColumns) {
+      Table table, List<ColumnInfo> schema, List<String> neededColumns,
+      Map<String, ColStatistics> colStatsCache) {
     if (table.isMaterializedTable()) {
       LOG.debug("Materialized table does not contain table statistics");
       return null;
     }
+    // We will retrieve stats from the metastore only for columns that are not cached
+    List<String> colStatsToRetrieve;
+    if (colStatsCache != null) {
+      colStatsToRetrieve = new ArrayList<>(neededColumns.size());
+      for (String colName : neededColumns) {
+        if (!colStatsCache.containsKey(colName)) {
+          colStatsToRetrieve.add(colName);
+        }
+      }
+    } else {
+      colStatsToRetrieve = neededColumns;
+    }
+    // Retrieve stats from metastore
     String dbName = table.getDbName();
     String tabName = table.getTableName();
-    List<String> neededColsInTable = processNeededColumns(schema, neededColumns);
     List<ColStatistics> stats = null;
     try {
       List<ColumnStatisticsObj> colStat = Hive.get().getTableColumnStatistics(
-          dbName, tabName, neededColsInTable);
+          dbName, tabName, colStatsToRetrieve);
       stats = convertColStats(colStat, tabName);
     } catch (HiveException e) {
       LOG.error("Failed to retrieve table statistics: ", e);
       stats = null;
     }
+    // Merge stats from cache with metastore stats
+    if (colStatsCache != null) {
+      for (int i = 0; i < neededColumns.size(); i++) {
+        ColStatistics cs = colStatsCache.get(neededColumns.get(i));
+        if (cs != null) {
+          stats.add(i, cs);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Stats for column " + cs.getColumnName() +
+                " in table " + table.getCompleteName() + " retrieved from cache");
+          }
+        }
+      }
+    }
     return stats;
   }
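The getTableColumnStats change above is the core of the patch: only columns missing from the cache are requested from the metastore, and cached entries are then inserted back at their positions in the neededColumns order (relying on the fetched statistics preserving the requested column order, the same "same order" assumption noted in RelOptHiveTable). Below is a self-contained toy re-creation of those two steps, with strings in place of ColStatistics and a fabricated "metastore" response; it is illustrative only, not Hive code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy re-creation of the cache-aware retrieval: only cache misses are "fetched", and
// cached entries are inserted back at their positions in the neededColumns order.
public class CacheAwareColumnStats {
  public static void main(String[] args) {
    List<String> neededColumns = List.of("col_a", "col_b", "col_c");
    Map<String, String> colStatsCache = new HashMap<>();
    colStatsCache.put("col_b", "cached stats for col_b");

    // 1. Collect only the columns that are not already cached.
    List<String> colStatsToRetrieve = new ArrayList<>(neededColumns.size());
    for (String colName : neededColumns) {
      if (!colStatsCache.containsKey(colName)) {
        colStatsToRetrieve.add(colName);
      }
    }

    // 2. "Fetch" the misses (here: fabricate a value per requested column).
    List<String> stats = new ArrayList<>();
    for (String colName : colStatsToRetrieve) {
      stats.add("metastore stats for " + colName);   // col_a and col_c only
    }

    // 3. Merge cached entries back in at their position in neededColumns.
    for (int i = 0; i < neededColumns.size(); i++) {
      String cs = colStatsCache.get(neededColumns.get(i));
      if (cs != null) {
        stats.add(i, cs);
      }
    }

    // [metastore stats for col_a, cached stats for col_b, metastore stats for col_c]
    System.out.println(stats);
  }
}
```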
@@ -824,22 +847,6 @@ public static ColStatistics getColStatistics(ColumnStatisticsObj cso, String tab
     }
     return stats;
   }
 
-  private static List<String> processNeededColumns(List<ColumnInfo> schema,
-      List<String> neededColumns) {
-    // Remove hidden virtual columns, as well as needed columns that are not
-    // part of the table. TODO: the latter case should not really happen...
-    List<String> neededColsInTable = null;
-    int limit = neededColumns.size();
-    for (int i = 0; i < limit; ++i) {
-      if (neededColsInTable == null) {
-        neededColsInTable = Lists.newArrayList(neededColumns);
-      }
-      neededColsInTable.remove(i--);
-      --limit;
-    }
-    return (neededColsInTable == null || neededColsInTable.size() == 0) ? neededColumns
-        : neededColsInTable;
-  }
 
   /**
    * Get the raw data size of variable length data types
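Taken together, the intent of the patch is that column statistics for a given pruned scan are fetched from the metastore at most once per query compilation and served from the cache afterwards. A rough end-to-end simulation of that behaviour follows; the fetch counter, method name, and keys are invented for illustration and stand in for the real PrunedPartitionList/ColStatistics plumbing.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// End-to-end toy: the first stats request for a pruned scan hits the "metastore",
// the second request for the same scan is served entirely from the cache.
public class ColStatsCacheDemo {
  private static int metastoreFetches = 0;

  private static Map<String, String> getColumnStats(
      Map<String, Map<String, String>> colStatsCache, String partitionList, List<String> columns) {
    Map<String, String> cached = colStatsCache.computeIfAbsent(partitionList, k -> new HashMap<>());
    for (String col : columns) {
      if (!cached.containsKey(col)) {
        metastoreFetches++;                       // simulate one metastore round trip per missing column
        cached.put(col, "stats(" + col + ")");
      }
    }
    return cached;
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> colStatsCache = new HashMap<>();
    List<String> columns = List.of("col_a", "col_b");

    getColumnStats(colStatsCache, "scan-1", columns);   // populates the cache (e.g. during CBO)
    getColumnStats(colStatsCache, "scan-1", columns);   // later phase: all hits, no new fetches

    System.out.println("metastore fetches: " + metastoreFetches);   // prints 2, not 4
  }
}
```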