diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 823d404..7ba58b9 100755
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -321,6 +322,12 @@ private void addSplitsForGroup(List<Path> dirs, TableScanOperator tableScan, Job
     TableDesc currentTable = null;
     TableScanOperator currentTableScan = null;
+    boolean pushDownProjection = false;
+    // Buffers to accumulate the pushed-down column projection info
+    StringBuilder readColumnsBuffer = new StringBuilder(
+        newjob.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, ""));
+    StringBuilder readColumnNamesBuffer = new StringBuilder(
+        newjob.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, ""));
 
     // for each dir, get the InputFormat, and do getSplits.
     for (Path dir : dirs) {
       PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);
@@ -336,9 +343,13 @@ private void addSplitsForGroup(List<Path> dirs, TableScanOperator tableScan, Job
         Operator op = mrwork.getAliasToWork().get(aliases.get(0));
         if ((op != null) && (op instanceof TableScanOperator)) {
           tableScan = (TableScanOperator) op;
+          // Reset the buffers before accumulating this scan's projection columns
+          readColumnsBuffer.setLength(0);
+          readColumnNamesBuffer.setLength(0);
           // push down projections.
-          ColumnProjectionUtils.appendReadColumns(
-              newjob, tableScan.getNeededColumnIDs(), tableScan.getNeededColumns());
+          ColumnProjectionUtils.appendReadColumns(readColumnsBuffer, readColumnNamesBuffer,
+              tableScan.getNeededColumnIDs(), tableScan.getNeededColumns());
+          pushDownProjection = true;
           // push down filters
           pushFilters(newjob, tableScan);
         }
@@ -366,6 +377,13 @@ private void addSplitsForGroup(List<Path> dirs, TableScanOperator tableScan, Job
       currentTable = table;
       currentInputFormatClass = inputFormatClass;
     }
+    if (pushDownProjection) {
+      newjob.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+      newjob.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, readColumnsBuffer.toString());
+      newjob.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, readColumnNamesBuffer.toString());
+      LOG.info(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR + "=" + readColumnsBuffer);
+      LOG.info(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR + "=" + readColumnNamesBuffer);
+    }
 
     if (dirs.length != 0) {
       LOG.info("Generating splits");
diff --git serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
index cfd98f2..e403ad9 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
@@ -103,6 +103,40 @@ public static void appendReadColumns(
     appendReadColumnNames(conf, names);
   }
 
+  public static void appendReadColumns(
+      StringBuilder readColumnsBuffer, StringBuilder readColumnNamesBuffer, List<Integer> ids,
+      List<String> names) {
+    appendReadColumns(readColumnsBuffer, ids);
+    appendReadColumnNames(readColumnNamesBuffer, names);
+  }
+
+  public static void appendReadColumns(StringBuilder readColumnsBuffer, List<Integer> ids) {
+    String id = toReadColumnIDString(ids);
+    if (id != null && !id.isEmpty()) {
+      // Separate the new ids from any previously accumulated ones.
+      if (readColumnsBuffer.length() > 0) {
+        readColumnsBuffer.append(StringUtils.COMMA_STR);
+      }
+      readColumnsBuffer.append(id);
+    } else if (readColumnsBuffer.length() == 0) {
+      readColumnsBuffer.append(READ_COLUMN_IDS_CONF_STR_DEFAULT);
+    }
+  }
+
+  private static void appendReadColumnNames(StringBuilder readColumnNamesBuffer, List<String> cols) {
+    // Emit a separating comma before every name except the first one ever
+    // appended to the buffer.
+    boolean first = readColumnNamesBuffer.length() == 0;
+    for (String col : cols) {
+      if (first) {
+        first = false;
+      } else {
+        readColumnNamesBuffer.append(',');
+      }
+      readColumnNamesBuffer.append(col);
+    }
+  }
+
   /**
    * Returns an array of column ids(start from zero) which is set in the given
    * parameter conf.
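
Why the StringBuilder-based helpers matter: column ids from successive table scans are merged into one comma-separated value and written to the JobConf once (when pushDownProjection is set), rather than each scan's appendReadColumns call overwriting READ_COLUMN_IDS_CONF_STR. Below is a minimal, self-contained sketch of that accumulation semantics in plain Java; it has no Hive dependency, and the class name and simplified helper are illustrative stand-ins, not the patched code.

    import java.util.Arrays;
    import java.util.List;

    public class ReadColumnsAccumulationSketch {
      // Simplified stand-in for ColumnProjectionUtils.appendReadColumns(
      // StringBuilder, List<Integer>): comma-separates each id from whatever
      // has already been accumulated in the buffer.
      static void appendReadColumns(StringBuilder buffer, List<Integer> ids) {
        for (Integer id : ids) {
          if (buffer.length() > 0) {
            buffer.append(',');
          }
          buffer.append(id);
        }
      }

      public static void main(String[] args) {
        StringBuilder readColumns = new StringBuilder();
        appendReadColumns(readColumns, Arrays.asList(0, 2)); // first table scan
        appendReadColumns(readColumns, Arrays.asList(1, 3)); // later table scan
        // Prints "0,2,1,3": the single merged value that would end up under
        // READ_COLUMN_IDS_CONF_STR instead of just the last scan's "1,3".
        System.out.println(readColumns);
      }
    }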