diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java
index 9fed1fd4a4..4a18cfef54 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java
@@ -627,11 +627,11 @@ public static boolean orderRelNode(RelNode rel) {
   public static Pair<RelNode, RelNode> getTopLevelSelect(final RelNode rootRel) {
     RelNode tmpRel = rootRel;
     RelNode parentOforiginalProjRel = rootRel;
-    HiveProject originalProjRel = null;
+    RelNode originalProjRel = null;
 
     while (tmpRel != null) {
-      if (tmpRel instanceof HiveProject) {
-        originalProjRel = (HiveProject) tmpRel;
+      if (tmpRel instanceof HiveProject || tmpRel instanceof HiveTableFunctionScan) {
+        originalProjRel = tmpRel;
         break;
       }
       parentOforiginalProjRel = tmpRel;
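Note on the HiveCalciteUtil change: getTopLevelSelect() previously recognized only a HiveProject as the top-level select, so a plan rooted at a HiveTableFunctionScan (a UDTF invocation) was never matched on the return path. The walk now stops at either node type, and the local variable is widened to RelNode. A minimal sketch of the traversal pattern, not part of the patch, with a hypothetical single-input Node type standing in for Calcite's RelNode:

import java.util.function.Predicate;

final class TopLevelWalkSketch {
  interface Node {
    Node input(); // single input; null at the leaves
  }

  // Returns {parentOfMatch, match}; match is null when nothing qualifies.
  // In the patched code the predicate is
  // "n instanceof HiveProject || n instanceof HiveTableFunctionScan".
  static Node[] topLevelSelect(Node root, Predicate<Node> isSelectLike) {
    Node tmp = root;
    Node parent = root;
    Node match = null;
    while (tmp != null) {
      if (isSelectLike.test(tmp)) {
        match = tmp;
        break;
      }
      parent = tmp;
      tmp = tmp.input();
    }
    return new Node[] { parent, match }; // cf. Pair<RelNode, RelNode>
  }
}

Returning the parent alongside the match presumably lets callers splice a replacement node into the chain without re-walking it.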
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
index c11ed59012..2390cc2385 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
@@ -37,6 +37,7 @@
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
@@ -46,6 +47,8 @@
 import org.apache.hadoop.hive.conf.HiveConf.StrictChecks;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.FunctionInfo;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.LimitOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -67,6 +70,7 @@
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSemiJoin;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortExchange;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortLimit;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableFunctionScan;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion;
 import org.apache.hadoop.hive.ql.parse.JoinCond;
@@ -76,6 +80,7 @@
 import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionExpression;
 import org.apache.hadoop.hive.ql.parse.PTFTranslator;
 import org.apache.hadoop.hive.ql.parse.ParseUtils;
+import org.apache.hadoop.hive.ql.parse.QB;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -98,6 +103,7 @@
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.UnionDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -124,6 +130,7 @@
   private final UnparseTranslator unparseTranslator;
   private final Map<String, TableScanOperator> topOps;
   private int uniqueCounter;
+  private long derivedTableCount;
 
   public HiveOpConverter(SemanticAnalyzer semanticAnalyzer, HiveConf hiveConf,
       UnparseTranslator unparseTranslator, Map<String, TableScanOperator> topOps) {
@@ -186,12 +193,67 @@ OpAttr dispatch(RelNode rn) throws SemanticException {
       return visit((HiveSortExchange) rn);
     } else if (rn instanceof HiveAggregate) {
       return visit((HiveAggregate) rn);
+    } else if (rn instanceof HiveTableFunctionScan) {
+      return visit((HiveTableFunctionScan) rn);
     }
     LOG.error(rn.getClass().getCanonicalName() + "operator translation not supported"
         + " yet in return path.");
     return null;
   }
 
+  private OpAttr visit(HiveTableFunctionScan scanRel) throws SemanticException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Translating operator rel#" + scanRel.getId() + ":"
+          + scanRel.getRelTypeName() + " with row type: [" + scanRel.getRowType() + "]");
+    }
+
+    RexCall call = (RexCall) scanRel.getCall();
+
+    String functionName = call.getOperator().getName();
+    FunctionInfo fi = FunctionRegistry.getFunctionInfo(functionName);
+    GenericUDTF genericUDTF = fi.getGenericUDTF();
+
+    RowResolver rowResolver = new RowResolver();
+    List<String> fieldNames = new ArrayList<>(scanRel.getRowType().getFieldNames());
+    List<String> exprNames = new ArrayList<>(fieldNames);
+    List<ExprNodeDesc> exprCols = new ArrayList<>();
+    Map<String, ExprNodeDesc> colExprMap = new HashMap<>();
+    for (int pos = 0; pos < call.getOperands().size(); pos++) {
+      ExprNodeConverter converter = new ExprNodeConverter(SemanticAnalyzer.DUMMY_TABLE, fieldNames.get(pos),
+          scanRel.getRowType(), scanRel.getRowType(), ((HiveTableScan) scanRel.getInput(0)).getPartOrVirtualCols(),
+          scanRel.getCluster().getTypeFactory(), true);
+      ExprNodeDesc exprCol = call.getOperands().get(pos).accept(converter);
+      colExprMap.put(exprNames.get(pos), exprCol);
+      exprCols.add(exprCol);
+
+      ColumnInfo columnInfo = new ColumnInfo(fieldNames.get(pos), exprCol.getWritableObjectInspector(), null, false);
+      rowResolver.put(columnInfo.getTabAlias(), columnInfo.getAlias(), columnInfo);
+    }
+
+    QB qb = new QB(semanticAnalyzer.getQB().getId(), nextAlias(), true);
+    qb.getMetaData().setSrcForAlias(SemanticAnalyzer.DUMMY_TABLE, semanticAnalyzer.getDummyTable());
+    TableScanOperator op = (TableScanOperator) semanticAnalyzer.genTablePlan(SemanticAnalyzer.DUMMY_TABLE, qb);
+    op.getConf().setRowLimit(1);
+    qb.addAlias(SemanticAnalyzer.DUMMY_TABLE);
+    qb.setTabAlias(SemanticAnalyzer.DUMMY_TABLE, SemanticAnalyzer.DUMMY_TABLE);
+
+    Operator output = OperatorFactory.getAndMakeChild(new SelectDesc(exprCols, fieldNames, false),
+        new RowSchema(rowResolver.getRowSchema()), op);
+    output.setColumnExprMap(colExprMap);
+    semanticAnalyzer.putOpInsertMap(output, rowResolver);
+
+    Operator funcOp = semanticAnalyzer.genUDTFPlan(genericUDTF, null, fieldNames, qb, output, false);
+
+    return new OpAttr(null, new HashSet<Integer>(), funcOp);
+  }
+
+
+  private String nextAlias() {
+    String tabAlias = String.format("$hdt$_%d", derivedTableCount);
+    derivedTableCount++;
+    return tabAlias;
+  }
+
   /**
    * TODO: 1. PPD needs to get pushed in to TS
    *
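Note on the new visit(HiveTableFunctionScan) above: it reuses SemanticAnalyzer's plan generators to build, on the CBO return path, the same shape the non-CBO path produces for a UDTF with no source table: a one-row scan of the dummy table, a SELECT projecting the UDTF call's operand expressions, and the UDTF operator itself. nextAlias() mirrors SemanticAnalyzer's $hdt$_N derived-table naming. A schematic of the resulting chain, not part of the patch, with a hypothetical Op class standing in for Hive's Operator hierarchy and explode(array(1,2,3)) as an example call:

final class UdtfChainSketch {
  static final class Op {
    final String label;
    Op child;
    Op(String label) { this.label = label; }
    // Mirrors OperatorFactory.getAndMakeChild: create a child linked to this op.
    Op makeChild(String childLabel) {
      Op c = new Op(childLabel);
      this.child = c;
      return c;
    }
  }

  public static void main(String[] args) {
    // genTablePlan(DUMMY_TABLE, qb) + setRowLimit(1): one-row dummy scan.
    Op ts = new Op("TS[_dummy_table, rowLimit=1]");
    // One SELECT expression per operand of the RexCall (here: the array literal).
    Op sel = ts.makeChild("SEL[array(1,2,3)]");
    // genUDTFPlan(genericUDTF, ...): the UDTF operator consumes the SELECT.
    sel.makeChild("UDTF[explode]");
    for (Op op = ts; op != null; op = op.child) {
      System.out.println(op.label);
    }
  }
}

The rowLimit of 1 on the dummy scan should ensure the operand expressions are evaluated exactly once before the UDTF expands them into rows.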
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 11e35d0de4..036a3b62f4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -3193,8 +3193,7 @@ private void extractJoinCondsFromWhereClause(QBJoinTree joinTree, ASTNode predic
   }
 
   @SuppressWarnings("nls")
-  Operator putOpInsertMap(Operator<? extends OperatorDesc> op,
-      RowResolver rr) {
+  public Operator putOpInsertMap(Operator<? extends OperatorDesc> op, RowResolver rr) {
     OpParseContext ctx = new OpParseContext(rr);
     opParseCtx.put(op, ctx);
     op.augmentPlan();
@@ -8524,7 +8523,7 @@ private Operator genLimitPlan(String dest, Operator input, int offset, int limit
     return limitMap;
   }
 
-  private Operator genUDTFPlan(GenericUDTF genericUDTF, String outputTableAlias, List<String> colAliases, QB qb,
+  public Operator genUDTFPlan(GenericUDTF genericUDTF, String outputTableAlias, List<String> colAliases, QB qb,
       Operator input, boolean outerLV) throws SemanticException {
 
     // No GROUP BY / DISTRIBUTE BY / SORT BY / CLUSTER BY
@@ -11260,7 +11259,7 @@ protected String getAliasId(String alias, QB qb) {
   }
 
   @SuppressWarnings("nls")
-  private Operator genTablePlan(String alias, QB qb) throws SemanticException {
+  public Operator genTablePlan(String alias, QB qb) throws SemanticException {
 
     String alias_id = getAliasId(alias, qb);
     Table tab = qb.getMetaData().getSrcForAlias(alias);
@@ -11772,7 +11771,7 @@ private void rewriteRRForSubQ(String alias, Operator operator, boolean skipAmbig
     opParseCtx.get(operator).setRowResolver(newRR);
   }
 
-  protected Table getDummyTable() throws SemanticException {
+  public Table getDummyTable() throws SemanticException {
     Path dummyPath = createDummyFile();
     Table desc = new Table(DUMMY_DATABASE, DUMMY_TABLE);
     desc.getTTable().getSd().setLocation(dummyPath.toString());
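Note on the SemanticAnalyzer changes above: they only widen visibility (package-private, private, or protected to public) so that HiveOpConverter, which lives in a different package, can call genTablePlan, putOpInsertMap, genUDTFPlan, and getDummyTable; no method bodies change. The dummy table exists because Hive operator plans are rooted at a TableScan: a UDTF invoked without a FROM clause, for example a query like select explode(array(1,2,3)), still needs a source that emits exactly one row. A schematic stand-in for that source, not part of the patch:

import java.util.Collections;
import java.util.Iterator;

final class DummyRowSourceSketch implements Iterable<Object[]> {
  @Override
  public Iterator<Object[]> iterator() {
    // Exactly one empty row, mirroring setRowLimit(1) on the dummy TableScan.
    return Collections.<Object[]>singletonList(new Object[0]).iterator();
  }
}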