diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
index cb010fb..96ccc6e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
@@ -68,6 +68,8 @@
   private String defaultPartitionName;
 
+  private transient Map<String, String> scanPlan;
+
   public TableDesc getTableDesc() {
     return tableDesc;
   }
 
@@ -267,6 +269,14 @@ public void setReferencedColumns(List<String> referencedColumns) {
     return conf.getReferencedColumns();
   }
 
+  public void setScanPlan(Map<String, String> scanPlan) {
+    this.scanPlan = scanPlan;
+  }
+
+  public Map<String, String> getScanPlan() {
+    return scanPlan;
+  }
+
   @Override
   public OperatorType getType() {
     return OperatorType.TABLESCAN;
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 5c4459b..38c9629 100755
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -416,6 +416,13 @@ protected static PartitionDesc getPartitionDescFromPath(
 
   public static void pushFilters(JobConf jobConf, TableScanOperator tableScan) {
+    Map<String, String> scanPlan = tableScan.getScanPlan();
+    if (scanPlan != null && !scanPlan.isEmpty()) {
+      for (Map.Entry<String, String> entry : scanPlan.entrySet()) {
+        jobConf.set(entry.getKey(), entry.getValue());
+      }
+    }
+
     TableScanDesc scanDesc = tableScan.getConf();
     if (scanDesc == null) {
       return;
diff --git ql/src/java/org/apache/hadoop/hive/ql/lib/Utils.java ql/src/java/org/apache/hadoop/hive/ql/lib/Utils.java
index 37f18f6..851ccf6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/lib/Utils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/lib/Utils.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.lib;
 
+import java.util.List;
 import java.util.Stack;
 
 /**
@@ -64,4 +65,16 @@ public static Node getNthAncestor(Stack<Node> st, int n) {
     }
     return null;
   }
+
+  public static <T extends Node> void traverse(List<T> nodes, Function<T, Boolean> function) {
+    for (T node : nodes) {
+      if (function.apply(node) && node.getChildren() != null) {
+        traverse((List<T>) node.getChildren(), function);
+      }
+    }
+  }
+
+  public interface Function<F, T> {
+    T apply(F input);
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageSubQueryHandler.java ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageSubQueryHandler.java
new file mode 100644
index 0000000..603f66d
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageSubQueryHandler.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.metadata;
+
+import org.antlr.runtime.TokenRewriteStream;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.QBParseInfo;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+public interface HiveStorageSubQueryHandler {
+
+  /**
+   * Converts the given subquery into a single TableScanOperator, if possible.
+   * The returned operator must carry a valid rowSchema; attaching a scanPlan is optional.
+   *
+   * @param parseInfo parse info of the subquery block
+   * @param table the table backed by this storage handler
+   * @param stream token rewrite stream holding the original query text
+   * @param source AST of the subquery
+   * @return a configured TableScanOperator, or null if the subquery cannot be handled
+   * @throws SemanticException
+   */
+  TableScanOperator handleSubQuery(QBParseInfo parseInfo, Table table,
+      TokenRewriteStream stream, ASTNode source) throws SemanticException;
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java
index dee7d7e..9c9ff7d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java
@@ -107,9 +107,9 @@ private static void doSemanticAnalysis(SemanticAnalyzer sem,
       ASTNode ast, Context ctx) throws SemanticException {
     QB qb = new QB(null, null, false);
     ASTNode child = ast;
-    ParseContext subPCtx = ((SemanticAnalyzer) sem).getParseContext();
+    ParseContext subPCtx = sem.getParseContext();
     subPCtx.setContext(ctx);
-    ((SemanticAnalyzer) sem).initParseCtx(subPCtx);
+    sem.initParseCtx(subPCtx);
 
     LOG.info("Starting Sub-query Semantic Analysis");
     sem.doPhase1(child, qb, sem.initPhase1Ctx());
@@ -119,7 +119,7 @@ private static void doSemanticAnalysis(SemanticAnalyzer sem,
     LOG.info("Completed getting MetaData in Sub-query Semantic Analysis");
 
     LOG.info("Sub-query Abstract syntax tree: " + ast.toStringTree());
-    sem.genPlan(qb);
+    sem.genPlan(ast, qb);
     LOG.info("Sub-query Completed plan generation");
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/QBExpr.java ql/src/java/org/apache/hadoop/hive/ql/parse/QBExpr.java
index e923bca..1fbe979 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/QBExpr.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/QBExpr.java
@@ -38,6 +38,8 @@
     NULLOP, UNION, INTERSECT, DIFF
   };
 
+  private ASTNode source;
+
   private Opcode opcode;
   private QBExpr qbexpr1;
   private QBExpr qbexpr2;
@@ -52,6 +54,14 @@ public void setAlias(String alias) {
     this.alias = alias;
   }
 
+  public ASTNode getSource() {
+    return source;
+  }
+
+  public void setSource(ASTNode source) {
+    this.source = source;
+  }
+
   public QBExpr(String alias) {
     this.alias = alias;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java
index 33b8a21..4d2898c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java
@@ -67,6 +67,13 @@ public RowResolver() {
     isExprResolver = false;
   }
 
+  public RowResolver(String tab_alias, RowSchema schema) {
+    this();
+    for (ColumnInfo info : schema.getSignature()) {
+      put(tab_alias, info.getAlias(), info);
+    }
+  }
+
   /**
    * Puts a resolver entry corresponding to a source expression which is to be
    * used for identical expression recognition (e.g. for matching expressions
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 7a71ec7..2114758 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -37,6 +37,7 @@ import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 
+import org.antlr.runtime.TokenRewriteStream;
 import org.antlr.runtime.tree.Tree;
 import org.antlr.runtime.tree.TreeWizard;
 import org.antlr.runtime.tree.TreeWizard.ContextVisitor;
@@ -98,6 +99,7 @@ import org.apache.hadoop.hive.ql.metadata.DummyPartition;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageSubQueryHandler;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -367,6 +369,8 @@ public ParseContext getParseContext() {
   public void doPhase1QBExpr(ASTNode ast, QBExpr qbexpr, String id, String alias)
       throws SemanticException {
 
+    qbexpr.setSource(ast);
+
     assert (ast.getToken() != null);
     switch (ast.getToken().getType()) {
     case HiveParser.TOK_QUERY: {
@@ -2144,7 +2148,7 @@ private Operator genPlanForSubQueryPredicate(
     Phase1Ctx ctx_1 = initPhase1Ctx();
     doPhase1(subQueryPredicate.getSubQueryAST(), qbSQ, ctx_1);
     getMetaData(qbSQ);
-    Operator op = genPlan(qbSQ);
+    Operator op = genPlan(null, qbSQ);
     return op;
   }
 
@@ -9063,7 +9067,7 @@ private void setupStats(TableScanDesc tsDesc, QBParseInfo qbp, Table tab, String
 
   private Operator genPlan(QBExpr qbexpr) throws SemanticException {
     if (qbexpr.getOpcode() == QBExpr.Opcode.NULLOP) {
-      return genPlan(qbexpr.getQB());
+      return genPlan(qbexpr.getSource(), qbexpr.getQB());
     }
     if (qbexpr.getOpcode() == QBExpr.Opcode.UNION) {
       Operator qbexpr1Ops = genPlan(qbexpr.getQBExpr1());
@@ -9076,11 +9080,45 @@ private Operator genPlan(QBExpr qbexpr) throws SemanticException {
   }
 
   @SuppressWarnings("nls")
-  public Operator genPlan(QB qb) throws SemanticException {
+  public Operator genPlan(ASTNode source, QB qb) throws SemanticException {
 
     // First generate all the opInfos for the elements in the from clause
     Map<String, Operator> aliasToOpInfo = new HashMap<String, Operator>();
 
+    if (qb.getSubqAliases().isEmpty() && qb.getTabAliases().size() == 1 && source != null) {
+      String tabAlias = qb.getTabAliases().iterator().next();
+      Table table = qb.getMetaData().getSrcForAlias(tabAlias);
+      if (table.getStorageHandler() instanceof HiveStorageSubQueryHandler) {
+        HiveStorageSubQueryHandler handler = (HiveStorageSubQueryHandler) table.getStorageHandler();
+        TokenRewriteStream rewriter = ctx.getTokenRewriteStream();
+
+        QBParseInfo parseInfo = qb.getParseInfo();
+        TableScanOperator operator = handler.handleSubQuery(parseInfo, table, rewriter, source);
+        if (operator != null && operator.getSchema() != null) {
+          List<FieldSchema> cols = new ArrayList<FieldSchema>();
+          for (ColumnInfo column : operator.getSchema().getSignature()) {
+            cols.add(new FieldSchema(column.getAlias(), column.getType().getTypeName(), null));
+          }
+          table.getTTable().getSd().setCols(cols);
+          topToTable.put(operator, table);
+          topOps.put(getAliasId(tabAlias, qb), operator);
+
+          Operator curr = operator;
+          if (parseInfo.getIsSubQ()) {
+            tabAlias = parseInfo.getAlias();
+          }
+          RowResolver rr = new RowResolver(tabAlias, operator.getSchema());
+          putOpInsertMap(operator, rr);
+          if (!parseInfo.getIsSubQ()) {
+            for (String dest : new TreeSet<String>(parseInfo.getClauseNames())) {
+              curr = genFileSinkPlan(dest, qb, operator);
+            }
+          }
+          return curr;
+        }
+      }
+    }
+
     // Recurse over the subqueries to fill the subquery part of the plan
     for (String alias : qb.getSubqAliases()) {
       QBExpr qbexpr = qb.getSubqForAlias(alias);
@@ -9429,7 +9467,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
       // by genPlan. This has the correct column names, which clients
      // such as JDBC would prefer instead of the c0, c1 we'll end
      // up with later.
-      Operator sinkOp = genPlan(qb);
+      Operator sinkOp = genPlan(ast, qb);
 
       if (createVwDesc != null)
         resultSchema = convertRowSchemaToViewSchema(opParseCtx.get(sinkOp).getRowResolver());
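
For reference, below is a minimal sketch of what an implementing storage handler could look like, wired against the hook added above. Everything outside the HiveStorageSubQueryHandler contract is assumed for illustration: the ExampleStorageHandler class and its package, the example.pushed.query JobConf key, the fixed one-column bigint schema, and the join-only bail-out are invented, not part of this patch. The sketch only demonstrates the two obligations the interface documents: return a TableScanOperator carrying a valid rowSchema, and optionally attach a scanPlan whose entries HiveInputFormat.pushFilters() copies into the JobConf.

package org.apache.hadoop.hive.example;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import org.antlr.runtime.TokenRewriteStream;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
import org.apache.hadoop.hive.ql.metadata.HiveStorageSubQueryHandler;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.QBParseInfo;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

public class ExampleStorageHandler extends DefaultStorageHandler
    implements HiveStorageSubQueryHandler {

  // Hypothetical JobConf key; pushFilters() copies every scanPlan entry
  // into the JobConf, where this handler's InputFormat could read it back.
  private static final String PUSHED_QUERY = "example.pushed.query";

  @Override
  public TableScanOperator handleSubQuery(QBParseInfo parseInfo, Table table,
      TokenRewriteStream stream, ASTNode source) throws SemanticException {
    // Decline shapes the backing store cannot evaluate; returning null
    // makes SemanticAnalyzer fall back to normal plan generation.
    if (parseInfo.getJoinExpr() != null) {
      return null;
    }

    // Recover the subquery's original SQL text from the token stream.
    String pushedSql = stream.toString(
        source.getTokenStartIndex(), source.getTokenStopIndex());

    // Describe the rows the external system will return. A valid rowSchema
    // is mandatory; a single bigint column is assumed here purely for
    // illustration. Note genPlan() reads getAlias() off each ColumnInfo.
    String alias = parseInfo.getAlias();
    ColumnInfo ci = new ColumnInfo("_c0", TypeInfoFactory.longTypeInfo, alias, false);
    ci.setAlias("_c0");
    ArrayList<ColumnInfo> signature = new ArrayList<ColumnInfo>();
    signature.add(ci);

    TableScanOperator ts = (TableScanOperator) OperatorFactory.get(
        new TableScanDesc(alias), new RowSchema(signature));

    // Optional scanPlan: carries the pushed-down query to split generation.
    Map<String, String> scanPlan = new HashMap<String, String>();
    scanPlan.put(PUSHED_QUERY, pushedSql);
    ts.setScanPlan(scanPlan);
    return ts;
  }
}

Returning null keeps the handler honest: genPlan() takes the shortcut only when a non-null operator with a schema comes back, so unsupported query shapes degrade to the regular plan.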