diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
index 183f45698a..0fac1e4b7f 100644
--- itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
+++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
@@ -20,14 +20,12 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;

 import java.io.File;
-import java.lang.reflect.Field;
 import java.math.BigDecimal;
 import java.net.URL;
 import java.sql.Connection;
@@ -36,29 +34,12 @@
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.FileSystem;
+
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
@@ -66,31 +47,18 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.FieldDesc;
-import org.apache.hadoop.hive.llap.LlapRowRecordReader;
 import org.apache.hadoop.hive.llap.Row;
 import org.apache.hadoop.hive.llap.Schema;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hive.jdbc.miniHS2.MiniHS2;
 import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
 import org.apache.hadoop.hive.common.type.Date;
 import org.apache.hadoop.hive.common.type.Timestamp;
 import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-
-import org.datanucleus.ClassLoaderResolver;
-import org.datanucleus.NucleusContext;
-import org.datanucleus.api.jdo.JDOPersistenceManagerFactory;
-import org.datanucleus.AbstractNucleusContext;
+
 import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.apache.hadoop.mapred.InputFormat;
@@ -530,12 +498,10 @@ protected int processQuery(String currentDatabase, String query, int numSplits,
     InputSplit[] splits = inputFormat.getSplits(job, numSplits);

     // Fetch rows from splits
-    boolean first = true;
     int rowCount = 0;
     for (InputSplit split : splits) {
       System.out.println("Processing split " + split.getLocations());
-      int numColumns = 2;
       RecordReader reader = inputFormat.getRecordReader(split, job, null);
       Row row = reader.createValue();
       while (reader.next(NullWritable.get(), row)) {
diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestNewGetSplitsFormat.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestNewGetSplitsFormat.java
index 31848c5db5..9ee5a272c7 100644
--- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestNewGetSplitsFormat.java
+++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestNewGetSplitsFormat.java
@@ -14,7 +14,6 @@
  * limitations under the License.
  */

-
 package org.apache.hive.jdbc;

 import org.apache.hadoop.hive.conf.HiveConf;
@@ -33,8 +32,6 @@
 import java.util.List;
 import java.util.UUID;

-import static org.junit.Assert.assertTrue;
-
 /**
  * TestNewGetSplitsFormat.
  */
diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestNewGetSplitsFormatReturnPath.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestNewGetSplitsFormatReturnPath.java
new file mode 100644
index 0000000000..9929f60692
--- /dev/null
+++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestNewGetSplitsFormatReturnPath.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.BeforeClass;
+
+/**
+ * TestNewGetSplitsFormatReturnPath.
+ */
+public class TestNewGetSplitsFormatReturnPath extends TestNewGetSplitsFormat {
+
+  @BeforeClass public static void beforeTest() throws Exception {
+    HiveConf conf = defaultConf();
+    conf.setBoolVar(HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED, true);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_CBO_RETPATH_HIVEOP, true);
+    BaseJdbcWithMiniLlap.beforeTest(conf);
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java
index 9fed1fd4a4..4a18cfef54 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java
@@ -627,11 +627,11 @@ public static boolean orderRelNode(RelNode rel) {
   public static Pair<RelNode, RelNode> getTopLevelSelect(final RelNode rootRel) {
     RelNode tmpRel = rootRel;
     RelNode parentOforiginalProjRel = rootRel;
-    HiveProject originalProjRel = null;
+    RelNode originalProjRel = null;

     while (tmpRel != null) {
-      if (tmpRel instanceof HiveProject) {
-        originalProjRel = (HiveProject) tmpRel;
+      if (tmpRel instanceof HiveProject || tmpRel instanceof HiveTableFunctionScan) {
+        originalProjRel = tmpRel;
         break;
       }
       parentOforiginalProjRel = tmpRel;
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
index c11ed59012..b900e89b49 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
@@ -28,6 +28,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;

 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelCollations;
@@ -37,6 +38,7 @@
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
@@ -46,6 +48,8 @@
 import org.apache.hadoop.hive.conf.HiveConf.StrictChecks;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.FunctionInfo;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.LimitOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -67,6 +71,7 @@
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSemiJoin;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortExchange;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortLimit;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableFunctionScan;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion;
 import org.apache.hadoop.hive.ql.parse.JoinCond;
@@ -97,8 +102,15 @@
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.plan.UDTFDesc;
 import org.apache.hadoop.hive.ql.plan.UnionDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -186,12 +198,96 @@ OpAttr dispatch(RelNode rn) throws SemanticException {
       return visit((HiveSortExchange) rn);
     } else if (rn instanceof HiveAggregate) {
       return visit((HiveAggregate) rn);
+    } else if (rn instanceof HiveTableFunctionScan) {
+      return visit((HiveTableFunctionScan) rn);
     }
     LOG.error(rn.getClass().getCanonicalName() + "operator translation not supported"
         + " yet in return path.");
     return null;
   }

+  private OpAttr visit(HiveTableFunctionScan scanRel) throws SemanticException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Translating operator rel#" + scanRel.getId() + ":"
+          + scanRel.getRelTypeName() + " with row type: [" + scanRel.getRowType() + "]");
+    }
+
+    RexCall call = (RexCall) scanRel.getCall();
+
+    RowResolver rowResolver = new RowResolver();
+    List<String> fieldNames = new ArrayList<>(scanRel.getRowType().getFieldNames());
+    List<String> exprNames = new ArrayList<>(fieldNames);
+    List<ExprNodeDesc> exprCols = new ArrayList<>();
+    Map<String, ExprNodeDesc> colExprMap = new HashMap<>();
+    for (int pos = 0; pos < call.getOperands().size(); pos++) {
+      ExprNodeConverter converter = new ExprNodeConverter(SemanticAnalyzer.DUMMY_TABLE, fieldNames.get(pos),
+          scanRel.getRowType(), scanRel.getRowType(), ((HiveTableScan) scanRel.getInput(0)).getPartOrVirtualCols(),
+          scanRel.getCluster().getTypeFactory(), true);
+      ExprNodeDesc exprCol = call.getOperands().get(pos).accept(converter);
+      colExprMap.put(exprNames.get(pos), exprCol);
+      exprCols.add(exprCol);
+
+      ColumnInfo columnInfo = new ColumnInfo(fieldNames.get(pos), exprCol.getWritableObjectInspector(), null, false);
+      rowResolver.put(columnInfo.getTabAlias(), columnInfo.getAlias(), columnInfo);
+    }
+
+    OpAttr inputOpAf = dispatch(scanRel.getInputs().get(0));
+    TableScanOperator op = (TableScanOperator) inputOpAf.inputs.get(0);
+    op.getConf().setRowLimit(1);
+
+    Operator output = OperatorFactory.getAndMakeChild(new SelectDesc(exprCols, fieldNames, false),
+        new RowSchema(rowResolver.getRowSchema()), op);
+    output.setColumnExprMap(colExprMap);
+
+    Operator funcOp = genUDTFPlan(call, fieldNames, output, rowResolver);
+
+    return new OpAttr(null, new HashSet<Integer>(), funcOp);
+  }
+
+  private Operator genUDTFPlan(RexCall call, List<String> colAliases, Operator input, RowResolver rowResolver)
+      throws SemanticException {
+    LOG.debug("genUDTFPlan, Col aliases: {}", colAliases);
+
+    GenericUDTF genericUDTF = createGenericUDTF(call);
+    StructObjectInspector rowOI = createStructObjectInspector(rowResolver, colAliases);
+    StructObjectInspector outputOI = genericUDTF.initialize(rowOI);
+    List<ColumnInfo> columnInfos = createColumnInfos(outputOI);
+
+    // Add the UDTFOperator to the operator DAG
+    return OperatorFactory.getAndMakeChild(new UDTFDesc(genericUDTF, false), new RowSchema(columnInfos), input);
+  }
+
+  private GenericUDTF createGenericUDTF(RexCall call) throws SemanticException {
+    String functionName = call.getOperator().getName();
+    FunctionInfo fi = FunctionRegistry.getFunctionInfo(functionName);
+    return fi.getGenericUDTF();
+  }
+
+  private StructObjectInspector createStructObjectInspector(RowResolver rowResolver, List<String> colAliases)
+      throws SemanticException {
+    // Create the object inspector for the input columns and initialize the UDTF
+    List<String> colNames = rowResolver.getColumnInfos().stream().map(ci -> ci.getInternalName())
+        .collect(Collectors.toList());
+    List<ObjectInspector> colOIs = rowResolver.getColumnInfos().stream().map(ci -> ci.getObjectInspector())
+        .collect(Collectors.toList());
+
+    return ObjectInspectorFactory.getStandardStructObjectInspector(colNames, colOIs);
+  }
+
+  private List<ColumnInfo> createColumnInfos(StructObjectInspector outputOI) {
+    // Generate the output column info's / row resolver using internal names.
+    List<ColumnInfo> columnInfos = new ArrayList<>();
+    for (StructField sf : outputOI.getAllStructFieldRefs()) {
+
+      // Since the UDTF operator feeds into a LVJ operator that will rename all the internal names, we can just use
+      // field name from the UDTF's OI as the internal name
+      ColumnInfo col = new ColumnInfo(sf.getFieldName(),
+          TypeInfoUtils.getTypeInfoFromObjectInspector(sf.getFieldObjectInspector()), null, false);
+      columnInfos.add(col);
+    }
+    return columnInfos;
+  }
+
   /**
    * TODO: 1. PPD needs to get pushed in to TS
    *