diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java index c82bab8..214caa6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hive.ql.plan.JoinDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.Statistics; +import org.apache.hadoop.hive.ql.plan.Statistics.State; import org.apache.hadoop.hive.ql.stats.StatsUtils; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd; @@ -213,6 +214,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, * satisfy condition2 * *

+ * Worst case: If no column statistics are available, then T(R) = T(R)/2 will be + * used as heuristics. + *

* For more information, refer 'Estimating The Cost Of Operations' chapter in * "Database Systems: The Complete Book" by Garcia-Molina et. al. *

@@ -239,7 +243,10 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, fop.setStatistics(st); } else { if (parentStats != null) { - fop.setStatistics(parentStats.clone()); + + // worst case, in the absence of column statistics assume half the rows are emitted + Statistics wcStats = getWorstCaseStats(parentStats.clone()); + fop.setStatistics(wcStats); } } @@ -510,6 +517,9 @@ private long evaluateChildExpr(Statistics stats, ExprNodeDesc child, AnnotateSta * assumed. * *

+ * Worst case: If no column statistics are available, then T(R) = T(R)/2 will be + * used as heuristics. + *

* For more information, refer 'Estimating The Cost Of Operations' chapter in * "Database Systems: The Complete Book" by Garcia-Molina et. al. *

@@ -527,13 +537,15 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, HiveConf conf = aspCtx.getConf(); int mapSideParallelism = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_STATS_MAP_SIDE_PARALLELISM); + List aggDesc = gop.getConf().getAggregators(); + Map colExprMap = gop.getColumnExprMap(); + RowSchema rs = gop.getSchema(); + Statistics stats = null; try { if (satisfyPrecondition(parentStats)) { - Statistics stats = parentStats.clone(); - RowSchema rs = gop.getSchema(); - List aggDesc = gop.getConf().getAggregators(); - Map colExprMap = gop.getColumnExprMap(); + stats = parentStats.clone(); + List colStats = StatsUtils.getColStatisticsFromExprMap(conf, parentStats, colExprMap, rs); stats.setColumnStats(colStats); @@ -588,44 +600,54 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, newNumRows = applyGBYRule(stats.getNumRows(), dvProd); updateStats(stats, newNumRows); } + } else { + if (parentStats != null) { - // if UDAFs are present, new columns needs to be added - if (!aggDesc.isEmpty()) { - List aggColStats = Lists.newArrayList(); - for (ColumnInfo ci : rs.getSignature()) { - - // if the columns in row schema is not contained in column - // expression map, then those are the aggregate columns that - // are added GBY operator. we will estimate the column statistics - // for those newly added columns - if (!colExprMap.containsKey(ci.getInternalName())) { - String colName = ci.getInternalName(); - colName = StatsUtils.stripPrefixFromColumnName(colName); - String tabAlias = ci.getTabAlias(); - String colType = ci.getTypeName(); - ColStatistics cs = new ColStatistics(tabAlias, colName, colType); - cs.setCountDistint(stats.getNumRows()); - cs.setNumNulls(0); - cs.setAvgColLen(StatsUtils.getAvgColLenOfFixedLengthTypes(colType)); - aggColStats.add(cs); - } + // worst case, in the absence of column statistics assume half the rows are emitted + if (gop.getChildOperators().get(0) instanceof ReduceSinkOperator) { + + // map side + stats = parentStats.clone(); + } else { + + // reduce side + stats = getWorstCaseStats(parentStats); } - stats.addToColumnStats(aggColStats); + } + } + + // if UDAFs are present, new columns needs to be added + if (!aggDesc.isEmpty()) { + List aggColStats = Lists.newArrayList(); + for (ColumnInfo ci : rs.getSignature()) { - // if UDAF present and if column expression map is empty then it must - // be full aggregation query like count(*) in which case number of rows will be 1 - if (colExprMap.isEmpty()) { - stats.setNumRows(1); - updateStats(stats, 1); + // if the columns in row schema is not contained in column + // expression map, then those are the aggregate columns that + // are added GBY operator. we will estimate the column statistics + // for those newly added columns + if (!colExprMap.containsKey(ci.getInternalName())) { + String colName = ci.getInternalName(); + colName = StatsUtils.stripPrefixFromColumnName(colName); + String tabAlias = ci.getTabAlias(); + String colType = ci.getTypeName(); + ColStatistics cs = new ColStatistics(tabAlias, colName, colType); + cs.setCountDistint(stats.getNumRows()); + cs.setNumNulls(0); + cs.setAvgColLen(StatsUtils.getAvgColLenOfFixedLengthTypes(colType)); + aggColStats.add(cs); } } + stats.addToColumnStats(aggColStats); - gop.setStatistics(stats); - } else { - if (parentStats != null) { - gop.setStatistics(parentStats.clone()); + // if UDAF present and if column expression map is empty then it must + // be full aggregation query like count(*) in which case number of rows will be 1 + if (colExprMap.isEmpty()) { + stats.setNumRows(1); + updateStats(stats, 1); } } + + gop.setStatistics(stats); } catch (CloneNotSupportedException e) { throw new SemanticException(ErrorMsg.STATISTICS_CLONING_FAILED.getMsg()); } @@ -668,6 +690,9 @@ private long applyGBYRule(long numRows, long dvProd) { * attributes * *

+ * Worst case: If no column statistics are available, then T(RXS) = T(R)*T(S)/2 will be + * used as heuristics. + *

* For more information, refer 'Estimating The Cost Of Operations' chapter in * "Database Systems: The Complete Book" by Garcia-Molina et. al. *

@@ -698,7 +723,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } } + try { if (allSatisfyPreCondition) { + // statistics object that is combination of statistics from all relations involved in JOIN Statistics stats = new Statistics(); long prodRows = 1; @@ -744,7 +771,6 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, // since new statistics is derived from all relations involved in JOIN, // we need to update the state information accordingly - stats.updateBasicStatsState(parentStats.getBasicStatsState()); stats.updateColumnStatsState(parentStats.getColumnStatsState()); } @@ -812,6 +838,28 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, stats.setNumRows(newRowCount); stats.setDataSize(StatsUtils.getDataSizeFromColumnStats(newRowCount, outColStats)); jop.setStatistics(stats); + } else { + + // worst case, when no column statistics are available + if (parents.size() > 1) { + Statistics wcStats = new Statistics(); + Statistics stp1 = parents.get(0).getStatistics(); + long numRows = stp1.getNumRows(); + long avgRowSize = stp1.getAvgRowSize(); + for (int i = 1; i < parents.size(); i++) { + stp1 = parents.get(i).getStatistics(); + numRows = (numRows * stp1.getNumRows()) / 2; + avgRowSize += stp1.getAvgRowSize(); + } + wcStats.setNumRows(numRows); + wcStats.setDataSize(numRows * avgRowSize); + jop.setStatistics(wcStats); + } else { + jop.setStatistics(parents.get(0).getStatistics().clone()); + } + } + } catch (CloneNotSupportedException e) { + throw new SemanticException(ErrorMsg.STATISTICS_CLONING_FAILED.getMsg()); } } return null; @@ -857,22 +905,31 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Statistics parentStats = parent.getStatistics(); try { + long limit = -1; + limit = lop.getConf().getLimit(); + if (satisfyPrecondition(parentStats)) { Statistics stats = parentStats.clone(); - long limit = -1; - limit = lop.getConf().getLimit(); - if (limit == -1) { - limit = lop.getConf().getLeastRows(); - } - // if limit is greate than available rows then do not update statistics + // if limit is greater than available rows then do not update statistics if (limit <= parentStats.getNumRows()) { updateStats(stats, limit); } lop.setStatistics(stats); } else { if (parentStats != null) { - lop.setStatistics(parentStats.clone()); + + // in the absence of column statistics, compute data size based on based + // on average row size + Statistics wcStats = parentStats.clone(); + if (limit <= parentStats.getNumRows()) { + long numRows = limit; + long avgRowSize = parentStats.getAvgRowSize(); + long dataSize = avgRowSize * limit; + wcStats.setNumRows(numRows); + wcStats.setDataSize(dataSize); + } + lop.setStatistics(wcStats); } } } catch (CloneNotSupportedException e) { @@ -909,7 +966,6 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Statistics parentStats = parent.getStatistics(); stats.addToNumRows(parentStats.getNumRows()); stats.addToDataSize(parentStats.getDataSize()); - stats.updateBasicStatsState(parentStats.getBasicStatsState()); stats.updateColumnStatsState(parentStats.getColumnStatsState()); stats.addToColumnStats(parentStats.getColumnStats()); op.getConf().setStatistics(stats); @@ -1001,4 +1057,17 @@ static boolean satisfyPrecondition(Statistics stats) { return stats != null && stats.getBasicStatsState().equals(Statistics.State.COMPLETE) && !stats.getColumnStatsState().equals(Statistics.State.NONE); } + + static Statistics getWorstCaseStats(Statistics stats) throws CloneNotSupportedException { + Statistics wcClone = stats.clone(); + long numRows = wcClone.getNumRows() / 2; + long dataSize = wcClone.getDataSize() / 2; + long avgRowSize = wcClone.getAvgRowSize(); + if (numRows > 0) { + dataSize = avgRowSize * numRows; + } + wcClone.setNumRows(numRows); + wcClone.setDataSize(dataSize); + return wcClone; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java index a16c8ff..baa0b46 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java @@ -49,8 +49,8 @@ public Statistics() { } public Statistics(long nr, long ds) { - this.numRows = nr; - this.dataSize = ds; + this.setNumRows(nr); + this.setDataSize(ds); this.basicStatsState = State.NONE; this.columnStats = null; this.columnStatsState = State.NONE; @@ -62,6 +62,7 @@ public long getNumRows() { public void setNumRows(long numRows) { this.numRows = numRows; + updateBasicStatsState(); } public long getDataSize() { @@ -70,6 +71,17 @@ public long getDataSize() { public void setDataSize(long dataSize) { this.dataSize = dataSize; + updateBasicStatsState(); + } + + private void updateBasicStatsState() { + if (numRows <= 0 && dataSize <= 0) { + this.basicStatsState = State.NONE; + } else if (numRows <= 0 || dataSize <= 0) { + this.basicStatsState = State.PARTIAL; + } else { + this.basicStatsState = State.COMPLETE; + } } public State getBasicStatsState() { @@ -120,10 +132,12 @@ public Statistics clone() throws CloneNotSupportedException { public void addToNumRows(long nr) { numRows += nr; + updateBasicStatsState(); } public void addToDataSize(long rds) { dataSize += rds; + updateBasicStatsState(); } public void setColumnStats(Map colStats) { @@ -162,37 +176,14 @@ public void addToColumnStats(List colStats) { } } - // newState + // newState // ----------------------------------------- - // basicStatsState | COMPLETE PARTIAL NONE | + // columnStatsState | COMPLETE PARTIAL NONE | // |________________________________________| // COMPLETE | COMPLETE PARTIAL PARTIAL | // PARTIAL | PARTIAL PARTIAL PARTIAL | // NONE | COMPLETE PARTIAL NONE | // ----------------------------------------- - public void updateBasicStatsState(State newState) { - if (newState.equals(State.PARTIAL)) { - basicStatsState = State.PARTIAL; - } - - if (newState.equals(State.NONE)) { - if (basicStatsState.equals(State.NONE)) { - basicStatsState = State.NONE; - } else { - basicStatsState = State.PARTIAL; - } - } - - if (newState.equals(State.COMPLETE)) { - if (basicStatsState.equals(State.PARTIAL)) { - basicStatsState = State.PARTIAL; - } else { - basicStatsState = State.COMPLETE; - } - } - } - - // similar to the table above for basic stats public void updateColumnStatsState(State newState) { if (newState.equals(State.PARTIAL)) { columnStatsState = State.PARTIAL; @@ -216,11 +207,11 @@ public void updateColumnStatsState(State newState) { } public long getAvgRowSize() { - if (basicStatsState.equals(State.COMPLETE) && numRows != 0) { + if (numRows != 0) { return dataSize / numRows; } - return 0; + return dataSize; } public ColStatistics getColumnStatisticsFromFQColName(String fqColName) { diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java index 24e7b61..50e1969 100644 --- ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc; import org.apache.hadoop.hive.ql.plan.Statistics; +import org.apache.hadoop.hive.ql.plan.Statistics.State; import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector; @@ -98,22 +99,6 @@ public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList pa } } - // if basic stats are not available then return - if (nr <= 0 && rds <= 0) { - stats.setBasicStatsState(Statistics.State.NONE); - return stats; - } - - // if any basic stats is missing, mark it as partial stats - if (nr <= 0 || rds <= 0) { - stats.setBasicStatsState(Statistics.State.PARTIAL); - } - - // if both are available then we have complete basic stats - if (nr > 0 && rds > 0) { - stats.setBasicStatsState(Statistics.State.COMPLETE); - } - // number of rows -1 means that statistics from metastore is not reliable if (nr <= 0) { nr = 0; @@ -177,19 +162,6 @@ public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList pa rds = getSumIgnoreNegatives(dataSizes); } - // basic stats - if (nr <= 0 && rds <= 0) { - stats.updateBasicStatsState(Statistics.State.NONE); - } else if (nr <= 0 || rds <= 0) { - stats.updateBasicStatsState(Statistics.State.PARTIAL); - } else { - if (containsNonPositives(rowCounts) || containsNonPositives(dataSizes)) { - stats.updateBasicStatsState(Statistics.State.PARTIAL); - } else { - stats.updateBasicStatsState(Statistics.State.COMPLETE); - } - } - // number of rows -1 means that statistics from metastore is not reliable if (nr <= 0) { nr = 0; @@ -197,6 +169,11 @@ public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList pa stats.addToNumRows(nr); stats.addToDataSize(rds); + // if atleast a partition does not contain row count then mark basic stats state as PARTIAL + if (containsNonPositives(rowCounts)) { + stats.setBasicStatsState(State.PARTIAL); + } + // column stats for (Partition part : partList.getNotDeniedPartns()) { List colStats = getPartitionColumnStats(table, part, schema, neededColumns); @@ -219,7 +196,6 @@ public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList pa } return stats; - } /**