diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java index 2ebb149354..c010b18b61 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; import org.apache.hadoop.hive.serde2.AbstractSerDe; @@ -172,4 +173,15 @@ public default StorageHandlerInfo getStorageHandlerInfo(Table table) throws Meta default LockType getLockType(WriteEntity writeEntity){ return LockType.EXCLUSIVE; } + + /** + * Test if the storage handler allows the push-down of join filter predicate to prune further the splits. + * + * @param syntheticFilterPredicate Join filter predicate. + * @return true if supports dynamic split pruning for the given predicate. + */ + + default boolean addDynamicSplitPruningEdge(ExprNodeDesc syntheticFilterPredicate) { + return false; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java index a1401aac72..3a21ee9d21 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java @@ -80,6 +80,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFIn; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.slf4j.Logger; @@ -188,7 +189,12 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Obje // all partitions have been statically removed LOG.debug("No partition pruning necessary."); } - } else { + } else if (table.isNonNative() && + table.getStorageHandler().addDynamicSplitPruningEdge(desc.getPredicate())){ + generateEventOperatorPlan(ctx, parseContext, ts, column, + table.getCols().stream().filter(e -> e.getName().equals(column)). + map(e -> e.getType()).findFirst().get()); + } else { // semijoin LOG.debug("Column " + column + " is not a partition column"); if (semiJoin && !disableSemiJoinOptDueToExternalTable(parseContext.getConf(), ts, ctx) && ts.getConf().getFilterExpr() != null) { @@ -291,7 +297,7 @@ private boolean disableSemiJoinOptDueToExternalTable(HiveConf conf, TableScanOpe disableSemiJoin = true; } else { // Check the other side of the join, using the DynamicListContext - ExprNodeDesc exprNodeDesc = ctx.generator.getConf().getKeyCols().get(ctx.desc.getKeyIndex()); + ExprNodeDesc exprNodeDesc = ctx.getKeyCol(); ExprNodeColumnDesc colExpr = ExprNodeDescUtils.getColumnExpr(exprNodeDesc); if (colExpr != null) { @@ -317,7 +323,7 @@ private boolean disableSemiJoinOptDueToExternalTable(HiveConf conf, TableScanOpe // Given a key, find the corresponding column name. private boolean getColumnInfo(DynamicListContext ctx, StringBuilder internalColName, StringBuilder colName, StringBuilder tabAlias) { - ExprNodeDesc exprNodeDesc = ctx.generator.getConf().getKeyCols().get(ctx.desc.getKeyIndex()); + ExprNodeDesc exprNodeDesc = ctx.getKeyCol(); ExprNodeColumnDesc colExpr = ExprNodeDescUtils.getColumnExpr(exprNodeDesc); if (colExpr == null) { @@ -435,7 +441,7 @@ private void generateEventOperatorPlan(DynamicListContext ctx, ParseContext pars Operator parentOfRS = ctx.generator.getParentOperators().get(0); // we need the expr that generated the key of the reduce sink - ExprNodeDesc key = ctx.generator.getConf().getKeyCols().get(ctx.desc.getKeyIndex()); + ExprNodeDesc key = ctx.getKeyCol(); // we also need the expr for the partitioned table ExprNodeDesc partKey = ctx.parent.getChildren().get(0); @@ -524,11 +530,17 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex TableScanOperator ts, String keyBaseAlias, String internalColName, String colName, SemiJoinHint sjHint) throws SemanticException { + // Semijoin reduction for non-equi join not yet supported, check for it + ExprNodeGenericFuncDesc funcDesc = (ExprNodeGenericFuncDesc) ctx.parent; + if (!(funcDesc.getGenericUDF() instanceof GenericUDFIn)) { + return false; + } + // we will put a fork in the plan at the source of the reduce sink Operator parentOfRS = ctx.generator.getParentOperators().get(0); // we need the expr that generated the key of the reduce sink - ExprNodeDesc key = ctx.generator.getConf().getKeyCols().get(ctx.desc.getKeyIndex()); + ExprNodeDesc key = ctx.getKeyCol(); assert colName != null; // Fetch the TableScan Operator. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java index f8c7e18eb1..94879c9529 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java @@ -779,6 +779,15 @@ public DynamicListContext(ExprNodeDynamicListDesc desc, ExprNodeDesc parent, this.grandParent = grandParent; this.generator = generator; } + + public ExprNodeDesc getKeyCol() { + ExprNodeDesc keyCol = desc.getTarget(); + if (keyCol != null) { + return keyCol; + } + + return generator.getConf().getKeyCols().get(desc.getKeyIndex()); + } } public static class DynamicPartitionPrunerContext implements NodeProcessorCtx, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicListDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicListDesc.java index 676dfc9421..9bc5796d83 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicListDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicListDesc.java @@ -31,15 +31,27 @@ Operator source; int keyIndex; + // Target name, if key is not set + ExprNodeDesc target; public ExprNodeDynamicListDesc() { } + // keyIndex is not used public ExprNodeDynamicListDesc(TypeInfo typeInfo, Operator source, - int keyIndex) { + int keyIndex, ExprNodeDesc target) { super(typeInfo); this.source = source; this.keyIndex = keyIndex; + this.target = target; + } + + public ExprNodeDynamicListDesc(TypeInfo typeInfo, Operator source, + int keyIndex) { + super(typeInfo); + this.source = source; + this.keyIndex = keyIndex; + this.target = null; } public void setSource(Operator source) { @@ -58,9 +70,13 @@ public int getKeyIndex() { return this.keyIndex; } + public ExprNodeDesc getTarget() { + return target; + } + @Override public ExprNodeDesc clone() { - return new ExprNodeDynamicListDesc(typeInfo, source, keyIndex); + return new ExprNodeDynamicListDesc(typeInfo, source, keyIndex, target); } @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java b/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java index e97e44796f..8f441a3236 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.exec.SelectOperator; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -179,66 +180,173 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, List sourceKeys = source.getConf().getKeyCols(); List targetKeys = target.getConf().getKeyCols(); - if (sourceKeys.size() < 1) { - continue; - } - ExprNodeDesc syntheticExpr = null; - for (int i = 0; i < sourceKeys.size(); ++i) { - final ExprNodeDesc sourceKey = sourceKeys.get(i); + if (sourceKeys.size() > 0) { - List inArgs = new ArrayList<>(); - inArgs.add(sourceKey); + //if (sourceKeys.size() < 1) continue; - ExprNodeDynamicListDesc dynamicExpr = + for (int i = 0; i < sourceKeys.size(); ++i) { + final ExprNodeDesc sourceKey = sourceKeys.get(i); + + List inArgs = new ArrayList<>(); + inArgs.add(sourceKey); + + ExprNodeDynamicListDesc dynamicExpr = new ExprNodeDynamicListDesc(targetKeys.get(i).getTypeInfo(), target, i); - inArgs.add(dynamicExpr); + inArgs.add(dynamicExpr); - ExprNodeDesc syntheticInExpr = + ExprNodeDesc syntheticInExpr = ExprNodeGenericFuncDesc.newInstance(FunctionRegistry.getFunctionInfo("in") - .getGenericUDF(), inArgs); - if (LOG.isDebugEnabled()) { - LOG.debug("Synthetic predicate in " + join + ": " + srcPos + " --> " + targetPos + " (" + syntheticInExpr + ")"); - } + .getGenericUDF(), inArgs); + if (LOG.isDebugEnabled()) { + LOG.debug("Synthetic predicate in " + join + ": " + srcPos + " --> " + targetPos + " (" + syntheticInExpr + ")"); + } - List andArgs = new ArrayList<>(); - if (syntheticExpr != null) { - andArgs.add(syntheticExpr); - } - andArgs.add(syntheticInExpr); - - if(sCtx.isExtended()) { - // Backtrack - List newExprs = createDerivatives(target.getParentOperators().get(0), targetKeys.get(i), sourceKey); - if (!newExprs.isEmpty()) { - if (LOG.isDebugEnabled()) { - for (ExprNodeDesc expr : newExprs) { - LOG.debug("Additional synthetic predicate in " + join + ": " + srcPos + " --> " + targetPos + " (" + expr + ")"); + List andArgs = new ArrayList<>(); + if (syntheticExpr != null) { + andArgs.add(syntheticExpr); + } + andArgs.add(syntheticInExpr); + + if (sCtx.isExtended()) { + // Backtrack + List newExprs = createDerivatives(target.getParentOperators().get(0), targetKeys.get(i), sourceKey); + if (!newExprs.isEmpty()) { + if (LOG.isDebugEnabled()) { + for (ExprNodeDesc expr : newExprs) { + LOG.debug("Additional synthetic predicate in " + join + ": " + srcPos + " --> " + targetPos + " (" + expr + ")"); + } } + andArgs.addAll(newExprs); } - andArgs.addAll(newExprs); } - } - if (andArgs.size() < 2) { - syntheticExpr = syntheticInExpr; - } else { - // Create AND expression - syntheticExpr = + if (andArgs.size() < 2) { + syntheticExpr = syntheticInExpr; + } else { + // Create AND expression + syntheticExpr = ExprNodeGenericFuncDesc.newInstance(FunctionRegistry.getFunctionInfo("and") - .getGenericUDF(), andArgs); + .getGenericUDF(), andArgs); + } + } + + } + + // Handle non-equi joins like <, <=, >, and >= + List residualFilters = join.getConf().getResidualFilterExprs(); + if (residualFilters != null && residualFilters.size() != 0) { + // have non-equi join conditions + for (ExprNodeDesc filter : residualFilters) { + if (!(filter instanceof ExprNodeGenericFuncDesc)) { + continue; + } + + ExprNodeGenericFuncDesc funcDesc = (ExprNodeGenericFuncDesc) filter; + // filter should be of type <, >, <= or >= + if (getFuncText(funcDesc.getFuncText(), 1) == null) { + // unsupported + continue; + } + // Currently only simple cases like col1 < col2 are supported. + ExprNodeDesc sourceChild = funcDesc.getChildren().get(srcPos); + ExprNodeDesc targetChild = funcDesc.getChildren().get(targetPos); + if (!(sourceChild instanceof ExprNodeColumnDesc && + targetChild instanceof ExprNodeColumnDesc)) { + continue; + } + List funcArgs = new ArrayList<>(); + funcArgs.add(getRSColExprFromResidualFilter(sourceChild, join)); + ExprNodeDynamicListDesc dynamicExpr = + new ExprNodeDynamicListDesc(targetChild.getTypeInfo(), target, 0, + getRSColExprFromResidualFilter(targetChild, join)); + funcArgs.add(dynamicExpr); + ExprNodeDesc funcExpr = + ExprNodeGenericFuncDesc.newInstance(FunctionRegistry.getFunctionInfo(getFuncText(funcDesc.getFuncText(), srcPos)).getGenericUDF(), funcArgs); + + if (LOG.isDebugEnabled()) { + LOG.debug(" Non-Equi Join Predicate " + funcExpr); + } + + List andArgs = new ArrayList<>(); + if (syntheticExpr != null) { + andArgs.add(syntheticExpr); + } + andArgs.add(funcExpr); + + if (andArgs.size() < 2) { + syntheticExpr = funcExpr; + } else { + syntheticExpr = + ExprNodeGenericFuncDesc.newInstance(FunctionRegistry.getFunctionInfo("and").getGenericUDF(), andArgs); + } } } - Operator newFilter = createFilter(source, parent, parentRS, syntheticExpr); - parent = newFilter; + if (syntheticExpr != null) { + Operator newFilter = createFilter(source, parent, parentRS, syntheticExpr); + parent = newFilter; + } } return null; } + private ExprNodeDesc getRSColExprFromResidualFilter(ExprNodeDesc childExpr, CommonJoinOperator join) { + ExprNodeColumnDesc colExpr = ExprNodeDescUtils.getColumnExpr(childExpr); + + if (colExpr == null) { + LOG.info("Deepak : colExpr is NULL!"); + LOG.info("Deepak : childExpr = " + childExpr.getExprString()); + assert false; + } + + String joinColName = colExpr.getColumn(); + // use name to get the alias pos of parent and name in parent + int aliasPos = join.getConf().getReversedExprs().get(joinColName); + ExprNodeDesc rsColExpr = join.getColumnExprMap().get(joinColName); + + // Get the correct parent + ReduceSinkOperator parentRS = (ReduceSinkOperator) (join.getParentOperators().get(aliasPos)); + + // Fetch the colExpr from parent + String rsColName = ExprNodeDescUtils.extractColName(rsColExpr); + Map colExprMap = parentRS.getColumnExprMap(); + + for (Map.Entry entry : colExprMap.entrySet()) { + if (entry.getKey().equals(rsColName)) { + return entry.getValue(); + } + } + // Should never happen + return null; + } + + // TODO : Maybe need a better way of doing it? + String getFuncText(String funcText, final int srcPos) { + if (srcPos == 0) { + return funcText; + } + + // Reverse the text + // Better way to do it? + switch (funcText) { + case "<": + return ">"; + case "<=": + return ">="; + case ">": + return "<"; + case ">=": + return "<="; + default: + return null; // helps identify unsupported functions + } + } + + // calculate filter propagation directions for each alias // L<->R for inner/semi join, L<-R for left outer join, R<-L for right outer // join