diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 1cf24b4..e0bf06b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -3609,14 +3609,18 @@ public static void unsetSchemaEvolution(Configuration conf) {
     conf.unset(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES);
   }
 
-  public static void addTableSchemaToConf(Configuration conf,
-      TableScanOperator tableScanOp) {
-    String schemaEvolutionColumns = tableScanOp.getSchemaEvolutionColumns();
+  public static void addTableSchemaToConf(Configuration conf, TableScanOperator tableScanOp) {
+    addTableSchemaToConf(conf, tableScanOp.getSchemaEvolutionColumns(), tableScanOp.getSchemaEvolutionColumnsTypes());
+  }
+
+  public static void addTableSchemaToConf(Configuration conf, String schemaEvolutionColumns,
+      String schemaEvolutionColumnsTypes) {
     if (schemaEvolutionColumns != null) {
-      conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, tableScanOp.getSchemaEvolutionColumns());
-      conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, tableScanOp.getSchemaEvolutionColumnsTypes());
+      conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, schemaEvolutionColumns);
+      conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, schemaEvolutionColumnsTypes);
     } else {
-      LOG.info("schema.evolution.columns and schema.evolution.columns.types not available");
+      LOG.info(IOConstants.SCHEMA_EVOLUTION_COLUMNS + " and " + IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES
+          + " not available");
     }
   }
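For reference, the schema-evolution properties can now be set without a TableScanOperator in hand. Below is a minimal sketch of the new string-based overload; the "id,name" and "int:string" values are illustrative placeholders (assuming the usual comma-separated column list and colon-separated columns.types format), not something this patch prescribes:

import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.mapred.JobConf;

public class AddTableSchemaToConfSketch {
  public static void main(String[] args) {
    JobConf jobConf = new JobConf();

    // Non-null columns: both IOConstants.SCHEMA_EVOLUTION_COLUMNS and
    // IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES are set on the conf.
    Utilities.addTableSchemaToConf(jobConf, "id,name", "int:string");

    // Null columns: the overload only logs that the properties are unavailable.
    Utilities.addTableSchemaToConf(jobConf, null, null);
  }
}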
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 3ee8fdc..608eb93 100755
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -31,7 +32,11 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.Map.Entry;
 
+import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configurable;
@@ -53,11 +58,9 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc;
-import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc.VectorDeserializeType;
 import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc.VectorMapOperatorReadType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
@@ -584,6 +587,26 @@ protected static PartitionDesc getPartitionDescFromPath(
   }
 
   public static void pushFilters(JobConf jobConf, TableScanOperator tableScan) {
+    ExprNodeGenericFuncDesc filterExpr = null;
+    Serializable filterObj = null;
+    String serializedFilterObj = null;
+    String serializedFilterExpr = null;
+
+    TableScanDesc scanDesc = tableScan.getConf();
+    if (scanDesc != null) {
+      filterExpr = scanDesc.getFilterExpr();
+      filterObj = scanDesc.getFilterObject();
+      serializedFilterObj = scanDesc.getSerializedFilterObject();
+      serializedFilterExpr = scanDesc.getSerializedFilterExpr();
+    }
+
+    pushFilters(jobConf, filterExpr, filterObj, serializedFilterObj, serializedFilterExpr, tableScan.getSchema(),
+        tableScan.getSchemaEvolutionColumns(), tableScan.getSchemaEvolutionColumnsTypes());
+  }
+
+  public static void pushFilters(JobConf jobConf, ExprNodeGenericFuncDesc filterExpr, Serializable filterObject,
+      String serializedFilterObj, String serializedFilterExpr, RowSchema rowSchema, String schemaEvolutionColumns,
+      String schemaEvolutionColumnsTypes) {
 
     // ensure filters are not set from previous pushFilters
     jobConf.unset(TableScanDesc.FILTER_TEXT_CONF_STR);
@@ -591,27 +614,19 @@ public static void pushFilters(JobConf jobConf, TableScanOperator tableScan) {
 
     Utilities.unsetSchemaEvolution(jobConf);
 
-    TableScanDesc scanDesc = tableScan.getConf();
-    if (scanDesc == null) {
-      return;
-    }
-
-    Utilities.addTableSchemaToConf(jobConf, tableScan);
+    Utilities.addTableSchemaToConf(jobConf, schemaEvolutionColumns, schemaEvolutionColumnsTypes);
 
     // construct column name list and types for reference by filter push down
-    Utilities.setColumnNameList(jobConf, tableScan);
-    Utilities.setColumnTypeList(jobConf, tableScan);
+    Utilities.setColumnNameList(jobConf, rowSchema);
+    Utilities.setColumnTypeList(jobConf, rowSchema);
+
     // push down filters
-    ExprNodeGenericFuncDesc filterExpr = (ExprNodeGenericFuncDesc)scanDesc.getFilterExpr();
     if (filterExpr == null) {
       return;
     }
 
-    String serializedFilterObj = scanDesc.getSerializedFilterObject();
-    String serializedFilterExpr = scanDesc.getSerializedFilterExpr();
     boolean hasObj = serializedFilterObj != null, hasExpr = serializedFilterExpr != null;
     if (!hasObj) {
-      Serializable filterObject = scanDesc.getFilterObject();
       if (filterObject != null) {
         serializedFilterObj = SerializationUtilities.serializeObject(filterObject);
       }
@@ -650,7 +665,80 @@ protected void pushProjectionsAndFilters(JobConf jobConf, Class inputFormatClass
       return;
     }
 
-    ArrayList<String> aliases = new ArrayList<String>();
+    Set<String> aliases = getAliasesForPath(splitPath, nonNative, splitPathWithNoSchema);
+
+    // Collect the needed columns from all the aliases and create ORed filter
+    // expression for the table.
+    boolean allColumnsNeeded = false;
+    boolean noFilters = false;
+    Set<Integer> neededColumnIDs = new HashSet<Integer>();
+    Set<String> neededColumnNames = new HashSet<String>();
+    Set<String> neededColumnPaths = new HashSet<String>();
+    List<ExprNodeGenericFuncDesc> filterExprs = new ArrayList<ExprNodeGenericFuncDesc>();
+    Serializable filterObject = null;
+    String serializedFilterObj = null;
+    String serializedFilterExpr = null;
+    RowSchema rowSchema = null;
+    String schemaEvolutionColumns = null;
+    String schemaEvolutionColumnsTypes = null;
+
+    // Gather all filters into `filterExprs`.
+    for (String alias : aliases) {
+      final Operator<?> op = mrwork.getAliasToWork().get(alias);
+      if (op != null && op instanceof TableScanOperator) {
+        final TableScanOperator ts = (TableScanOperator) op;
+
+        if (ts.getNeededColumnIDs() == null) {
+          allColumnsNeeded = true;
+        } else {
+          neededColumnIDs.addAll(ts.getNeededColumnIDs());
+          if (ts.getNeededColumns() != null) {
+            neededColumnNames.addAll(ts.getNeededColumns());
+          }
+          if (ts.getNeededNestedColumnPaths() != null) {
+            neededColumnPaths.addAll(ts.getNeededNestedColumnPaths());
+          }
+        }
+
+        TableScanDesc scanDesc = ts.getConf();
+        if (scanDesc != null) {
+          filterObject = scanDesc.getFilterObject();
+          serializedFilterObj = scanDesc.getSerializedFilterObject();
+          serializedFilterExpr = scanDesc.getSerializedFilterExpr();
+        }
+
+        ExprNodeGenericFuncDesc filterExpr = scanDesc == null ? null : scanDesc.getFilterExpr();
+        if (filterExpr == null) { // No filter if any TS has no filter expression
+          noFilters = true;
+        } else {
+          filterExprs.add(filterExpr);
+        }
+
+        rowSchema = ts.getSchema();
+        schemaEvolutionColumns = ts.getSchemaEvolutionColumns();
+        schemaEvolutionColumnsTypes = ts.getSchemaEvolutionColumnsTypes();
+        AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable());
+        AcidUtils.setAcidOperationalProperties(job, ts.getConf().getAcidOperationalProperties());
+      }
+    }
+
+    // OR all filters together
+    ExprNodeGenericFuncDesc tableFilterExpr = buildTableFilterExpr(noFilters, filterExprs);
+
+    // push down projections
+    if (allColumnsNeeded) {
+      ColumnProjectionUtils.setReadAllColumns(jobConf);
+    } else {
+      ColumnProjectionUtils.appendReadColumns(jobConf, new ArrayList<Integer>(neededColumnIDs),
+          new ArrayList<String>(neededColumnNames), new ArrayList<String>(neededColumnPaths));
+    }
+
+    pushFilters(jobConf, tableFilterExpr, filterObject, serializedFilterObj, serializedFilterExpr, rowSchema,
+        schemaEvolutionColumns, schemaEvolutionColumnsTypes);
+  }
+
+  private Set<String> getAliasesForPath(Path splitPath, boolean nonNative, Path splitPathWithNoSchema) {
+    Set<String> aliases = new HashSet<String>();
 
     Iterator<Entry<Path, ArrayList<String>>> iterator = this.mrwork
         .getPathToAliases().entrySet().iterator();
@@ -687,27 +775,28 @@ protected void pushProjectionsAndFilters(JobConf jobConf, Class inputFormatClass
         }
       }
       if (match) {
-        ArrayList<String> list = entry.getValue();
-        for (String val : list) {
-          aliases.add(val);
-        }
+        aliases.addAll(entry.getValue());
       }
     }
+    return aliases;
+  }
 
-    for (String alias : aliases) {
-      Operator<? extends OperatorDesc> op = this.mrwork.getAliasToWork().get(
-        alias);
-      if (op instanceof TableScanOperator) {
-        TableScanOperator ts = (TableScanOperator) op;
-        // push down projections.
-        ColumnProjectionUtils.appendReadColumns(
-            jobConf, ts.getNeededColumnIDs(), ts.getNeededColumns(), ts.getNeededNestedColumnPaths());
-        // push down filters
-        pushFilters(jobConf, ts);
-
-        AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable());
-        AcidUtils.setAcidOperationalProperties(job, ts.getConf().getAcidOperationalProperties());
+  private ExprNodeGenericFuncDesc buildTableFilterExpr(boolean noFilters, List<ExprNodeGenericFuncDesc> filterExprs) {
+    ExprNodeGenericFuncDesc tableFilterExpr = null;
+    if (!noFilters) {
+      try {
+        for (ExprNodeGenericFuncDesc filterExpr : filterExprs) {
+          if (tableFilterExpr == null) {
+            tableFilterExpr = filterExpr;
+          } else {
+            tableFilterExpr = ExprNodeGenericFuncDesc.newInstance(new GenericUDFOPOr(),
+                Arrays.<ExprNodeDesc>asList(tableFilterExpr, filterExpr));
+          }
+        }
+      } catch (UDFArgumentException ex) {
+        throw new RuntimeException("Creating ExprNodeGenericFuncDesc failed due to: " + ex);
       }
     }
+    return tableFilterExpr;
   }
 }
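To make the new flow concrete, here is a self-contained sketch of what the collected per-alias filters look like once buildTableFilterExpr ORs them together and the result is handed to the new pushFilters overload; it mirrors the union-all query in the test added below. The expression-building classes (ExprNodeColumnDesc, ExprNodeConstantDesc, GenericUDFOPEqual, ColumnInfo, TypeInfoFactory) are existing Hive classes not touched by this patch, the "number"/"test_table" names come from the .q file, and the final println assumes the unmodified tail of pushFilters still publishes the filter text under TableScanDesc.FILTER_TEXT_CONF_STR:

import java.util.ArrayList;
import java.util.Arrays;

import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.mapred.JobConf;

public class OredFilterPushDownSketch {
  public static void main(String[] args) throws UDFArgumentException {
    JobConf jobConf = new JobConf();

    // Schema of the single-column test table from the .q file below.
    ArrayList<ColumnInfo> cols = new ArrayList<ColumnInfo>();
    cols.add(new ColumnInfo("number", TypeInfoFactory.intTypeInfo, "test_table", false));
    RowSchema rowSchema = new RowSchema(cols);

    // The two per-alias predicates: number = 1 and number = 2.
    ExprNodeDesc col = new ExprNodeColumnDesc(TypeInfoFactory.intTypeInfo, "number", "test_table", false);
    ExprNodeGenericFuncDesc eq1 = ExprNodeGenericFuncDesc.newInstance(new GenericUDFOPEqual(),
        Arrays.asList(col, new ExprNodeConstantDesc(TypeInfoFactory.intTypeInfo, 1)));
    ExprNodeGenericFuncDesc eq2 = ExprNodeGenericFuncDesc.newInstance(new GenericUDFOPEqual(),
        Arrays.asList(col, new ExprNodeConstantDesc(TypeInfoFactory.intTypeInfo, 2)));

    // Same combination step as buildTableFilterExpr: OR the alias filters together.
    ExprNodeGenericFuncDesc ored = ExprNodeGenericFuncDesc.newInstance(new GenericUDFOPOr(),
        Arrays.<ExprNodeDesc>asList(eq1, eq2));

    // New overload: the pieces are passed directly instead of a TableScanOperator.
    HiveInputFormat.pushFilters(jobConf, ored, null, null, null, rowSchema, "number", "int");

    // Expected to print something like: ((number = 1) or (number = 2))
    System.out.println(jobConf.get(TableScanDesc.FILTER_TEXT_CONF_STR));
  }
}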
diff --git ql/src/test/queries/clientpositive/orc_ppd_same_table_multiple_aliases.q ql/src/test/queries/clientpositive/orc_ppd_same_table_multiple_aliases.q
new file mode 100644
index 0000000..a4f773c
--- /dev/null
+++ ql/src/test/queries/clientpositive/orc_ppd_same_table_multiple_aliases.q
@@ -0,0 +1,19 @@
+-- SORT_QUERY_RESULTS;
+
+set hive.optimize.index.filter=true;
+
+create table test_table(number int) stored as ORC;
+
+-- Two insertions will create two files, with one stripe each
+insert into table test_table VALUES (1);
+insert into table test_table VALUES (2);
+
+-- This should return 2 records
+select * from test_table;
+
+-- These should each return 1 record
+select * from test_table where number = 1;
+select * from test_table where number = 2;
+
+-- This should return 2 records
+select * from test_table where number = 1 union all select * from test_table where number = 2;
diff --git ql/src/test/results/clientpositive/orc_ppd_same_table_multiple_aliases.q.out ql/src/test/results/clientpositive/orc_ppd_same_table_multiple_aliases.q.out
new file mode 100644
index 0000000..b6079f4
--- /dev/null
+++ ql/src/test/results/clientpositive/orc_ppd_same_table_multiple_aliases.q.out
@@ -0,0 +1,60 @@
+PREHOOK: query: create table test_table(number int) stored as ORC
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@test_table
+POSTHOOK: query: create table test_table(number int) stored as ORC
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@test_table
+PREHOOK: query: insert into table test_table VALUES (1)
+PREHOOK: type: QUERY
+PREHOOK: Output: default@test_table
+POSTHOOK: query: insert into table test_table VALUES (1)
+POSTHOOK: type: QUERY
+POSTHOOK: Output: default@test_table
+POSTHOOK: Lineage: test_table.number EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+PREHOOK: query: insert into table test_table VALUES (2)
+PREHOOK: type: QUERY
+PREHOOK: Output: default@test_table
+POSTHOOK: query: insert into table test_table VALUES (2)
+POSTHOOK: type: QUERY
+POSTHOOK: Output: default@test_table
+POSTHOOK: Lineage: test_table.number EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+PREHOOK: query: select * from test_table
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_table
+#### A masked pattern was here ####
+POSTHOOK: query: select * from test_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_table
+#### A masked pattern was here ####
+1
+2
+PREHOOK: query: select * from test_table where number = 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_table
+#### A masked pattern was here ####
+POSTHOOK: query: select * from test_table where number = 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_table
+#### A masked pattern was here ####
+1
+PREHOOK: query: select * from test_table where number = 2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_table
+#### A masked pattern was here ####
+POSTHOOK: query: select * from test_table where number = 2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_table
+#### A masked pattern was here ####
+2
+PREHOOK: query: select * from test_table where number = 1 union all select * from test_table where number = 2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_table
+#### A masked pattern was here ####
+POSTHOOK: query: select * from test_table where number = 1 union all select * from test_table where number = 2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_table
+#### A masked pattern was here ####
+1
+2
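A closing note on what the golden file above guards: with hive.optimize.index.filter=true, both branches of the union all scan the same ORC file under different aliases. Before this patch, pushProjectionsAndFilters pushed each alias's filter into the shared JobConf in turn, so the last-pushed predicate won and could prune, at the stripe or row-group level, rows the other branch still needed; that is how the union could come back with one row instead of two. Pushing the ORed predicate "(number = 1) or (number = 2)" is always safe: it retains a superset of the rows each branch needs, and each branch still applies its own exact filter downstream, so correctness is preserved without losing the push-down benefit.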