diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
index d49eddb..bfbab27 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
@@ -18,13 +18,8 @@
 package org.apache.hadoop.hive.ql.optimizer.stats.annotation;
 
-import java.lang.reflect.Field;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,6 +37,7 @@
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -77,8 +73,13 @@
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
 
 public class StatsRulesProcFactory {
@@ -170,7 +171,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
       // in case of select(*) the data size does not change
       if (!sop.getConf().isSelectStar() && !sop.getConf().isSelStarNoCompute()) {
         long dataSize = StatsUtils.getDataSizeFromColumnStats(stats.getNumRows(), colStats);
-        stats.setDataSize(setMaxIfInvalid(dataSize));
+        stats.setDataSize(StatsUtils.getMaxIfOverflow(dataSize));
       }
       sop.setStatistics(stats);
@@ -828,7 +829,6 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
           // for those newly added columns
           if (!colExprMap.containsKey(ci.getInternalName())) {
             String colName = ci.getInternalName();
-            colName = StatsUtils.stripPrefixFromColumnName(colName);
             String tabAlias = ci.getTabAlias();
             String colType = ci.getTypeName();
             ColStatistics cs = new ColStatistics(tabAlias, colName, colType);
@@ -956,7 +956,7 @@ private boolean checkMapSideAggregation(GroupByOperator gop,
         long hashEntrySize = gop.javaHashEntryOverHead + avgKeySize + avgValSize;
 
         // estimated hash table size
-        long estHashTableSize = numEstimatedRows * hashEntrySize;
+        long estHashTableSize = StatsUtils.getMaxIfOverflow(numEstimatedRows * hashEntrySize);
 
         if (estHashTableSize < maxMemHashAgg) {
           return true;
@@ -1065,7 +1065,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
        // detect if there are multiple attributes in join key
        ReduceSinkOperator rsOp = (ReduceSinkOperator) jop.getParentOperators().get(0);
-       List<ExprNodeDesc> keyExprs = rsOp.getConf().getKeyCols();
+       List<String> keyExprs = rsOp.getConf().getOutputKeyColumnNames();
        numAttr = keyExprs.size();
 
        // infer PK-FK relationship in single attribute join case
@@ -1077,7 +1077,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
          ReduceSinkOperator parent = (ReduceSinkOperator) jop.getParentOperators().get(pos);
 
          Statistics parentStats = parent.getStatistics();
-         keyExprs = parent.getConf().getKeyCols();
+         keyExprs = parent.getConf().getOutputKeyColumnNames();
 
          // Parent RS may have column statistics from multiple parents.
          // Populate table alias to row count map, this will be used later to
@@ -1096,8 +1096,8 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
          // used to quickly look-up for column statistics of join key.
          // TODO: expressions in join condition will be ignored. assign
          // internal name for expressions and estimate column statistics for expression.
-         List<String> fqCols =
-             StatsUtils.getFullQualifedColNameFromExprs(keyExprs, parent.getColumnExprMap());
+         List<String> fqCols = StatsUtils.getFullyQualifedReducerKeyNames(keyExprs,
+             parent.getColumnExprMap());
          joinKeys.put(pos, fqCols);
 
          // get column statistics for all output columns
@@ -1119,7 +1119,6 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
        for (int idx = 0; idx < numAttr; idx++) {
          for (Integer i : joinKeys.keySet()) {
            String col = joinKeys.get(i).get(idx);
-           col = StatsUtils.stripPrefixFromColumnName(col);
            ColStatistics cs = joinedColStats.get(col);
            if (cs != null) {
              perAttrDVs.add(cs.getCountDistint());
@@ -1142,7 +1141,6 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
        } else {
          for (List<String> jkeys : joinKeys.values()) {
            for (String jk : jkeys) {
-             jk = StatsUtils.stripPrefixFromColumnName(jk);
              ColStatistics cs = joinedColStats.get(jk);
              if (cs != null) {
                distinctVals.add(cs.getCountDistint());
@@ -1166,7 +1164,6 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
          ExprNodeDesc end = colExprMap.get(key);
          if (end instanceof ExprNodeColumnDesc) {
            String colName = ((ExprNodeColumnDesc) end).getColumn();
-           colName = StatsUtils.stripPrefixFromColumnName(colName);
            String tabAlias = ((ExprNodeColumnDesc) end).getTabAlias();
            String fqColName = StatsUtils.getFullyQualifiedColumnName(tabAlias, colName);
            ColStatistics cs = joinedColStats.get(fqColName);
@@ -1217,8 +1214,8 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
        long newNumRows = (long) (joinFactor * maxRowCount * (numParents - 1));
        long newDataSize = (long) (joinFactor * maxDataSize * (numParents - 1));
        Statistics wcStats = new Statistics();
-       wcStats.setNumRows(setMaxIfInvalid(newNumRows));
-       wcStats.setDataSize(setMaxIfInvalid(newDataSize));
+       wcStats.setNumRows(StatsUtils.getMaxIfOverflow(newNumRows));
+       wcStats.setDataSize(StatsUtils.getMaxIfOverflow(newDataSize));
        jop.setStatistics(wcStats);
 
        if (isDebugEnabled) {
@@ -1369,8 +1366,8 @@ private float getSelectivityComplexTree(Operator<? extends OperatorDesc> op) {
        Operator<? extends OperatorDesc> op = ops.get(i);
        if (op != null && op instanceof ReduceSinkOperator) {
          ReduceSinkOperator rsOp = (ReduceSinkOperator) op;
-         List<ExprNodeDesc> keys = rsOp.getConf().getKeyCols();
-         List<String> fqCols = StatsUtils.getFullQualifedColNameFromExprs(keys,
+         List<String> keys = rsOp.getConf().getOutputKeyColumnNames();
+         List<String> fqCols = StatsUtils.getFullyQualifedReducerKeyNames(keys,
              rsOp.getColumnExprMap());
          if (fqCols.size() == 1) {
            String joinCol = fqCols.get(0);
@@ -1400,8 +1397,8 @@ private float getSelectivityComplexTree(Operator<? extends OperatorDesc> op) {
        Operator<? extends OperatorDesc> op = ops.get(i);
        if (op instanceof ReduceSinkOperator) {
          ReduceSinkOperator rsOp = (ReduceSinkOperator) op;
-         List<ExprNodeDesc> keys = rsOp.getConf().getKeyCols();
-         List<String> fqCols = StatsUtils.getFullQualifedColNameFromExprs(keys,
+         List<String> keys = rsOp.getConf().getOutputKeyColumnNames();
+         List<String> fqCols = StatsUtils.getFullyQualifedReducerKeyNames(keys,
              rsOp.getColumnExprMap());
          if (fqCols.size() == 1) {
            String joinCol = fqCols.get(0);
@@ -1441,7 +1438,7 @@ private void updateStatsForJoinType(Statistics stats, long newNumRows,
        LOG.info("STATS-" + jop.toString() + ": Overflow in number of rows."
            + newNumRows + " rows will be set to Long.MAX_VALUE");
      }
-     newNumRows = setMaxIfInvalid(newNumRows);
+     newNumRows = StatsUtils.getMaxIfOverflow(newNumRows);
      stats.setNumRows(newNumRows);
 
      // scale down/up the column statistics based on the changes in number of
@@ -1472,7 +1469,7 @@ private void updateStatsForJoinType(Statistics stats, long newNumRows,
      stats.setColumnStats(colStats);
      long newDataSize = StatsUtils
          .getDataSizeFromColumnStats(newNumRows, colStats);
-     stats.setDataSize(setMaxIfInvalid(newDataSize));
+     stats.setDataSize(StatsUtils.getMaxIfOverflow(newDataSize));
    }
 
    private long computeNewRowCount(List<Long> rowCountParents, long denom) {
@@ -1512,7 +1509,6 @@ private void updateJoinColumnsNDV(Map<Integer, List<String>> joinKeys,
      // find min NDV for joining columns
      for (Map.Entry<Integer, List<String>> entry : joinKeys.entrySet()) {
        String key = entry.getValue().get(joinColIdx);
-       key = StatsUtils.stripPrefixFromColumnName(key);
        ColStatistics cs = joinedColStats.get(key);
        if (cs != null && cs.getCountDistint() < minNDV) {
          minNDV = cs.getCountDistint();
@@ -1523,7 +1519,6 @@ private void updateJoinColumnsNDV(Map<Integer, List<String>> joinKeys,
      if (minNDV != Long.MAX_VALUE) {
        for (Map.Entry<Integer, List<String>> entry : joinKeys.entrySet()) {
          String key = entry.getValue().get(joinColIdx);
-         key = StatsUtils.stripPrefixFromColumnName(key);
          ColStatistics cs = joinedColStats.get(key);
          if (cs != null) {
            cs.setCountDistint(minNDV);
@@ -1617,8 +1612,8 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
        long numRows = limit;
        long avgRowSize = parentStats.getAvgRowSize();
        long dataSize = avgRowSize * limit;
-       wcStats.setNumRows(setMaxIfInvalid(numRows));
-       wcStats.setDataSize(setMaxIfInvalid(dataSize));
+       wcStats.setNumRows(StatsUtils.getMaxIfOverflow(numRows));
+       wcStats.setDataSize(StatsUtils.getMaxIfOverflow(dataSize));
      }
      lop.setStatistics(wcStats);
@@ -1662,26 +1657,26 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
      if (satisfyPrecondition(parentStats)) {
        List<ColStatistics> colStats = Lists.newArrayList();
        for (String key : outKeyColNames) {
-         String prefixedKey = "KEY." + key;
+         String prefixedKey = Utilities.ReduceField.KEY.toString() + "." + key;
          ExprNodeDesc end = colExprMap.get(prefixedKey);
          if (end != null) {
            ColStatistics cs = StatsUtils
                .getColStatisticsFromExpression(conf, parentStats, end);
            if (cs != null) {
-             cs.setColumnName(key);
+             cs.setColumnName(prefixedKey);
              colStats.add(cs);
            }
          }
        }
 
        for (String val : outValueColNames) {
-         String prefixedVal = "VALUE." + val;
+         String prefixedVal = Utilities.ReduceField.VALUE.toString() + "." + val;
          ExprNodeDesc end = colExprMap.get(prefixedVal);
          if (end != null) {
            ColStatistics cs = StatsUtils
                .getColStatisticsFromExpression(conf, parentStats, end);
            if (cs != null) {
-             cs.setColumnName(val);
+             cs.setColumnName(prefixedVal);
              colStats.add(cs);
            }
          }
@@ -1815,7 +1810,7 @@ static void updateStats(Statistics stats, long newNumRows,
          + newNumRows + " rows will be set to Long.MAX_VALUE");
    }
 
-   newNumRows = setMaxIfInvalid(newNumRows);
+   newNumRows = StatsUtils.getMaxIfOverflow(newNumRows);
    long oldRowCount = stats.getNumRows();
    double ratio = (double) newNumRows / (double) oldRowCount;
    stats.setNumRows(newNumRows);
@@ -1842,10 +1837,10 @@ static void updateStats(Statistics stats, long newNumRows,
      }
      stats.setColumnStats(colStats);
      long newDataSize = StatsUtils.getDataSizeFromColumnStats(newNumRows, colStats);
-     stats.setDataSize(setMaxIfInvalid(newDataSize));
+     stats.setDataSize(StatsUtils.getMaxIfOverflow(newDataSize));
    } else {
      long newDataSize = (long) (ratio * stats.getDataSize());
-     stats.setDataSize(setMaxIfInvalid(newDataSize));
+     stats.setDataSize(StatsUtils.getMaxIfOverflow(newDataSize));
    }
  }
@@ -1853,14 +1848,4 @@ static boolean satisfyPrecondition(Statistics stats) {
    return stats != null && stats.getBasicStatsState().equals(Statistics.State.COMPLETE)
        && !stats.getColumnStatsState().equals(Statistics.State.NONE);
  }
-
- /**
-  * negative number of rows or data sizes are invalid. It could be because of
-  * long overflow in which case return Long.MAX_VALUE
-  * @param val - input value
-  * @return Long.MAX_VALUE if val is negative else val
-  */
- static long setMaxIfInvalid(long val) {
-   return val < 0 ? Long.MAX_VALUE : val;
- }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
index 1b27c31..5a37614 100644
--- ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
@@ -18,14 +18,8 @@
 package org.apache.hadoop.hive.ql.stats;
 
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,6 +36,7 @@
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -89,8 +84,14 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 public class StatsUtils {
@@ -1011,12 +1012,10 @@ public static long getWritableSize(ObjectInspector oi, Object value) {
    if (colExprMap != null && rowSchema != null) {
      for (ColumnInfo ci : rowSchema.getSignature()) {
        String outColName = ci.getInternalName();
-       outColName = StatsUtils.stripPrefixFromColumnName(outColName);
        String outTabAlias = ci.getTabAlias();
        ExprNodeDesc end = colExprMap.get(outColName);
        ColStatistics colStat = getColStatisticsFromExpression(conf, parentStats, end);
        if (colStat != null) {
-         outColName = StatsUtils.stripPrefixFromColumnName(outColName);
          colStat.setColumnName(outColName);
          colStat.setTableAlias(outTabAlias);
        }
@@ -1070,7 +1069,6 @@ public static ColStatistics getColStatisticsFromExpression(HiveConf conf, Statis
      ExprNodeColumnDesc encd = (ExprNodeColumnDesc) end;
      colName = encd.getColumn();
      tabAlias = encd.getTabAlias();
-     colName = stripPrefixFromColumnName(colName);
 
      if (encd.getIsPartitionColOrVirtualCol()) {
@@ -1300,21 +1298,6 @@ public static long getDataSizeFromColumnStats(long numRows, List<ColStatistics>
  }
 
  /**
-  * Remove KEY/VALUE prefix from column name
-  * @param colName
-  *          - column name
-  * @return column name
-  */
- public static String stripPrefixFromColumnName(String colName) {
-   String stripedName = colName;
-   if (colName.startsWith("KEY") || colName.startsWith("VALUE")) {
-     // strip off KEY./VALUE. from column name
-     stripedName = colName.split("\\.")[1];
-   }
-   return stripedName;
- }
-
- /**
   * Returns fully qualified name of column
   * @param tabName
   * @param colName
@@ -1363,38 +1346,42 @@ private static String getFullyQualifiedName(String... names) {
  }
 
  /**
-  * Try to get fully qualified column name from expression node
+  * Get fully qualified column name from output key column names and column expression map
   * @param keyExprs
-  *          - expression nodes
+  *          - output key names
   * @param map
   *          - column expression map
   * @return list of fully qualified names
   */
- public static List<String> getFullQualifedColNameFromExprs(List<ExprNodeDesc> keyExprs,
+ public static List<String> getFullyQualifedReducerKeyNames(List<String> keyExprs,
      Map<String, ExprNodeDesc> map) {
    List<String> result = Lists.newArrayList();
    if (keyExprs != null) {
-     for (ExprNodeDesc end : keyExprs) {
-       String outColName = null;
-       for (Map.Entry<String, ExprNodeDesc> entry : map.entrySet()) {
-         if (entry.getValue().isSame(end)) {
-           outColName = entry.getKey();
-           outColName = stripPrefixFromColumnName(outColName);
+     for (String key : keyExprs) {
+       String colName = key;
+       ExprNodeDesc end = map.get(colName);
+       // if we couldn't get expression try prepending "KEY." prefix to reducer key column names
+       if (end == null) {
+         colName = Utilities.ReduceField.KEY.toString() + "." + key;
+         end = map.get(colName);
+         if (end == null) {
+           continue;
          }
        }
       if (end instanceof ExprNodeColumnDesc) {
         ExprNodeColumnDesc encd = (ExprNodeColumnDesc) end;
-        if (outColName == null) {
-          outColName = encd.getColumn();
-          outColName = stripPrefixFromColumnName(outColName);
-        }
         String tabAlias = encd.getTabAlias();
-        result.add(getFullyQualifiedColumnName(tabAlias, outColName));
+        result.add(getFullyQualifiedColumnName(tabAlias, colName));
       } else if (end instanceof ExprNodeGenericFuncDesc) {
         ExprNodeGenericFuncDesc enf = (ExprNodeGenericFuncDesc) end;
-        List<String> cols = getFullQualifedColNameFromExprs(enf.getChildren(), map);
-        String joinedStr = Joiner.on(".").skipNulls().join(cols);
-        result.add(joinedStr);
+        String tabAlias = "";
+        for (ExprNodeDesc childEnd : enf.getChildren()) {
+          if (childEnd instanceof ExprNodeColumnDesc) {
+            tabAlias = ((ExprNodeColumnDesc) childEnd).getTabAlias();
+            break;
+          }
+        }
+        result.add(getFullyQualifiedColumnName(tabAlias, colName));
       } else if (end instanceof ExprNodeConstantDesc) {
         ExprNodeConstantDesc encd = (ExprNodeConstantDesc) end;
         result.add(encd.getValue().toString());
@@ -1439,4 +1426,14 @@ public static long getAvailableMemory(Configuration conf) {
        conf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB);
    return memory;
  }
+
+ /**
+  * negative number of rows or data sizes are invalid. It could be because of
+  * long overflow in which case return Long.MAX_VALUE
+  * @param val - input value
+  * @return Long.MAX_VALUE if val is negative else val
+  */
+ public static long getMaxIfOverflow(long val) {
+   return val < 0 ? Long.MAX_VALUE : val;
+ }
 }
diff --git ql/src/test/results/clientpositive/annotate_stats_groupby.q.out ql/src/test/results/clientpositive/annotate_stats_groupby.q.out
index 41a0083..718b43c 100644
--- ql/src/test/results/clientpositive/annotate_stats_groupby.q.out
+++ ql/src/test/results/clientpositive/annotate_stats_groupby.q.out
@@ -177,17 +177,17 @@ STAGE PLANS:
          keys: KEY._col0 (type: string), KEY._col1 (type: int)
          mode: mergepartial
          outputColumnNames: _col0, _col1, _col2
-         Statistics: Num rows: 8 Data size: 800 Basic stats: COMPLETE Column stats: PARTIAL
+         Statistics: Num rows: 7 Data size: 658 Basic stats: COMPLETE Column stats: PARTIAL
          Select Operator
            expressions: _col0 (type: string), _col1 (type: int), _col2 (type: bigint)
            outputColumnNames: _col0, _col1, _col2
-           Statistics: Num rows: 8 Data size: 800 Basic stats: COMPLETE Column stats: PARTIAL
+           Statistics: Num rows: 7 Data size: 658 Basic stats: COMPLETE Column stats: PARTIAL
            Group By Operator
              aggregations: min(_col1)
              keys: _col0 (type: string), _col2 (type: bigint)
              mode: hash
              outputColumnNames: _col0, _col1, _col2
-             Statistics: Num rows: 8 Data size: 832 Basic stats: COMPLETE Column stats: PARTIAL
+             Statistics: Num rows: 7 Data size: 686 Basic stats: COMPLETE Column stats: PARTIAL
              File Output Operator
                compressed: false
                table:
@@ -203,7 +203,7 @@ STAGE PLANS:
              key expressions: _col0 (type: string), _col1 (type: bigint)
              sort order: ++
              Map-reduce partition columns: _col0 (type: string), _col1 (type: bigint)
-             Statistics: Num rows: 8 Data size: 832 Basic stats: COMPLETE Column stats: PARTIAL
+             Statistics: Num rows: 7 Data size: 686 Basic stats: COMPLETE Column stats: PARTIAL
              value expressions: _col2 (type: int)
      Reduce Operator Tree:
        Group By Operator
@@ -211,14 +211,14 @@ STAGE PLANS:
          keys: KEY._col0 (type: string), KEY._col1 (type: bigint)
          mode: mergepartial
          outputColumnNames: _col0, _col1, _col2
-         Statistics: Num rows: 8 Data size: 832 Basic stats: COMPLETE Column stats: PARTIAL
+         Statistics: Num rows: 7 Data size: 686 Basic stats: COMPLETE Column stats: PARTIAL
          Select Operator
            expressions: _col0 (type: string), _col1 (type: bigint), _col2 (type: int)
            outputColumnNames: _col0, _col1, _col2
-           Statistics: Num rows: 8 Data size: 832 Basic stats: COMPLETE Column stats: PARTIAL
+           Statistics: Num rows: 7 Data size: 686 Basic stats: COMPLETE Column stats: PARTIAL
            File Output Operator
              compressed: false
-             Statistics: Num rows: 8 Data size: 832 Basic stats: COMPLETE Column stats: PARTIAL
+             Statistics: Num rows: 7 Data size: 686 Basic stats: COMPLETE Column stats: PARTIAL
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat