diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java
index 9834e73..036e339 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java
@@ -75,6 +75,16 @@
   // big tables that should be streamed
   private List streamAliases;
 
+  /**
+   * Predicates collected while other QBJoinTrees were merged into this one.
+   * The left (position 0) filters of a merged tree may refer to any source of
+   * this tree; a predicate that refers to several of them cannot be pushed to a
+   * single input, so it is collected here and applied by a Filter Operator
+   * added after the Join Operator of this QBJoinTree.
+   * Stays null until the first predicate is added.
+   */
+  private List<ASTNode> filtersFromMergedJoinTrees;
+
   /**
    * constructor.
    */
@@ -346,4 +356,24 @@ public String getId() {
   public void setId(String id) {
     this.id = id;
   }
+
+  /**
+   * Records a predicate that came from a QBJoinTree merged into this one and
+   * that has to be evaluated after the join instead of on a single input.
+   *
+   * @param filter predicate AST to apply after the join
+   */
+  public void addFiltersFromMergingJoinTrees(ASTNode filter) {
+    if (filtersFromMergedJoinTrees == null) {
+      filtersFromMergedJoinTrees = new ArrayList<ASTNode>();
+    }
+    filtersFromMergedJoinTrees.add(filter);
+  }
+
+  /**
+   * @return the predicates to apply after the join, or null if none were added
+   */
+  public List<ASTNode> getFiltersFromMergedJoinTrees() {
+    return filtersFromMergedJoinTrees;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 063aa65..92cf538 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -1377,6 +1377,87 @@ private boolean isPresent(String[] list, String elem) {
     return false;
   }
 
+  /**
+   * Finds the position of an alias in an array of source aliases.
+   *
+   * @param list source aliases to search; null entries are skipped
+   * @param elem alias to look for; it is compared with the lower-cased entries,
+   *          so it only matches when it is lower case itself
+   * @return index of the first matching entry, or -1 if there is none
+   */
+  private int getIndex(String[] list, String elem) {
+    for (int i = 0; i < list.length; i++) {
+      // NOTE(review): presumably an entry is null when that input is not a plain
+      // table (e.g. a nested join); skip it rather than fail with an NPE - confirm.
+      if (list[i] != null && list[i].toLowerCase().equals(elem)) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  /**
+   * Checks whether a join filter can be attributed to exactly one base source of
+   * a QBJoinTree.
+   *
+   * @param joinTree join tree whose base sources are searched
+   * @param filterCondn predicate AST to inspect
+   * @return position of the only base source the predicate refers to, or -1 if it
+   *         refers to none of them, to several of them, or to a table alias that
+   *         is not a base source of joinTree
+   */
+  private int checkJoinFilterRefersOneAlias(QBJoinTree joinTree, ASTNode filterCondn) {
+    int pos = findSingleReferencedBaseSrc(joinTree.getBaseSrc(), filterCondn);
+    return pos < 0 ? -1 : pos;
+  }
+
+  /*
+   * Recursive worker for checkJoinFilterRefersOneAlias. Returns the position in
+   * 'aliases' of the single alias referenced below 'expr', -1 if no alias is
+   * referenced there, and -2 if the subtree cannot be attributed to one alias.
+   * -2 is kept apart from -1 so that a subtree referring to several aliases is
+   * not mistaken for a constant when a sibling refers to only one alias, e.g.
+   * (p1.a + p2.b) > p1.c must not be reported as referring to p1 alone.
+   */
+  private int findSingleReferencedBaseSrc(String[] aliases, ASTNode expr) {
+    switch (expr.getType()) {
+    case HiveParser.TOK_TABLE_OR_COL:
+      String tableOrCol = unescapeIdentifier(expr.getChild(0).getText()
+          .toLowerCase());
+      int pos = getIndex(aliases, tableOrCol);
+      // an alias outside 'aliases' cannot be pinned to one of these sources
+      return pos == -1 ? -2 : pos;
+    case HiveParser.Identifier:
+    case HiveParser.Number:
+    case HiveParser.StringLiteral:
+    case HiveParser.BigintLiteral:
+    case HiveParser.SmallintLiteral:
+    case HiveParser.TinyintLiteral:
+    case HiveParser.DecimalLiteral:
+    case HiveParser.TOK_STRINGLITERALSEQUENCE:
+    case HiveParser.TOK_CHARSETLITERAL:
+    case HiveParser.KW_TRUE:
+    case HiveParser.KW_FALSE:
+      // literals and plain identifiers refer to no alias
+      return -1;
+    default:
+      int idx = -1;
+      // NOTE(review): child 0 of a TOK_FUNCTION is skipped, presumably because it
+      // holds the function name rather than an argument - confirm.
+      int i = expr.getType() == HiveParser.TOK_FUNCTION ? 1 : 0;
+      for (; i < expr.getChildCount(); i++) {
+        int cIdx = findSingleReferencedBaseSrc(aliases, (ASTNode) expr.getChild(i));
+        if (cIdx == -2 || (idx != -1 && cIdx != -1 && cIdx != idx)) {
+          return -2;
+        }
+        if (cIdx != -1) {
+          idx = cIdx;
+        }
+      }
+      return idx;
+    }
+  }
+
   @SuppressWarnings("nls")
   private void parseJoinCondPopulateAlias(QBJoinTree joinTree, ASTNode condn,
       ArrayList leftAliases, ArrayList rightAliases,
@@ -6091,7 +6172,18 @@ private Operator genJoinOperator(QB qb, QBJoinTree joinTree,
     JoinOperator joinOp = (JoinOperator) genJoinOperatorChildren(joinTree,
         joinSrcOp, srcOps, omitOpts);
     joinContext.put(joinOp, joinTree);
-    return joinOp;
+
+    // predicates of merged join trees that could not be pushed to one input are
+    // evaluated on the join output
+    // NOTE(review): as a plain filter above the join these drop rows; for an outer
+    // join an ON-clause predicate would normally null-extend instead - confirm.
+    Operator op = joinOp;
+    if (joinTree.getFiltersFromMergedJoinTrees() != null) {
+      for (ASTNode condn : joinTree.getFiltersFromMergedJoinTrees()) {
+        op = genFilterPlan(qb, condn, op);
+      }
+    }
+    return op;
   }
 
   /**
@@ -6674,9 +6766,23 @@ private void mergeJoins(QB qb, QBJoinTree node, QBJoinTree target, int pos) {
     }
 
     if (node.getFiltersForPushing().get(0).size() != 0) {
-      ArrayList filterPos = filter.get(pos);
-      filterPos.addAll(node.getFiltersForPushing().get(0));
-    }
+      /*
+       * for each predicate:
+       * - does it refer to one or many aliases
+       * - if one: add it to the filterForPushing list of that alias
+       * - if many: add as a filter from merging trees.
+ */ + + for(ASTNode nodeFilter : node.getFiltersForPushing().get(0) ) { + int fPos = checkJoinFilterRefersOneAlias(target, nodeFilter); + + if ( fPos != - 1 ) { + filter.get(fPos).add(nodeFilter); + } else { + target.addFiltersFromMergingJoinTrees(nodeFilter); + } + } + } if (node.getNoOuterJoin() && target.getNoOuterJoin()) { target.setNoOuterJoin(true); diff --git ql/src/test/queries/clientpositive/join_merging.q ql/src/test/queries/clientpositive/join_merging.q new file mode 100644 index 0000000..a0046db --- /dev/null +++ ql/src/test/queries/clientpositive/join_merging.q @@ -0,0 +1,25 @@ + + +CREATE TABLE part( + p_partkey INT, + p_name STRING, + p_mfgr STRING, + p_brand STRING, + p_type STRING, + p_size INT, + p_container STRING, + p_retailprice DOUBLE, + p_comment STRING +); + +explain select p1.p_size, p2.p_size +from part p1 left outer join part p2 on p1.p_partkey = p2.p_partkey + right outer join part p3 on p2.p_partkey = p3.p_partkey and + p1.p_size > 10 +; + +explain select p1.p_size, p2.p_size +from part p1 left outer join part p2 on p1.p_partkey = p2.p_partkey + right outer join part p3 on p2.p_partkey = p3.p_partkey and + p1.p_size > 10 and p1.p_size > p2.p_size + 10 +; \ No newline at end of file diff --git ql/src/test/results/clientpositive/join_merging.q.out ql/src/test/results/clientpositive/join_merging.q.out new file mode 100644 index 0000000..59aba31 --- /dev/null +++ ql/src/test/results/clientpositive/join_merging.q.out @@ -0,0 +1,225 @@ +PREHOOK: query: CREATE TABLE part( + p_partkey INT, + p_name STRING, + p_mfgr STRING, + p_brand STRING, + p_type STRING, + p_size INT, + p_container STRING, + p_retailprice DOUBLE, + p_comment STRING +) +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE TABLE part( + p_partkey INT, + p_name STRING, + p_mfgr STRING, + p_brand STRING, + p_type STRING, + p_size INT, + p_container STRING, + p_retailprice DOUBLE, + p_comment STRING +) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@part +PREHOOK: query: 
explain select p1.p_size, p2.p_size +from part p1 left outer join part p2 on p1.p_partkey = p2.p_partkey + right outer join part p3 on p2.p_partkey = p3.p_partkey and + p1.p_size > 10 +PREHOOK: type: QUERY +POSTHOOK: query: explain select p1.p_size, p2.p_size +from part p1 left outer join part p2 on p1.p_partkey = p2.p_partkey + right outer join part p3 on p2.p_partkey = p3.p_partkey and + p1.p_size > 10 +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_RIGHTOUTERJOIN (TOK_LEFTOUTERJOIN (TOK_TABREF (TOK_TABNAME part) p1) (TOK_TABREF (TOK_TABNAME part) p2) (= (. (TOK_TABLE_OR_COL p1) p_partkey) (. (TOK_TABLE_OR_COL p2) p_partkey))) (TOK_TABREF (TOK_TABNAME part) p3) (and (= (. (TOK_TABLE_OR_COL p2) p_partkey) (. (TOK_TABLE_OR_COL p3) p_partkey)) (> (. (TOK_TABLE_OR_COL p1) p_size) 10)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL p1) p_size)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL p2) p_size))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + p1 + TableScan + alias: p1 + Filter Operator + predicate: + expr: (p_size > 10) + type: boolean + Reduce Output Operator + key expressions: + expr: p_partkey + type: int + sort order: + + Map-reduce partition columns: + expr: p_partkey + type: int + tag: 0 + value expressions: + expr: p_size + type: int + p2 + TableScan + alias: p2 + Reduce Output Operator + key expressions: + expr: p_partkey + type: int + sort order: + + Map-reduce partition columns: + expr: p_partkey + type: int + tag: 1 + value expressions: + expr: p_size + type: int + p3 + TableScan + alias: p3 + Reduce Output Operator + key expressions: + expr: p_partkey + type: int + sort order: + + Map-reduce partition columns: + expr: p_partkey + type: int + tag: 2 + Reduce Operator Tree: + Join Operator + condition map: + Left Outer Join0 to 1 + Right Outer Join1 to 2 + condition 
expressions: + 0 {VALUE._col5} + 1 {VALUE._col5} + 2 + handleSkewJoin: false + outputColumnNames: _col5, _col16 + Select Operator + expressions: + expr: _col5 + type: int + expr: _col16 + type: int + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: explain select p1.p_size, p2.p_size +from part p1 left outer join part p2 on p1.p_partkey = p2.p_partkey + right outer join part p3 on p2.p_partkey = p3.p_partkey and + p1.p_size > 10 and p1.p_size > p2.p_size + 10 +PREHOOK: type: QUERY +POSTHOOK: query: explain select p1.p_size, p2.p_size +from part p1 left outer join part p2 on p1.p_partkey = p2.p_partkey + right outer join part p3 on p2.p_partkey = p3.p_partkey and + p1.p_size > 10 and p1.p_size > p2.p_size + 10 +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_RIGHTOUTERJOIN (TOK_LEFTOUTERJOIN (TOK_TABREF (TOK_TABNAME part) p1) (TOK_TABREF (TOK_TABNAME part) p2) (= (. (TOK_TABLE_OR_COL p1) p_partkey) (. (TOK_TABLE_OR_COL p2) p_partkey))) (TOK_TABREF (TOK_TABNAME part) p3) (and (and (= (. (TOK_TABLE_OR_COL p2) p_partkey) (. (TOK_TABLE_OR_COL p3) p_partkey)) (> (. (TOK_TABLE_OR_COL p1) p_size) 10)) (> (. (TOK_TABLE_OR_COL p1) p_size) (+ (. (TOK_TABLE_OR_COL p2) p_size) 10))))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL p1) p_size)) (TOK_SELEXPR (. 
(TOK_TABLE_OR_COL p2) p_size))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + p1 + TableScan + alias: p1 + Filter Operator + predicate: + expr: (p_size > 10) + type: boolean + Reduce Output Operator + key expressions: + expr: p_partkey + type: int + sort order: + + Map-reduce partition columns: + expr: p_partkey + type: int + tag: 0 + value expressions: + expr: p_size + type: int + p2 + TableScan + alias: p2 + Reduce Output Operator + key expressions: + expr: p_partkey + type: int + sort order: + + Map-reduce partition columns: + expr: p_partkey + type: int + tag: 1 + value expressions: + expr: p_size + type: int + p3 + TableScan + alias: p3 + Reduce Output Operator + key expressions: + expr: p_partkey + type: int + sort order: + + Map-reduce partition columns: + expr: p_partkey + type: int + tag: 2 + Reduce Operator Tree: + Join Operator + condition map: + Left Outer Join0 to 1 + Right Outer Join1 to 2 + condition expressions: + 0 {VALUE._col5} + 1 {VALUE._col5} + 2 + handleSkewJoin: false + outputColumnNames: _col5, _col16 + Filter Operator + predicate: + expr: (_col5 > (_col16 + 10)) + type: boolean + Select Operator + expressions: + expr: _col5 + type: int + expr: _col16 + type: int + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + +