From e15c08c21e58420764e8e08991664b4ad32f153c Mon Sep 17 00:00:00 2001
From: Syed Albiz
Date: Fri, 1 Jul 2011 05:55:05 -0700
Subject: [PATCH 1/1] Implement index filtering on multiple tables properly

When a query reads several tables, the index input format was applied to
every input path, including paths of tables that have no index filter.
Record which tables actually have a filter applied, hand their paths to
the job as hive.index.blockfilter.tables, and fall back to the regular
getSplits() whenever an input path is not covered by an index filter.
---
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
index 5e721cf..2aef00c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
@@ -481,6 +481,10 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
       conf.set("hive.index.compact.file", work.getIndexIntermediateFile());
       conf.set("hive.index.blockfilter.file", work.getIndexIntermediateFile());
     }
+
+    if (work.getIndexedPaths() != null) {
+      conf.set("hive.index.blockfilter.tables", work.getIndexedPaths());
+    }
   }
 
   public boolean mapStarted() {
diff --git ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexQueryContext.java ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexQueryContext.java
index 617723e..668e63e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexQueryContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexQueryContext.java
@@ -38,6 +38,7 @@ public class HiveIndexQueryContext {
                                 // merging the index query tasks
   private String indexInputFormat;      // input format to set on the TableScanOperator to activate indexing
   private String indexIntermediateFile; // name of intermediate file written by the index query for the
                                         // TableScanOperator to use
+  private List<String> filteredTableNames; // list of table names that have an index filter applied
   private List<Task<? extends Serializable>> queryTasks; // list of tasks that will execute the index query and write
                                                          // results to a temporary file
@@ -65,13 +66,19 @@
   public String getIndexInputFormat() {
     return indexInputFormat;
   }
-  public void setIndexInputFormat(String indexInputFormat) {
+  public void setIndexInputFormat(String indexInputFormat, List<String> origTableNames) {
     this.indexInputFormat = indexInputFormat;
+    this.filteredTableNames = origTableNames;
   }
 
   public String getIndexIntermediateFile() {
     return indexIntermediateFile;
   }
+
+  public List<String> getFilteredTables() {
+    return this.filteredTableNames;
+  }
+
   public void setIndexIntermediateFile(String indexIntermediateFile) {
     this.indexIntermediateFile = indexIntermediateFile;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java
index f1ee95d..6d635ea 100644
--- ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.index;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Iterator;
 import java.util.Set;
 
@@ -95,6 +96,26 @@
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     String indexFileStr = job.get(indexFile);
     l4j.info("index_file is " + indexFileStr);
+    String filteredPathString = job.get("hive.index.blockfilter.tables");
+    Path[] dirs = FileInputFormat.getInputPaths(job);
+    List<Path> unfilteredPaths = new ArrayList<Path>();
+    boolean filteredInput = true;
+    if (filteredPathString != null && dirs != null) {
+      String[] filteredPaths = filteredPathString.split(",");
+      for (Path p : dirs) {
+        boolean found = false;
+        for (String fp : filteredPaths) {
+          found = found || p.toUri().toString().equals(fp);
+        }
+        if (!found) {
+          unfilteredPaths.add(p);
+          filteredInput = false;
+        }
+      }
+    }
+
+    if (!filteredInput)
+      return super.getSplits(job, numSplits);
 
     HiveIndexResult hiveIndexResult = null;
     if (indexFileStr != null) {
@@ -121,7 +142,6 @@
         }
         newInputPaths.append(path);
       }
-
       FileInputFormat.setInputPaths(job, newInputPaths.toString());
     } else {
       return super.getSplits(job, numSplits);
diff --git ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java
index 8e234bf..d505d72 100644
--- ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java
@@ -95,10 +95,12 @@
     qlCommand.append("(SELECT `_bucketname` AS bucketname , `_offset` AS offset FROM ");
 
     List<BitmapInnerQuery> iqs = new ArrayList<BitmapInnerQuery>(indexes.size());
+    List<String> origTableNames = new ArrayList<String>(indexes.size());
     int i = 0;
     for (Index index : indexes) {
       ExprNodeDesc indexPredicate = indexPredicates.get(index);
       if (indexPredicate != null) {
+        origTableNames.add(index.getOrigTableName());
         iqs.add(new BitmapInnerQuery(
             index.getIndexTableName(),
             indexPredicate,
@@ -119,7 +121,10 @@
     driver.compile(qlCommand.toString(), false);
 
     // setup TableScanOperator to change input format for original query
-    queryContext.setIndexInputFormat(HiveIndexedInputFormat.class.getName());
+    queryContext.setIndexInputFormat(
+        HiveIndexedInputFormat.class.getName(),
+        origTableNames
+    );
     queryContext.setIndexIntermediateFile(tmpFile);
     queryContext.addAdditionalSemanticInputs(driver.getPlan().getInputs());
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
index 7c91946..60a52ea 100644
--- ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
@@ -180,7 +180,12 @@
     driver.compile(qlCommand.toString(), false);
 
     // setup TableScanOperator to change input format for original query
-    queryContext.setIndexInputFormat(HiveCompactIndexInputFormat.class.getName());
+    List<String> origTableNames = new ArrayList<String>();
+    origTableNames.add(index.getOrigTableName());
+    queryContext.setIndexInputFormat(
+        HiveCompactIndexInputFormat.class.getName(),
+        origTableNames
+    );
     queryContext.setIndexIntermediateFile(tmpFile);
     queryContext.addAdditionalSemanticInputs(driver.getPlan().getInputs());
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java
index dbc489f..731e62b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java
@@ -167,6 +167,17 @@ public class IndexWhereProcessor implements NodeProcessor {
 
     // prepare the map reduce job to use indexing
     MapredWork work = currentTask.getWork();
+    List<String> filteredTableNames = queryContext.getFilteredTables();
+    StringBuilder sb = new StringBuilder();
+    for (String tableName : filteredTableNames) {
+      for (Partition p : queryPartitions) {
+        if (p.getTable().toString().equals(tableName)) {
+          sb.append(p.getTable().getPath().toUri().toString());
+          sb.append(",");
+        }
+      }
+    }
+    work.setIndexedPaths(sb.toString());
     work.setInputformat(queryContext.getIndexInputFormat());
     work.setIndexIntermediateFile(queryContext.getIndexIntermediateFile());
 
@@ -259,6 +270,9 @@
   private List<Table> getIndexTables(Hive hive, Partition part) throws HiveException {
     List<Table> indexTables = new ArrayList<Table>();
     Table partitionedTable = part.getTable();
+    if (indexes == null || indexes.get(partitionedTable) == null) {
+      return indexTables;
+    }
     for (Index index : indexes.get(partitionedTable)) {
       indexTables.add(hive.getTable(index.getIndexTableName()));
     }
@@ -276,6 +290,10 @@
       return true; // empty specs come from non-partitioned tables
     }
 
+    if (indexTables == null || indexTables.size() == 0) {
+      return false;
+    }
+
     for (Table indexTable : indexTables) {
       // get partitions that match the spec
       List<Partition> matchingPartitions = hive.getPartitions(indexTable, partSpec);
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
index 66c8523..2b2c39d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
@@ -53,6 +53,8 @@ public class MapredWork implements Serializable {
 
   private HashMap<String, SplitSample> nameToSplitSample;
 
+  private String filteredPaths;
+
   // map<->reduce interface
   // schema of the map-reduce 'key' object - this is homogeneous
   private TableDesc keyDesc;
@@ -385,10 +387,18 @@
     return indexIntermediateFile;
   }
 
+  public String getIndexedPaths() {
+    return this.filteredPaths;
+  }
+
   public void setIndexIntermediateFile(String fileName) {
     this.indexIntermediateFile = fileName;
   }
 
+  public void setIndexedPaths(String filteredPaths) {
+    this.filteredPaths = filteredPaths;
+  }
+
   public void setGatheringStats(boolean gatherStats) {
     this.gatheringStats = gatherStats;
   }
-- 
1.7.4.4
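
For reviewers, a standalone sketch of the split-pruning gate that
HiveIndexedInputFormat.getSplits() acquires above. The class name and the
paths are hypothetical, not part of the patch; only the matching rule is
taken from it: index-based pruning stays enabled while every input path
appears in hive.index.blockfilter.tables, an unset property keeps the
pre-patch single-table behaviour, and any uncovered path forces the
fallback to the unfiltered super.getSplits().

    import java.util.Arrays;
    import java.util.List;

    public class BlockFilterGateSketch {
      // true  -> every input path is covered by an index filter, so
      //          split pruning via the index result may proceed
      // false -> at least one table has no filter applied, so the job
      //          must fall back to an unfiltered scan
      static boolean useIndexFilter(List<String> inputPaths,
                                    String filteredPathString) {
        if (filteredPathString == null) {
          return true; // property unset: keep the old behaviour
        }
        List<String> filtered = Arrays.asList(filteredPathString.split(","));
        for (String p : inputPaths) {
          if (!filtered.contains(p)) {
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args) {
        List<String> inputs = Arrays.asList(
            "hdfs://nn/warehouse/t1", "hdfs://nn/warehouse/t2");
        // only t1 carries an index filter: prints false (full scan)
        System.out.println(useIndexFilter(inputs, "hdfs://nn/warehouse/t1"));
        // both tables carry filters: prints true (pruning may proceed)
        System.out.println(useIndexFilter(inputs,
            "hdfs://nn/warehouse/t1,hdfs://nn/warehouse/t2"));
      }
    }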