diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 48f02ea..2f22880 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -41,8 +41,11 @@
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -56,6 +59,8 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.primitives.Booleans;
 
 /**
  * A MapReduce/Hive input format for ORC files.
  */
@@ -165,7 +170,8 @@ static void includeColumnRecursive(List<OrcProto.Type> types,
   public static SearchArgument createSarg(List<OrcProto.Type> types, Configuration conf) {
     String serializedPushdown = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
     if (serializedPushdown == null
-        || conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR) == null) {
+        || (conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR) == null
+            && conf.get(serdeConstants.LIST_COLUMNS) == null)) {
       LOG.info("No ORC pushdown predicate");
       return null;
     }
@@ -531,9 +537,44 @@ void createSplit(long offset, long length) throws IOException {
     public void run() {
       try {
         Reader orcReader = OrcFile.createReader(fs, file.getPath());
+        Configuration conf = context.conf;
+        SearchArgument sarg = createSarg(orcReader.getTypes(), conf);
+        List<StripeStatistics> stripeStats = null;
+        int[] filterColumns = null;
+        if (sarg != null) {
+          // LIST_COLUMNS may legitimately be absent (createSarg accepts
+          // READ_COLUMN_NAMES_CONF_STR instead), so guard the dereference
+          // instead of NPE-ing on split(",").  Also defer the metadata
+          // (stripe statistics) read until we know there is a predicate
+          // and resolvable column names — it costs extra I/O.
+          String columnNamesProp = conf.get(serdeConstants.LIST_COLUMNS);
+          if (columnNamesProp != null) {
+            String[] columnNames = columnNamesProp.split(",");
+            List<PredicateLeaf> sargLeaves = sarg.getLeaves();
+            filterColumns = new int[sargLeaves.size()];
+            for (int i = 0; i < filterColumns.length; ++i) {
+              // map each predicate leaf to the index of the column it
+              // references in the table schema (-1 when not found)
+              filterColumns[i] =
+                  RecordReaderImpl.findColumns(columnNames, sargLeaves.get(i).getColumnName());
+            }
+            stripeStats = orcReader.getMetadata().getStripeStatisticsList();
+          }
+        }
+        int idx = -1;
         long currentOffset = -1;
         long currentLength = 0;
         for(StripeInformation stripe: orcReader.getStripes()) {
+          idx++;
+
+          // eliminate stripes whose min/max statistics prove the predicate
+          // can never be satisfied; stripeStats is non-null only when a
+          // predicate exists and column names were resolvable
+          if (stripeStats != null
+              && !isStripeSatisfyPredicate(stripeStats.get(idx), sarg, filterColumns)) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Eliminating ORC stripe-" + idx + " of file '" + file.getPath()
+                  + "' as it did not satisfy predicate condition.");
+            }
+            // close out the split that was being accumulated before the gap
+            if (currentOffset != -1) {
+              createSplit(currentOffset, currentLength);
+              currentOffset = -1;
+            }
+            continue;
} + // if we are working on a stripe, over the min stripe size, and // crossed a block boundary, cut the input split here. if (currentOffset != -1 && currentLength > context.minSize && @@ -562,6 +603,58 @@ public void run() { } } } + + private boolean isStripeSatisfyPredicate(StripeStatistics stripeStatistics, + SearchArgument sarg, int[] filterColumns) { + if (sarg != null && filterColumns != null) { + List predLeaves = sarg.getLeaves(); + TruthValue[] truthValues = new TruthValue[predLeaves.size()]; + boolean[] results = new boolean[filterColumns.length]; + for (int i = 0; i < filterColumns.length; i++) { + // column statistics at index 0 contains only the number of rows + ColumnStatistics stats = stripeStatistics.getColumnStatistics()[filterColumns[i] + 1]; + Object minValue = getMin(stats); + Object maxValue = getMax(stats); + for (int pred = 0; pred < truthValues.length; pred++) { + truthValues[pred] = RecordReaderImpl.evaluatePredicateRange(predLeaves.get(pred), + minValue, maxValue); + } + results[i] = sarg.evaluate(truthValues).isNotNeeded(); + } + // if the boolean array contains atleast one true value then we should return true + return Booleans.asList(results).contains(true); + } + return true; + } + + private Object getMax(ColumnStatistics index) { + if (index instanceof IntegerColumnStatistics) { + return ((IntegerColumnStatistics) index).getMaximum(); + } else if (index instanceof DoubleColumnStatistics) { + return ((DoubleColumnStatistics) index).getMaximum(); + } else if (index instanceof StringColumnStatistics) { + return ((StringColumnStatistics) index).getMaximum(); + } else if (index instanceof DateColumnStatistics) { + return ((DateColumnStatistics) index).getMaximum(); + } else { + return null; + } + } + + private Object getMin(ColumnStatistics index) { + if (index instanceof IntegerColumnStatistics) { + return ((IntegerColumnStatistics) index).getMinimum(); + } else if (index instanceof DoubleColumnStatistics) { + return 
((DoubleColumnStatistics) index).getMinimum(); + } else if (index instanceof StringColumnStatistics) { + return ((StringColumnStatistics) index).getMinimum(); + } else if (index instanceof DateColumnStatistics) { + return ((DateColumnStatistics) index).getMinimum(); + } else { + return null; + } + } + } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java index 71484a3..bd6b2be 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java @@ -132,7 +132,7 @@ advanceToNextRow(0L); } - private static int findColumns(String[] columnNames, + static int findColumns(String[] columnNames, String columnName) { for(int i=0; i < columnNames.length; ++i) { if (columnName.equals(columnNames[i])) { @@ -2025,6 +2025,11 @@ static TruthValue evaluatePredicate(OrcProto.ColumnStatistics index, } } Object maxValue = getMax(index); + return evaluatePredicateRange(predicate, minValue, maxValue); + } + + static TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object minValue, + Object maxValue) { Location loc; switch (predicate.getOperator()) { case NULL_SAFE_EQUALS: