diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
index 0cac439..9d78048 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
@@ -18,13 +18,8 @@
 package org.apache.hadoop.hive.ql.optimizer.stats.annotation;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -72,8 +67,12 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
 import org.apache.hadoop.hive.serde.serdeConstants;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
 
 public class StatsRulesProcFactory {
 
@@ -165,7 +164,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
               sop.getSchema());
           long dataSize = StatsUtils.getDataSizeFromColumnStats(stats.getNumRows(), colStats);
           stats.setColumnStats(colStats);
-          stats.setDataSize(dataSize);
+          stats.setDataSize(setMaxIfInvalid(dataSize));
           sop.setStatistics(stats);
 
           if (LOG.isDebugEnabled()) {
@@ -251,7 +250,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
         ExprNodeDesc pred = fop.getConf().getPredicate();
 
         // evaluate filter expression and update statistics
-        long newNumRows = evaluateExpression(parentStats, pred, aspCtx, neededCols);
+        long newNumRows = evaluateExpression(parentStats, pred, aspCtx,
+            neededCols, fop);
         Statistics st = parentStats.clone();
 
         if (satisfyPrecondition(parentStats)) {
@@ -261,7 +261,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
           // result in number of rows getting more than the input rows in
           // which case stats need not be updated
           if (newNumRows <= parentStats.getNumRows()) {
-            updateStats(st, newNumRows, true);
+            updateStats(st, newNumRows, true, fop);
           }
 
           if (LOG.isDebugEnabled()) {
@@ -271,7 +271,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
 
           // update only the basic statistics in the absence of column statistics
          if (newNumRows <= parentStats.getNumRows()) {
-            updateStats(st, newNumRows, false);
+            updateStats(st, newNumRows, false, fop);
          }
 
          if (LOG.isDebugEnabled()) {
@@ -288,7 +288,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
     }
 
     private long evaluateExpression(Statistics stats, ExprNodeDesc pred,
-        AnnotateStatsProcCtx aspCtx, List neededCols) throws CloneNotSupportedException {
+        AnnotateStatsProcCtx aspCtx, List neededCols,
+        FilterOperator fop) throws CloneNotSupportedException {
       long newNumRows = 0;
       Statistics andStats = null;
 
@@ -303,24 +304,26 @@ private long evaluateExpression(Statistics stats, ExprNodeDesc pred,
 
           // evaluate children
           for (ExprNodeDesc child : genFunc.getChildren()) {
-            newNumRows = evaluateChildExpr(aspCtx.getAndExprStats(), child, aspCtx, neededCols);
+            newNumRows = evaluateChildExpr(aspCtx.getAndExprStats(), child,
+                aspCtx, neededCols, fop);
             if (satisfyPrecondition(aspCtx.getAndExprStats())) {
-              updateStats(aspCtx.getAndExprStats(), newNumRows, true);
+              updateStats(aspCtx.getAndExprStats(), newNumRows, true, fop);
             } else {
-              updateStats(aspCtx.getAndExprStats(), newNumRows, false);
+              updateStats(aspCtx.getAndExprStats(), newNumRows, false, fop);
             }
           }
         } else if (udf instanceof GenericUDFOPOr) {
 
           // for OR condition independently compute and update stats
           for (ExprNodeDesc child : genFunc.getChildren()) {
-            newNumRows += evaluateChildExpr(stats, child, aspCtx, neededCols);
+            newNumRows += evaluateChildExpr(stats, child, aspCtx, neededCols,
+                fop);
           }
         } else if (udf instanceof GenericUDFOPNot) {
-          newNumRows = evaluateNotExpr(stats, pred, aspCtx, neededCols);
+          newNumRows = evaluateNotExpr(stats, pred, aspCtx, neededCols, fop);
        } else {
 
           // single predicate condition
-          newNumRows = evaluateChildExpr(stats, pred, aspCtx, neededCols);
+          newNumRows = evaluateChildExpr(stats, pred, aspCtx, neededCols, fop);
        }
      } else if (pred instanceof ExprNodeColumnDesc) {
@@ -352,8 +355,9 @@ private long evaluateExpression(Statistics stats, ExprNodeDesc pred,
       return newNumRows;
     }
 
-    private long evaluateNotExpr(Statistics stats, ExprNodeDesc pred, AnnotateStatsProcCtx aspCtx,
-        List neededCols) throws CloneNotSupportedException {
+    private long evaluateNotExpr(Statistics stats, ExprNodeDesc pred,
+        AnnotateStatsProcCtx aspCtx, List neededCols, FilterOperator fop)
+        throws CloneNotSupportedException {
 
       long numRows = stats.getNumRows();
@@ -365,8 +369,9 @@ private long evaluateNotExpr(Statistics stats, ExprNodeDesc pred, AnnotateStatsP
 
             // GenericUDF
             long newNumRows = 0;
-            for (ExprNodeDesc child : ((ExprNodeGenericFuncDesc) pred).getChildren()) {
-              newNumRows = evaluateChildExpr(stats, child, aspCtx, neededCols);
+            for (ExprNodeDesc child : genFunc.getChildren()) {
+              newNumRows = evaluateChildExpr(stats, child, aspCtx, neededCols,
+                  fop);
             }
             return numRows - newNumRows;
           } else if (leaf instanceof ExprNodeConstantDesc) {
@@ -399,8 +404,7 @@ private long evaluateNotExpr(Statistics stats, ExprNodeDesc pred, AnnotateStatsP
       return numRows / 2;
     }
 
-    private long evaluateColEqualsNullExpr(Statistics stats, ExprNodeDesc pred,
-        AnnotateStatsProcCtx aspCtx) {
+    private long evaluateColEqualsNullExpr(Statistics stats, ExprNodeDesc pred) {
 
       long numRows = stats.getNumRows();
@@ -426,7 +430,8 @@ private long evaluateColEqualsNullExpr(Statistics stats, ExprNodeDesc pred,
     }
 
     private long evaluateChildExpr(Statistics stats, ExprNodeDesc child,
-        AnnotateStatsProcCtx aspCtx, List neededCols) throws CloneNotSupportedException {
+        AnnotateStatsProcCtx aspCtx, List neededCols,
+        FilterOperator fop) throws CloneNotSupportedException {
 
       long numRows = stats.getNumRows();
@@ -435,7 +440,8 @@ private long evaluateChildExpr(Statistics stats, ExprNodeDesc child,
         ExprNodeGenericFuncDesc genFunc = (ExprNodeGenericFuncDesc) child;
         GenericUDF udf = genFunc.getGenericUDF();
 
-        if (udf instanceof GenericUDFOPEqual || udf instanceof GenericUDFOPEqualNS) {
+        if (udf instanceof GenericUDFOPEqual ||
+            udf instanceof GenericUDFOPEqualNS) {
           String colName = null;
           String tabAlias = null;
           boolean isConst = false;
@@ -507,13 +513,13 @@ private long evaluateChildExpr(Statistics stats, ExprNodeDesc child,
             || udf instanceof GenericUDFOPLessThan) {
           return numRows / 3;
         } else if (udf instanceof GenericUDFOPNotNull) {
-          long newNumRows = evaluateColEqualsNullExpr(stats, genFunc, aspCtx);
+          long newNumRows = evaluateColEqualsNullExpr(stats, genFunc);
           return stats.getNumRows() - newNumRows;
         } else if (udf instanceof GenericUDFOPNull) {
-          return evaluateColEqualsNullExpr(stats, genFunc, aspCtx);
+          return evaluateColEqualsNullExpr(stats, genFunc);
         } else if (udf instanceof GenericUDFOPAnd || udf instanceof GenericUDFOPOr
             || udf instanceof GenericUDFOPNot) {
-          return evaluateExpression(stats, genFunc, aspCtx, neededCols);
+          return evaluateExpression(stats, genFunc, aspCtx, neededCols, fop);
         }
       }
@@ -618,7 +624,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
         }
 
         // map side
-        if (gop.getChildOperators().get(0) instanceof ReduceSinkOperator) {
+        if (gop.getChildOperators().get(0) instanceof ReduceSinkOperator ||
+            gop.getChildOperators().get(0) instanceof AppMasterEventOperator) {
 
           // since we do not know if hash-aggregation will be enabled or disabled
           // at runtime we will assume that map-side group by does not do any
@@ -631,8 +638,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
             // take into account the map-side parallelism as well, default is 1
             multiplier *= mapSideParallelism;
-            newNumRows = multiplier * stats.getNumRows();
-            long dataSize = multiplier * stats.getDataSize();
+            newNumRows = setMaxIfInvalid(multiplier * stats.getNumRows());
+            long dataSize = setMaxIfInvalid(multiplier * stats.getDataSize());
             stats.setNumRows(newNumRows);
             stats.setDataSize(dataSize);
             for (ColStatistics cs : colStats) {
@@ -646,13 +653,13 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
 
             // map side no grouping set
             newNumRows = stats.getNumRows() * mapSideParallelism;
-            updateStats(stats, newNumRows, true);
+            updateStats(stats, newNumRows, true, gop);
           }
         } else {
 
           // reduce side
           newNumRows = applyGBYRule(stats.getNumRows(), dvProd);
-          updateStats(stats, newNumRows, true);
+          updateStats(stats, newNumRows, true, gop);
         }
       } else {
         if (parentStats != null) {
@@ -668,7 +675,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
             // reduce side
             stats = parentStats.clone();
             long newNumRows = parentStats.getNumRows() / 2;
-            updateStats(stats, newNumRows, false);
+            updateStats(stats, newNumRows, false, gop);
           }
         }
       }
@@ -702,7 +709,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
         // only if the column stats is available, update the data size from
         // the column stats
         if (!stats.getColumnStatsState().equals(Statistics.State.NONE)) {
-          updateStats(stats, stats.getNumRows(), true);
+          updateStats(stats, stats.getNumRows(), true, gop);
         }
       }
 
@@ -711,7 +718,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
         // rows will be 1
         if (colExprMap.isEmpty()) {
           stats.setNumRows(1);
-          updateStats(stats, 1, true);
+          updateStats(stats, 1, true, gop);
         }
       }
 
@@ -941,19 +948,12 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
         long newRowCount = computeNewRowCount(
             Lists.newArrayList(rowCountParents.values()), denom);
 
-        if (newRowCount <= 0 && LOG.isDebugEnabled()) {
-          newRowCount = 0;
-          LOG.debug("[0] STATS-" + jop.toString() + ": Product of #rows might be greater than"
-              + " denominator or overflow might have occurred. Resetting row count to 0."
-              + " #Rows of parents: " + rowCountParents.toString() + ". Denominator: " + denom);
-        }
-
-        updateStatsForJoinType(stats, newRowCount, jop.getConf(),
-            rowCountParents, outInTabAlias);
+        updateStatsForJoinType(stats, newRowCount, jop, rowCountParents,
+            outInTabAlias);
         jop.setStatistics(stats);
 
         if (LOG.isDebugEnabled()) {
-          LOG.debug("[1] STATS-" + jop.toString() + ": " + stats.extendedToString());
+          LOG.debug("[0] STATS-" + jop.toString() + ": " + stats.extendedToString());
         }
       } else {
@@ -981,14 +981,13 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
         long maxDataSize = parentSizes.get(maxRowIdx);
         long newNumRows = (long) (joinFactor * maxRowCount * (numParents - 1));
         long newDataSize = (long) (joinFactor * maxDataSize * (numParents - 1));
-        Statistics wcStats = new Statistics();
-        wcStats.setNumRows(newNumRows);
-        wcStats.setDataSize(newDataSize);
+        wcStats.setNumRows(setMaxIfInvalid(newNumRows));
+        wcStats.setDataSize(setMaxIfInvalid(newDataSize));
         jop.setStatistics(wcStats);
 
         if (LOG.isDebugEnabled()) {
-          LOG.debug("[2] STATS-" + jop.toString() + ": " + wcStats.extendedToString());
+          LOG.debug("[1] STATS-" + jop.toString() + ": " + wcStats.extendedToString());
         }
       }
     }
@@ -1010,8 +1009,15 @@ private Long getEasedOutDenominator(List distinctVals) {
     }
 
     private void updateStatsForJoinType(Statistics stats, long newNumRows,
-        JoinDesc conf, Map rowCountParents,
+        CommonJoinOperator jop,
+        Map rowCountParents,
         Map outInTabAlias) {
+
+      if (newNumRows <= 0) {
+        LOG.info("STATS-" + jop.toString() + ": Overflow in number of rows. "
+            + newNumRows + " rows will be set to Long.MAX_VALUE");
+      }
+      newNumRows = setMaxIfInvalid(newNumRows);
       stats.setNumRows(newNumRows);
 
       // scale down/up the column statistics based on the changes in number of
@@ -1042,7 +1048,7 @@ private void updateStatsForJoinType(Statistics stats, long newNumRows,
       stats.setColumnStats(colStats);
       long newDataSize = StatsUtils
           .getDataSizeFromColumnStats(newNumRows, colStats);
-      stats.setDataSize(newDataSize);
+      stats.setDataSize(setMaxIfInvalid(newDataSize));
     }
 
     private long computeNewRowCount(List rowCountParents, long denom) {
@@ -1170,7 +1176,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
           // if limit is greater than available rows then do not update
           // statistics
           if (limit <= parentStats.getNumRows()) {
-            updateStats(stats, limit, true);
+            updateStats(stats, limit, true, lop);
           }
           lop.setStatistics(stats);
 
@@ -1187,8 +1193,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
             long numRows = limit;
             long avgRowSize = parentStats.getAvgRowSize();
             long dataSize = avgRowSize * limit;
-            wcStats.setNumRows(numRows);
-            wcStats.setDataSize(dataSize);
+            wcStats.setNumRows(setMaxIfInvalid(numRows));
+            wcStats.setDataSize(setMaxIfInvalid(dataSize));
           }
           lop.setStatistics(wcStats);
 
@@ -1366,7 +1372,15 @@ public static NodeProcessor getDefaultRule() {
    * @param useColStats
    *          - use column statistics to compute data size
    */
-  static void updateStats(Statistics stats, long newNumRows, boolean useColStats) {
+  static void updateStats(Statistics stats, long newNumRows,
+      boolean useColStats, Operator op) {
+
+    if (newNumRows <= 0) {
+      LOG.info("STATS-" + op.toString() + ": Overflow in number of rows. "
+          + newNumRows + " rows will be set to Long.MAX_VALUE");
+    }
+
+    newNumRows = setMaxIfInvalid(newNumRows);
     long oldRowCount = stats.getNumRows();
     double ratio = (double) newNumRows / (double) oldRowCount;
     stats.setNumRows(newNumRows);
@@ -1391,10 +1405,10 @@ static void updateStats(Statistics stats, long newNumRows, boolean useColStats)
       }
       stats.setColumnStats(colStats);
       long newDataSize = StatsUtils.getDataSizeFromColumnStats(newNumRows, colStats);
-      stats.setDataSize(newDataSize);
+      stats.setDataSize(setMaxIfInvalid(newDataSize));
     } else {
       long newDataSize = (long) (ratio * stats.getDataSize());
-      stats.setDataSize(newDataSize);
+      stats.setDataSize(setMaxIfInvalid(newDataSize));
     }
   }
 
@@ -1403,4 +1417,13 @@ static boolean satisfyPrecondition(Statistics stats) {
         && !stats.getColumnStatsState().equals(Statistics.State.NONE);
   }
 
+  /**
+   * A negative number of rows or data size is invalid, most likely the result
+   * of a long overflow, in which case Long.MAX_VALUE is returned.
+   * @param val - input value
+   * @return Long.MAX_VALUE if val is negative, else val
+   */
+  static long setMaxIfInvalid(long val) {
+    return val < 0 ? Long.MAX_VALUE : val;
+  }
 }
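
Reviewer note on the overflow handling above: the snippet below is a minimal standalone sketch, not part of the patch, showing why products such as multiplier * stats.getNumRows() can silently wrap to a negative long and how the new setMaxIfInvalid guard (copied verbatim from this diff) clamps such values to Long.MAX_VALUE. The class name and the literal values are illustrative assumptions only.

// Standalone sketch, not part of the patch: demonstrates the overflow case
// that setMaxIfInvalid is meant to catch.
public class SetMaxIfInvalidDemo {

  // Copied verbatim from the patch: negative values are treated as overflow.
  static long setMaxIfInvalid(long val) {
    return val < 0 ? Long.MAX_VALUE : val;
  }

  public static void main(String[] args) {
    long numRows = 4_000_000_000L;    // hypothetical parent row-count estimate
    long multiplier = 4_000_000_000L; // hypothetical grouping-set/parallelism multiplier

    long raw = multiplier * numRows;  // long multiplication wraps: the product is negative
    long clamped = setMaxIfInvalid(raw);

    System.out.println("raw estimate     = " + raw);     // prints a negative number
    System.out.println("clamped estimate = " + clamped); // prints Long.MAX_VALUE
  }
}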