diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java index 838d6b1..1e4901f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java @@ -67,8 +67,10 @@ import org.apache.hadoop.hive.serde.serdeConstants; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.Stack; public class StatsRulesProcFactory { @@ -803,7 +805,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, // statistics object that is combination of statistics from all // relations involved in JOIN Statistics stats = new Statistics(); - List rowCountParents = Lists.newArrayList(); + HashMap rowCountParents = new HashMap(); List distinctVals = Lists.newArrayList(); // 2 relations, multiple attributes @@ -818,9 +820,20 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, ReduceSinkOperator parent = (ReduceSinkOperator) jop.getParentOperators().get(pos); Statistics parentStats = parent.getStatistics(); - rowCountParents.add(parentStats.getNumRows()); List keyExprs = parent.getConf().getKeyCols(); + // Parent RS may have column statistics from multiple parents. + // Populate table alias to row count map, this will be used later to + // scale down/up column statistics based on new row count + // NOTE: JOIN with UNION as parent of RS will not have table alias + // propagated properly. UNION operator does not propagate the table + // alias of subqueries properly to expression nodes. Hence union20.q + // will have wrong number of rows. + Set tableAliases = StatsUtils.getAllTableAlias(parent.getColumnExprMap()); + for (String tabAlias : tableAliases) { + rowCountParents.put(tabAlias, parentStats.getNumRows()); + } + // multi-attribute join key if (keyExprs.size() > 1) { multiAttr = true; @@ -890,6 +903,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Map colExprMap = jop.getColumnExprMap(); RowSchema rs = jop.getSchema(); List outColStats = Lists.newArrayList(); + HashMap outInTabAlias = new HashMap(); for (ColumnInfo ci : rs.getSignature()) { String key = ci.getInternalName(); ExprNodeDesc end = colExprMap.get(key); @@ -901,6 +915,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, ColStatistics cs = joinedColStats.get(fqColName); String outColName = key; String outTabAlias = ci.getTabAlias(); + outInTabAlias.put(outTabAlias, tabAlias); if (cs != null) { cs.setColumnName(outColName); cs.setTableAlias(outTabAlias); @@ -911,7 +926,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, // update join statistics stats.setColumnStats(outColStats); - long newRowCount = computeNewRowCount(rowCountParents, denom); + long newRowCount = computeNewRowCount( + Lists.newArrayList(rowCountParents.values()), denom); if (newRowCount <= 0 && LOG.isDebugEnabled()) { newRowCount = 0; @@ -920,7 +936,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + " #Rows of parents: " + rowCountParents.toString() + ". Denominator: " + denom); } - updateStatsForJoinType(stats, newRowCount, true, jop.getConf()); + updateStatsForJoinType(stats, newRowCount, jop.getConf(), + rowCountParents, outInTabAlias); jop.setStatistics(stats); if (LOG.isDebugEnabled()) { @@ -967,36 +984,39 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } private void updateStatsForJoinType(Statistics stats, long newNumRows, - boolean useColStats, JoinDesc conf) { - long oldRowCount = stats.getNumRows(); - double ratio = (double) newNumRows / (double) oldRowCount; + JoinDesc conf, HashMap rowCountParents, + HashMap outInTabAlias) { stats.setNumRows(newNumRows); - if (useColStats) { - List colStats = stats.getColumnStats(); - for (ColStatistics cs : colStats) { - long oldDV = cs.getCountDistint(); - long newDV = oldDV; - - // if ratio is greater than 1, then number of rows increases. This can happen - // when some operators like GROUPBY duplicates the input rows in which case - // number of distincts should not change. Update the distinct count only when - // the output number of rows is less than input number of rows. - if (ratio <= 1.0) { - newDV = (long) Math.ceil(ratio * oldDV); - } - // Assumes inner join - // TODO: HIVE-5579 will handle different join types - cs.setNumNulls(0); - cs.setCountDistint(newDV); + // scale down/up the column statistics based on the changes in number of + // rows from each parent. For ex: If there are 2 parents for JOIN operator + // with 1st parent having 200 rows and 2nd parent having 2000 rows. Now if + // the new number of rows after applying join rule is 10, then the column + // stats for columns from 1st parent should be scaled down by 200/10 = 20x + // and stats for columns from 2nd parent should be scaled down by 200x + List colStats = stats.getColumnStats(); + for (ColStatistics cs : colStats) { + long oldRowCount = rowCountParents.get(outInTabAlias.get(cs.getTableAlias())); + double ratio = (double) newNumRows / (double) oldRowCount; + long oldDV = cs.getCountDistint(); + long newDV = oldDV; + + // if ratio is greater than 1, then number of rows increases. This can happen + // when some operators like GROUPBY duplicates the input rows in which case + // number of distincts should not change. Update the distinct count only when + // the output number of rows is less than input number of rows. + if (ratio <= 1.0) { + newDV = (long) Math.ceil(ratio * oldDV); } - stats.setColumnStats(colStats); - long newDataSize = StatsUtils.getDataSizeFromColumnStats(newNumRows, colStats); - stats.setDataSize(newDataSize); - } else { - long newDataSize = (long) (ratio * stats.getDataSize()); - stats.setDataSize(newDataSize); + // Assumes inner join + // TODO: HIVE-5579 will handle different join types + cs.setNumNulls(0); + cs.setCountDistint(newDV); } + stats.setColumnStats(colStats); + long newDataSize = StatsUtils + .getDataSizeFromColumnStats(newNumRows, colStats); + stats.setDataSize(newDataSize); } private long computeNewRowCount(List rowCountParents, long denom) { diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java index 99b26bd..8100b39 100644 --- ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java @@ -18,11 +18,8 @@ package org.apache.hadoop.hive.ql.stats; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; @@ -79,8 +76,12 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableTimestampObjectInspector; import org.apache.hadoop.io.BytesWritable; -import com.google.common.base.Joiner; -import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; public class StatsUtils { @@ -1216,4 +1217,33 @@ private static String getFullyQualifiedName(String... names) { } return result; } + + /** + * Returns all table aliases from expression nodes + * @param columnExprMap - column expression map + * @return + */ + public static Set getAllTableAlias( + Map columnExprMap) { + Set result = new HashSet(); + if (columnExprMap != null) { + for (ExprNodeDesc end : columnExprMap.values()) { + getTableAliasFromExprNode(end, result); + } + } + return result; + } + + private static void getTableAliasFromExprNode(ExprNodeDesc end, + Set output) { + + if (end instanceof ExprNodeColumnDesc) { + output.add(((ExprNodeColumnDesc) end).getTabAlias()); + } else if (end instanceof ExprNodeGenericFuncDesc) { + for (ExprNodeDesc child : end.getChildren()) { + getTableAliasFromExprNode(child, output); + } + } + + } } diff --git ql/src/test/results/clientpositive/union20.q.out ql/src/test/results/clientpositive/union20.q.out index 663d128..8151859 100644 --- ql/src/test/results/clientpositive/union20.q.out +++ ql/src/test/results/clientpositive/union20.q.out @@ -130,14 +130,14 @@ STAGE PLANS: 0 {KEY.reducesinkkey0} {VALUE._col0} 1 {KEY.reducesinkkey0} {VALUE._col0} outputColumnNames: _col0, _col1, _col2, _col3 - Statistics: Num rows: 36 Data size: 9792 Basic stats: COMPLETE Column stats: PARTIAL + Statistics: Num rows: 6 Data size: 1632 Basic stats: COMPLETE Column stats: PARTIAL Select Operator expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string) outputColumnNames: _col0, _col1, _col2, _col3 - Statistics: Num rows: 36 Data size: 9792 Basic stats: COMPLETE Column stats: PARTIAL + Statistics: Num rows: 6 Data size: 1632 Basic stats: COMPLETE Column stats: PARTIAL File Output Operator compressed: false - Statistics: Num rows: 36 Data size: 9792 Basic stats: COMPLETE Column stats: PARTIAL + Statistics: Num rows: 6 Data size: 1632 Basic stats: COMPLETE Column stats: PARTIAL table: input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat