Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 1103989)
+++ conf/hive-default.xml (working copy)
@@ -1043,6 +1043,18 @@
+<property>
+  <name>hive.index.compact.query.max.size</name>
+  <value>10737418240</value>
+  <description>The maximum number of bytes that a query using the compact index can read. Negative value is equivalent to infinity.</description>
+</property>
+
+<property>
+  <name>hive.index.compact.query.max.entries</name>
+  <value>10000000</value>
+  <description>The maximum number of index entries to read during a query that uses the compact index. Negative value is equivalent to infinity.</description>
+</property>
+
<property>
  <name>hive.exim.uri.scheme.whitelist</name>
  <value>hdfs,pfile</value>
  <description>A comma separated list of acceptable URI schemes for import and export.</description>
</property>
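For reference, 10737418240 bytes is 10 * 1024 * 1024 * 1024 (10 GB) and 10000000 is 10M entries, matching the HiveConf defaults added below.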
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1103989)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -372,6 +372,8 @@
// Indexes
HIVEOPTINDEXFILTER_COMPACT_MINSIZE("hive.optimize.index.filter.compact.minsize", (long) 5 * 1024 * 1024 * 1024), // 5G
HIVEOPTINDEXFILTER_COMPACT_MAXSIZE("hive.optimize.index.filter.compact.maxsize", (long) -1), // infinity
+ HIVE_INDEX_COMPACT_QUERY_MAX_ENTRIES("hive.index.compact.query.max.entries", (long) 10000000), // 10M
+ HIVE_INDEX_COMPACT_QUERY_MAX_SIZE("hive.index.compact.query.max.size", (long) 10 * 1024 * 1024 * 1024), // 10G
// Statistics
HIVESTATSAUTOGATHER("hive.stats.autogather", true),
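Both new ConfVars are read at query time with HiveConf.getLongVar(), and a negative setting is normalized to Long.MAX_VALUE so that -1 behaves as "no limit". A minimal sketch of that pattern (illustrative only, not part of the patch; assumes a Configuration object named conf):

    // Read the new limits and treat a negative value as unlimited
    // (mirrors the checks added in HiveIndexResult and HiveIndexedInputFormat below).
    long maxEntries = HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVE_INDEX_COMPACT_QUERY_MAX_ENTRIES);
    if (maxEntries < 0) {
      maxEntries = Long.MAX_VALUE;
    }
    long maxSize = HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVE_INDEX_COMPACT_QUERY_MAX_SIZE);
    if (maxSize < 0) {
      maxSize = Long.MAX_VALUE;
    }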
Index: ql/src/test/results/clientnegative/index_compact_size_limit.q.out
===================================================================
--- ql/src/test/results/clientnegative/index_compact_size_limit.q.out (revision 0)
+++ ql/src/test/results/clientnegative/index_compact_size_limit.q.out (revision 0)
@@ -0,0 +1,35 @@
+PREHOOK: query: drop index src_index on src
+PREHOOK: type: DROPINDEX
+POSTHOOK: query: drop index src_index on src
+POSTHOOK: type: DROPINDEX
+PREHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD
+PREHOOK: type: CREATEINDEX
+POSTHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD
+POSTHOOK: type: CREATEINDEX
+PREHOOK: query: ALTER INDEX src_index ON src REBUILD
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@default__src_src_index__
+POSTHOOK: query: ALTER INDEX src_index ON src REBUILD
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@default__src_src_index__
+POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: INSERT OVERWRITE DIRECTORY "/tmp/index_result" SELECT `_bucketname` , `_offsets` FROM default__src_src_index__ WHERE key<1000
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Output: /tmp/index_result
+POSTHOOK: query: INSERT OVERWRITE DIRECTORY "/tmp/index_result" SELECT `_bucketname` , `_offsets` FROM default__src_src_index__ WHERE key<1000
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@default__src_src_index__
+POSTHOOK: Output: /tmp/index_result
+POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: SELECT key, value FROM src WHERE key=100 ORDER BY key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/tmp/wgaluba/hive_2011-05-09_20-23-26_023_6902661313242990836/-mr-10000
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask
Index: ql/src/test/results/clientnegative/index_compact_entry_limit.q.out
===================================================================
--- ql/src/test/results/clientnegative/index_compact_entry_limit.q.out (revision 0)
+++ ql/src/test/results/clientnegative/index_compact_entry_limit.q.out (revision 0)
@@ -0,0 +1,35 @@
+PREHOOK: query: drop index src_index on src
+PREHOOK: type: DROPINDEX
+POSTHOOK: query: drop index src_index on src
+POSTHOOK: type: DROPINDEX
+PREHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD
+PREHOOK: type: CREATEINDEX
+POSTHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD
+POSTHOOK: type: CREATEINDEX
+PREHOOK: query: ALTER INDEX src_index ON src REBUILD
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@default__src_src_index__
+POSTHOOK: query: ALTER INDEX src_index ON src REBUILD
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@default__src_src_index__
+POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: INSERT OVERWRITE DIRECTORY "/tmp/index_result" SELECT `_bucketname` , `_offsets` FROM default__src_src_index__ WHERE key<1000
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Output: /tmp/index_result
+POSTHOOK: query: INSERT OVERWRITE DIRECTORY "/tmp/index_result" SELECT `_bucketname` , `_offsets` FROM default__src_src_index__ WHERE key<1000
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@default__src_src_index__
+POSTHOOK: Output: /tmp/index_result
+POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: SELECT key, value FROM src WHERE key=100 ORDER BY key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/tmp/wgaluba/hive_2011-05-09_20-23-26_023_6902661313242990836/-mr-10000
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask
Index: ql/src/test/queries/clientnegative/index_compact_entry_limit.q
===================================================================
--- ql/src/test/queries/clientnegative/index_compact_entry_limit.q (revision 0)
+++ ql/src/test/queries/clientnegative/index_compact_entry_limit.q (revision 0)
@@ -0,0 +1,11 @@
+drop index src_index on src;
+
+CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD;
+ALTER INDEX src_index ON src REBUILD;
+
+SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+INSERT OVERWRITE DIRECTORY "/tmp/index_result" SELECT `_bucketname` , `_offsets` FROM default__src_src_index__ WHERE key<1000;
+SET hive.index.compact.file=/tmp/index_result;
+SET hive.input.format=org.apache.hadoop.hive.ql.index.compact.HiveCompactIndexInputFormat;
+SET hive.index.compact.query.max.entries=5;
+SELECT key, value FROM src WHERE key=100 ORDER BY key;
Index: ql/src/test/queries/clientnegative/index_compact_size_limit.q
===================================================================
--- ql/src/test/queries/clientnegative/index_compact_size_limit.q (revision 0)
+++ ql/src/test/queries/clientnegative/index_compact_size_limit.q (revision 0)
@@ -0,0 +1,12 @@
+drop index src_index on src;
+
+CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD;
+ALTER INDEX src_index ON src REBUILD;
+
+SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+INSERT OVERWRITE DIRECTORY "/tmp/index_result" SELECT `_bucketname` , `_offsets` FROM default__src_src_index__ WHERE key<1000;
+SET hive.index.compact.file=/tmp/index_result;
+SET hive.input.format=org.apache.hadoop.hive.ql.index.compact.HiveCompactIndexInputFormat;
+SET hive.index.compact.query.max.size=1024;
+SELECT key, value FROM src WHERE key=100 ORDER BY key;
+
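Both negative tests deliberately set the limits far below what the query needs (5 entries, 1024 bytes) so that the final SELECT against src fails. In a normal session the defaults above apply, and either check can be disabled from the CLI by setting the value to a negative number, e.g.:

    SET hive.index.compact.query.max.entries=-1;
    SET hive.index.compact.query.max.size=-1;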
Index: ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java (revision 1103989)
+++ ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java (working copy)
@@ -88,7 +88,7 @@
bytesRef[0] = new BytesRefWritable();
bytesRef[1] = new BytesRefWritable();
- ignoreHdfsLoc = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_INDEX_IGNORE_HDFS_LOC);
+ ignoreHdfsLoc = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_INDEX_IGNORE_HDFS_LOC);
if (indexFile != null) {
Path indexFilePath = new Path(indexFile);
@@ -104,12 +104,22 @@
paths.add(indexFilePath);
}
+ long maxEntriesToLoad = HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVE_INDEX_COMPACT_QUERY_MAX_ENTRIES);
+ if (maxEntriesToLoad < 0) {
+ maxEntriesToLoad = Long.MAX_VALUE;
+ }
+
+ long lineCounter = 0;
for (Path indexFinalPath : paths) {
FSDataInputStream ifile = fs.open(indexFinalPath);
LineReader lr = new LineReader(ifile, conf);
try {
Text line = new Text();
while (lr.readLine(line) > 0) {
+ if (++lineCounter > maxEntriesToLoad) {
+ throw new HiveException("Number of compact index entries loaded during the query exceeded the maximum of " + maxEntriesToLoad
+ + " set in " + HiveConf.ConfVars.HIVE_INDEX_COMPACT_QUERY_MAX_ENTRIES.varname);
+ }
add(line);
}
}
@@ -140,7 +150,7 @@
+ line.toString());
}
String bucketFileName = new String(bytes, 0, firstEnd);
-
+
if (ignoreHdfsLoc) {
Path tmpPath = new Path(bucketFileName);
bucketFileName = tmpPath.toUri().getPath();
Index: ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java (revision 1103989)
+++ ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java (working copy)
@@ -26,6 +26,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
@@ -129,6 +131,13 @@
ArrayList<HiveInputSplit> newSplits = new ArrayList<HiveInputSplit>(
numSplits);
+
+ long maxInputSize = HiveConf.getLongVar(job, ConfVars.HIVE_INDEX_COMPACT_QUERY_MAX_SIZE);
+ if (maxInputSize < 0) {
+ maxInputSize = Long.MAX_VALUE;
+ }
+
+ long sumSplitLengths = 0;
for (HiveInputSplit split : splits) {
l4j.info("split start : " + split.getStart());
l4j.info("split end : " + (split.getStart() + split.getLength()));
@@ -140,13 +149,19 @@
if (split.inputFormatClassName().contains("RCFile")
|| split.inputFormatClassName().contains("SequenceFile")) {
if (split.getStart() > SequenceFile.SYNC_INTERVAL) {
- newSplit = new HiveInputSplit(new FileSplit(split.getPath(), split
- .getStart()
- - SequenceFile.SYNC_INTERVAL, split.getLength()
- + SequenceFile.SYNC_INTERVAL, split.getLocations()), split
- .inputFormatClassName());
+ newSplit = new HiveInputSplit(new FileSplit(split.getPath(),
+ split.getStart() - SequenceFile.SYNC_INTERVAL,
+ split.getLength() + SequenceFile.SYNC_INTERVAL,
+ split.getLocations()),
+ split.inputFormatClassName());
}
}
+ sumSplitLengths += newSplit.getLength();
+ if (sumSplitLengths > maxInputSize) {
+ throw new IOException(
+ "Size of data to read during a compact-index-based query exceeded the maximum of "
+ + maxInputSize + " set in " + ConfVars.HIVE_INDEX_COMPACT_QUERY_MAX_SIZE.varname);
+ }
newSplits.add(newSplit);
}
} catch (HiveException e) {
@@ -156,7 +171,7 @@
}
InputSplit retA[] = newSplits.toArray((new FileSplit[newSplits.size()]));
l4j.info("Number of input splits: " + splits.length + " new input splits: "
- + retA.length);
+ + retA.length + ", sum of split lengths: " + sumSplitLengths);
return retA;
}
}
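With these limits in place, an over-limit compact-index query fails while HiveIndexedInputFormat.getSplits() is preparing the job's input: HiveIndexResult throws a HiveException once more than hive.index.compact.query.max.entries index entries have been read, and getSplits() throws an IOException once the summed split lengths exceed hive.index.compact.query.max.size. In the expected outputs above, both cases surface as "FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask".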