diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java index 8c7d99d..e708d58 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java @@ -76,7 +76,7 @@ public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext TableScanOperator ts = (TableScanOperator) source; // push down projections ColumnProjectionUtils.appendReadColumns( - job, ts.getNeededColumnIDs(), ts.getNeededColumns()); + job, ts.getNeededColumnIDs(), ts.getNeededColumns(), ts.getNeededNestedColumnPaths()); // push down filters HiveInputFormat.pushFilters(job, ts); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java index 584eff4..7c1e344 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java @@ -21,12 +21,10 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -207,7 +205,7 @@ public void initializeMapredLocalWork(MapJoinDesc mjConf, Configuration hconf, TableScanOperator ts = (TableScanOperator)aliasToWork.get(alias); // push down projections ColumnProjectionUtils.appendReadColumns( - jobClone, ts.getNeededColumnIDs(), ts.getNeededColumns()); + jobClone, ts.getNeededColumnIDs(), ts.getNeededColumns(), ts.getNeededNestedColumnPaths()); // push down filters HiveInputFormat.pushFilters(jobClone, ts); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java index 0f02222..68477ca 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java @@ -220,7 +220,7 @@ private void gatherStats(Object row) { ObjectInspectorUtils.partialCopyToStandardObject(rdSize, row, rdSizeColumn, 1, (StructObjectInspector) inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE); - currentStat.addToStat(StatsSetupConst.RAW_DATA_SIZE, (((LongWritable)rdSize.get(0)).get())); + currentStat.addToStat(StatsSetupConst.RAW_DATA_SIZE, (((LongWritable) rdSize.get(0)).get())); } } @@ -297,6 +297,14 @@ public void setNeededColumns(List columnNames) { conf.setNeededColumns(columnNames); } + public List getNeededNestedColumnPaths() { + return conf.getNeededNestedColumnPaths(); + } + + public void setNeededNestedColumnPaths(List nestedColumnPaths) { + conf.setNeededNestedColumnPaths(nestedColumnPaths); + } + public List getNeededColumns() { return conf.getNeededColumns(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index f81fc71..42861c4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -479,7 +479,7 @@ private void initializeOperators(Map fetchOpJobConfMap) TableScanOperator ts = (TableScanOperator)work.getAliasToWork().get(entry.getKey()); // push down projections ColumnProjectionUtils.appendReadColumns( - jobClone, ts.getNeededColumnIDs(), ts.getNeededColumns()); + jobClone, 
ts.getNeededColumnIDs(), ts.getNeededColumns(), ts.getNeededNestedColumnPaths()); // push down filters HiveInputFormat.pushFilters(jobClone, ts); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index c4b9940..69956ec 100755 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -18,9 +18,6 @@ package org.apache.hadoop.hive.ql.io; -import java.util.Arrays; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -29,11 +26,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.Map.Entry; -import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -513,7 +508,7 @@ private void pushProjection(final JobConf newjob, final StringBuilder readColumn } } - + protected static PartitionDesc getPartitionDescFromPath( Map pathToPartitionInfo, Path dir) throws IOException { @@ -632,7 +627,7 @@ protected void pushProjectionsAndFilters(JobConf jobConf, Class inputFormatClass TableScanOperator ts = (TableScanOperator) op; // push down projections. ColumnProjectionUtils.appendReadColumns( - jobConf, ts.getNeededColumnIDs(), ts.getNeededColumns()); + jobConf, ts.getNeededColumnIDs(), ts.getNeededColumns(), ts.getNeededNestedColumnPaths()); // push down filters pushFilters(jobConf, ts); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java index b058500..68407f5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java @@ -99,6 +99,9 @@ private void pushProjectionsAndFilters(final JobConf jobConf, boolean allColumnsNeeded = false; boolean noFilters = false; Set neededColumnIDs = new HashSet(); + // To support nested column pruning, we need to track the path from the top to the nested + // fields + Set neededNestedColumnPaths = new HashSet(); List filterExprs = new ArrayList(); RowSchema rowSchema = null; @@ -112,6 +115,7 @@ private void pushProjectionsAndFilters(final JobConf jobConf, allColumnsNeeded = true; } else { neededColumnIDs.addAll(ts.getNeededColumnIDs()); + neededNestedColumnPaths.addAll(ts.getNeededNestedColumnPaths()); } rowSchema = ts.getSchema(); @@ -143,6 +147,8 @@ private void pushProjectionsAndFilters(final JobConf jobConf, if (!allColumnsNeeded) { if (!neededColumnIDs.isEmpty()) { ColumnProjectionUtils.appendReadColumns(jobConf, new ArrayList(neededColumnIDs)); + ColumnProjectionUtils.appendNestedColumnPaths(jobConf, + new ArrayList(neededNestedColumnPaths)); } } else { ColumnProjectionUtils.setReadAllColumns(jobConf); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java index a89aa4d..f8b788c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java @@ -85,7 +85,7 @@ private void init(final GroupType selectedGroupType, List selectedFields = selectedGroupType.getFields(); for 
(int i = 0; i < selectedFieldCount; i++) { Type subtype = selectedFields.get(i); - if (containingGroupType.getFields().contains(subtype)) { + if (isSubType(containingGroupType, subtype)) { int fieldIndex = containingGroupType.getFieldIndex(subtype.getName()); TypeInfo _hiveTypeInfo = getFieldTypeIgnoreCase(hiveTypeInfo, subtype.getName(), fieldIndex); converters[i] = getFieldConverter(subtype, fieldIndex, _hiveTypeInfo); @@ -96,6 +96,33 @@ private void init(final GroupType selectedGroupType, } } + // This method is used to check whether the subType is a sub type of the groupType. + // For nested attribution, we need to check its existence by the root path in a recursive way. + private boolean isSubType( + final GroupType groupType, + final Type subtype) { + if (subtype.isPrimitive() && subtype.isRepetition(Type.Repetition.REPEATED)) { + return groupType.getFields().contains(subtype); + } else { + for (Type g : groupType.getFields()) { + if (!g.isPrimitive() && g.getName().equals(subtype.getName())) { + // check all elements are contained in g + boolean containsAll = false; + for (Type subSubType : subtype.asGroupType().getFields()) { + containsAll = isSubType(g.asGroupType(), subSubType); + if (!containsAll) { + break; + } + } + if (containsAll) { + return containsAll; + } + } + } + return false; + } + } + private TypeInfo getFieldTypeIgnoreCase(TypeInfo hiveTypeInfo, String fieldName, int fieldIndex) { if (hiveTypeInfo == null) { return null; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java index 3e38cc7..ec0cc9b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java @@ -15,9 +15,11 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.ListIterator; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; @@ -135,7 +137,7 @@ private static Type getProjectedType(TypeInfo colType, Type fieldType) { ((StructTypeInfo) colType).getAllStructFieldNames(), ((StructTypeInfo) colType).getAllStructFieldTypeInfos() ); - + Type[] typesArray = groupFields.toArray(new Type[0]); return Types.buildGroup(fieldType.getRepetition()) .addFields(typesArray) @@ -200,6 +202,52 @@ private static MessageType getSchemaByIndex(MessageType schema, List col } else { //prefixing with '_mask_' to ensure no conflict with named //columns in the file schema + schemaTypes.add( + Types.optional(PrimitiveTypeName.BINARY).named("_mask_" + colNames.get(i))); + } + } + } + + return new MessageType(schema.getName(), schemaTypes); + } + + /** + * Generate the projected schema from colIndex and nested column paths. If the column is + * contained by colIndex, it will be added directly, otherwise it will build a group type which + * contains all required sub types using nestedColumnPaths. 
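For illustration, a minimal sketch (not part of the patch) of what this projection produces for a small schema; the schema text and column names are example values that mirror the unit tests added later in this change:

    // Assumes imports used elsewhere in this patch: org.apache.parquet.schema.MessageType,
    // org.apache.parquet.schema.MessageTypeParser and java.util.Arrays.
    MessageType fileSchema = MessageTypeParser.parseMessageType(
        "message hive_schema {\n"
        + "  optional group structCol {\n"
        + "    optional int32 a;\n"
        + "    optional double b;\n"
        + "  }\n"
        + "}\n");
    MessageType projected = DataWritableReadSupport.getProjectedSchema(
        fileSchema,
        Arrays.asList("structCol"),     // colNames
        Arrays.asList(0),               // colIndexes of needed top-level columns
        Arrays.asList("structCol.a"));  // nestedColumnPaths
    // projected is equivalent to:
    //   message hive_schema { optional group structCol { optional int32 a; } }
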
+ * @param schema original schema + * @param colNames + * @param colIndexes the index of needed columns + * @param nestedColumnPaths the paths for nested columns + * @return + */ + public static MessageType getProjectedSchema( + MessageType schema, + List colNames, + List colIndexes, + List nestedColumnPaths) { + List schemaTypes = new ArrayList(); + + Set prunedCols = getPrunedNestedColumns(nestedColumnPaths); + for (Integer i : colIndexes) { + if (i < colNames.size()) { + if (i < schema.getFieldCount()) { + Type t = schema.getType(i); + if (!prunedCols.contains(t.getName())) { + schemaTypes.add(schema.getType(i)); + } else { + if (t.isPrimitive()) { + // For primitive type, add directly. + schemaTypes.add(t); + } else { + // For group type, we need to build the projected group type with required leaves + schemaTypes.add(buildProjectedGroupType(t.asGroupType(), + projectLeafTypes(t.asGroupType().getFields(), nestedColumnPaths, t.getName()))); + } + } + } else { + //prefixing with '_mask_' to ensure no conflict with named + //columns in the file schema schemaTypes.add(Types.optional(PrimitiveTypeName.BINARY).named("_mask_" + colNames.get(i))); } } @@ -209,6 +257,57 @@ private static MessageType getSchemaByIndex(MessageType schema, List col } /** + * Return the columns which contains required nested attribution level + * e.g. + * Given struct a and a is required while y is not, so the method will return a + * who contains the attribution x + * + * @param groupPaths the paths for required nested attributions + * @return column list contains required nested attribution + */ + private static Set getPrunedNestedColumns(List groupPaths) { + Set colNames = new HashSet<>(); + if (groupPaths.isEmpty()) { + return colNames; + } + for (String s : groupPaths) { + String c = StringUtils.split(s, '.')[0]; + if (!colNames.contains(c)) { + colNames.add(c); + } + } + return colNames; + } + + private static GroupType buildProjectedGroupType( + GroupType originalType, + List types) { + return new GroupType(originalType.getRepetition(), originalType.getName(), types); + } + + private static List projectLeafTypes( + List types, + List groupPaths, + String pathToRoot) { + List res = new ArrayList<>(); + for (Type type : types) { + String tn = type.getName(); + String c = (pathToRoot.isEmpty()) ? tn : pathToRoot + "." + tn; + + if (type instanceof GroupType) { + GroupType groupType = type.asGroupType(); + List ts = projectLeafTypes(groupType.getFields(), groupPaths, c); + res.add(buildProjectedGroupType(groupType, ts)); + } else { + if (groupPaths.contains(c)) { + res.add(type); + } + } + } + return res; + } + + /** * It creates the readContext for Parquet side with the requested schema during the init phase. 
* * @param context @@ -246,10 +345,11 @@ private static MessageType getSchemaByIndex(MessageType schema, List col contextMetadata.put(PARQUET_COLUMN_INDEX_ACCESS, String.valueOf(indexAccess)); this.hiveTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList); + List groupPaths = ColumnProjectionUtils.getNestedColumnPaths(configuration); List indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration); if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) { - MessageType requestedSchemaByUser = - getSchemaByIndex(tableSchema, columnNamesList, indexColumnsWanted); + MessageType requestedSchemaByUser = getProjectedSchema(tableSchema, columnNamesList, + indexColumnsWanted, groupPaths); return new ReadContext(requestedSchemaByUser, contextMetadata); } else { return new ReadContext(tableSchema, contextMetadata); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java index 611a6b7..41da2e8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java @@ -20,8 +20,10 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.CommonJoinOperator; @@ -38,8 +40,10 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.SelectDesc; +import org.apache.hadoop.util.StringUtils; /** * This class implements the processor context for Column Pruner. 
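As a usage sketch (not part of the patch), this is the write side of the hand-off: the TableScanOperator call sites above push the needed column ids, names and nested paths into the job configuration, and DataWritableReadSupport.init above reads them back to build the pruned requested schema. The column id and names below are example values only:

    // Assumes: org.apache.hadoop.mapred.JobConf, java.util.Arrays and
    // org.apache.hadoop.hive.serde2.ColumnProjectionUtils.
    JobConf jobConf = new JobConf();
    ColumnProjectionUtils.appendReadColumns(
        jobConf,
        Arrays.asList(2),          // neededColumnIDs (example: position of "info" in the table)
        Arrays.asList("info"),     // neededColumns
        Arrays.asList("info.a"));  // neededNestedColumnPaths
    // jobConf now carries hive.io.file.readcolumn.ids, hive.io.file.readcolumn.names and
    // hive.io.file.readNestedColumn.paths, and hive.io.file.read.all.columns is set to false;
    // DataWritableReadSupport.init(...) consumes these when calling getProjectedSchema.
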
@@ -50,6 +54,11 @@ private final Map, List> prunedColLists; + /** + * This map stores the pruned nested column path for each operator + */ + private final Map, List> prunedNestedColLists; + private final Map>> joinPrunedColLists; private final Map> unionPrunedColLists; @@ -57,6 +66,7 @@ public ColumnPrunerProcCtx(ParseContext pctx) { this.pctx = pctx; prunedColLists = new HashMap, List>(); + prunedNestedColLists = new HashMap, List>(); joinPrunedColLists = new HashMap>>(); unionPrunedColLists = new HashMap<>(); } @@ -84,6 +94,10 @@ public ParseContext getParseContext() { return prunedColLists; } + public Map, List> getPrunedNestedColLists() { + return prunedNestedColLists; + } + /** * Creates the list of internal column names(these names are used in the * RowResolver and are different from the external column names) that are @@ -138,6 +152,25 @@ public ParseContext getParseContext() { } /** + * Get the path to the root column for the nested column attribution + * + * @param curOp current operator + * @return the nested column paths for current operator and its child operator + */ + public List genNestedColPaths(Operator curOp) { + if (curOp.getChildOperators() == null) { + return null; + } + Set groupPathsList = new HashSet<>(); + + for (Operator child : curOp.getChildOperators()) { + groupPathsList.addAll(prunedNestedColLists.get(child)); + } + + return new ArrayList<>(groupPathsList); + } + + /** * Creates the list of internal column names(these names are used in the * RowResolver and are different from the external column names) that are * needed in the subtree. These columns eventually have to be selected from @@ -239,6 +272,51 @@ public ParseContext getParseContext() { } /** + * Creates the list of internal group paths for select * expressions. + * + * @param op The select operator. + * @param paths The list of nested column paths returned by the children of the + * select operator. + * @return List of the nested column path from leaf to the root. 
+ */ + public List getSelectNestedColPathsFromChildren( + SelectOperator op, + List paths) { + List groups = new ArrayList(); + SelectDesc conf = op.getConf(); + + if (paths != null && conf.isSelStarNoCompute()) { + groups.addAll(paths); + return groups; + } + + List selectExprs = conf.getColList(); + + List outputColumnNames = conf.getOutputColumnNames(); + for (int i = 0; i < outputColumnNames.size(); i++) { + if (paths == null || paths.contains(outputColumnNames.get(i))) { + ExprNodeDesc expr = selectExprs.get(i); + String gp = getNestedColPathByExpr(expr); + if (gp != null) { + groups.add(gp); + } + } + } + + return groups; + } + + private String getNestedColPathByExpr(ExprNodeDesc expr) { + if (expr instanceof ExprNodeFieldDesc && !((ExprNodeFieldDesc) expr).getIsList()) { + ExprNodeFieldDesc exprNodeFieldDesc = (ExprNodeFieldDesc) expr; + List cols = exprNodeFieldDesc.getCols(); + return StringUtils.join(".", new String[] { cols.get(0), exprNodeFieldDesc.getFieldName() }); + } else { + return null; + } + } + + /** * Create the list of internal columns for select tag of LV */ public List getSelectColsFromLVJoin(RowSchema rs, diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java index a2a7f00..cf22cee 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java @@ -70,7 +70,6 @@ import org.apache.hadoop.hive.ql.plan.SelectDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; -import org.apache.hadoop.hive.ql.plan.UnionDesc; import org.apache.hadoop.hive.ql.plan.ptf.PTFExpressionDef; import org.apache.hadoop.hive.ql.plan.ptf.PTFInputDef; import org.apache.hadoop.hive.ql.plan.ptf.PartitionedTableFunctionDef; @@ -323,9 +322,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, } else { prunedCols = referencedColumns; } - - List newRS = prunedColumnsList(prunedCols, op.getSchema(), funcDef); - + + List newRS = prunedColumnsList(prunedCols, op.getSchema(), funcDef); + op.getSchema().setSignature(new ArrayList(newRS)); ShapeDetails outputShape = funcDef.getStartOfChain().getInput().getOutputShape(); @@ -333,7 +332,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, return null; } - private List buildPrunedRS(List prunedCols, RowSchema oldRS) + private List buildPrunedRS(List prunedCols, RowSchema oldRS) throws SemanticException { ArrayList sig = new ArrayList(); HashSet prunedColsSet = new HashSet(prunedCols); @@ -355,7 +354,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, } return columns; } - + private RowResolver buildPrunedRR(List prunedCols, RowSchema oldRS) throws SemanticException { RowResolver resolver = new RowResolver(); @@ -403,12 +402,12 @@ private RowResolver buildPrunedRR(List prunedCols, RowSchema oldRS) } else { pDef.getOutputShape().setRr(buildPrunedRR(prunedCols, oldRS)); } - + PTFInputDef input = pDef.getInput(); if (input instanceof PartitionedTableFunctionDef) { return prunedColumnsList(prunedCols, oldRS, (PartitionedTableFunctionDef)input); } - + ArrayList inputColumns = prunedInputList(prunedCols, input); input.getOutputShape().setRr(buildPrunedRR(inputColumns, oldRS)); input.getOutputShape().setColumnNames(inputColumns); @@ -489,9 +488,14 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, 
cppCtx.getPrunedColLists().put((Operator) nd, cols); + cppCtx.getPrunedNestedColLists().put((Operator) nd, + cppCtx.genNestedColPaths((Operator) nd)); RowSchema inputRS = scanOp.getSchema(); setupNeededColumns(scanOp, inputRS, cols); + scanOp.setNeededNestedColumnPaths( + cppCtx.genNestedColPaths((Operator) nd)); + return null; } } @@ -712,12 +716,12 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, ((SelectDesc)select.getConf()).setColList(colList); ((SelectDesc)select.getConf()).setOutputColumnNames(outputColNames); pruneOperator(ctx, select, outputColNames); - + Operator udtfPath = op.getChildOperators().get(LateralViewJoinOperator.UDTF_TAG); List lvFCols = new ArrayList(cppCtx.getPrunedColLists().get(udtfPath)); lvFCols = Utilities.mergeUniqElems(lvFCols, outputColNames); pruneOperator(ctx, op, lvFCols); - + return null; } } @@ -772,7 +776,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, // and return the ones which have a marked column cppCtx.getPrunedColLists().put(op, cppCtx.getSelectColsFromChildren(op, cols)); - + cppCtx.getPrunedNestedColLists().put(op, cppCtx.getSelectNestedColPathsFromChildren(op, cols)); if (cols == null || conf.isSelStarNoCompute()) { return null; } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java index ebe613e..2cb0935 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level; import org.apache.hadoop.hive.serde.serdeConstants; - /** * Table Scan Descriptor Currently, data is only read from a base source as part * of map-reduce framework. So, nothing is stored in the descriptor. But, more @@ -81,6 +80,7 @@ // SELECT count(*) FROM t). private List neededColumnIDs; private List neededColumns; + private List neededNestedColumnPaths; // all column names referenced including virtual columns. used in ColumnAccessAnalyzer private transient List referencedColumns; @@ -202,6 +202,14 @@ public void setNeededColumnIDs(List neededColumnIDs) { return neededColumnIDs; } + public List getNeededNestedColumnPaths() { + return neededNestedColumnPaths; + } + + public void setNeededNestedColumnPaths(List neededNestedColumnPaths) { + this.neededNestedColumnPaths = neededNestedColumnPaths; + } + public void setNeededColumns(List neededColumns) { this.neededColumns = neededColumns; } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/HiveParquetSchemaTestUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/HiveParquetSchemaTestUtils.java new file mode 100644 index 0000000..97fe5f2 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/HiveParquetSchemaTestUtils.java @@ -0,0 +1,75 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.io.parquet; + +import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.Type; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class HiveParquetSchemaTestUtils { + + public static List createHiveColumnsFrom(final String columnNamesStr) { + List columnNames; + if (columnNamesStr.length() == 0) { + columnNames = new ArrayList(); + } else { + columnNames = Arrays.asList(columnNamesStr.split(",")); + } + + return columnNames; + } + + public static List createHiveTypeInfoFrom(final String columnsTypeStr) { + List columnTypes; + + if (columnsTypeStr.length() == 0) { + columnTypes = new ArrayList(); + } else { + columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnsTypeStr); + } + + return columnTypes; + } + + public static void testConversion( + final String columnNamesStr, + final String columnsTypeStr, + final String expectedSchema) throws Exception { + final List columnNames = createHiveColumnsFrom(columnNamesStr); + final List columnTypes = createHiveTypeInfoFrom(columnsTypeStr); + final MessageType messageTypeFound = HiveSchemaConverter.convert(columnNames, columnTypes); + final MessageType expectedMT = MessageTypeParser.parseMessageType(expectedSchema); + assertEquals("converting " + columnNamesStr + ": " + columnsTypeStr + " to " + expectedSchema, + expectedMT, messageTypeFound); + + // Required to check the original types manually as PrimitiveType.equals does not care about it + List expectedFields = expectedMT.getFields(); + List actualFields = messageTypeFound.getFields(); + for (int i = 0, n = expectedFields.size(); i < n; ++i) { + OriginalType exp = expectedFields.get(i).getOriginalType(); + OriginalType act = actualFields.get(i).getOriginalType(); + assertEquals("Original types of the field do not match", exp, act); + } + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestHiveSchemaConverter.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestHiveSchemaConverter.java index 256031e..137c764 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestHiveSchemaConverter.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestHiveSchemaConverter.java @@ -13,63 +13,22 @@ */ package org.apache.hadoop.hive.ql.io.parquet; +import static org.apache.hadoop.hive.ql.io.parquet.HiveParquetSchemaTestUtils.createHiveColumnsFrom; +import static org.apache.hadoop.hive.ql.io.parquet.HiveParquetSchemaTestUtils.createHiveTypeInfoFrom; +import static org.apache.hadoop.hive.ql.io.parquet.HiveParquetSchemaTestUtils.testConversion; import static org.junit.Assert.assertEquals; -import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.MessageTypeParser; import org.apache.parquet.schema.OriginalType; -import org.apache.parquet.schema.Type; import org.apache.parquet.schema.Type.Repetition; import org.junit.Test; -public class 
TestHiveSchemaConverter { - - private List createHiveColumnsFrom(final String columnNamesStr) { - List columnNames; - if (columnNamesStr.length() == 0) { - columnNames = new ArrayList(); - } else { - columnNames = Arrays.asList(columnNamesStr.split(",")); - } - - return columnNames; - } - - private List createHiveTypeInfoFrom(final String columnsTypeStr) { - List columnTypes; - - if (columnsTypeStr.length() == 0) { - columnTypes = new ArrayList(); - } else { - columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnsTypeStr); - } - return columnTypes; - } - - private void testConversion(final String columnNamesStr, final String columnsTypeStr, final String expectedSchema) throws Exception { - final List columnNames = createHiveColumnsFrom(columnNamesStr); - final List columnTypes = createHiveTypeInfoFrom(columnsTypeStr); - final MessageType messageTypeFound = HiveSchemaConverter.convert(columnNames, columnTypes); - final MessageType expectedMT = MessageTypeParser.parseMessageType(expectedSchema); - assertEquals("converting " + columnNamesStr + ": " + columnsTypeStr + " to " + expectedSchema, expectedMT, messageTypeFound); - - // Required to check the original types manually as PrimitiveType.equals does not care about it - List expectedFields = expectedMT.getFields(); - List actualFields = messageTypeFound.getFields(); - for (int i = 0, n = expectedFields.size(); i < n; ++i) { - OriginalType exp = expectedFields.get(i).getOriginalType(); - OriginalType act = actualFields.get(i).getOriginalType(); - assertEquals("Original types of the field do not match", exp, act); - } - } +public class TestHiveSchemaConverter { @Test public void testSimpleType() throws Exception { diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/read/TestDataWritableReadSupport.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/read/TestDataWritableReadSupport.java new file mode 100644 index 0000000..d63fa81 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/read/TestDataWritableReadSupport.java @@ -0,0 +1,72 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.io.parquet.read; + +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.junit.Test; + +import java.util.Arrays; + +import static org.apache.hadoop.hive.ql.io.parquet.HiveParquetSchemaTestUtils.testConversion; + +public class TestDataWritableReadSupport { + @Test + public void testGetProjectedSchema1() throws Exception { + MessageType originalMsg = MessageTypeParser.parseMessageType( + "message hive_schema {\n" + + " optional group structCol {\n" + + " optional int32 a;\n" + + " optional double b;\n" + + " optional boolean c;\n" + + " optional fixed_len_byte_array(3) d (DECIMAL(5,2));\n" + + " }\n" + + "}\n"); + + testConversion("structCol", "struct", DataWritableReadSupport + .getProjectedSchema(originalMsg, Arrays.asList("structCol"), Arrays.asList(0), + Arrays.asList("structCol.a")).toString()); + } + + @Test + public void testGetProjectedSchema2() throws Exception { + MessageType originalMsg = MessageTypeParser.parseMessageType( + "message hive_schema {\n" + + " optional group structCol {\n" + + " optional int32 a;\n" + + " optional double b;\n" + + " }\n" + + "}\n"); + + testConversion("structCol", "struct", DataWritableReadSupport + .getProjectedSchema(originalMsg, Arrays.asList("structCol"), Arrays.asList(0), + Arrays.asList("structCol.a", "structCol.b")).toString()); + } + + @Test + public void testGetProjectedSchema3() throws Exception { + MessageType originalMsg = MessageTypeParser.parseMessageType( + "message hive_schema {\n" + + " optional group structCol {\n" + + " optional int32 a;\n" + + " optional double b;\n" + + " }\n" + + " optional boolean c;\n" + + "}\n"); + + testConversion("structCol,c", "struct,boolean", DataWritableReadSupport + .getProjectedSchema(originalMsg, Arrays.asList("structCol", "c"), Arrays.asList(0, 1), + Arrays.asList("structCol.b", "c")).toString()); + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestColumnPrunerProcCtx.java ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestColumnPrunerProcCtx.java new file mode 100644 index 0000000..8beaf60 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestColumnPrunerProcCtx.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.optimizer; + +import static org.mockito.Mockito.*; + +import junit.framework.Assert; +import org.apache.hadoop.hive.ql.exec.SelectOperator; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc; +import org.apache.hadoop.hive.ql.plan.SelectDesc; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class TestColumnPrunerProcCtx { + @Test + public void testGetSelectNestedColPathsFromChildren() { + ColumnPrunerProcCtx ctx = new ColumnPrunerProcCtx(null); + SelectOperator selectOperator = mock(SelectOperator.class); + + // build the ,col2:string> + // build struct type + List ns = new ArrayList<>(); + ns.add("a"); + ns.add("b"); + List tis = new ArrayList<>(); + TypeInfo ti1 = TypeInfoFactory.booleanTypeInfo; + TypeInfo ti2 = TypeInfoFactory.doubleTypeInfo; + tis.add(ti1); + tis.add(ti2); + TypeInfo col1Type = TypeInfoFactory.getStructTypeInfo(ns, tis); + + // build typeInfo + List names = new ArrayList<>(); + names.add("col1"); + names.add("col2"); + + List typeInfos = new ArrayList<>(); + TypeInfo col2Type = TypeInfoFactory.stringTypeInfo; + typeInfos.add(col1Type); + typeInfos.add(col2Type); + + TypeInfo dataType = TypeInfoFactory.getStructTypeInfo(names, typeInfos); + + ExprNodeDesc desc = new ExprNodeColumnDesc(dataType, "col1", "", false); + ExprNodeDesc column1 = new ExprNodeFieldDesc(TypeInfoFactory.stringTypeInfo, desc, "a", false); + final List colList = Arrays.asList(column1); + final List outputColumnNames = Arrays.asList("_col0"); + final List paths = Arrays.asList("_col0"); + SelectDesc selectDesc = new SelectDesc(colList, outputColumnNames); + selectDesc.setSelStarNoCompute(false); + when(selectOperator.getConf()).thenReturn(selectDesc); + List groups = + ctx.getSelectNestedColPathsFromChildren(selectOperator, paths); + Assert.assertTrue( + Arrays.equals(new String[] { "col1.a" }, groups.toArray(new String[groups.size()]))); + } + +} diff --git ql/src/test/queries/clientpositive/parquet_nested_field_pruning.q ql/src/test/queries/clientpositive/parquet_nested_field_pruning.q new file mode 100644 index 0000000..79e41e8 --- /dev/null +++ ql/src/test/queries/clientpositive/parquet_nested_field_pruning.q @@ -0,0 +1,26 @@ +CREATE TABLE s_tbl(id int, name string, info struct) STORED AS PARQUET; +INSERT OVERWRITE TABLE s_tbl SELECT key, value, named_struct('a',key,'b',value) FROM src; +EXPLAIN SELECT info.a FROM s_tbl; +SELECT info.a FROM s_tbl; + +DROP TABLE s_tbl --skipTrash; + + +CREATE TABLE nestedcomplex ( +simple_int int, +nested_struct array>, +simple_string string) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' +WITH SERDEPROPERTIES ( + 'line.delim'='\n' +); + +load data local inpath '../../data/files/nested_complex.txt' overwrite into table nestedcomplex; + +CREATE TABLE parquet_nested_complex STORED AS PARQUET AS SELECT * FROM nestedcomplex; + +SELECT nested_struct.s FROM parquet_nested_complex; + +DROP TABLE nestedcomplex --skipTrash; +DROP TABLE parquet_nested_complex --skipTrash; \ No newline at end of file diff --git ql/src/test/results/clientpositive/parquet_nested_field_pruning.q.out ql/src/test/results/clientpositive/parquet_nested_field_pruning.q.out new file mode 100644 index 0000000..fbbb80b --- /dev/null 
+++ ql/src/test/results/clientpositive/parquet_nested_field_pruning.q.out @@ -0,0 +1,639 @@ +PREHOOK: query: CREATE TABLE s_tbl(id int, name string, info struct) STORED AS PARQUET +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@s_tbl +POSTHOOK: query: CREATE TABLE s_tbl(id int, name string, info struct) STORED AS PARQUET +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@s_tbl +PREHOOK: query: INSERT OVERWRITE TABLE s_tbl SELECT key, value, named_struct('a',key,'b',value) FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@s_tbl +POSTHOOK: query: INSERT OVERWRITE TABLE s_tbl SELECT key, value, named_struct('a',key,'b',value) FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@s_tbl +POSTHOOK: Lineage: s_tbl.id EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: s_tbl.info EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: s_tbl.name SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: EXPLAIN SELECT info.a FROM s_tbl +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN SELECT info.a FROM s_tbl +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: s_tbl + Statistics: Num rows: 500 Data size: 1500 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: info.a (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 1500 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 500 Data size: 1500 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: SELECT info.a FROM s_tbl +PREHOOK: type: QUERY +PREHOOK: Input: default@s_tbl +#### A masked pattern was here #### +POSTHOOK: query: SELECT info.a FROM s_tbl +POSTHOOK: type: QUERY +POSTHOOK: Input: default@s_tbl +#### A masked pattern was here #### +238 +86 +311 +27 +165 +409 +255 +278 +98 +484 +265 +193 +401 +150 +273 +224 +369 +66 +128 +213 +146 +406 +429 +374 +152 +469 +145 +495 +37 +327 +281 +277 +209 +15 +82 +403 +166 +417 +430 +252 +292 +219 +287 +153 +193 +338 +446 +459 +394 +237 +482 +174 +413 +494 +207 +199 +466 +208 +174 +399 +396 +247 +417 +489 +162 +377 +397 +309 +365 +266 +439 +342 +367 +325 +167 +195 +475 +17 +113 +155 +203 +339 +0 +455 +128 +311 +316 +57 +302 +205 +149 +438 +345 +129 +170 +20 +489 +157 +378 +221 +92 +111 +47 +72 +4 +280 +35 +427 +277 +208 +356 +399 +169 +382 +498 +125 +386 +437 +469 +192 +286 +187 +176 +54 +459 +51 +138 +103 +239 +213 +216 +430 +278 +176 +289 +221 +65 +318 +332 +311 +275 +137 +241 +83 +333 +180 +284 +12 +230 +181 +67 +260 +404 +384 +489 +353 +373 +272 +138 +217 +84 +348 +466 +58 +8 +411 +230 +208 +348 +24 +463 +431 +179 +172 +42 +129 +158 +119 +496 +0 +322 +197 +468 +393 +454 +100 +298 +199 +191 +418 +96 +26 +165 +327 +230 +205 +120 +131 +51 +404 +43 +436 +156 +469 +468 +308 +95 +196 +288 +481 +457 +98 +282 +197 +187 +318 +318 +409 +470 +137 +369 
+316 +169 +413 +85 +77 +0 +490 +87 +364 +179 +118 +134 +395 +282 +138 +238 +419 +15 +118 +72 +90 +307 +19 +435 +10 +277 +273 +306 +224 +309 +389 +327 +242 +369 +392 +272 +331 +401 +242 +452 +177 +226 +5 +497 +402 +396 +317 +395 +58 +35 +336 +95 +11 +168 +34 +229 +233 +143 +472 +322 +498 +160 +195 +42 +321 +430 +119 +489 +458 +78 +76 +41 +223 +492 +149 +449 +218 +228 +138 +453 +30 +209 +64 +468 +76 +74 +342 +69 +230 +33 +368 +103 +296 +113 +216 +367 +344 +167 +274 +219 +239 +485 +116 +223 +256 +263 +70 +487 +480 +401 +288 +191 +5 +244 +438 +128 +467 +432 +202 +316 +229 +469 +463 +280 +2 +35 +283 +331 +235 +80 +44 +193 +321 +335 +104 +466 +366 +175 +403 +483 +53 +105 +257 +406 +409 +190 +406 +401 +114 +258 +90 +203 +262 +348 +424 +12 +396 +201 +217 +164 +431 +454 +478 +298 +125 +431 +164 +424 +187 +382 +5 +70 +397 +480 +291 +24 +351 +255 +104 +70 +163 +438 +119 +414 +200 +491 +237 +439 +360 +248 +479 +305 +417 +199 +444 +120 +429 +169 +443 +323 +325 +277 +230 +478 +178 +468 +310 +317 +333 +493 +460 +207 +249 +265 +480 +83 +136 +353 +172 +214 +462 +233 +406 +133 +175 +189 +454 +375 +401 +421 +407 +384 +256 +26 +134 +67 +384 +379 +18 +462 +492 +100 +298 +9 +341 +498 +146 +458 +362 +186 +285 +348 +167 +18 +273 +183 +281 +344 +97 +469 +315 +84 +28 +37 +448 +152 +348 +307 +194 +414 +477 +222 +126 +90 +169 +403 +400 +200 +97 +PREHOOK: query: DROP TABLE s_tbl --skipTrash +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@s_tbl +PREHOOK: Output: default@s_tbl +POSTHOOK: query: DROP TABLE s_tbl --skipTrash +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@s_tbl +POSTHOOK: Output: default@s_tbl +PREHOOK: query: CREATE TABLE nestedcomplex ( +simple_int int, +nested_struct array>, +simple_string string) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' +WITH SERDEPROPERTIES ( + 'line.delim'='\n' +) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@nestedcomplex +POSTHOOK: query: CREATE TABLE nestedcomplex ( +simple_int int, +nested_struct array>, +simple_string string) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' +WITH SERDEPROPERTIES ( + 'line.delim'='\n' +) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@nestedcomplex +PREHOOK: query: load data local inpath '../../data/files/nested_complex.txt' overwrite into table nestedcomplex +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@nestedcomplex +POSTHOOK: query: load data local inpath '../../data/files/nested_complex.txt' overwrite into table nestedcomplex +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@nestedcomplex +PREHOOK: query: CREATE TABLE parquet_nested_complex STORED AS PARQUET AS SELECT * FROM nestedcomplex +PREHOOK: type: CREATETABLE_AS_SELECT +PREHOOK: Input: default@nestedcomplex +PREHOOK: Output: database:default +PREHOOK: Output: default@parquet_nested_complex +POSTHOOK: query: CREATE TABLE parquet_nested_complex STORED AS PARQUET AS SELECT * FROM nestedcomplex +POSTHOOK: type: CREATETABLE_AS_SELECT +POSTHOOK: Input: default@nestedcomplex +POSTHOOK: Output: database:default +POSTHOOK: Output: default@parquet_nested_complex +POSTHOOK: Lineage: parquet_nested_complex.nested_struct SIMPLE [(nestedcomplex)nestedcomplex.FieldSchema(name:nested_struct, type:array>, comment:null), ] +POSTHOOK: Lineage: parquet_nested_complex.simple_int SIMPLE [(nestedcomplex)nestedcomplex.FieldSchema(name:simple_int, type:int, comment:null), ] +POSTHOOK: 
Lineage: parquet_nested_complex.simple_string SIMPLE [(nestedcomplex)nestedcomplex.FieldSchema(name:simple_string, type:string, comment:null), ] +PREHOOK: query: SELECT nested_struct.s FROM parquet_nested_complex +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_nested_complex +#### A masked pattern was here #### +POSTHOOK: query: SELECT nested_struct.s FROM parquet_nested_complex +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_nested_complex +#### A masked pattern was here #### +["0\u001d1\u001d2"] +["0\u001d3\u001d2"] +PREHOOK: query: DROP TABLE nestedcomplex --skipTrash +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@nestedcomplex +PREHOOK: Output: default@nestedcomplex +POSTHOOK: query: DROP TABLE nestedcomplex --skipTrash +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@nestedcomplex +POSTHOOK: Output: default@nestedcomplex +PREHOOK: query: DROP TABLE parquet_nested_complex --skipTrash +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@parquet_nested_complex +PREHOOK: Output: default@parquet_nested_complex +POSTHOOK: query: DROP TABLE parquet_nested_complex --skipTrash +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@parquet_nested_complex +POSTHOOK: Output: default@parquet_nested_complex diff --git serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java index 0c7ac30..097dac8 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java +++ serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java @@ -37,9 +37,18 @@ public static final Logger LOG = LoggerFactory.getLogger(ColumnProjectionUtils.class); public static final String READ_COLUMN_IDS_CONF_STR = "hive.io.file.readcolumn.ids"; + /** + * the nested column path is the string from the root to the leaf + * e.g. + * c:struct + * the column a's path is c.a and b's path is c.b + */ + public static final String READ_NESTED_COLUMN_PATH_CONF_STR = + "hive.io.file.readNestedColumn.paths"; public static final String READ_ALL_COLUMNS = "hive.io.file.read.all.columns"; public static final String READ_COLUMN_NAMES_CONF_STR = "hive.io.file.readcolumn.names"; private static final String READ_COLUMN_IDS_CONF_STR_DEFAULT = ""; + private static final String READ_NESTED_COLUMN_PATH_CONF_STR_DEFAULT = ""; private static final boolean READ_ALL_COLUMNS_DEFAULT = true; private static final Joiner CSV_JOINER = Joiner.on(",").skipNulls(); @@ -113,6 +122,27 @@ public static void appendReadColumns(Configuration conf, List ids) { } /** + * Appends read nested column's paths. Once a read nested column path + * is included in the list, a underlying record reader of a columnar file format + * (e.g. Parquet and ORC) can know what columns are needed. + */ + public static void appendNestedColumnPaths( + Configuration conf, + List paths) { + String pathsStr = StringUtils.join(StringUtils.COMMA_STR, + paths.toArray(new String[paths.size()])); + String old = conf.get(READ_NESTED_COLUMN_PATH_CONF_STR, null); + String newConfStr = pathsStr; + if (old != null && !old.isEmpty()) { + newConfStr = newConfStr + StringUtils.COMMA_STR + old; + } + setReadNestedColumnPathConf(conf, newConfStr); + // Set READ_ALL_COLUMNS to false + conf.setBoolean(READ_ALL_COLUMNS, false); + } + + + /** * This method appends read column information to configuration to use for PPD. It is * currently called with information from TSOP. 
Names come from TSOP input RowSchema, and * IDs are the indexes inside the schema (which PPD assumes correspond to indexes inside the @@ -122,13 +152,14 @@ public static void appendReadColumns(Configuration conf, List ids) { * @param names Column names. */ public static void appendReadColumns( - Configuration conf, List ids, List names) { + Configuration conf, List ids, List names, List groupPaths) { if (ids.size() != names.size()) { LOG.warn("Read column counts do not match: " + ids.size() + " ids, " + names.size() + " names"); } appendReadColumns(conf, ids); appendReadColumnNames(conf, names); + appendNestedColumnPaths(conf, groupPaths); } public static void appendReadColumns( @@ -160,6 +191,24 @@ public static void appendReadColumns( return result; } + public static List getNestedColumnPaths(Configuration conf) { + String skips = + conf.get(READ_NESTED_COLUMN_PATH_CONF_STR, READ_NESTED_COLUMN_PATH_CONF_STR_DEFAULT); + String[] list = StringUtils.split(skips); + List result = new ArrayList<>(list.length); + for (String element : list) { + // it may contain duplicates, remove duplicates + if (!result.contains(element)) { + result.add(element); + } + // NOTE: some code uses this list to correlate with column names, and yet these lists may + // contain duplicates, which this call will remove and the other won't. As far as I can + // tell, no code will actually use these two methods together; all is good if the code + // gets the ID list without relying on this method. Or maybe it just works by magic. + } + return result; + } + public static String[] getReadColumnNames(Configuration conf) { String colNames = conf.get(READ_COLUMN_NAMES_CONF_STR, READ_COLUMN_IDS_CONF_STR_DEFAULT); if (colNames != null && !colNames.isEmpty()) { @@ -176,6 +225,16 @@ private static void setReadColumnIDConf(Configuration conf, String id) { } } + private static void setReadNestedColumnPathConf( + Configuration conf, + String nestedColumnPaths) { + if (nestedColumnPaths.trim().isEmpty()) { + conf.set(READ_NESTED_COLUMN_PATH_CONF_STR, READ_NESTED_COLUMN_PATH_CONF_STR_DEFAULT); + } else { + conf.set(READ_NESTED_COLUMN_PATH_CONF_STR, nestedColumnPaths); + } + } + private static void appendReadColumnNames(Configuration conf, List cols) { String old = conf.get(READ_COLUMN_NAMES_CONF_STR, ""); StringBuilder result = new StringBuilder(old);
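
A small sketch (not part of the patch) of the new configuration round trip, using example paths; repeated appends accumulate in the property value and getNestedColumnPaths de-duplicates on read:

    // Assumes: org.apache.hadoop.conf.Configuration, java.util.Arrays, java.util.List and
    // org.apache.hadoop.hive.serde2.ColumnProjectionUtils.
    Configuration conf = new Configuration();
    ColumnProjectionUtils.appendNestedColumnPaths(conf, Arrays.asList("info.a", "info.b"));
    ColumnProjectionUtils.appendNestedColumnPaths(conf, Arrays.asList("info.a"));
    List<String> paths = ColumnProjectionUtils.getNestedColumnPaths(conf);
    // paths -> ["info.a", "info.b"]: the raw property value is "info.a,info.a,info.b",
    // and getNestedColumnPaths drops the duplicate entry.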