diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 6d0cf15..5fb702d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1271,6 +1271,8 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
     // Statistics
     HIVESTATSAUTOGATHER("hive.stats.autogather", true,
         "A flag to gather statistics automatically during the INSERT OVERWRITE command."),
+    HIVESTATSCOLAUTOGATHER("hive.stats.column.autogather", true,
+        "A flag to gather column statistics automatically during the INSERT OVERWRITE command."),
     HIVESTATSDBCLASS("hive.stats.dbclass", "fs", new PatternSet("jdbc(:.*)", "hbase", "counter", "custom", "fs"),
         "The storage that stores temporary Hive statistics. In filesystem based statistics collection ('fs'), \n" +
        "each task writes statistics it has collected in a file on the filesystem, which will be aggregated \n" +
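The two flags are consulted together at compile time (see the SemanticAnalyzer hunk further down): column-stats autogather only fires when both hive.stats.autogather and hive.stats.column.autogather are true. A minimal sketch of that gating check, not part of the patch, using only names the patch introduces or references:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

    // Sketch: either flag can independently disable automatic
    // column statistics collection.
    public class AutogatherGateSketch {
      public static boolean columnStatsAutogatherEnabled(HiveConf conf) {
        return conf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)
            && conf.getBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER);
      }
    }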
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsAutoGatherContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsAutoGatherContext.java
new file mode 100644
index 0000000..6885f26
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsAutoGatherContext.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.QueryProperties;
+import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.ListSinkOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
+import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.AnalyzeRewriteContext;
+import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc.SampleDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+
+/**
+ * ColumnStatsAutoGatherContext: passed to the compiler when both
+ * hive.stats.autogather and hive.stats.column.autogather are true, so that
+ * column statistics are gathered automatically as part of an INSERT
+ * OVERWRITE command.
+ */
+public class ColumnStatsAutoGatherContext {
+
+  public AnalyzeRewriteContext analyzeRewrite;
+  private final List<LoadFileDesc> loadFileWork = new ArrayList<>();
+  private final LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx;
+  private final HiveConf conf;
+  private final Operator<? extends OperatorDesc> op;
+  private final FileSinkOperator fsOp;
+
+  public ColumnStatsAutoGatherContext(
+      LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx, HiveConf conf,
+      Operator<? extends OperatorDesc> op, FileSinkOperator fsOp) {
+    super();
+    this.opParseCtx = opParseCtx;
+    this.conf = conf;
+    this.op = op;
+    this.fsOp = fsOp;
+  }
+
+  public List<LoadFileDesc> getLoadFileWork() {
+    return loadFileWork;
+  }
+
+  public AnalyzeRewriteContext getAnalyzeRewrite() {
+    return analyzeRewrite;
+  }
+
+  public void setAnalyzeRewrite(AnalyzeRewriteContext analyzeRewrite) {
+    this.analyzeRewrite = analyzeRewrite;
+  }
+
+  @SuppressWarnings("unchecked")
+  public void insertAnalyzePipeline() throws SemanticException {
+    // 1. Generate the statement "analyze table [tablename] compute statistics for columns"
+    String analyzeCommand = "analyze table " + fsOp.getConf().getTableInfo().getTableName()
+        + " compute statistics for columns";
+
+    // 2. Compiling that statement yields a TS-SEL-GBY-RS-GBY-SEL-FS operator
+    // pipeline, from which we take the second operator, i.e., the SEL.
+    Operator<? extends OperatorDesc> selOp = null;
+    try {
+      selOp = genSelOpForAnalyze(analyzeCommand);
+    } catch (IOException | ParseException e) {
+      throw new SemanticException(e);
+    }
+
+    // 3. Attach this SEL to the operator right before the FS.
+    op.getChildOperators().add(selOp);
+    selOp.getParentOperators().clear();
+    selOp.getParentOperators().add(op);
+
+    // 4. Fix up the column expressions, column list, etc. of the SEL.
+    replaceSelectOperatorProcess((SelectOperator) selOp, op);
+  }
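To make steps 2 and 3 concrete: the analyze sub-plan arrives as TS-SEL-GBY-RS-GBY-SEL-FS, and its first SEL is detached from the TableScan parent and re-parented under the operator that feeds the INSERT's FileSink. A self-contained toy model of that re-parenting (the Node class is hypothetical, standing in for Hive's Operator parent/child lists):

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical stand-in for Hive's Operator parent/child bookkeeping.
    class Node {
      final String name;
      final List<Node> parents = new ArrayList<>();
      final List<Node> children = new ArrayList<>();
      Node(String name) { this.name = name; }
    }

    public class SpliceDemo {
      public static void main(String[] args) {
        Node beforeFs = new Node("op-before-FS"); // operator feeding the insert's FS
        Node ts = new Node("TS");                 // TableScan of the analyze plan
        Node sel = new Node("SEL");               // SEL taken from the analyze plan
        ts.children.add(sel);
        sel.parents.add(ts);

        // Mirrors step 3 of insertAnalyzePipeline: re-parent SEL under beforeFs.
        beforeFs.children.add(sel);
        sel.parents.clear();
        sel.parents.add(beforeFs);

        System.out.println(sel.parents.get(0).name); // prints op-before-FS
      }
    }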
+  @SuppressWarnings("rawtypes")
+  private Operator genSelOpForAnalyze(String analyzeCommand)
+      throws IOException, ParseException, SemanticException {
+    //0. initialization
+    Context ctx = new Context(conf);
+    ParseDriver pd = new ParseDriver();
+    ASTNode tree = pd.parse(analyzeCommand, ctx);
+    tree = ParseUtils.findRootNonNullToken(tree);
+
+    //1. get the ColumnStatsSemanticAnalyzer
+    BaseSemanticAnalyzer baseSem = SemanticAnalyzerFactory.get(conf, tree);
+    if (!(baseSem instanceof ColumnStatsSemanticAnalyzer)) {
+      throw new SemanticException("Expecting a ColumnStatsSemanticAnalyzer but got "
+          + baseSem.getClass().getName());
+    }
+    ColumnStatsSemanticAnalyzer colSem = (ColumnStatsSemanticAnalyzer) baseSem;
+
+    //2. get the rewritten AST and compile it into an operator plan
+    ASTNode ast = colSem.getRewriteASTOnly(tree, this);
+    baseSem = SemanticAnalyzerFactory.get(conf, ast);
+    SemanticAnalyzer sem = (SemanticAnalyzer) baseSem;
+    QB qb = new QB(null, null, false);
+    ASTNode child = ast;
+    ParseContext subPCtx = sem.getParseContext();
+    subPCtx.setContext(ctx);
+    sem.initParseCtx(subPCtx);
+    sem.doPhase1(child, qb, sem.initPhase1Ctx(), null);
+    sem.getMetaData(qb);
+    Operator operator = sem.genPlan(qb);
+
+    //3. pick up the load work for the result file of the column stats
+    loadFileWork.addAll(sem.getLoadFileWork());
+
+    //4. walk the operator tree from the FS up to the TS
+    while (operator.getParentOperators().size() != 0) {
+      operator = operator.getParentOperators().get(0);
+    }
+
+    //5. get the SEL right after the TS
+    return operator.getChildOperators().get(0);
+  }
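Steps 4 and 5 rely on the analyze plan being a single chain: following first-parent links from any operator reaches the TS, and the TS's first child is the SEL we want. A sketch of that walk, reusing the toy Node model from the example above:

    // Walks first-parent links to the root (the TS in a TS-SEL-... chain),
    // then returns the root's first child (the SEL).
    static Node firstChildOfRoot(Node op) {
      while (!op.parents.isEmpty()) {
        op = op.parents.get(0);
      }
      return op.children.get(0);
    }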
+  private void replaceSelectOperatorProcess(SelectOperator operator, Operator<? extends OperatorDesc> input)
+      throws SemanticException {
+    RowSchema selRS = operator.getSchema();
+    ArrayList<ColumnInfo> signature = new ArrayList<>();
+    OpParseContext inputCtx = opParseCtx.get(input);
+    RowResolver inputRR = inputCtx.getRowResolver();
+    ArrayList<ColumnInfo> columns = inputRR.getColumnInfos();
+    ArrayList<ExprNodeDesc> colList = new ArrayList<ExprNodeDesc>();
+    ArrayList<String> columnNames = new ArrayList<String>();
+    Map<String, ExprNodeDesc> columnExprMap =
+        new HashMap<String, ExprNodeDesc>();
+    for (int i = 0; i < columns.size(); i++) {
+      ColumnInfo col = columns.get(i);
+      colList.add(new ExprNodeColumnDesc(col));
+      String internalName = selRS.getColumnNames().get(i);
+      columnNames.add(internalName);
+      columnExprMap.put(internalName, new ExprNodeColumnDesc(col));
+      signature.add(selRS.getSignature().get(i));
+    }
+    operator.setConf(new SelectDesc(colList, columnNames, true));
+    operator.setColumnExprMap(columnExprMap);
+    selRS.setSignature(signature);
+    operator.setSchema(selRS);
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
index a5f0a7f..adb68dd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
@@ -33,7 +33,6 @@
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -450,4 +449,45 @@ public void analyze(ASTNode ast, Context origCtx) throws SemanticException {
       analyzeInternal(originalTree);
     }
   }
+
+  /**
+   * @param ast the original "analyze ... compute statistics for columns" AST
+   * @param context the ColumnStatsAutoGatherContext that requests the rewrite
+   * @return the rewritten AST of the equivalent select query
+   * @throws SemanticException
+   */
+  public ASTNode getRewriteASTOnly(ASTNode ast, ColumnStatsAutoGatherContext context)
+      throws SemanticException {
+    tbl = getTable(ast);
+    colNames = getColumnName(ast);
+    // Save away the original AST
+    originalTree = ast;
+    boolean isPartitionStats = isPartitionLevelStats(ast);
+    Map<String, String> partSpec = null;
+    checkForPartitionColumns(colNames,
+        Utilities.getColumnNamesFromFieldSchema(tbl.getPartitionKeys()));
+    validateSpecifiedColumnNames(colNames);
+    if (conf.getBoolVar(ConfVars.HIVE_STATS_COLLECT_PART_LEVEL_STATS) && tbl.isPartitioned()) {
+      isPartitionStats = true;
+    }
+
+    if (isPartitionStats) {
+      isTableLevel = false;
+      partSpec = getPartKeyValuePairsFromAST(tbl, ast, conf);
+      handlePartialPartitionSpec(partSpec);
+    } else {
+      isTableLevel = true;
+    }
+    colType = getColumnTypes(colNames);
+    int numBitVectors = getNumBitVectorsForNDVEstimation(conf);
+    rewrittenQuery = genRewrittenQuery(colNames, numBitVectors, partSpec, isPartitionStats);
+    rewrittenTree = genRewrittenTree(rewrittenQuery);
+
+    context.analyzeRewrite = new AnalyzeRewriteContext();
+    context.analyzeRewrite.setTableName(tbl.getDbName() + "." + tbl.getTableName());
+    context.analyzeRewrite.setTblLvl(isTableLevel);
+    context.analyzeRewrite.setColName(colNames);
+    context.analyzeRewrite.setColType(colType);
+    return rewrittenTree;
+  }
 }
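For orientation, genRewrittenQuery turns the analyze statement into an ordinary select over Hive's compute_stats aggregate, one call per requested column; the text below is schematic only (table, columns, and the bit-vector count are illustrative, not taken from the patch):

    // Schematic shape of the rewritten query that drives the
    // TS-SEL-GBY-RS-GBY-SEL-FS pipeline; numBitVectors (here 16)
    // controls the accuracy of NDV estimation.
    String rewrittenQuery =
        "select compute_stats(c1, 16), compute_stats(c2, 16) from db1.t1";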
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index 5872e8e..9888ddb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -76,6 +76,7 @@
   private HashMap<String, SplitSample> nameToSplitSample;
   private List<LoadTableDesc> loadTableWork;
   private List<LoadFileDesc> loadFileWork;
+  private ColumnStatsAutoGatherContext columnStatsAutoGatherContext;
   private Context ctx;
   private HiveConf conf;
   private HashMap<String, String> idToTableNameMap;
@@ -154,6 +155,7 @@ public ParseContext(
       Set<JoinOperator> joinOps, Set<SMBMapJoinOperator> smbMapJoinOps,
       List<LoadTableDesc> loadTableWork, List<LoadFileDesc> loadFileWork,
+      ColumnStatsAutoGatherContext columnStatsAutoGatherContext,
       Context ctx, HashMap<String, String> idToTableNameMap, int destTableId,
       UnionProcContext uCtx, List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinOpsNoReducer,
       Map<String, PrunedPartitionList> prunedPartitions,
@@ -173,6 +175,7 @@ public ParseContext(
     this.smbMapJoinOps = smbMapJoinOps;
     this.loadFileWork = loadFileWork;
     this.loadTableWork = loadTableWork;
+    this.columnStatsAutoGatherContext = columnStatsAutoGatherContext;
     this.topOps = topOps;
     this.ctx = ctx;
     this.idToTableNameMap = idToTableNameMap;
@@ -531,4 +534,13 @@ public void setCreateTable(CreateTableDesc createTableDesc) {
     this.createTableDesc = createTableDesc;
   }
 
+  public ColumnStatsAutoGatherContext getColumnStatsAutoGatherContext() {
+    return columnStatsAutoGatherContext;
+  }
+
+  public void setColumnStatsAutoGatherContext(
+      ColumnStatsAutoGatherContext columnStatsAutoGatherContext) {
+    this.columnStatsAutoGatherContext = columnStatsAutoGatherContext;
+  }
+
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 990440c..99ceac0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -64,6 +64,7 @@
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryProperties;
 import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
@@ -240,6 +241,7 @@
   protected LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx;
   private List<LoadTableDesc> loadTableWork;
   private List<LoadFileDesc> loadFileWork;
+  private ColumnStatsAutoGatherContext columnStatsAutoGatherContext;
   private final Map<JoinOperator, QBJoinTree> joinContext;
   private final Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext;
   private final HashMap<TableScanOperator, Table> topToTable;
@@ -409,7 +411,7 @@ public ParseContext getParseContext() {
     return new ParseContext(conf, opToPartPruner, opToPartList, topOps,
         new HashSet<JoinOperator>(joinContext.keySet()),
         new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
-        loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
+        loadTableWork, loadFileWork, columnStatsAutoGatherContext, ctx, idToTableNameMap, destTableId, uCtx,
         listMapJoinOpsNoReducer, prunedPartitions, opToSamplePruner, globalLimitCtx,
         nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner, viewAliasToInput,
         reduceSinkOperatorsAddedByEnforceBucketingSorting,
@@ -586,7 +588,7 @@ ASTNode getAST() {
     return this.ast;
   }
 
-  protected void setAST(ASTNode newAST) {
+  public void setAST(ASTNode newAST) {
     this.ast = newAST;
   }
 
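A hypothetical consumer of the new ParseContext plumbing; the accessor and context names below exist in this patch, while the surrounding snippet is illustrative:

    // Sketch: a compile-time pass picking up the autogather context, if any.
    ColumnStatsAutoGatherContext csCtx = pCtx.getColumnStatsAutoGatherContext();
    if (csCtx != null) {
      AnalyzeRewriteContext rewrite = csCtx.getAnalyzeRewrite();
      List<LoadFileDesc> statsFiles = csCtx.getLoadFileWork();
      // these feed genColumnStatsTask (see the TaskCompiler hunk below)
    }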
@@ -9001,13 +9003,27 @@ private Operator genPostGroupByBodyPlan(Operator curr, String dest, QB qb,
         qb.getParseInfo().setOuterQueryLimit(limit.intValue());
       }
       if (!SessionState.get().getHiveOperation().equals(HiveOperation.CREATEVIEW)) {
-        curr = genFileSinkPlan(dest, qb, curr);
+        Operator op = genFileSinkPlan(dest, qb, curr);
+        // The following code collects column statistics when
+        // hive.stats.autogather=true and hive.stats.column.autogather=true,
+        // and the statement is an insert into a table.
+        if (op instanceof FileSinkOperator) {
+          FileSinkOperator fsOp = (FileSinkOperator) op;
+          if (fsOp.getConf().getTableInfo().getTableName() != null
+              && qb.getParseInfo().isInsertToTable()
+              && conf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)
+              && conf.getBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER)) {
+            columnStatsAutoGatherContext = new ColumnStatsAutoGatherContext(opParseCtx, conf, curr, fsOp);
+            columnStatsAutoGatherContext.insertAnalyzePipeline();
+          }
+        }
+        curr = op;
       }
     }
 
     return curr;
   }
-
+
   @SuppressWarnings("nls")
   private Operator genUnionPlan(String unionalias, String leftalias,
       Operator leftOp, String rightalias, Operator rightOp)
@@ -10135,7 +10151,7 @@ void analyzeInternal(ASTNode ast, PlannerContext plannerCtx) throws SemanticExce
     ParseContext pCtx = new ParseContext(conf, opToPartPruner, opToPartList, topOps,
         new HashSet<JoinOperator>(joinContext.keySet()),
         new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
-        loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
+        loadTableWork, loadFileWork, columnStatsAutoGatherContext, ctx, idToTableNameMap, destTableId, uCtx,
         listMapJoinOpsNoReducer, prunedPartitions, opToSamplePruner, globalLimitCtx,
         nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner, viewAliasToInput,
         reduceSinkOperatorsAddedByEnforceBucketingSorting,
@@ -12238,4 +12254,12 @@ private void warn(String msg) {
     SessionState.getConsole().printInfo(
         String.format("Warning: %s", msg));
   }
+
+  public List<LoadFileDesc> getLoadFileWork() {
+    return loadFileWork;
+  }
+
+  public void setLoadFileWork(List<LoadFileDesc> loadFileWork) {
+    this.loadFileWork = loadFileWork;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index ba11e41..ccb8d32 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -209,8 +209,12 @@ public void compile(final ParseContext pCtx, final List<Task<? extends Serializable>> rootTasks,
-  protected void genColumnStatsTask(QB qb, List<LoadTableDesc> loadTableWork,
+  protected void genColumnStatsTask(AnalyzeRewriteContext analyzeRewrite,
       List<LoadFileDesc> loadFileWork, List<Task<? extends Serializable>> rootTasks,
       int outerQueryLimit) {
     ColumnStatsTask cStatsTask = null;
     ColumnStatsWork cStatsWork = null;
@@ -394,7 +398,7 @@ public ParseContext getParseContext(ParseContext pCtx, List
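Taken together (the TaskCompiler hunk above is truncated in this excerpt), the patch makes column statistics a by-product of the insert itself. A hedged end-to-end sketch over JDBC; the URL, driver bootstrap, and table names are placeholders, not part of the patch:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class ColumnStatsAutogatherDemo {
      public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        // Placeholder HiveServer2 URL; adjust host/port/database as needed.
        try (Connection conn =
                 DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
             Statement st = conn.createStatement()) {
          st.execute("set hive.stats.autogather=true");
          st.execute("set hive.stats.column.autogather=true");
          // With both flags on, column stats for t_dest are computed as part
          // of the insert; no separate "analyze table t_dest compute
          // statistics for columns" run is needed afterwards.
          st.execute("insert overwrite table t_dest select * from t_src");
        }
      }
    }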