diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java index 32fba6c8ff..20571130e8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java @@ -113,7 +113,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import scala.math.Numeric; public class StatsRulesProcFactory { @@ -297,7 +296,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, // result in number of rows getting more than the input rows in // which case stats need not be updated if (newNumRows <= parentStats.getNumRows()) { - updateStats(st, newNumRows, true, fop, aspCtx.getAffectedColumns()); + StatsUtils.updateStats(st, newNumRows, true, fop, aspCtx.getAffectedColumns()); } if (LOG.isDebugEnabled()) { @@ -307,7 +306,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, // update only the basic statistics in the absence of column statistics if (newNumRows <= parentStats.getNumRows()) { - updateStats(st, newNumRows, false, fop); + StatsUtils.updateStats(st, newNumRows, false, fop); } if (LOG.isDebugEnabled()) { @@ -358,9 +357,9 @@ protected long evaluateExpression(Statistics stats, ExprNodeDesc pred, // Ndv is reduced in a conservative manner - only taking affected columns // (which might be a subset of the actual *real* affected columns due to current limitation) // Goal is to not let a situation in which ndv-s asre underestimated happen. - updateStats(aspCtx.getAndExprStats(), newNumRows, true, op, aspCtx.getAffectedColumns()); + StatsUtils.updateStats(aspCtx.getAndExprStats(), newNumRows, true, op, aspCtx.getAffectedColumns()); } else { - updateStats(aspCtx.getAndExprStats(), newNumRows, false, op); + StatsUtils.updateStats(aspCtx.getAndExprStats(), newNumRows, false, op); } } } else if (udf instanceof GenericUDFOPOr) { @@ -1434,7 +1433,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } // update stats, but don't update NDV as it will not change - updateStats(stats, cardinality, true, gop); + StatsUtils.updateStats(stats, cardinality, true, gop); } else { // NO COLUMN STATS @@ -1471,7 +1470,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } } - updateStats(stats, cardinality, false, gop); + StatsUtils.updateStats(stats, cardinality, false, gop); } } @@ -1502,7 +1501,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, // only if the column stats is available, update the data size from // the column stats if (!stats.getColumnStatsState().equals(Statistics.State.NONE)) { - updateStats(stats, stats.getNumRows(), true, gop); + StatsUtils.updateStats(stats, stats.getNumRows(), true, gop); } } @@ -1510,7 +1509,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, // be full aggregation query like count(*) in which case number of // rows will be 1 if (colExprMap.isEmpty()) { - updateStats(stats, 1, true, gop); + StatsUtils.updateStats(stats, 1, true, gop); } } @@ -1870,7 +1869,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, // result in number of rows getting more than the input rows in // which case stats need not be updated if (newNumRows <= joinRowCount) { - updateStats(stats, newNumRows, true, jop); + 
StatsUtils.updateStats(stats, newNumRows, true, jop); } } @@ -1960,7 +1959,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, aspCtx, jop.getSchema().getColumnNames(), jop, wcStats.getNumRows()); // update only the basic statistics in the absence of column statistics if (newNumRows <= joinRowCount) { - updateStats(wcStats, newNumRows, false, jop); + StatsUtils.updateStats(wcStats, newNumRows, false, jop); } } @@ -2582,7 +2581,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, // if limit is greater than available rows then do not update // statistics if (limit <= parentStats.getNumRows()) { - updateStats(stats, limit, true, lop); + StatsUtils.updateStats(stats, limit, true, lop); } stats = applyRuntimeStats(aspCtx.getParseContext().getContext(), stats, lop); lop.setStatistics(stats); @@ -2809,72 +2808,6 @@ public static NodeProcessor getDefaultRule() { return new DefaultStatsRule(); } - /** - * Update the basic statistics of the statistics object based on the row number - * @param stats - * - statistics to be updated - * @param newNumRows - * - new number of rows - * @param useColStats - * - use column statistics to compute data size - */ - static void updateStats(Statistics stats, long newNumRows, - boolean useColStats, Operator op) { - updateStats(stats, newNumRows, useColStats, op, Collections.EMPTY_SET); - } - - static void updateStats(Statistics stats, long newNumRows, - boolean useColStats, Operator op, - Set affectedColumns) { - - if (newNumRows < 0) { - LOG.debug("STATS-" + op.toString() + ": Overflow in number of rows. " - + newNumRows + " rows will be set to Long.MAX_VALUE"); - newNumRows = StatsUtils.getMaxIfOverflow(newNumRows); - } - if (newNumRows == 0) { - LOG.debug("STATS-" + op.toString() + ": Equals 0 in number of rows. " - + newNumRows + " rows will be set to 1"); - newNumRows = 1; - } - - long oldRowCount = stats.getNumRows(); - double ratio = (double) newNumRows / (double) oldRowCount; - stats.setNumRows(newNumRows); - - if (useColStats) { - List colStats = stats.getColumnStats(); - for (ColStatistics cs : colStats) { - long oldNumNulls = cs.getNumNulls(); - long oldDV = cs.getCountDistint(); - long newNumNulls = Math.round(ratio * oldNumNulls); - cs.setNumNulls(newNumNulls); - if (affectedColumns.contains(cs.getColumnName())) { - long newDV = oldDV; - - // if ratio is greater than 1, then number of rows increases. This can happen - // when some operators like GROUPBY duplicates the input rows in which case - // number of distincts should not change. Update the distinct count only when - // the output number of rows is less than input number of rows. 
-          if (ratio <= 1.0) {
-            newDV = (long) Math.ceil(ratio * oldDV);
-          }
-          cs.setCountDistint(newDV);
-          oldDV = newDV;
-        }
-        if (oldDV > newNumRows) {
-          cs.setCountDistint(newNumRows);
-        }
-      }
-      stats.setColumnStats(colStats);
-      long newDataSize = StatsUtils.getDataSizeFromColumnStats(newNumRows, colStats);
-      stats.setDataSize(StatsUtils.getMaxIfOverflow(newDataSize));
-    } else {
-      long newDataSize = (long) (ratio * stats.getDataSize());
-      stats.setDataSize(StatsUtils.getMaxIfOverflow(newDataSize));
-    }
-  }
-
   static boolean satisfyPrecondition(Statistics stats) {
     return stats != null && stats.getBasicStatsState().equals(Statistics.State.COMPLETE)
         && !stats.getColumnStatsState().equals(Statistics.State.NONE);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index fdc963506c..fd4910afe4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Deque;
@@ -1344,8 +1346,7 @@ private static boolean canUseNDV(ColStatistics colStats) {
   }
 
   private static double getBloomFilterCost(
-      SelectOperator sel,
-      FilterOperator fil) {
+      SelectOperator sel) {
     double cost = -1;
     Statistics selStats = sel.getStatistics();
     if (selStats != null) {
@@ -1411,10 +1412,9 @@ private static long getCombinedKeyDomainCardinality(
   private static double getBloomFilterBenefit(
       SelectOperator sel, ExprNodeDesc selExpr,
-      FilterOperator fil, ExprNodeDesc tsExpr) {
+      Statistics filStats, ExprNodeDesc tsExpr) {
     double benefit = -1;
     Statistics selStats = sel.getStatistics();
-    Statistics filStats = fil.getStatistics();
     if (selStats == null || filStats == null) {
       LOG.debug("No stats available to compute BloomFilter benefit");
       return benefit;
     }
@@ -1477,12 +1477,11 @@ private static double getBloomFilterBenefit(
   private static double computeBloomFilterNetBenefit(
       SelectOperator sel, ExprNodeDesc selExpr,
-      FilterOperator fil, ExprNodeDesc tsExpr) {
+      Statistics filStats, ExprNodeDesc tsExpr) {
     double netBenefit = -1;
-    double benefit = getBloomFilterBenefit(sel, selExpr, fil, tsExpr);
-    Statistics filStats = fil.getStatistics();
+    double benefit = getBloomFilterBenefit(sel, selExpr, filStats, tsExpr);
     if (benefit > 0 && filStats != null) {
-      double cost = getBloomFilterCost(sel, fil);
+      double cost = getBloomFilterCost(sel);
       if (cost > 0) {
         long filDataSize = filStats.getNumRows();
         netBenefit = (benefit - cost) / filDataSize;
@@ -1498,47 +1497,93 @@ private static double computeBloomFilterNetBenefit(
   private void removeSemijoinOptimizationByBenefit(OptimizeTezProcContext procCtx) throws SemanticException {
+    Map<FilterOperator, Statistics> adjustedStatsMap = new HashMap<>();
+    Map<FilterOperator, SemijoinOperatorInfo> reductionFactorMap = new HashMap<>();
     List<ReduceSinkOperator> semijoinRsToRemove = new ArrayList<>();
     Map<ReduceSinkOperator, SemiJoinBranchInfo> map = procCtx.parseContext.getRsToSemiJoinBranchInfo();
     double semijoinReductionThreshold = procCtx.conf.getFloatVar(
         HiveConf.ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION_THRESHOLD);
-    for (ReduceSinkOperator rs : map.keySet()) {
-      SemiJoinBranchInfo sjInfo = map.get(rs);
-      if (sjInfo.getIsHint() || !sjInfo.getShouldRemove()) {
-        // Semijoin created using hint or marked useful, skip it
-        continue;
-      }
-      // rs is semijoin optimization branch, which should look like -SEL-GB1-RS1-GB2-RS2
-      // Get to the SelectOperator ancestor
-      SelectOperator sel = null;
-      for (Operator currOp = rs; currOp.getParentOperators().size() > 0; currOp = currOp.getParentOperators().get(0)) {
-        if (currOp instanceof SelectOperator) {
-          sel = (SelectOperator) currOp;
-          break;
+    List<ReduceSinkOperator> semiJoinRsOps = Lists.newArrayList(map.keySet());
+    while (!semiJoinRsOps.isEmpty()) {
+      List<ReduceSinkOperator> semiJoinRsOpsNewIter = new ArrayList<>();
+      for (ReduceSinkOperator rs : semiJoinRsOps) {
+        SemiJoinBranchInfo sjInfo = map.get(rs);
+        if (sjInfo.getIsHint() || !sjInfo.getShouldRemove()) {
+          // Semijoin created using hint or marked useful, skip it
+          continue;
+        }
+        // rs is semijoin optimization branch, which should look like -SEL-GB1-RS1-GB2-RS2
+        // Get to the SelectOperator ancestor
+        SelectOperator sel = null;
+        for (Operator currOp = rs; currOp.getParentOperators().size() > 0; currOp = currOp.getParentOperators().get(0)) {
+          if (currOp instanceof SelectOperator) {
+            sel = (SelectOperator) currOp;
+            break;
+          }
+        }
+        if (sel == null) {
+          throw new SemanticException("Unexpected error - could not find SEL ancestor from semijoin branch of " + rs);
         }
-      }
-      if (sel == null) {
-        throw new SemanticException("Unexpected error - could not find SEL ancestor from semijoin branch of " + rs);
-      }
-      // Check the ndv/rows from the SEL vs the destination tablescan the semijoin opt is going to.
-      TableScanOperator ts = sjInfo.getTsOp();
-      RuntimeValuesInfo rti = procCtx.parseContext.getRsToRuntimeValuesInfoMap().get(rs);
-      ExprNodeDesc tsExpr = rti.getTsColExpr();
-      // In the SEL operator of the semijoin branch, there should be only one column in the operator
-      ExprNodeDesc selExpr = sel.getConf().getColList().get(0);
+        // Check the ndv/rows from the SEL vs the destination tablescan the semijoin opt is going to.
+        TableScanOperator ts = sjInfo.getTsOp();
+        RuntimeValuesInfo rti = procCtx.parseContext.getRsToRuntimeValuesInfoMap().get(rs);
+        ExprNodeDesc tsExpr = rti.getTsColExpr();
+        // In the SEL operator of the semijoin branch, there should be only one column in the operator
+        ExprNodeDesc selExpr = sel.getConf().getColList().get(0);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Computing BloomFilter cost/benefit for " + OperatorUtils.getOpNamePretty(rs)
-            + " - " + OperatorUtils.getOpNamePretty(ts) + " (" + tsExpr + ")");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Computing BloomFilter cost/benefit for " + OperatorUtils.getOpNamePretty(rs)
+              + " - " + OperatorUtils.getOpNamePretty(ts) + " (" + tsExpr + ")");
+        }
+
+        FilterOperator filterOperator = (FilterOperator) ts.getChildOperators().get(0);
+        Statistics filterStats = adjustedStatsMap.get(filterOperator);
+        if (filterStats == null && filterOperator.getStatistics() != null) {
+          filterStats = filterOperator.getStatistics().clone();
+        }
+        double reductionFactor = computeBloomFilterNetBenefit(
+            sel, selExpr, filterStats, tsExpr);
+        if (reductionFactor < semijoinReductionThreshold) {
+          // This semijoin optimization should be removed. Do it after we're done iterating
+          semijoinRsToRemove.add(rs);
+        } else {
+          // This semijoin qualifies, add it to the result set
+          if (filterStats != null) {
+            String colName = ExprNodeDescUtils.getColumnExpr(tsExpr).getColumn();
+            SemijoinOperatorInfo prevResult = reductionFactorMap.get(filterOperator);
+            if (prevResult != null) {
+              if (prevResult.reductionFactor < reductionFactor) {
+                reductionFactorMap.put(filterOperator, new SemijoinOperatorInfo(rs, filterOperator,
+                    filterStats, colName, reductionFactor));
+                semiJoinRsOpsNewIter.add(prevResult.rsOperator);
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Adding " + OperatorUtils.getOpNamePretty(prevResult.rsOperator) +
+                      " for re-iteration");
+                }
+              } else {
+                semiJoinRsOpsNewIter.add(rs);
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Adding " + OperatorUtils.getOpNamePretty(rs) + " for re-iteration");
+                }
+              }
+            } else {
+              reductionFactorMap.put(filterOperator, new SemijoinOperatorInfo(rs, filterOperator,
+                  filterStats, colName, reductionFactor));
+            }
+          }
+        }
       }
-      double reductionFactor = computeBloomFilterNetBenefit(sel, selExpr,
-          (FilterOperator)ts.getChildOperators().get(0), tsExpr);
-      if (reductionFactor < semijoinReductionThreshold) {
-        // This semijoin optimization should be removed. Do it after we're done iterating
-        semijoinRsToRemove.add(rs);
+      for (SemijoinOperatorInfo roi : reductionFactorMap.values()) {
+        // This semijoin will be kept
+        // We are going to adjust the filter statistics
+        StatsUtils.updateStats(roi.filterStats, (long) ((1.0 - roi.reductionFactor) * roi.filterStats.getNumRows()),
+            true, roi.filterOperator, Sets.newHashSet(roi.colName));
+        adjustedStatsMap.put(roi.filterOperator, roi.filterStats);
       }
+
+      semiJoinRsOps = semiJoinRsOpsNewIter;
     }
     for (ReduceSinkOperator rs : semijoinRsToRemove) {
@@ -1552,6 +1597,27 @@ private void removeSemijoinOptimizationByBenefit(OptimizeTezProcContext procCtx)
     }
   }
 
+  /**
+   * Internal class to encapsulate information needed to evaluate stats
+   * about a semijoin that will be kept in the tree.
+   */
+  private class SemijoinOperatorInfo {
+    ReduceSinkOperator rsOperator;
+    FilterOperator filterOperator;
+    String colName;
+    Statistics filterStats;
+    double reductionFactor;
+
+    public SemijoinOperatorInfo(ReduceSinkOperator rsOperator, FilterOperator filterOperator,
+        Statistics filterStats, String colName, double reductionFactor) {
+      this.rsOperator = rsOperator;
+      this.filterOperator = filterOperator;
+      this.colName = colName;
+      this.filterStats = filterStats;
+      this.reductionFactor = reductionFactor;
+    }
+  }
+
   private void markSemiJoinForDPP(OptimizeTezProcContext procCtx) throws SemanticException {
     // Stores the Tablescan operators processed to avoid redoing them.
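Aside on the row-count arithmetic in the TezCompiler hunk above, before the StatsUtils.java changes: the reduction factor of a kept semijoin is meant to shrink the target filter's row estimate to (1 - reductionFactor) of its previous value, which is why the (long) cast has to wrap the whole product rather than only the (1.0 - reductionFactor) term. The snippet below is a minimal, self-contained illustration in plain Java; it is not part of the patch and the names are made up.

public class ReductionFactorExample {
  public static void main(String[] args) {
    long filterRows = 1_000_000L;    // hypothetical row estimate of the target FilterOperator
    double reductionFactor = 0.75;   // hypothetical net benefit from computeBloomFilterNetBenefit

    // Cast applied to the whole product: 25% of the rows survive the semijoin.
    long adjusted = (long) ((1.0 - reductionFactor) * filterRows);
    // Cast applied only to the factor: 0.25 truncates to 0 and wipes out the estimate.
    long truncated = (long) (1.0 - reductionFactor) * filterRows;

    System.out.println(adjusted);   // 250000
    System.out.println(truncated);  // 0
  }
}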
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java index b7adc485a7..3d8e67b936 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; +import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -68,6 +69,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.Statistics; import org.apache.hadoop.hive.ql.plan.Statistics.State; import org.apache.hadoop.hive.ql.stats.BasicStats.Factory; @@ -1928,4 +1930,70 @@ public static boolean areColumnStatsUptoDateForQueryAnswering(Table table, Map op) { + updateStats(stats, newNumRows, useColStats, op, Collections.EMPTY_SET); + } + + public static void updateStats(Statistics stats, long newNumRows, + boolean useColStats, Operator op, + Set affectedColumns) { + + if (newNumRows < 0) { + LOG.debug("STATS-" + op.toString() + ": Overflow in number of rows. " + + newNumRows + " rows will be set to Long.MAX_VALUE"); + newNumRows = StatsUtils.getMaxIfOverflow(newNumRows); + } + if (newNumRows == 0) { + LOG.debug("STATS-" + op.toString() + ": Equals 0 in number of rows. " + + newNumRows + " rows will be set to 1"); + newNumRows = 1; + } + + long oldRowCount = stats.getNumRows(); + double ratio = (double) newNumRows / (double) oldRowCount; + stats.setNumRows(newNumRows); + + if (useColStats) { + List colStats = stats.getColumnStats(); + for (ColStatistics cs : colStats) { + long oldNumNulls = cs.getNumNulls(); + long oldDV = cs.getCountDistint(); + long newNumNulls = Math.round(ratio * oldNumNulls); + cs.setNumNulls(newNumNulls); + if (affectedColumns.contains(cs.getColumnName())) { + long newDV = oldDV; + + // if ratio is greater than 1, then number of rows increases. This can happen + // when some operators like GROUPBY duplicates the input rows in which case + // number of distincts should not change. Update the distinct count only when + // the output number of rows is less than input number of rows. + if (ratio <= 1.0) { + newDV = (long) Math.ceil(ratio * oldDV); + } + cs.setCountDistint(newDV); + oldDV = newDV; + } + if (oldDV > newNumRows) { + cs.setCountDistint(newNumRows); + } + } + stats.setColumnStats(colStats); + long newDataSize = StatsUtils.getDataSizeFromColumnStats(newNumRows, colStats); + stats.setDataSize(StatsUtils.getMaxIfOverflow(newDataSize)); + } else { + long newDataSize = (long) (ratio * stats.getDataSize()); + stats.setDataSize(StatsUtils.getMaxIfOverflow(newDataSize)); + } + } } diff --git a/ql/src/test/results/clientpositive/perf/tez/query65.q.out b/ql/src/test/results/clientpositive/perf/tez/query65.q.out index 3d030f9624..0be13575e7 100644 --- a/ql/src/test/results/clientpositive/perf/tez/query65.q.out +++ b/ql/src/test/results/clientpositive/perf/tez/query65.q.out @@ -67,42 +67,38 @@ POSTHOOK: Output: hdfs://### HDFS PATH ### Plan optimized by CBO. 
Vertex dependency in root stage -Map 1 <- Reducer 14 (BROADCAST_EDGE), Reducer 16 (BROADCAST_EDGE), Reducer 8 (BROADCAST_EDGE) -Map 12 <- Reducer 11 (BROADCAST_EDGE), Reducer 14 (BROADCAST_EDGE) -Reducer 10 <- Reducer 9 (SIMPLE_EDGE) -Reducer 11 <- Map 7 (CUSTOM_SIMPLE_EDGE) -Reducer 14 <- Map 13 (CUSTOM_SIMPLE_EDGE) -Reducer 16 <- Map 15 (CUSTOM_SIMPLE_EDGE) -Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE) +Map 1 <- Reducer 11 (BROADCAST_EDGE) +Reducer 11 <- Map 10 (CUSTOM_SIMPLE_EDGE) +Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 9 (SIMPLE_EDGE) Reducer 3 <- Reducer 2 (SIMPLE_EDGE) -Reducer 4 <- Map 13 (SIMPLE_EDGE), Reducer 10 (SIMPLE_EDGE), Reducer 3 (SIMPLE_EDGE) -Reducer 5 <- Map 15 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE) +Reducer 4 <- Map 10 (SIMPLE_EDGE), Reducer 3 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE) +Reducer 5 <- Map 12 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE) Reducer 6 <- Reducer 5 (SIMPLE_EDGE) -Reducer 8 <- Map 7 (CUSTOM_SIMPLE_EDGE) -Reducer 9 <- Map 12 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE) +Reducer 7 <- Map 1 (SIMPLE_EDGE), Map 9 (SIMPLE_EDGE) +Reducer 8 <- Reducer 7 (SIMPLE_EDGE) Stage-0 Fetch Operator limit:100 Stage-1 Reducer 6 vectorized - File Output Operator [FS_176] - Limit [LIM_175] (rows=100 width=88) + File Output Operator [FS_160] + Limit [LIM_159] (rows=100 width=88) Number of rows:100 - Select Operator [SEL_174] (rows=255550079 width=88) + Select Operator [SEL_158] (rows=255550079 width=88) Output:["_col0","_col1","_col2","_col3","_col4","_col5"] <-Reducer 5 [SIMPLE_EDGE] SHUFFLE [RS_48] Select Operator [SEL_47] (rows=255550079 width=88) Output:["_col0","_col1","_col2","_col3","_col4","_col5"] Merge Join Operator [MERGEJOIN_129] (rows=255550079 width=88) - Conds:RS_44._col1=RS_153._col0(Inner),Output:["_col2","_col6","_col8","_col9","_col10","_col11"] - <-Map 15 [SIMPLE_EDGE] vectorized - SHUFFLE [RS_153] + Conds:RS_44._col1=RS_157._col0(Inner),Output:["_col2","_col6","_col8","_col9","_col10","_col11"] + <-Map 12 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_157] PartitionCols:_col0 - Select Operator [SEL_152] (rows=462000 width=1436) + Select Operator [SEL_156] (rows=462000 width=1436) Output:["_col0","_col1","_col2","_col3","_col4"] - Filter Operator [FIL_151] (rows=462000 width=1436) + Filter Operator [FIL_155] (rows=462000 width=1436) predicate:i_item_sk is not null TableScan [TS_36] (rows=462000 width=1436) default@item,item,Tbl:COMPLETE,Col:NONE,Output:["i_item_sk","i_item_desc","i_current_price","i_wholesale_cost","i_brand"] @@ -112,78 +108,20 @@ Stage-0 Filter Operator [FIL_43] (rows=232318249 width=88) predicate:(_col2 <= (0.1 * _col4)) Merge Join Operator [MERGEJOIN_128] (rows=696954748 width=88) - Conds:RS_163._col0=RS_173._col0(Inner),RS_163._col0=RS_144._col0(Inner),Output:["_col1","_col2","_col4","_col6"] - <-Map 13 [SIMPLE_EDGE] vectorized - SHUFFLE [RS_144] + Conds:RS_149._col0=RS_154._col0(Inner),RS_149._col0=RS_132._col0(Inner),Output:["_col1","_col2","_col4","_col6"] + <-Map 10 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_132] PartitionCols:_col0 - Select Operator [SEL_143] (rows=1704 width=1910) + Select Operator [SEL_131] (rows=1704 width=1910) Output:["_col0","_col1"] - Filter Operator [FIL_142] (rows=1704 width=1910) + Filter Operator [FIL_130] (rows=1704 width=1910) predicate:s_store_sk is not null TableScan [TS_33] (rows=1704 width=1910) default@store,store,Tbl:COMPLETE,Col:NONE,Output:["s_store_sk","s_store_name"] - <-Reducer 10 [SIMPLE_EDGE] vectorized - SHUFFLE [RS_173] - PartitionCols:_col0 - Select Operator [SEL_172] (rows=158398803 width=88) - 
Output:["_col0","_col1"] - Group By Operator [GBY_171] (rows=158398803 width=88) - Output:["_col0","_col1","_col2"],aggregations:["sum(_col2)","count(_col2)"],keys:_col1 - Select Operator [SEL_170] (rows=316797606 width=88) - Output:["_col1","_col2"] - Group By Operator [GBY_169] (rows=316797606 width=88) - Output:["_col0","_col1","_col2"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0, KEY._col1 - <-Reducer 9 [SIMPLE_EDGE] - SHUFFLE [RS_25] - PartitionCols:_col0 - Group By Operator [GBY_24] (rows=633595212 width=88) - Output:["_col0","_col1","_col2"],aggregations:["sum(_col3)"],keys:_col2, _col1 - Merge Join Operator [MERGEJOIN_127] (rows=633595212 width=88) - Conds:RS_168._col0=RS_134._col0(Inner),Output:["_col1","_col2","_col3"] - <-Map 7 [SIMPLE_EDGE] vectorized - SHUFFLE [RS_134] - PartitionCols:_col0 - Select Operator [SEL_131] (rows=73049 width=1119) - Output:["_col0"] - Filter Operator [FIL_130] (rows=73049 width=1119) - predicate:(d_date_sk is not null and d_month_seq BETWEEN 1212 AND 1223) - TableScan [TS_3] (rows=73049 width=1119) - default@date_dim,date_dim,Tbl:COMPLETE,Col:NONE,Output:["d_date_sk","d_month_seq"] - <-Map 12 [SIMPLE_EDGE] vectorized - SHUFFLE [RS_168] - PartitionCols:_col0 - Select Operator [SEL_167] (rows=575995635 width=88) - Output:["_col0","_col1","_col2","_col3"] - Filter Operator [FIL_166] (rows=575995635 width=88) - predicate:((ss_sold_date_sk BETWEEN DynamicValue(RS_21_date_dim_d_date_sk_min) AND DynamicValue(RS_21_date_dim_d_date_sk_max) and in_bloom_filter(ss_sold_date_sk, DynamicValue(RS_21_date_dim_d_date_sk_bloom_filter))) and (ss_store_sk BETWEEN DynamicValue(RS_41_store_s_store_sk_min) AND DynamicValue(RS_41_store_s_store_sk_max) and in_bloom_filter(ss_store_sk, DynamicValue(RS_41_store_s_store_sk_bloom_filter))) and ss_sold_date_sk is not null and ss_store_sk is not null) - TableScan [TS_14] (rows=575995635 width=88) - default@store_sales,store_sales,Tbl:COMPLETE,Col:NONE,Output:["ss_sold_date_sk","ss_item_sk","ss_store_sk","ss_sales_price"] - <-Reducer 14 [BROADCAST_EDGE] vectorized - BROADCAST [RS_150] - Group By Operator [GBY_148] (rows=1 width=12) - Output:["_col0","_col1","_col2"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","bloom_filter(VALUE._col2, expectedEntries=1000000)"] - <-Map 13 [CUSTOM_SIMPLE_EDGE] vectorized - SHUFFLE [RS_147] - Group By Operator [GBY_146] (rows=1 width=12) - Output:["_col0","_col1","_col2"],aggregations:["min(_col0)","max(_col0)","bloom_filter(_col0, expectedEntries=1000000)"] - Select Operator [SEL_145] (rows=1704 width=1910) - Output:["_col0"] - Please refer to the previous Select Operator [SEL_143] - <-Reducer 11 [BROADCAST_EDGE] vectorized - BROADCAST [RS_165] - Group By Operator [GBY_164] (rows=1 width=12) - Output:["_col0","_col1","_col2"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","bloom_filter(VALUE._col2, expectedEntries=1000000)"] - <-Map 7 [CUSTOM_SIMPLE_EDGE] vectorized - SHUFFLE [RS_139] - Group By Operator [GBY_137] (rows=1 width=12) - Output:["_col0","_col1","_col2"],aggregations:["min(_col0)","max(_col0)","bloom_filter(_col0, expectedEntries=1000000)"] - Select Operator [SEL_135] (rows=73049 width=1119) - Output:["_col0"] - Please refer to the previous Select Operator [SEL_131] <-Reducer 3 [SIMPLE_EDGE] vectorized - SHUFFLE [RS_163] + SHUFFLE [RS_149] PartitionCols:_col0 - Group By Operator [GBY_162] (rows=316797606 width=88) + Group By Operator [GBY_148] (rows=316797606 width=88) Output:["_col0","_col1","_col2"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0, KEY._col1 
<-Reducer 2 [SIMPLE_EDGE] SHUFFLE [RS_11] @@ -191,43 +129,64 @@ Stage-0 Group By Operator [GBY_10] (rows=633595212 width=88) Output:["_col0","_col1","_col2"],aggregations:["sum(_col3)"],keys:_col2, _col1 Merge Join Operator [MERGEJOIN_126] (rows=633595212 width=88) - Conds:RS_161._col0=RS_132._col0(Inner),Output:["_col1","_col2","_col3"] - <-Map 7 [SIMPLE_EDGE] vectorized - SHUFFLE [RS_132] - PartitionCols:_col0 - Please refer to the previous Select Operator [SEL_131] + Conds:RS_142._col0=RS_146._col0(Inner),Output:["_col1","_col2","_col3"] <-Map 1 [SIMPLE_EDGE] vectorized - SHUFFLE [RS_161] + SHUFFLE [RS_142] PartitionCols:_col0 - Select Operator [SEL_160] (rows=575995635 width=88) + Select Operator [SEL_140] (rows=575995635 width=88) Output:["_col0","_col1","_col2","_col3"] - Filter Operator [FIL_159] (rows=575995635 width=88) - predicate:((ss_item_sk BETWEEN DynamicValue(RS_45_item_i_item_sk_min) AND DynamicValue(RS_45_item_i_item_sk_max) and in_bloom_filter(ss_item_sk, DynamicValue(RS_45_item_i_item_sk_bloom_filter))) and (ss_sold_date_sk BETWEEN DynamicValue(RS_7_date_dim_d_date_sk_min) AND DynamicValue(RS_7_date_dim_d_date_sk_max) and in_bloom_filter(ss_sold_date_sk, DynamicValue(RS_7_date_dim_d_date_sk_bloom_filter))) and (ss_store_sk BETWEEN DynamicValue(RS_41_store_s_store_sk_min) AND DynamicValue(RS_41_store_s_store_sk_max) and in_bloom_filter(ss_store_sk, DynamicValue(RS_41_store_s_store_sk_bloom_filter))) and ss_item_sk is not null and ss_sold_date_sk is not null and ss_store_sk is not null) + Filter Operator [FIL_138] (rows=575995635 width=88) + predicate:(ss_item_sk is not null and ss_sold_date_sk is not null and ss_store_sk is not null) TableScan [TS_0] (rows=575995635 width=88) default@store_sales,store_sales,Tbl:COMPLETE,Col:NONE,Output:["ss_sold_date_sk","ss_item_sk","ss_store_sk","ss_sales_price"] - <-Reducer 14 [BROADCAST_EDGE] vectorized - BROADCAST [RS_149] - Please refer to the previous Group By Operator [GBY_148] - <-Reducer 16 [BROADCAST_EDGE] vectorized - BROADCAST [RS_158] - Group By Operator [GBY_157] (rows=1 width=12) + <-Reducer 11 [BROADCAST_EDGE] vectorized + BROADCAST [RS_137] + Group By Operator [GBY_136] (rows=1 width=12) Output:["_col0","_col1","_col2"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","bloom_filter(VALUE._col2, expectedEntries=1000000)"] - <-Map 15 [CUSTOM_SIMPLE_EDGE] vectorized - SHUFFLE [RS_156] - Group By Operator [GBY_155] (rows=1 width=12) + <-Map 10 [CUSTOM_SIMPLE_EDGE] vectorized + SHUFFLE [RS_135] + Group By Operator [GBY_134] (rows=1 width=12) Output:["_col0","_col1","_col2"],aggregations:["min(_col0)","max(_col0)","bloom_filter(_col0, expectedEntries=1000000)"] - Select Operator [SEL_154] (rows=462000 width=1436) - Output:["_col0"] - Please refer to the previous Select Operator [SEL_152] - <-Reducer 8 [BROADCAST_EDGE] vectorized - BROADCAST [RS_141] - Group By Operator [GBY_140] (rows=1 width=12) - Output:["_col0","_col1","_col2"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","bloom_filter(VALUE._col2, expectedEntries=1000000)"] - <-Map 7 [CUSTOM_SIMPLE_EDGE] vectorized - SHUFFLE [RS_138] - Group By Operator [GBY_136] (rows=1 width=12) - Output:["_col0","_col1","_col2"],aggregations:["min(_col0)","max(_col0)","bloom_filter(_col0, expectedEntries=1000000)"] - Select Operator [SEL_133] (rows=73049 width=1119) + Select Operator [SEL_133] (rows=1704 width=1910) Output:["_col0"] Please refer to the previous Select Operator [SEL_131] + <-Map 9 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_146] + PartitionCols:_col0 + Select 
Operator [SEL_145] (rows=73049 width=1119) + Output:["_col0"] + Filter Operator [FIL_144] (rows=73049 width=1119) + predicate:(d_date_sk is not null and d_month_seq BETWEEN 1212 AND 1223) + TableScan [TS_3] (rows=73049 width=1119) + default@date_dim,date_dim,Tbl:COMPLETE,Col:NONE,Output:["d_date_sk","d_month_seq"] + <-Reducer 8 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_154] + PartitionCols:_col0 + Select Operator [SEL_153] (rows=158398803 width=88) + Output:["_col0","_col1"] + Group By Operator [GBY_152] (rows=158398803 width=88) + Output:["_col0","_col1","_col2"],aggregations:["sum(_col2)","count(_col2)"],keys:_col1 + Select Operator [SEL_151] (rows=316797606 width=88) + Output:["_col1","_col2"] + Group By Operator [GBY_150] (rows=316797606 width=88) + Output:["_col0","_col1","_col2"],aggregations:["sum(VALUE._col0)"],keys:KEY._col0, KEY._col1 + <-Reducer 7 [SIMPLE_EDGE] + SHUFFLE [RS_25] + PartitionCols:_col0 + Group By Operator [GBY_24] (rows=633595212 width=88) + Output:["_col0","_col1","_col2"],aggregations:["sum(_col3)"],keys:_col2, _col1 + Merge Join Operator [MERGEJOIN_127] (rows=633595212 width=88) + Conds:RS_143._col0=RS_147._col0(Inner),Output:["_col1","_col2","_col3"] + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_143] + PartitionCols:_col0 + Select Operator [SEL_141] (rows=575995635 width=88) + Output:["_col0","_col1","_col2","_col3"] + Filter Operator [FIL_139] (rows=575995635 width=88) + predicate:(ss_sold_date_sk is not null and ss_store_sk is not null) + Please refer to the previous TableScan [TS_0] + <-Map 9 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_147] + PartitionCols:_col0 + Please refer to the previous Select Operator [SEL_145]
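A closing note on the relocated StatsUtils.updateStats helper shown earlier in the patch: the rule it applies is that the row count, per-column null counts and, for the affected columns, distinct-value counts are scaled by newNumRows/oldNumRows, NDV is only ever scaled down (never up when the ratio exceeds 1), and no column's NDV may exceed the new row count. The sketch below is a simplified stand-in with made-up types; it is not Hive's Statistics/ColStatistics API and it omits the data-size recomputation.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

public class UpdateStatsSketch {

  // Hypothetical stand-in for one column's statistics.
  static class Col {
    final String name;
    long numNulls;
    long ndv;
    Col(String name, long numNulls, long ndv) {
      this.name = name; this.numNulls = numNulls; this.ndv = ndv;
    }
  }

  static long scaleRows(long oldRows, long newRows, List<Col> cols, Set<String> affected) {
    if (newRows < 0) {
      newRows = Long.MAX_VALUE;      // overflow guard, analogous to getMaxIfOverflow
    }
    if (newRows == 0) {
      newRows = 1;                   // never let the estimate collapse to zero rows
    }
    double ratio = (double) newRows / (double) oldRows;
    for (Col c : cols) {
      c.numNulls = Math.round(ratio * c.numNulls);
      if (affected.contains(c.name) && ratio <= 1.0) {
        c.ndv = (long) Math.ceil(ratio * c.ndv);   // shrink NDV only, never grow it
      }
      c.ndv = Math.min(c.ndv, newRows);            // NDV can never exceed the row count
    }
    return newRows;
  }

  public static void main(String[] args) {
    List<Col> cols = Arrays.asList(new Col("ss_store_sk", 10_000, 1_704));
    scaleRows(1_000_000, 250_000, cols, Collections.singleton("ss_store_sk"));
    // Prints: ss_store_sk: 2500 nulls, 426 distinct values
    System.out.println(cols.get(0).name + ": " + cols.get(0).numNulls
        + " nulls, " + cols.get(0).ndv + " distinct values");
  }
}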