diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java index 8c7d99d..e708d58 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java @@ -76,7 +76,7 @@ public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext TableScanOperator ts = (TableScanOperator) source; // push down projections ColumnProjectionUtils.appendReadColumns( - job, ts.getNeededColumnIDs(), ts.getNeededColumns()); + job, ts.getNeededColumnIDs(), ts.getNeededColumns(), ts.getNeededNestedColumnPaths()); // push down filters HiveInputFormat.pushFilters(job, ts); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java index 584eff4..7c1e344 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java @@ -21,12 +21,10 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -207,7 +205,7 @@ public void initializeMapredLocalWork(MapJoinDesc mjConf, Configuration hconf, TableScanOperator ts = (TableScanOperator)aliasToWork.get(alias); // push down projections ColumnProjectionUtils.appendReadColumns( - jobClone, ts.getNeededColumnIDs(), ts.getNeededColumns()); + jobClone, ts.getNeededColumnIDs(), ts.getNeededColumns(), ts.getNeededNestedColumnPaths()); // push down filters HiveInputFormat.pushFilters(jobClone, ts); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java index 0f02222..68477ca 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java @@ -220,7 +220,7 @@ private void gatherStats(Object row) { ObjectInspectorUtils.partialCopyToStandardObject(rdSize, row, rdSizeColumn, 1, (StructObjectInspector) inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE); - currentStat.addToStat(StatsSetupConst.RAW_DATA_SIZE, (((LongWritable)rdSize.get(0)).get())); + currentStat.addToStat(StatsSetupConst.RAW_DATA_SIZE, (((LongWritable) rdSize.get(0)).get())); } } @@ -297,6 +297,14 @@ public void setNeededColumns(List columnNames) { conf.setNeededColumns(columnNames); } + public List getNeededNestedColumnPaths() { + return conf.getNeededNestedColumnPaths(); + } + + public void setNeededNestedColumnPaths(List nestedColumnPaths) { + conf.setNeededNestedColumnPaths(nestedColumnPaths); + } + public List getNeededColumns() { return conf.getNeededColumns(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index f81fc71..42861c4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -479,7 +479,7 @@ private void initializeOperators(Map fetchOpJobConfMap) TableScanOperator ts = (TableScanOperator)work.getAliasToWork().get(entry.getKey()); // push down projections ColumnProjectionUtils.appendReadColumns( - jobClone, ts.getNeededColumnIDs(), ts.getNeededColumns()); + jobClone, 
ts.getNeededColumnIDs(), ts.getNeededColumns(), ts.getNeededNestedColumnPaths()); // push down filters HiveInputFormat.pushFilters(jobClone, ts); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index c4b9940..69956ec 100755 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -18,9 +18,6 @@ package org.apache.hadoop.hive.ql.io; -import java.util.Arrays; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -29,11 +26,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.Map.Entry; -import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -513,7 +508,7 @@ private void pushProjection(final JobConf newjob, final StringBuilder readColumn } } - + protected static PartitionDesc getPartitionDescFromPath( Map pathToPartitionInfo, Path dir) throws IOException { @@ -632,7 +627,7 @@ protected void pushProjectionsAndFilters(JobConf jobConf, Class inputFormatClass TableScanOperator ts = (TableScanOperator) op; // push down projections. ColumnProjectionUtils.appendReadColumns( - jobConf, ts.getNeededColumnIDs(), ts.getNeededColumns()); + jobConf, ts.getNeededColumnIDs(), ts.getNeededColumns(), ts.getNeededNestedColumnPaths()); // push down filters pushFilters(jobConf, ts); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java index b058500..68407f5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java @@ -99,6 +99,9 @@ private void pushProjectionsAndFilters(final JobConf jobConf, boolean allColumnsNeeded = false; boolean noFilters = false; Set neededColumnIDs = new HashSet(); + // To support nested column pruning, we need to track the path from the top to the nested + // fields + Set neededNestedColumnPaths = new HashSet(); List filterExprs = new ArrayList(); RowSchema rowSchema = null; @@ -112,6 +115,7 @@ private void pushProjectionsAndFilters(final JobConf jobConf, allColumnsNeeded = true; } else { neededColumnIDs.addAll(ts.getNeededColumnIDs()); + neededNestedColumnPaths.addAll(ts.getNeededNestedColumnPaths()); } rowSchema = ts.getSchema(); @@ -143,6 +147,8 @@ private void pushProjectionsAndFilters(final JobConf jobConf, if (!allColumnsNeeded) { if (!neededColumnIDs.isEmpty()) { ColumnProjectionUtils.appendReadColumns(jobConf, new ArrayList(neededColumnIDs)); + ColumnProjectionUtils.appendNestedColumnPaths(jobConf, + new ArrayList(neededNestedColumnPaths)); } } else { ColumnProjectionUtils.setReadAllColumns(jobConf); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java index a89aa4d..936b371 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java @@ -85,7 +85,7 @@ private void init(final GroupType selectedGroupType, List selectedFields = selectedGroupType.getFields(); for 
(int i = 0; i < selectedFieldCount; i++) { Type subtype = selectedFields.get(i); - if (containingGroupType.getFields().contains(subtype)) { + if (isSubType(containingGroupType, subtype)) { int fieldIndex = containingGroupType.getFieldIndex(subtype.getName()); TypeInfo _hiveTypeInfo = getFieldTypeIgnoreCase(hiveTypeInfo, subtype.getName(), fieldIndex); converters[i] = getFieldConverter(subtype, fieldIndex, _hiveTypeInfo); @@ -96,6 +96,33 @@ private void init(final GroupType selectedGroupType, } } + // This method is used to check whether the subType is a sub type of the groupType. + // For nested attribute, we need to check its existence by the root path in a recursive way. + private boolean isSubType( + final GroupType groupType, + final Type subtype) { + if (subtype.isPrimitive() || subtype.isRepetition(Type.Repetition.REPEATED)) { + return groupType.getFields().contains(subtype); + } else { + for (Type g : groupType.getFields()) { + if (!g.isPrimitive() && g.getName().equals(subtype.getName())) { + // check all elements are contained in g + boolean containsAll = false; + for (Type subSubType : subtype.asGroupType().getFields()) { + containsAll = isSubType(g.asGroupType(), subSubType); + if (!containsAll) { + break; + } + } + if (containsAll) { + return containsAll; + } + } + } + return false; + } + } + private TypeInfo getFieldTypeIgnoreCase(TypeInfo hiveTypeInfo, String fieldName, int fieldIndex) { if (hiveTypeInfo == null) { return null; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java index 3e38cc7..8d8b0c5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java @@ -14,10 +14,13 @@ package org.apache.hadoop.hive.ql.io.parquet.read; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.ListIterator; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; @@ -25,6 +28,8 @@ import org.apache.hadoop.hive.ql.io.parquet.convert.DataWritableRecordConverter; import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.ql.optimizer.FieldNode; +import org.apache.hadoop.hive.ql.optimizer.NestedColumnFieldPruningUtils; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; @@ -135,7 +140,7 @@ private static Type getProjectedType(TypeInfo colType, Type fieldType) { ((StructTypeInfo) colType).getAllStructFieldNames(), ((StructTypeInfo) colType).getAllStructFieldTypeInfos() ); - + Type[] typesArray = groupFields.toArray(new Type[0]); return Types.buildGroup(fieldType.getRepetition()) .addFields(typesArray) @@ -164,7 +169,7 @@ private static Type getProjectedType(TypeInfo colType, Type fieldType) { } /** - * Searchs column names by name on a given Parquet message schema, and returns its projected + * Searches column names by name on a given Parquet message schema, and returns its projected * Parquet schema types. * * @param schema Message type schema where to search for column names. 
@@ -182,7 +187,7 @@ private static MessageType getSchemaByName(MessageType schema, List colN } /** - * Searchs column names by index on a given Parquet file schema, and returns its corresponded + * Searches column names by indexes on a given Parquet file schema, and returns its corresponded * Parquet schema types. * * @param schema Message schema where to search for column names. @@ -200,6 +205,55 @@ private static MessageType getSchemaByIndex(MessageType schema, List col } else { //prefixing with '_mask_' to ensure no conflict with named //columns in the file schema + schemaTypes.add( + Types.optional(PrimitiveTypeName.BINARY).named("_mask_" + colNames.get(i))); + } + } + } + + return new MessageType(schema.getName(), schemaTypes); + } + + /** + * Generate the projected schema from colIndexes and nested column paths. If the column is + * contained by colIndex, it will be added directly, otherwise it will build a group type which + * contains all required sub types using nestedColumnPaths. + * @param schema original schema + * @param colNames + * @param colIndexes the index of needed columns + * @param nestedColumnPaths the paths for nested columns + * @return + */ + public static MessageType getProjectedSchema( + MessageType schema, + List colNames, + List colIndexes, + List nestedColumnPaths) { + List schemaTypes = new ArrayList(); + + Map prunedCols = getPrunedNestedColumns(nestedColumnPaths); + for (Integer i : colIndexes) { + if (i < colNames.size()) { + if (i < schema.getFieldCount()) { + Type t = schema.getType(i); + if (!prunedCols.containsKey(t.getName())) { + schemaTypes.add(schema.getType(i)); + } else { + if (t.isPrimitive()) { + // For primitive type, add directly. + schemaTypes.add(t); + } else { + // For group type, we need to build the projected group type with required leaves + List g = + projectLeafTypes(Arrays.asList(t), Arrays.asList(prunedCols.get(t.getName()))); + if (!g.isEmpty()) { + schemaTypes.addAll(g); + } + } + } + } else { + //prefixing with '_mask_' to ensure no conflict with named + //columns in the file schema schemaTypes.add(Types.optional(PrimitiveTypeName.BINARY).named("_mask_" + colNames.get(i))); } } @@ -209,6 +263,79 @@ private static MessageType getSchemaByIndex(MessageType schema, List col } /** + * Return the columns which contains required nested attribute level + * e.g. 
+   * Given a column a of type struct&lt;x:int, y:int&gt;, where a.x is required and a.y is not,
+   * the method will return a node for a that contains only the attribute x.
+   *
+   * @param nestedColPaths the paths for the required nested attributes
+   * @return a map from each root column name to the field tree of its required nested attributes
+   */
+  private static Map<String, FieldNode> getPrunedNestedColumns(List<String> nestedColPaths) {
+    Map<String, FieldNode> resMap = new HashMap<>();
+    if (nestedColPaths.isEmpty()) {
+      return resMap;
+    }
+    for (String s : nestedColPaths) {
+      String c = StringUtils.split(s, '.')[0];
+      if (!resMap.containsKey(c)) {
+        FieldNode f = NestedColumnFieldPruningUtils.addNodeByPath(null, s);
+        resMap.put(c, f);
+      } else {
+        resMap.put(c, NestedColumnFieldPruningUtils.addNodeByPath(resMap.get(c), s));
+      }
+    }
+    return resMap;
+  }
+
+  private static GroupType buildProjectedGroupType(
+      GroupType originalType,
+      List<Type> types) {
+    if (types == null || types.isEmpty()) {
+      return null;
+    }
+    return new GroupType(originalType.getRepetition(), originalType.getName(), types);
+  }
+
+  private static List<Type> projectLeafTypes(
+      List<Type> types,
+      List<FieldNode> nodes) {
+    List<Type> res = new ArrayList<>();
+    if (nodes.isEmpty()) {
+      return res;
+    }
+    Map<String, FieldNode> fieldMap = new HashMap<>();
+    for (FieldNode n : nodes) {
+      fieldMap.put(n.getFieldName(), n);
+    }
+    for (Type type : types) {
+      String tn = type.getName();
+
+      if (fieldMap.containsKey(tn)) {
+        FieldNode f = fieldMap.get(tn);
+        if (f.getNodes().isEmpty()) {
+          // no child, no need for pruning
+          res.add(type);
+        } else {
+          if (type instanceof GroupType) {
+            GroupType groupType = type.asGroupType();
+            List<Type> ts = projectLeafTypes(groupType.getFields(), f.getNodes());
+            GroupType g = buildProjectedGroupType(groupType, ts);
+            if (g != null) {
+              res.add(g);
+            }
+          } else {
+            throw new RuntimeException(
+                "Primitive type " + f.getFieldName() + " should not have nested fields: "
+                    + f.toString());
+          }
+        }
+      }
+    }
+    return res;
+  }
+
+  /**
+   * It creates the readContext for Parquet side with the requested schema during the init phase.
* * @param context @@ -246,10 +373,11 @@ private static MessageType getSchemaByIndex(MessageType schema, List col contextMetadata.put(PARQUET_COLUMN_INDEX_ACCESS, String.valueOf(indexAccess)); this.hiveTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList); + List groupPaths = ColumnProjectionUtils.getNestedColumnPaths(configuration); List indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration); if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) { - MessageType requestedSchemaByUser = - getSchemaByIndex(tableSchema, columnNamesList, indexColumnsWanted); + MessageType requestedSchemaByUser = getProjectedSchema(tableSchema, columnNamesList, + indexColumnsWanted, groupPaths); return new ReadContext(requestedSchemaByUser, contextMetadata); } else { return new ReadContext(tableSchema, contextMetadata); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java index 611a6b7..4364298 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java @@ -20,8 +20,10 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.CommonJoinOperator; @@ -37,9 +39,15 @@ import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnListDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.SelectDesc; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; /** * This class implements the processor context for Column Pruner. 
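For readers following the Parquet read path above, here is a minimal sketch, not part of the patch itself, of how the new DataWritableReadSupport.getProjectedSchema is expected to prune the file schema from the requested nested column paths. The schema literal and column names are invented for illustration; the behavior mirrors the TestDataWritableReadSupport cases added later in this patch.

import java.util.Arrays;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

// Full file schema: structCol with fields a and b, plus a top-level boolean c.
MessageType fileSchema = MessageTypeParser.parseMessageType(
    "message hive_schema {\n"
    + "  optional group structCol {\n"
    + "    optional int32 a;\n"
    + "    optional double b;\n"
    + "  }\n"
    + "  optional boolean c;\n"
    + "}\n");

// Only structCol.a and the top-level column c are referenced by the query.
MessageType pruned = DataWritableReadSupport.getProjectedSchema(
    fileSchema,
    Arrays.asList("structCol", "c"),     // Hive column names
    Arrays.asList(0, 1),                 // needed column indexes
    Arrays.asList("structCol.a", "c"));  // needed nested column paths
// 'pruned' keeps structCol with only field a, plus c; structCol.b is dropped.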
@@ -50,6 +58,11 @@ private final Map, List> prunedColLists; + /** + * This map stores the pruned nested column path for each operator + */ + private final Map, List> prunedNestedColLists; + private final Map>> joinPrunedColLists; private final Map> unionPrunedColLists; @@ -57,6 +70,7 @@ public ColumnPrunerProcCtx(ParseContext pctx) { this.pctx = pctx; prunedColLists = new HashMap, List>(); + prunedNestedColLists = new HashMap, List>(); joinPrunedColLists = new HashMap>>(); unionPrunedColLists = new HashMap<>(); } @@ -84,6 +98,10 @@ public ParseContext getParseContext() { return prunedColLists; } + public Map, List> getPrunedNestedColLists() { + return prunedNestedColLists; + } + /** * Creates the list of internal column names(these names are used in the * RowResolver and are different from the external column names) that are @@ -138,6 +156,27 @@ public ParseContext getParseContext() { } /** + * Get the path to the root column for the nested column attribute + * + * @param curOp current operator + * @return the nested column paths for current operator and its child operator + */ + public List genNestedColPaths(Operator curOp) { + if (curOp.getChildOperators() == null) { + return null; + } + Set groupPathsList = new HashSet<>(); + + for (Operator child : curOp.getChildOperators()) { + if (prunedNestedColLists.containsKey(child)) { + groupPathsList.addAll(prunedNestedColLists.get(child)); + } + } + + return new ArrayList<>(groupPathsList); + } + + /** * Creates the list of internal column names(these names are used in the * RowResolver and are different from the external column names) that are * needed in the subtree. These columns eventually have to be selected from @@ -239,6 +278,69 @@ public ParseContext getParseContext() { } /** + * Creates the list of internal group paths for select * expressions. + * + * @param op The select operator. + * @param paths The list of nested column paths returned by the children of the + * select operator. + * @return List of the nested column path from leaf to the root. + */ + public List getSelectNestedColPathsFromChildren( + SelectOperator op, + List paths) { + List groups = new ArrayList<>(); + SelectDesc conf = op.getConf(); + + if (paths != null && conf.isSelStarNoCompute()) { + groups.addAll(paths); + return groups; + } + + List selectDescs = conf.getColList(); + + List outputColumnNames = conf.getOutputColumnNames(); + for (int i = 0; i < outputColumnNames.size(); i++) { + if (paths == null || paths.contains(outputColumnNames.get(i))) { + ExprNodeDesc desc = selectDescs.get(i); + List gp = getNestedColPathByDesc(desc); + groups.addAll(gp); + } + } + + return groups; + } + + // Entry method + private List getNestedColPathByDesc(ExprNodeDesc desc) { + List res = new ArrayList<>(); + getNestedColsFromExprNodeDesc(desc, "", res); + return res; + } + + private void getNestedColsFromExprNodeDesc( + ExprNodeDesc desc, + String pathToRoot, + List paths) { + if (desc instanceof ExprNodeColumnDesc) { + String f = ((ExprNodeColumnDesc) desc).getColumn(); + String p = pathToRoot.isEmpty() ? f : f + "." + pathToRoot; + paths.add(p); + } else if (desc instanceof ExprNodeFieldDesc) { + String f = ((ExprNodeFieldDesc) desc).getFieldName(); + String p = pathToRoot.isEmpty() ? f : f + "." 
+ pathToRoot; + getNestedColsFromExprNodeDesc(((ExprNodeFieldDesc) desc).getDesc(), p, paths); + } else { + List children = desc.getChildren(); + if (children == null || children.isEmpty()) { + return; + } + for (ExprNodeDesc c : children) { + getNestedColsFromExprNodeDesc(c, pathToRoot, paths); + } + } + } + + /** * Create the list of internal columns for select tag of LV */ public List getSelectColsFromLVJoin(RowSchema rs, diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java index a2a7f00..6ca4df9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java @@ -70,7 +70,6 @@ import org.apache.hadoop.hive.ql.plan.SelectDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; -import org.apache.hadoop.hive.ql.plan.UnionDesc; import org.apache.hadoop.hive.ql.plan.ptf.PTFExpressionDef; import org.apache.hadoop.hive.ql.plan.ptf.PTFInputDef; import org.apache.hadoop.hive.ql.plan.ptf.PartitionedTableFunctionDef; @@ -323,9 +322,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, } else { prunedCols = referencedColumns; } - - List newRS = prunedColumnsList(prunedCols, op.getSchema(), funcDef); - + + List newRS = prunedColumnsList(prunedCols, op.getSchema(), funcDef); + op.getSchema().setSignature(new ArrayList(newRS)); ShapeDetails outputShape = funcDef.getStartOfChain().getInput().getOutputShape(); @@ -333,7 +332,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, return null; } - private List buildPrunedRS(List prunedCols, RowSchema oldRS) + private List buildPrunedRS(List prunedCols, RowSchema oldRS) throws SemanticException { ArrayList sig = new ArrayList(); HashSet prunedColsSet = new HashSet(prunedCols); @@ -355,7 +354,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, } return columns; } - + private RowResolver buildPrunedRR(List prunedCols, RowSchema oldRS) throws SemanticException { RowResolver resolver = new RowResolver(); @@ -403,12 +402,12 @@ private RowResolver buildPrunedRR(List prunedCols, RowSchema oldRS) } else { pDef.getOutputShape().setRr(buildPrunedRR(prunedCols, oldRS)); } - + PTFInputDef input = pDef.getInput(); if (input instanceof PartitionedTableFunctionDef) { return prunedColumnsList(prunedCols, oldRS, (PartitionedTableFunctionDef)input); } - + ArrayList inputColumns = prunedInputList(prunedCols, input); input.getOutputShape().setRr(buildPrunedRR(inputColumns, oldRS)); input.getOutputShape().setColumnNames(inputColumns); @@ -486,12 +485,15 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, } cols = cols == null ? 
new ArrayList() : cols; + List nestedCols = cppCtx.genNestedColPaths((Operator) nd); - cppCtx.getPrunedColLists().put((Operator) nd, - cols); + cppCtx.getPrunedColLists().put((Operator) nd, cols); + cppCtx.getPrunedNestedColLists().put((Operator) nd, nestedCols); RowSchema inputRS = scanOp.getSchema(); setupNeededColumns(scanOp, inputRS, cols); + scanOp.setNeededNestedColumnPaths(nestedCols); + return null; } } @@ -712,12 +714,12 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, ((SelectDesc)select.getConf()).setColList(colList); ((SelectDesc)select.getConf()).setOutputColumnNames(outputColNames); pruneOperator(ctx, select, outputColNames); - + Operator udtfPath = op.getChildOperators().get(LateralViewJoinOperator.UDTF_TAG); List lvFCols = new ArrayList(cppCtx.getPrunedColLists().get(udtfPath)); lvFCols = Utilities.mergeUniqElems(lvFCols, outputColNames); pruneOperator(ctx, op, lvFCols); - + return null; } } @@ -772,7 +774,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, // and return the ones which have a marked column cppCtx.getPrunedColLists().put(op, cppCtx.getSelectColsFromChildren(op, cols)); - + cppCtx.getPrunedNestedColLists().put(op, cppCtx.getSelectNestedColPathsFromChildren(op, cols)); if (cols == null || conf.isSelStarNoCompute()) { return null; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/FieldNode.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/FieldNode.java new file mode 100644 index 0000000..1579797 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/FieldNode.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class FieldNode { + private String fieldName; + private List nodes; + + public FieldNode(String fieldName) { + this.fieldName = fieldName; + nodes = new ArrayList<>(); + } + + public String getFieldName() { + return fieldName; + } + + public void addFieldNodes(FieldNode... 
nodes) {
+    if (nodes != null && nodes.length > 0) {
+      this.nodes.addAll(Arrays.asList(nodes));
+    }
+  }
+
+  public List<FieldNode> getNodes() {
+    return nodes;
+  }
+
+  @Override
+  public String toString() {
+    String res = fieldName;
+    if (nodes.size() > 0) {
+      res += "[";
+      for (int i = 0; i < nodes.size(); i++) {
+        if (i == nodes.size() - 1) {
+          res += nodes.get(i).toString();
+        } else {
+          res += nodes.get(i).toString() + ",";
+        }
+      }
+      res += "]";
+    }
+    return res;
+  }
+
+  @Override
+  public boolean equals(Object object) {
+    if (!(object instanceof FieldNode)) {
+      return false;
+    }
+    FieldNode fieldNode = (FieldNode) object;
+    if (!fieldName.equals(fieldNode.getFieldName()) || nodes.size() != fieldNode
+        .getNodes().size()) {
+      return false;
+    }
+
+    for (int i = 0; i < fieldNode.getNodes().size(); i++) {
+      if (!fieldNode.getNodes().get(i).equals(nodes.get(i))) {
+        return false;
+      }
+    }
+    return true;
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/NestedColumnFieldPruningUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/NestedColumnFieldPruningUtils.java
new file mode 100644
index 0000000..0f5629d
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/NestedColumnFieldPruningUtils.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class NestedColumnFieldPruningUtils {
+
+  /**
+   * Add a leaf node to the field tree if the specified path is not already covered by the
+   * tree rooted at the given field node.
+   *
+   * @param fieldNode the root of the column tree
+   * @param path contains the path from root to leaf
+   * @return the root of the newly built tree
+   */
+  public static FieldNode addNodeByPath(
+      FieldNode fieldNode,
+      String path) {
+    if (path == null || path.isEmpty()) {
+      return fieldNode;
+    }
+    boolean found = false;
+    int index = 0;
+    String[] ps = path.split("\\.");
+    FieldNode c = fieldNode;
+    if (fieldNode != null) {
+      List<FieldNode> currentList = Arrays.asList(c);
+      while (index < ps.length) {
+        found = false;
+        for (FieldNode n : currentList) {
+          if (n.getFieldName().equals(ps[index])) {
+            found = true;
+            // If the matched field is a leaf, all of its sub-fields are required, so there is
+            // no need to go deeper.
+            if (n.getNodes().isEmpty()) {
+              return fieldNode;
+            }
+            c = n;
+            currentList = c.getNodes();
+            break;
+          }
+        }
+        if (found) {
+          index++;
+        } else {
+          break;
+        }
+      }
+    }
+
+    if (!found) {
+      while (index < ps.length) {
+        FieldNode n = new FieldNode(ps[index]);
+        if (fieldNode == null) {
+          // rebuild the tree since original is empty
+          fieldNode = n;
+        }
+        if (c != null) {
+          c.addFieldNodes(n);
+        }
+        c = n;
+        index++;
+      }
+    } else {
+      if (index == ps.length) {
+        // Consolidation since all leaves are required.
+ c.getNodes().clear(); + } + } + return fieldNode; + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java index ebe613e..2cb0935 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level; import org.apache.hadoop.hive.serde.serdeConstants; - /** * Table Scan Descriptor Currently, data is only read from a base source as part * of map-reduce framework. So, nothing is stored in the descriptor. But, more @@ -81,6 +80,7 @@ // SELECT count(*) FROM t). private List neededColumnIDs; private List neededColumns; + private List neededNestedColumnPaths; // all column names referenced including virtual columns. used in ColumnAccessAnalyzer private transient List referencedColumns; @@ -202,6 +202,14 @@ public void setNeededColumnIDs(List neededColumnIDs) { return neededColumnIDs; } + public List getNeededNestedColumnPaths() { + return neededNestedColumnPaths; + } + + public void setNeededNestedColumnPaths(List neededNestedColumnPaths) { + this.neededNestedColumnPaths = neededNestedColumnPaths; + } + public void setNeededColumns(List neededColumns) { this.neededColumns = neededColumns; } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/HiveParquetSchemaTestUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/HiveParquetSchemaTestUtils.java new file mode 100644 index 0000000..510d256 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/HiveParquetSchemaTestUtils.java @@ -0,0 +1,75 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
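As a reading aid for the two new optimizer classes above, a hypothetical snippet (the column name s is invented) showing how successive nested column paths are folded into a FieldNode tree; the bracket notation is FieldNode.toString(), the same form exercised by TestNestedColumnFieldPruningUtils below.

import org.apache.hadoop.hive.ql.optimizer.FieldNode;
import org.apache.hadoop.hive.ql.optimizer.NestedColumnFieldPruningUtils;

FieldNode root = null;
root = NestedColumnFieldPruningUtils.addNodeByPath(root, "s.a.b");  // s[a[b]]
root = NestedColumnFieldPruningUtils.addNodeByPath(root, "s.a.c");  // s[a[b,c]]
// Requesting the whole of s.a consolidates its children, since every leaf under it is now needed.
root = NestedColumnFieldPruningUtils.addNodeByPath(root, "s.a");    // s[a]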
+ */ +package org.apache.hadoop.hive.ql.io.parquet; + +import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.Type; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class HiveParquetSchemaTestUtils { + + public static List createHiveColumnsFrom(final String columnNamesStr) { + List columnNames; + if (columnNamesStr.length() == 0) { + columnNames = new ArrayList(); + } else { + columnNames = Arrays.asList(columnNamesStr.split(",")); + } + + return columnNames; + } + + public static List createHiveTypeInfoFrom(final String columnsTypeStr) { + List columnTypes; + + if (columnsTypeStr.length() == 0) { + columnTypes = new ArrayList(); + } else { + columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnsTypeStr); + } + + return columnTypes; + } + + public static void testConversion( + final String columnNamesStr, + final String columnsTypeStr, + final String actualSchema) throws Exception { + final List columnNames = createHiveColumnsFrom(columnNamesStr); + final List columnTypes = createHiveTypeInfoFrom(columnsTypeStr); + final MessageType messageTypeFound = HiveSchemaConverter.convert(columnNames, columnTypes); + final MessageType expectedMT = MessageTypeParser.parseMessageType(actualSchema); + assertEquals("converting " + columnNamesStr + ": " + columnsTypeStr + " to " + actualSchema, + expectedMT, messageTypeFound); + + // Required to check the original types manually as PrimitiveType.equals does not care about it + List expectedFields = expectedMT.getFields(); + List actualFields = messageTypeFound.getFields(); + for (int i = 0, n = expectedFields.size(); i < n; ++i) { + OriginalType exp = expectedFields.get(i).getOriginalType(); + OriginalType act = actualFields.get(i).getOriginalType(); + assertEquals("Original types of the field do not match", exp, act); + } + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestHiveSchemaConverter.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestHiveSchemaConverter.java index 256031e..137c764 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestHiveSchemaConverter.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestHiveSchemaConverter.java @@ -13,63 +13,22 @@ */ package org.apache.hadoop.hive.ql.io.parquet; +import static org.apache.hadoop.hive.ql.io.parquet.HiveParquetSchemaTestUtils.createHiveColumnsFrom; +import static org.apache.hadoop.hive.ql.io.parquet.HiveParquetSchemaTestUtils.createHiveTypeInfoFrom; +import static org.apache.hadoop.hive.ql.io.parquet.HiveParquetSchemaTestUtils.testConversion; import static org.junit.Assert.assertEquals; -import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.MessageTypeParser; import org.apache.parquet.schema.OriginalType; -import org.apache.parquet.schema.Type; import org.apache.parquet.schema.Type.Repetition; import org.junit.Test; -public class TestHiveSchemaConverter 
{ - - private List createHiveColumnsFrom(final String columnNamesStr) { - List columnNames; - if (columnNamesStr.length() == 0) { - columnNames = new ArrayList(); - } else { - columnNames = Arrays.asList(columnNamesStr.split(",")); - } - - return columnNames; - } - - private List createHiveTypeInfoFrom(final String columnsTypeStr) { - List columnTypes; - - if (columnsTypeStr.length() == 0) { - columnTypes = new ArrayList(); - } else { - columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnsTypeStr); - } - return columnTypes; - } - - private void testConversion(final String columnNamesStr, final String columnsTypeStr, final String expectedSchema) throws Exception { - final List columnNames = createHiveColumnsFrom(columnNamesStr); - final List columnTypes = createHiveTypeInfoFrom(columnsTypeStr); - final MessageType messageTypeFound = HiveSchemaConverter.convert(columnNames, columnTypes); - final MessageType expectedMT = MessageTypeParser.parseMessageType(expectedSchema); - assertEquals("converting " + columnNamesStr + ": " + columnsTypeStr + " to " + expectedSchema, expectedMT, messageTypeFound); - - // Required to check the original types manually as PrimitiveType.equals does not care about it - List expectedFields = expectedMT.getFields(); - List actualFields = messageTypeFound.getFields(); - for (int i = 0, n = expectedFields.size(); i < n; ++i) { - OriginalType exp = expectedFields.get(i).getOriginalType(); - OriginalType act = actualFields.get(i).getOriginalType(); - assertEquals("Original types of the field do not match", exp, act); - } - } +public class TestHiveSchemaConverter { @Test public void testSimpleType() throws Exception { diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/read/TestDataWritableReadSupport.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/read/TestDataWritableReadSupport.java new file mode 100644 index 0000000..b3aaca6 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/read/TestDataWritableReadSupport.java @@ -0,0 +1,112 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.io.parquet.read; + +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.junit.Test; + +import java.util.Arrays; + +import static org.apache.hadoop.hive.ql.io.parquet.HiveParquetSchemaTestUtils.testConversion; + +public class TestDataWritableReadSupport { + @Test + public void testGetProjectedSchema1() throws Exception { + MessageType originalMsg = MessageTypeParser.parseMessageType( + "message hive_schema {\n" + + " optional group structCol {\n" + + " optional int32 a;\n" + + " optional double b;\n" + + " optional boolean c;\n" + + " optional fixed_len_byte_array(3) d (DECIMAL(5,2));\n" + + " }\n" + + "}\n"); + + testConversion("structCol", "struct", DataWritableReadSupport + .getProjectedSchema(originalMsg, Arrays.asList("structCol"), Arrays.asList(0), + Arrays.asList("structCol.a")).toString()); + } + + @Test + public void testGetProjectedSchema2() throws Exception { + MessageType originalMsg = MessageTypeParser.parseMessageType( + "message hive_schema {\n" + + " optional group structCol {\n" + + " optional int32 a;\n" + + " optional double b;\n" + + " }\n" + + "}\n"); + + testConversion("structCol", "struct", DataWritableReadSupport + .getProjectedSchema(originalMsg, Arrays.asList("structCol"), Arrays.asList(0), + Arrays.asList("structCol.a", "structCol.b")).toString()); + } + + @Test + public void testGetProjectedSchema3() throws Exception { + MessageType originalMsg = MessageTypeParser.parseMessageType( + "message hive_schema {\n" + + " optional group structCol {\n" + + " optional int32 a;\n" + + " optional double b;\n" + + " }\n" + + " optional boolean c;\n" + + "}\n"); + + testConversion("structCol,c", "struct,boolean", DataWritableReadSupport + .getProjectedSchema(originalMsg, Arrays.asList("structCol", "c"), Arrays.asList(0, 1), + Arrays.asList("structCol.b", "c")).toString()); + } + + @Test + public void testGetProjectedSchema4() throws Exception { + MessageType originalMsg = MessageTypeParser.parseMessageType( + "message hive_schema {\n" + + " optional group structCol {\n" + + " optional int32 a;\n" + + " optional group subStructCol {\n" + + " optional int64 b;\n" + + " optional boolean c;\n" + + " }\n" + + " }\n" + + " optional boolean d;\n" + + "}\n"); + + testConversion("structCol", "struct>", DataWritableReadSupport + .getProjectedSchema(originalMsg, Arrays.asList("structCol"), Arrays.asList(0), + Arrays.asList("structCol.subStructCol.b")).toString()); + } + + @Test + public void testGetProjectedSchema5() throws Exception { + MessageType originalMsg = MessageTypeParser.parseMessageType( + "message hive_schema {\n" + + " optional group structCol {\n" + + " optional int32 a;\n" + + " optional group subStructCol {\n" + + " optional int64 b;\n" + + " optional boolean c;\n" + + " }\n" + + " }\n" + + " optional boolean d;\n" + + "}\n"); + + testConversion("structCol", "struct>", + DataWritableReadSupport + .getProjectedSchema(originalMsg, Arrays.asList("structCol"), Arrays.asList(0), Arrays + .asList("structCol.subStructCol", "structCol.subStructCol.b", + "structCol.subStructCol.c")).toString()); + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestColumnPrunerProcCtx.java ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestColumnPrunerProcCtx.java new file mode 100644 index 0000000..dfcd154 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestColumnPrunerProcCtx.java @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer; + +import org.apache.hadoop.hive.ql.exec.SelectOperator; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.SelectDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFPower; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestColumnPrunerProcCtx { + // struct + private static TypeInfo col1Type; + // double + private static TypeInfo col2Type; + // struct,col2:double> + private static TypeInfo col3Type; + + @BeforeClass + public static void setup(){ + List ns = new ArrayList<>(); + ns.add("a"); + ns.add("b"); + List tis = new ArrayList<>(); + TypeInfo aType = TypeInfoFactory.booleanTypeInfo; + TypeInfo bType = TypeInfoFactory.doubleTypeInfo; + tis.add(aType); + tis.add(bType); + col1Type = TypeInfoFactory.getStructTypeInfo(ns, tis); + col2Type = TypeInfoFactory.doubleTypeInfo; + + List names = new ArrayList<>(); + names.add("col1"); + names.add("col2"); + + List typeInfos = new ArrayList<>(); + typeInfos.add(col1Type); + typeInfos.add(col2Type); + col3Type = TypeInfoFactory.getStructTypeInfo(names, typeInfos); + } + + // Test select root.col1.a from root:struct,col2:double> + @Test + public void testGetSelectNestedColPathsFromChildren1() { + ColumnPrunerProcCtx ctx = new ColumnPrunerProcCtx(null); + + ExprNodeDesc colDesc = new ExprNodeColumnDesc(col3Type, "root", "test", false); + ExprNodeDesc col1 = new ExprNodeFieldDesc(col1Type, colDesc, "col1", false); + ExprNodeDesc fieldDesc = new ExprNodeFieldDesc(TypeInfoFactory.booleanTypeInfo, col1, "a", false); + final List paths = Arrays.asList("_col0"); + + SelectOperator selectOperator = buildSelectOperator(Arrays.asList(fieldDesc), paths); + List groups = ctx.getSelectNestedColPathsFromChildren(selectOperator, paths); + assertEquals(new String[] { "root.col1.a" }, groups.toArray(new String[groups.size()])); + } + + // Test select root.col1 from root:struct,col2:double> + @Test + public void testGetSelectNestedColPathsFromChildren2() { + ColumnPrunerProcCtx ctx = new ColumnPrunerProcCtx(null); + + 
ExprNodeDesc colDesc = new ExprNodeColumnDesc(col3Type, "root", "test", false); + ExprNodeDesc fieldDesc = new ExprNodeFieldDesc(col1Type, colDesc, "col1", false); + final List paths = Arrays.asList("_col0"); + + SelectOperator selectOperator = buildSelectOperator(Arrays.asList(fieldDesc), paths); + List groups = ctx.getSelectNestedColPathsFromChildren(selectOperator, paths); + assertEquals(new String[] { "root.col1" }, groups.toArray(new String[groups.size()])); + } + + // Test select root.col2 from root:struct,col2:double> + @Test + public void testGetSelectNestedColPathsFromChildren3() { + ColumnPrunerProcCtx ctx = new ColumnPrunerProcCtx(null); + + ExprNodeDesc colDesc = new ExprNodeColumnDesc(col3Type, "root", "test", false); + ExprNodeDesc fieldDesc = new ExprNodeFieldDesc(col1Type, colDesc, "col2", false); + final List paths = Arrays.asList("_col0"); + + SelectOperator selectOperator = buildSelectOperator(Arrays.asList(fieldDesc), paths); + List groups = ctx.getSelectNestedColPathsFromChildren(selectOperator, paths); + assertEquals(new String[] { "root.col2" }, groups.toArray(new String[groups.size()])); + } + + // Test select root from root:struct,col2:double> + @Test + public void testGetSelectNestedColPathsFromChildren4() { + ColumnPrunerProcCtx ctx = new ColumnPrunerProcCtx(null); + + ExprNodeDesc colDesc = new ExprNodeColumnDesc(col3Type, "root", "test", false); + final List paths = Arrays.asList("_col0"); + + SelectOperator selectOperator = buildSelectOperator(Arrays.asList(colDesc), paths); + List groups = ctx.getSelectNestedColPathsFromChildren(selectOperator, paths); + assertEquals(new String[] { "root" }, groups.toArray(new String[groups.size()])); + } + + // Test select named_struct from named_struct:struct + @Test + public void testGetSelectNestedColPathsFromChildren5(){ + ColumnPrunerProcCtx ctx = new ColumnPrunerProcCtx(null); + + ExprNodeConstantDesc constADesc = new ExprNodeConstantDesc(TypeInfoFactory.booleanTypeInfo, "a"); + ExprNodeConstantDesc constBDesc = new ExprNodeConstantDesc(TypeInfoFactory.doubleTypeInfo, "b"); + List list = new ArrayList<>(); + list.add(constADesc); + list.add(constBDesc); + GenericUDF udf = mock(GenericUDF.class); + ExprNodeDesc funcDesc = new ExprNodeGenericFuncDesc(col1Type, udf, "named_struct", list); + ExprNodeDesc fieldDesc = new ExprNodeFieldDesc(TypeInfoFactory.doubleTypeInfo, funcDesc, "foo", + false); + + final List paths = Arrays.asList("_col0"); + SelectOperator selectOperator = buildSelectOperator(Arrays.asList(fieldDesc), paths); + List groups = ctx.getSelectNestedColPathsFromChildren(selectOperator, paths); + // Return empty result since only constant Desc exists + assertEquals(0, groups.size()); + } + + // Test select abs(root.col1.b) from table test(root struct, + // col2:double>); + @Test + public void testGetSelectNestedColPathsFromChildren6(){ + ColumnPrunerProcCtx ctx = new ColumnPrunerProcCtx(null); + + ExprNodeDesc colDesc = new ExprNodeColumnDesc(col3Type, "root", "test", false); + ExprNodeDesc col1 = new ExprNodeFieldDesc(col1Type, colDesc, "col1", false); + ExprNodeDesc fieldDesc = new ExprNodeFieldDesc(TypeInfoFactory.doubleTypeInfo, col1, "b", + false); + final List paths = Arrays.asList("_col0"); + + GenericUDF udf = mock(GenericUDFBridge.class); + + List list = new ArrayList<>(); + list.add(fieldDesc); + ExprNodeDesc funcDesc = new ExprNodeGenericFuncDesc(TypeInfoFactory.binaryTypeInfo, udf, "abs", + list); + + SelectOperator selectOperator = buildSelectOperator(Arrays.asList(funcDesc), paths); + List groups 
= ctx.getSelectNestedColPathsFromChildren(selectOperator, paths); + assertEquals(new String[] { "root.col1.b" }, groups.toArray(new String[groups.size()])); + } + + // Test select pow(root.col1.b, root.col2) from table test(root + // struct, col2:double>); + @Test + public void testGetSelectNestedColPathsFromChildren7(){ + ColumnPrunerProcCtx ctx = new ColumnPrunerProcCtx(null); + + ExprNodeDesc colDesc = new ExprNodeColumnDesc(col3Type, "root", "test", false); + ExprNodeDesc col1 = new ExprNodeFieldDesc(col1Type, colDesc, "col1", false); + ExprNodeDesc fieldDesc1 = + new ExprNodeFieldDesc(TypeInfoFactory.doubleTypeInfo, col1, "b", false); + + colDesc = new ExprNodeColumnDesc(col3Type, "root", "test", false); + ExprNodeDesc col2 = new ExprNodeFieldDesc(col2Type, colDesc, "col2", false); + final List paths = Arrays.asList("_col0"); + + GenericUDF udf = mock(GenericUDFPower.class); + + List list = new ArrayList<>(); + list.add(fieldDesc1); + list.add(col2); + ExprNodeDesc funcDesc = new ExprNodeGenericFuncDesc(TypeInfoFactory.doubleTypeInfo, udf, "pow", + list); + + SelectOperator selectOperator = buildSelectOperator(Arrays.asList(funcDesc), paths); + List groups = ctx.getSelectNestedColPathsFromChildren(selectOperator, paths); + assertEquals(new String[] { "root.col1.b", "root.col2" }, groups.toArray(new String[groups + .size()])); + } + + private SelectOperator buildSelectOperator( + List colList, + List outputColumnNames) { + SelectOperator selectOperator = mock(SelectOperator.class); + SelectDesc selectDesc = new SelectDesc(colList, outputColumnNames); + selectDesc.setSelStarNoCompute(false); + when(selectOperator.getConf()).thenReturn(selectDesc); + return selectOperator; + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestNestedColumnFieldPruningUtils.java ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestNestedColumnFieldPruningUtils.java new file mode 100644 index 0000000..df7d83e --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestNestedColumnFieldPruningUtils.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.optimizer; + +import org.junit.Assert; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.LinkedList; + +@RunWith(Parameterized.class) +public class TestNestedColumnFieldPruningUtils { + @Parameterized.Parameters + public static Collection data() { + return Arrays.asList(new Object[][] { + { "root[a]", new String[] { "root.a.b.c" }, "root[a]" }, + { "root[a[b[d,e]],c]", new String[] { "root.a.b.c" }, "root[a[b[d,e,c]],c]" }, + { "root[a[b[c]]]", new String[] { "root.a.b.c.d" }, "root[a[b[c]]]" }, + { null, new String[] { "a.b.c" }, "a[b[c]]" }, + { null, new String[] { "a.b", "a.c" }, "a[b,c]" }, + { "a[b]", new String[] { "a.b.c" }, "a[b]" } }); + } + + @Parameterized.Parameter(value = 0) + public String origTreeExpr; + @Parameterized.Parameter(value = 1) + public String[] paths; + @Parameterized.Parameter(value = 2) + public String resTreeExpr; + + @org.junit.Test + public void testAddNodeByPath() { + FieldNode root = null; + if (origTreeExpr != null) { + root = buildTreeByExpr(origTreeExpr); + Assert.assertEquals("The original tree is built incorrect", root.toString(), origTreeExpr); + } + for (String p : paths) { + root = NestedColumnFieldPruningUtils.addNodeByPath(root, p); + } + Assert.assertEquals(resTreeExpr, root.toString()); + } + + private static boolean isSpecialChar(char element) { + return (element == '[') || (element == ']') || (element == ','); + } + + private static FieldNode buildTreeByExpr(String expr) { + int index = 0; + LinkedList fieldStack = new LinkedList<>(); + while (index < expr.length()) { + int i = index; + if (isSpecialChar(expr.charAt(i))) { + if ((expr.charAt(index) == ',') || (expr.charAt(index) == ']')) { + FieldNode node = fieldStack.pop(); + FieldNode pre = fieldStack.peek(); + pre.addFieldNodes(node); + } + index++; + } else { + while (i < expr.length() && !isSpecialChar(expr.charAt(i))) { + i++; + } + FieldNode current = new FieldNode(expr.substring(index, i)); + fieldStack.push(current); + index = i; + } + } + return fieldStack.pop(); + } +} diff --git serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java index 0c7ac30..3978a15 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java +++ serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java @@ -37,9 +37,18 @@ public static final Logger LOG = LoggerFactory.getLogger(ColumnProjectionUtils.class); public static final String READ_COLUMN_IDS_CONF_STR = "hive.io.file.readcolumn.ids"; + /** + * the nested column path is the string from the root to the leaf + * e.g. + * c:struct + * the column a's path is c.a and b's path is c.b + */ + public static final String READ_NESTED_COLUMN_PATH_CONF_STR = + "hive.io.file.readNestedColumn.paths"; public static final String READ_ALL_COLUMNS = "hive.io.file.read.all.columns"; public static final String READ_COLUMN_NAMES_CONF_STR = "hive.io.file.readcolumn.names"; private static final String READ_COLUMN_IDS_CONF_STR_DEFAULT = ""; + private static final String READ_NESTED_COLUMN_PATH_CONF_STR_DEFAULT = ""; private static final boolean READ_ALL_COLUMNS_DEFAULT = true; private static final Joiner CSV_JOINER = Joiner.on(",").skipNulls(); @@ -113,6 +122,30 @@ public static void appendReadColumns(Configuration conf, List ids) { } /** + * Appends read nested column's paths. 
Once a read nested column path + * is included in the list, a underlying record reader of a columnar file format + * (e.g. Parquet and ORC) can know what columns are needed. + */ + public static void appendNestedColumnPaths( + Configuration conf, + List paths) { + if (paths == null || paths.isEmpty()) { + return; + } + String pathsStr = StringUtils.join(StringUtils.COMMA_STR, + paths.toArray(new String[paths.size()])); + String old = conf.get(READ_NESTED_COLUMN_PATH_CONF_STR, null); + String newConfStr = pathsStr; + if (old != null && !old.isEmpty()) { + newConfStr = newConfStr + StringUtils.COMMA_STR + old; + } + setReadNestedColumnPathConf(conf, newConfStr); + // Set READ_ALL_COLUMNS to false + conf.setBoolean(READ_ALL_COLUMNS, false); + } + + + /** * This method appends read column information to configuration to use for PPD. It is * currently called with information from TSOP. Names come from TSOP input RowSchema, and * IDs are the indexes inside the schema (which PPD assumes correspond to indexes inside the @@ -122,13 +155,14 @@ public static void appendReadColumns(Configuration conf, List ids) { * @param names Column names. */ public static void appendReadColumns( - Configuration conf, List ids, List names) { + Configuration conf, List ids, List names, List groupPaths) { if (ids.size() != names.size()) { LOG.warn("Read column counts do not match: " + ids.size() + " ids, " + names.size() + " names"); } appendReadColumns(conf, ids); appendReadColumnNames(conf, names); + appendNestedColumnPaths(conf, groupPaths); } public static void appendReadColumns( @@ -160,6 +194,20 @@ public static void appendReadColumns( return result; } + public static List getNestedColumnPaths(Configuration conf) { + String skips = + conf.get(READ_NESTED_COLUMN_PATH_CONF_STR, READ_NESTED_COLUMN_PATH_CONF_STR_DEFAULT); + String[] list = StringUtils.split(skips); + List result = new ArrayList<>(list.length); + for (String element : list) { + // it may contain duplicates, remove duplicates + if (!result.contains(element)) { + result.add(element); + } + } + return result; + } + public static String[] getReadColumnNames(Configuration conf) { String colNames = conf.get(READ_COLUMN_NAMES_CONF_STR, READ_COLUMN_IDS_CONF_STR_DEFAULT); if (colNames != null && !colNames.isEmpty()) { @@ -176,6 +224,16 @@ private static void setReadColumnIDConf(Configuration conf, String id) { } } + private static void setReadNestedColumnPathConf( + Configuration conf, + String nestedColumnPaths) { + if (nestedColumnPaths.trim().isEmpty()) { + conf.set(READ_NESTED_COLUMN_PATH_CONF_STR, READ_NESTED_COLUMN_PATH_CONF_STR_DEFAULT); + } else { + conf.set(READ_NESTED_COLUMN_PATH_CONF_STR, nestedColumnPaths); + } + } + private static void appendReadColumnNames(Configuration conf, List cols) { String old = conf.get(READ_COLUMN_NAMES_CONF_STR, ""); StringBuilder result = new StringBuilder(old);
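To tie the pieces together, a sketch of the intended configuration round trip, assumed usage with made-up column names: the optimizer pushes the pruned paths into the job configuration under hive.io.file.readNestedColumn.paths, and the reader side retrieves them again.

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.JobConf;

JobConf job = new JobConf();
// Pushed from the table scan (see the appendReadColumns call sites earlier in this patch).
ColumnProjectionUtils.appendReadColumns(
    job,
    Arrays.asList(0, 1),             // needed column ids
    Arrays.asList("s", "c"),         // needed column names
    Arrays.asList("s.a.b", "c"));    // needed nested column paths
// Read back on the reader side, e.g. in DataWritableReadSupport.init().
List<String> nestedPaths = ColumnProjectionUtils.getNestedColumnPaths(job);
// nestedPaths contains "s.a.b" and "c"; hive.io.file.read.all.columns is now false.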