diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 00a1397..b6d88d6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -3119,11 +3119,13 @@ public static int estimateNumberOfReducers(HiveConf conf, ContentSummary inputSu
   public static int estimateReducers(long totalInputFileSize, long bytesPerReducer,
       int maxReducers, boolean powersOfTwo) {
 
-    int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
+    // Data size can overflow long, in which case stats annotation sets it to Long.MAX_VALUE.
+    // If the size has already overflowed, adding bytesPerReducer to it will overflow again.
+    long effectiveFileSize = Math.max(totalInputFileSize, totalInputFileSize + bytesPerReducer - 1);
+    int reducers = (int) (effectiveFileSize / bytesPerReducer);
     reducers = Math.max(1, reducers);
     reducers = Math.min(maxReducers, reducers);
-
     int reducersLog = (int)(Math.log(reducers) / Math.log(2)) + 1;
     int reducersPowerTwo = (int)Math.pow(2, reducersLog);
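Aside: a minimal standalone sketch (the class name OverflowDemo is ours, not part of the patch) of the failure mode the Math.max guard above addresses. Once stats annotation has clamped an input size to Long.MAX_VALUE, the old ceiling-division idiom of pre-adding (bytesPerReducer - 1) wraps negative:

// OverflowDemo.java -- standalone illustration, not part of the patch.
public class OverflowDemo {
  public static void main(String[] args) {
    long bytesPerReducer = 256L * 1024 * 1024;   // 256 MB, an illustrative value
    long totalInputFileSize = Long.MAX_VALUE;    // already clamped upstream

    // Old idiom: ceiling division via pre-adding (bytesPerReducer - 1).
    // The addition wraps past Long.MAX_VALUE and the quotient turns negative.
    System.out.println((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);

    // Patched idiom: if the addition wrapped, Math.max falls back to the
    // un-adjusted size, keeping the division result positive.
    long effectiveFileSize = Math.max(totalInputFileSize,
        totalInputFileSize + bytesPerReducer - 1);
    System.out.println(effectiveFileSize / bytesPerReducer);
  }
}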
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java
index fef2c29..ce4efd3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java
@@ -18,9 +18,8 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.Stack;
+import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.AUTOPARALLEL;
+import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,9 +35,11 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeDesc.ExprNodeDescEqualityWrapper;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
 
-import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.AUTOPARALLEL;
-import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.Stack;
 
 /**
  * SetReducerParallelism determines how many reducers should
@@ -88,6 +89,7 @@ public Object process(Node nd, Stack<Node> stack,
       }
     }
 
+    numberOfBytes = StatsUtils.getMaxIfOverflow(numberOfBytes);
     int numReducers = Utilities.estimateReducers(numberOfBytes, bytesPerReducer,
         maxReducers, false);
     LOG.info("Set parallelism for reduce sink "+sink+" to: "+numReducers);
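The getMaxIfOverflow call added above guards the byte count that SetReducerParallelism accumulates across the reduce sink's sibling operators before handing it to Utilities.estimateReducers; summing per-operator data sizes can itself wrap. A sketch of that clamp-then-estimate pattern (the accumulation loop and values are illustrative, not the surrounding Hive code):

// ParallelismClampDemo.java -- illustrative only.
public class ParallelismClampDemo {
  // Same contract as the relocated StatsUtils.getMaxIfOverflow.
  static long getMaxIfOverflow(long val) {
    return val < 0 ? Long.MAX_VALUE : val;
  }

  public static void main(String[] args) {
    long[] siblingDataSizes = {Long.MAX_VALUE / 2, Long.MAX_VALUE / 2, 1024L};
    long numberOfBytes = 0;
    for (long size : siblingDataSizes) {
      numberOfBytes += size;                          // wraps negative on the third add
    }
    numberOfBytes = getMaxIfOverflow(numberOfBytes);  // clamp before estimating reducers
    System.out.println(numberOfBytes);                // Long.MAX_VALUE
  }
}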
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
index d49eddb..305a008 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
@@ -170,7 +170,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
       // in case of select(*) the data size does not change
       if (!sop.getConf().isSelectStar() && !sop.getConf().isSelStarNoCompute()) {
         long dataSize = StatsUtils.getDataSizeFromColumnStats(stats.getNumRows(), colStats);
-        stats.setDataSize(setMaxIfInvalid(dataSize));
+        stats.setDataSize(StatsUtils.getMaxIfOverflow(dataSize));
       }
       sop.setStatistics(stats);
@@ -828,7 +828,6 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
           // for those newly added columns
           if (!colExprMap.containsKey(ci.getInternalName())) {
             String colName = ci.getInternalName();
-            colName = StatsUtils.stripPrefixFromColumnName(colName);
             String tabAlias = ci.getTabAlias();
             String colType = ci.getTypeName();
             ColStatistics cs = new ColStatistics(tabAlias, colName, colType);
@@ -956,7 +955,7 @@ private boolean checkMapSideAggregation(GroupByOperator gop,
       long hashEntrySize = gop.javaHashEntryOverHead + avgKeySize + avgValSize;
 
       // estimated hash table size
-      long estHashTableSize = numEstimatedRows * hashEntrySize;
+      long estHashTableSize = StatsUtils.getMaxIfOverflow(numEstimatedRows * hashEntrySize);
 
       if (estHashTableSize < maxMemHashAgg) {
         return true;
@@ -1065,7 +1064,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
 
         // detect if there are multiple attributes in join key
         ReduceSinkOperator rsOp = (ReduceSinkOperator) jop.getParentOperators().get(0);
-        List<ExprNodeDesc> keyExprs = rsOp.getConf().getKeyCols();
+        List<String> keyExprs = rsOp.getConf().getOutputKeyColumnNames();
        numAttr = keyExprs.size();
 
         // infer PK-FK relationship in single attribute join case
@@ -1077,7 +1076,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
           ReduceSinkOperator parent = (ReduceSinkOperator) jop.getParentOperators().get(pos);
 
           Statistics parentStats = parent.getStatistics();
-          keyExprs = parent.getConf().getKeyCols();
+          keyExprs = parent.getConf().getOutputKeyColumnNames();
 
           // Parent RS may have column statistics from multiple parents.
           // Populate table alias to row count map, this will be used later to
@@ -1096,8 +1095,9 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
           // used to quickly look-up for column statistics of join key.
           // TODO: expressions in join condition will be ignored. assign
           // internal name for expressions and estimate column statistics for expression.
-          List<String> fqCols =
-              StatsUtils.getFullQualifedColNameFromExprs(keyExprs, parent.getColumnExprMap());
+          List<String> prefixedKeyExprs = getPrefixedKeyNames(keyExprs);
+          List<String> fqCols = StatsUtils.getFullyQualifedColNameFromExprs(prefixedKeyExprs,
+              parent.getColumnExprMap());
           joinKeys.put(pos, fqCols);
 
           // get column statistics for all output columns
@@ -1119,7 +1119,6 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         for (int idx = 0; idx < numAttr; idx++) {
           for (Integer i : joinKeys.keySet()) {
             String col = joinKeys.get(i).get(idx);
-            col = StatsUtils.stripPrefixFromColumnName(col);
             ColStatistics cs = joinedColStats.get(col);
             if (cs != null) {
               perAttrDVs.add(cs.getCountDistint());
@@ -1142,7 +1141,6 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         } else {
           for (List<String> jkeys : joinKeys.values()) {
             for (String jk : jkeys) {
-              jk = StatsUtils.stripPrefixFromColumnName(jk);
               ColStatistics cs = joinedColStats.get(jk);
               if (cs != null) {
                 distinctVals.add(cs.getCountDistint());
@@ -1166,7 +1164,6 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
           ExprNodeDesc end = colExprMap.get(key);
           if (end instanceof ExprNodeColumnDesc) {
             String colName = ((ExprNodeColumnDesc) end).getColumn();
-            colName = StatsUtils.stripPrefixFromColumnName(colName);
             String tabAlias = ((ExprNodeColumnDesc) end).getTabAlias();
             String fqColName = StatsUtils.getFullyQualifiedColumnName(tabAlias, colName);
             ColStatistics cs = joinedColStats.get(fqColName);
@@ -1217,8 +1214,8 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
           long newNumRows = (long) (joinFactor * maxRowCount * (numParents - 1));
           long newDataSize = (long) (joinFactor * maxDataSize * (numParents - 1));
           Statistics wcStats = new Statistics();
-          wcStats.setNumRows(setMaxIfInvalid(newNumRows));
-          wcStats.setDataSize(setMaxIfInvalid(newDataSize));
+          wcStats.setNumRows(StatsUtils.getMaxIfOverflow(newNumRows));
+          wcStats.setDataSize(StatsUtils.getMaxIfOverflow(newDataSize));
           jop.setStatistics(wcStats);
 
           if (isDebugEnabled) {
@@ -1229,6 +1226,14 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
       return null;
     }
 
+    private List<String> getPrefixedKeyNames(List<String> keyNames) {
+      List<String> result = Lists.newArrayList();
+      for (String key : keyNames) {
+        result.add("KEY." + key);
+      }
+      return result;
+    }
+
     private void inferPKFKRelationship() {
       if (numAttr == 1) {
         List<Operator<? extends OperatorDesc>> parentsWithPK = getPrimaryKeyCandidates(parents);
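The getPrefixedKeyNames helper added above exists because a ReduceSinkOperator's column-expression map keys its entries as KEY.<name> (and VALUE.<name>), while getOutputKeyColumnNames() returns the bare internal names; re-attaching the prefix allows a direct map lookup instead of the prefix-stripping this patch removes. A standalone illustration (class and sample names are ours):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// PrefixedKeyDemo.java -- standalone illustration of the KEY. naming convention.
public class PrefixedKeyDemo {
  static List<String> getPrefixedKeyNames(List<String> keyNames) {
    List<String> result = new ArrayList<>();
    for (String key : keyNames) {
      result.add("KEY." + key);   // match the columnExprMap's key format
    }
    return result;
  }

  public static void main(String[] args) {
    // getOutputKeyColumnNames() would return bare internal names like these:
    System.out.println(getPrefixedKeyNames(Arrays.asList("_col0", "_col1")));
    // prints [KEY._col0, KEY._col1], the form used as keys in the map
  }
}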
@@ -1369,8 +1374,9 @@ private float getSelectivityComplexTree(Operator<? extends OperatorDesc> op) {
         Operator<? extends OperatorDesc> op = ops.get(i);
         if (op != null && op instanceof ReduceSinkOperator) {
           ReduceSinkOperator rsOp = (ReduceSinkOperator) op;
-          List<ExprNodeDesc> keys = rsOp.getConf().getKeyCols();
-          List<String> fqCols = StatsUtils.getFullQualifedColNameFromExprs(keys,
+          List<String> keys = rsOp.getConf().getOutputKeyColumnNames();
+          List<String> prefixedKeyExprs = getPrefixedKeyNames(keys);
+          List<String> fqCols = StatsUtils.getFullyQualifedColNameFromExprs(prefixedKeyExprs,
               rsOp.getColumnExprMap());
           if (fqCols.size() == 1) {
             String joinCol = fqCols.get(0);
@@ -1400,8 +1406,9 @@ private float getSelectivityComplexTree(Operator<? extends OperatorDesc> op) {
         Operator<? extends OperatorDesc> op = ops.get(i);
         if (op instanceof ReduceSinkOperator) {
           ReduceSinkOperator rsOp = (ReduceSinkOperator) op;
-          List<ExprNodeDesc> keys = rsOp.getConf().getKeyCols();
-          List<String> fqCols = StatsUtils.getFullQualifedColNameFromExprs(keys,
+          List<String> keys = rsOp.getConf().getOutputKeyColumnNames();
+          List<String> prefixedKeyExprs = getPrefixedKeyNames(keys);
+          List<String> fqCols = StatsUtils.getFullyQualifedColNameFromExprs(prefixedKeyExprs,
               rsOp.getColumnExprMap());
           if (fqCols.size() == 1) {
             String joinCol = fqCols.get(0);
@@ -1441,7 +1448,7 @@ private void updateStatsForJoinType(Statistics stats, long newNumRows,
         LOG.info("STATS-" + jop.toString() + ": Overflow in number of rows."
             + newNumRows + " rows will be set to Long.MAX_VALUE");
       }
-      newNumRows = setMaxIfInvalid(newNumRows);
+      newNumRows = StatsUtils.getMaxIfOverflow(newNumRows);
       stats.setNumRows(newNumRows);
 
       // scale down/up the column statistics based on the changes in number of
@@ -1472,7 +1479,7 @@ private void updateStatsForJoinType(Statistics stats, long newNumRows,
       stats.setColumnStats(colStats);
       long newDataSize = StatsUtils
           .getDataSizeFromColumnStats(newNumRows, colStats);
-      stats.setDataSize(setMaxIfInvalid(newDataSize));
+      stats.setDataSize(StatsUtils.getMaxIfOverflow(newDataSize));
     }
 
     private long computeNewRowCount(List<Long> rowCountParents, long denom) {
@@ -1512,7 +1519,6 @@ private void updateJoinColumnsNDV(Map<Integer, List<String>> joinKeys,
       // find min NDV for joining columns
       for (Map.Entry<Integer, List<String>> entry : joinKeys.entrySet()) {
         String key = entry.getValue().get(joinColIdx);
-        key = StatsUtils.stripPrefixFromColumnName(key);
         ColStatistics cs = joinedColStats.get(key);
         if (cs != null && cs.getCountDistint() < minNDV) {
           minNDV = cs.getCountDistint();
@@ -1523,7 +1529,6 @@ private void updateJoinColumnsNDV(Map<Integer, List<String>> joinKeys,
       if (minNDV != Long.MAX_VALUE) {
         for (Map.Entry<Integer, List<String>> entry : joinKeys.entrySet()) {
           String key = entry.getValue().get(joinColIdx);
-          key = StatsUtils.stripPrefixFromColumnName(key);
           ColStatistics cs = joinedColStats.get(key);
           if (cs != null) {
             cs.setCountDistint(minNDV);
@@ -1617,8 +1622,8 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         long numRows = limit;
         long avgRowSize = parentStats.getAvgRowSize();
         long dataSize = avgRowSize * limit;
-        wcStats.setNumRows(setMaxIfInvalid(numRows));
-        wcStats.setDataSize(setMaxIfInvalid(dataSize));
+        wcStats.setNumRows(StatsUtils.getMaxIfOverflow(numRows));
+        wcStats.setDataSize(StatsUtils.getMaxIfOverflow(dataSize));
       }
       lop.setStatistics(wcStats);
@@ -1668,7 +1673,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
           ColStatistics cs = StatsUtils
               .getColStatisticsFromExpression(conf, parentStats, end);
           if (cs != null) {
-            cs.setColumnName(key);
+            cs.setColumnName(prefixedKey);
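Several of the clamps swapped in above protect join-cardinality arithmetic: the estimates multiply parent row counts (and row width) before scaling down, and the product alone can wrap a long. A deliberately simplified sketch of the hazard (this is not the actual computeNewRowCount logic, which also divides by an NDV-based denominator):

// JoinRowCountDemo.java -- simplified illustration of the overflow being clamped.
public class JoinRowCountDemo {
  static long getMaxIfOverflow(long val) {          // same contract as StatsUtils
    return val < 0 ? Long.MAX_VALUE : val;
  }

  public static void main(String[] args) {
    long leftRows = 3_000_000_000L;                 // 3 billion rows
    long rightRows = 4_000_000_000L;                // 4 billion rows
    long product = leftRows * rightRows;            // 1.2e19 wraps negative
    System.out.println(product);                    // -6446744073709551616
    System.out.println(getMaxIfOverflow(product));  // clamped to Long.MAX_VALUE
  }
}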
            colStats.add(cs);
           }
         }
@@ -1681,7 +1686,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
           ColStatistics cs = StatsUtils
               .getColStatisticsFromExpression(conf, parentStats, end);
           if (cs != null) {
-            cs.setColumnName(val);
+            cs.setColumnName(prefixedVal);
             colStats.add(cs);
           }
         }
@@ -1815,7 +1820,7 @@ static void updateStats(Statistics stats, long newNumRows,
             + newNumRows + " rows will be set to Long.MAX_VALUE");
       }
 
-      newNumRows = setMaxIfInvalid(newNumRows);
+      newNumRows = StatsUtils.getMaxIfOverflow(newNumRows);
       long oldRowCount = stats.getNumRows();
       double ratio = (double) newNumRows / (double) oldRowCount;
       stats.setNumRows(newNumRows);
@@ -1842,10 +1847,10 @@ static void updateStats(Statistics stats, long newNumRows,
         }
         stats.setColumnStats(colStats);
         long newDataSize = StatsUtils.getDataSizeFromColumnStats(newNumRows, colStats);
-        stats.setDataSize(setMaxIfInvalid(newDataSize));
+        stats.setDataSize(StatsUtils.getMaxIfOverflow(newDataSize));
       } else {
         long newDataSize = (long) (ratio * stats.getDataSize());
-        stats.setDataSize(setMaxIfInvalid(newDataSize));
+        stats.setDataSize(StatsUtils.getMaxIfOverflow(newDataSize));
       }
     }
@@ -1853,14 +1858,4 @@ static boolean satisfyPrecondition(Statistics stats) {
       return stats != null && stats.getBasicStatsState().equals(Statistics.State.COMPLETE)
           && !stats.getColumnStatsState().equals(Statistics.State.NONE);
     }
-
-    /**
-     * negative number of rows or data sizes are invalid. It could be because of
-     * long overflow in which case return Long.MAX_VALUE
-     * @param val - input value
-     * @return Long.MAX_VALUE if val is negative else val
-     */
-    static long setMaxIfInvalid(long val) {
-      return val < 0 ? Long.MAX_VALUE : val;
-    }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
index 1b27c31..27885c2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
@@ -1011,12 +1011,10 @@ public static long getWritableSize(ObjectInspector oi, Object value) {
     if (colExprMap != null && rowSchema != null) {
       for (ColumnInfo ci : rowSchema.getSignature()) {
         String outColName = ci.getInternalName();
-        outColName = StatsUtils.stripPrefixFromColumnName(outColName);
         String outTabAlias = ci.getTabAlias();
         ExprNodeDesc end = colExprMap.get(outColName);
         ColStatistics colStat = getColStatisticsFromExpression(conf, parentStats, end);
         if (colStat != null) {
-          outColName = StatsUtils.stripPrefixFromColumnName(outColName);
           colStat.setColumnName(outColName);
           colStat.setTableAlias(outTabAlias);
         }
@@ -1070,7 +1068,6 @@ public static ColStatistics getColStatisticsFromExpression(HiveConf conf, Statis
       ExprNodeColumnDesc encd = (ExprNodeColumnDesc) end;
       colName = encd.getColumn();
       tabAlias = encd.getTabAlias();
-      colName = stripPrefixFromColumnName(colName);
 
       if (encd.getIsPartitionColOrVirtualCol()) {
@@ -1300,21 +1297,6 @@ public static long getDataSizeFromColumnStats(long numRows, List
   }
 
   /**
-   * Remove KEY/VALUE prefix from column name
-   * @param colName
-   *          - column name
-   * @return column name
-   */
-  public static String stripPrefixFromColumnName(String colName) {
-    String stripedName = colName;
-    if (colName.startsWith("KEY") || colName.startsWith("VALUE")) {
-      // strip off KEY./VALUE. from column name
-      stripedName = colName.split("\\.")[1];
-    }
-    return stripedName;
-  }
-
-  /**
    * Returns fully qualified name of column
    * @param tabName
    * @param colName
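For context on the deletion above: the removed helper stripped prefixes by splitting on the first dot, which mis-fires on any unprefixed column whose name merely starts with KEY or VALUE, and would throw outright on such a name containing no dot. Carrying the prefixed name through the stats maps, as the rest of this patch does, sidesteps the problem. The deleted logic, reproduced standalone to show the edge cases:

// StripPrefixDemo.java -- reproduces the deleted helper to show its edge cases.
public class StripPrefixDemo {
  static String stripPrefixFromColumnName(String colName) {
    String stripedName = colName;
    if (colName.startsWith("KEY") || colName.startsWith("VALUE")) {
      stripedName = colName.split("\\.")[1];   // keeps only the second dot-segment
    }
    return stripedName;
  }

  public static void main(String[] args) {
    System.out.println(stripPrefixFromColumnName("KEY._col0"));    // _col0, as intended
    System.out.println(stripPrefixFromColumnName("VALUE._col1"));  // _col1, as intended
    // Mis-fire: an unprefixed name that happens to start with "KEY".
    System.out.println(stripPrefixFromColumnName("KEYWORD.x"));    // x, not KEYWORD.x
    // A dotless name such as "KEYCOL" would throw ArrayIndexOutOfBoundsException.
  }
}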
@@ -1370,31 +1352,26 @@ private static String getFullyQualifiedName(String... names) {
    *          - column expression map
    * @return list of fully qualified names
    */
-  public static List<String> getFullQualifedColNameFromExprs(List<ExprNodeDesc> keyExprs,
+  public static List<String> getFullyQualifedColNameFromExprs(List<String> keyExprs,
       Map<String, ExprNodeDesc> map) {
     List<String> result = Lists.newArrayList();
     if (keyExprs != null) {
-      for (ExprNodeDesc end : keyExprs) {
-        String outColName = null;
-        for (Map.Entry<String, ExprNodeDesc> entry : map.entrySet()) {
-          if (entry.getValue().isSame(end)) {
-            outColName = entry.getKey();
-            outColName = stripPrefixFromColumnName(outColName);
-          }
-        }
+      for (String key : keyExprs) {
+        ExprNodeDesc end = map.get(key);
         if (end instanceof ExprNodeColumnDesc) {
           ExprNodeColumnDesc encd = (ExprNodeColumnDesc) end;
-          if (outColName == null) {
-            outColName = encd.getColumn();
-            outColName = stripPrefixFromColumnName(outColName);
-          }
           String tabAlias = encd.getTabAlias();
-          result.add(getFullyQualifiedColumnName(tabAlias, outColName));
+          result.add(getFullyQualifiedColumnName(tabAlias, key));
         } else if (end instanceof ExprNodeGenericFuncDesc) {
           ExprNodeGenericFuncDesc enf = (ExprNodeGenericFuncDesc) end;
-          List<String> cols = getFullQualifedColNameFromExprs(enf.getChildren(), map);
-          String joinedStr = Joiner.on(".").skipNulls().join(cols);
-          result.add(joinedStr);
+          String tabAlias = "";
+          for (ExprNodeDesc childEnd : enf.getChildren()) {
+            if (childEnd instanceof ExprNodeColumnDesc) {
+              tabAlias = ((ExprNodeColumnDesc) childEnd).getTabAlias();
+              break;
+            }
+          }
+          result.add(getFullyQualifiedColumnName(tabAlias, key));
         } else if (end instanceof ExprNodeConstantDesc) {
           ExprNodeConstantDesc encd = (ExprNodeConstantDesc) end;
           result.add(encd.getValue().toString());
@@ -1439,4 +1416,14 @@ public static long getAvailableMemory(Configuration conf) {
         conf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB);
     return memory;
   }
+
+  /**
+   * A negative number of rows or data size is invalid, possibly the result of
+   * long overflow; in that case return Long.MAX_VALUE.
+   * @param val - input value
+   * @return Long.MAX_VALUE if val is negative, else val
+   */
+  public static long getMaxIfOverflow(long val) {
+    return val < 0 ? Long.MAX_VALUE : val;
+  }
 }
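The relocated helper's contract, exercised directly (values are illustrative). Note the caveat in the last comment: the check catches a single wrap into negative territory, which is the common case here, but a value that wraps far enough to land positive again would pass through unchanged.

// GetMaxIfOverflowDemo.java -- exercises the relocated helper's contract.
public class GetMaxIfOverflowDemo {
  static long getMaxIfOverflow(long val) {          // copied contract for the demo
    return val < 0 ? Long.MAX_VALUE : val;
  }

  public static void main(String[] args) {
    System.out.println(getMaxIfOverflow(42L));                 // valid sizes pass through
    System.out.println(getMaxIfOverflow(Long.MAX_VALUE));      // already clamped: unchanged
    System.out.println(getMaxIfOverflow(Long.MAX_VALUE + 1));  // wrapped negative: clamped
  }
}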
diff --git ql/src/test/results/clientpositive/annotate_stats_groupby.q.out ql/src/test/results/clientpositive/annotate_stats_groupby.q.out
index 41a0083..718b43c 100644
--- ql/src/test/results/clientpositive/annotate_stats_groupby.q.out
+++ ql/src/test/results/clientpositive/annotate_stats_groupby.q.out
@@ -177,17 +177,17 @@ STAGE PLANS:
           keys: KEY._col0 (type: string), KEY._col1 (type: int)
           mode: mergepartial
           outputColumnNames: _col0, _col1, _col2
-          Statistics: Num rows: 8 Data size: 800 Basic stats: COMPLETE Column stats: PARTIAL
+          Statistics: Num rows: 7 Data size: 658 Basic stats: COMPLETE Column stats: PARTIAL
           Select Operator
             expressions: _col0 (type: string), _col1 (type: int), _col2 (type: bigint)
             outputColumnNames: _col0, _col1, _col2
-            Statistics: Num rows: 8 Data size: 800 Basic stats: COMPLETE Column stats: PARTIAL
+            Statistics: Num rows: 7 Data size: 658 Basic stats: COMPLETE Column stats: PARTIAL
             Group By Operator
               aggregations: min(_col1)
               keys: _col0 (type: string), _col2 (type: bigint)
               mode: hash
               outputColumnNames: _col0, _col1, _col2
-              Statistics: Num rows: 8 Data size: 832 Basic stats: COMPLETE Column stats: PARTIAL
+              Statistics: Num rows: 7 Data size: 686 Basic stats: COMPLETE Column stats: PARTIAL
               File Output Operator
                 compressed: false
                 table:
@@ -203,7 +203,7 @@
             key expressions: _col0 (type: string), _col1 (type: bigint)
             sort order: ++
             Map-reduce partition columns: _col0 (type: string), _col1 (type: bigint)
-            Statistics: Num rows: 8 Data size: 832 Basic stats: COMPLETE Column stats: PARTIAL
+            Statistics: Num rows: 7 Data size: 686 Basic stats: COMPLETE Column stats: PARTIAL
             value expressions: _col2 (type: int)
       Reduce Operator Tree:
         Group By Operator
@@ -211,14 +211,14 @@
           keys: KEY._col0 (type: string), KEY._col1 (type: bigint)
           mode: mergepartial
           outputColumnNames: _col0, _col1, _col2
-          Statistics: Num rows: 8 Data size: 832 Basic stats: COMPLETE Column stats: PARTIAL
+          Statistics: Num rows: 7 Data size: 686 Basic stats: COMPLETE Column stats: PARTIAL
           Select Operator
             expressions: _col0 (type: string), _col1 (type: bigint), _col2 (type: int)
             outputColumnNames: _col0, _col1, _col2
-            Statistics: Num rows: 8 Data size: 832 Basic stats: COMPLETE Column stats: PARTIAL
+            Statistics: Num rows: 7 Data size: 686 Basic stats: COMPLETE Column stats: PARTIAL
             File Output Operator
               compressed: false
-              Statistics: Num rows: 8 Data size: 832 Basic stats: COMPLETE Column stats: PARTIAL
+              Statistics: Num rows: 7 Data size: 686 Basic stats: COMPLETE Column stats: PARTIAL
              table:
                   input format: org.apache.hadoop.mapred.TextInputFormat
                   output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat