diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 7b48b8b..5d58839 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -140,6 +140,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; @@ -1095,6 +1096,7 @@ protected synchronized Kryo initialValue() { kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()); removeField(kryo, Operator.class, "colExprMap"); removeField(kryo, ColumnInfo.class, "objectInspector"); + removeField(kryo, AbstractOperatorDesc.class, "statistics"); return kryo; }; }; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java index 571c050..bbf2fdb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java @@ -77,6 +77,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.Stack; @@ -1061,7 +1062,6 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, numAttr = keyExprs.size(); // infer PK-FK relationship in single attribute join case - pkfkInferred = false; inferPKFKRelationship(); // get the join keys from parent ReduceSink operators for (int pos = 0; pos < parents.size(); pos++) { @@ -1197,53 +1197,42 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, private void inferPKFKRelationship() { if (numAttr == 1) { - List parentsWithPK = getPrimaryKeyCandidates(parents); - - // in case of fact to many dimensional tables join, the join key in fact table will be - // mostly foreign key which will have corresponding primary key in dimension table. - // The selectivity of fact table in that case will be product of all selectivities of - // dimension tables (assumes conjunctivity) - for (Integer id : parentsWithPK) { - ColStatistics csPK = null; - Operator parent = parents.get(id); - for (ColStatistics cs : parent.getStatistics().getColumnStats()) { - if (cs.isPrimaryKey()) { - csPK = cs; - break; - } - } + // If numAttr is 1, this means we join on one single key column. + Map parentsWithPK = getPrimaryKeyCandidates(parents); - // infer foreign key candidates positions - List parentsWithFK = getForeignKeyCandidates(parents, csPK); - if (parentsWithFK.size() == 1 && - parentsWithFK.size() + parentsWithPK.size() == parents.size()) { - Operator parentWithFK = parents.get(parentsWithFK.get(0)); - List parentsSel = getSelectivity(parents, parentsWithPK); - Float prodSelectivity = 1.0f; - for (Float selectivity : parentsSel) { - prodSelectivity *= selectivity; - } - newNumRows = (long) Math.ceil( - parentWithFK.getStatistics().getNumRows() * prodSelectivity); - pkfkInferred = true; + // We only allow one single PK. + if (parentsWithPK.size() != 1) { + LOG.debug("STATS-" + jop.toString() + ": detects multiple PK parents."); + return; + } + Integer pkPos = parentsWithPK.keySet().iterator().next(); + ColStatistics csPK = parentsWithPK.values().iterator().next(); - // some debug information - if (isDebugEnabled) { - List parentIds = Lists.newArrayList(); + // infer foreign key candidates positions + Map csFKs = getForeignKeyCandidates(parents, csPK); - // print primary key containing parents - for (Integer i : parentsWithPK) { - parentIds.add(parents.get(i).toString()); - } - LOG.debug("STATS-" + jop.toString() + ": PK parent id(s) - " + parentIds); - parentIds.clear(); + // we allow multiple foreign keys (snowflake schema) + // csfKs.size() + 1 == parents.size() means we have a single PK and all + // the rest ops are FKs. + if (csFKs.size() + 1 == parents.size()) { + getSelectivity(parents, pkPos, csPK, csFKs); - // print foreign key containing parents - for (Integer i : parentsWithFK) { - parentIds.add(parents.get(i).toString()); - } - LOG.debug("STATS-" + jop.toString() + ": FK parent id(s) - " + parentIds); + // some debug information + if (isDebugEnabled) { + List parentIds = Lists.newArrayList(); + + // print primary key containing parents + for (Integer i : parentsWithPK.keySet()) { + parentIds.add(parents.get(i).toString()); } + LOG.debug("STATS-" + jop.toString() + ": PK parent id(s) - " + parentIds); + parentIds.clear(); + + // print foreign key containing parents + for (Integer i : csFKs.keySet()) { + parentIds.add(parents.get(i).toString()); + } + LOG.debug("STATS-" + jop.toString() + ": FK parent id(s) - " + parentIds); } } } @@ -1251,19 +1240,55 @@ private void inferPKFKRelationship() { /** * Get selectivity of reduce sink operators. - * @param ops - reduce sink operators - * @param opsWithPK - reduce sink operators with primary keys - * @return - list of selectivity for primary key containing operators + * @param csPK - ColStatistics for a single primary key + * @param csFKs - ColStatistics for multiple foreign keys */ - private List getSelectivity(List> ops, - List opsWithPK) { - List result = Lists.newArrayList(); - for (Integer idx : opsWithPK) { - Operator op = ops.get(idx); - float selectivity = getSelectivitySimpleTree(op); - result.add(selectivity); + private void getSelectivity(List> ops, Integer pkPos, ColStatistics csPK, + Map csFKs) { + this.pkfkInferred = true; + double pkfkSelectivity = Double.MAX_VALUE; + int fkInd = -1; + // We iterate through all the operators that have candidate FKs and choose + // the one that has the minimum selectivity. This is heuristic and can be + // improved later. + for (Entry entry : csFKs.entrySet()) { + int pos = entry.getKey(); + Operator opWithPK = ops.get(pkPos); + double selectivity = getSelectivitySimpleTree(opWithPK); + double selectivityAdjustment = StatsUtils.getScaledSelectivity(csPK, entry.getValue()); + selectivity = selectivityAdjustment * selectivity > 1 ? selectivity : selectivityAdjustment + * selectivity; + if (selectivity < pkfkSelectivity) { + pkfkSelectivity = selectivity; + fkInd = pos; + } + } + long newrows = 1; + List rowCounts = Lists.newArrayList(); + List distinctVals = Lists.newArrayList(); + for (Entry entry : csFKs.entrySet()) { + int pos = entry.getKey(); + ColStatistics csFK = entry.getValue(); + ReduceSinkOperator parent = (ReduceSinkOperator) jop.getParentOperators().get(pos); + Statistics parentStats = parent.getStatistics(); + if (fkInd == pos) { + // this is the new number of rows after pk is joining with fk + newrows = (long) Math.ceil(parentStats.getNumRows() * pkfkSelectivity); + rowCounts.add(newrows); + // the ndv is the minimun of the pk and the fk. + distinctVals.add(Math.min(csFK.getCountDistint(), csPK.getCountDistint())); + } else { + rowCounts.add(parentStats.getNumRows()); + distinctVals.add(csFK.getCountDistint()); + } + } + if (csFKs.size() == 1) { + // there is only one FK + this.newNumRows = newrows; + } else { + // there is more than one FK + this.newNumRows = this.computeNewRowCount(rowCounts, getDenominator(distinctVals)); } - return result; } private float getSelectivitySimpleTree(Operator op) { @@ -1323,11 +1348,11 @@ private float getSelectivityComplexTree(Operator op) { * primary key range (inferred as foreign keys). * @param ops - operators * @param csPK - column statistics of primary key - * @return - list of foreign key containing parent ids + * @return - a map which contains position ids and the corresponding column statistics */ - private List getForeignKeyCandidates(List> ops, + private Map getForeignKeyCandidates(List> ops, ColStatistics csPK) { - List result = Lists.newArrayList(); + Map result = new HashMap(); if (csPK == null || ops == null) { return result; } @@ -1343,7 +1368,7 @@ private float getSelectivityComplexTree(Operator op) { ColStatistics cs = rsOp.getStatistics().getColumnStatisticsFromColName(joinCol); if (cs != null && !cs.isPrimaryKey()) { if (StatsUtils.inferForeignKey(csPK, cs)) { - result.add(i); + result.put(i,cs); } } } @@ -1358,8 +1383,8 @@ private float getSelectivityComplexTree(Operator op) { * @param ops - operators * @return - list of primary key containing parent ids */ - private List getPrimaryKeyCandidates(List> ops) { - List result = Lists.newArrayList(); + private Map getPrimaryKeyCandidates(List> ops) { + Map result = new HashMap(); if (ops != null && !ops.isEmpty()) { for (int i = 0; i < ops.size(); i++) { Operator op = ops.get(i); @@ -1371,7 +1396,7 @@ private float getSelectivityComplexTree(Operator op) { if (rsOp.getStatistics() != null) { ColStatistics cs = rsOp.getStatistics().getColumnStatisticsFromColName(joinCol); if (cs != null && cs.isPrimaryKey()) { - result.add(i); + result.put(i, cs); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java index 4cd9120..ad481bc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java @@ -300,7 +300,10 @@ public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList pa public static void inferAndSetPrimaryKey(long numRows, List colStats) { if (colStats != null) { for (ColStatistics cs : colStats) { - if (cs != null && cs.getRange() != null && cs.getRange().minValue != null && + if (cs != null && cs.getCountDistint() >= numRows) { + cs.setPrimaryKey(true); + } + else if (cs != null && cs.getRange() != null && cs.getRange().minValue != null && cs.getRange().maxValue != null) { if (numRows == ((cs.getRange().maxValue.longValue() - cs.getRange().minValue.longValue()) + 1)) { @@ -330,6 +333,36 @@ public static boolean inferForeignKey(ColStatistics csPK, ColStatistics csFK) { return false; } + /** + * Scale selectivity based on key range ratio. + * @param csPK - column statistics of primary key + * @param csFK - column statistics of potential foreign key + * @return + */ + public static float getScaledSelectivity(ColStatistics csPK, ColStatistics csFK) { + float scaledSelectivity = 1.0f; + if (csPK != null && csFK != null) { + if (csPK.isPrimaryKey()) { + // Use Max-Min Range as NDV gets scaled by selectivity. + if (csPK.getRange() != null && csFK.getRange() != null) { + long pkRangeDelta = getRangeDelta(csPK.getRange()); + long fkRangeDelta = getRangeDelta(csFK.getRange()); + if (fkRangeDelta > 0 && pkRangeDelta > 0 && fkRangeDelta < pkRangeDelta) { + scaledSelectivity = (float) pkRangeDelta / (float) fkRangeDelta; + } + } + } + } + return scaledSelectivity; + } + + private static long getRangeDelta(ColStatistics.Range range) { + if (range.minValue != null && range.maxValue != null) { + return (range.maxValue.longValue() - range.minValue.longValue()); + } + return 0; + } + private static boolean isWithin(ColStatistics.Range range1, ColStatistics.Range range2) { if (range1.minValue != null && range2.minValue != null && range1.maxValue != null && range2.maxValue != null) {